|
3.4 服务库与事件库
5 }/ G5 Q( ^$ s* V9 X& h( H9 h本节介绍服务库和事件库。0 T' P" K& Q7 Q4 I) a( R+ ~: H( v
3.4.1 服务库/ ?+ n% |. f* \4 g# z
对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
% ?! E- q3 y4 h! ^$ T: z7 |: r/ O❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
: x6 A( f7 h' @) [% Y7 G% ]8 y6 USTOPPED( 已停止) 。7 ^1 v+ B: H9 d- w. h( m2 x
❑任何服务状态变化都可以触发另外一些动作。* u7 q" G* e8 F6 v
❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
. f3 a- ]7 m# v; p9 JYARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
, d# P9 _5 c: z; @% U" h4 A对象最终均实现了接口Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
" H) t9 C" B' uService实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于% l1 C+ N( w1 I
ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMService、 ApplicationMasterLauncher、1 s, R. Q; x |! K7 n0 X- k0 E B8 T
ApplicationMasterService等。0 u! Y% k9 W# V+ C2 P' G, ]$ G
图3-13 YARN中服务模型的类图
( S8 T2 h5 t7 d4 |# ]在YARN中, ResourceManager和NodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服+ H* c( o2 `" A9 S
务的统一管理。
2 m6 r/ y! X. b: A' O/ M3.4.2 事件库
! S. t k$ M$ I, I/ sYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN将
7 F2 o Q( R$ c D( _1 o各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。 YARN中的事件处3 m) l7 N/ Y1 O' Q8 X4 ^
理模型可概括为图3-14所示。; a" r" z, s7 I2 a! J1 \0 ~! g
图3-14 YARN的事件处理模型
: N) { q! z9 {, h( P5 [7 ?整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器( Async-Dispatcher) 负责传递给相应事件调度器- y2 |: W; I9 [' ^4 l7 x
( Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其( @5 U0 v) ]% U" A
处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成
( o9 K9 j5 s. E" {3 F5 c( 达到终止条件) 。
' M% Q; ]0 F' r! G+ G在YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManager、 NodeManager、. q& \. U2 T8 l& m P
MRAppMaster( MapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型9 O" f3 ]5 s! O; g( [; _
驱动服务的运行。# Q: B0 `4 u( w
YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先# ]& R) M. v( Z( U' S n. N
要定义一个中央异步调度器AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器, F- y; w# y7 _) s# m, ]
EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
& U, R0 M2 L/ b0 x2 `" p2 b5 zAsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImpl、 TaskEvent/TaskImpl、 JobEvent/JobImpl等一系列事件/事件处理器, 由
6 ^5 e; C( a g/ f; v中央异步调度器统一管理和调度。
# M0 r: |9 g0 I- j2 I' y: X服务化和事件驱动软件设计思想的引入, 使得YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
7 \/ F3 I& {9 G1 g) S则采用事件联系起来, 系统设计简单且维护方便。
' w1 m. u! t* {! p- A/ h图3-15 事件与事件处理器
) F" [" O2 g x" @7 u. h {$ u3.4.3 YARN服务库和事件库的使用方法1 E7 _. ~) \ Z# r" R# I
为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce6 E- @5 ~* c% Z% Y7 r
ApplicationMaster( MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。* y( N( M$ b! Q( W: ^
1) 定义Task事件。, o% h$ ?# \8 y( T
public class TaskEvent extends AbstractEvent<TaskEventType> {5 x' d# Z. C1 J% W
private String taskID; //Task ID
1 w) w6 i. `1 u0 ]public TaskEvent(String taskID, TaskEventType type) {
6 w4 c' Z3 B' x$ X( ?1 Y [; ]1 Osuper(type);
* g& N% h6 _7 p, @. Ithis.taskID = taskID;
7 E' A& B# d& G% K: d0 O9 l% F3 `} p
6 K5 }& q* c# y/ h# wublic String getTaskID() {! d# ]3 n2 c% D% ^
return taskID;
% u5 a% i! Y }9 H& R5 D2 Q1 G) q} 其) J3 I/ ~, N$ t+ ]0 m# ?( L& a
中, Task事件类型定义如下:& \) w8 [. ~. o
public enum TaskEventType {
; f1 u$ {# \; {T_KILL,
& i r/ `7 F8 H( sT_SCHEDULE
, W9 ?/ ]) N+ s: d2 Y& I}& Y1 t% A3 h' V4 v' `1 u4 v9 V
2) 定义Job事件。) v2 t9 @& g I/ T m
public class JobEvent extends AbstractEvent<JobEventType> {
" y9 W: T, j( d* n0 C5 w2 jprivate String jobID;
# S4 o9 Z( j G/ @8 L) xpublic JobEvent(String jobID, JobEventType type) {1 P) \! U1 n8 H
super(type);
: _( W4 @ G2 d X- hthis.jobID = jobID;; P( b% q0 ]* [# L8 a8 E
} p
) L: ~% L! X' h' w% L! Sublic String getJobId() {
- j# {9 L2 }; }return jobID;
5 O0 i3 ]4 T" f( V} l; o) H* a9 s
}$ z+ t+ t+ G2 |+ Z" D% B" J7 N c
其中, Job事件类型定义如下:
O1 K" a# q3 }5 G) m4 g3 l( ~# Gpublic enum JobEventType {
, A( y8 Z! J& I, B1 B3 EJOB_KILL,
. Y: ^+ [! f0 k* @! y0 mJOB_INIT,
! S. H; u0 G( {2 H2 M5 ZJOB_START6 J2 ^: l2 {, I) j' z
}, [7 ]; Q% i9 Y" I
3) 事件调度器。8 @( ]) _- w1 E# S
接下来定义一个中央异步调度器, 它接收Job和Task两种类型事件, 并交给对应的事件处理器处理, 代码如下:
, d7 Y8 ~/ _( X@SuppressWarnings("unchecked")
" D8 V" c. _9 ]public class SimpleMRAppMaster extends CompositeService {
" V; s: K* ^ R* F7 b5 `private Dispatcher dispatcher; //中央异步调度器
/ E# f- k& K: a7 i! jprivate String jobID;5 O# `7 u, q2 Q
private int taskNumber; //该作业包含的任务数目9 F0 ^% x- Y6 o
private String[] taskIDs; //该作业内部包含的所有任务/ k9 ~ o( V3 h5 i
public SimpleMRAppMaster(String name, String jobID, int taskNumber) {/ F+ K( X5 m. _/ _+ r) q
super(name);8 R( U2 a3 o# ^7 d
this.jobID = jobID;6 {* R1 B. S& N* L. k% A- `. t; P
this.taskNumber = taskNumber;) W8 J) \% r7 u6 u
taskIDs = new String[taskNumber];
+ z1 I1 `- n4 U" t* K) V2 F% ufor(int i = 0; i < taskNumber; i++) {
, ]1 f# P% G; v* n* x9 ^taskIDs = new String(jobID + "_task_" + i);& ~: v+ S- V' l8 W" O
}% o; f# x# u9 [7 U5 Q7 k2 e) D
} p
0 X0 [" }8 j' `- z; wublic void serviceInit(final Configuration conf) throws Exception {8 B6 `, I# Q6 g; ~
dispatcher = new AsyncDispatcher();//定义一个中央异步调度器
5 w/ Z+ }/ |2 Q5 P6 t! s+ m//分别注册Job和Task事件调度器) W: C" w h& ^ \$ u
dispatcher.register(JobEventType.class, new JobEventDispatcher());9 }- o+ }, \3 ^# [( ?2 O
dispatcher.register(TaskEventType.class, new TaskEventDispatcher());+ T0 i# y1 s8 W( A: a! r' }
addService((Service) dispatcher);" o. S8 D+ G+ I5 _) k
super.serviceInit(conf);
5 |6 u0 N9 j# s& I' v/ d} p, Q) Z' u; A6 y! {8 O
ublic Dispatcher getDispatcher() {
! r! ?4 p9 ^3 }" \return dispatcher;
6 ^4 j" ^$ i$ j# f/ C} p
: V% _) z& M C4 E* f2 wrivate class JobEventDispatcher implements EventHandler<JobEvent> {
8 A8 r+ _7 w2 M, n- W) X6 }5 n9 ]@Override
2 [& O' ~/ X$ h- a. @5 s7 lpublic void handle(JobEvent event) {+ r' S2 d6 Z+ V2 ]5 b: D
if(event.getType() == JobEventType.JOB_KILL) {
; V7 O0 k: m& c% x& ASystem.out.println("Receive JOB_KILL event, killing all the tasks");# W: \7 ]; t( h, v
for(int i = 0; i < taskNumber; i++) {' V$ I2 }2 w. V; U& P8 R: N0 G
dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,( V: a5 Q1 ~ g+ L8 W
TaskEventType.T_KILL));
4 ?1 l: ]$ k7 \6 w- J9 a}
1 O! O: K% x" v" J! V} else if(event.getType() == JobEventType.JOB_INIT) {" P9 s5 j. T' i; J& d2 I% _
System.out.println("Receive JOB_INIT event, scheduling tasks");: S6 @! Y. J, O; h5 L
for(int i = 0; i < taskNumber; i++) {
+ c" a% ?" @& q& e& a; odispatcher.getEventHandler().handle(new TaskEvent(taskIDs, ?$ Q2 T, }" H/ g( v/ U
TaskEventType.T_SCHEDULE));5 }1 B+ U! h5 `7 \9 R- `& l' r
}' t2 a# Y: F/ R
}' [" P/ f% T+ ]5 b
}
4 L k3 t2 P( c" M}p
f7 [ b+ C4 r2 s& N+ F- K z: crivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
L9 T: [( P1 F3 x: B& d@Override8 r" n. s, ]5 q
public void handle(TaskEvent event) {
$ Q% q# [: a# S* Wif(event.getType() == TaskEventType.T_KILL) {
{1 U/ _& w' d6 y; I0 K) Z: H5 A2 OSystem.out.println("Receive T_KILL event of task " + event.getTaskID());
- h! W" j. u4 t4 y P" P} else if(event.getType() == TaskEventType.T_SCHEDULE) {8 r$ Y& M% s4 ^( O1 k. |
System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());0 ~, z# V# C! g# L
}
0 K6 b1 W8 \4 V& s7 s! y3 T- T& g}
3 q4 s4 s) z" j" m) ]) ]}% i$ e) w. G$ a' [
}5 P* L8 D+ A+ j8 S: {! {5 R
4) 测试程序。
" q( q; l2 ]! { l2 o4 z@SuppressWarnings("unchecked")
( [" _' b* [) }2 H) zpublic class SimpleMRAppMasterTest {
. l/ `: G$ P. h" X: w" Xpublic static void main(String[] args) throws Exception {% p; ^: y8 h W6 M
String jobID = "job_20131215_12";7 o$ g3 R ]: H" n' S1 A) n
SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5); M: b, f. V( s- u: g x
YarnConfiguration conf = new YarnConfiguration(new Configuration());' u8 V9 t% R6 X0 {3 I
appMaster.serviceInit(conf);0 @7 G7 u+ v" ?/ Q
appMaster.serviceStart();! H( S# E* H4 i
appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
8 c7 ~6 v; s' N4 CJobEventType.JOB_KILL));
/ k+ d/ G k6 {1 n* ^appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
- ?7 T5 \0 a0 e; h m# uJobEventType.JOB_INIT));
1 `' w7 n x8 R: z: m$ Q}: ?& t+ P* O! W3 x6 i. _
3.4.4 事件驱动带来的变化 v. W5 z& ^0 d- s. p* y7 `$ h
在MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
0 K9 c# O$ Q0 A9 C, a$ }5 V' B方式, 且整个过程是串行的。 比如, 当TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、6 y/ O7 k4 ~9 _8 C
字典文件等) 、 然后执行Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
3 W/ t7 w( B; J是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一5 O, r! o K. G1 n7 [
个独立进程运行该任务。 尽管后来MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决+ S* J) ]; |) I$ m
问题之道, 必须引入新的编程模型。
2 \0 g$ E8 I6 T r6 f图3-16 基于函数调用的工作流程
+ [- @0 O- E5 I/ w N3 U基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下, MRv2引入的事件驱动编程模型则
: |6 B' Z% L9 n+ `7 {& N, O是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
% m$ r$ U% `2 M. y. p8 s7 `' [联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图3-17所示, 当A需要下载文件时, 只需
3 s! F& G& b# |0 e6 B向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理+ e3 ]* q4 _ p! x( U1 e
器B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A。) m% Z0 t" @- g2 `7 z4 h7 U- G
图3-17 基于事件驱动的工作流程
8 K: j3 S' ]' Z! T0 U相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。9 A; s8 c3 o8 F' W
[15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705。
9 P' J3 |! ~ T* ~: r- S6 k7 H
+ x6 q% p; L( x ?' D n* m/ E/ s& p* x) E, H8 O$ V3 p
|
|