|
3.4 服务库与事件库
# t$ Y7 x3 s, J5 p# i% G' S4 _ N本节介绍服务库和事件库。- K% D1 F. B( P
3.4.1 服务库7 |' a! U" U/ r3 D6 y# t3 m, m
对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。/ i# \- p' w' o; V
❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、, o" l7 E( G7 @, B
STOPPED( 已停止) 。' B6 u8 Q: M5 M! e- w
❑任何服务状态变化都可以触发另外一些动作。
+ J7 P$ z6 }. K* @$ @. P4 c❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。8 ?3 s4 T. o! O( @9 d. w. L
YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
3 R0 x- j2 e, y: P% e对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的) c: M0 p. x" q9 M4 h! ^
Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于2 w( j/ t1 I$ V/ d
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、1 o: B% T* X" J8 q8 z$ S8 ?3 i V
ApplicationMasterService等。
! x3 q2 E' U) V, r* c" M/ X; t8 p$ d4 m图3-13 YARN中服务模型的类图1 J/ M D" a5 J* S- e: u
在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服2 Y( ^& F# O+ Q" K8 l
务的统一管理。
/ i' A8 e1 Y5 K4 n$ L3.4.2 事件库) L4 _1 _/ P0 b% }+ \. e$ }( K
YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将7 p. |+ l2 g$ s3 N
各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处
3 s2 [* u, D9 e d# ^理模型可概括为图3-14所示。
! z& H+ F- ^: K图3-14 YARN的事件处理模型9 z( c) ?/ m+ _
整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器3 R( j8 i2 L+ s6 u8 O/ I& ^& }, Q
( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其
6 M; s- \& }* m1 D. L: \处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成9 i9 o9 R1 l( _- T& S' v9 G- X
( 达到终止条件) 。( l& s2 N' R, t8 E
在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、
+ d$ F" L, k0 Q8 X, H, Y0 ZMRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型7 s9 l, }/ r0 @: w9 e( H4 s5 o! }
驱动服务的运行。; C9 d3 o: Z; k. |& {4 Y: R
YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
* y9 }6 S, l, u, O8 v; e& Y H; W1 M1 q要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
+ }& e: L5 c1 ^0 SEventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器( K; K# V6 d5 l9 `: e
AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由; v: _5 H! U; x; X! l
中央异步调度器统一管理和调度。- j0 ?4 i* j8 ?/ F; p
服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
j4 C2 s" F5 `8 y( A则采用事件联系起来, 系统设计简单且维护方便。
8 t q) `/ `, j5 @1 l% b, w* n0 h图3-15 事件与事件处理器5 c: Q6 i) V/ s: H; t5 Y
3.4.3 YARN服务库和事件库的使用方法
/ X5 t, k0 G5 O4 f& l. n为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce& `1 `) {! S# H0 K- P3 Z/ Y4 x
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
: `# t# w' u3 e/ ]# M) \1) 定义Task事件。4 w6 N! w; X% n
public class TaskEvent extends AbstractEvent<TaskEventType> {
- J. |$ g7 S0 _0 zprivate String taskID; //Task ID
' Z$ F6 i" g. h, lpublic TaskEvent(String taskID, TaskEventType type) {
8 |7 s" l" {1 L, y+ c# m% |4 I) asuper(type);; }* M0 q, J- W8 B! Y
this.taskID = taskID;
2 R/ P2 i1 K0 E! l$ R7 l, l} p& v3 N" O I$ x8 b) S( Y
ublic String getTaskID() {
% \3 X5 d; A2 C- J; c" Breturn taskID;
. H# B0 M. i. [# z} 其
# s# m# Y) u3 c6 Y中, Task事件类型定义如下:+ q6 H* [6 i( n9 e" O
public enum TaskEventType {* H$ O* R0 r& J5 i9 }
T_KILL,+ K$ o: ^( e3 L+ r0 ^% a3 `5 l
T_SCHEDULE4 z5 S$ q8 X- V5 J9 N
}& M% e# u# @) [
2) 定义Job事件。
. K7 g; `9 h3 A( Y3 K# @public class JobEvent extends AbstractEvent<JobEventType> {3 u6 }" R' `5 g, q2 i
private String jobID;
_- ~ ?: d3 wpublic JobEvent(String jobID, JobEventType type) {
; V+ v% h* h! f7 F* m6 e% @: ysuper(type);
, ?; R4 w( F/ _' M- r, f' ~) Dthis.jobID = jobID;
3 E7 t& o' f+ ]5 u. d4 l0 m$ ^9 M5 b% T} p
% @8 X ]$ b; j p& u! d4 s% ?ublic String getJobId() {
( ]( b. }; n9 F) @6 f/ @: Ereturn jobID;$ X1 {7 `% l& R, J% y: u" D
}* H& @! x0 ]9 M
}! N. L( B. U5 g+ y
其中, Job事件类型定义如下:% {3 A+ ^, T9 q1 c) v H, U
public enum JobEventType {- K: y' T) d# n/ S+ U& f9 x4 j$ a9 o
JOB_KILL,3 R5 F' R* p& U9 E( z
JOB_INIT,
7 L' C9 M& ]9 W; o6 h6 c6 Q8 D& wJOB_START& F) I: }* v0 ?2 i6 T' W
}1 m$ S" J. r( }/ f/ [- q' v
3) 事件调度器。# [) [( G. C' `9 P+ T
接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
' J6 w, k) Z6 o; ^% J@SuppressWarnings("unchecked")
8 g% Y! g/ J, Y8 r: wpublic class SimpleMRAppMaster extends CompositeService {
( U0 g3 p2 g) `0 Lprivate Dispatcher dispatcher; //中央异步调度器) r* Q( k* s# t: ]8 L) a: P
private String jobID;% k: |& }/ t) o, i
private int taskNumber; //该作业包含的任务数目
6 L: [0 d9 O/ L! o% e d, W& aprivate String[] taskIDs; //该作业内部包含的所有任务
1 s/ h3 j5 D& V5 s9 _# Vpublic SimpleMRAppMaster(String name, String jobID, int taskNumber) {
: G3 {- E- n% O; t2 Tsuper(name);& l. X# u0 Q1 V" z/ W( T4 n
this.jobID = jobID;
0 ^* U( @* Y, i4 w+ Rthis.taskNumber = taskNumber;- x4 ]' N/ G' ?; S+ r
taskIDs = new String[taskNumber];
9 a; C7 y2 P$ H- b) y6 B9 E6 `6 }for(int i = 0; i < taskNumber; i++) {6 ^# `6 n% V) \. D
taskIDs = new String(jobID + "_task_" + i);" S- P: I( W3 K% Y) k
}' \$ w8 G9 k! K. H' B
} p
7 k. z( U, \2 ^) |; @ublic void serviceInit(final Configuration conf) throws Exception {
$ b6 @2 a! S0 H% _) t5 f7 \6 q- t9 Wdispatcher = new AsyncDispatcher();//定义一个中央异步调度器
! k& L) ?& g; d% O" ?7 ~2 Z//分别注册Job和Task事件调度器3 N5 m$ h! c" X* p; U' d
dispatcher.register(JobEventType.class, new JobEventDispatcher());7 B9 E: f2 a1 b. ?- |1 I
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());: K8 e, D- I; H2 Z8 {! E7 v. K
addService((Service) dispatcher);
" i3 E M# _2 F3 ]2 Rsuper.serviceInit(conf);
0 R; X" V' O" q' c" R, C& O0 H, v} p
% W( R5 r. W+ I3 b8 o! vublic Dispatcher getDispatcher() {
! N! p9 C$ B! A* ireturn dispatcher;# a. B+ I3 V0 z5 g& K: q
} p
% E1 C' @: T/ L' C+ I# X. ]1 k- urivate class JobEventDispatcher implements EventHandler<JobEvent> {/ X7 F) z( p9 S: p! `: E2 A( l, S
@Override
& O& y3 a$ p5 I1 xpublic void handle(JobEvent event) {
; Z- T! u0 L5 h N; t4 tif(event.getType() == JobEventType.JOB_KILL) { O ]' J; d) b( i4 F! Y% A9 f
System.out.println("Receive JOB_KILL event, killing all the tasks");: p9 v7 E u( ?) {' g9 ?
for(int i = 0; i < taskNumber; i++) {
4 E" O7 [' V5 N+ {dispatcher.getEventHandler().handle(new TaskEvent(taskIDs," Q6 r/ G2 \8 a$ U% Z& k
TaskEventType.T_KILL));
9 T3 D. Z, U9 t% h: b0 v}1 x+ Y2 b4 h1 j/ C2 _
} else if(event.getType() == JobEventType.JOB_INIT) {3 @, h2 E+ y8 h- J9 F
System.out.println("Receive JOB_INIT event, scheduling tasks");
+ v/ q' E& l H+ \for(int i = 0; i < taskNumber; i++) {
9 \5 _. Q$ D! adispatcher.getEventHandler().handle(new TaskEvent(taskIDs,7 F& r. k* {/ d- T/ q
TaskEventType.T_SCHEDULE));* e7 n9 W: }) C b: z& {3 G9 ~2 R
}
% \1 C- W+ y% B6 Y& z}
) @& t+ ~, E- V" G0 \' o: z& P1 D}# N- `' Q/ s9 x% F8 P
}p
5 |# F2 C7 `. L8 u0 B5 |$ S [( q1 Frivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
0 ]" a0 t. d$ ~* K/ P( C5 B) K% E@Override
: m# n; s+ o% d0 j/ _9 Mpublic void handle(TaskEvent event) {
, J. I/ t. ~ D3 g& |/ R5 ]if(event.getType() == TaskEventType.T_KILL) {
. q8 P) X& f! l' LSystem.out.println("Receive T_KILL event of task " + event.getTaskID());8 S' |! F7 c# I( Z- c& m
} else if(event.getType() == TaskEventType.T_SCHEDULE) {1 C$ K& q9 ^) c8 f. |1 x$ ~8 B$ E) V
System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());# G0 I) b9 u3 O7 M: [) i, k' P
}
$ }2 t! ?+ h& y) `2 y}
$ s" E2 y: W' q7 M; s+ E; w}. ^1 f. \# D3 E( l, V3 Q/ k) d0 |
}
: U3 f, i4 M" g; [' X0 l5 J R1 ]4) 测试程序。" H8 { A: V' _- C7 G5 O3 u$ r0 F
@SuppressWarnings("unchecked")# q$ y1 M( a0 n6 r
public class SimpleMRAppMasterTest {
, }, F8 o' r B" t0 rpublic static void main(String[] args) throws Exception {$ Y' M' a% l/ [* b- L
String jobID = "job_20131215_12";7 G6 E# w! S8 r
SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
7 O8 r& r) _3 v3 ?. F4 @YarnConfiguration conf = new YarnConfiguration(new Configuration());' H' R4 F# E- j- V( l, k
appMaster.serviceInit(conf);
* i1 d* \ W! S% \" [- PappMaster.serviceStart();
/ p. l- W: O* l% x- t3 K5 XappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
9 Q# {3 u5 `3 J! ]+ U" Q. BJobEventType.JOB_KILL));
0 \- A6 V8 C6 p F- \appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
/ _5 h# Y' S! J- k# g! `% GJobEventType.JOB_INIT));
- |2 i5 @! v9 S}2 Z$ k7 x2 f: s e
3.4.4 事件驱动带来的变化, p3 V6 n- b* o O, l, ]3 }8 Q U
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的7 ^1 ~2 y; r6 b* r8 s+ W4 i& |
方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、$ K# b* Q/ ^% K2 v" F; G: u8 ]
字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
0 W: o+ Y. X4 Q h/ {$ E是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一
- W7 Q8 J4 V) E/ g个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决, L7 a$ p Y5 j2 c5 G
问题之道, 必须引入新的编程模型。- i1 |- T* `; i8 c! _' N
图3-16 基于函数调用的工作流程
$ Y$ S/ X; Q1 l o7 n: t4 i. ^基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则; W, T1 w' Y# x/ Y; U4 N
是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
1 e K5 W" I7 l/ V! [' ], u联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需
7 o% h" \# W7 j7 b向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
$ a. ^* I& D: ?' r6 }器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。4 j0 @$ \% S% c! v9 k% ]1 H
图3-17 基于事件驱动的工作流程5 ^5 k% F/ l% j& o
相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
/ m& ~2 H( R5 @ }9 S9 ?" u[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。
0 g$ @; Q7 }: w( E( s7 y) Y9 u# n" _4 S2 F9 l* O) h$ V
$ }7 [/ h T/ x- _3 y* f4 `
|
|