java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3369|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2093

    主题

    3751

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66775

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库
    9 q/ i: g: z3 P$ K# n0 c* S状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中
    . L- q3 {1 u# ~$ h, n间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    9 i$ P) ?/ [5 e  A, ~6 J( k态。 当状态机转换到最终状态时, 则退出。
    , _: K& i( T- T
    3.5.1 YARN状态转换方式& u; L' \! p& k3 I
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和, B( B/ N- b* d1 u; [- R
    回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:
    % m3 Q8 s/ V4 e" h7 x. U
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    9 J% S; P6 L! P( R* m' b函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState
    2 k0 w7 Z% Z. _6 J! ]
    3-18 初始状态:最终状态:事件=1:1:1
    7 _& E9 Q; C" G/ j: c2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    ; R! j2 f8 j  ]# ?* @; v/ D9 d函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。. m. E% c5 Z$ f! x. A
    3-19 初始状态:最终状态:事件=1:N :1
    # J2 k3 q! B2 t, C* c# ^3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2
    $ w$ T. h3 ^5 J7 F
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState
    8 |; i" u& z; }( }1 M  X; L: o3-20 初始状态:最终状态:事件=1:1:N8 _; {: h. t' c+ L6 L# x2 Z
    3.5.2 状态机类  g( f) g! U3 B/ b# r
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了. y9 k9 v/ @2 ?; C4 P4 o0 L
    一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调' Y, ~. ~  K1 Z" ^; H
    installTopology完成一个状态机的构建。* Z$ F; Y4 O& y! _. [
    3-21 状态机类图
    3 e8 k; p: p' h3 `( k% M
    3.5.3 状态机的使用方法
    5 h; t9 @0 W! \4 @
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
    ( D/ u4 R: r5 Y- b& w变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度0 a. |' m8 d" m1 G
    器, 可以嵌到
    3.4.3节的实例程序中运行。
    / D9 H1 l6 |3 W  Z. n# k
    1) 定义作业类型。
    6 I2 w$ h/ \! F3 o# w9 g
    public enum JobEventType {
    1 n/ C, d! ~+ \9 [, |5 `0 @JOB_KILL,
    0 X& r1 J+ i! v% ]0 w7 h0 R1 JJOB_INIT,
    - Z1 @; r3 g! x8 P/ K2 vJOB_START,
    ! ~+ M! e6 U# S& Y( h9 EJOB_SETUP_COMPLETED,
    & I  O& Y9 v6 Y- ]9 Y# nJOB_COMPLETED; y3 I; w5 s8 G1 L
    }6 Y2 @+ X* y0 q+ M- R! _+ h
    2) 定义作业状态机。
    4 t1 x; {4 I, r, e$ {( }6 @
    @SuppressWarnings({ "rawtypes", "unchecked" })- K5 R. a, i7 k) ]
    public class JobStateMachine implements EventHandler<JobEvent>{
    ; g$ D7 g" U" C3 X6 \; nprivate final String jobID;
    * C7 j! H8 F8 oprivate EventHandler eventHandler;* S- P8 m+ H0 f/ S' \6 b0 c9 i9 z
    private final Lock writeLock;
    - C1 v) o  y& C1 X( B/ w' Rprivate final Lock readLock;$ P5 t5 g/ N5 N5 k' @
    // 定义状态机
    * \( h1 g7 P0 I* j- u
    protected static final* G& e+ j/ _: O& G; c0 V' P
    StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    0 ?9 X+ p# \1 Q' Z" N4 O) cstateMachineFactory/ f- j% }& T4 }
    = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    ( f+ C# Z8 O- X& ~( ?& y3 B! M(JobStateInternal.NEW)5 b6 U2 v4 e) x0 Q
    .addTransition(JobStateInternal.NEW, JobStateInternal.INITED,4 k' f1 p) v5 b% [0 A* ^
    JobEventType.JOB_INIT,
    : m1 J# j3 o( C7 u- k; A0 Dnew InitTransition())
    " i% V( y' R9 p0 {6 @.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,' V1 _7 P1 M5 b9 U" ~9 s+ ^$ Y7 r
    JobEventType.JOB_START,% n4 ]% k' h) ^) T
    new StartTransition())& ~: K, O0 k! y* {: I8 h7 \
    .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
    % L% m  I* `$ _6 U; g4 eJobEventType.JOB_SETUP_COMPLETED,
    . s) _, I; \+ I6 M# J# `new SetupCompletedTransition())7 p. A8 m5 I% y. a$ @, |
    .addTransition
    . }9 f+ Y3 F. P2 l$ ~(JobStateInternal.RUNNING,
    " b: B" N, o  @$ y* gEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),5 r- M+ e" y# s2 X
    JobEventType.JOB_COMPLETED,( H  `, i7 Z$ G' a5 n: m
    new JobTasksCompletedTransition())
    * e8 m; Z* z- [, k; e.installTopology();! t6 ]/ D6 u4 z- H) M% O# y% c
    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;, h+ K2 Q, X3 R4 h3 W
    public JobStateMachine(String jobID, EventHandler eventHandler) {! y8 u% [- {2 n! k6 V& b: J
    this.jobID = jobID;
    $ h: u9 E1 ?+ OReadWriteLock readWriteLock = new ReentrantReadWriteLock();: ^9 x5 M- E6 u- P: O
    this.readLock = readWriteLock.readLock();. F- D4 I' j/ ^
    this.writeLock = readWriteLock.writeLock();
    0 r6 Z3 k) |; ^! o, X- Othis.eventHandler = eventHandler;. C$ s" A! q+ i& E
    stateMachine = stateMachineFactory.make(this);, u1 A6 V9 A4 m( Y: g: @+ f6 e* z
    }p
    6 S; _4 k5 s; b- e, mrotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
    & X- i# p' [  z1 |" Sreturn stateMachine;
    1 v- H# V7 ~5 p2 H0 f} p+ G5 ~! X5 H0 S* Q# p+ @3 |) C( U
    ublic static class InitTransition
      X/ {6 S& b/ C& K+ l3 N* Wimplements SingleArcTransition<JobStateMachine, JobEvent> {
    - M3 L" o: l- R5 k8 J* J9 p@Override0 L) L: n# N+ q+ b7 P$ ?2 [
    public void transition(JobStateMachine job, JobEvent event) {6 b7 s, ~7 z& y# z: s
    System.out.println("Receiving event " + event);5 c9 G, O# Y# P! y. B# j
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));  n8 _' z6 ?! F" Q* t
    }
    9 T; I3 v- G9 I# a} p
    , i$ o" h2 h! {ublic static class StartTransition
    9 T' D8 W( K' U' Y0 ^% himplements SingleArcTransition<JobStateMachine, JobEvent>>{% g. B& f& e; L" H
    @Override& K- Q" B' r' a1 [" V  D8 K
    public void transition(JobStateMachine job, JobEvent event) {
    / I9 Y$ \, Q+ u4 _9 OSystem.out.println("Receiving event " + event);1 g  {+ S- Y7 a
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));6 _; _5 R0 N' _6 A6 v- Q
    }: G$ ~0 k* I5 z
    }…/
    3 J' H$ n# L% ^, C$ [; a/定义类SetupCompletedTransitionJobTasksCompletedTransition
    5 w1 o  m. o% q8 K, d@Override
    : b$ n1 \% ?' o7 Vpublic void handle(JobEvent event) {
    8 a+ p0 ?  l1 b0 c! F: \) O; Ztry {: P3 R: S* p; t9 V' j" f
    writeLock.lock();9 z4 q; n- f4 T- t5 a; ]
    JobStateInternal oldState = getInternalState();
    + w0 A0 Q2 \: V9 a$ }try {+ t2 b: w% y! V7 g- A  s' ]
    getStateMachine().doTransition(event.getType(), event);
      ]* i0 N. e7 m3 Y. b  C; V} catch (InvalidStateTransitonException e) {/ ]" a5 {% u9 E$ ^* N
    System.out.println("Can't handle this event at current state");
    . L+ ~9 f; h& Y; l- _}i
    1 y) l& |1 Q2 h- s9 Q2 Vf (oldState != getInternalState()) {1 j6 ?1 |% Q0 h/ X5 R
    System.out.println("Job Transitioned from " + oldState + " to "; N, U+ H+ `0 F" s) }& |
    + getInternalState());
    . K6 P# Z, o. g3 T' u}
    8 G8 r0 D. d" i+ p7 B) }}f; M& X9 n  l! _, u8 ]
    inally {7 h4 W4 n1 L3 M( u# J! T
    writeLock.unlock();4 @1 P: q2 ~% F0 r+ m- t9 A
    }8 m0 x2 {+ m9 L5 N% C; Q6 ]( E* @1 m
    } p
    # Y% \) t/ |0 v; V0 |0 q8 Hublic JobStateInternal getInternalState() {) a# J/ Z" _' ], r7 e* [# C
    readLock.lock();
    & ~+ Z/ F& p7 O& t* j, dtry {0 g# e+ J+ Z- s
    return getStateMachine().getCurrentState();# w: x3 S. ^4 J5 v' U! g! @
    } finally {
    ! Z1 t" [5 p( `readLock.unlock();& ?0 ~- i, c- V# a! s, K
    }
    " O4 x" p0 w. R+ B' V) }}p
    # @. ?! |' l4 N; Rublic enum JobStateInternal { //作业内部状态7 B4 [. R8 F" M; P5 N: e! x
    NEW,
    * _' m+ u8 r4 Z" S0 `+ xSETUP,
    4 }% a2 b) H+ f7 o4 g9 y7 _+ v1 VINITED,
    ! `9 r9 A& H- b* ]1 pRUNNING,
    ) e: ?- @4 _( a; s+ l8 S2 }SUCCEEDED,
    % P+ f8 Q0 q# z3 ^+ RKILLED,
    ; {! x* k' O1 `3 N}
    ; g0 T  j2 B  T9 S0 b6 [3 w0 \}6 L$ w+ w" e7 O1 _" S( \
    3.5.4 状态机可视化
    6 Z$ v3 x4 u, ?' E# K! I
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl3 Y$ T2 P- ^1 P
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl
    5 {( N$ G! w0 P& L
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
    " ~$ f: ]/ `9 N' {. r8 d- l: s作步骤如下。
    : d) r+ m; f0 S% x( A步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:* m+ u! z6 u4 F9 N  f8 V
    mvn compile -Pvisualize9 S  I8 ?7 `$ c+ j
    经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以' s1 Z9 _& B1 H* i! q+ h& _
    直接打开查看具体内容) 。
    6 J3 R  G8 g: d  X+ W- C5 h$ Q
    步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:5 L9 W8 T) S( Z) Z5 Z. r  S; M# W
    dot -Tpng NodeManager.gv > NodeManager.png
    ; f$ d# A4 i- O  R$ E: I1 U2 k如果尚未安装graphviz包, 操作该步骤之前先要安装该包。: u6 s" [/ A$ V& q' T! g4 o
    注释
    $ {, c% j# c- E5 t, I/ X
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    , s/ Y0 F3 a! Q/ }
    , ~" z( N1 }# V! \* u0 S4 R! {% F, o. i
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-30 06:04 , Processed in 0.121065 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表