java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3152|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66265

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库
    ! a6 `8 ?) r( q/ i( `0 I状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中6 A9 Z2 u3 N6 x+ Q6 @
    间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状! q- G; k3 G, {
    态。 当状态机转换到最终状态时, 则退出。
    ! Q, q0 t( A8 Z0 r4 ]( C
    3.5.1 YARN状态转换方式
    $ x7 C3 @% b) a, p
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和
    : E, f$ K( J6 H  A回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:
    % B4 n1 k; b5 H  q% Z9 d
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行4 l) j% z; n  F8 p7 [( ~
    函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState' D- O# i: \0 k  `' P
    3-18 初始状态:最终状态:事件=1:1:16 N2 j' U- v4 e3 o4 b5 a
    2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行$ y$ B) ?8 H$ O! e
    函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。
    ; y5 R4 x( \# j0 N% r$ e
    3-19 初始状态:最终状态:事件=1:N :1/ _2 ~4 `, g, U# f/ T2 G, ^7 Z
    3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event2
    3 Y7 o/ H* D% A; k) B
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState
    8 R3 O/ `9 X9 L9 U% A3-20 初始状态:最终状态:事件=1:1:N. g8 D, K& N/ u' v8 i
    3.5.2 状态机类) @4 \& E9 V; [" G8 m( {
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了
    7 G) A7 U$ G% F) O; z8 N一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调; ^* H: d. i* G* a; P
    installTopology完成一个状态机的构建。
    3 B4 K3 ], E/ W/ C( k/ z
    3-21 状态机类图
    . O/ Q6 ?- A- ~6 E- }  W2 h
    3.5.3 状态机的使用方法* t8 |2 [+ N$ i! K
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
    2 N4 f& Q' v  H; S7 a* }变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度, `( R# [! j6 Q
    器, 可以嵌到
    3.4.3节的实例程序中运行。) E  J# x, Q, ~
    1) 定义作业类型。
    $ g) ~* g- I4 u* N6 E+ k
    public enum JobEventType {
    : F* a8 ?, ~6 E  ]: TJOB_KILL,
    1 c4 f8 P( d& ^- S$ HJOB_INIT,
    0 E( ]% O  {1 mJOB_START,( C3 n. a! q* v) Q# h
    JOB_SETUP_COMPLETED,/ u; m( H$ `! v7 m; N, e
    JOB_COMPLETED
    9 ~" Q- [( c  S4 n, H( @0 h}
    1 B1 u! c# \2 y, ^8 {2) 定义作业状态机。8 x9 k3 p8 f7 J0 H9 K: w
    @SuppressWarnings({ "rawtypes", "unchecked" })8 T+ o4 G; M' ~. S
    public class JobStateMachine implements EventHandler<JobEvent>{- n9 j  e/ _% p% {9 p. G
    private final String jobID;7 V3 W/ a5 C# q: p! o; `8 G; h
    private EventHandler eventHandler;
    , \4 Y6 _7 O; m+ f- fprivate final Lock writeLock;. Q$ V) e# `# K0 _7 c
    private final Lock readLock;
    # O9 ]* H+ R2 b  |" Q2 E0 D// 定义状态机
    1 [" M0 y- U4 m$ A6 M
    protected static final
    ; N. S0 q5 v% c% z. C+ D) w8 g! QStateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    . b8 X  p5 J2 l. g1 S& LstateMachineFactory
    2 k: S+ R! ^- o& ?0 U* B  j= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>) o( i$ ^8 K: W: k% Q
    (JobStateInternal.NEW)- c; G/ q4 K( {
    .addTransition(JobStateInternal.NEW, JobStateInternal.INITED,3 P( k; i* c0 j( k3 K) Y% S1 X
    JobEventType.JOB_INIT,
    6 `# f) B0 b  l, l5 M. Z+ mnew InitTransition())
    ' ?) e1 v$ x6 h9 v- k7 X. S# `.addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
    5 K$ J' L# @1 ~" x4 JJobEventType.JOB_START,& [* D: G' v% Y5 R1 I* R2 r( N
    new StartTransition())
    ; E0 K, b1 M$ J, y, `.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
    % Z2 b% B9 h" v. W5 U, I, |JobEventType.JOB_SETUP_COMPLETED,; S8 L7 u' L7 h* Z4 b
    new SetupCompletedTransition())
    # j8 ?- u: v4 K! x- z.addTransition
    5 R! k3 C+ O* ?, [4 N  A, ?(JobStateInternal.RUNNING,# ]+ r: F4 ?/ |8 u
    EnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),; ?! X. y" m) L# I/ [1 |
    JobEventType.JOB_COMPLETED,
    % h$ @0 D$ l: M# g9 Qnew JobTasksCompletedTransition())) U3 c6 ?7 S9 o. d* B$ ?! f
    .installTopology();; {+ Z; K( a5 @' g4 f
    private final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;" s6 B: h5 L- _' \+ y
    public JobStateMachine(String jobID, EventHandler eventHandler) {
    4 I! B0 l5 l& z3 a" ?9 B- `& sthis.jobID = jobID;5 W* ]/ e' q; K7 `. K) g1 a7 I4 F
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    7 z; g; O7 A1 {this.readLock = readWriteLock.readLock();
    / P( @  @4 x. i  v" o+ v( hthis.writeLock = readWriteLock.writeLock();( l; h+ F, O0 d% _. U
    this.eventHandler = eventHandler;
    * M! E4 _% H. r! M3 o* nstateMachine = stateMachineFactory.make(this);* n- {" C/ ~4 r# |9 ]
    }p
    0 d+ U5 B7 T6 u/ s* k0 ]rotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
    % F7 \0 T7 p! F& R' z, M/ }return stateMachine;
    * n: Q! o% }( K" V7 ^3 P} p
    $ m0 @1 P4 c3 s6 w5 ?/ R6 f# i9 |ublic static class InitTransition, A" Z/ j7 h$ i8 {3 Y, I; z& h
    implements SingleArcTransition<JobStateMachine, JobEvent> {
    7 b, y$ j' I/ }# b4 O. g2 h@Override* n' e# ~% {0 r7 }- e
    public void transition(JobStateMachine job, JobEvent event) {) K& y5 t* J# R
    System.out.println("Receiving event " + event);
    $ o7 `9 c# B1 J+ a  M2 }job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));
    ; T# \! {6 T. f* I}/ ~. k# X4 u7 A8 ^
    } p
    9 @) X; l9 R9 b6 K4 y/ v5 N( Lublic static class StartTransition( N0 z' s8 s0 z7 N' a1 y5 [/ }
    implements SingleArcTransition<JobStateMachine, JobEvent>>{
    ! j% `3 a( _( s3 n9 d. ^@Override+ T  S6 t5 E& ^3 A
    public void transition(JobStateMachine job, JobEvent event) {
    ) _) d) ?& v/ J6 lSystem.out.println("Receiving event " + event);& g" G; t' k' u& P/ ?+ Q( R' a
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));* f; Q) Y' s& ]
    }
    % Q1 }9 y- g0 B: m* V6 V& F}…/% w2 l: E( ]& M7 u
    /定义类SetupCompletedTransitionJobTasksCompletedTransition
    5 B0 ^# X, A3 L1 a4 x* H@Override
    , z. n( T1 h/ z$ L5 S* ^/ Rpublic void handle(JobEvent event) {) j  ~; x' l: u' s# h( y4 I
    try {2 @8 h+ L' L8 u' Q$ S, j0 A
    writeLock.lock();( b" N) d* \8 h' z  A' Y2 R* \7 y
    JobStateInternal oldState = getInternalState();
    0 z/ k1 R! A8 P9 r" p6 ntry {
    8 h/ k+ `3 n% s7 `& r2 ugetStateMachine().doTransition(event.getType(), event);" Z( D$ i! U7 g2 S# K
    } catch (InvalidStateTransitonException e) {
    & q: `. r+ z6 f8 Q* xSystem.out.println("Can't handle this event at current state");+ |! c2 R4 m) K, S- Y# m
    }i2 m) r5 s9 W/ M4 `5 P. h
    f (oldState != getInternalState()) {
    & A( G0 C9 l+ oSystem.out.println("Job Transitioned from " + oldState + " to "
    4 j8 V# s, t4 Q1 _; Z. V( q+ getInternalState());! t( u) c0 ^3 _
    }& k  e+ P& j( i! X% r. [! n
    }f
    8 D. s1 c$ W2 Q' rinally {
    * t9 g% Z( O  @) f2 ]3 G; s! z" }writeLock.unlock();6 K" i7 Q; P, s# W$ @: ~
    }* U0 Z" y5 A# M/ x2 W
    } p0 v# ^- `- c( b
    ublic JobStateInternal getInternalState() {8 ^0 R" i) G  w2 |4 g/ g
    readLock.lock();) U) z2 H' h  ]
    try {
    3 Y# L* I( _8 s, x0 \return getStateMachine().getCurrentState();
    ' ~( ]) F! U& p0 _} finally {# W" B% Y: V' s2 k0 Z' h
    readLock.unlock();
    , O' W2 Z# X  K" H+ |# G# z% T}
    % [/ f: r* v7 U7 ]5 {9 K}p1 k: s6 L) d4 o6 k6 ?
    ublic enum JobStateInternal { //作业内部状态) g! [6 R8 }% U; `9 s
    NEW,
    2 l. {; X8 t1 K: `" U7 e: q& DSETUP,
    3 S0 k+ B* C2 iINITED,! F3 U$ d9 Y1 p9 j" a  \
    RUNNING,
    " l1 }* \8 {; I- T; rSUCCEEDED,/ i2 t8 |( W6 m" q0 x( F9 T+ Y" R
    KILLED,
    3 W7 j. B- w8 g}. x8 |3 \( A1 L9 g8 |4 W# \
    }1 T! o2 v5 ?4 b
    3.5.4 状态机可视化3 v# T$ U( ^! U% u, M  H* \
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl
    & W' Q$ M7 A0 S
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl2 k, o- ~( r3 m4 K: Q
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操! d* j( m1 y$ Z: b
    作步骤如下。
    ) R- e3 k: Y3 `2 m: M7 o  a步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:; b! Y- z6 f& |" V" K$ ?: u
    mvn compile -Pvisualize3 x1 T) ~4 _1 D0 Y4 d. B3 b
    经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以3 H: F2 o+ w0 {$ E! x
    直接打开查看具体内容) 。
    6 l  L2 O+ G7 E% L
    步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:
    8 o2 C5 Q( P: l  Q- ]' g
    dot -Tpng NodeManager.gv > NodeManager.png' d3 r: l! c9 V4 y5 H. a% s
    如果尚未安装graphviz包, 操作该步骤之前先要安装该包。
    4 s0 `) j7 s# ^) H) y注释7 k9 H, _1 S) t
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    . \* p" ]) h( C% x! `3 N% c" Y4 i
    + v, I# ~: i  W, V% g! A" d0 C2 @
    8 z" m1 i0 x  s- k
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-9-8 09:49 , Processed in 0.113377 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表