java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3334|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2062

    主题

    3720

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66592

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库. a9 l; }" b' Y3 f
    状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中4 S" ~! F7 \$ B/ ?1 J, C1 V
    间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    * ]1 I+ R) c/ F) I2 f+ K态。 当状态机转换到最终状态时, 则退出。
    . B) P2 Y2 a. f) G2 T3 V' b
    3.5.1 YARN状态转换方式" u( S9 B- D. ]2 s6 s( e; ?
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
    * _$ R1 U0 c- ~" _, w回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:) x1 o& Q' F( D! c: v! M
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    / S7 s% M  c8 M$ f1 j5 ]2 c函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState  Y, K4 `3 Z. l2 |3 N. w6 d6 W' h
    3-18 初始状态:最终状态:事件=1:1:1
    6 r9 o8 c( o" V  {9 K& O1 T& [2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    % L4 r$ }4 L9 [( M6 I9 K函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。9 |0 \. m/ h7 I  L2 V
    3-19 初始状态:最终状态:事件=1:N :1
    # ?( L" s8 c) E. R( P- J! W8 F3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event25 H) n# {3 _( e$ r% O- \. F5 b" s
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState/ R. Z1 H. X5 G: D
    3-20 初始状态:最终状态:事件=1:1:N
    0 _. F6 q5 I5 p0 e8 k6 D! W5 a
    3.5.2 状态机类$ x* Q- N, o; S( o5 _
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了; z9 [5 N; }1 \* b0 U) z
    一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调
    4 Y& I( Z! N; Q2 n# e
    installTopology完成一个状态机的构建。, z- |, k; h- ?
    3-21 状态机类图) Y3 c/ ]  V2 N- Q- E( G: C% }
    3.5.3 状态机的使用方法2 l0 h6 F: t- Q& q) g
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
    5 \7 I, b! R( k- l0 {变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度/ e$ X$ d7 K% K/ k
    器, 可以嵌到
    3.4.3节的实例程序中运行。
    + k( W! p0 m$ a
    1) 定义作业类型。7 \, r! @# N. N; o
    public enum JobEventType {6 I' M/ Z4 h/ H3 _$ v" r
    JOB_KILL,
    - v' m7 }1 k1 b+ |) w. {+ ~JOB_INIT,
    " u" X' B1 d& Y! iJOB_START,! s7 U( @# V% O0 k) |2 v2 m
    JOB_SETUP_COMPLETED,
    " `% Y, \6 I7 j' m* R8 wJOB_COMPLETED' a7 J& M8 S- G9 J
    }$ G8 F" }  `! q$ Z& A4 E2 C7 \  Y$ x3 i
    2) 定义作业状态机。  z3 A& c: A" l4 l+ s3 X
    @SuppressWarnings({ "rawtypes", "unchecked" })& [; Q0 T6 _2 @3 w0 X- {
    public class JobStateMachine implements EventHandler<JobEvent>{
    ' j+ _. C5 r) d6 J; }3 \1 x- Dprivate final String jobID;
    6 V7 R7 s5 p7 Y' oprivate EventHandler eventHandler;" o* X! d4 |8 }
    private final Lock writeLock;/ h" l6 L) ]: Z" T
    private final Lock readLock;) z1 I* K* ^+ }: |6 I9 Y
    // 定义状态机+ S+ Y; E. a* j, F; s- z
    protected static final+ ]0 T. Y/ D' v) ~; V  l) z* _
    StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>, _- O( C- D; a9 R! O
    stateMachineFactory! i1 I8 O( t8 O7 _
    = new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>  `' n; x  H3 A9 i6 F4 F6 B
    (JobStateInternal.NEW); D9 |: F6 c2 m8 ^
    .addTransition(JobStateInternal.NEW, JobStateInternal.INITED,; S! u7 ~% N3 _# q! Z+ }
    JobEventType.JOB_INIT," x' n2 E3 h8 S5 b
    new InitTransition()), G9 J5 Q0 P9 f9 [1 B/ W
    .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    $ U& q. l  i+ D2 u; {JobEventType.JOB_START,
    $ s: y6 m4 B) gnew StartTransition())
    9 r( L( `/ [; F.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,6 ], m2 N6 ~9 J! p. k! X
    JobEventType.JOB_SETUP_COMPLETED,
    " O- P$ h- s% k- G. I* Tnew SetupCompletedTransition())
    $ P$ _4 h2 [# d3 Z6 Z: @.addTransition
    5 O6 v) T+ x& o  u(JobStateInternal.RUNNING,
    ! I( j" L) v3 p  s9 ]! o. T# PEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),
      R$ _* S# [# D- T0 NJobEventType.JOB_COMPLETED,
    6 E  m& J4 g9 N/ `! [' x+ j  P, ?9 l* t3 pnew JobTasksCompletedTransition())
      u& a' L4 N+ z2 H) A.installTopology();. L! D# A9 G: a9 v# Y  y' j
    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;6 |# D* U1 V8 m1 D8 V
    public JobStateMachine(String jobID, EventHandler eventHandler) {
    5 h' G) B+ Y# M" Q+ t/ w: othis.jobID = jobID;
    2 f- [- m; Y5 I5 w1 ]ReadWriteLock readWriteLock = new ReentrantReadWriteLock();0 j9 j# n! p1 K2 ?% t9 y. z
    this.readLock = readWriteLock.readLock();, E/ Y6 [  D: N
    this.writeLock = readWriteLock.writeLock();
    2 H0 j% d* D* s4 z, i' ythis.eventHandler = eventHandler;* S; U( L( L! S9 U' j& f# U
    stateMachine = stateMachineFactory.make(this);
    - o: u( n) C, P( A) t}p
    1 x4 y* x. e" [  _rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
    # u: B; a# P1 S0 b. Xreturn stateMachine;
    ) {0 F; L( _, O  ?1 [+ K) h  m" h2 b} p
    * J8 _" q. S$ O  F3 H0 x. Q+ Qublic static class InitTransition$ l6 }, x& q1 m/ `
    implements SingleArcTransition<JobStateMachine, JobEvent> {
    5 @" g) J% v/ l9 T  \5 O@Override
    7 d7 @- @1 s; E* q( A9 rpublic void transition(JobStateMachine job, JobEvent event) {
    ' U) _. C+ @1 S: tSystem.out.println("Receiving event " + event);$ p/ d0 v- B7 g, s7 o: o
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));: z( K5 C" r& v
    }
    9 W1 I. Y3 \5 w) ?9 ^) M} p
    ' g% x* L9 @( ]3 @  o% X5 b+ j1 fublic static class StartTransition
    5 k+ R- i* h/ S: R9 a' wimplements SingleArcTransition<JobStateMachine, JobEvent>>{! c# _9 A; t+ k' c$ f  U# x& [7 p
    @Override
    6 d, W' r1 `/ ]  T+ K. V) Ypublic void transition(JobStateMachine job, JobEvent event) {* h" w  C2 d9 D" D
    System.out.println("Receiving event " + event);
    / ]; ]4 u. t+ ]/ q) R9 s  bjob.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));
    # c3 {' A+ g% a  }! c}  J5 |* l9 M1 v" N/ V5 Z
    }…/: \& h, Q2 l. i  W9 p0 A- Q" V8 g
    /定义类SetupCompletedTransitionJobTasksCompletedTransition+ ?& @7 Q$ L# g/ |
    @Override
    " p& `4 S, b1 Q0 e# S/ E5 npublic void handle(JobEvent event) {6 {- ~0 W' \  A: P) P8 P
    try {
    " H3 V* n/ t) DwriteLock.lock();
    5 M, o4 ]6 Y0 o' V, GJobStateInternal oldState = getInternalState();
    * U. ]8 C, J: ^: O5 Ltry {7 l) U& Z: ^2 f* ^# c8 s
    getStateMachine().doTransition(event.getType(), event);. Q% K- J0 l7 Y' L
    } catch (InvalidStateTransitonException e) {6 b& \& s, X- O
    System.out.println("Can't handle this event at current state");
    5 H' K9 T) y# {}i
    ) m/ a$ K6 U5 }f (oldState != getInternalState()) {9 c. l$ Q+ P- W0 \  E
    System.out.println("Job Transitioned from " + oldState + " to "
    ; e9 ~. u# L2 n: G9 a& t+ getInternalState());; W. [+ A0 A* o
    }
    : L0 w" y$ Z! y4 R5 m}f0 H$ T: P: i  b
    inally {# ]4 y6 R$ F! N
    writeLock.unlock();
    ' u8 [, Y7 e3 j0 S* ]}7 F, j- H" e9 i
    } p
    * o1 s- U/ t3 R2 J, [ublic JobStateInternal getInternalState() {4 C, [+ a+ q* b
    readLock.lock();
    ! N- B) f8 ~% E. ^% e- g" Stry {
    & R: ]4 z% u* y  freturn getStateMachine().getCurrentState();
    ' F3 l7 \, N8 K5 R} finally {' T+ ]# |/ k0 ^
    readLock.unlock();; F) I/ Z& p$ R9 ^
    }
    , `( D/ o! f/ j}p
      o: }& E8 S: T# Lublic enum JobStateInternal { //作业内部状态
    6 O  l: E" g: y0 r# ]
    NEW,
    0 j* V6 Q: g7 j9 y$ z# k' jSETUP,( U& R. w0 @/ J3 _3 E) y
    INITED,3 X# [/ V; [" Q; n) P" C+ X5 |: B
    RUNNING,/ X8 a' U+ X  M  S+ b
    SUCCEEDED,% T' }& @1 v) n. x9 z5 g
    KILLED,5 @; H4 F6 @" F* F/ o) v' a  }
    }
    ! W$ X1 T& i! y1 F) z5 k) n. _}3 C3 A* M8 z: O9 C/ z! l: N
    3.5.4 状态机可视化
    1 e% J% [& q. P7 L+ ^
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl/ f5 l( ?1 f- y: V* _8 T
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl5 b* c# ]/ i& `
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操
    , |1 [1 N  j5 A作步骤如下。6 g0 h5 H/ |& J& F
    步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:7 c0 B+ x; o' I; Z" ]3 T4 _, @
    mvn compile -Pvisualize
    ' c; t7 }& S1 q0 R- R经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以" h) ?' e/ j, z7 L
    直接打开查看具体内容) 。
    ; U; k" E+ g! ?* r% I
    步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
    : X5 Z/ o8 T! l2 N# x
    dot -Tpng NodeManager.gv > NodeManager.png
    : f8 e$ i0 i1 v. R, f如果尚未安装graphviz包, 操作该步骤之前先要安装该包。
    2 V% [9 ~5 p2 _& w1 v注释
    ' T" x$ \' C: t5 x1 e
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    ! e. v, P5 ?1 h2 ~4 f8 n, S4 `3 @7 D( v1 `, M1 m% G# t  v

    ' l0 ]/ T/ V' c) e( J4 Z0 j% c
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-2-23 04:39 , Processed in 0.175558 second(s), 26 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表