java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3204|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.5】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 14:04:29 | 显示全部楼层 |阅读模式
    3.5 状态机库
    $ L, v% K3 _; \% j* Y状态机由一组状态组成, 这些状态分为三类: 初始状态、 中间状态和最终状态。 状态机从初始状态开始运行, 经过一系列中
    : D  j" i% A1 n# I. r8 ~+ T间状态后, 到达最终状态并退出。 在一个状态机中, 每个状态都可以接收一组特定事件, 并根据具体的事件类型转换到另一个状
    % F% |3 _6 l1 O' M3 A) b态。 当状态机转换到最终状态时, 则退出。. B) I- _3 A/ o% }2 p
    3.5.1 YARN状态转换方式0 g; F4 p1 H7 M( _( D! e
    YARN中, 每种状态转换由一个四元组表示, 分别是转换前状态( preState) 、 转换后状态( postState) 、 事件( event) 和' K7 H& x4 d/ t$ j' h5 W
    回调函数(
    hook) 。 YARN定义了三种状态转换方式, 具体如下:
    2 _/ V. r( F  r
    1) 一个初始状态、 一个最终状态、 一种事件( 见图3-18) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    2 B. S4 }& N( s, v函数状态转移函数
    Hook, 并在执行完成后将当前状态转换为postState
    5 q. V/ o% ~" ~6 C/ i8 [
    3-18 初始状态:最终状态:事件=1:1:1
    * F! k7 r! t/ A: ^0 O0 {2
    ) 一个初始状态、 多个最终状态、 一种事件( 见图3-19) 。 该方式表示状态机在preState状态下, 接收到Event事件后, 执行
    2 G; j* o" e& t) A9 {/ H5 [. R9 D函数状态转移函数
    Hook, 并将当前状态转移为函数Hook的返回值所表示的状态。  L9 |. d8 H3 L3 S$ @$ N
    3-19 初始状态:最终状态:事件=1:N :1
    8 v8 J- }6 T# {# a' h# h3
    ) 一个初始状态、 一个最终状态、 多种事件( 见图3-20) 。 该方式表示状态机在preState状态下, 接收到Event1Event25 G, i. P3 `. ^' A8 k
    Event3中的任何一个事件, 将执行函数状态转移函数Hook, 并在执行完成后将当前状态转换为postState" Q" R! m& S, U3 S$ ?
    3-20 初始状态:最终状态:事件=1:1:N- q, z; C7 k" e/ [; d3 Q7 z
    3.5.2 状态机类
    $ c' W' S4 q4 a4 l
    YARN自己实现了一个非常简单的状态机库( 位于包org.apache.hadcop.yarn.state中) , 具体如图3-21所示。 YARN对外提供了. ~4 J; j* y( x2 w1 {; N
    一个状态机工厂
    StatemachineFactory, 它提供多种addTransition方法供用户添加各种状态转移, 一旦状态机添加完毕后, 可通过调
    & D( Q$ [& c! X+ C
    installTopology完成一个状态机的构建。6 X& a5 W9 }! X1 U, D3 l& a+ u
    3-21 状态机类图9 O1 t4 A; p9 ^% V% K) O* O$ `9 R1 D
    3.5.3 状态机的使用方法
    % [, H# z. E! P3 D2 y
    本小节将给出一个状态机应用实例, 在该实例中, 创建一个作业状态机JobStateMachine, 该状态机维护作业内部的各种状态
    0 m4 I" T4 O- k0 q& Z- d变化。 该状态机同时也是一个事件处理器, 当接收到某种事件后, 会触发相应的状态转移。 该实例中没有给出一个中央异步调度! W# T. k+ E9 [' ]) x) n4 V* N
    器, 可以嵌到
    3.4.3节的实例程序中运行。
    ' i2 Z4 o1 i7 Y' S, n- u" [# f
    1) 定义作业类型。! ]& R. Z$ M" N0 T( k: M9 k7 s
    public enum JobEventType {
    * V) M+ }+ M: i* DJOB_KILL,8 h2 u5 E7 s1 G9 R; `
    JOB_INIT,
    5 p6 i5 c* |1 n9 aJOB_START,
    + x! `+ ]  \5 r- s! n, N: M9 D3 }JOB_SETUP_COMPLETED,
    ' d6 M' f/ M. f# G  zJOB_COMPLETED
    4 Y5 ?4 A7 H3 v& z8 d0 ]}
    ; o6 t) t* c. t# B, q! {5 j. n2) 定义作业状态机。* ]  E4 q+ `) b; \- X/ y3 a
    @SuppressWarnings({ "rawtypes", "unchecked" })
    4 K4 X3 ~3 r6 B$ J$ r* h, v- `# Ipublic class JobStateMachine implements EventHandler<JobEvent>{
    - ?( H8 m' R8 \private final String jobID;
    ! W0 r0 U5 h8 }: @private EventHandler eventHandler;. W% H( C; T. w- y3 ]
    private final Lock writeLock;
    ' D& d" @3 F- A* a8 }% t) Tprivate final Lock readLock;0 n$ o5 a! l# b8 F1 G$ h; [
    // 定义状态机
    " `0 d, O- n1 E2 Y
    protected static final
    $ U" g* v9 b* K& b; d3 bStateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>5 r) o/ }+ C' ^- @. z
    stateMachineFactory
    ) M: r* W: n" G& ]/ G4 H= new StateMachineFactory<JobStateMachine, JobStateInternal, JobEventType, JobEvent>
    4 {( q2 r; |; @+ O# x(JobStateInternal.NEW)
    / @5 O) T$ E/ b+ m.addTransition(JobStateInternal.NEW, JobStateInternal.INITED,
    : `  C3 }8 q; g: x/ H1 TJobEventType.JOB_INIT,
    ' K( [' L) B8 V, \  Knew InitTransition())! f8 C" V3 ]- v0 O; b
    .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,8 z* A+ I3 t9 i8 W
    JobEventType.JOB_START,
      z# A9 d4 @9 \1 a& znew StartTransition())6 c5 d' z5 }, y/ F8 c  b; P+ |8 ?
    .addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING," b0 S" k# F6 E) z7 U% U/ a- ]% o# X& \
    JobEventType.JOB_SETUP_COMPLETED,
    8 K$ p4 Z- z8 C2 k# |3 k4 C/ {1 ynew SetupCompletedTransition()); X# p$ U+ E; x8 J5 k
    .addTransition
    0 i" @; H1 H4 D0 w" |(JobStateInternal.RUNNING,
    % K" J( C% P) b2 w0 E* H4 h# EEnumSet.of(JobStateInternal.KILLED, JobStateInternal.SUCCEEDED),- A# {* _1 E- w" G- l' f' @( }
    JobEventType.JOB_COMPLETED,9 {, ]6 j( r* f
    new JobTasksCompletedTransition())
    2 F! [, n1 G( k! D.installTopology();
    " p# e) n. m9 t' yprivate final StateMachine<JobStateInternal, JobEventType, JobEvent> stateMachine;
    5 w% I9 g: p* Hpublic JobStateMachine(String jobID, EventHandler eventHandler) {
    : b7 [3 T5 h! h: A; Sthis.jobID = jobID;! U0 {$ b% j% j$ Z4 G' P
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();% d' ]  U( {' \; C
    this.readLock = readWriteLock.readLock();* g9 K4 E5 @; j) t
    this.writeLock = readWriteLock.writeLock();
    " i4 c* u0 `+ C2 m+ O# I7 ethis.eventHandler = eventHandler;7 a% m# |1 i0 Z* ^9 j. H* h. M
    stateMachine = stateMachineFactory.make(this);/ i! x' a; \/ i* z
    }p
    4 c0 c; x/ e) t7 Frotected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {" @5 `  t, g& i$ G7 F
    return stateMachine;
    , }7 ~; C& W& Z9 E5 K6 U4 D} p
    , V+ l) K  K) D- q" Lublic static class InitTransition
    7 G# H$ ]! u+ A% h8 X# r7 S$ Bimplements SingleArcTransition<JobStateMachine, JobEvent> {; P  X3 h5 U8 z5 `7 q: H+ o: v
    @Override4 U. q) H6 f1 D  p# `
    public void transition(JobStateMachine job, JobEvent event) {/ f8 C! M9 D, Y  g
    System.out.println("Receiving event " + event);4 n1 y8 c( g: p! `; X' L
    job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_START));3 v4 ~3 h" A, Y' P! k
    }
    1 n8 \4 S& x! W} p; \8 y2 F& s$ ~# W) P
    ublic static class StartTransition* e9 J8 ^& D3 O& h2 p6 c  C7 f
    implements SingleArcTransition<JobStateMachine, JobEvent>>{1 o1 {$ {9 w+ E7 F
    @Override" U9 M' f& {' Z% |5 T
    public void transition(JobStateMachine job, JobEvent event) {
    ( l* b% o+ ~8 wSystem.out.println("Receiving event " + event);
    / V  g, V& M3 ~3 U! S8 ^job.eventHandler.handle(new JobEvent(job.getJobId(), JobEventType.JOB_SETUP_COMPLETED));% }3 {. Q! j+ l; N9 A: o
    }7 D7 U' l/ u# i1 z/ _' n/ N& F
    }…/
    . n/ ~% W3 e0 C+ W1 ~/定义类SetupCompletedTransitionJobTasksCompletedTransition
    4 ^9 X; T1 |$ k# |1 f@Override& y0 n) V3 v* S9 I! G# i
    public void handle(JobEvent event) {
    0 E9 Q" W( X& s3 ~0 p8 xtry {; i( ^5 l. @# G5 `
    writeLock.lock();
    4 v* k" h5 n  T# t( zJobStateInternal oldState = getInternalState();
    * O* N/ c* o+ q5 {* ytry {
    % t/ L  @- Z2 T% b, S2 w$ \getStateMachine().doTransition(event.getType(), event);
    9 H& }& b* [" T8 k8 d! c0 c5 \} catch (InvalidStateTransitonException e) {2 w" \1 t6 l' P3 \' Y
    System.out.println("Can't handle this event at current state");5 I; U) G: t! F1 c  X2 K
    }i
      J/ g0 t! g3 M8 E/ of (oldState != getInternalState()) {
    0 b2 J6 p4 d3 J4 u: r* l5 h" fSystem.out.println("Job Transitioned from " + oldState + " to "
    8 d+ E" k8 n9 T! o, C+ getInternalState());
    * s! ]% j, O9 H  a6 P; ?}7 [- Q7 n! c8 P4 Q1 x: `
    }f" }# {  [3 W8 T+ A1 M. e
    inally {
    ( j4 k/ u6 s9 Q9 }' p; ZwriteLock.unlock();0 d% B: G4 E. L" @5 W, `; v+ _
    }
    8 W) r1 {/ _" M9 j& |6 K) b8 f} p
    ; L9 C5 _9 d+ ]7 h$ O' v1 ?ublic JobStateInternal getInternalState() {6 h7 K) B. ^( G- W
    readLock.lock();
    + ?" K8 Q0 c! G8 q$ Etry {
    1 M7 \8 K( W; K0 |4 E8 xreturn getStateMachine().getCurrentState();+ K$ |" m7 c& T
    } finally {
    ; Y7 U: a: ]" |) [4 A- NreadLock.unlock();
    7 E4 I/ G$ z6 J}
    ! T, ~# M8 C/ v* E}p+ _5 g  |" c( F9 y. s* a; a
    ublic enum JobStateInternal { //作业内部状态$ v* x! w% L4 W2 O+ ?# P
    NEW,8 o" d, L! q* M
    SETUP,( E1 h8 q. C$ L8 |# ?6 o# L
    INITED,
    / Y5 V4 l  H% K% X4 v2 ERUNNING,
    * L" x) s5 A2 PSUCCEEDED,; A5 @1 G- h+ ~, F' O
    KILLED,* K- |" A  z! G8 u6 C. M3 x
    }
      h$ [0 U6 u2 A+ l}
    " X$ ?9 v4 d; l, }( F/ l) G) F! w3.5.4 状态机可视化* `2 ~" ]: v% K% |4 D; v& T4 ~4 B5 s
    YARN中实现了多个状态机对象, 包括ResourceManager中的RMAppImplRMApp-AttemptImplRMContainerImpl
    " }2 [; Y  ~8 b9 G9 B
    RMNodeImplNodeManager中的ApplicationImplContainerImplLocalizedResourceMRAppMaster中的JobImplTaskImpl0 o5 g  O% q3 V% E# [
    TaskAttemptImpl等。 为了便于用户查看这些状态机的状态变化以及相关事件, YARN提供了一个状态机 可视化工具 [16] , 具体操! A" k4 C; Y& H5 C- M+ M0 c4 \
    作步骤如下。
    4 L- P. A. \1 S* a1 Z9 p! g步骤
    1 将状态机转化为graphviz(.gv)格式的文件, 编译命令如下:
    2 C. }) |( l# a: o3 X
    mvn compile -Pvisualize& l- N* B/ H9 z0 I0 O% V
    经过该步骤后, 本地目录中生成了ResourceManager.gvNodeManager.gvMapReduce.gv三个graphviz格式的文件( 有兴趣可以& e, b) p% U. Q5 E# `
    直接打开查看具体内容) 。

    ) L+ W% A8 W7 U2 k' @步骤2 使用可视化包graphviz中的相关命令生成状态机图, Shell命令具体如下:: h; S* w* l1 W: }6 o7 U1 I& a
    dot -Tpng NodeManager.gv > NodeManager.png! I* s4 F/ Q: K8 k: z+ l0 z
    如果尚未安装graphviz包, 操作该步骤之前先要安装该包。
    * u# p, E' d" D6 x" \: W$ o% Z注释- Y8 R( H& X9 l8 k3 k( `6 \4 ?
    [16] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2930  
    # Z% @! `/ g) H- H& P
    - {0 d3 _  ]8 q6 P: ?2 D9 B" u' D: E# q1 |
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:42 , Processed in 0.244001 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表