java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3348|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2076

    主题

    3734

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66670

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库
    * K! k8 b- v! ?5 g+ ?状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中
    # o; \8 ~. N0 b* D, j6 G' i间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    + ~* A4 \' y/ g  A, q态。 当状态机转换到最终状态时, 则退出。2 n; j" l2 C& ]  H# a
    3.5.1 YARN状态转换方式/ D& k2 A0 v1 P* ?0 W
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和3 y- ^6 }" J4 c; u5 C$ P
    回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:
    % `* w( Z- V2 }( O/ P
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    * u& H* L# K7 x' N6 \: j# H$ U函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState: S, T5 L0 ?6 k" P
    3-18 初始状态:最终状态:事件=1:1:1
    6 X! O! s( `# L2 w4 Q: I2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行# F5 E8 V1 L* u# i/ y5 n1 b. s
    函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。2 t* G, z* A' x' s6 f2 t# F, b, g
    3-19 初始状态:最终状态:事件=1:N :1
    8 ^' D" r8 m* j* Q3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2
    " C) Q7 t) d4 W. ^; n7 `5 k
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState# }% l1 e1 y; E$ y
    3-20 初始状态:最终状态:事件=1:1:N+ g) V* j0 E) H/ d0 R# L1 u
    3.5.2 状态机类4 M; ~9 [9 t% P! _1 a* V: D
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
    ! u& v$ Q- I- ~6 g. f$ @5 S一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调. P- \* l( ~) |
    installTopology完成一个状态机的构建。
    & p) q4 N8 v+ d2 j/ H' Z
    3-21 状态机类图
    ; \3 F7 @3 @0 H& y" H9 z2 [! m: x
    3.5.3 状态机的使用方法
    0 X3 [0 A' M. K+ Q4 t& V/ B1 [
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态4 b! f0 c3 C2 E5 Z# p# C5 R0 R
    变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度
    ; F9 ~4 o4 S+ K% w# E器, 可以嵌到
    3.4.3节的实例程序中运行。
    7 \0 B! {0 Z4 |$ d
    1) 定义作业类型。
    1 J( m% @& l0 g; |  @; Z7 S
    public enum JobEventType {
    & F, A9 r. E2 b( GJOB_KILL,# ~( a- F: W, H$ u+ Y
    JOB_INIT,: v% w! @* ?0 q: _6 D
    JOB_START,
    / }! ^- |' Q: q4 |) o6 l4 LJOB_SETUP_COMPLETED,) S' \/ _; B2 x/ R! i- v; S) j) A
    JOB_COMPLETED. k# V. w, v* R$ k7 z/ o: c2 A
    }
    4 f! ^! ~: l$ S1 Z# l# @1 `# }2) 定义作业状态机。
    9 ?$ t0 `3 p8 `/ s8 J; L
    @SuppressWarnings({ "rawtypes", "unchecked" })
    : L+ \) T+ L( Jpublic class JobStateMachine implements EventHandler<JobEvent>{4 a  y- x% f1 l$ j
    private final String jobID;" {' L" T" {1 `( W8 a. a
    private EventHandler eventHandler;, r# S! Z6 T5 G! S
    private final Lock writeLock;1 a, J* e9 v* c1 \' E3 U
    private final Lock readLock;
    6 Y3 v7 i; H/ B0 _1 f// 定义状态机6 A2 @7 D0 ^' f: x, O3 a* x
    protected static final, |& a# Q! b- }8 c: S
    StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>9 T2 U' K% E* b$ p( `* A
    stateMachineFactory
    5 \4 t9 W" h% t* E7 ?  f5 N1 M= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    , o- w7 K$ @4 ?9 y1 K(JobStateInternal.NEW); ^+ k1 Z. F' q) z8 z8 t% i
    .addTransition(JobStateInternal.NEW, JobStateInternal.INITED,
    ! J+ X7 i" v# \/ c3 bJobEventType.JOB_INIT,7 Z% ?- D$ _& j+ y8 Q. {: S' G0 |/ k( k
    new InitTransition())  V- X- ^' r/ {2 Y# g( V
    .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    6 a9 y8 J6 x3 m) y  S# z) k- A. MJobEventType.JOB_START,. H7 A& {, O. c0 Y* z% T
    new StartTransition())/ N8 r) z3 d6 r5 d, j/ h' L
    .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,# a7 a% }  l8 S9 g: P- S
    JobEventType.JOB_SETUP_COMPLETED,
    ( M: h2 c7 }% n) Pnew SetupCompletedTransition())
    / X) d' T. }/ c.addTransition
    0 u( i. \5 }; X* g7 `4 m(JobStateInternal.RUNNING,& j; C0 V9 g# V% T! l, q
    EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),- L6 Q1 a4 x: R4 ~
    JobEventType.JOB_COMPLETED,1 ^) D" B  A3 N9 P. x$ }
    new JobTasksCompletedTransition())
    8 d6 N' d$ q# f: W( N.installTopology();& u% r$ w6 S$ M/ U1 C9 N; R
    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;6 w7 ?( n: |5 A- k
    public JobStateMachine(String jobID, EventHandler eventHandler) {: Z: g& E$ M1 B" u8 \6 Y9 i+ q0 h  w
    this.jobID = jobID;
    1 {3 d: W; z( w2 a) @ReadWriteLock readWriteLock = new ReentrantReadWriteLock();& h( |1 y( }9 T7 A2 d
    this.readLock = readWriteLock.readLock();
    . S' a! L2 S4 q( z1 pthis.writeLock = readWriteLock.writeLock();0 Z0 Z3 }& m8 l6 W6 s
    this.eventHandler = eventHandler;
    . X# E& y4 E/ z8 X7 c0 \. ^stateMachine = stateMachineFactory.make(this);
    8 ]/ ?! w: a# ?  `$ n1 k}p: d5 L# b. N2 Q/ I) U4 n( @; _
    rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {& G/ Q) ~) r( Z8 b% l2 V( R7 j
    return stateMachine;0 z7 G' {% K7 k& N$ ~: `
    } p
    ' ^( F2 e& b0 ~; M. N* |) rublic static class InitTransition
    . Z4 v! s4 P7 nimplements SingleArcTransition<JobStateMachine, JobEvent> {
    - {) H! F3 m5 A+ o@Override1 ^' ~# ?1 L5 a: k0 Q+ C. _, i4 w" w
    public void transition(JobStateMachine job, JobEvent event) {
    . D7 D  K9 n8 n& O  l! nSystem.out.println("Receiving event " + event);6 v; t3 X$ q' T2 z1 v" C! l
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
    5 r/ v" D9 S6 ~9 }. ]1 _}7 y* C! }, @1 G6 S0 |6 i
    } p
    ) Q0 a& S# L8 [* d( s5 J% Uublic static class StartTransition, f( a! E1 `: G5 w5 d- N3 N/ B
    implements SingleArcTransition<JobStateMachine, JobEvent>>{
    ! w4 D3 D4 O* H9 u5 p/ H@Override
    ) @+ C. M1 S0 c1 Epublic void transition(JobStateMachine job, JobEvent event) {
    ; T# z- j7 ~5 {  DSystem.out.println("Receiving event " + event);0 j8 F5 K4 V3 I7 I
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));" f; E, Z' _/ V* t. H
    }$ I7 ^: ~3 z5 g. z
    }…/
    % M8 e7 f! v( [- {2 D/定义类SetupCompletedTransitionJobTasksCompletedTransition2 c1 [$ m! |3 B. A: Z& C% U8 g+ n: j
    @Override
    2 H7 Y5 G# ]( opublic void handle(JobEvent event) {2 Q, Z2 J! {; L3 H7 ]+ [
    try {+ I0 l" p0 K& ^
    writeLock.lock();
    ) W0 ^2 A; f/ cJobStateInternal oldState = getInternalState();7 H; X2 J% Z$ K' B
    try {
    / g8 K) e2 B2 v0 Y& Q( ^. g! @% rgetStateMachine().doTransition(event.getType(), event);
    / N! b& J; j9 S( N/ Q6 |1 s} catch (InvalidStateTransitonException e) {, V0 t& R4 f1 @
    System.out.println("Can't handle this event at current state");6 w+ U% @. S7 I6 r
    }i
    - u4 Y/ U( ]. t: A5 f0 c' D" N0 Gf (oldState != getInternalState()) {5 Y0 t7 A3 v) L5 D$ U
    System.out.println("Job Transitioned from " + oldState + " to "3 {/ N- R0 ^2 X- U5 e
    + getInternalState());! R/ c( d; X3 z% E5 P! ~  N
    }* j2 ~- w4 v" B& n* y3 ^- D% U( S
    }f
    8 N9 G7 y* g+ K1 A" uinally {
    4 r$ i; F; m" j: KwriteLock.unlock();* c5 B9 i5 ?7 i6 _( [# J
    }
    9 T  u5 V3 q' f  m9 G, Y0 \} p/ P& O2 {0 [: t* [' s! u# i1 U: w
    ublic JobStateInternal getInternalState() {! ]7 V) E0 Z, L% E' N, n' Y
    readLock.lock();5 P6 n$ G$ Q9 d+ K* n0 h0 B1 y
    try {
    8 a& ]5 `  q' E" \% Breturn getStateMachine().getCurrentState();
    : ]) u9 N+ D! H/ x, R$ C2 O2 g} finally {
    , b/ w% ]* E7 f+ Y; IreadLock.unlock();2 m$ ?* w2 j# o( n# u: u/ B
    }
    : H& F# l0 h/ y}p
    0 q9 S* ]* D5 y4 `! O5 @ublic enum JobStateInternal { //作业内部状态
    1 H' x! ?' H$ e  H8 o/ G9 R, z
    NEW,6 f" Z# s/ e; J' B' z) x
    SETUP,, e( E" l: T5 _: F$ b
    INITED,$ Q/ Y7 e; e' P
    RUNNING,
    / Z5 D+ C0 R  l" eSUCCEEDED,# O, E8 C/ B" @6 D* t* V7 t
    KILLED,
    & D) S& |, I- C1 g/ V3 a}
    % x8 Z  B- _) z7 {}, X0 I% z! t- P3 O' c! U
    3.5.4 状态机可视化$ i2 \6 V) f% }8 L5 I9 S
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl4 `3 w4 h" r& z' X+ j: e
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl
    , }, i2 R0 u, C: S' O+ _( h
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操! @7 e4 J9 `1 y0 {
    作步骤如下。9 Z% [# k8 O5 w1 E' M: Y6 K: _) g
    步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
    / X) R, d: ~- I
    mvn compile -Pvisualize
    0 F1 G4 A* E5 y' S" M经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以
    4 s' V* W% ~! D0 \直接打开查看具体内容) 。

    ) W! @  n4 l. w7 I0 }* k6 S步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:4 y* c, z. e4 G8 f: n2 q' H
    dot -Tpng NodeManager.gv > NodeManager.png: {# u. u" e( D# ?
    如果尚未安装graphviz包, 操作该步骤之前先要安装该包。! N7 E1 G: d7 E# n3 x" C4 `
    注释; v" q4 m) ^! j+ j2 l7 R
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  7 O) a! X9 N! F9 k+ Y1 `

    7 L; K# T; R. ^2 h, Q/ B/ d( a
    / B3 A$ ~! ~5 ^* a, \/ L$ n
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-11 15:00 , Processed in 0.135819 second(s), 33 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表