java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3986|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66265

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库
    , L$ e, T% @7 r本节介绍服务库和事件库。
    : N  a* m7 N$ Y4 I" p' r* c% o1 N& |
    3.4.1 服务库
    # J6 A& I0 j: q9 T. x# z
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。' P/ z& V  K5 M) D
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
    4 `, H$ ^4 ?0 `/ x5 V4 R0 ?
    STOPPED( 已停止) 。
    2 o2 `' I: T; ]4 F0 B; B5 x
    ❑任何服务状态变化都可以触发另外一些动作。* }  O: [+ `8 m$ Q+ P: Y
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
    ; g" l$ i, C: x9 p) `) A( J, h
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务" H/ M$ h! A, W9 }
    对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的& w5 m7 @8 T( z8 o3 W& U9 X/ }
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于) g1 Z1 D  X" M" E2 q3 A5 h
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher
    * K1 e, F8 e% X' J& ?" p' b+ O& @* n& X
    ApplicationMasterService等。' _+ t1 ]& O$ A  T$ n; Z1 I" F9 t
    3-13 YARN中服务模型的类图
    7 |- _3 I, V4 A! e( B! x9 k1 `
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
    4 b' g9 G7 w7 q1 X务的统一管理。
    5 D* I6 q5 y5 d9 U+ i
    3.4.2 事件库
    ( j9 C, a9 e0 O. y6 o+ dYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN
    1 `1 w$ A; T: I! O( f各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处5 Y9 Q, a( ^3 r- X; t8 K
    理模型可概括为图
    3-14所示。
    1 x+ t4 d- S, V" ~. f. V
    3-14 YARN的事件处理模型
    , e# K* N' R4 h1 p" x. b整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器
    0 ]" j7 l7 p' h! b: D7 k
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其" g( `1 K' I& a5 ~! X7 {# o* H
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成' |* t% m6 ]: _/ p+ o. @8 \
    ( 达到终止条件) 。8 _& l2 D7 w- l% D! N
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager
    2 ]2 t$ [5 @# m2 G' E
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
    % ^& T7 w, e+ J4 n( U驱动服务的运行。
    , U- ~7 H9 f: I" t1 q* \+ C
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先
    . F" N/ |* e5 i% q" M! Y# ]要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
    3 ^1 {8 v. Y  ^
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器( ~/ W* f5 a/ f7 y
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由  G# R, ^+ X) P! J
    中央异步调度器统一管理和调度。2 }$ J6 ~1 m3 |
    服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间- e/ b/ i( H8 N8 r3 a4 n
    则采用事件联系起来, 系统设计简单且维护方便。
    . o5 `% o# P. s; N, c0 h
    3-15 事件与事件处理器7 ^/ ]7 k$ M0 w
    3.4.3 YARN服务库和事件库的使用方法
    6 z( k; t: n0 ?$ f6 c* T( ^
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce$ U7 j; w+ ?, l2 P( }
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。6 u$ ]; P! Q5 ~" r
    1) 定义Task事件。/ ?: L7 q2 b$ q; }$ t9 B5 V( o
    public class TaskEvent extends AbstractEvent<TaskEventType> {* E1 ~( f' s5 N2 r% k- {& k8 `2 ]& j
    private String taskID; //Task ID
    6 S2 r0 o2 Z$ [& mpublic TaskEvent(String taskID, TaskEventType type) {
    , h( G1 l6 Y7 S: X+ ysuper(type);) W8 J- K/ }. j
    this.taskID = taskID;  d+ Z& _4 y  G4 |1 Y9 g- T- P
    } p
    / n! |% E. W5 Q; d, bublic String getTaskID() {7 }! o+ D7 N$ @! O; e7 J
    return taskID;
    * b" v7 Y5 f3 W  g/ [, a3 j9 H: b$ _) z} 0 U- L. T) ^8 }/ A2 F
    中,
    Task事件类型定义如下:
      Y5 O! M. n7 G$ o/ Z& B. E
    public enum TaskEventType {
    3 X- Z5 B- G3 jT_KILL,
    : }3 X* F( f. h( a1 dT_SCHEDULE, [( e& A, ?/ g7 C& G
    }
    7 x2 d. {5 @/ B! I# W; ~2) 定义Job事件。5 ?% Q2 z* P% @0 L7 w
    public class JobEvent extends AbstractEvent<JobEventType> {
    2 o) s) ?+ m& h' j) ^  f/ c9 r6 uprivate String jobID;$ _6 @+ P( O# N) B6 [& @+ a" U
    public JobEvent(String jobID, JobEventType type) {* ?: f, s. J) C) w; i# w2 y4 s
    super(type);( c3 ~, C( g* f
    this.jobID = jobID;4 p8 |  h) ]4 p1 V) I; \
    } p
    ; g  s) @7 G3 S# F- w7 Wublic String getJobId() {
    + @/ m" J9 f$ I' d3 D' lreturn jobID;: w4 U  A! I* g/ I5 r4 d) z3 r2 a' e* k
    }3 r7 ]8 T4 @4 O, \" Y
    }9 X  d/ [! I( |8 f* y# l) s1 R
    其中, Job事件类型定义如下:3 s8 M0 y$ P* k3 T
    public enum JobEventType {
    $ G: M' {4 [: \1 I9 W! MJOB_KILL,
    # N% g( h/ k/ g; v' ]- PJOB_INIT,$ \' Q; y  I# `  l& n$ ]# O
    JOB_START
    7 l; V: a2 _4 j}7 N* Q4 X& ?, Y
    3) 事件调度器。# T4 l$ K  o5 `. c( v9 E
    接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:& U, ~) ^- f4 S- I* ]7 c
    @SuppressWarnings("unchecked")
    7 K  B* _5 N# Q* m- H" }' Mpublic class SimpleMRAppMaster extends CompositeService {8 a" T% A/ {) S5 O3 t6 Z
    private Dispatcher dispatcher; //中央异步调度器( v9 [" n8 g$ Z9 ^9 L& ]1 f
    private String jobID;
    . F+ @( ]. G1 f# M4 Iprivate int taskNumber; //该作业包含的任务数目
    ! B( |: J% U1 F$ P: `
    private String[] taskIDs; //该作业内部包含的所有任务
    5 C% i. z2 P9 Z+ s, Q. k- J
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
    4 t+ [$ Z- l3 B4 M7 Esuper(name);5 K2 M  F" K, k2 S& ^' ]0 L
    this.jobID = jobID;! L- l2 s/ Z# {) O/ R2 J
    this.taskNumber = taskNumber;
      D8 R/ }) R. ^5 G# o* v/ w8 W3 V9 otaskIDs = new String[taskNumber];8 ?- A' j. ~3 h
    for(int i = 0; i < taskNumber; i++) {% S: z' n) H+ m, b- Z3 k* U) F
    taskIDs = new String(jobID + "_task_" + i);2 p# \- C7 ^; @& y# `6 I
    }. B4 o: D, e2 K" z
    } p# v) ^6 M  V9 {$ L& \
    ublic void serviceInit(final Configuration conf) throws Exception {
    & \! y4 R2 L' }/ Rdispatcher = new AsyncDispatcher();//定义一个中央异步调度器4 r- Y, W+ B/ L% s8 ~% ?
    //分别注册JobTask事件调度器8 Z' s, j3 g8 c  E1 ~6 T; Z: l
    dispatcher.register(JobEventType.class, new JobEventDispatcher());
    4 ?4 ]- o  Q: w0 Vdispatcher.register(TaskEventType.class, new TaskEventDispatcher());
      \4 I; h* i5 a/ `; A8 G" ZaddService((Service) dispatcher);
    ; G7 z+ {; J& y1 _: Z' Gsuper.serviceInit(conf);
    6 k$ w3 E( \3 [* ]9 W} p
    ( n2 I6 Y7 g2 Bublic Dispatcher getDispatcher() {
    5 ~. E1 e2 M. j. g5 r7 \4 {return dispatcher;
    : U/ [: G! w9 w4 V& c- P} p0 y' D, Y/ A0 v) Z1 ]1 M" n# E
    rivate class JobEventDispatcher implements EventHandler<JobEvent> {
    5 [8 S4 Q$ Y4 M( N; v0 e@Override' ]2 j+ f/ R" p0 [5 j' v
    public void handle(JobEvent event) {- ^, z( m+ s8 J, \: C
    if(event.getType() == JobEventType.JOB_KILL) {
    + d& y- t* J# L% i7 f' JSystem.out.println("Receive JOB_KILL event, killing all the tasks");
    7 D! o9 M9 v; N. M8 {( `& efor(int i = 0; i < taskNumber; i++) {% l: E' u5 Y7 e, S5 S! o
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,
    ) Y9 e4 T9 I& J7 g, B4 x' `$ FTaskEventType.T_KILL));+ e8 ?8 _) n: d# B( m
    }  e: Z+ f7 b$ F) S0 F5 J* P
    } else if(event.getType() == JobEventType.JOB_INIT) {
    6 f6 W4 T- h& ?" |6 n: U" oSystem.out.println("Receive JOB_INIT event, scheduling tasks");2 x; F7 f. e% J& P! H$ W
    for(int i = 0; i < taskNumber; i++) {
    2 w, u+ t1 ]* t+ Ndispatcher.getEventHandler().handle(new TaskEvent(taskIDs,5 S9 X& z- @3 I; S
    TaskEventType.T_SCHEDULE));- {9 z, f" [4 {7 g& U2 {2 E
    }
    6 d" M; R) C- R9 b( W}
    " R3 z; x2 s4 I3 G! B}2 R  z6 }5 a; Y6 S: J
    }p
    3 ]8 \( T/ E, s- erivate class TaskEventDispatcher implements EventHandler<TaskEvent> {1 V3 n! i/ B9 Z
    @Override4 v% `! F: Q2 E
    public void handle(TaskEvent event) {
    : g/ G$ a2 m+ c+ H& V! ^$ Q2 Pif(event.getType() == TaskEventType.T_KILL) {
    ) d3 u2 c$ ^& |. N9 b6 {System.out.println("Receive T_KILL event of task " + event.getTaskID());
    9 |' x$ o+ X+ y% ^2 j} else if(event.getType() == TaskEventType.T_SCHEDULE) {7 s+ D2 A; _- X8 r
    System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
    1 l, g; E* c/ K( l2 |8 B, ^# r9 E; l}
    6 ]. \: Y1 x( H* u5 R7 e  ?& Z}0 r' A6 u5 K1 w  M4 s
    }5 H% ^/ H( }& q) |6 F
    }
    3 ~1 g- ]* Z9 t3 i5 g' I& S' s4) 测试程序。
    % A( r! x" m8 _0 ~: b
    @SuppressWarnings("unchecked")0 l# `, h8 }  F4 ?1 _6 b
    public class SimpleMRAppMasterTest {2 c, r: F# M: r
    public static void main(String[] args) throws Exception {
    + D: d6 Z, B. j2 ~String jobID = "job_20131215_12";$ o6 `* p' N" R3 U, q
    SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);
    ; G) @: M- q8 C5 Z5 YYarnConfiguration conf = new YarnConfiguration(new Configuration());
    3 K5 l% i' ~" x0 |, H; KappMaster.serviceInit(conf);$ {6 [4 p( x' D9 L) n
    appMaster.serviceStart();
      }3 B2 J( z3 L8 N) \* QappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,8 S. r" X( r" a7 u+ K
    JobEventType.JOB_KILL));
    ! d; G2 E6 q' ~0 u* @appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,1 b" ]. h' D* A8 k2 p' i
    JobEventType.JOB_INIT));
    : |; _: \9 i' M& H0 n# r}- x3 f3 u- [$ o
    3.4.4 事件驱动带来的变化
    ; t8 r4 N7 z( `0 ]3 c$ l: v
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的9 j3 E6 g4 i. E2 J# G! K( X* ^
    方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    % S3 c; K* r% C3 F* E+ t字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    - H- [" z% X3 }( n0 F$ D是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一7 d9 t7 L  G% r$ t3 ^5 a/ i5 K% Y
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决3 c! m. B/ L7 F
    问题之道, 必须引入新的编程模型。
    $ I$ z/ Z" M, j. y
    3-16 基于函数调用的工作流程
    & f' q  S# C8 e+ S0 `0 ~8 p基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则
    2 Z8 @/ l' v: e是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
    ; M6 M5 S. U" b: C7 m6 F, Z! B联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需
    ! j, ?) O) x5 V6 F向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理
    & v# J2 U  {' A5 b# m
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A
    $ _! P% [% S+ l! F
    3-17 基于事件驱动的工作流程
    0 p/ k" m& z" r  t' j- M: I相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
    2 b5 R/ J+ ^/ Y8 T
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  + ]4 a1 j4 g8 l/ N( ~

    9 X8 {# p- ?/ N3 _9 c3 K; Y3 r, o$ ]# f, W2 A* i( c
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-9-8 09:36 , Processed in 0.104000 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表