|
3.4 服务库与事件库& q0 g7 T8 f1 s1 D4 \" x' M/ E }% K5 C
本节介绍服务库和事件库。$ X0 |9 s# a- b! ~4 g
3.4.1 服务库
: P5 H' A4 u, E$ z# z3 ^" ^' g对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。& N- \2 i( C0 X
❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、* l [! g8 T& F' Z4 N4 @6 ~% |8 P) _
STOPPED( 已停止) 。
6 P" l% P j( R Q9 ]❑任何服务状态变化都可以触发另外一些动作。2 L' K! X/ H2 ^; F
❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。- T- T% l/ r, e; M) b
YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
+ U, z! u9 e& X" S b s/ |对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
0 X/ t* R" [9 b( h* Z5 h: J" aService实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于9 F) C4 c, \7 ^8 @
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、; ]" `. l7 |6 B+ l, {
ApplicationMasterService等。
" t% |( T: v7 D. e图3-13 YARN中服务模型的类图
9 v2 N3 @$ \2 j* V在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
, R( C# G# b Q% o6 E务的统一管理。
9 E& C! z" F' N; b3.4.2 事件库, N6 U( K% E# z( b y) J5 V0 {/ ^& l7 f
YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将
5 }9 d; y6 Y" ?) {各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处
" U& p2 T( P+ B' a理模型可概括为图3-14所示。
2 A% t' P/ h$ v( R: m. F' ^图3-14 YARN的事件处理模型
8 a" M& y, Y! K4 E* } ], V; {整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器
: M1 Z7 {9 e" @+ l* u6 D( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其# h. a& \. Z$ t: f/ B# d
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成% K6 [, l% \$ R0 v& l1 H8 D6 a0 Q
( 达到终止条件) 。. O! m+ `# f' R* d/ s. h5 g
在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、
( p; @. t6 |2 [& h7 d- W4 ZMRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
# j+ y1 M3 [9 [& [4 W9 Y驱动服务的运行。
( f' D& k# V. \3 y" `3 RYARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先8 h6 b+ T* ^3 c: b& B& _7 E p
要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
. B( J: w! r; EEventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
% v# G: ^5 D0 p. |: l7 n1 O2 ]AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由* W7 @8 \ r8 m
中央异步调度器统一管理和调度。
1 x& V* |; J9 l& A2 E0 o* I9 I% W服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间: [: g' m4 W! l: l4 ~
则采用事件联系起来, 系统设计简单且维护方便。0 V9 t/ J9 ?- j1 X8 V
图3-15 事件与事件处理器% j( f! P& b g2 R+ T$ Z8 I$ o
3.4.3 YARN服务库和事件库的使用方法9 L6 K, ?% Y+ T$ d$ L. `
为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce" B4 b! {- H1 W! n4 N3 s$ Z, o6 ^6 X
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
/ l; u$ ]; Z, t8 e5 l: T1) 定义Task事件。
8 P) i/ X/ v, Xpublic class TaskEvent extends AbstractEvent<TaskEventType> {
8 d; a- { Z5 zprivate String taskID; //Task ID9 z/ ?# Z( _( p9 f( O7 q4 ?
public TaskEvent(String taskID, TaskEventType type) {
4 B: }6 x5 @5 d6 j N3 A) b& b8 J$ msuper(type);9 Y& j4 R( e( m# ?% w% k
this.taskID = taskID;/ d% G8 }4 M9 k# L% f. p- I% Z' z
} p2 X9 Q! I2 F" K
ublic String getTaskID() {& D, ^" U) p+ o" N; z9 N! @% S
return taskID;- Q& q! G( C4 @4 Q, S6 U; `* O/ e
} 其; P- w3 d( c3 a1 ]+ \: \/ S- C
中, Task事件类型定义如下:- T: l/ T: L) |
public enum TaskEventType {* A- |, r1 j p
T_KILL,
, c/ O3 F! A. T2 a: B& u- }# K. f0 D6 a6 B7 QT_SCHEDULE8 X8 C, }7 _% R% X, U. F; w/ X3 A
}& r" a. ]2 r1 S( R; v6 ~9 O; b# U, P _' j
2) 定义Job事件。
) e; a, b, S) i0 epublic class JobEvent extends AbstractEvent<JobEventType> {
' D: Z8 I' H. fprivate String jobID;
1 Q! |8 L3 _9 X% \& _public JobEvent(String jobID, JobEventType type) {( `* B) s9 t9 h" h2 F
super(type);
8 P+ v3 `) d1 B1 c7 othis.jobID = jobID;
. k4 Z; j' K# W ` X} p, \- P0 x; Y3 c5 Y w( V; {
ublic String getJobId() {& u' N4 r) ^4 o. W9 E) h
return jobID;
. B O9 X/ R; J# { {: v}
, P, Q- M6 L* I% }: l1 P2 q" x}& a: d6 H: `1 c N, p8 `
其中, Job事件类型定义如下:4 m% k/ x# r" x
public enum JobEventType {
1 i8 _; c" h) G ^( EJOB_KILL,
' y3 |. B6 `3 Q4 |3 Z, {! FJOB_INIT,
% z2 z3 }) W6 ^. f. D2 Z5 YJOB_START$ w2 U5 e5 e% ?0 ]! Z# i9 _
}; A l! ^1 f% P5 p2 X
3) 事件调度器。
' h: I* H! B0 L( u* z* U接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
J- f, y7 K7 g8 g2 f. l, h@SuppressWarnings("unchecked")
1 G, \; l# x1 d: }public class SimpleMRAppMaster extends CompositeService {
8 u9 H$ u2 I- b' p6 q v9 R3 x. b; Aprivate Dispatcher dispatcher; //中央异步调度器
8 h0 p4 o6 w1 r% vprivate String jobID;
! w3 T2 |5 J3 U& Y: [private int taskNumber; //该作业包含的任务数目0 h) q1 W4 X+ b
private String[] taskIDs; //该作业内部包含的所有任务
* y- z9 \9 H6 S) E. c( ]& lpublic SimpleMRAppMaster(String name, String jobID, int taskNumber) {
( o3 T* P* ?1 \6 P1 i, C, I0 Fsuper(name);
v" |: t* ]7 Fthis.jobID = jobID; \" E% g/ D4 L0 A, M4 _
this.taskNumber = taskNumber;
8 Q! M# a* Y6 f3 B5 e3 f. x# FtaskIDs = new String[taskNumber];" m% w* F/ @2 ?8 ]: r" A6 m7 M% F
for(int i = 0; i < taskNumber; i++) {
( l5 O. E) B) T! y' w4 LtaskIDs = new String(jobID + "_task_" + i);
6 U" R8 j% R. x& c5 a5 m$ v} s) W. M& [ P9 K' t9 A9 U
} p6 A' r. S6 h: Z* T" B/ ~) z+ l
ublic void serviceInit(final Configuration conf) throws Exception {" U( _7 S+ f. c5 A/ i1 u
dispatcher = new AsyncDispatcher();//定义一个中央异步调度器6 w* G$ X0 }( d% S
//分别注册Job和Task事件调度器" A8 w) r7 S- t4 [. C4 K0 ]7 U
dispatcher.register(JobEventType.class, new JobEventDispatcher());
5 c" l m( L6 G; {& ydispatcher.register(TaskEventType.class, new TaskEventDispatcher());6 a2 z/ V0 k ~
addService((Service) dispatcher);! c7 y4 U) f+ y# g
super.serviceInit(conf); ]4 U/ u# c+ V
} p9 K/ l3 F" C) C2 }2 h% U" s* Q
ublic Dispatcher getDispatcher() {: i# d' q& z4 M O5 V
return dispatcher;' S8 @4 B& ]3 I$ z
} p
5 @5 D9 N" n" b8 T" c- \rivate class JobEventDispatcher implements EventHandler<JobEvent> {* _* b: ]0 g/ z) c
@Override" q9 {/ p { E7 g, K
public void handle(JobEvent event) {
8 k3 F' l" P9 k) B! C) Zif(event.getType() == JobEventType.JOB_KILL) {
& a( O; \- W& e1 A/ ?; QSystem.out.println("Receive JOB_KILL event, killing all the tasks");* B; s$ M/ |/ l' b( |
for(int i = 0; i < taskNumber; i++) {
& g! j; ^1 z9 X. i8 G: Udispatcher.getEventHandler().handle(new TaskEvent(taskIDs,5 L4 h7 Y9 P8 t6 j- e7 F9 d1 p0 O
TaskEventType.T_KILL)); s; q: ~3 M/ R
}
) Q/ g) F' X7 U9 j0 f} else if(event.getType() == JobEventType.JOB_INIT) {
) Y- b4 I' [4 b2 jSystem.out.println("Receive JOB_INIT event, scheduling tasks");
3 l- J, `, J* x4 W; nfor(int i = 0; i < taskNumber; i++) {; e- Y# j, i6 ?% ? G! L9 n
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,6 `# d; Y( |* Z4 Q" [
TaskEventType.T_SCHEDULE)); k* ^" [- L" K$ a
}& I l+ z! w \4 }7 w- e1 o+ M
}
9 P( h) f/ T$ S/ q, w4 r& y0 p}
% S" m: d- ?0 f2 z% S/ Q: z}p
@, P/ b P& Q5 A7 Nrivate class TaskEventDispatcher implements EventHandler<TaskEvent> {0 B: `- [. U- Y( P$ [% p" x
@Override6 [4 K7 |- V+ H
public void handle(TaskEvent event) {
1 ]& r0 k, J* z' d1 m( {/ U; k9 fif(event.getType() == TaskEventType.T_KILL) {: c8 y" _6 o& W/ |2 s
System.out.println("Receive T_KILL event of task " + event.getTaskID());; M* ?7 e' s0 Q3 t6 X! s9 x$ Q
} else if(event.getType() == TaskEventType.T_SCHEDULE) {
. r; I# J* s4 i" W( ], JSystem.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
) ?; l$ J# l! w0 A/ w0 o& `}+ B, X. a! K9 b' w7 s. {) C0 J
}* \9 w8 x" N9 K3 \" T
}
+ {- P8 O: O d6 R0 F+ q}
j) ~5 R. x# Y" ?4) 测试程序。2 \- h% q9 \5 s. l1 t7 @) v( v
@SuppressWarnings("unchecked")
* ~8 P8 I- u0 A; O9 ~7 Npublic class SimpleMRAppMasterTest {
* ~- i# v6 ]( L, s3 b& Cpublic static void main(String[] args) throws Exception {
1 b. J5 S, z* P" ]% r6 yString jobID = "job_20131215_12";, K: s9 y' s" k8 T
SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);& ` \/ W/ K4 N& U$ R7 y. e
YarnConfiguration conf = new YarnConfiguration(new Configuration());
- g! l L" G- VappMaster.serviceInit(conf);! J3 q- C2 d6 o; Y" F. O# a
appMaster.serviceStart();
3 b/ C( y3 | x, e& M1 f! WappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,) F }8 G- g* P" {# {
JobEventType.JOB_KILL));
6 B/ w! J/ g) m! x* \appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,. n4 M1 J$ S" P) t
JobEventType.JOB_INIT));
0 h( f" {* ~8 [4 j# A}
; B, L- _ m3 q3 Z6 j2 W5 \; O3.4.4 事件驱动带来的变化 [+ |3 P6 K# j0 O* x# n4 U9 p
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的1 P6 f" H0 S6 f# e
方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
% c$ [' x4 F# A字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
! P( S0 z" E2 {0 |& F7 @! U# m是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一/ t, ]& C! _7 U: ]
个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决
' T# \0 Y' p8 u$ m! w+ e问题之道, 必须引入新的编程模型。
' U: I6 ~' z* i( ?图3-16 基于函数调用的工作流程0 W) \/ ]# \5 a: Z- g& e2 e7 l2 f
基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则
3 v9 q9 r; i5 {4 x O5 V是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关/ [4 J, e+ x( ]+ R
联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需9 e+ ^/ t; a) X, G' Z
向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理- O, R: P. ]! N. u# ?* k
器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。
: Z. ]" L* H" L图3-17 基于事件驱动的工作流程
$ l X1 c) g$ o- m相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
, g$ B4 ~' \& m6 X& E4 ^& Z[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。 5 x6 N* F9 v" L% ^
" R8 d% d+ {/ P9 \( ?' ^6 x7 I w3 \' Y. P9 U$ S
|
|