java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3274|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2039

    主题

    3697

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66471

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库( R9 r7 y1 J5 m2 Q) Y
    状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中- x3 v9 z* n7 Z+ a4 k- s, Y
    间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状$ N- |; |/ C. k: L( X+ {+ Y
    态。 当状态机转换到最终状态时, 则退出。8 C; s+ A: f! d# q- u, N; ]2 n/ u2 k% z
    3.5.1 YARN状态转换方式; d5 W/ F4 ^+ ?$ H# ~) D+ i
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和- p, n  ?- e( Z& ^* T" B% S
    回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:$ o( H5 I7 U7 f+ o2 b. p7 f
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行- }7 X. K: z3 H7 g. O/ T
    函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState6 n! V' i& u5 ]0 A% l$ d
    3-18 初始状态:最终状态:事件=1:1:1
    " k. f  W* P5 r  h2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    - L  @, p4 F2 d函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。/ W0 l& O/ `* W! V- e. H
    3-19 初始状态:最终状态:事件=1:N :17 m/ h) o: ~' ~9 ^6 ^9 P
    3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2; c0 H) Q8 b- m' j
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState
    6 {, E  u8 k8 `/ v( O: ~: Z1 F- A3-20 初始状态:最终状态:事件=1:1:N
    7 E: n6 W0 x8 S0 ?; x4 i3 ^
    3.5.2 状态机类+ N7 c1 E4 T3 I1 P0 D
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了- y0 z- u: X3 b3 L. ]: c# ^
    一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调+ E% G/ w$ ?7 l! @  b# x5 Q4 n; m
    installTopology完成一个状态机的构建。8 s! U0 F$ l( k. [. z7 `( Q* R* o
    3-21 状态机类图& G" w& k& h. \* L
    3.5.3 状态机的使用方法8 J' d# y9 l. o* [, A" q2 h4 U
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
      U4 W. A- Y$ ?# n4 g变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度9 {! V% T0 n6 G
    器, 可以嵌到
    3.4.3节的实例程序中运行。* x- L0 o" s2 m' Z! p
    1) 定义作业类型。
    - [  j* i5 f# p0 P
    public enum JobEventType {6 a5 T+ V9 C! x, d7 F5 w
    JOB_KILL,6 r1 j& J, A) l3 f" m& @+ ?  J; |" o
    JOB_INIT,: i2 h3 {5 h, g- P" H4 [( X- v) Z& f
    JOB_START,
    ) k" F1 p* n' \$ y, PJOB_SETUP_COMPLETED,
    ! s6 |4 v. @% VJOB_COMPLETED/ [  @" b) T% K
    }
    4 S! N$ O3 Y4 W% e7 V2) 定义作业状态机。4 I; z4 y) ^# l2 v  W2 N: w
    @SuppressWarnings({ "rawtypes", "unchecked" })
    . O- v. e0 p& k9 T9 Npublic class JobStateMachine implements EventHandler<JobEvent>{
    # `8 K) t  Z7 P- C5 n7 |private final String jobID;% z0 R4 E& T; \, A0 f* _) u6 k& U
    private EventHandler eventHandler;
    5 l9 c* O0 y4 S- s0 Eprivate final Lock writeLock;
    ' w, R! `8 L. `) dprivate final Lock readLock;
    ' I. h. S' J0 l- F6 ~// 定义状态机
    0 J& t" \/ d: V8 ?
    protected static final
    % A1 z- _; Z# H* E! b2 q& R! y, }, bStateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>( ?3 P! I2 @$ g* m  @2 t
    stateMachineFactory
    + p7 J* c3 f8 V. H: a5 Y= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>7 p; b. w) j6 ^
    (JobStateInternal.NEW)
    ) u* ?# Q1 C& S6 @.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,7 t9 N3 F1 ~9 ]) m# `
    JobEventType.JOB_INIT,
    - A! {1 N! Y1 U. o7 xnew InitTransition())
    ( B* i3 `8 z8 k  K.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    6 a& W' K& ?) }  t+ M9 pJobEventType.JOB_START,
    $ E' I: V+ Q& q2 o( D: snew StartTransition())
    1 y* t) D8 [2 g1 z.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,! K, W# m# R7 \0 \; n
    JobEventType.JOB_SETUP_COMPLETED,% G# ~" W1 D. r5 Q, z+ Y: U
    new SetupCompletedTransition())
    - a. G3 f1 O* I, i, f.addTransition& O5 V; M1 h) S! P2 C
    (JobStateInternal.RUNNING,
    5 {9 C! I5 n7 O! j& l/ jEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),: j8 f  }+ Q% [4 ]2 k5 j9 l
    JobEventType.JOB_COMPLETED,4 H! O: r! J5 i9 T8 C) [' X
    new JobTasksCompletedTransition())
    ( ~' l' `' Q! [( d.installTopology();
    - F- i* m7 F8 n+ q& }+ e2 ?private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
    5 ^0 P. @! M) d( ^public JobStateMachine(String jobID, EventHandler eventHandler) {
    8 ]- L# ]4 [$ ]0 cthis.jobID = jobID;. A4 A) t% j" j4 i
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();. }6 H, N8 q" @0 C4 L
    this.readLock = readWriteLock.readLock();
    + Y! B* d0 ?: O# I+ \1 M* Bthis.writeLock = readWriteLock.writeLock();9 H0 O. a! y4 R# G* ^4 ]" h9 W
    this.eventHandler = eventHandler;
    8 k8 f$ P% B  ^stateMachine = stateMachineFactory.make(this);& |' X. X8 l! N1 U' C' H# Q3 C
    }p/ D) k* L# Q$ f3 m9 \' \8 A% F) n
    rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
    , ~2 }* f/ M* \/ {1 L; Ereturn stateMachine;
    * Z# ?, Y$ \% _: Q! e0 i} p
    $ T1 y- N& W* s4 \" hublic static class InitTransition
    ; \3 \2 b- t6 Z6 I' V- {0 @$ Jimplements SingleArcTransition<JobStateMachine, JobEvent> {* I, w. E" P8 {1 N8 K! L% W2 Y& I$ B
    @Override
    " f( n( W2 \$ \# C8 G9 o+ Upublic void transition(JobStateMachine job, JobEvent event) {
    3 g8 e2 v  e$ E" @* mSystem.out.println("Receiving event " + event);/ l  R4 [7 [  G
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));% d! T( k; ^5 R
    }
    * a$ F! S$ i1 Z* a: `} p: s" p3 {) X: c+ f& O, l4 [
    ublic static class StartTransition* R3 W# R  ]1 [# G4 r- _- D
    implements SingleArcTransition<JobStateMachine, JobEvent>>{
    1 C$ U6 l! o/ c# t! I, j@Override  f/ ?$ j8 u# O5 z* _& [& t
    public void transition(JobStateMachine job, JobEvent event) {; S( Z4 Q. V5 c# @
    System.out.println("Receiving event " + event);0 k( O0 d3 G# r6 v
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));2 z. K- j; I! m. @( q
    }- d1 I! M" T6 y9 g& G6 z: B5 q3 K
    }…/. c2 _7 ~) y- Q  U
    /定义类SetupCompletedTransitionJobTasksCompletedTransition
    8 n3 v) {* M' Y* f, o7 o@Override5 I: Q6 r1 s, n' a9 V/ l
    public void handle(JobEvent event) {
    . s7 m, M6 D, {* Y, P. ytry {
    ' |' V4 R  t6 I. `% [' IwriteLock.lock();! ~+ w% U' e( r
    JobStateInternal oldState = getInternalState();
    9 e1 Q" x2 p' N$ x7 h; j- m9 ?try {% Q4 M. i0 f. D5 }9 N# _
    getStateMachine().doTransition(event.getType(), event);* ~6 m5 g& \# g) s+ z
    } catch (InvalidStateTransitonException e) {% s* {2 T4 f# G- D
    System.out.println("Can't handle this event at current state");
    0 V" [3 c3 ?  F  D% f4 s9 \}i
    - p" m# q$ c& q# Mf (oldState != getInternalState()) {' ^+ F5 @) \1 t8 ]
    System.out.println("Job Transitioned from " + oldState + " to "0 n. M" i9 }% M2 y) r
    + getInternalState());
    * ~- _3 ~1 w5 S- s9 V9 ~}
    8 d2 a7 D* a" v" F1 S! C! {1 @}f
    # e: ?: O' |& D7 P" l! E# Binally {( q. R0 c! q0 H0 b9 o3 B$ c
    writeLock.unlock();4 O2 @0 k- {  k  }5 Y  J! P
    }
    + k  w6 o7 a: c} p
    9 E  \3 z5 j& y4 s  rublic JobStateInternal getInternalState() {( w7 M: k$ C1 q' t
    readLock.lock();
    % ~% Y/ e+ E, Z# e  Z7 v) \5 mtry {: \: a/ m+ J" M0 t# r% c% ^9 z
    return getStateMachine().getCurrentState();2 E) Q: |8 q- Q1 v1 G: T0 ^
    } finally {
    ( Z3 s0 y( v3 f* l, F3 K+ freadLock.unlock();" v' t& ]. p: |' T& F7 k
    }
    4 n6 ?$ I8 A/ N% i}p
    / N6 r3 `! i$ S. M1 oublic enum JobStateInternal { //作业内部状态2 J4 ?& @0 }) J0 Z  ]" F/ Q
    NEW,
    . J* V5 ]* {( @. Y1 `2 r+ `5 R: XSETUP,9 N8 E% v+ M  B" `
    INITED,( N; D) o4 O0 _; g- T8 f
    RUNNING,9 T6 z0 _! C: ^
    SUCCEEDED,
    ( p; x+ \5 w( q) G1 Y1 EKILLED,
    ' f7 |1 D6 L0 P+ E& k* F: ?}
    1 _" H' z7 u) l7 v}
    8 J+ u. z0 c6 }+ G. _: Q3.5.4 状态机可视化0 Z! M; Y# Y# Y0 j7 E$ X1 q4 Z
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl
    + H: z7 ?: x7 ]( \8 D9 x
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl
    : @9 u3 e& D: d6 R* }' ]/ C
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
    / ]' A0 U' u5 l) k$ s  S! }6 t5 S作步骤如下。1 f: Q# f. L/ X2 ~; o: ]* y
    步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
    ) q! t6 |4 U; ~2 S* h' k3 N
    mvn compile -Pvisualize
    " J: |' @3 U- R经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以
    % \1 S; C- J* m- }直接打开查看具体内容) 。

    + y* F1 D, `6 j" K步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
    9 A) t4 L; y8 p' H
    dot -Tpng NodeManager.gv > NodeManager.png& m+ a  D6 ?1 p+ P% p9 Y" n
    如果尚未安装graphviz包, 操作该步骤之前先要安装该包。
    + r+ O- |- j3 V1 y5 T& s0 f* h$ p6 T注释
    3 q; ^( K& k, ~* [! R% H0 [: {
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    9 B  ^8 Y' J1 y; G
    0 F1 i( ]  V" I$ b) A1 q. Y% G. c- M' A: T
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-1-22 12:24 , Processed in 0.379103 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表