java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4129|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66375

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库
    - f& h; |- F  {# ?$ f4 K# ^9 B4 A本节介绍服务库和事件库。; l: c( y0 u0 J4 C
    3.4.1 服务库: q" h3 ]. a( m$ l; u) y; c6 Q
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。$ \  y( P% O# g5 ^5 x% h
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、* ~; w( U+ F3 b7 y! o
    STOPPED( 已停止) 。
      t  i8 ~8 Q* |# ?
    ❑任何服务状态变化都可以触发另外一些动作。3 ]' d8 U! v1 A8 }* T5 ^
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。9 }( y  H( R2 X. I1 A' i4 [9 ?% w1 ~
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
    7 A: Q9 J6 H# p& z% Q5 _1 _对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
    - X; |3 T" |. S: c
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于
    4 W& s# }$ U- a. e5 e( c
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher
    6 S0 e" L# K8 S9 a. Q0 |0 w% g
    ApplicationMasterService等。
    9 h1 W' Y7 _) u8 u
    3-13 YARN中服务模型的类图5 J& X) H  y: h( `
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
    8 c2 P; I" f  A; j: j1 y) d/ ?8 B务的统一管理。
    ! M: l, T* N+ T  T  b
    3.4.2 事件库- ~5 J7 M4 }! a
    YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN
    . \% O0 O2 M4 ]5 s6 R. k1 e各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处
    8 h( O0 O$ }7 D. [理模型可概括为图
    3-14所示。  b7 H8 N. z4 S
    3-14 YARN的事件处理模型
    + W7 }( J9 \0 k0 u: ]& w整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器
    : n; X, f( s& t+ g. i3 j
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其
    1 |# \) m# h& D+ t' _0 K处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成# e" [$ C% I! q2 h
    ( 达到终止条件) 。. e, M5 w) q. a/ f8 h6 y
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager
    & J( |7 U, L" @
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型; J# }/ ?+ {! b  ~5 I
    驱动服务的运行。
    ; c7 u, B# l2 o+ |) r; A' x$ B
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先0 K" ~/ i7 K* m, P  p+ O
    要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器- |5 O8 V# K! o8 X/ y( V
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器: _4 ~: D$ `/ @7 l2 Z% j
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由' ?% D: [/ G  F3 ]1 U" i
    中央异步调度器统一管理和调度。) h. ?$ K/ q; C% \) {4 W" ]& [
    服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
    . J7 }1 t3 [* M6 b, ?/ y- U则采用事件联系起来, 系统设计简单且维护方便。

    # t) w4 j+ e* k% Y2 m: t* e- r3-15 事件与事件处理器4 H5 m$ c; l% W% @! w2 r
    3.4.3 YARN服务库和事件库的使用方法/ e& ~1 s! @$ {+ [) X
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce
    $ s  D0 s4 i/ z% q4 A' JApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    5 Q; b8 }/ @1 W9 h6 }' |% T. @
    1) 定义Task事件。
    # ?) }0 b3 ^& n2 D3 r$ Q! [
    public class TaskEvent extends AbstractEvent<TaskEventType> {, [- g# p. L% g. Q; C+ m' V
    private String taskID; //Task ID
      B. @7 d* N1 b! Dpublic TaskEvent(String taskID, TaskEventType type) {+ b  K- ?; K3 U% F+ c
    super(type);# q; P0 U7 ^! P- c0 Z! F
    this.taskID = taskID;- b; h$ ?( b2 c
    } p
    - C# S7 g: `: V) Y# n1 D2 O5 vublic String getTaskID() {9 D0 W1 f( M: A8 \; I
    return taskID;* i5 ^+ E& }6 D: L9 v$ Y. H& f
    } 4 l. F5 `, N) i# e$ N6 i, w4 l2 k' i
    中,
    Task事件类型定义如下:
    " D/ q# i  f5 I9 E9 M
    public enum TaskEventType {% t) i, ~% a( J
    T_KILL,: s, c. ^4 c6 Z- T7 z% S
    T_SCHEDULE
    , `0 v) B) I6 Q) n& o0 I$ x* ?}
    8 c9 z% y4 F0 V6 k$ Q% H2) 定义Job事件。" k  p- \" y' ]
    public class JobEvent extends AbstractEvent<JobEventType> {1 a! n+ Y: B* d2 x- K9 q
    private String jobID;
    4 `/ W7 `$ Z2 J4 F( v1 {3 ipublic JobEvent(String jobID, JobEventType type) {
    0 p1 J6 `) K: C& nsuper(type);
    & g# H2 w& e3 o$ Mthis.jobID = jobID;
    9 v  J/ Y) O6 x5 A} p+ |5 [( D4 U5 ~  |7 [
    ublic String getJobId() {
    - `3 V! F9 k, z1 V, treturn jobID;$ i0 E3 T! B( ?0 X$ w2 D- s0 ^
    }
    1 E5 c7 \: D+ V" G}
    2 k8 a9 P% J, C+ t! Y& B* z其中, Job事件类型定义如下:
    , c; [! B# A* A$ R6 \
    public enum JobEventType {4 I9 [; K- h/ [
    JOB_KILL,6 d) Y# E7 B7 e- O# [6 z, Y$ E
    JOB_INIT,
    5 {( c, H! A. [9 u# UJOB_START
    5 k6 N3 H/ O. Z9 u1 S- m}
    , _% t, [7 J5 H" L, }* N3) 事件调度器。" h" R7 e3 A8 z- ]; Q( Y  j
    接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:/ x$ m1 R8 l% ]8 K( {, U" c) A
    @SuppressWarnings("unchecked")& X+ V5 i+ R+ _
    public class SimpleMRAppMaster extends CompositeService {6 }! q- k3 e& P; X
    private Dispatcher dispatcher; //中央异步调度器) b# B' F( p! J1 H# {
    private String jobID;5 K4 Z3 \6 P3 P. Q- S0 x
    private int taskNumber; //该作业包含的任务数目) x6 |+ T* r' C% p2 e: ~
    private String[] taskIDs; //该作业内部包含的所有任务
    9 N  u% ?! s/ T, c  p4 j( E
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
    $ T3 k& C" y- h1 f7 _9 z/ M% `super(name);
    4 G! r! B$ ]; Q9 g; ?' pthis.jobID = jobID;
    6 R' r/ ~9 J9 kthis.taskNumber = taskNumber;8 ^0 B2 I$ y+ l( _9 Q2 b
    taskIDs = new String[taskNumber];
    1 d' D1 u! _& I) W4 Z3 c6 w+ e8 n8 `for(int i = 0; i < taskNumber; i++) {; \! g; K, k/ Z5 D
    taskIDs = new String(jobID + "_task_" + i);6 ^$ G  @# ]2 ^
    }( p# V3 K8 K9 x9 U( w
    } p1 {& n: Q. |3 ]! A+ _( q' U
    ublic void serviceInit(final Configuration conf) throws Exception {/ s, z+ e) V8 R- S& M
    dispatcher = new AsyncDispatcher();//定义一个中央异步调度器9 F8 R0 L2 |/ v5 F" t/ K9 O1 E
    //分别注册JobTask事件调度器& c( U! V- a! z8 Q- x  q/ g
    dispatcher.register(JobEventType.class, new JobEventDispatcher());1 `+ v4 R& t+ J5 ^7 J. m2 u
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());5 V, J( t' O2 H# T; d$ @
    addService((Service) dispatcher);
    5 [2 ^* T0 u0 d0 tsuper.serviceInit(conf);
    " @- i$ ^) d+ x9 P} p9 U  E( z: r( f& N( r3 a) w+ _
    ublic Dispatcher getDispatcher() {
    - q4 n: ?& {) xreturn dispatcher;
    " H- j; e& [4 x6 j6 j3 f} p
    * ~$ \. B, x/ q# d3 Srivate class JobEventDispatcher implements EventHandler<JobEvent> {
    # ^; n$ x( t: E: N9 [4 E/ p+ N@Override( f0 c( [/ t8 N2 `% q2 p0 k7 m
    public void handle(JobEvent event) {! g0 x- S- o$ l; a1 V: k. R' x
    if(event.getType() == JobEventType.JOB_KILL) {) W' p0 v$ V: t6 k
    System.out.println("Receive JOB_KILL event, killing all the tasks");; c5 `3 R9 I  |2 w+ K
    for(int i = 0; i < taskNumber; i++) {
    2 ?, I7 P, v3 {8 Bdispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    " {! M) a+ R* o3 _TaskEventType.T_KILL));
    # [) f0 g0 ]. n! y9 ^6 q}) @$ T; M9 `' o( v
    } else if(event.getType() == JobEventType.JOB_INIT) {+ n+ [- {! L2 G5 I4 v8 x# ]4 w7 B. c5 f
    System.out.println("Receive JOB_INIT event, scheduling tasks");# ~% ?' h  Q7 u( b6 N; \% @
    for(int i = 0; i < taskNumber; i++) {+ V: C& m: O' c- V! t- |8 N( D
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,- }6 f5 G: U- N; B' E2 Y; T+ I1 L
    TaskEventType.T_SCHEDULE));. F3 o$ H3 F7 `9 r
    }
    2 p7 w' E- l8 M' F}
    2 z) Q% J1 e, Y$ |/ _& q! Z5 i}
    & R2 J0 ^4 p' P$ Y) a7 l}p8 l* k/ v" ~( R
    rivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
      z* I# s2 k: ?1 U/ Y, g@Override/ `$ `5 M- m# D4 |
    public void handle(TaskEvent event) {- X' E9 z  V2 C" v2 ^
    if(event.getType() == TaskEventType.T_KILL) {
    ! L7 K% K& }. i9 m1 }System.out.println("Receive T_KILL event of task " + event.getTaskID());- Y4 ?) N& Q* p7 x" R
    } else if(event.getType() == TaskEventType.T_SCHEDULE) {* t7 k- w, \3 n; V/ j' Z0 A  R. r
    System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());) p' q  t5 U9 A% s  |
    }( `8 o5 m, N- s- D" W
    }, Q- o" W8 h: v
    }! s- |' o( b8 F
    }: G, T; {! J8 o0 F( `
    4) 测试程序。
    - n. j) r0 c" _1 i' t3 s
    @SuppressWarnings("unchecked"): f. i+ m8 x# n# q# Z
    public class SimpleMRAppMasterTest {
    5 d. T/ ]2 y: \8 d4 Gpublic static void main(String[] args) throws Exception {3 W: H. B$ s  t
    String jobID = "job_20131215_12";
    % w& \# `( v' b; O7 \) XSimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);, \2 A$ Y7 i0 O8 s- {. ?
    YarnConfiguration conf = new YarnConfiguration(new Configuration());
    ' `+ y) B* L& z2 t6 \4 k% [# VappMaster.serviceInit(conf);
    ; u- O5 \& G+ A4 c* f- o# p5 P, \appMaster.serviceStart();
    & d2 A/ U* Z  ?* u3 ]appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,: m# ]! f# r) N5 `- ]
    JobEventType.JOB_KILL));
    + U* \. w7 A1 E3 w7 F* gappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,; O# z8 d& M0 w5 Y+ C
    JobEventType.JOB_INIT));6 s; `* A9 u3 b' w% G
    }- }; B& h/ z  q( I) N- Y" Q! C
    3.4.4 事件驱动带来的变化1 j  ~( m9 [# @1 R, P
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
    6 l' y; h9 s4 C% T方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    ; I5 C6 A' L2 l! b  n字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    . q! T: r) f, R& {+ \是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一5 u. E! R4 z% e3 h! N
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决
    " `' b  N) k  j4 T9 `问题之道, 必须引入新的编程模型。

    $ Q# l7 {4 Y) A! a$ n( Y. U* `3-16 基于函数调用的工作流程  S' ~  O5 h0 i& K% e
    基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则
    4 k  U  B% F4 p% H2 J是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
    # n: {" @# G4 H) \联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需( p" H( _8 w) f8 u0 u
    向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
    3 x0 f3 A! O( n9 r0 j% D6 G
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A
    ; V$ x, |; q" u
    3-17 基于事件驱动的工作流程
    % Y0 L, }* S( E  Y  U0 u8 t8 {相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。# n2 e; w* i) T: G6 B9 d. ^" R
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  ; }# L# g+ k% |: k" ]4 V

    / t9 ]/ X8 Y( k& U/ n$ r
    6 k% k' d: [/ v" E: {9 S: @
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-12-22 09:27 , Processed in 0.140059 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表