|
3.5 状态机库
6 @# g9 w! @; @状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中9 [- d& U, l2 T
间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
9 k1 R1 H" Y- U# Y9 X态。 当状态机转换到最终状态时, 则退出。# y# ]( U5 v& m9 L9 x
3.5.1 YARN状态转换方式
, f5 V* s7 x8 v在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
, E2 o& p# n% S' U* Z# V回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:1 K6 C$ r c- A8 R4 P
1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行3 E6 j8 a6 ~+ \1 p- c: Q
函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。2 m9 d$ \& L. q9 ]2 s4 l- h6 C
图3-18 初始状态:最终状态:事件=1:1:1# ]) b9 `% \5 ^0 ?
2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
4 P0 ]" t: s& h函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。3 f$ n: i: U4 W; B8 t' f d0 B
图3-19 初始状态:最终状态:事件=1:N :18 r: Z0 Q! ]& r0 [8 H# F
3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和. |$ |5 ~' n, v, a# U0 P1 ]' o C! N: x
Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。 I8 ^, {' e; n6 A. D6 j' A
图3-20 初始状态:最终状态:事件=1:1:N
* h; {$ x/ H- W, {" I) S9 b3.5.2 状态机类1 f& g8 x8 N1 d; z0 j# P% U: A
YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了6 R9 R" {! o5 L+ A- l
一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调( Q, [ r( Z/ m" C2 B
用installTopology完成一个状态机的构建。
" c9 s/ @5 `# E5 C图3-21 状态机类图
7 T% [) p7 S" F) z% d' s3.5.3 状态机的使用方法6 T* Z! k8 l- o% a7 C/ A
本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态- t0 V2 \) ? s% H7 }
变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度: w3 D, }0 J" q% ?
器, 可以嵌到3.4.3节的实例程序中运行。
1 @4 e# o3 ?# N% m# H1) 定义作业类型。
4 b0 d) r6 F) S, t$ H& Wpublic enum JobEventType {
7 k/ s/ k/ p# eJOB_KILL,( h% P' A6 ]1 H3 P9 v
JOB_INIT,' I* ^$ Q& B a! t! O
JOB_START,
. G6 A% C/ n4 Q4 v9 VJOB_SETUP_COMPLETED,& Z' z/ A$ l1 G# Z6 O* S* S6 m% p
JOB_COMPLETED
, m1 J8 L, P. \0 T}
% ?, a3 j. ?# O% \* Q2) 定义作业状态机。4 I4 j2 H) w2 c
@SuppressWarnings({ "rawtypes", "unchecked" })
0 D% J, M- f1 Y" `. r1 z4 A1 B3 qpublic class JobStateMachine implements EventHandler<JobEvent>{! U6 ^3 e' j5 a$ E- A3 T
private final String jobID;
( g% B/ D8 A* s: j' Q, [+ i' t2 O2 _private EventHandler eventHandler;7 N7 C0 J; n# C0 S- @& c/ o R
private final Lock writeLock;
$ Y. Y2 L' Z/ fprivate final Lock readLock;
q6 c! x; b0 m2 A/ E// 定义状态机
1 {+ ^( ? z) ]# a6 `' G9 {$ [/ gprotected static final- h7 [0 _5 L0 y7 ]. B: S
StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>& t( F8 A0 n, z9 W
stateMachineFactory, w N- U% v9 l1 v. R
= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
! R2 {) A( Q4 r" e8 m. \5 N6 o(JobStateInternal.NEW)% X; y% P5 X+ K1 l# f5 D
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,
( v1 P9 V: D1 m2 v9 {# B; V. _JobEventType.JOB_INIT,& I1 t4 U8 N) p1 S# {' O5 q2 |
new InitTransition()); v/ T* O ?7 }" E: Q
.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
3 `7 o# `' N9 S( L {: K3 L4 HJobEventType.JOB_START,
+ m1 z5 _) M( f/ ^( tnew StartTransition())% a; x. j/ p7 w3 v3 i6 `
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,& D- Y5 n0 ~0 m4 ?
JobEventType.JOB_SETUP_COMPLETED,
' B( f+ J+ X8 S6 `new SetupCompletedTransition())
, f0 M" Z% _) L3 B2 f.addTransition. N; U- }- k: f
(JobStateInternal.RUNNING,8 A" ]5 h% v; s( z0 ~
EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),
3 B6 P9 h* p s6 K" j' R l4 HJobEventType.JOB_COMPLETED,# K& z$ A7 ~) r# A5 K
new JobTasksCompletedTransition())
$ Q3 \, n" y5 Y, F# r* y# c" A.installTopology();8 f% c* R& ]9 w ]
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;1 M* F" u1 q6 ~
public JobStateMachine(String jobID, EventHandler eventHandler) {& n4 X: n Q; e: A
this.jobID = jobID;" a, S- E. R3 R' J- j) `4 p* ^
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
) i B& Z9 X9 g4 ~/ Sthis.readLock = readWriteLock.readLock();
' N; F2 Q' \6 B% lthis.writeLock = readWriteLock.writeLock();
# w4 I) W$ z8 k8 u/ K0 Bthis.eventHandler = eventHandler;
( u7 |% [8 L) v* z" }7 i3 ostateMachine = stateMachineFactory.make(this);8 x! i: U& `1 _5 ^
}p
' Q; b; i3 z& t) P: g+ ^rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
! b9 J; a- j/ }return stateMachine;
( S j$ r3 V- `6 c2 S} p- o0 k0 p" O$ a: N0 w2 D9 q0 I2 C- K
ublic static class InitTransition4 H7 Q. D3 ~. g6 L
implements SingleArcTransition<JobStateMachine, JobEvent> {. G1 \! u; ~# l6 g+ A
@Override% d+ X5 }3 Y8 D8 Y u1 t
public void transition(JobStateMachine job, JobEvent event) {
9 M8 i& Q6 L, g8 Y' W' ^/ @7 l- ~% KSystem.out.println("Receiving event " + event);
* s Z% ? Z* h1 v; `job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
" B( [$ }# l* ]5 @3 ~+ U1 P- h}
J2 B0 M0 r* Q} p6 v4 z5 u$ n8 t9 s- y
ublic static class StartTransition
! ]- s \( d# a/ I$ S" N" Y. vimplements SingleArcTransition<JobStateMachine, JobEvent>>{$ \# a! y: z) n8 u$ r! Q1 ]
@Override
) Z/ M4 U5 f. i7 o( k: | Gpublic void transition(JobStateMachine job, JobEvent event) {3 A' p( W' \3 Y0 T5 @: K
System.out.println("Receiving event " + event);, N. z6 {3 S; c: W4 s
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));) @3 D- `# D" A( t' v' i
}6 z7 y% Q3 V+ ~' A- [9 |& W8 ^( k$ Q
}…/
) Q$ O% G) P7 l: d; e0 e4 u/定义类SetupCompletedTransition和JobTasksCompletedTransition
" o4 N+ G+ c, M% H@Override
4 {) n; X' D) q4 Jpublic void handle(JobEvent event) {
* F9 ^, b, W( B- D% g+ ]try {: K' d) Z. z& D. x3 G' t
writeLock.lock();( E; W7 \; `# ~/ g4 a/ D1 \2 C
JobStateInternal oldState = getInternalState();
; R3 B: k7 O8 M+ b6 Ptry {
8 J6 ?# F, u, D8 ^& c4 c9 p j4 rgetStateMachine().doTransition(event.getType(), event);& ]; J1 ]: A6 R" X" A a- ]& i
} catch (InvalidStateTransitonException e) {
- N5 K T7 s X$ V% G8 o3 W4 bSystem.out.println("Can't handle this event at current state");
: L5 ^0 G* E2 ~ ~0 h9 {}i
, ]' P3 y" Z3 f- I+ }* m5 If (oldState != getInternalState()) { z% E7 U7 n- e% x# [7 j: n8 l
System.out.println("Job Transitioned from " + oldState + " to "! w* _: m: K' I$ D) e1 J& u B( F
+ getInternalState());
2 ^9 @& Z- E# {, a6 _}: j G! O) j7 l, n8 l+ `
}f
# y7 y+ i) p8 r' w( ainally {
. B& W( d4 y% DwriteLock.unlock();, ~. M: d( t6 G& {% k4 Y
}
& j l7 {) i+ Z) x} p
( j5 ]2 D" B, Y8 P, ^' J. i' Eublic JobStateInternal getInternalState() {; X& Y/ R. U7 E$ s8 E
readLock.lock();3 x' k- s1 x8 K' ^' Q
try {4 a' X; @. g5 ~: Q8 l
return getStateMachine().getCurrentState();
& R n, z; `: \# y& l9 M} finally {4 H& ?; t% e+ p( F$ c
readLock.unlock();, v; P+ H; Y+ C4 V1 q' c8 B7 t
}
. C7 M d5 }* Y- a- L}p
% O3 Q/ D: X2 v4 i% j0 Bublic enum JobStateInternal { //作业内部状态
) t% p8 R5 Y) A; ~- J; T' P' R0 y ~NEW, j; h; m3 W8 ]) S
SETUP,: Q- A/ }9 i: O2 n& |; m
INITED,. i5 L; S: a# x
RUNNING,
: a8 S8 p( r, _1 ` k `SUCCEEDED,
0 \* P& n. i" A2 ZKILLED,
& H$ h! G' T) K9 _}9 P3 V5 x) x2 y- R( v. a( i
}
2 z' G: c/ r/ W* j" U" ^. x3.5.4 状态机可视化0 o. E3 N# e/ L4 X* ^
YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和
, `" o- b+ Z, O. t. p% dRMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和
5 G* n! O( Y+ v6 I$ t6 r0 }TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
5 {! u4 {4 C% _9 s5 @9 U作步骤如下。
+ d, o. \$ w) w: ]步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
; ~$ P# f7 D6 P6 p3 fmvn compile -Pvisualize
$ }$ P0 X5 T3 H: \' o经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以
8 P: m# t4 Y2 _7 _2 [直接打开查看具体内容) 。
) l8 D' D( [2 D: M0 Q9 q5 k, X+ J: Q步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
+ K! q5 S& f* G0 ?) @, [6 k- cdot -Tpng NodeManager.gv > NodeManager.png; ]6 T8 }5 N9 N9 O5 k- J ]+ R8 _
如果尚未安装graphviz包, 操作该步骤之前先要安装该包。 S) T) j4 P% o/ V5 J
注释
% S0 H/ M9 L# z[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。
( }9 w1 r3 ~" [" Z$ H. F- K0 ~
# p/ q" T$ T& K. d. j7 e$ \
. E/ T9 C6 `0 P% l b( ` |
|