|
3.5 状态机库( R9 r7 y1 J5 m2 Q) Y
状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中- x3 v9 z* n7 Z+ a4 k- s, Y
间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状$ N- |; |/ C. k: L( X+ {+ Y
态。 当状态机转换到最终状态时, 则退出。8 C; s+ A: f! d# q- u, N; ]2 n/ u2 k% z
3.5.1 YARN状态转换方式; d5 W/ F4 ^+ ?$ H# ~) D+ i
在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和- p, n ?- e( Z& ^* T" B% S
回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:$ o( H5 I7 U7 f+ o2 b. p7 f
1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行- }7 X. K: z3 H7 g. O/ T
函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。6 n! V' i& u5 ]0 A% l$ d
图3-18 初始状态:最终状态:事件=1:1:1
" k. f W* P5 r h2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
- L @, p4 F2 d函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。/ W0 l& O/ `* W! V- e. H
图3-19 初始状态:最终状态:事件=1:N :17 m/ h) o: ~' ~9 ^6 ^9 P
3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和; c0 H) Q8 b- m' j
Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。
6 {, E u8 k8 `/ v( O: ~: Z1 F- A图3-20 初始状态:最终状态:事件=1:1:N
7 E: n6 W0 x8 S0 ?; x4 i3 ^3.5.2 状态机类+ N7 c1 E4 T3 I1 P0 D
YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了- y0 z- u: X3 b3 L. ]: c# ^
一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调+ E% G/ w$ ?7 l! @ b# x5 Q4 n; m
用installTopology完成一个状态机的构建。8 s! U0 F$ l( k. [. z7 `( Q* R* o
图3-21 状态机类图& G" w& k& h. \* L
3.5.3 状态机的使用方法8 J' d# y9 l. o* [, A" q2 h4 U
本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
U4 W. A- Y$ ?# n4 g变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度9 {! V% T0 n6 G
器, 可以嵌到3.4.3节的实例程序中运行。* x- L0 o" s2 m' Z! p
1) 定义作业类型。
- [ j* i5 f# p0 Ppublic enum JobEventType {6 a5 T+ V9 C! x, d7 F5 w
JOB_KILL,6 r1 j& J, A) l3 f" m& @+ ? J; |" o
JOB_INIT,: i2 h3 {5 h, g- P" H4 [( X- v) Z& f
JOB_START,
) k" F1 p* n' \$ y, PJOB_SETUP_COMPLETED,
! s6 |4 v. @% VJOB_COMPLETED/ [ @" b) T% K
}
4 S! N$ O3 Y4 W% e7 V2) 定义作业状态机。4 I; z4 y) ^# l2 v W2 N: w
@SuppressWarnings({ "rawtypes", "unchecked" })
. O- v. e0 p& k9 T9 Npublic class JobStateMachine implements EventHandler<JobEvent>{
# `8 K) t Z7 P- C5 n7 |private final String jobID;% z0 R4 E& T; \, A0 f* _) u6 k& U
private EventHandler eventHandler;
5 l9 c* O0 y4 S- s0 Eprivate final Lock writeLock;
' w, R! `8 L. `) dprivate final Lock readLock;
' I. h. S' J0 l- F6 ~// 定义状态机
0 J& t" \/ d: V8 ?protected static final
% A1 z- _; Z# H* E! b2 q& R! y, }, bStateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>( ?3 P! I2 @$ g* m @2 t
stateMachineFactory
+ p7 J* c3 f8 V. H: a5 Y= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>7 p; b. w) j6 ^
(JobStateInternal.NEW)
) u* ?# Q1 C& S6 @.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,7 t9 N3 F1 ~9 ]) m# `
JobEventType.JOB_INIT,
- A! {1 N! Y1 U. o7 xnew InitTransition())
( B* i3 `8 z8 k K.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
6 a& W' K& ?) } t+ M9 pJobEventType.JOB_START,
$ E' I: V+ Q& q2 o( D: snew StartTransition())
1 y* t) D8 [2 g1 z.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,! K, W# m# R7 \0 \; n
JobEventType.JOB_SETUP_COMPLETED,% G# ~" W1 D. r5 Q, z+ Y: U
new SetupCompletedTransition())
- a. G3 f1 O* I, i, f.addTransition& O5 V; M1 h) S! P2 C
(JobStateInternal.RUNNING,
5 {9 C! I5 n7 O! j& l/ jEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),: j8 f }+ Q% [4 ]2 k5 j9 l
JobEventType.JOB_COMPLETED,4 H! O: r! J5 i9 T8 C) [' X
new JobTasksCompletedTransition())
( ~' l' `' Q! [( d.installTopology();
- F- i* m7 F8 n+ q& }+ e2 ?private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
5 ^0 P. @! M) d( ^public JobStateMachine(String jobID, EventHandler eventHandler) {
8 ]- L# ]4 [$ ]0 cthis.jobID = jobID;. A4 A) t% j" j4 i
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();. }6 H, N8 q" @0 C4 L
this.readLock = readWriteLock.readLock();
+ Y! B* d0 ?: O# I+ \1 M* Bthis.writeLock = readWriteLock.writeLock();9 H0 O. a! y4 R# G* ^4 ]" h9 W
this.eventHandler = eventHandler;
8 k8 f$ P% B ^stateMachine = stateMachineFactory.make(this);& |' X. X8 l! N1 U' C' H# Q3 C
}p/ D) k* L# Q$ f3 m9 \' \8 A% F) n
rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
, ~2 }* f/ M* \/ {1 L; Ereturn stateMachine;
* Z# ?, Y$ \% _: Q! e0 i} p
$ T1 y- N& W* s4 \" hublic static class InitTransition
; \3 \2 b- t6 Z6 I' V- {0 @$ Jimplements SingleArcTransition<JobStateMachine, JobEvent> {* I, w. E" P8 {1 N8 K! L% W2 Y& I$ B
@Override
" f( n( W2 \$ \# C8 G9 o+ Upublic void transition(JobStateMachine job, JobEvent event) {
3 g8 e2 v e$ E" @* mSystem.out.println("Receiving event " + event);/ l R4 [7 [ G
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));% d! T( k; ^5 R
}
* a$ F! S$ i1 Z* a: `} p: s" p3 {) X: c+ f& O, l4 [
ublic static class StartTransition* R3 W# R ]1 [# G4 r- _- D
implements SingleArcTransition<JobStateMachine, JobEvent>>{
1 C$ U6 l! o/ c# t! I, j@Override f/ ?$ j8 u# O5 z* _& [& t
public void transition(JobStateMachine job, JobEvent event) {; S( Z4 Q. V5 c# @
System.out.println("Receiving event " + event);0 k( O0 d3 G# r6 v
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));2 z. K- j; I! m. @( q
}- d1 I! M" T6 y9 g& G6 z: B5 q3 K
}…/. c2 _7 ~) y- Q U
/定义类SetupCompletedTransition和JobTasksCompletedTransition
8 n3 v) {* M' Y* f, o7 o@Override5 I: Q6 r1 s, n' a9 V/ l
public void handle(JobEvent event) {
. s7 m, M6 D, {* Y, P. ytry {
' |' V4 R t6 I. `% [' IwriteLock.lock();! ~+ w% U' e( r
JobStateInternal oldState = getInternalState();
9 e1 Q" x2 p' N$ x7 h; j- m9 ?try {% Q4 M. i0 f. D5 }9 N# _
getStateMachine().doTransition(event.getType(), event);* ~6 m5 g& \# g) s+ z
} catch (InvalidStateTransitonException e) {% s* {2 T4 f# G- D
System.out.println("Can't handle this event at current state");
0 V" [3 c3 ? F D% f4 s9 \}i
- p" m# q$ c& q# Mf (oldState != getInternalState()) {' ^+ F5 @) \1 t8 ]
System.out.println("Job Transitioned from " + oldState + " to "0 n. M" i9 }% M2 y) r
+ getInternalState());
* ~- _3 ~1 w5 S- s9 V9 ~}
8 d2 a7 D* a" v" F1 S! C! {1 @}f
# e: ?: O' |& D7 P" l! E# Binally {( q. R0 c! q0 H0 b9 o3 B$ c
writeLock.unlock();4 O2 @0 k- { k }5 Y J! P
}
+ k w6 o7 a: c} p
9 E \3 z5 j& y4 s rublic JobStateInternal getInternalState() {( w7 M: k$ C1 q' t
readLock.lock();
% ~% Y/ e+ E, Z# e Z7 v) \5 mtry {: \: a/ m+ J" M0 t# r% c% ^9 z
return getStateMachine().getCurrentState();2 E) Q: |8 q- Q1 v1 G: T0 ^
} finally {
( Z3 s0 y( v3 f* l, F3 K+ freadLock.unlock();" v' t& ]. p: |' T& F7 k
}
4 n6 ?$ I8 A/ N% i}p
/ N6 r3 `! i$ S. M1 oublic enum JobStateInternal { //作业内部状态2 J4 ?& @0 }) J0 Z ]" F/ Q
NEW,
. J* V5 ]* {( @. Y1 `2 r+ `5 R: XSETUP,9 N8 E% v+ M B" `
INITED,( N; D) o4 O0 _; g- T8 f
RUNNING,9 T6 z0 _! C: ^
SUCCEEDED,
( p; x+ \5 w( q) G1 Y1 EKILLED,
' f7 |1 D6 L0 P+ E& k* F: ?}
1 _" H' z7 u) l7 v}
8 J+ u. z0 c6 }+ G. _: Q3.5.4 状态机可视化0 Z! M; Y# Y# Y0 j7 E$ X1 q4 Z
YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和
+ H: z7 ?: x7 ]( \8 D9 xRMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和
: @9 u3 e& D: d6 R* }' ]/ CTaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
/ ]' A0 U' u5 l) k$ s S! }6 t5 S作步骤如下。1 f: Q# f. L/ X2 ~; o: ]* y
步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
) q! t6 |4 U; ~2 S* h' k3 Nmvn compile -Pvisualize
" J: |' @3 U- R经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以
% \1 S; C- J* m- }直接打开查看具体内容) 。
+ y* F1 D, `6 j" K步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
9 A) t4 L; y8 p' Hdot -Tpng NodeManager.gv > NodeManager.png& m+ a D6 ?1 p+ P% p9 Y" n
如果尚未安装graphviz包, 操作该步骤之前先要安装该包。
+ r+ O- |- j3 V1 y5 T& s0 f* h$ p6 T注释
3 q; ^( K& k, ~* [! R% H0 [: {[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。
9 B ^8 Y' J1 y; G
0 F1 i( ] V" I$ b) A1 q. Y% G. c- M' A: T
|
|