java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4093|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库
    : F9 U) M5 Z1 }+ i8 }9 E0 g本节介绍服务库和事件库。
    3 N) Y) O( Y& A9 {  V+ f0 [
    3.4.1 服务库
    2 J1 n" L; b( G: C# b7 ?( e$ v9 ~
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。% T8 l2 {3 _& d( r; }% c
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
    3 }) i0 V  R4 P7 X: n  |
    STOPPED( 已停止) 。+ B. h! c, Q6 i. B5 \
    ❑任何服务状态变化都可以触发另外一些动作。7 c- b' r5 A2 V# X: b& ^
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。! i* C1 _% q$ o8 H6 w# |
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务- t4 M1 ]. C/ j' W+ o7 j+ j
    对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的" H" R0 k, y; i6 u
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于
    # ~! H0 t- a2 N  m$ m7 a
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher
    $ j* Z* S; l5 E9 A
    ApplicationMasterService等。% u& b3 c* e0 l2 l5 Y) y1 \
    3-13 YARN中服务模型的类图
    * d2 r2 `# O  }+ Q" C. M
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
    # M. Z; C$ N2 p5 F5 J7 ~) I务的统一管理。, {! `- K* o& ^4 K5 }
    3.4.2 事件库  Z' [& O: ]  c/ U3 z
    YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN
    2 y' f1 R) _3 v1 n各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处
    . a) D& W- Y2 l3 q3 d理模型可概括为图
    3-14所示。
    ' r3 G  D$ A: s7 F! W8 X9 e4 z
    3-14 YARN的事件处理模型! D4 P* Z0 w) Q, T% R* N( c
    整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器
    : z9 s) g2 ?+ }0 n
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其  E$ D9 O8 s# w5 v, ]
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成
      [: [3 Q0 m5 G- P4 y( 达到终止条件) 。; A$ n, ^* g! z6 C
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager0 u! I% D7 o8 {
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
    % n  O7 q& s% z3 p9 A& k! [( w  n  n驱动服务的运行。9 z0 F& Z7 w7 c2 p( b
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先! d) u9 D4 e# Z( L
    要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
      K+ k7 I( ?  @+ l0 Y
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
    4 v, j( o1 c& x
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由/ F& P# }. n9 S- C0 D3 w9 J
    中央异步调度器统一管理和调度。% Q5 A* L$ V  K, v
    服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间! e1 E: p# g9 D" R( G- s
    则采用事件联系起来, 系统设计简单且维护方便。

    ( j% r0 l; f: w5 u5 V( }; i3-15 事件与事件处理器. B. q& i& K0 L7 v
    3.4.3 YARN服务库和事件库的使用方法
    # Z( D( s0 x& I1 T' u& r' B7 K) U
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce4 F  Y) x- r! A: p
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    + l$ f+ e+ j  D5 J. l$ ~. H
    1) 定义Task事件。0 J3 o5 ?; l3 G8 d5 c* Q) G+ e
    public class TaskEvent extends AbstractEvent<TaskEventType> {% E( Y( K# f7 f) N% e
    private String taskID; //Task ID: n' N* T. p% T/ l# ]
    public TaskEvent(String taskID, TaskEventType type) {
    $ ^3 [- @1 ~/ {/ w+ ]! u* q; msuper(type);
    7 [9 r" f2 X# ~7 s: j2 ~/ Hthis.taskID = taskID;7 V0 \# `3 m9 v7 Z9 b2 n9 `9 j5 n8 y4 Z
    } p
    8 e0 t# ?3 z9 `  Uublic String getTaskID() {
    2 w3 V: F# F4 ^2 c6 Ereturn taskID;
    " ?( h& E1 q1 ^3 B) B2 x$ ~  k}
    5 [' c. z  Q4 s. ^2 V中,
    Task事件类型定义如下:
    - m7 l! V/ n8 |" i3 g* H3 y( {3 g+ ~
    public enum TaskEventType {' q! L% l/ f6 |
    T_KILL,
    / L/ C+ P, o5 Y, {0 ?, MT_SCHEDULE
    % z8 O0 L) m$ ^( [8 ?1 G( ~}( ~) B- V/ `/ C" ^* |; |- q
    2) 定义Job事件。- q7 z" J! n, P1 Q" H
    public class JobEvent extends AbstractEvent<JobEventType> {7 T8 z' }' @, W5 Q
    private String jobID;
    ! {" V# }& n0 D/ j! t' p! Npublic JobEvent(String jobID, JobEventType type) {
    * D& ~! t1 j( X* Osuper(type);9 @- e/ [- P* ]2 ~: A, K; o
    this.jobID = jobID;
    # v6 ^! C9 E  E: v  s' B) G. V} p
    5 s/ `3 S$ H( q: K; I. C* L" wublic String getJobId() {$ J( g3 d) O" F# z, |* J
    return jobID;
    3 j/ [  t% c6 T: p3 M}
    1 v/ b) N  a: @( A. [$ m}2 r+ u# ^0 A0 J2 b
    其中, Job事件类型定义如下:
    + R8 y2 c, ^3 W, O- I
    public enum JobEventType {& f( @# N( M: U. G! V0 n
    JOB_KILL,, Z, W, L" ~8 x) z5 \$ P2 b
    JOB_INIT,
    6 b3 p8 r: q7 V& }JOB_START
    - P! K0 g! ^" N8 Z}
    $ {; M+ m8 G& l3) 事件调度器。
    . I2 @( }$ Y" P  l, R& r) C* G$ M6 u接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
    ; q6 s2 E. |# c6 q@SuppressWarnings("unchecked")$ M3 B3 L; R$ T9 j5 _+ R# S
    public class SimpleMRAppMaster extends CompositeService {% }; K! X8 {  `# Z, v
    private Dispatcher dispatcher; //中央异步调度器6 |  V/ W+ Y  t& r0 g5 e8 f
    private String jobID;
    * Z) W& Z3 q4 f& Dprivate int taskNumber; //该作业包含的任务数目" {% S) l+ n4 g9 w
    private String[] taskIDs; //该作业内部包含的所有任务8 ^' K- R! |- h  j0 _* f
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {) j+ x, d1 [, @  v1 q$ b3 a" f
    super(name);
    - d% h* T$ h8 L! e- I; z. H8 w& g0 \$ {this.jobID = jobID;
    * W' O; U& l% h* S0 q3 w& Wthis.taskNumber = taskNumber;
    ; T3 E* u: j1 Y# UtaskIDs = new String[taskNumber];7 Q5 Y  x; ^( a+ t6 f. W
    for(int i = 0; i < taskNumber; i++) {2 U( N+ I# f+ r  g. I' e
    taskIDs = new String(jobID + "_task_" + i);5 y! q! M9 Y1 M& N+ ^
    }
    & w$ o7 k9 h6 k0 b: e} p0 k5 H9 `. C7 }3 K$ |5 i/ ^' ~
    ublic void serviceInit(final Configuration conf) throws Exception {0 x& }) {" x2 A- S+ e! `. g
    dispatcher = new AsyncDispatcher();//定义一个中央异步调度器- Z6 t- h( z9 h% @
    //分别注册JobTask事件调度器
    . {0 }+ W7 p7 R
    dispatcher.register(JobEventType.class, new JobEventDispatcher());
    . g8 |+ J- |% X0 e! hdispatcher.register(TaskEventType.class, new TaskEventDispatcher());6 Y( {& O$ y  |3 v
    addService((Service) dispatcher);
    ) j. ~8 [) C$ Q( K2 Zsuper.serviceInit(conf);
    " D+ g- F4 M9 K6 Y$ ?% ~& F} p; x- \7 W# z: e6 e2 `% {3 l& x* N
    ublic Dispatcher getDispatcher() {
    & v/ [2 x: o% L( yreturn dispatcher;' E% i$ f8 |+ n9 ~5 T
    } p
    * N0 B" {: T4 g) ~0 ^rivate class JobEventDispatcher implements EventHandler<JobEvent> {, L1 f7 c/ M3 m7 M+ l9 o2 Z
    @Override
    ' Y& s, {4 Z' r2 d5 Epublic void handle(JobEvent event) {0 J' g. N# P( h! o
    if(event.getType() == JobEventType.JOB_KILL) {+ t* N$ M# z  Y8 X# w: d
    System.out.println("Receive JOB_KILL event, killing all the tasks");  F. G9 f6 \6 t" n' S
    for(int i = 0; i < taskNumber; i++) {2 V& Y: ]' h& U8 d* B
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    + i* o# \4 t' p- V- }$ r8 kTaskEventType.T_KILL));
    1 z: C; q* Z' O- x}& u% j6 A. G5 n) T7 d: A
    } else if(event.getType() == JobEventType.JOB_INIT) {6 N* Q/ S6 Q( R: J/ {5 I
    System.out.println("Receive JOB_INIT event, scheduling tasks");9 W% g1 }' m9 U3 y" |( P
    for(int i = 0; i < taskNumber; i++) {, F5 x* [5 I: |- i3 _2 k5 C+ Z
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,' M. n! U( K- v  D+ f9 J: }
    TaskEventType.T_SCHEDULE));
    0 B% r: Y" Y( {" m: R}
    ( p* g- p, B) }}
    5 \. |! ]1 A' T2 l- q* `% B9 L}% X% b  g- l0 x6 e' O0 c
    }p/ z( ~8 B, V( ]/ p
    rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {0 m0 p" Z7 ?. j; W4 j
    @Override
    ( o2 \. s! Q8 d/ a6 u* B0 Ypublic void handle(TaskEvent event) {6 @9 p- Q8 _1 X
    if(event.getType() == TaskEventType.T_KILL) {5 {2 n: }* F+ C
    System.out.println("Receive T_KILL event of task " + event.getTaskID());% ?2 ?$ \9 m5 j6 a) a, l; c
    } else if(event.getType() == TaskEventType.T_SCHEDULE) {
    ; P( f0 B6 W# E4 ISystem.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());! c/ w9 s0 a1 J. G2 b6 [* z
    }4 G& v0 E" T! h$ Q! I# h* t
    }! }, |6 ?8 T; c; @( }( r2 k4 f- B
    }% `( C* p$ g* J  s; y! I
    }" C: `# b: I+ L, u( e, l2 _0 k
    4) 测试程序。
    4 V( a: }# g* ^2 M# }; h* s7 C, ]
    @SuppressWarnings("unchecked")5 ]/ y$ O6 G, l8 e( b  D3 j
    public class SimpleMRAppMasterTest {
    0 a9 A: }. ^- S% P/ G' Y1 Vpublic static void main(String[] args) throws Exception {
    " x$ A6 |, \6 O# T, _String jobID = "job_20131215_12";
    % G0 p' q% }* HSimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);) U% D' I4 |, e) D* E7 u
    YarnConfiguration conf = new YarnConfiguration(new Configuration());
    , N' Q; U% z  @8 VappMaster.serviceInit(conf);
    7 ?& O2 r) g. Z% v2 {4 {appMaster.serviceStart();
    ( n% c; h$ @0 {" happMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,2 _1 e/ |( E( t2 ]7 u4 g9 t
    JobEventType.JOB_KILL));9 r* _  E7 ]6 J8 a
    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    & i. J0 J- j8 {# b9 u1 K8 V3 vJobEventType.JOB_INIT));
    6 T4 ~( ~9 P/ B- s/ H, G! m/ P$ y% [}* u+ x2 m1 q$ @( n
    3.4.4 事件驱动带来的变化
    8 F! }6 `( S, O
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的' p6 n5 b  \$ c! i: F
    方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    " S2 e9 z7 \) E6 j9 Z' Y字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件& w: L5 u$ x, I$ E
    是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一; c" [# [" j* i8 H, l9 x
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决4 T3 Z5 G3 Q& P+ P3 x9 r
    问题之道, 必须引入新的编程模型。

    # A; Y  H7 f1 X6 V/ W3-16 基于函数调用的工作流程' Z3 m3 ^- b: n3 m
    基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则  U' Q; V  K! v' g& k4 s% e4 g, y5 ]
    是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
    - A$ Q" a' i5 k; k& j联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需
    * s  V1 |7 F/ o向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
    1 M7 \/ X0 y& v- ?* G& a& G
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A7 K# s" M; {; P9 E9 ~
    3-17 基于事件驱动的工作流程- Q! S$ o9 P! F" j2 O! ?
    相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
    ( T4 R* X. f1 d" k
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  
    * S" x4 y6 \6 m; q8 I( t( [
      @1 |" O1 D2 E& i7 ~0 I/ a$ D
    / @/ D8 P, h+ g% n. }8 d0 C
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:59 , Processed in 0.265593 second(s), 28 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表