|
3.5 状态机库
9 q/ i: g: z3 P$ K# n0 c* S状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中
. L- q3 {1 u# ~$ h, n间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
9 i$ P) ?/ [5 e A, ~6 J( k态。 当状态机转换到最终状态时, 则退出。
, _: K& i( T- T3.5.1 YARN状态转换方式& u; L' \! p& k3 I
在YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和, B( B/ N- b* d1 u; [- R
回调函数( hook) 。 YARN定义了三种状态转换方式, 具体如下:
% m3 Q8 s/ V4 e" h7 x. U1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
9 J% S; P6 L! P( R* m' b函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。
2 k0 w7 Z% Z. _6 J! ]图3-18 初始状态:最终状态:事件=1:1:1
7 _& E9 Q; C" G/ j: c2) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
; R! j2 f8 j ]# ?* @; v/ D9 d函数状态转移函数Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。. m. E% c5 Z$ f! x. A
图3-19 初始状态:最终状态:事件=1:N :1
# J2 k3 q! B2 t, C* c# ^3) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1、 Event2和
$ w$ T. h3 ^5 J7 FEvent3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState。
8 |; i" u& z; }( }1 M X; L: o图3-20 初始状态:最终状态:事件=1:1:N8 _; {: h. t' c+ L6 L# x2 Z
3.5.2 状态机类 g( f) g! U3 B/ b# r
YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了. y9 k9 v/ @2 ?; C4 P4 o0 L
一个状态机工厂StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调' Y, ~. ~ K1 Z" ^; H
用installTopology完成一个状态机的构建。* Z$ F; Y4 O& y! _. [
图3-21 状态机类图
3 e8 k; p: p' h3 `( k% M3.5.3 状态机的使用方法
5 h; t9 @0 W! \4 @本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
( D/ u4 R: r5 Y- b& w变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度0 a. |' m8 d" m1 G
器, 可以嵌到3.4.3节的实例程序中运行。
/ D9 H1 l6 |3 W Z. n# k1) 定义作业类型。
6 I2 w$ h/ \! F3 o# w9 gpublic enum JobEventType {
1 n/ C, d! ~+ \9 [, |5 `0 @JOB_KILL,
0 X& r1 J+ i! v% ]0 w7 h0 R1 JJOB_INIT,
- Z1 @; r3 g! x8 P/ K2 vJOB_START,
! ~+ M! e6 U# S& Y( h9 EJOB_SETUP_COMPLETED,
& I O& Y9 v6 Y- ]9 Y# nJOB_COMPLETED; y3 I; w5 s8 G1 L
}6 Y2 @+ X* y0 q+ M- R! _+ h
2) 定义作业状态机。
4 t1 x; {4 I, r, e$ {( }6 @@SuppressWarnings({ "rawtypes", "unchecked" })- K5 R. a, i7 k) ]
public class JobStateMachine implements EventHandler<JobEvent>{
; g$ D7 g" U" C3 X6 \; nprivate final String jobID;
* C7 j! H8 F8 oprivate EventHandler eventHandler;* S- P8 m+ H0 f/ S' \6 b0 c9 i9 z
private final Lock writeLock;
- C1 v) o y& C1 X( B/ w' Rprivate final Lock readLock;$ P5 t5 g/ N5 N5 k' @
// 定义状态机
* \( h1 g7 P0 I* j- uprotected static final* G& e+ j/ _: O& G; c0 V' P
StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
0 ?9 X+ p# \1 Q' Z" N4 O) cstateMachineFactory/ f- j% }& T4 }
= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
( f+ C# Z8 O- X& ~( ?& y3 B! M(JobStateInternal.NEW)5 b6 U2 v4 e) x0 Q
.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,4 k' f1 p) v5 b% [0 A* ^
JobEventType.JOB_INIT,
: m1 J# j3 o( C7 u- k; A0 Dnew InitTransition())
" i% V( y' R9 p0 {6 @.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,' V1 _7 P1 M5 b9 U" ~9 s+ ^$ Y7 r
JobEventType.JOB_START,% n4 ]% k' h) ^) T
new StartTransition())& ~: K, O0 k! y* {: I8 h7 \
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
% L% m I* `$ _6 U; g4 eJobEventType.JOB_SETUP_COMPLETED,
. s) _, I; \+ I6 M# J# `new SetupCompletedTransition())7 p. A8 m5 I% y. a$ @, |
.addTransition
. }9 f+ Y3 F. P2 l$ ~(JobStateInternal.RUNNING,
" b: B" N, o @$ y* gEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),5 r- M+ e" y# s2 X
JobEventType.JOB_COMPLETED,( H `, i7 Z$ G' a5 n: m
new JobTasksCompletedTransition())
* e8 m; Z* z- [, k; e.installTopology();! t6 ]/ D6 u4 z- H) M% O# y% c
private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;, h+ K2 Q, X3 R4 h3 W
public JobStateMachine(String jobID, EventHandler eventHandler) {! y8 u% [- {2 n! k6 V& b: J
this.jobID = jobID;
$ h: u9 E1 ?+ OReadWriteLock readWriteLock = new ReentrantReadWriteLock();: ^9 x5 M- E6 u- P: O
this.readLock = readWriteLock.readLock();. F- D4 I' j/ ^
this.writeLock = readWriteLock.writeLock();
0 r6 Z3 k) |; ^! o, X- Othis.eventHandler = eventHandler;. C$ s" A! q+ i& E
stateMachine = stateMachineFactory.make(this);, u1 A6 V9 A4 m( Y: g: @+ f6 e* z
}p
6 S; _4 k5 s; b- e, mrotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
& X- i# p' [ z1 |" Sreturn stateMachine;
1 v- H# V7 ~5 p2 H0 f} p+ G5 ~! X5 H0 S* Q# p+ @3 |) C( U
ublic static class InitTransition
X/ {6 S& b/ C& K+ l3 N* Wimplements SingleArcTransition<JobStateMachine, JobEvent> {
- M3 L" o: l- R5 k8 J* J9 p@Override0 L) L: n# N+ q+ b7 P$ ?2 [
public void transition(JobStateMachine job, JobEvent event) {6 b7 s, ~7 z& y# z: s
System.out.println("Receiving event " + event);5 c9 G, O# Y# P! y. B# j
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START)); n8 _' z6 ?! F" Q* t
}
9 T; I3 v- G9 I# a} p
, i$ o" h2 h! {ublic static class StartTransition
9 T' D8 W( K' U' Y0 ^% himplements SingleArcTransition<JobStateMachine, JobEvent>>{% g. B& f& e; L" H
@Override& K- Q" B' r' a1 [" V D8 K
public void transition(JobStateMachine job, JobEvent event) {
/ I9 Y$ \, Q+ u4 _9 OSystem.out.println("Receiving event " + event);1 g {+ S- Y7 a
job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));6 _; _5 R0 N' _6 A6 v- Q
}: G$ ~0 k* I5 z
}…/
3 J' H$ n# L% ^, C$ [; a/定义类SetupCompletedTransition和JobTasksCompletedTransition
5 w1 o m. o% q8 K, d@Override
: b$ n1 \% ?' o7 Vpublic void handle(JobEvent event) {
8 a+ p0 ? l1 b0 c! F: \) O; Ztry {: P3 R: S* p; t9 V' j" f
writeLock.lock();9 z4 q; n- f4 T- t5 a; ]
JobStateInternal oldState = getInternalState();
+ w0 A0 Q2 \: V9 a$ }try {+ t2 b: w% y! V7 g- A s' ]
getStateMachine().doTransition(event.getType(), event);
]* i0 N. e7 m3 Y. b C; V} catch (InvalidStateTransitonException e) {/ ]" a5 {% u9 E$ ^* N
System.out.println("Can't handle this event at current state");
. L+ ~9 f; h& Y; l- _}i
1 y) l& |1 Q2 h- s9 Q2 Vf (oldState != getInternalState()) {1 j6 ?1 |% Q0 h/ X5 R
System.out.println("Job Transitioned from " + oldState + " to "; N, U+ H+ `0 F" s) }& |
+ getInternalState());
. K6 P# Z, o. g3 T' u}
8 G8 r0 D. d" i+ p7 B) }}f; M& X9 n l! _, u8 ]
inally {7 h4 W4 n1 L3 M( u# J! T
writeLock.unlock();4 @1 P: q2 ~% F0 r+ m- t9 A
}8 m0 x2 {+ m9 L5 N% C; Q6 ]( E* @1 m
} p
# Y% \) t/ |0 v; V0 |0 q8 Hublic JobStateInternal getInternalState() {) a# J/ Z" _' ], r7 e* [# C
readLock.lock();
& ~+ Z/ F& p7 O& t* j, dtry {0 g# e+ J+ Z- s
return getStateMachine().getCurrentState();# w: x3 S. ^4 J5 v' U! g! @
} finally {
! Z1 t" [5 p( `readLock.unlock();& ?0 ~- i, c- V# a! s, K
}
" O4 x" p0 w. R+ B' V) }}p
# @. ?! |' l4 N; Rublic enum JobStateInternal { //作业内部状态7 B4 [. R8 F" M; P5 N: e! x
NEW,
* _' m+ u8 r4 Z" S0 `+ xSETUP,
4 }% a2 b) H+ f7 o4 g9 y7 _+ v1 VINITED,
! `9 r9 A& H- b* ]1 pRUNNING,
) e: ?- @4 _( a; s+ l8 S2 }SUCCEEDED,
% P+ f8 Q0 q# z3 ^+ RKILLED,
; {! x* k' O1 `3 N}
; g0 T j2 B T9 S0 b6 [3 w0 \}6 L$ w+ w" e7 O1 _" S( \
3.5.4 状态机可视化
6 Z$ v3 x4 u, ?' E# K! IYARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImpl、 RMApp-AttemptImpl、 RMContainerImpl和3 Y$ T2 P- ^1 P
RMNodeImpl, NodeManager中的ApplicationImpl、 ContainerImpl和LocalizedResource, MRAppMaster中的JobImpl、 TaskImpl和
5 {( N$ G! w0 P& LTaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
" ~$ f: ]/ `9 N' {. r8 d- l: s作步骤如下。
: d) r+ m; f0 S% x( A步骤1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:* m+ u! z6 u4 F9 N f8 V
mvn compile -Pvisualize9 S I8 ?7 `$ c+ j
经过该步骤后, 本地目录中生成了ResourceManager.gv、 NodeManager.gv和MapReduce.gv三个graphviz格式的文件( 有兴趣可以' s1 Z9 _& B1 H* i! q+ h& _
直接打开查看具体内容) 。6 J3 R G8 g: d X+ W- C5 h$ Q
步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:5 L9 W8 T) S( Z) Z5 Z. r S; M# W
dot -Tpng NodeManager.gv > NodeManager.png
; f$ d# A4 i- O R$ E: I1 U2 k如果尚未安装graphviz包, 操作该步骤之前先要安装该包。: u6 s" [/ A$ V& q' T! g4 o
注释
$ {, c% j# c- E5 t, I/ X[16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930。
, s/ Y0 F3 a! Q/ }
, ~" z( N1 }# V! \* u0 S4 R! {% F, o. i
|
|