|
3.5 状态机库. a9 l; }" b' Y3 f
状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中4 S" ~! F7 \$ B/ ?1 J, C1 V
间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
* ]1 I+ R) c/ F) I2 f+ K态。 当状态机转换到最终状态时, 则退出。
. B) P2 Y2 a. f) G2 T3 V' b3.5.1 YARN状态转换方式" u( S9 B- D. ]2 s6 s( e; ?
在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
* _$ R1 U0 c- ~" _, w回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:) x1 o& Q' F( D! c: v! M
1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
/ S7 s% M c8 M$ f1 j5 ]2 c函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。 Y, K4 `3 Z. l2 |3 N. w6 d6 W' h
图3-18 初始状态:最终状态:事件=1:1:1
6 r9 o8 c( o" V {9 K& O1 T& [2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
% L4 r$ }4 L9 [( M6 I9 K函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。9 |0 \. m/ h7 I L2 V
图3-19 初始状态:最终状态:事件=1:N :1
# ?( L" s8 c) E. R( P- J! W8 F3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和5 H) n# {3 _( e$ r% O- \. F5 b" s
Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。/ R. Z1 H. X5 G: D
图3-20 初始状态:最终状态:事件=1:1:N
0 _. F6 q5 I5 p0 e8 k6 D! W5 a3.5.2 状态机类$ x* Q- N, o; S( o5 _
YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了; z9 [5 N; }1 \* b0 U) z
一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调
4 Y& I( Z! N; Q2 n# e用installTopology完成一个状态机的构建。, z- |, k; h- ?
图3-21 状态机类图) Y3 c/ ] V2 N- Q- E( G: C% }
3.5.3 状态机的使用方法2 l0 h6 F: t- Q& q) g
本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
5 \7 I, b! R( k- l0 {变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度/ e$ X$ d7 K% K/ k
器, 可以嵌到3.4.3节的实例程序中运行。
+ k( W! p0 m$ a1) 定义作业类型。7 \, r! @# N. N; o
public enum JobEventType {6 I' M/ Z4 h/ H3 _$ v" r
JOB_KILL,
- v' m7 }1 k1 b+ |) w. {+ ~JOB_INIT,
" u" X' B1 d& Y! iJOB_START,! s7 U( @# V% O0 k) |2 v2 m
JOB_SETUP_COMPLETED,
" `% Y, \6 I7 j' m* R8 wJOB_COMPLETED' a7 J& M8 S- G9 J
}$ G8 F" } `! q$ Z& A4 E2 C7 \ Y$ x3 i
2) 定义作业状态机。 z3 A& c: A" l4 l+ s3 X
@SuppressWarnings({ "rawtypes", "unchecked" })& [; Q0 T6 _2 @3 w0 X- {
public class JobStateMachine implements EventHandler<JobEvent>{
' j+ _. C5 r) d6 J; }3 \1 x- Dprivate final String jobID;
6 V7 R7 s5 p7 Y' oprivate EventHandler eventHandler;" o* X! d4 |8 }
private final Lock writeLock;/ h" l6 L) ]: Z" T
private final Lock readLock;) z1 I* K* ^+ }: |6 I9 Y
// 定义状态机+ S+ Y; E. a* j, F; s- z
protected static final+ ]0 T. Y/ D' v) ~; V l) z* _
StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>, _- O( C- D; a9 R! O
stateMachineFactory! i1 I8 O( t8 O7 _
= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent> `' n; x H3 A9 i6 F4 F6 B
(JobStateInternal.NEW); D9 |: F6 c2 m8 ^
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,; S! u7 ~% N3 _# q! Z+ }
JobEventType.JOB_INIT," x' n2 E3 h8 S5 b
new InitTransition()), G9 J5 Q0 P9 f9 [1 B/ W
.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
$ U& q. l i+ D2 u; {JobEventType.JOB_START,
$ s: y6 m4 B) gnew StartTransition())
9 r( L( `/ [; F.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,6 ], m2 N6 ~9 J! p. k! X
JobEventType.JOB_SETUP_COMPLETED,
" O- P$ h- s% k- G. I* Tnew SetupCompletedTransition())
$ P$ _4 h2 [# d3 Z6 Z: @.addTransition
5 O6 v) T+ x& o u(JobStateInternal.RUNNING,
! I( j" L) v3 p s9 ]! o. T# PEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),
R$ _* S# [# D- T0 NJobEventType.JOB_COMPLETED,
6 E m& J4 g9 N/ `! [' x+ j P, ?9 l* t3 pnew JobTasksCompletedTransition())
u& a' L4 N+ z2 H) A.installTopology();. L! D# A9 G: a9 v# Y y' j
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;6 |# D* U1 V8 m1 D8 V
public JobStateMachine(String jobID, EventHandler eventHandler) {
5 h' G) B+ Y# M" Q+ t/ w: othis.jobID = jobID;
2 f- [- m; Y5 I5 w1 ]ReadWriteLock readWriteLock = new ReentrantReadWriteLock();0 j9 j# n! p1 K2 ?% t9 y. z
this.readLock = readWriteLock.readLock();, E/ Y6 [ D: N
this.writeLock = readWriteLock.writeLock();
2 H0 j% d* D* s4 z, i' ythis.eventHandler = eventHandler;* S; U( L( L! S9 U' j& f# U
stateMachine = stateMachineFactory.make(this);
- o: u( n) C, P( A) t}p
1 x4 y* x. e" [ _rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
# u: B; a# P1 S0 b. Xreturn stateMachine;
) {0 F; L( _, O ?1 [+ K) h m" h2 b} p
* J8 _" q. S$ O F3 H0 x. Q+ Qublic static class InitTransition$ l6 }, x& q1 m/ `
implements SingleArcTransition<JobStateMachine, JobEvent> {
5 @" g) J% v/ l9 T \5 O@Override
7 d7 @- @1 s; E* q( A9 rpublic void transition(JobStateMachine job, JobEvent event) {
' U) _. C+ @1 S: tSystem.out.println("Receiving event " + event);$ p/ d0 v- B7 g, s7 o: o
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));: z( K5 C" r& v
}
9 W1 I. Y3 \5 w) ?9 ^) M} p
' g% x* L9 @( ]3 @ o% X5 b+ j1 fublic static class StartTransition
5 k+ R- i* h/ S: R9 a' wimplements SingleArcTransition<JobStateMachine, JobEvent>>{! c# _9 A; t+ k' c$ f U# x& [7 p
@Override
6 d, W' r1 `/ ] T+ K. V) Ypublic void transition(JobStateMachine job, JobEvent event) {* h" w C2 d9 D" D
System.out.println("Receiving event " + event);
/ ]; ]4 u. t+ ]/ q) R9 s bjob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));
# c3 {' A+ g% a }! c} J5 |* l9 M1 v" N/ V5 Z
}…/: \& h, Q2 l. i W9 p0 A- Q" V8 g
/定义类SetupCompletedTransition和JobTasksCompletedTransition+ ?& @7 Q$ L# g/ |
@Override
" p& `4 S, b1 Q0 e# S/ E5 npublic void handle(JobEvent event) {6 {- ~0 W' \ A: P) P8 P
try {
" H3 V* n/ t) DwriteLock.lock();
5 M, o4 ]6 Y0 o' V, GJobStateInternal oldState = getInternalState();
* U. ]8 C, J: ^: O5 Ltry {7 l) U& Z: ^2 f* ^# c8 s
getStateMachine().doTransition(event.getType(), event);. Q% K- J0 l7 Y' L
} catch (InvalidStateTransitonException e) {6 b& \& s, X- O
System.out.println("Can't handle this event at current state");
5 H' K9 T) y# {}i
) m/ a$ K6 U5 }f (oldState != getInternalState()) {9 c. l$ Q+ P- W0 \ E
System.out.println("Job Transitioned from " + oldState + " to "
; e9 ~. u# L2 n: G9 a& t+ getInternalState());; W. [+ A0 A* o
}
: L0 w" y$ Z! y4 R5 m}f0 H$ T: P: i b
inally {# ]4 y6 R$ F! N
writeLock.unlock();
' u8 [, Y7 e3 j0 S* ]}7 F, j- H" e9 i
} p
* o1 s- U/ t3 R2 J, [ublic JobStateInternal getInternalState() {4 C, [+ a+ q* b
readLock.lock();
! N- B) f8 ~% E. ^% e- g" Stry {
& R: ]4 z% u* y freturn getStateMachine().getCurrentState();
' F3 l7 \, N8 K5 R} finally {' T+ ]# |/ k0 ^
readLock.unlock();; F) I/ Z& p$ R9 ^
}
, `( D/ o! f/ j}p
o: }& E8 S: T# Lublic enum JobStateInternal { //作业内部状态
6 O l: E" g: y0 r# ]NEW,
0 j* V6 Q: g7 j9 y$ z# k' jSETUP,( U& R. w0 @/ J3 _3 E) y
INITED,3 X# [/ V; [" Q; n) P" C+ X5 |: B
RUNNING,/ X8 a' U+ X M S+ b
SUCCEEDED,% T' }& @1 v) n. x9 z5 g
KILLED,5 @; H4 F6 @" F* F/ o) v' a }
}
! W$ X1 T& i! y1 F) z5 k) n. _}3 C3 A* M8 z: O9 C/ z! l: N
3.5.4 状态机可视化
1 e% J% [& q. P7 L+ ^YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和/ f5 l( ?1 f- y: V* _8 T
RMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和5 b* c# ]/ i& `
TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
, |1 [1 N j5 A作步骤如下。6 g0 h5 H/ |& J& F
步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:7 c0 B+ x; o' I; Z" ]3 T4 _, @
mvn compile -Pvisualize
' c; t7 }& S1 q0 R- R经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以" h) ?' e/ j, z7 L
直接打开查看具体内容) 。; U; k" E+ g! ?* r% I
步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
: X5 Z/ o8 T! l2 N# xdot -Tpng NodeManager.gv > NodeManager.png
: f8 e$ i0 i1 v. R, f如果尚未安装graphviz包, 操作该步骤之前先要安装该包。
2 V% [9 ~5 p2 _& w1 v注释
' T" x$ \' C: t5 x1 e[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。
! e. v, P5 ?1 h2 ~4 f8 n, S4 `3 @7 D( v1 `, M1 m% G# t v
' l0 ]/ T/ V' c) e( J4 Z0 j% c |
|