java自学网VIP

Java自学网

用户名  找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 4324|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2093

    主题

    3751

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66775

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:58:22 | 显示全部楼层 |阅读模式
    3.4 服务库与事件库
    5 }/ G5 Q( ^$ s* V9 X& h( H9 h本节介绍服务库和事件库。0 T' P" K& Q7 Q4 I) a( R+ ~: H( v
    3.4.1 服务库/ ?+ n% |. f* \4 g# z
    对于生命周期较长的对象, YARN采用了基于服务的对象管理模型对其进行管理, 该模型主要有以下几个特点。
    % ?! E- q3 y4 h! ^$ T: z7 |: r/ O
    ❑将每个被服务化的对象分为4个状态: NOTINITED( 被创建) 、 INITED( 已初始化) 、 STARTED( 已启动) 、
    : x6 A( f7 h' @) [% Y7 G% ]8 y6 U
    STOPPED( 已停止) 。7 ^1 v+ B: H9 d- w. h( m2 x
    ❑任何服务状态变化都可以触发另外一些动作。* u7 q" G* e8 F6 v
    ❑可通过组合的方式对任意服务进行组合, 以便进行统一管理。
    . f3 a- ]7 m# v; p9 J
    YARN中关于服务模型的类图( 位于包org.apache.hadoop.service中) 如图3-13所示。 在这个图中, 我们可以看到, 所有的服务
    , d# P9 _5 c: z; @% U" h4 A对象最终均实现了接口
    Service, 它定义了最基本的服务初始化、 启动、 停止等操作, 而AbstractService类提供了一个最基本的
    " H) t9 C" B' u
    Service实现。 YARN中所有对象, 如果是非组合服务, 直接继承AbstractService类即可, 否则需继承CompositeService。 比如, 对于% l1 C+ N( w1 I
    ResourceManager而言, 它是一个组合服务, 它组合了各种服务对象, 包括ClientRMServiceApplicationMasterLauncher1 s, R. Q; x  |! K7 n0 X- k0 E  B8 T
    ApplicationMasterService等。0 u! Y% k9 W# V+ C2 P' G, ]$ G
    3-13 YARN中服务模型的类图
    ( S8 T2 h5 t7 d4 |# ]
    YARN中, ResourceManagerNodeManager属于组合服务, 它们内部包含多个单一服务和组合服务, 以实现对内部多种服+ H* c( o2 `" A9 S
    务的统一管理。
    2 m6 r/ y! X. b: A' O/ M
    3.4.2 事件库
    ! S. t  k$ M$ I, I/ sYARN采用了基于事件驱动的并发模型, 该模型能够大大增强并发性, 从而提高系统整体性能。 为了构建该模型, YARN
    7 F2 o  Q( R$ c  D( _1 o各种处理逻辑抽象成事件和对应事件调度器, 并将每类事件的处理过程分割成多个步骤, 用有限状态机表示。
    YARN中的事件处3 m) l7 N/ Y1 O' Q8 X4 ^
    理模型可概括为图
    3-14所示。; a" r" z, s7 I2 a! J1 \0 ~! g
    3-14 YARN的事件处理模型
    : N) {  q! z9 {, h( P5 [7 ?整个处理过程大致为: 处理请求会作为事件进入系统, 由中央异步调度器(
    Async-Dispatcher) 负责传递给相应事件调度器- y2 |: W; I9 [' ^4 l7 x
    Event Handler) 。 该事件调度器可能将该事件转发给另外一个事件调度器, 也可能交给一个带有有限状态机的事件处理器, 其( @5 U0 v) ]% U" A
    处理结果也以事件的形式输出给中央异步调度器。 而新的事件会再次被中央异步调度器转发给下一个事件调度器, 直至处理完成
    ( o9 K9 j5 s. E" {3 F5 c( 达到终止条件) 。
    ' M% Q; ]0 F' r! G+ G
    YARN中, 所有核心服务实际上都是一个中央异步调度器, 包括ResourceManagerNodeManager. q& \. U2 T8 l& m  P
    MRAppMasterMapReduce应用程序的ApplicationMaster) 等, 它们维护了事先注册的事件与事件处理器, 并根据接收的事件类型9 O" f3 ]5 s! O; g( [; _
    驱动服务的运行。# Q: B0 `4 u( w
    YARN中事件与事件处理器类的关系( 位于包org.apache.hadoop.yarn.event中) 如图3-15所示。 当使用YARN事件库时, 通常先# ]& R) M. v( Z( U' S  n. N
    要定义一个中央异步调度器
    AsyncDispatcher, 负责事件的处理与转发, 然后根据实际业务需求定义一系列事件Event与事件处理器, F- y; w# y7 _) s# m, ]
    EventHandler, 并注册到中央异步调度器中以实现事件统一管理和调度。 以MRAppMaster为例, 它内部包含一个中央异步调度器
    & U, R0 M2 L/ b0 x2 `" p2 b5 z
    AsyncDispatcher, 并注册了TaskAttemptEvent/TaskAttemptImplTaskEvent/TaskImplJobEvent/JobImpl等一系列事件/事件处理器, 由
    6 ^5 e; C( a  g/ f; v中央异步调度器统一管理和调度。
    # M0 r: |9 g0 I- j2 I' y: X服务化和事件驱动软件设计思想的引入, 使得
    YARN具有低耦合、 高内聚的特点, 各个模块只需完成各自功能, 而模块之间
    7 \/ F3 I& {9 G1 g) S则采用事件联系起来, 系统设计简单且维护方便。

    ' w1 m. u! t* {! p- A/ h3-15 事件与事件处理器
    ) F" [" O2 g  x" @7 u. h  {$ u
    3.4.3 YARN服务库和事件库的使用方法1 E7 _. ~) \  Z# r" R# I
    为了说明YARN服务库和事件库的使用方法, 本小节介绍一个简单的实例, 该实例可看做MapReduce6 E- @5 ~* c% Z% Y7 r
    ApplicationMaster
    MRAppMaster) 的简化版。 该例子涉及任务和作业两种对象的事件以及一个中央异步调度器。 步骤如下。* y( N( M$ b! Q( W: ^
    1) 定义Task事件。, o% h$ ?# \8 y( T
    public class TaskEvent extends AbstractEvent<TaskEventType> {5 x' d# Z. C1 J% W
    private String taskID; //Task ID
    1 w) w6 i. `1 u0 ]public TaskEvent(String taskID, TaskEventType type) {
    6 w4 c' Z3 B' x$ X( ?1 Y  [; ]1 Osuper(type);
    * g& N% h6 _7 p, @. Ithis.taskID = taskID;
    7 E' A& B# d& G% K: d0 O9 l% F3 `} p
    6 K5 }& q* c# y/ h# wublic String getTaskID() {! d# ]3 n2 c% D% ^
    return taskID;
    % u5 a% i! Y  }9 H& R5 D2 Q1 G) q} ) J3 I/ ~, N$ t+ ]0 m# ?( L& a
    中,
    Task事件类型定义如下:& \) w8 [. ~. o
    public enum TaskEventType {
    ; f1 u$ {# \; {T_KILL,
    & i  r/ `7 F8 H( sT_SCHEDULE
    , W9 ?/ ]) N+ s: d2 Y& I}& Y1 t% A3 h' V4 v' `1 u4 v9 V
    2) 定义Job事件。) v2 t9 @& g  I/ T  m
    public class JobEvent extends AbstractEvent<JobEventType> {
    " y9 W: T, j( d* n0 C5 w2 jprivate String jobID;
    # S4 o9 Z( j  G/ @8 L) xpublic JobEvent(String jobID, JobEventType type) {1 P) \! U1 n8 H
    super(type);
    : _( W4 @  G2 d  X- hthis.jobID = jobID;; P( b% q0 ]* [# L8 a8 E
    } p
    ) L: ~% L! X' h' w% L! Sublic String getJobId() {
    - j# {9 L2 }; }return jobID;
    5 O0 i3 ]4 T" f( V}  l; o) H* a9 s
    }$ z+ t+ t+ G2 |+ Z" D% B" J7 N  c
    其中, Job事件类型定义如下:
      O1 K" a# q3 }5 G) m4 g3 l( ~# G
    public enum JobEventType {
    , A( y8 Z! J& I, B1 B3 EJOB_KILL,
    . Y: ^+ [! f0 k* @! y0 mJOB_INIT,
    ! S. H; u0 G( {2 H2 M5 ZJOB_START6 J2 ^: l2 {, I) j' z
    }, [7 ]; Q% i9 Y" I
    3) 事件调度器。8 @( ]) _- w1 E# S
    接下来定义一个中央异步调度器, 它接收
    JobTask两种类型事件, 并交给对应的事件处理器处理, 代码如下:
    , d7 Y8 ~/ _( X@SuppressWarnings("unchecked")
    " D8 V" c. _9 ]public class SimpleMRAppMaster extends CompositeService {
    " V; s: K* ^  R* F7 b5 `private Dispatcher dispatcher; //中央异步调度器
    / E# f- k& K: a7 i! j
    private String jobID;5 O# `7 u, q2 Q
    private int taskNumber; //该作业包含的任务数目9 F0 ^% x- Y6 o
    private String[] taskIDs; //该作业内部包含的所有任务/ k9 ~  o( V3 h5 i
    public SimpleMRAppMaster(String name, String jobID, int taskNumber) {/ F+ K( X5 m. _/ _+ r) q
    super(name);8 R( U2 a3 o# ^7 d
    this.jobID = jobID;6 {* R1 B. S& N* L. k% A- `. t; P
    this.taskNumber = taskNumber;) W8 J) \% r7 u6 u
    taskIDs = new String[taskNumber];
    + z1 I1 `- n4 U" t* K) V2 F% ufor(int i = 0; i < taskNumber; i++) {
    , ]1 f# P% G; v* n* x9 ^taskIDs = new String(jobID + "_task_" + i);& ~: v+ S- V' l8 W" O
    }% o; f# x# u9 [7 U5 Q7 k2 e) D
    } p
    0 X0 [" }8 j' `- z; wublic void serviceInit(final Configuration conf) throws Exception {8 B6 `, I# Q6 g; ~
    dispatcher = new AsyncDispatcher();//定义一个中央异步调度器
    5 w/ Z+ }/ |2 Q5 P6 t! s+ m
    //分别注册JobTask事件调度器) W: C" w  h& ^  \$ u
    dispatcher.register(JobEventType.class, new JobEventDispatcher());9 }- o+ }, \3 ^# [( ?2 O
    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());+ T0 i# y1 s8 W( A: a! r' }
    addService((Service) dispatcher);" o. S8 D+ G+ I5 _) k
    super.serviceInit(conf);
    5 |6 u0 N9 j# s& I' v/ d} p, Q) Z' u; A6 y! {8 O
    ublic Dispatcher getDispatcher() {
    ! r! ?4 p9 ^3 }" \return dispatcher;
    6 ^4 j" ^$ i$ j# f/ C} p
    : V% _) z& M  C4 E* f2 wrivate class JobEventDispatcher implements EventHandler<JobEvent> {
    8 A8 r+ _7 w2 M, n- W) X6 }5 n9 ]@Override
    2 [& O' ~/ X$ h- a. @5 s7 lpublic void handle(JobEvent event) {+ r' S2 d6 Z+ V2 ]5 b: D
    if(event.getType() == JobEventType.JOB_KILL) {
    ; V7 O0 k: m& c% x& ASystem.out.println("Receive JOB_KILL event, killing all the tasks");# W: \7 ]; t( h, v
    for(int i = 0; i < taskNumber; i++) {' V$ I2 }2 w. V; U& P8 R: N0 G
    dispatcher.getEventHandler().handle(new TaskEvent(taskIDs,( V: a5 Q1 ~  g+ L8 W
    TaskEventType.T_KILL));
    4 ?1 l: ]$ k7 \6 w- J9 a}
    1 O! O: K% x" v" J! V} else if(event.getType() == JobEventType.JOB_INIT) {" P9 s5 j. T' i; J& d2 I% _
    System.out.println("Receive JOB_INIT event, scheduling tasks");: S6 @! Y. J, O; h5 L
    for(int i = 0; i < taskNumber; i++) {
    + c" a% ?" @& q& e& a; odispatcher.getEventHandler().handle(new TaskEvent(taskIDs,  ?$ Q2 T, }" H/ g( v/ U
    TaskEventType.T_SCHEDULE));5 }1 B+ U! h5 `7 \9 R- `& l' r
    }' t2 a# Y: F/ R
    }' [" P/ f% T+ ]5 b
    }
    4 L  k3 t2 P( c" M}p
      f7 [  b+ C4 r2 s& N+ F- K  z: crivate class TaskEventDispatcher implements EventHandler<TaskEvent> {
      L9 T: [( P1 F3 x: B& d@Override8 r" n. s, ]5 q
    public void handle(TaskEvent event) {
    $ Q% q# [: a# S* Wif(event.getType() == TaskEventType.T_KILL) {
      {1 U/ _& w' d6 y; I0 K) Z: H5 A2 OSystem.out.println("Receive T_KILL event of task " + event.getTaskID());
    - h! W" j. u4 t4 y  P" P} else if(event.getType() == TaskEventType.T_SCHEDULE) {8 r$ Y& M% s4 ^( O1 k. |
    System.out.println("Receive T_SCHEDULE event of task " + event.getTaskID());0 ~, z# V# C! g# L
    }
    0 K6 b1 W8 \4 V& s7 s! y3 T- T& g}
    3 q4 s4 s) z" j" m) ]) ]}% i$ e) w. G$ a' [
    }5 P* L8 D+ A+ j8 S: {! {5 R
    4) 测试程序。
    " q( q; l2 ]! {  l2 o4 z
    @SuppressWarnings("unchecked")
    ( [" _' b* [) }2 H) zpublic class SimpleMRAppMasterTest {
    . l/ `: G$ P. h" X: w" Xpublic static void main(String[] args) throws Exception {% p; ^: y8 h  W6 M
    String jobID = "job_20131215_12";7 o$ g3 R  ]: H" n' S1 A) n
    SimpleMRAppMaster appMaster = new SimpleMRAppMaster("Simple MRAppMaster", jobID, 5);  M: b, f. V( s- u: g  x
    YarnConfiguration conf = new YarnConfiguration(new Configuration());' u8 V9 t% R6 X0 {3 I
    appMaster.serviceInit(conf);0 @7 G7 u+ v" ?/ Q
    appMaster.serviceStart();! H( S# E* H4 i
    appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    8 c7 ~6 v; s' N4 CJobEventType.JOB_KILL));
    / k+ d/ G  k6 {1 n* ^appMaster.getDispatcher().getEventHandler().handle(new JobEvent(jobID,
    - ?7 T5 \0 a0 e; h  m# uJobEventType.JOB_INIT));
    1 `' w7 n  x8 R: z: m$ Q}: ?& t+ P* O! W3 x6 i. _
    3.4.4 事件驱动带来的变化  v. W5 z& ^0 d- s. p* y7 `$ h
    MRv1中, 对象之间的作用关系是基于函数调用实现的, 当一个对象向另外一个对象传递信息时, 会直接采用函数调用的
    0 K9 c# O$ Q0 A9 C, a$ }5 V' B方式, 且整个过程是串行的。 比如, 当
    TaskTracker需要执行一个Task时, 将首先下载Task依赖的文件( JAR包、 二进制文件等、6 y/ O7 k4 ~9 _8 C
    字典文件等) 、 然后执行
    Task。 同时在整个过程中会记录一些关键日志, 该过程可用图3-16描述。 在整个过程中, 下载依赖文件
    3 W/ t7 w( B; J是阻塞式的, 也就是说, 前一个任务未完成文件下载之前, 后一个新任务将一直处于等待状态, 只有在下载完成后, 才会启动一5 O, r! o  K. G1 n7 [
    个独立进程运行该任务。 尽管后来
    MRv1通过启动过独立线程下载文件 解决了该问题 [15] , 但这种方式不是在大系统中彻底解决+ S* J) ]; |) I$ m
    问题之道, 必须引入新的编程模型。

    2 \0 g$ E8 I6 T  r6 f3-16 基于函数调用的工作流程
    + [- @0 O- E5 I/ w  N3 U基于函数调用的编程模型是低效的, 它隐含着整个过程是串行、 同步进行的。 相比之下,
    MRv2引入的事件驱动编程模型则
    : |6 B' Z% L9 n+ `7 {& N, O是一种更加高效的方式。 在基于事件驱动的编程模型中, 所有对象被抽象成了事件处理器, 而事件处理器之间通过事件相互关
    % m$ r$ U% `2 M. y. p8 s7 `' [联。 每种事件处理器处理一种类型的事件, 同时根据需要触发另外一种事件, 该过程如图
    3-17所示, 当A需要下载文件时, 只需
    3 s! F& G& b# |0 e6 B向中央异步处理器发送一个事件即可( 之后可以继续完成后面的功能而无须等待下载完成) , 该事件会被传递给对应的事件处理+ e3 ]* q4 _  p! x( U1 e
    B, 由B完成具体的下载任务。 一旦B完成下载任务, 便可以通过事件通知A) m% Z0 t" @- g2 `7 z4 h7 U- G
    3-17 基于事件驱动的工作流程
    8 K: j3 S' ]' Z! T0 U相比于基于函数调用的编程模型, 这种编程方式具有异步、 并发等特点, 更加高效, 因此更适合大型分布式系统。9 A; s8 c3 o8 F' W
    [15] 参见网址https://issues.apache.org/jira/browse/MAPREDUCE-2705  
    9 P' J3 |! ~  T* ~: r- S6 k7 H
    + x6 q% p; L( x  ?' D  n* m/ E/ s& p* x) E, H8 O$ V3 p
    回复

    举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-30 07:56 , Processed in 0.210007 second(s), 33 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表