java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4092|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库
    ' h3 l- Z" I$ i7 S; S本节介绍服务库和事件库。- K$ S: D3 W; c5 V
    3.4.1 服务库
    " B! r; b* q9 J7 S8 U: Y9 [
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
    2 f1 Q9 E. Z3 Y* N) G- Q+ h
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、) u* U( K) w# @% z! y0 v9 Q
    STOPPED( 已停止) 。
    ; Z# l9 y( \6 I+ P5 m/ g5 ~' N' t
    ❑任何服务状态变化都可以触发另外一些动作。1 H6 m* L0 s0 J9 r. e  h
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
    & x, b- n( E: Y5 S& S9 d9 X8 @
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
    $ h" P. z' H' O" ]& y对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
    ; s; r1 W6 n5 b1 V) n
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于3 A# L1 k4 w3 J" k" v4 D' s
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher
    9 |7 t" h) v5 ~9 O' G
    ApplicationMasterService等。1 a2 s$ E& r9 ^: I# T
    3-13 YARN中服务模型的类图4 A: `( k4 d, U- D6 V7 n7 O9 \
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服" P+ Q2 J) s; A; g! j: J. V
    务的统一管理。
    5 l- Y$ l' A8 e4 g1 R- `
    3.4.2 事件库
    . L' x! {8 n6 t! bYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN: H' Q% Q& D" M- g) {( x
    各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处4 x9 H2 L1 p3 U  X' B
    理模型可概括为图
    3-14所示。! `) G: {# K8 x4 C& c
    3-14 YARN的事件处理模型4 F0 e& l$ C. `7 a; S( z5 a3 L+ b
    整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器  J6 I6 a; _  Q; ]$ O
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其5 }" v  v( O$ X  W2 R1 H- o  @
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成7 G  H& W& y* J& b
    ( 达到终止条件) 。
      ]1 i! v/ o4 L# Z
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager
    . ?+ O7 w: r  U
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
    & A  X) |. T5 N7 Q驱动服务的运行。
    & U' L1 p' {2 D" _" Y
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先9 d5 O+ J' W  l# I6 H5 Q. q
    要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器/ E) y$ ?  O: B7 w7 a* D9 M
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器3 F% h2 r: z, U+ O6 O
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由
    9 g* }' e& o  U' y) E( L2 U中央异步调度器统一管理和调度。1 s0 `. d$ F4 }
    服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
    8 ?6 q  L* j& m% h1 Q( j则采用事件联系起来, 系统设计简单且维护方便。

    ( D; O$ j/ L( R- q0 F3-15 事件与事件处理器" o% V3 k( f9 B* y
    3.4.3 YARN服务库和事件库的使用方法0 P& o7 ~+ c, I/ E
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce+ u: z- a4 J$ R4 R7 y: s
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    ! O9 C! y1 S8 l: d
    1) 定义Task事件。  i) Q. x6 O9 u0 ?- P
    public class TaskEvent extends AbstractEvent<TaskEventType> {4 F2 P2 ?! q3 N* \
    private String taskID; //Task ID, T7 w% V7 \- ^1 t
    public TaskEvent(String taskID, TaskEventType type) {0 h2 W; _$ G" L; T
    super(type);
    2 h' k4 G: l% \+ sthis.taskID = taskID;5 t+ P* c' g: m5 p
    } p
    ' u! v; Q7 Y, X9 T6 fublic String getTaskID() {( u2 r; G! {- g2 c, v/ u6 `8 ?
    return taskID;
    # \. Z& t- r& Z6 J8 }( s  Q+ ^$ M}
    8 w- a, r5 Q1 ~2 x. b- U中,
    Task事件类型定义如下:
    ; M" |. S" g5 |' k! }! _
    public enum TaskEventType {
    8 U9 a) D; `2 }5 e" gT_KILL,2 l1 Y" c7 Z* q6 H: h) O+ s
    T_SCHEDULE
    ! h5 P) p4 f3 u) `- y  j" \}8 J; S' o- }/ X9 a$ P6 F. F5 p* Z( a
    2) 定义Job事件。& m' A! _5 g; H; t/ g
    public class JobEvent extends AbstractEvent<JobEventType> {
    ! _  c1 |$ o6 d; n3 M, I6 ^private String jobID;
    $ P) V1 G, M+ s: @8 O9 z8 F9 k! H  tpublic JobEvent(String jobID, JobEventType type) {4 }+ }" d  ^, ^1 j1 K
    super(type);* r( Q- x  N, a5 k8 w5 t
    this.jobID = jobID;
    . O6 T; l0 Y" u+ _% S} p" F$ {( Y$ o  G7 H. b
    ublic String getJobId() {" b8 ^) Q2 D: s1 ?+ _
    return jobID;
    0 I+ @$ \* I* Q5 H. J! H- T- Q}$ x6 R+ o) o) O. {" n; [' E' d$ I5 T7 ]
    }
    6 v3 x, u: M. ]/ e0 ^9 n  z其中, Job事件类型定义如下:& G5 f' D) @- C
    public enum JobEventType {
    ) p  o3 ^- ^' i. @6 S3 ]8 S1 VJOB_KILL,
    * @% C) z. h9 s: r5 n3 F0 _9 ~9 C" wJOB_INIT," b4 E4 n9 Y+ u& x5 G- f% |: @" i
    JOB_START
    + _$ }$ x; g2 ]" \' q7 l}# H- D$ }4 V( _
    3) 事件调度器。& W6 E4 w: |( v2 X
    接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
    $ @1 {. [$ p& b@SuppressWarnings("unchecked")
    $ B+ Q0 w( G9 l+ [public class SimpleMRAppMaster extends CompositeService {$ u) X% D2 u+ p3 x4 J
    private Dispatcher dispatcher; //中央异步调度器
    3 L3 v/ `6 ?% t# t0 P3 ?" _% W( }
    private String jobID;7 Z; w- K+ t2 t
    private int taskNumber; //该作业包含的任务数目8 _2 L0 ?. b# u: A! R
    private String[] taskIDs; //该作业内部包含的所有任务
    4 b" L: Z8 t% s+ c% H# ]) {
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
    , O/ s! n3 {. ^4 L' ssuper(name);. l/ }$ x; C* p8 Q0 ?
    this.jobID = jobID;" J' d; w( M0 {' g- V
    this.taskNumber = taskNumber;
    - ~. t2 R  _( C8 g/ M9 gtaskIDs = new String[taskNumber];- a, o& K$ D/ J, N6 Z/ k& f
    for(int i = 0; i < taskNumber; i++) {
    - D4 y) L. ]; k2 f7 ttaskIDs = new String(jobID + "_task_" + i);1 y' \+ ~4 ?( S* S
    }# B7 X6 P2 H; t4 Z
    } p- @8 y% @' y8 {' S7 h5 S
    ublic void serviceInit(final Configuration conf) throws Exception {
    ' q4 Y0 z5 ?! Y- D# d3 g/ Mdispatcher = new AsyncDispatcher();//定义一个中央异步调度器
    , Z1 w# h% r" T. \: [1 X- s- O
    //分别注册JobTask事件调度器
    ! [( w1 y6 ]- D" D
    dispatcher.register(JobEventType.class, new JobEventDispatcher());/ \2 q' b3 Z" _% i1 H- D% ^9 c
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());- o$ x2 Q+ j- ?( t1 f& z" h1 W# \. N
    addService((Service) dispatcher);
    + [" X5 b7 y, M2 B5 A2 \3 Jsuper.serviceInit(conf);
    7 w1 O5 D3 T. K( X1 z6 [9 V& c} p0 ~9 @6 f  f* I. g% k3 w9 ^# ~
    ublic Dispatcher getDispatcher() {* b& g# R7 j  f$ `2 g  @) c/ Q; G
    return dispatcher;
    / g) H$ Y: e& w6 S3 O8 n! ?- y5 g/ Y9 H} p* s( c$ A! D) G2 A$ h
    rivate class JobEventDispatcher implements EventHandler<JobEvent> {
      v/ K7 k2 |9 U5 O# Y/ D$ K- T@Override
    : G9 Q! c8 i& E- U- @public void handle(JobEvent event) {
    6 ~# H5 ?# R, Fif(event.getType() == JobEventType.JOB_KILL) {
    ! u& W; u4 s" ESystem.out.println("Receive JOB_KILL event, killing all the tasks");' u9 L2 r; L9 E8 K1 o% Z
    for(int i = 0; i < taskNumber; i++) {
    % p$ M0 f) T, K0 l' B$ Gdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    7 W3 B% o) K& V* ~  oTaskEventType.T_KILL));4 {3 {6 a! F8 X: u
    }
    - p9 H1 a! b( y$ O5 ~9 x} else if(event.getType() == JobEventType.JOB_INIT) {
    0 H; @2 K' t+ LSystem.out.println("Receive JOB_INIT event, scheduling tasks");
    ! B, `  Q$ S: q+ Y3 K$ Z  [; B# Zfor(int i = 0; i < taskNumber; i++) {+ ~* V3 Z) r% H- Q" ]
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    % {! [! ?2 F3 F; k  o. l7 QTaskEventType.T_SCHEDULE));2 C6 L& X8 Y6 `' @
    }
    3 G5 O3 a& Q  a, @& ~}
    : P* V1 \9 F1 N9 P6 l}- v. @0 z/ Q3 n: {; v/ p/ S; k
    }p- v1 @/ o$ b* T/ d6 `; D
    rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {+ k! G, [5 M$ E7 g( T
    @Override
    ; m5 L# p* T9 Wpublic void handle(TaskEvent event) {; R' X7 V' S' A  t
    if(event.getType() == TaskEventType.T_KILL) {# d0 e' ]1 O! u* V' P1 @$ m
    System.out.println("Receive T_KILL event of task " + event.getTaskID());# J5 I( H- x& ~9 E/ l
    } else if(event.getType() == TaskEventType.T_SCHEDULE) {) l9 Z( o* k) E1 X/ {, |
    System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
    , A- N8 l& N4 b9 D+ ]. t; ^}: k* h$ c* a2 K/ W9 k3 z3 X
    }: U4 F  m; I2 W- e
    }
    ' f# ^8 L( y" w8 S}
    1 r( U8 e+ ?5 E4) 测试程序。1 D- Q( f2 @* P6 Z( m
    @SuppressWarnings("unchecked")
    4 Z0 |! n# K  c0 `# g" o$ B& Ipublic class SimpleMRAppMasterTest {
    2 m" h, R- `: ~- [public static void main(String[] args) throws Exception {, }1 o) N6 R0 g7 a- p+ m# j
    String jobID = "job_20131215_12";
    7 K7 [3 ]$ U* ]SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);/ Z! B: q/ e8 |+ Q
    YarnConfiguration conf = new YarnConfiguration(new Configuration());" X9 R9 K  k2 s7 S! G4 x
    appMaster.serviceInit(conf);, ]/ ?! ~: _  x( J# S
    appMaster.serviceStart();% V9 T' |" o( s" k/ D
    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    ) I3 e" ~# q; A3 rJobEventType.JOB_KILL));
    ; P. [" Z. I" }9 M5 F) U+ [appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    1 c6 e- E9 G9 J5 r$ P( Y6 {" ?$ CJobEventType.JOB_INIT));7 f. k2 ~# K3 c6 D7 F
    }: f, {1 A5 ~4 v, N
    3.4.4 事件驱动带来的变化9 ?/ @7 ~$ a! o2 o1 ?7 H
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
    3 f! b- o- l$ u( g/ H6 u方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    , F- j9 f, q' M( J, A字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    ! u  ?  T) z. u- Y0 I是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一
    ; R  Q1 y2 R  T. Z4 L个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决
    ; @' |& B3 d% q问题之道, 必须引入新的编程模型。

    * W7 Q9 ~* ]5 R3-16 基于函数调用的工作流程, [$ e( m2 \* N8 z0 @
    基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则- k- r, Y. [7 V! E- `( o0 n
    是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
    . [6 H4 a9 E3 v4 E9 d, Z9 r联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需) k+ Z9 B' m  h' G! k/ ~& h# V
    向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理8 T' {1 d! \% [* @1 _
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A: w1 h) q& X+ s4 A2 o' G
    3-17 基于事件驱动的工作流程$ ]5 O* ?* d- Q1 S% {
    相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。# u4 C: p6 }1 \0 U3 Q4 G  l
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  , {7 W5 r' g; n+ u, V
      e  L# Z9 [% `( u" ^& Q+ o& ]

    3 E. m; u9 `2 P
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:38 , Processed in 0.286851 second(s), 33 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表