|
3.4 服务库与事件库
' h3 l- Z" I$ i7 S; S本节介绍服务库和事件库。- K$ S: D3 W; c5 V
3.4.1 服务库
" B! r; b* q9 J7 S8 U: Y9 [对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
2 f1 Q9 E. Z3 Y* N) G- Q+ h❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、) u* U( K) w# @% z! y0 v9 Q
STOPPED( 已停止) 。
; Z# l9 y( \6 I+ P5 m/ g5 ~' N' t❑任何服务状态变化都可以触发另外一些动作。1 H6 m* L0 s0 J9 r. e h
❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
& x, b- n( E: Y5 S& S9 d9 X8 @YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
$ h" P. z' H' O" ]& y对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
; s; r1 W6 n5 b1 V) nService实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于3 A# L1 k4 w3 J" k" v4 D' s
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、
9 |7 t" h) v5 ~9 O' GApplicationMasterService等。1 a2 s$ E& r9 ^: I# T
图3-13 YARN中服务模型的类图4 A: `( k4 d, U- D6 V7 n7 O9 \
在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服" P+ Q2 J) s; A; g! j: J. V
务的统一管理。
5 l- Y$ l' A8 e4 g1 R- `3.4.2 事件库
. L' x! {8 n6 t! bYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将: H' Q% Q& D" M- g) {( x
各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处4 x9 H2 L1 p3 U X' B
理模型可概括为图3-14所示。! `) G: {# K8 x4 C& c
图3-14 YARN的事件处理模型4 F0 e& l$ C. `7 a; S( z5 a3 L+ b
整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器 J6 I6 a; _ Q; ]$ O
( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其5 }" v v( O$ X W2 R1 H- o @
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成7 G H& W& y* J& b
( 达到终止条件) 。
]1 i! v/ o4 L# Z在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、
. ?+ O7 w: r UMRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
& A X) |. T5 N7 Q驱动服务的运行。
& U' L1 p' {2 D" _" YYARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先9 d5 O+ J' W l# I6 H5 Q. q
要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器/ E) y$ ? O: B7 w7 a* D9 M
EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器3 F% h2 r: z, U+ O6 O
AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由
9 g* }' e& o U' y) E( L2 U中央异步调度器统一管理和调度。1 s0 `. d$ F4 }
服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
8 ?6 q L* j& m% h1 Q( j则采用事件联系起来, 系统设计简单且维护方便。
( D; O$ j/ L( R- q0 F图3-15 事件与事件处理器" o% V3 k( f9 B* y
3.4.3 YARN服务库和事件库的使用方法0 P& o7 ~+ c, I/ E
为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce+ u: z- a4 J$ R4 R7 y: s
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
! O9 C! y1 S8 l: d1) 定义Task事件。 i) Q. x6 O9 u0 ?- P
public class TaskEvent extends AbstractEvent<TaskEventType> {4 F2 P2 ?! q3 N* \
private String taskID; //Task ID, T7 w% V7 \- ^1 t
public TaskEvent(String taskID, TaskEventType type) {0 h2 W; _$ G" L; T
super(type);
2 h' k4 G: l% \+ sthis.taskID = taskID;5 t+ P* c' g: m5 p
} p
' u! v; Q7 Y, X9 T6 fublic String getTaskID() {( u2 r; G! {- g2 c, v/ u6 `8 ?
return taskID;
# \. Z& t- r& Z6 J8 }( s Q+ ^$ M} 其
8 w- a, r5 Q1 ~2 x. b- U中, Task事件类型定义如下:
; M" |. S" g5 |' k! }! _public enum TaskEventType {
8 U9 a) D; `2 }5 e" gT_KILL,2 l1 Y" c7 Z* q6 H: h) O+ s
T_SCHEDULE
! h5 P) p4 f3 u) `- y j" \}8 J; S' o- }/ X9 a$ P6 F. F5 p* Z( a
2) 定义Job事件。& m' A! _5 g; H; t/ g
public class JobEvent extends AbstractEvent<JobEventType> {
! _ c1 |$ o6 d; n3 M, I6 ^private String jobID;
$ P) V1 G, M+ s: @8 O9 z8 F9 k! H tpublic JobEvent(String jobID, JobEventType type) {4 }+ }" d ^, ^1 j1 K
super(type);* r( Q- x N, a5 k8 w5 t
this.jobID = jobID;
. O6 T; l0 Y" u+ _% S} p" F$ {( Y$ o G7 H. b
ublic String getJobId() {" b8 ^) Q2 D: s1 ?+ _
return jobID;
0 I+ @$ \* I* Q5 H. J! H- T- Q}$ x6 R+ o) o) O. {" n; [' E' d$ I5 T7 ]
}
6 v3 x, u: M. ]/ e0 ^9 n z其中, Job事件类型定义如下:& G5 f' D) @- C
public enum JobEventType {
) p o3 ^- ^' i. @6 S3 ]8 S1 VJOB_KILL,
* @% C) z. h9 s: r5 n3 F0 _9 ~9 C" wJOB_INIT," b4 E4 n9 Y+ u& x5 G- f% |: @" i
JOB_START
+ _$ }$ x; g2 ]" \' q7 l}# H- D$ }4 V( _
3) 事件调度器。& W6 E4 w: |( v2 X
接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
$ @1 {. [$ p& b@SuppressWarnings("unchecked")
$ B+ Q0 w( G9 l+ [public class SimpleMRAppMaster extends CompositeService {$ u) X% D2 u+ p3 x4 J
private Dispatcher dispatcher; //中央异步调度器
3 L3 v/ `6 ?% t# t0 P3 ?" _% W( }private String jobID;7 Z; w- K+ t2 t
private int taskNumber; //该作业包含的任务数目8 _2 L0 ?. b# u: A! R
private String[] taskIDs; //该作业内部包含的所有任务
4 b" L: Z8 t% s+ c% H# ]) {public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
, O/ s! n3 {. ^4 L' ssuper(name);. l/ }$ x; C* p8 Q0 ?
this.jobID = jobID;" J' d; w( M0 {' g- V
this.taskNumber = taskNumber;
- ~. t2 R _( C8 g/ M9 gtaskIDs = new String[taskNumber];- a, o& K$ D/ J, N6 Z/ k& f
for(int i = 0; i < taskNumber; i++) {
- D4 y) L. ]; k2 f7 ttaskIDs = new String(jobID + "_task_" + i);1 y' \+ ~4 ?( S* S
}# B7 X6 P2 H; t4 Z
} p- @8 y% @' y8 {' S7 h5 S
ublic void serviceInit(final Configuration conf) throws Exception {
' q4 Y0 z5 ?! Y- D# d3 g/ Mdispatcher = new AsyncDispatcher();//定义一个中央异步调度器
, Z1 w# h% r" T. \: [1 X- s- O//分别注册Job和Task事件调度器
! [( w1 y6 ]- D" Ddispatcher.register(JobEventType.class, new JobEventDispatcher());/ \2 q' b3 Z" _% i1 H- D% ^9 c
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());- o$ x2 Q+ j- ?( t1 f& z" h1 W# \. N
addService((Service) dispatcher);
+ [" X5 b7 y, M2 B5 A2 \3 Jsuper.serviceInit(conf);
7 w1 O5 D3 T. K( X1 z6 [9 V& c} p0 ~9 @6 f f* I. g% k3 w9 ^# ~
ublic Dispatcher getDispatcher() {* b& g# R7 j f$ `2 g @) c/ Q; G
return dispatcher;
/ g) H$ Y: e& w6 S3 O8 n! ?- y5 g/ Y9 H} p* s( c$ A! D) G2 A$ h
rivate class JobEventDispatcher implements EventHandler<JobEvent> {
v/ K7 k2 |9 U5 O# Y/ D$ K- T@Override
: G9 Q! c8 i& E- U- @public void handle(JobEvent event) {
6 ~# H5 ?# R, Fif(event.getType() == JobEventType.JOB_KILL) {
! u& W; u4 s" ESystem.out.println("Receive JOB_KILL event, killing all the tasks");' u9 L2 r; L9 E8 K1 o% Z
for(int i = 0; i < taskNumber; i++) {
% p$ M0 f) T, K0 l' B$ Gdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
7 W3 B% o) K& V* ~ oTaskEventType.T_KILL));4 {3 {6 a! F8 X: u
}
- p9 H1 a! b( y$ O5 ~9 x} else if(event.getType() == JobEventType.JOB_INIT) {
0 H; @2 K' t+ LSystem.out.println("Receive JOB_INIT event, scheduling tasks");
! B, ` Q$ S: q+ Y3 K$ Z [; B# Zfor(int i = 0; i < taskNumber; i++) {+ ~* V3 Z) r% H- Q" ]
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
% {! [! ?2 F3 F; k o. l7 QTaskEventType.T_SCHEDULE));2 C6 L& X8 Y6 `' @
}
3 G5 O3 a& Q a, @& ~}
: P* V1 \9 F1 N9 P6 l}- v. @0 z/ Q3 n: {; v/ p/ S; k
}p- v1 @/ o$ b* T/ d6 `; D
rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {+ k! G, [5 M$ E7 g( T
@Override
; m5 L# p* T9 Wpublic void handle(TaskEvent event) {; R' X7 V' S' A t
if(event.getType() == TaskEventType.T_KILL) {# d0 e' ]1 O! u* V' P1 @$ m
System.out.println("Receive T_KILL event of task " + event.getTaskID());# J5 I( H- x& ~9 E/ l
} else if(event.getType() == TaskEventType.T_SCHEDULE) {) l9 Z( o* k) E1 X/ {, |
System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
, A- N8 l& N4 b9 D+ ]. t; ^}: k* h$ c* a2 K/ W9 k3 z3 X
}: U4 F m; I2 W- e
}
' f# ^8 L( y" w8 S}
1 r( U8 e+ ?5 E4) 测试程序。1 D- Q( f2 @* P6 Z( m
@SuppressWarnings("unchecked")
4 Z0 |! n# K c0 `# g" o$ B& Ipublic class SimpleMRAppMasterTest {
2 m" h, R- `: ~- [public static void main(String[] args) throws Exception {, }1 o) N6 R0 g7 a- p+ m# j
String jobID = "job_20131215_12";
7 K7 [3 ]$ U* ]SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);/ Z! B: q/ e8 |+ Q
YarnConfiguration conf = new YarnConfiguration(new Configuration());" X9 R9 K k2 s7 S! G4 x
appMaster.serviceInit(conf);, ]/ ?! ~: _ x( J# S
appMaster.serviceStart();% V9 T' |" o( s" k/ D
appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
) I3 e" ~# q; A3 rJobEventType.JOB_KILL));
; P. [" Z. I" }9 M5 F) U+ [appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
1 c6 e- E9 G9 J5 r$ P( Y6 {" ?$ CJobEventType.JOB_INIT));7 f. k2 ~# K3 c6 D7 F
}: f, {1 A5 ~4 v, N
3.4.4 事件驱动带来的变化9 ?/ @7 ~$ a! o2 o1 ?7 H
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
3 f! b- o- l$ u( g/ H6 u方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
, F- j9 f, q' M( J, A字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
! u ? T) z. u- Y0 I是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一
; R Q1 y2 R T. Z4 L个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决
; @' |& B3 d% q问题之道, 必须引入新的编程模型。
* W7 Q9 ~* ]5 R图3-16 基于函数调用的工作流程, [$ e( m2 \* N8 z0 @
基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则- k- r, Y. [7 V! E- `( o0 n
是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
. [6 H4 a9 E3 v4 E9 d, Z9 r联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需) k+ Z9 B' m h' G! k/ ~& h# V
向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理8 T' {1 d! \% [* @1 _
器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。: w1 h) q& X+ s4 A2 o' G
图3-17 基于事件驱动的工作流程$ ]5 O* ?* d- Q1 S% {
相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。# u4 C: p6 }1 \0 U3 Q4 G l
[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。 , {7 W5 r' g; n+ u, V
e L# Z9 [% `( u" ^& Q+ o& ]
3 E. m; u9 `2 P |
|