|
3.5 状态机库
* K! k8 b- v! ?5 g+ ?状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中
# o; \8 ~. N0 b* D, j6 G' i间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
+ ~* A4 \' y/ g A, q态。 当状态机转换到最终状态时, 则退出。2 n; j" l2 C& ] H# a
3.5.1 YARN状态转换方式/ D& k2 A0 v1 P* ?0 W
在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和3 y- ^6 }" J4 c; u5 C$ P
回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:
% `* w( Z- V2 }( O/ P1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
* u& H* L# K7 x' N6 \: j# H$ U函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。: S, T5 L0 ?6 k" P
图3-18 初始状态:最终状态:事件=1:1:1
6 X! O! s( `# L2 w4 Q: I2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行# F5 E8 V1 L* u# i/ y5 n1 b. s
函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。2 t* G, z* A' x' s6 f2 t# F, b, g
图3-19 初始状态:最终状态:事件=1:N :1
8 ^' D" r8 m* j* Q3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和
" C) Q7 t) d4 W. ^; n7 `5 kEvent3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。# }% l1 e1 y; E$ y
图3-20 初始状态:最终状态:事件=1:1:N+ g) V* j0 E) H/ d0 R# L1 u
3.5.2 状态机类4 M; ~9 [9 t% P! _1 a* V: D
YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
! u& v$ Q- I- ~6 g. f$ @5 S一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调. P- \* l( ~) |
用installTopology完成一个状态机的构建。
& p) q4 N8 v+ d2 j/ H' Z图3-21 状态机类图
; \3 F7 @3 @0 H& y" H9 z2 [! m: x3.5.3 状态机的使用方法
0 X3 [0 A' M. K+ Q4 t& V/ B1 [本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态4 b! f0 c3 C2 E5 Z# p# C5 R0 R
变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度
; F9 ~4 o4 S+ K% w# E器, 可以嵌到3.4.3节的实例程序中运行。
7 \0 B! {0 Z4 |$ d1) 定义作业类型。
1 J( m% @& l0 g; | @; Z7 Spublic enum JobEventType {
& F, A9 r. E2 b( GJOB_KILL,# ~( a- F: W, H$ u+ Y
JOB_INIT,: v% w! @* ?0 q: _6 D
JOB_START,
/ }! ^- |' Q: q4 |) o6 l4 LJOB_SETUP_COMPLETED,) S' \/ _; B2 x/ R! i- v; S) j) A
JOB_COMPLETED. k# V. w, v* R$ k7 z/ o: c2 A
}
4 f! ^! ~: l$ S1 Z# l# @1 `# }2) 定义作业状态机。
9 ?$ t0 `3 p8 `/ s8 J; L@SuppressWarnings({ "rawtypes", "unchecked" })
: L+ \) T+ L( Jpublic class JobStateMachine implements EventHandler<JobEvent>{4 a y- x% f1 l$ j
private final String jobID;" {' L" T" {1 `( W8 a. a
private EventHandler eventHandler;, r# S! Z6 T5 G! S
private final Lock writeLock;1 a, J* e9 v* c1 \' E3 U
private final Lock readLock;
6 Y3 v7 i; H/ B0 _1 f// 定义状态机6 A2 @7 D0 ^' f: x, O3 a* x
protected static final, |& a# Q! b- }8 c: S
StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>9 T2 U' K% E* b$ p( `* A
stateMachineFactory
5 \4 t9 W" h% t* E7 ? f5 N1 M= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
, o- w7 K$ @4 ?9 y1 K(JobStateInternal.NEW); ^+ k1 Z. F' q) z8 z8 t% i
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,
! J+ X7 i" v# \/ c3 bJobEventType.JOB_INIT,7 Z% ?- D$ _& j+ y8 Q. {: S' G0 |/ k( k
new InitTransition()) V- X- ^' r/ {2 Y# g( V
.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
6 a9 y8 J6 x3 m) y S# z) k- A. MJobEventType.JOB_START,. H7 A& {, O. c0 Y* z% T
new StartTransition())/ N8 r) z3 d6 r5 d, j/ h' L
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,# a7 a% } l8 S9 g: P- S
JobEventType.JOB_SETUP_COMPLETED,
( M: h2 c7 }% n) Pnew SetupCompletedTransition())
/ X) d' T. }/ c.addTransition
0 u( i. \5 }; X* g7 `4 m(JobStateInternal.RUNNING,& j; C0 V9 g# V% T! l, q
EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),- L6 Q1 a4 x: R4 ~
JobEventType.JOB_COMPLETED,1 ^) D" B A3 N9 P. x$ }
new JobTasksCompletedTransition())
8 d6 N' d$ q# f: W( N.installTopology();& u% r$ w6 S$ M/ U1 C9 N; R
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;6 w7 ?( n: |5 A- k
public JobStateMachine(String jobID, EventHandler eventHandler) {: Z: g& E$ M1 B" u8 \6 Y9 i+ q0 h w
this.jobID = jobID;
1 {3 d: W; z( w2 a) @ReadWriteLock readWriteLock = new ReentrantReadWriteLock();& h( |1 y( }9 T7 A2 d
this.readLock = readWriteLock.readLock();
. S' a! L2 S4 q( z1 pthis.writeLock = readWriteLock.writeLock();0 Z0 Z3 }& m8 l6 W6 s
this.eventHandler = eventHandler;
. X# E& y4 E/ z8 X7 c0 \. ^stateMachine = stateMachineFactory.make(this);
8 ]/ ?! w: a# ? `$ n1 k}p: d5 L# b. N2 Q/ I) U4 n( @; _
rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {& G/ Q) ~) r( Z8 b% l2 V( R7 j
return stateMachine;0 z7 G' {% K7 k& N$ ~: `
} p
' ^( F2 e& b0 ~; M. N* |) rublic static class InitTransition
. Z4 v! s4 P7 nimplements SingleArcTransition<JobStateMachine, JobEvent> {
- {) H! F3 m5 A+ o@Override1 ^' ~# ?1 L5 a: k0 Q+ C. _, i4 w" w
public void transition(JobStateMachine job, JobEvent event) {
. D7 D K9 n8 n& O l! nSystem.out.println("Receiving event " + event);6 v; t3 X$ q' T2 z1 v" C! l
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
5 r/ v" D9 S6 ~9 }. ]1 _}7 y* C! }, @1 G6 S0 |6 i
} p
) Q0 a& S# L8 [* d( s5 J% Uublic static class StartTransition, f( a! E1 `: G5 w5 d- N3 N/ B
implements SingleArcTransition<JobStateMachine, JobEvent>>{
! w4 D3 D4 O* H9 u5 p/ H@Override
) @+ C. M1 S0 c1 Epublic void transition(JobStateMachine job, JobEvent event) {
; T# z- j7 ~5 { DSystem.out.println("Receiving event " + event);0 j8 F5 K4 V3 I7 I
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));" f; E, Z' _/ V* t. H
}$ I7 ^: ~3 z5 g. z
}…/
% M8 e7 f! v( [- {2 D/定义类SetupCompletedTransition和JobTasksCompletedTransition2 c1 [$ m! |3 B. A: Z& C% U8 g+ n: j
@Override
2 H7 Y5 G# ]( opublic void handle(JobEvent event) {2 Q, Z2 J! {; L3 H7 ]+ [
try {+ I0 l" p0 K& ^
writeLock.lock();
) W0 ^2 A; f/ cJobStateInternal oldState = getInternalState();7 H; X2 J% Z$ K' B
try {
/ g8 K) e2 B2 v0 Y& Q( ^. g! @% rgetStateMachine().doTransition(event.getType(), event);
/ N! b& J; j9 S( N/ Q6 |1 s} catch (InvalidStateTransitonException e) {, V0 t& R4 f1 @
System.out.println("Can't handle this event at current state");6 w+ U% @. S7 I6 r
}i
- u4 Y/ U( ]. t: A5 f0 c' D" N0 Gf (oldState != getInternalState()) {5 Y0 t7 A3 v) L5 D$ U
System.out.println("Job Transitioned from " + oldState + " to "3 {/ N- R0 ^2 X- U5 e
+ getInternalState());! R/ c( d; X3 z% E5 P! ~ N
}* j2 ~- w4 v" B& n* y3 ^- D% U( S
}f
8 N9 G7 y* g+ K1 A" uinally {
4 r$ i; F; m" j: KwriteLock.unlock();* c5 B9 i5 ?7 i6 _( [# J
}
9 T u5 V3 q' f m9 G, Y0 \} p/ P& O2 {0 [: t* [' s! u# i1 U: w
ublic JobStateInternal getInternalState() {! ]7 V) E0 Z, L% E' N, n' Y
readLock.lock();5 P6 n$ G$ Q9 d+ K* n0 h0 B1 y
try {
8 a& ]5 ` q' E" \% Breturn getStateMachine().getCurrentState();
: ]) u9 N+ D! H/ x, R$ C2 O2 g} finally {
, b/ w% ]* E7 f+ Y; IreadLock.unlock();2 m$ ?* w2 j# o( n# u: u/ B
}
: H& F# l0 h/ y}p
0 q9 S* ]* D5 y4 `! O5 @ublic enum JobStateInternal { //作业内部状态
1 H' x! ?' H$ e H8 o/ G9 R, zNEW,6 f" Z# s/ e; J' B' z) x
SETUP,, e( E" l: T5 _: F$ b
INITED,$ Q/ Y7 e; e' P
RUNNING,
/ Z5 D+ C0 R l" eSUCCEEDED,# O, E8 C/ B" @6 D* t* V7 t
KILLED,
& D) S& |, I- C1 g/ V3 a}
% x8 Z B- _) z7 {}, X0 I% z! t- P3 O' c! U
3.5.4 状态机可视化$ i2 \6 V) f% }8 L5 I9 S
YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和4 `3 w4 h" r& z' X+ j: e
RMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和
, }, i2 R0 u, C: S' O+ _( hTaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操! @7 e4 J9 `1 y0 {
作步骤如下。9 Z% [# k8 O5 w1 E' M: Y6 K: _) g
步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
/ X) R, d: ~- Imvn compile -Pvisualize
0 F1 G4 A* E5 y' S" M经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以
4 s' V* W% ~! D0 \直接打开查看具体内容) 。
) W! @ n4 l. w7 I0 }* k6 S步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:4 y* c, z. e4 G8 f: n2 q' H
dot -Tpng NodeManager.gv > NodeManager.png: {# u. u" e( D# ?
如果尚未安装graphviz包, 操作该步骤之前先要安装该包。! N7 E1 G: d7 E# n3 x" C4 `
注释; v" q4 m) ^! j+ j2 l7 R
[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。 7 O) a! X9 N! F9 k+ Y1 `
7 L; K# T; R. ^2 h, Q/ B/ d( a
/ B3 A$ ~! ~5 ^* a, \/ L$ n |
|