java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4323|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2093

    主题

    3751

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66775

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库
    # t$ Y7 x3 s, J5 p# i% G' S4 _  N本节介绍服务库和事件库。- K% D1 F. B( P
    3.4.1 服务库7 |' a! U" U/ r3 D6 y# t3 m, m
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。/ i# \- p' w' o; V
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、, o" l7 E( G7 @, B
    STOPPED( 已停止) 。' B6 u8 Q: M5 M! e- w
    ❑任何服务状态变化都可以触发另外一些动作。
    + J7 P$ z6 }. K* @$ @. P4 c
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。8 ?3 s4 T. o! O( @9 d. w. L
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
    3 R0 x- j2 e, y: P% e对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的) c: M0 p. x" q9 M4 h! ^
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于2 w( j/ t1 I$ V/ d
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher1 o: B% T* X" J8 q8 z$ S8 ?3 i  V
    ApplicationMasterService等。
    ! x3 q2 E' U) V, r* c" M/ X; t8 p$ d4 m
    3-13 YARN中服务模型的类图1 J/ M  D" a5 J* S- e: u
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服2 Y( ^& F# O+ Q" K8 l
    务的统一管理。
    / i' A8 e1 Y5 K4 n$ L
    3.4.2 事件库) L4 _1 _/ P0 b% }+ \. e$ }( K
    YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN7 p. |+ l2 g$ s3 N
    各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处
    3 s2 [* u, D9 e  d# ^理模型可概括为图
    3-14所示。
    ! z& H+ F- ^: K
    3-14 YARN的事件处理模型9 z( c) ?/ m+ _
    整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器3 R( j8 i2 L+ s6 u8 O/ I& ^& }, Q
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其
    6 M; s- \& }* m1 D. L: \处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成9 i9 o9 R1 l( _- T& S' v9 G- X
    ( 达到终止条件) 。( l& s2 N' R, t8 E
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager
    + d$ F" L, k0 Q8 X, H, Y0 Z
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型7 s9 l, }/ r0 @: w9 e( H4 s5 o! }
    驱动服务的运行。; C9 d3 o: Z; k. |& {4 Y: R
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
    * y9 }6 S, l, u, O8 v; e& Y  H; W1 M1 q要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
    + }& e: L5 c1 ^0 S
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器( K; K# V6 d5 l9 `: e
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由; v: _5 H! U; x; X! l
    中央异步调度器统一管理和调度。- j0 ?4 i* j8 ?/ F; p
    服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
      j4 C2 s" F5 `8 y( A则采用事件联系起来, 系统设计简单且维护方便。

    8 t  q) `/ `, j5 @1 l% b, w* n0 h3-15 事件与事件处理器5 c: Q6 i) V/ s: H; t5 Y
    3.4.3 YARN服务库和事件库的使用方法
    / X5 t, k0 G5 O4 f& l. n
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce& `1 `) {! S# H0 K- P3 Z/ Y4 x
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    : `# t# w' u3 e/ ]# M) \
    1) 定义Task事件。4 w6 N! w; X% n
    public class TaskEvent extends AbstractEvent<TaskEventType> {
    - J. |$ g7 S0 _0 zprivate String taskID; //Task ID
    ' Z$ F6 i" g. h, lpublic TaskEvent(String taskID, TaskEventType type) {
    8 |7 s" l" {1 L, y+ c# m% |4 I) asuper(type);; }* M0 q, J- W8 B! Y
    this.taskID = taskID;
    2 R/ P2 i1 K0 E! l$ R7 l, l} p& v3 N" O  I$ x8 b) S( Y
    ublic String getTaskID() {
    % \3 X5 d; A2 C- J; c" Breturn taskID;
    . H# B0 M. i. [# z}
    # s# m# Y) u3 c6 Y中,
    Task事件类型定义如下:+ q6 H* [6 i( n9 e" O
    public enum TaskEventType {* H$ O* R0 r& J5 i9 }
    T_KILL,+ K$ o: ^( e3 L+ r0 ^% a3 `5 l
    T_SCHEDULE4 z5 S$ q8 X- V5 J9 N
    }& M% e# u# @) [
    2) 定义Job事件。
    . K7 g; `9 h3 A( Y3 K# @
    public class JobEvent extends AbstractEvent<JobEventType> {3 u6 }" R' `5 g, q2 i
    private String jobID;
      _- ~  ?: d3 wpublic JobEvent(String jobID, JobEventType type) {
    ; V+ v% h* h! f7 F* m6 e% @: ysuper(type);
    , ?; R4 w( F/ _' M- r, f' ~) Dthis.jobID = jobID;
    3 E7 t& o' f+ ]5 u. d4 l0 m$ ^9 M5 b% T} p
    % @8 X  ]$ b; j  p& u! d4 s% ?ublic String getJobId() {
    ( ]( b. }; n9 F) @6 f/ @: Ereturn jobID;$ X1 {7 `% l& R, J% y: u" D
    }* H& @! x0 ]9 M
    }! N. L( B. U5 g+ y
    其中, Job事件类型定义如下:% {3 A+ ^, T9 q1 c) v  H, U
    public enum JobEventType {- K: y' T) d# n/ S+ U& f9 x4 j$ a9 o
    JOB_KILL,3 R5 F' R* p& U9 E( z
    JOB_INIT,
    7 L' C9 M& ]9 W; o6 h6 c6 Q8 D& wJOB_START& F) I: }* v0 ?2 i6 T' W
    }1 m$ S" J. r( }/ f/ [- q' v
    3) 事件调度器。# [) [( G. C' `9 P+ T
    接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
    ' J6 w, k) Z6 o; ^% J@SuppressWarnings("unchecked")
    8 g% Y! g/ J, Y8 r: wpublic class SimpleMRAppMaster extends CompositeService {
    ( U0 g3 p2 g) `0 Lprivate Dispatcher dispatcher; //中央异步调度器) r* Q( k* s# t: ]8 L) a: P
    private String jobID;% k: |& }/ t) o, i
    private int taskNumber; //该作业包含的任务数目
    6 L: [0 d9 O/ L! o% e  d, W& a
    private String[] taskIDs; //该作业内部包含的所有任务
    1 s/ h3 j5 D& V5 s9 _# V
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
    : G3 {- E- n% O; t2 Tsuper(name);& l. X# u0 Q1 V" z/ W( T4 n
    this.jobID = jobID;
    0 ^* U( @* Y, i4 w+ Rthis.taskNumber = taskNumber;- x4 ]' N/ G' ?; S+ r
    taskIDs = new String[taskNumber];
    9 a; C7 y2 P$ H- b) y6 B9 E6 `6 }for(int i = 0; i < taskNumber; i++) {6 ^# `6 n% V) \. D
    taskIDs = new String(jobID + "_task_" + i);" S- P: I( W3 K% Y) k
    }' \$ w8 G9 k! K. H' B
    } p
    7 k. z( U, \2 ^) |; @ublic void serviceInit(final Configuration conf) throws Exception {
    $ b6 @2 a! S0 H% _) t5 f7 \6 q- t9 Wdispatcher = new AsyncDispatcher();//定义一个中央异步调度器
    ! k& L) ?& g; d% O" ?7 ~2 Z
    //分别注册JobTask事件调度器3 N5 m$ h! c" X* p; U' d
    dispatcher.register(JobEventType.class, new JobEventDispatcher());7 B9 E: f2 a1 b. ?- |1 I
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());: K8 e, D- I; H2 Z8 {! E7 v. K
    addService((Service) dispatcher);
    " i3 E  M# _2 F3 ]2 Rsuper.serviceInit(conf);
    0 R; X" V' O" q' c" R, C& O0 H, v} p
    % W( R5 r. W+ I3 b8 o! vublic Dispatcher getDispatcher() {
    ! N! p9 C$ B! A* ireturn dispatcher;# a. B+ I3 V0 z5 g& K: q
    } p
    % E1 C' @: T/ L' C+ I# X. ]1 k- urivate class JobEventDispatcher implements EventHandler<JobEvent> {/ X7 F) z( p9 S: p! `: E2 A( l, S
    @Override
    & O& y3 a$ p5 I1 xpublic void handle(JobEvent event) {
    ; Z- T! u0 L5 h  N; t4 tif(event.getType() == JobEventType.JOB_KILL) {  O  ]' J; d) b( i4 F! Y% A9 f
    System.out.println("Receive JOB_KILL event, killing all the tasks");: p9 v7 E  u( ?) {' g9 ?
    for(int i = 0; i < taskNumber; i++) {
    4 E" O7 [' V5 N+ {dispatcher.getEventHandler().handle(new TaskEvent(taskIDs," Q6 r/ G2 \8 a$ U% Z& k
    TaskEventType.T_KILL));
    9 T3 D. Z, U9 t% h: b0 v}1 x+ Y2 b4 h1 j/ C2 _
    } else if(event.getType() == JobEventType.JOB_INIT) {3 @, h2 E+ y8 h- J9 F
    System.out.println("Receive JOB_INIT event, scheduling tasks");
    + v/ q' E& l  H+ \for(int i = 0; i < taskNumber; i++) {
    9 \5 _. Q$ D! adispatcher.getEventHandler().handle(new TaskEvent(taskIDs,7 F& r. k* {/ d- T/ q
    TaskEventType.T_SCHEDULE));* e7 n9 W: }) C  b: z& {3 G9 ~2 R
    }
    % \1 C- W+ y% B6 Y& z}
    ) @& t+ ~, E- V" G0 \' o: z& P1 D}# N- `' Q/ s9 x% F8 P
    }p
    5 |# F2 C7 `. L8 u0 B5 |$ S  [( q1 Frivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
    0 ]" a0 t. d$ ~* K/ P( C5 B) K% E@Override
    : m# n; s+ o% d0 j/ _9 Mpublic void handle(TaskEvent event) {
    , J. I/ t. ~  D3 g& |/ R5 ]if(event.getType() == TaskEventType.T_KILL) {
    . q8 P) X& f! l' LSystem.out.println("Receive T_KILL event of task " + event.getTaskID());8 S' |! F7 c# I( Z- c& m
    } else if(event.getType() == TaskEventType.T_SCHEDULE) {1 C$ K& q9 ^) c8 f. |1 x$ ~8 B$ E) V
    System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());# G0 I) b9 u3 O7 M: [) i, k' P
    }
    $ }2 t! ?+ h& y) `2 y}
    $ s" E2 y: W' q7 M; s+ E; w}. ^1 f. \# D3 E( l, V3 Q/ k) d0 |
    }
    : U3 f, i4 M" g; [' X0 l5 J  R1 ]4) 测试程序。" H8 {  A: V' _- C7 G5 O3 u$ r0 F
    @SuppressWarnings("unchecked")# q$ y1 M( a0 n6 r
    public class SimpleMRAppMasterTest {
    , }, F8 o' r  B" t0 rpublic static void main(String[] args) throws Exception {$ Y' M' a% l/ [* b- L
    String jobID = "job_20131215_12";7 G6 E# w! S8 r
    SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
    7 O8 r& r) _3 v3 ?. F4 @YarnConfiguration conf = new YarnConfiguration(new Configuration());' H' R4 F# E- j- V( l, k
    appMaster.serviceInit(conf);
    * i1 d* \  W! S% \" [- PappMaster.serviceStart();
    / p. l- W: O* l% x- t3 K5 XappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    9 Q# {3 u5 `3 J! ]+ U" Q. BJobEventType.JOB_KILL));
    0 \- A6 V8 C6 p  F- \appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    / _5 h# Y' S! J- k# g! `% GJobEventType.JOB_INIT));
    - |2 i5 @! v9 S}2 Z$ k7 x2 f: s  e
    3.4.4 事件驱动带来的变化, p3 V6 n- b* o  O, l, ]3 }8 Q  U
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的7 ^1 ~2 y; r6 b* r8 s+ W4 i& |
    方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、$ K# b* Q/ ^% K2 v" F; G: u8 ]
    字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    0 W: o+ Y. X4 Q  h/ {$ E是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一
    - W7 Q8 J4 V) E/ g个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决, L7 a$ p  Y5 j2 c5 G
    问题之道, 必须引入新的编程模型。
    - i1 |- T* `; i8 c! _' N
    3-16 基于函数调用的工作流程
    $ Y$ S/ X; Q1 l  o7 n: t4 i. ^基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则; W, T1 w' Y# x/ Y; U4 N
    是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
    1 e  K5 W" I7 l/ V! [' ], u联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需
    7 o% h" \# W7 j7 b向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
    $ a. ^* I& D: ?' r6 }
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A4 j0 @$ \% S% c! v9 k% ]1 H
    3-17 基于事件驱动的工作流程5 ^5 k% F/ l% j& o
    相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
    / m& ~2 H( R5 @  }9 S9 ?" u
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  
    0 g$ @; Q7 }: w( E( s7 y) Y9 u# n" _4 S2 F9 l* O) h$ V
    $ }7 [/ h  T/ x- _3 y* f4 `
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-30 06:15 , Processed in 0.288005 second(s), 33 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表