java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4184|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2039

    主题

    3697

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66471

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库& q0 g7 T8 f1 s1 D4 \" x' M/ E  }% K5 C
    本节介绍服务库和事件库。$ X0 |9 s# a- b! ~4 g
    3.4.1 服务库
    : P5 H' A4 u, E$ z# z3 ^" ^' g
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。& N- \2 i( C0 X
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、* l  [! g8 T& F' Z4 N4 @6 ~% |8 P) _
    STOPPED( 已停止) 。
    6 P" l% P  j( R  Q9 ]
    ❑任何服务状态变化都可以触发另外一些动作。2 L' K! X/ H2 ^; F
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。- T- T% l/ r, e; M) b
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
    + U, z! u9 e& X" S  b  s/ |对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
    0 X/ t* R" [9 b( h* Z5 h: J" a
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于9 F) C4 c, \7 ^8 @
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher; ]" `. l7 |6 B+ l, {
    ApplicationMasterService等。
    " t% |( T: v7 D. e
    3-13 YARN中服务模型的类图
    9 v2 N3 @$ \2 j* V
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服
    , R( C# G# b  Q% o6 E务的统一管理。
    9 E& C! z" F' N; b
    3.4.2 事件库, N6 U( K% E# z( b  y) J5 V0 {/ ^& l7 f
    YARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN
    5 }9 d; y6 Y" ?) {各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处
    " U& p2 T( P+ B' a理模型可概括为图
    3-14所示。
    2 A% t' P/ h$ v( R: m. F' ^
    3-14 YARN的事件处理模型
    8 a" M& y, Y! K4 E* }  ], V; {整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器
    : M1 Z7 {9 e" @+ l* u6 D
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其# h. a& \. Z$ t: f/ B# d
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成% K6 [, l% \$ R0 v& l1 H8 D6 a0 Q
    ( 达到终止条件) 。. O! m+ `# f' R* d/ s. h5 g
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager
    ( p; @. t6 |2 [& h7 d- W4 Z
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型
    # j+ y1 M3 [9 [& [4 W9 Y驱动服务的运行。
    ( f' D& k# V. \3 y" `3 R
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先8 h6 b+ T* ^3 c: b& B& _7 E  p
    要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器
    . B( J: w! r; E
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
    % v# G: ^5 D0 p. |: l7 n1 O2 ]
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由* W7 @8 \  r8 m
    中央异步调度器统一管理和调度。
    1 x& V* |; J9 l& A2 E0 o* I9 I% W服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间: [: g' m4 W! l: l4 ~
    则采用事件联系起来, 系统设计简单且维护方便。
    0 V9 t/ J9 ?- j1 X8 V
    3-15 事件与事件处理器% j( f! P& b  g2 R+ T$ Z8 I$ o
    3.4.3 YARN服务库和事件库的使用方法9 L6 K, ?% Y+ T$ d$ L. `
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce" B4 b! {- H1 W! n4 N3 s$ Z, o6 ^6 X
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。
    / l; u$ ]; Z, t8 e5 l: T
    1) 定义Task事件。
    8 P) i/ X/ v, X
    public class TaskEvent extends AbstractEvent<TaskEventType> {
    8 d; a- {  Z5 zprivate String taskID; //Task ID9 z/ ?# Z( _( p9 f( O7 q4 ?
    public TaskEvent(String taskID, TaskEventType type) {
    4 B: }6 x5 @5 d6 j  N3 A) b& b8 J$ msuper(type);9 Y& j4 R( e( m# ?% w% k
    this.taskID = taskID;/ d% G8 }4 M9 k# L% f. p- I% Z' z
    } p2 X9 Q! I2 F" K
    ublic String getTaskID() {& D, ^" U) p+ o" N; z9 N! @% S
    return taskID;- Q& q! G( C4 @4 Q, S6 U; `* O/ e
    } ; P- w3 d( c3 a1 ]+ \: \/ S- C
    中,
    Task事件类型定义如下:- T: l/ T: L) |
    public enum TaskEventType {* A- |, r1 j  p
    T_KILL,
    , c/ O3 F! A. T2 a: B& u- }# K. f0 D6 a6 B7 QT_SCHEDULE8 X8 C, }7 _% R% X, U. F; w/ X3 A
    }& r" a. ]2 r1 S( R; v6 ~9 O; b# U, P  _' j
    2) 定义Job事件。
    ) e; a, b, S) i0 e
    public class JobEvent extends AbstractEvent<JobEventType> {
    ' D: Z8 I' H. fprivate String jobID;
    1 Q! |8 L3 _9 X% \& _public JobEvent(String jobID, JobEventType type) {( `* B) s9 t9 h" h2 F
    super(type);
    8 P+ v3 `) d1 B1 c7 othis.jobID = jobID;
    . k4 Z; j' K# W  `  X} p, \- P0 x; Y3 c5 Y  w( V; {
    ublic String getJobId() {& u' N4 r) ^4 o. W9 E) h
    return jobID;
    . B  O9 X/ R; J# {  {: v}
    , P, Q- M6 L* I% }: l1 P2 q" x}& a: d6 H: `1 c  N, p8 `
    其中, Job事件类型定义如下:4 m% k/ x# r" x
    public enum JobEventType {
    1 i8 _; c" h) G  ^( EJOB_KILL,
    ' y3 |. B6 `3 Q4 |3 Z, {! FJOB_INIT,
    % z2 z3 }) W6 ^. f. D2 Z5 YJOB_START$ w2 U5 e5 e% ?0 ]! Z# i9 _
    }; A  l! ^1 f% P5 p2 X
    3) 事件调度器。
    ' h: I* H! B0 L( u* z* U接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
      J- f, y7 K7 g8 g2 f. l, h@SuppressWarnings("unchecked")
    1 G, \; l# x1 d: }public class SimpleMRAppMaster extends CompositeService {
    8 u9 H$ u2 I- b' p6 q  v9 R3 x. b; Aprivate Dispatcher dispatcher; //中央异步调度器
    8 h0 p4 o6 w1 r% v
    private String jobID;
    ! w3 T2 |5 J3 U& Y: [private int taskNumber; //该作业包含的任务数目0 h) q1 W4 X+ b
    private String[] taskIDs; //该作业内部包含的所有任务
    * y- z9 \9 H6 S) E. c( ]& l
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {
    ( o3 T* P* ?1 \6 P1 i, C, I0 Fsuper(name);
      v" |: t* ]7 Fthis.jobID = jobID;  \" E% g/ D4 L0 A, M4 _
    this.taskNumber = taskNumber;
    8 Q! M# a* Y6 f3 B5 e3 f. x# FtaskIDs = new String[taskNumber];" m% w* F/ @2 ?8 ]: r" A6 m7 M% F
    for(int i = 0; i < taskNumber; i++) {
    ( l5 O. E) B) T! y' w4 LtaskIDs = new String(jobID + "_task_" + i);
    6 U" R8 j% R. x& c5 a5 m$ v}  s) W. M& [  P9 K' t9 A9 U
    } p6 A' r. S6 h: Z* T" B/ ~) z+ l
    ublic void serviceInit(final Configuration conf) throws Exception {" U( _7 S+ f. c5 A/ i1 u
    dispatcher = new AsyncDispatcher();//定义一个中央异步调度器6 w* G$ X0 }( d% S
    //分别注册JobTask事件调度器" A8 w) r7 S- t4 [. C4 K0 ]7 U
    dispatcher.register(JobEventType.class, new JobEventDispatcher());
    5 c" l  m( L6 G; {& ydispatcher.register(TaskEventType.class, new TaskEventDispatcher());6 a2 z/ V0 k  ~
    addService((Service) dispatcher);! c7 y4 U) f+ y# g
    super.serviceInit(conf);  ]4 U/ u# c+ V
    } p9 K/ l3 F" C) C2 }2 h% U" s* Q
    ublic Dispatcher getDispatcher() {: i# d' q& z4 M  O5 V
    return dispatcher;' S8 @4 B& ]3 I$ z
    } p
    5 @5 D9 N" n" b8 T" c- \rivate class JobEventDispatcher implements EventHandler<JobEvent> {* _* b: ]0 g/ z) c
    @Override" q9 {/ p  {  E7 g, K
    public void handle(JobEvent event) {
    8 k3 F' l" P9 k) B! C) Zif(event.getType() == JobEventType.JOB_KILL) {
    & a( O; \- W& e1 A/ ?; QSystem.out.println("Receive JOB_KILL event, killing all the tasks");* B; s$ M/ |/ l' b( |
    for(int i = 0; i < taskNumber; i++) {
    & g! j; ^1 z9 X. i8 G: Udispatcher.getEventHandler().handle(new TaskEvent(taskIDs,5 L4 h7 Y9 P8 t6 j- e7 F9 d1 p0 O
    TaskEventType.T_KILL));  s; q: ~3 M/ R
    }
    ) Q/ g) F' X7 U9 j0 f} else if(event.getType() == JobEventType.JOB_INIT) {
    ) Y- b4 I' [4 b2 jSystem.out.println("Receive JOB_INIT event, scheduling tasks");
    3 l- J, `, J* x4 W; nfor(int i = 0; i < taskNumber; i++) {; e- Y# j, i6 ?% ?  G! L9 n
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,6 `# d; Y( |* Z4 Q" [
    TaskEventType.T_SCHEDULE));  k* ^" [- L" K$ a
    }& I  l+ z! w  \4 }7 w- e1 o+ M
    }
    9 P( h) f/ T$ S/ q, w4 r& y0 p}
    % S" m: d- ?0 f2 z% S/ Q: z}p
      @, P/ b  P& Q5 A7 Nrivate class TaskEventDispatcher implements EventHandler<TaskEvent> {0 B: `- [. U- Y( P$ [% p" x
    @Override6 [4 K7 |- V+ H
    public void handle(TaskEvent event) {
    1 ]& r0 k, J* z' d1 m( {/ U; k9 fif(event.getType() == TaskEventType.T_KILL) {: c8 y" _6 o& W/ |2 s
    System.out.println("Receive T_KILL event of task " + event.getTaskID());; M* ?7 e' s0 Q3 t6 X! s9 x$ Q
    } else if(event.getType() == TaskEventType.T_SCHEDULE) {
    . r; I# J* s4 i" W( ], JSystem.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());
    ) ?; l$ J# l! w0 A/ w0 o& `}+ B, X. a! K9 b' w7 s. {) C0 J
    }* \9 w8 x" N9 K3 \" T
    }
    + {- P8 O: O  d6 R0 F+ q}
      j) ~5 R. x# Y" ?4) 测试程序。2 \- h% q9 \5 s. l1 t7 @) v( v
    @SuppressWarnings("unchecked")
    * ~8 P8 I- u0 A; O9 ~7 Npublic class SimpleMRAppMasterTest {
    * ~- i# v6 ]( L, s3 b& Cpublic static void main(String[] args) throws Exception {
    1 b. J5 S, z* P" ]% r6 yString jobID = "job_20131215_12";, K: s9 y' s" k8 T
    SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);& `  \/ W/ K4 N& U$ R7 y. e
    YarnConfiguration conf = new YarnConfiguration(new Configuration());
    - g! l  L" G- VappMaster.serviceInit(conf);! J3 q- C2 d6 o; Y" F. O# a
    appMaster.serviceStart();
    3 b/ C( y3 |  x, e& M1 f! WappMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,) F  }8 G- g* P" {# {
    JobEventType.JOB_KILL));
    6 B/ w! J/ g) m! x* \appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,. n4 M1 J$ S" P) t
    JobEventType.JOB_INIT));
    0 h( f" {* ~8 [4 j# A}
    ; B, L- _  m3 q3 Z6 j2 W5 \; O3.4.4 事件驱动带来的变化  [+ |3 P6 K# j0 O* x# n4 U9 p
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的1 P6 f" H0 S6 f# e
    方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、
    % c$ [' x4 F# A字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    ! P( S0 z" E2 {0 |& F7 @! U# m是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一/ t, ]& C! _7 U: ]
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决
    ' T# \0 Y' p8 u$ m! w+ e问题之道, 必须引入新的编程模型。

    ' U: I6 ~' z* i( ?3-16 基于函数调用的工作流程0 W) \/ ]# \5 a: Z- g& e2 e7 l2 f
    基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则
    3 v9 q9 r; i5 {4 x  O5 V是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关/ [4 J, e+ x( ]+ R
    联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需9 e+ ^/ t; a) X, G' Z
    向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理- O, R: P. ]! N. u# ?* k
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A
    : Z. ]" L* H" L
    3-17 基于事件驱动的工作流程
    $ l  X1 c) g$ o- m相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。
    , g$ B4 ~' \& m6 X& E4 ^& Z
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  5 x6 N* F9 v" L% ^

    " R8 d% d+ {/ P9 \( ?' ^6 x7 I  w3 \' Y. P9 U$ S
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-1-22 12:35 , Processed in 0.168125 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表