java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3205|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库
    6 @# g9 w! @; @状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中9 [- d& U, l2 T
    间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    9 k1 R1 H" Y- U# Y9 X态。 当状态机转换到最终状态时, 则退出。# y# ]( U5 v& m9 L9 x
    3.5.1 YARN状态转换方式
    , f5 V* s7 x8 v
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
    , E2 o& p# n% S' U* Z# V回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:1 K6 C$ r  c- A8 R4 P
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行3 E6 j8 a6 ~+ \1 p- c: Q
    函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState2 m9 d$ \& L. q9 ]2 s4 l- h6 C
    3-18 初始状态:最终状态:事件=1:1:1# ]) b9 `% \5 ^0 ?
    2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    4 P0 ]" t: s& h函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。3 f$ n: i: U4 W; B8 t' f  d0 B
    3-19 初始状态:最终状态:事件=1:N :18 r: Z0 Q! ]& r0 [8 H# F
    3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2. |$ |5 ~' n, v, a# U0 P1 ]' o  C! N: x
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState  I8 ^, {' e; n6 A. D6 j' A
    3-20 初始状态:最终状态:事件=1:1:N
    * h; {$ x/ H- W, {" I) S9 b
    3.5.2 状态机类1 f& g8 x8 N1 d; z0 j# P% U: A
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了6 R9 R" {! o5 L+ A- l
    一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调( Q, [  r( Z/ m" C2 B
    installTopology完成一个状态机的构建。
    " c9 s/ @5 `# E5 C
    3-21 状态机类图
    7 T% [) p7 S" F) z% d' s
    3.5.3 状态机的使用方法6 T* Z! k8 l- o% a7 C/ A
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态- t0 V2 \) ?  s% H7 }
    变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度: w3 D, }0 J" q% ?
    器, 可以嵌到
    3.4.3节的实例程序中运行。
    1 @4 e# o3 ?# N% m# H
    1) 定义作业类型。
    4 b0 d) r6 F) S, t$ H& W
    public enum JobEventType {
    7 k/ s/ k/ p# eJOB_KILL,( h% P' A6 ]1 H3 P9 v
    JOB_INIT,' I* ^$ Q& B  a! t! O
    JOB_START,
    . G6 A% C/ n4 Q4 v9 VJOB_SETUP_COMPLETED,& Z' z/ A$ l1 G# Z6 O* S* S6 m% p
    JOB_COMPLETED
    , m1 J8 L, P. \0 T}
    % ?, a3 j. ?# O% \* Q2) 定义作业状态机。4 I4 j2 H) w2 c
    @SuppressWarnings({ "rawtypes", "unchecked" })
    0 D% J, M- f1 Y" `. r1 z4 A1 B3 qpublic class JobStateMachine implements EventHandler<JobEvent>{! U6 ^3 e' j5 a$ E- A3 T
    private final String jobID;
    ( g% B/ D8 A* s: j' Q, [+ i' t2 O2 _private EventHandler eventHandler;7 N7 C0 J; n# C0 S- @& c/ o  R
    private final Lock writeLock;
    $ Y. Y2 L' Z/ fprivate final Lock readLock;
      q6 c! x; b0 m2 A/ E// 定义状态机
    1 {+ ^( ?  z) ]# a6 `' G9 {$ [/ g
    protected static final- h7 [0 _5 L0 y7 ]. B: S
    StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>& t( F8 A0 n, z9 W
    stateMachineFactory, w  N- U% v9 l1 v. R
    = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    ! R2 {) A( Q4 r" e8 m. \5 N6 o(JobStateInternal.NEW)% X; y% P5 X+ K1 l# f5 D
    .addTransition(JobStateInternal.NEW, JobStateInternal.INITED,
    ( v1 P9 V: D1 m2 v9 {# B; V. _JobEventType.JOB_INIT,& I1 t4 U8 N) p1 S# {' O5 q2 |
    new InitTransition()); v/ T* O  ?7 }" E: Q
    .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    3 `7 o# `' N9 S( L  {: K3 L4 HJobEventType.JOB_START,
    + m1 z5 _) M( f/ ^( tnew StartTransition())% a; x. j/ p7 w3 v3 i6 `
    .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,& D- Y5 n0 ~0 m4 ?
    JobEventType.JOB_SETUP_COMPLETED,
    ' B( f+ J+ X8 S6 `new SetupCompletedTransition())
    , f0 M" Z% _) L3 B2 f.addTransition. N; U- }- k: f
    (JobStateInternal.RUNNING,8 A" ]5 h% v; s( z0 ~
    EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),
    3 B6 P9 h* p  s6 K" j' R  l4 HJobEventType.JOB_COMPLETED,# K& z$ A7 ~) r# A5 K
    new JobTasksCompletedTransition())
    $ Q3 \, n" y5 Y, F# r* y# c" A.installTopology();8 f% c* R& ]9 w  ]
    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;1 M* F" u1 q6 ~
    public JobStateMachine(String jobID, EventHandler eventHandler) {& n4 X: n  Q; e: A
    this.jobID = jobID;" a, S- E. R3 R' J- j) `4 p* ^
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    ) i  B& Z9 X9 g4 ~/ Sthis.readLock = readWriteLock.readLock();
    ' N; F2 Q' \6 B% lthis.writeLock = readWriteLock.writeLock();
    # w4 I) W$ z8 k8 u/ K0 Bthis.eventHandler = eventHandler;
    ( u7 |% [8 L) v* z" }7 i3 ostateMachine = stateMachineFactory.make(this);8 x! i: U& `1 _5 ^
    }p
    ' Q; b; i3 z& t) P: g+ ^rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
    ! b9 J; a- j/ }return stateMachine;
    ( S  j$ r3 V- `6 c2 S} p- o0 k0 p" O$ a: N0 w2 D9 q0 I2 C- K
    ublic static class InitTransition4 H7 Q. D3 ~. g6 L
    implements SingleArcTransition<JobStateMachine, JobEvent> {. G1 \! u; ~# l6 g+ A
    @Override% d+ X5 }3 Y8 D8 Y  u1 t
    public void transition(JobStateMachine job, JobEvent event) {
    9 M8 i& Q6 L, g8 Y' W' ^/ @7 l- ~% KSystem.out.println("Receiving event " + event);
    * s  Z% ?  Z* h1 v; `job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
    " B( [$ }# l* ]5 @3 ~+ U1 P- h}
      J2 B0 M0 r* Q} p6 v4 z5 u$ n8 t9 s- y
    ublic static class StartTransition
    ! ]- s  \( d# a/ I$ S" N" Y. vimplements SingleArcTransition<JobStateMachine, JobEvent>>{$ \# a! y: z) n8 u$ r! Q1 ]
    @Override
    ) Z/ M4 U5 f. i7 o( k: |  Gpublic void transition(JobStateMachine job, JobEvent event) {3 A' p( W' \3 Y0 T5 @: K
    System.out.println("Receiving event " + event);, N. z6 {3 S; c: W4 s
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));) @3 D- `# D" A( t' v' i
    }6 z7 y% Q3 V+ ~' A- [9 |& W8 ^( k$ Q
    }…/
    ) Q$ O% G) P7 l: d; e0 e4 u/定义类SetupCompletedTransitionJobTasksCompletedTransition
    " o4 N+ G+ c, M% H@Override
    4 {) n; X' D) q4 Jpublic void handle(JobEvent event) {
    * F9 ^, b, W( B- D% g+ ]try {: K' d) Z. z& D. x3 G' t
    writeLock.lock();( E; W7 \; `# ~/ g4 a/ D1 \2 C
    JobStateInternal oldState = getInternalState();
    ; R3 B: k7 O8 M+ b6 Ptry {
    8 J6 ?# F, u, D8 ^& c4 c9 p  j4 rgetStateMachine().doTransition(event.getType(), event);& ]; J1 ]: A6 R" X" A  a- ]& i
    } catch (InvalidStateTransitonException e) {
    - N5 K  T7 s  X$ V% G8 o3 W4 bSystem.out.println("Can't handle this event at current state");
    : L5 ^0 G* E2 ~  ~0 h9 {}i
    , ]' P3 y" Z3 f- I+ }* m5 If (oldState != getInternalState()) {  z% E7 U7 n- e% x# [7 j: n8 l
    System.out.println("Job Transitioned from " + oldState + " to "! w* _: m: K' I$ D) e1 J& u  B( F
    + getInternalState());
    2 ^9 @& Z- E# {, a6 _}: j  G! O) j7 l, n8 l+ `
    }f
    # y7 y+ i) p8 r' w( ainally {
    . B& W( d4 y% DwriteLock.unlock();, ~. M: d( t6 G& {% k4 Y
    }
    & j  l7 {) i+ Z) x} p
    ( j5 ]2 D" B, Y8 P, ^' J. i' Eublic JobStateInternal getInternalState() {; X& Y/ R. U7 E$ s8 E
    readLock.lock();3 x' k- s1 x8 K' ^' Q
    try {4 a' X; @. g5 ~: Q8 l
    return getStateMachine().getCurrentState();
    & R  n, z; `: \# y& l9 M} finally {4 H& ?; t% e+ p( F$ c
    readLock.unlock();, v; P+ H; Y+ C4 V1 q' c8 B7 t
    }
    . C7 M  d5 }* Y- a- L}p
    % O3 Q/ D: X2 v4 i% j0 Bublic enum JobStateInternal { //作业内部状态
    ) t% p8 R5 Y) A; ~- J; T' P' R0 y  ~
    NEW,  j; h; m3 W8 ]) S
    SETUP,: Q- A/ }9 i: O2 n& |; m
    INITED,. i5 L; S: a# x
    RUNNING,
    : a8 S8 p( r, _1 `  k  `SUCCEEDED,
    0 \* P& n. i" A2 ZKILLED,
    & H$ h! G' T) K9 _}9 P3 V5 x) x2 y- R( v. a( i
    }
    2 z' G: c/ r/ W* j" U" ^. x3.5.4 状态机可视化0 o. E3 N# e/ L4 X* ^
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl
    , `" o- b+ Z, O. t. p% d
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl
    5 G* n! O( Y+ v6 I$ t6 r0 }
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
    5 {! u4 {4 C% _9 s5 @9 U作步骤如下。
    + d, o. \$ w) w: ]步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
    ; ~$ P# f7 D6 P6 p3 f
    mvn compile -Pvisualize
    $ }$ P0 X5 T3 H: \' o经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以
    8 P: m# t4 Y2 _7 _2 [直接打开查看具体内容) 。

    ) l8 D' D( [2 D: M0 Q9 q5 k, X+ J: Q步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
    + K! q5 S& f* G0 ?) @, [6 k- c
    dot -Tpng NodeManager.gv > NodeManager.png; ]6 T8 }5 N9 N9 O5 k- J  ]+ R8 _
    如果尚未安装graphviz包, 操作该步骤之前先要安装该包。  S) T) j4 P% o/ V5 J
    注释
    % S0 H/ M9 L# z
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    ( }9 w1 r3 ~" [" Z$ H. F- K0 ~
    # p/ q" T$ T& K. d. j7 e$ \
    . E/ T9 C6 `0 P% l  b( `
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:47 , Processed in 0.279852 second(s), 28 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表