java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2957|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解3 a4 A3 i0 S( b( S  A. E6 I6 R
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。
    : x: N& C# Q7 }* ?
    1.ipc.RPC类分析
    $ j' f# G9 I2 n# ?
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。6 X$ z. V( q. E0 g) X& C1 w
    如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一# X3 j  ?- }/ S9 |
    个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户( \3 C1 E+ @7 D# n6 M8 ~
    设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用8 t7 X5 X) r0 i4 q- |
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。3 |+ \; s1 E5 P2 `
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers7 t# `6 [5 ]$ W! |, y% B5 {2 G9 x: h5 u% {
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
    , a" g: s8 j5 n4 a9 v( f调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。
    ' J! n  z$ J2 ~: u2 e1 v/ l! V7 L0 i下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
    4 w* n( Q3 m# O- O" k- S0 l  C
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可' C+ V! k9 E6 @, t
    完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
    - Q  M0 r+ D4 _- c- N+ T程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)* |1 Y0 P% p+ I; F$ W+ I
    打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
    ( R& I6 u, B6 C* H6 r6 e9 T/ g函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。4 V( [, B  ]& j/ g1 m
    3-4 HadoopRPC的主要类关系图# z  z2 b1 T3 d" n7 J* x: X, _
    3-5 HadoopRPC中服务器端动态代理实现类图
    4 r/ y+ D; `6 h% P/ p
    2.ipc.Client
    ; [  F. n; n- L3 Y- D0 h  ]
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执8 D* s/ k$ T8 }4 }8 v  ?1 a+ K
    行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
    / |+ n% b5 C+ ^1 \) D
    public Writable call(Writable param, ConnectionIdremoteId)
    ) ^8 m6 ?/ F, B- zthrows InterruptedException, IOException;
    4 l1 X9 R! w6 [8 T, W! C2 I) _! K7 U3-6 Client类图& G* O! P2 V0 j9 T  w7 c# X( o
    Client内部有两个重要的内部类, 分别是CallConnection+ ~7 e" h/ N" t7 r" H& w
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出! X/ |" i& x5 N1 \: d
    错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
    9 G5 M+ s. M3 j/ ]+ P序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id" D5 J0 m1 _5 l4 |7 z4 F0 b% @
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。; O. |: T1 _. ^6 u! X, F" W' s
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本) ~; b  C! B& t3 O
    信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流& d% n1 X2 }* S1 L  h
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    , `0 P! h4 \: m& {6 z4 X( I0 x
    ❍addCall—将一个Call对象添加到哈希表中;- A* X& C6 l1 B
    ❍sendParam—向服务器端发送RPC请求;
    $ z& m) g9 w/ Z5 Q- b
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    $ ?( d' {  D. m; [
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
    : ~0 o% _6 W! N: _8 R. H当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。. y+ B# x' W/ c; W- l2 f% |
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    8 y$ Y) @4 B& A" J. B
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;7 x9 i* S: t' ]5 a& `4 Z" G
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
    , A. G; E; d9 x4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。" `! O# t7 k! }4 v6 |9 I" O+ f4 n: z
    3-7 Hadoop RPC Client处理流程9 z% q+ w; k) V9 y5 F, ?) \0 ~
    3.ipc.Server类分析
    6 M1 m" ?6 C" m& n+ |/ K. W0 j
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展
    & z2 X( P! q$ C9 r  Z/ C3 p5 j性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
    * k! H1 x5 _9 D! b4 p目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用3 ~7 I: l& f) c0 p
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。4 U! ~5 M: N+ p( M6 B+ N
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性
    - B' Z7 A% {# h+ H; s能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。
    - i( `9 s4 s( E6 P$ i
    3-8 Reactor模式工作原理+ t% R+ G- d# o- ^0 |; O
    典型的
    Reactor模式中主要包括以下几个角色。- m+ q- A1 E# s( d
    ❑ReactorI/O事件的派发者。
    ( Q; c9 d$ Z; m! ]" Y7 w) p9 N; u
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler8 F/ i) U9 S0 r; s( z
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽' r. U" G* X# m) J& t9 u) b& E1 _
    象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适/ A3 n6 n. k% j( D! }
    当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断
    ( x" P! O1 S+ \; @5 t的处理。
      ^! \3 i% ]9 h3 m( g* x
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线  a7 w% O% v: |* s6 d9 j! e" m
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对  `/ d4 h6 l+ j2 E
    应的
    ReaderSender线程处理。( I( A$ M3 N$ }% R$ J0 i  m2 I
    ip2 X6 c6 v: L9 `+ B
    c.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易5 M% Y& w6 q& n% _
    地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
    % B+ I. s+ A; A前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为1 y7 j1 S" f- w- y8 z2 c
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
    ! o  w: H5 G+ p$ G
    3-9 Hadoop RPC Server处理流程2 p; `: X# m) \# [( M6 q# T
    1) 接收请求
    # ~' ?2 s) {* m( Z/ i; A该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue
    7 d6 p' M  H/ e中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。
      @, B5 [# k& A, Y  t整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
    - }2 {0 W& W* m! Q: C1 ^池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
    5 J; S& |) H& ?: @
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。8 H. n! `+ b4 b- b* I. a9 M
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。
    " Q) X4 X2 C; ^" {8 Z/ X对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    6 B0 q7 k( w( H! ~5 h  R* S4 k
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call
    ) `  P1 p& D" i' Q9 F9 g象, 放到共享队列
    callQueue中。7 a& u2 i' @8 `" J
    2) 处理请求
    0 c" ~! v: b( w7 w5 V该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
    & n. i$ s' a! R  Q/ t$ s' d程完成。
    4 w5 m& g8 R& V) a
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果0 L* Z& n) v6 u6 S; u/ w" P* @
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时. R7 G1 H" H0 O- ]# }0 c1 j! c+ X
    Handler将尝试着将后续发送任务交给Responder线程。5 Y0 w1 r6 ~8 w' C
    3) 返回结果# B. X% u+ b7 ?3 e* K9 x
    前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结  {- v: ?0 }3 [5 g
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。
    6 j' J* ~7 v6 {3 Z* x1 N* b* ]: W
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
    8 o1 f* U+ B! j8 q, f6 E结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送* ]7 E* L8 [, H5 o# c! E, ~+ x
    未发送完成的结果。1 H" F, D' v; }$ W7 `/ @0 [- Y
    3.3.6 Hadoop RPC参数调优
    , M# r4 E) [4 \" f6 UHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
    - @- U% i2 y) i
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
      x. G7 c- I% Q# V( r$ ^8 C
    Reader线程。1 F$ f. T+ M; h+ a0 w+ V! {
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
    - A0 A6 k  \( N2 v8 k9 G: Y
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:+ p4 b+ b' W- @0 H) U
    100×10=1000
    1 ?0 g8 ~& R* w8 G+ J& S' G* q
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的  G5 f- T; T1 o: A* a
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    : _6 d' A2 X6 D$ Q
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。/ L2 x& @2 X+ i
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可; m( n7 A! @4 W6 H
    能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试3 o! r, f3 ]  P" j$ U+ v, R
    10次( 每两次之间相隔1秒) 。5 }0 i% X4 z$ m. E
    3.3.7 YARN RPC实现/ T1 w. Q0 B( X! x" U
    当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组
    7 o0 Q: m* m2 u/ r) f1 c4 {8 [: g成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]# \  I! U$ ?, f7 M+ d( [# ]. o7 V4 C
    ) 。 相比于Hadoop RPC, 它们有以下几个特点:
    9 Z( ?" w9 o5 s. ~3 W
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用. G# J0 F/ \9 b: n
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
    # Q- N- e7 X' D, v! J端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。
    9 {7 ?7 T2 X. k7 E3 k
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,
    9 T1 t7 G# H0 p' d4 b" v8 ^# c并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应
    9 ]$ @8 x- P1 e用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。
    ) o4 }: O+ V2 X8 o, F; r
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
    - U' X) N! ]& j2 t) i删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。# A" X+ G: w% M3 k7 ^: F9 p
    随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
    ) c2 c0 b% k2 V5 S: }3 @
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言0 }; a# _" w6 v8 e" B; z1 X; Z3 a
    读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
    ; P7 T- D% ?7 |# c0 w+ F7 u. s; ?1 u
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    1 `) D1 }  K' s5 B5 _: G果用户企图这样做, 会抛出
    VersionMismatch异常。
    , _. x' L/ C8 _3 k% }为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之- z9 g  H8 q2 L9 X' S* ^
    后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
    1 F' k& L6 z2 Y; G/ s; Z. x
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
    + w4 R3 B0 @& t  b( |
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的
    ) u! P" c' U: F& @: M% g
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现. G+ u) R4 S, H! T: W
    中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。- w, p9 a- v/ y
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信: M7 O& L/ `! c. ^
    协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
    4 {! h3 h4 M( d9 t! h; a, w
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider
    $ R! R# p: b- X! l9 o成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    4 c0 B. y6 }7 i0 X& ]务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
    6 }: M$ y. [5 I  ]7 v- a据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    . k9 w* S; B5 O4 n5 c3-10 Hadoop RPC 集成多种开源RPC 框架: a% q; c% \: W$ ?  d0 j' r
    3-11 YarnRPC 相关类图
    : h  y$ x1 I$ [7 @- O" A  y
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但& v  J9 y1 v% S# L
    它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于
    3 z( A9 e1 J  _8 t
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
    6 i5 Y, T' Q, M
    "PBClientImpl") 。* @# r+ Z& m& n" ~
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄: {% q  H( b6 [0 ~& X5 x; ?2 p
    (具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java# x' K7 [  h  E; U0 B$ v
    名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
    , Y8 v. i" g$ ~+ G* \: E类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
    7 b2 P) D% I: V/ _9 I# ^
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以9 b0 q9 N0 v) ^( U( t  ]+ J1 |
    下几个方面:
    ; K/ k3 H: `* x  n9 Z& i
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允* {  _5 G  s* l, v9 [3 `
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
    % A3 y6 n/ V& k) g/ R, p户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能
    + i" l# Z: b/ t方面有很大提升。4 v' B- A" R6 U4 S0 H' X
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,
    * ]# M' u; C. v0 _: D其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol( v0 ]1 ^7 s9 ]+ }- M
    Buffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备1 }  M& V; u$ I' f( Z
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。4 q$ R2 |0 q/ k! V. C, U0 z# D
    3.3.8 YARN RPC应用实例: s, K5 R, ~4 X0 _
    为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
    / X0 B0 u# R3 k. W& w* g& L
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
      n) f6 s" y4 y端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向
    - n# I' z' x0 t; `6 S
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
    7 Y$ b: h. W3 P
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server- h* n& L( A6 o2 t
    public class ResourceTrackerService extends AbstractService implements
    % A$ A' P1 I1 {/ WResourceTracker {
    7 m) M+ R$ I, Q9 u  }* c" Pprivate Server server;
    3 x2 G- l* @+ _8 I5 ?...3 R6 m1 r( A4 v
    protected void serviceStart() throws Exception {
    - P; u0 t8 M* gsuper.serviceStart();
    2 k+ O- ?9 d7 i7 r- L- ?0 n7 |Configuration conf = getConfig();
    . U' N7 `( q5 m" RYarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC
    5 `8 D& H. D0 J7 s+ v- p
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,3 i$ R3 Z- F6 q7 c6 O0 {( _
    conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,5 a7 ~( B. a9 d  y
    YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));* E/ `& ]# [, C5 r4 J. \) v7 m/ C
    this.server.start();3 k5 |) C" S% i
    }.# O- T5 H; M% S: F% h
    ..
    - n9 H- W- j% `2 V+ z@Override5 o, n% w. Z7 Q+ _" S5 J/ n5 I
    public RegisterNodeManagerResponse registerNodeManager(' C# s( P% S! B  Z) p
    RegisterNodeManagerRequest request) throws YarnException,
    # y: q, q) B& C$ Z3 z; i' rIOException {, c5 h* _* n1 B# g" X4 V
    //具体实现" i9 ], [+ o* [
    }@
    ) C9 T) I- v9 G; [% Y& NOverride
    1 Y$ O" x. L" ^7 Jpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)$ H' g' i3 Q+ }, W
    throws YarnException, IOException {
    * w5 Q4 A  A' @% g+ P//具体实现' W; v1 L' H1 ^8 y; b+ r5 Z
    }) C0 d8 z6 `% c/ P( t
    }- U# I* S$ F! N- T2 y
    NodeManager(客户端) 中的相关代码如下。, k9 ?2 l; k! T% U
    // 该函数是从YARN源代码中简单修改而来的9 ~/ v* _' w; F
    protected ResourceTracker getRMClient() throws IOException {
      X, E5 q  J2 \2 Q( A2 q* j2 H, PConfiguration conf = getConfig();
    " b& d1 e' g; }* B, XInetSocketAddress rmAddress = getRMAddress(conf, protocol);
    - s9 y7 ^4 g) ~4 V' aRetryPolicy retryPolicy = createRetryPolicy(conf);; j+ _& U# e& }6 |. Z! d. {1 Z% n
    ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);. ~9 T6 d/ X4 m% D
    LOG.info("Connecting to ResourceManager at " + rmAddress);4 |4 E; N" }5 K( d0 d
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
    8 Z! A: K2 p1 T* z! e}.; w' b+ T. S7 c  X2 ]/ F! l
    ..) K: e' j2 b$ V$ i) `
    this.resourceTracker = getRMClient();
    + _2 Z# L2 p0 H- o..." B) S' V4 X0 f! \2 T# ^9 t9 J. e
    RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);4 @0 j' Z3 g( }. x! ?
    ...9 U4 S3 |3 u& f, G
    response = resourceTracker.nodeHeartbeat(request);
    $ j. @. o) C9 ]7 f1 b* \为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
    9 p3 w6 C# J9 L3 @步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat& r4 w( p0 w. |9 m- ~
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:! B- C# ], {) s  r  }
    public interface ResourceTracker {% ~  l2 C+ _( H% j
    public RegisterNodeManagerResponse registerNodeManager(9 L) Y4 [8 H; ~
    RegisterNodeManagerRequest request) throws YarnException, IOException;0 o7 @+ L6 c. o9 |0 Z# ^: _
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    # t+ V  q5 T4 M4 w- p0 j2 [throws YarnException, IOException;5 @( W) p6 L% c* Q1 F% e5 M
    }
    7 n2 P# N6 c1 N& Y步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
    ; Y2 }( I1 z# R& i未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
    8 V5 r  b; g0 r* g' q' ~/ T6 z
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:& i3 G* c2 {( B1 W+ i: h( `
    option java_package = "org.apache.hadoop.yarn.proto";$ ^, {7 r, G; D. G' w: [4 ?
    option java_outer_classname = "ResourceTracker";! k8 f8 N# q: u, F0 }9 m
    option java_generic_services = true;; `/ V* p8 q! o% u
    option java_generate_equals_and_hash = true;1 E* C9 }8 g5 H7 j4 w
    import "yarn_server_common_service_protos.proto";1 I" O  w1 ^' o+ A. o# A: L0 J/ E
    service ResourceTrackerService {
    5 [, R0 F9 @; T8 d' krpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
    " x1 A+ g: K. [' R$ x  [; Yrpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);; g  _! z% ?1 _, H) ~+ ]3 X
    }
    1 o3 h' W$ p# p8 `3 t+ m- yResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
    + c/ ^5 U6 q: Y% E步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol* S. W7 j5 `5 @
    Buffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest* K+ y9 g3 u$ v- ]! `
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto" b+ ^' d1 s* ?- q7 f
    件) :
    ) q2 W3 L$ e6 u% g& l6 V+ _
    import "yarn_protos.proto";. m. |  m' m/ B* _8 I) Z2 ]
    import "yarn_server_common_protos.proto";) o6 s& f9 r! i4 s
    message RegisterNodeManagerRequestProto {, t2 o! f8 Z8 ?8 F0 H- C
    optional NodeIdProto node_id = 1;
    7 J2 G" g# `1 Q/ Q7 P; e& C3 Toptional int32 http_port = 3;  O+ p) `: I) H3 G2 F0 x4 M$ @
    optional ResourceProto resource = 4;
    ; u1 m. H: c4 Q- T/ l7 `0 ]2 V5 p% o} m
    $ d( Q& g/ W; ~essage RegisterNodeManagerResponseProto {6 P7 w; d8 O8 N, j4 S1 u$ N9 j" A
    optional MasterKeyProto container_token_master_key = 1;0 d: [! k  t  c5 ~, l+ s* D
    optional MasterKeyProto nm_token_master_key = 2;
    , S) Z" e" B: w& roptional NodeActionProto nodeAction = 3;
    ( F- z7 P$ S; \% c/ foptional int64 rm_identifier = 4;8 d, I5 i, a; A9 J0 [
    optional string diagnostics_message = 5;
    ! ]: W: e# H) T4 T% N}.' K) C4 v" a; g3 ^
    .. //其他几个参数和返回值的定义
    0 y$ p* b' a% F8 @) f9 K1 H$ g
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
    % u+ K3 u* y0 C4 k# w原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
    ! l- h; r0 r: x5 |. p) GBuffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
    " ]0 q  U7 B8 \& I' s
    RegisterNodeManagerRequest为例进行说明。% n* v1 _# w% c$ c! U2 T
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :) n) S' n7 I6 I% _2 w
    public interface RegisterNodeManagerRequest {5 d$ f6 W/ Y; |, x5 M7 ~# L
    NodeId getNodeId();
    . A+ I( w) x! e6 w3 y1 B4 V6 u  Oint getHttpPort();. ?0 w$ Y/ Z& m! Y5 i+ J8 f% f! z
    Resource getResource();2 _* U, z6 I! R( |# t$ @
    void setNodeId(NodeId nodeId);
    # J. U1 t5 Q, ^# F& P# |; ?void setHttpPort(int port);
    9 }3 m4 Q% [+ K  H9 gvoid setResource(Resource resource);
    4 k+ c- e( O) {8 E  B}
    + b4 {9 d. a; |& Y9 }: oJava封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :8 a1 H  ?  e# j' p) X
    public class RegisterNodeManagerRequestPBImpl extends
    - B; V& O+ L  R, T. x& hProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
    # c1 o, [' d! A) q& TRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();( }' d9 I; K2 b0 L9 |3 X
    RegisterNodeManagerRequestProto.Builder builder = null;- J6 [' {9 O) @$ [6 ?5 q
    private NodeId nodeId = null;
    # K* T8 y) h* S& t- P1 N7 h...$ A: G7 w$ d  E5 \4 I- D9 q. |
    @Override3 V# h! L8 ?: Y  }7 i; F
    public NodeId getNodeId() {
    ; E1 J/ m( Q$ C' \2 \. tRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;. O  }4 B( |! U; U
    if (this.nodeId != null) {  I% C6 }+ `. p1 A! ]
    return this.nodeId;
    3 W4 o+ O; ~9 Z}i/ c' V% ^( t, p2 h+ A) j/ s$ ]: ?
    f (!p.hasNodeId()) {
    4 Y. d2 Y0 t) Freturn null;
      m+ W1 h4 K6 M% y} t
    $ E' d& z6 X, q$ ~: v4 i6 T4 mhis.nodeId = convertFromProtoFormat(p.getNodeId());
    4 l+ e" ~8 j  P4 |2 z. @1 Y) `& vreturn this.nodeId;
    7 |( z+ P: H* p7 q3 \% P& |} @
    . S3 [( D; w3 t( a/ tOverride
    4 A4 t( [* U8 N; w$ Z* Upublic void setNodeId(NodeId nodeId) {
    9 c% }: Q5 w0 h- SmaybeInitBuilder();( g  A. `3 Z; [5 j' I
    if (nodeId == null)
    - q2 N: \: [# B: |% A  cbuilder.clearNodeId();
    ) |) J+ I( N& Q% g: F2 xthis.nodeId = nodeId;  K2 r( ~2 r& T, j
    } .
    2 K/ p( {1 G  n5 h* l5 C5 A8 s..# F& Z4 o5 Y7 [7 u
    }
    & {5 Q1 ^" e4 r0 o  G- i' }5 s步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为3 }- N' R/ j! j
    ResourceTrackerPBClientImpl, 实现如下:5 w# I8 ?4 `( h3 W% z& E
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {3 H* U2 Z- r; q$ u3 Z1 _4 P
    private ResourceTrackerPB proxy;
    , Z+ @- Z# {: X8 ~; F6 K- @public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {% y) X$ ]  @  l; o5 J
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
    / |$ F- S" z  L& g! [* b5 Qproxy = (ResourceTrackerPB)RPC.getProxy(6 r3 M! K5 R+ g' E4 {
    ResourceTrackerPB.class, clientVersion, addr, conf);0 B0 H- |8 Z. @9 D% Y0 ^
    } @
    " H1 ?7 L5 Z4 H8 M7 \Override# T9 h* d$ H+ q' y0 _& c
    public RegisterNodeManagerResponse registerNodeManager(
    % S& M9 w9 m; W2 K9 O8 ]RegisterNodeManagerRequest request) throws YarnException,$ s# A6 e# q- H
    IOException {% l  v' I5 }. @9 O  l: C! A
    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();/ w+ H7 `) D5 Z
    try {9 u% a3 c/ e8 |" }6 K
    return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));- J6 j. t# V1 T0 Y& a' I
    } catch (ServiceException e) {0 S! W+ Q6 s! \+ C1 j4 F
    RPCUtil.unwrapAndThrowException(e);' E9 U7 `& t& e' R0 K7 O
    return null;
    $ x/ K* @; C3 ]1 M5 G1 U+ s  ^}0 u9 q' E3 Z1 G
    } .
    - `% L) h  g5 t..
    8 X! L2 k+ J  F, s6 }( e9 i}3 h& C) t: _& d! \: f+ q
    服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
    , e2 E- Z# X( v4 j! Y; |
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {2 P+ A, T2 p% E+ J
    private ResourceTracker real;0 s2 _( M' w; z8 ?2 ?' p/ R4 U+ s( @
    public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
    ' X4 q  [/ m# M4 Xthis.real = impl;
    ! \! B$ X( A" N& C+ `# y4 L} @
    $ Z5 s" `2 T4 F4 o! ^) h+ C* ~  e* jOverride
    % \2 o* V: N" d7 U1 \public RegisterNodeManagerResponseProto registerNodeManager(
    # y1 q8 V9 z" Z' U$ sRpcController controller, RegisterNodeManagerRequestProto proto)9 v, Z# J( R6 S# ^
    throws ServiceException {, R9 A' c/ O: X% f
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);) v- _5 X: ?, z, R
    try {$ Z5 Z. w$ C5 c
    RegisterNodeManagerResponse response = real.registerNodeManager(request);
    5 ]% g  E+ c$ V) x0 {' V  treturn ((RegisterNodeManagerResponsePBImpl)response).getProto();
    + l3 B4 d8 g$ |& Y$ ?: k: X} catch (YarnException e) {
    4 B8 H7 ~  B& n$ Dthrow new ServiceException(e);2 V# X, h  b" o6 |+ e9 a1 Y
    } catch (IOException e) {+ [7 Y6 \9 Q9 L' k
    throw new ServiceException(e);6 ~  ^  ^' f- K# [) ^) i- Z0 K
    }
    : V' P7 Z/ K5 R5 ^$ f" K2 a} .
    9 y% ]5 B6 b: U0 \7 l& a..
    / v7 L) e: B. K4 s3 N/ C2 r0 {}
    ; ~( ~5 ?0 x# o9 w3 p) B总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列4 [' l- ]$ n) g' j8 M0 d
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
    : M- x) y1 k  f0 Z0 @! A3-12 YARN RPC中的Protocol Buffers封装  e3 [$ V5 R! ?5 h! M7 |
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call
    ! d2 H- k3 f2 R/ R8 Y
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。- A" q9 f8 `* w  P& C% ^% ^
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。, \' G; j! s6 O5 A) ]
    [9] 参见网址http://thrift.apache.org/! G4 f& J- L9 w, ]4 `
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns
    ; }5 t1 T; I1 H7 f% n, [: E
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    " ~( a, u! _( e$ N" S
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。% _) t+ [+ X. U+ `9 y
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347- i0 v: P1 i$ {0 @3 T* q
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
    : l. b+ ?% a! X! L+ _使其向前迈进了一步。
      6 U2 M  S6 x8 K1 r/ {7 [2 S
    ' O0 D( J7 s9 g$ Q! t2 c* B3 G

    7 C& ^5 I; i; `  D+ `: |
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:44 , Processed in 0.142768 second(s), 34 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表