|
3.4 服务库与事件库
: F9 U) M5 Z1 }+ i8 }9 E0 g本节介绍服务库和事件库。
3 N) Y) O( Y& A9 { V+ f0 [3.4.1 服务库
2 J1 n" L; b( G: C# b7 ?( e$ v9 ~对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。% T8 l2 {3 _& d( r; }% c
❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
3 }) i0 V R4 P7 X: n |STOPPED( 已停止) 。+ B. h! c, Q6 i. B5 \
❑任何服务状态变化都可以触发另外一些动作。7 c- b' r5 A2 V# X: b& ^
❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。! i* C1 _% q$ o8 H6 w# |
YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务- t4 M1 ]. C/ j' W+ o7 j+ j
对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的" H" R0 k, y; i6 u
Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于
# ~! H0 t- a2 N m$ m7 aResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、
$ j* Z* S; l5 E9 AApplicationMasterService等。% u& b3 c* e0 l2 l5 Y) y1 \
图3-13 YARN中服务模型的类图
* d2 r2 `# O }+ Q" C. M在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
# M. Z; C$ N2 p5 F5 J7 ~) I务的统一管理。, {! `- K* o& ^4 K5 }
3.4.2 事件库 Z' [& O: ] c/ U3 z
YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将
2 y' f1 R) _3 v1 n各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处
. a) D& W- Y2 l3 q3 d理模型可概括为图3-14所示。
' r3 G D$ A: s7 F! W8 X9 e4 z图3-14 YARN的事件处理模型! D4 P* Z0 w) Q, T% R* N( c
整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器
: z9 s) g2 ?+ }0 n( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其 E$ D9 O8 s# w5 v, ]
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成
[: [3 Q0 m5 G- P4 y( 达到终止条件) 。; A$ n, ^* g! z6 C
在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、0 u! I% D7 o8 {
MRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
% n O7 q& s% z3 p9 A& k! [( w n n驱动服务的运行。9 z0 F& Z7 w7 c2 p( b
YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先! d) u9 D4 e# Z( L
要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
K+ k7 I( ? @+ l0 YEventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
4 v, j( o1 c& xAsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由/ F& P# }. n9 S- C0 D3 w9 J
中央异步调度器统一管理和调度。% Q5 A* L$ V K, v
服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间! e1 E: p# g9 D" R( G- s
则采用事件联系起来, 系统设计简单且维护方便。
( j% r0 l; f: w5 u5 V( }; i图3-15 事件与事件处理器. B. q& i& K0 L7 v
3.4.3 YARN服务库和事件库的使用方法
# Z( D( s0 x& I1 T' u& r' B7 K) U为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce4 F Y) x- r! A: p
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
+ l$ f+ e+ j D5 J. l$ ~. H1) 定义Task事件。0 J3 o5 ?; l3 G8 d5 c* Q) G+ e
public class TaskEvent extends AbstractEvent<TaskEventType> {% E( Y( K# f7 f) N% e
private String taskID; //Task ID: n' N* T. p% T/ l# ]
public TaskEvent(String taskID, TaskEventType type) {
$ ^3 [- @1 ~/ {/ w+ ]! u* q; msuper(type);
7 [9 r" f2 X# ~7 s: j2 ~/ Hthis.taskID = taskID;7 V0 \# `3 m9 v7 Z9 b2 n9 `9 j5 n8 y4 Z
} p
8 e0 t# ?3 z9 ` Uublic String getTaskID() {
2 w3 V: F# F4 ^2 c6 Ereturn taskID;
" ?( h& E1 q1 ^3 B) B2 x$ ~ k} 其
5 [' c. z Q4 s. ^2 V中, Task事件类型定义如下:
- m7 l! V/ n8 |" i3 g* H3 y( {3 g+ ~public enum TaskEventType {' q! L% l/ f6 |
T_KILL,
/ L/ C+ P, o5 Y, {0 ?, MT_SCHEDULE
% z8 O0 L) m$ ^( [8 ?1 G( ~}( ~) B- V/ `/ C" ^* |; |- q
2) 定义Job事件。- q7 z" J! n, P1 Q" H
public class JobEvent extends AbstractEvent<JobEventType> {7 T8 z' }' @, W5 Q
private String jobID;
! {" V# }& n0 D/ j! t' p! Npublic JobEvent(String jobID, JobEventType type) {
* D& ~! t1 j( X* Osuper(type);9 @- e/ [- P* ]2 ~: A, K; o
this.jobID = jobID;
# v6 ^! C9 E E: v s' B) G. V} p
5 s/ `3 S$ H( q: K; I. C* L" wublic String getJobId() {$ J( g3 d) O" F# z, |* J
return jobID;
3 j/ [ t% c6 T: p3 M}
1 v/ b) N a: @( A. [$ m}2 r+ u# ^0 A0 J2 b
其中, Job事件类型定义如下:
+ R8 y2 c, ^3 W, O- Ipublic enum JobEventType {& f( @# N( M: U. G! V0 n
JOB_KILL,, Z, W, L" ~8 x) z5 \$ P2 b
JOB_INIT,
6 b3 p8 r: q7 V& }JOB_START
- P! K0 g! ^" N8 Z}
$ {; M+ m8 G& l3) 事件调度器。
. I2 @( }$ Y" P l, R& r) C* G$ M6 u接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
; q6 s2 E. |# c6 q@SuppressWarnings("unchecked")$ M3 B3 L; R$ T9 j5 _+ R# S
public class SimpleMRAppMaster extends CompositeService {% }; K! X8 { `# Z, v
private Dispatcher dispatcher; //中央异步调度器6 | V/ W+ Y t& r0 g5 e8 f
private String jobID;
* Z) W& Z3 q4 f& Dprivate int taskNumber; //该作业包含的任务数目" {% S) l+ n4 g9 w
private String[] taskIDs; //该作业内部包含的所有任务8 ^' K- R! |- h j0 _* f
public SimpleMRAppMaster(String name, String jobID, int taskNumber) {) j+ x, d1 [, @ v1 q$ b3 a" f
super(name);
- d% h* T$ h8 L! e- I; z. H8 w& g0 \$ {this.jobID = jobID;
* W' O; U& l% h* S0 q3 w& Wthis.taskNumber = taskNumber;
; T3 E* u: j1 Y# UtaskIDs = new String[taskNumber];7 Q5 Y x; ^( a+ t6 f. W
for(int i = 0; i < taskNumber; i++) {2 U( N+ I# f+ r g. I' e
taskIDs = new String(jobID + "_task_" + i);5 y! q! M9 Y1 M& N+ ^
}
& w$ o7 k9 h6 k0 b: e} p0 k5 H9 `. C7 }3 K$ |5 i/ ^' ~
ublic void serviceInit(final Configuration conf) throws Exception {0 x& }) {" x2 A- S+ e! `. g
dispatcher = new AsyncDispatcher();//定义一个中央异步调度器- Z6 t- h( z9 h% @
//分别注册Job和Task事件调度器
. {0 }+ W7 p7 Rdispatcher.register(JobEventType.class, new JobEventDispatcher());
. g8 |+ J- |% X0 e! hdispatcher.register(TaskEventType.class, new TaskEventDispatcher());6 Y( {& O$ y |3 v
addService((Service) dispatcher);
) j. ~8 [) C$ Q( K2 Zsuper.serviceInit(conf);
" D+ g- F4 M9 K6 Y$ ?% ~& F} p; x- \7 W# z: e6 e2 `% {3 l& x* N
ublic Dispatcher getDispatcher() {
& v/ [2 x: o% L( yreturn dispatcher;' E% i$ f8 |+ n9 ~5 T
} p
* N0 B" {: T4 g) ~0 ^rivate class JobEventDispatcher implements EventHandler<JobEvent> {, L1 f7 c/ M3 m7 M+ l9 o2 Z
@Override
' Y& s, {4 Z' r2 d5 Epublic void handle(JobEvent event) {0 J' g. N# P( h! o
if(event.getType() == JobEventType.JOB_KILL) {+ t* N$ M# z Y8 X# w: d
System.out.println("Receive JOB_KILL event, killing all the tasks"); F. G9 f6 \6 t" n' S
for(int i = 0; i < taskNumber; i++) {2 V& Y: ]' h& U8 d* B
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
+ i* o# \4 t' p- V- }$ r8 kTaskEventType.T_KILL));
1 z: C; q* Z' O- x}& u% j6 A. G5 n) T7 d: A
} else if(event.getType() == JobEventType.JOB_INIT) {6 N* Q/ S6 Q( R: J/ {5 I
System.out.println("Receive JOB_INIT event, scheduling tasks");9 W% g1 }' m9 U3 y" |( P
for(int i = 0; i < taskNumber; i++) {, F5 x* [5 I: |- i3 _2 k5 C+ Z
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,' M. n! U( K- v D+ f9 J: }
TaskEventType.T_SCHEDULE));
0 B% r: Y" Y( {" m: R}
( p* g- p, B) }}
5 \. |! ]1 A' T2 l- q* `% B9 L}% X% b g- l0 x6 e' O0 c
}p/ z( ~8 B, V( ]/ p
rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {0 m0 p" Z7 ?. j; W4 j
@Override
( o2 \. s! Q8 d/ a6 u* B0 Ypublic void handle(TaskEvent event) {6 @9 p- Q8 _1 X
if(event.getType() == TaskEventType.T_KILL) {5 {2 n: }* F+ C
System.out.println("Receive T_KILL event of task " + event.getTaskID());% ?2 ?$ \9 m5 j6 a) a, l; c
} else if(event.getType() == TaskEventType.T_SCHEDULE) {
; P( f0 B6 W# E4 ISystem.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());! c/ w9 s0 a1 J. G2 b6 [* z
}4 G& v0 E" T! h$ Q! I# h* t
}! }, |6 ?8 T; c; @( }( r2 k4 f- B
}% `( C* p$ g* J s; y! I
}" C: `# b: I+ L, u( e, l2 _0 k
4) 测试程序。
4 V( a: }# g* ^2 M# }; h* s7 C, ]@SuppressWarnings("unchecked")5 ]/ y$ O6 G, l8 e( b D3 j
public class SimpleMRAppMasterTest {
0 a9 A: }. ^- S% P/ G' Y1 Vpublic static void main(String[] args) throws Exception {
" x$ A6 |, \6 O# T, _String jobID = "job_20131215_12";
% G0 p' q% }* HSimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);) U% D' I4 |, e) D* E7 u
YarnConfiguration conf = new YarnConfiguration(new Configuration());
, N' Q; U% z @8 VappMaster.serviceInit(conf);
7 ?& O2 r) g. Z% v2 {4 {appMaster.serviceStart();
( n% c; h$ @0 {" happMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,2 _1 e/ |( E( t2 ]7 u4 g9 t
JobEventType.JOB_KILL));9 r* _ E7 ]6 J8 a
appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
& i. J0 J- j8 {# b9 u1 K8 V3 vJobEventType.JOB_INIT));
6 T4 ~( ~9 P/ B- s/ H, G! m/ P$ y% [}* u+ x2 m1 q$ @( n
3.4.4 事件驱动带来的变化
8 F! }6 `( S, O在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的' p6 n5 b \$ c! i: F
方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
" S2 e9 z7 \) E6 j9 Z' Y字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件& w: L5 u$ x, I$ E
是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一; c" [# [" j* i8 H, l9 x
个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决4 T3 Z5 G3 Q& P+ P3 x9 r
问题之道, 必须引入新的编程模型。
# A; Y H7 f1 X6 V/ W图3-16 基于函数调用的工作流程' Z3 m3 ^- b: n3 m
基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则 U' Q; V K! v' g& k4 s% e4 g, y5 ]
是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
- A$ Q" a' i5 k; k& j联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需
* s V1 |7 F/ o向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
1 M7 \/ X0 y& v- ?* G& a& G器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。7 K# s" M; {; P9 E9 ~
图3-17 基于事件驱动的工作流程- Q! S$ o9 P! F" j2 O! ?
相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
( T4 R* X. f1 d" k[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。
* S" x4 y6 \6 m; q8 I( t( [
@1 |" O1 D2 E& i7 ~0 I/ a$ D
/ @/ D8 P, h+ g% n. }8 d0 C |
|