java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3126|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2093

    主题

    3751

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66775

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解
    , A0 w4 J  t- O2 s7 V) L. ]# e- iHadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。
    - B$ C! ^5 n, O
    1.ipc.RPC类分析- G4 R( I8 x( I+ k4 _6 W
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
    0 V4 M' L( O) z如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一0 S9 e+ W9 _7 |& q
    个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户- X2 H* b% I7 c% R1 |
    设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用+ m* A/ Z  K8 }$ K% _
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。% I0 z4 E, \  k, z
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers. N8 _/ {4 ]' q" l* c$ Y
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
    3 g- F+ A5 ^( Q调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。
    / _$ K' n' J/ G" Q9 @1 `下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
      A  M6 n7 b/ n0 l
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可6 B% `, L, W$ j* p# I
    完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机# ~* e0 @2 S5 H; N* z* T
    程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    ; x& L. o* i$ @3 s" |) a, m打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,: |1 u& p) y2 {6 ^. a5 h1 ^
    函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
    4 O! E' B5 E* [  t7 P+ ^5 n8 n
    3-4 HadoopRPC的主要类关系图( i" ^5 x1 A; x* n$ v4 a1 O1 p
    3-5 HadoopRPC中服务器端动态代理实现类图
    + |* J/ _6 i3 Y; ]& z: p
    2.ipc.Client) R) P0 S+ q/ @. |+ q
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执2 J& d$ _( J4 m
    行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
    - N) n0 S( `" {8 d" [
    public Writable call(Writable param, ConnectionIdremoteId)
    4 C- {0 w# [: U, [9 j' sthrows InterruptedException, IOException;
    ( W% W: Y/ [; v4 d% m, [6 U* H: f1 J; N3-6 Client类图
    % j1 V2 j& \- C& m; D
    Client内部有两个重要的内部类, 分别是CallConnection+ |1 @7 k4 C, g: W$ E+ n
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
    . Y4 _" G1 [% q' o' c- `4 Q$ H错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺' \: \  L! y: B/ S3 O1 d
    序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id0 q8 h7 O0 Q; p
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。
    - K# G- u. q5 X1 P3 P
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    ( \5 b0 D  k" K信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流! ~( s, T# s) Z; ^: N
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    ( o% e; ^$ _$ y2 B* ?+ D
    ❍addCall—将一个Call对象添加到哈希表中;
      t6 V9 Q% `. ?1 {
    ❍sendParam—向服务器端发送RPC请求;* }; {+ N: e: t& k0 H
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;; _1 l# M; m, O) F# t8 M; s
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
    & k: |. P+ c* d( G( r& f当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
    , Z+ r6 b& G7 Z# V; y
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    0 L) ?" Q( `$ b$ H, H( ~) z7 t$ O3 Y
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
    ; P$ z1 n1 B% q7 z4 i+ A
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
    # o+ h! [3 e' j. k6 T4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。1 h  R  f+ P! {3 a
    3-7 Hadoop RPC Client处理流程
    & |! z0 L! m+ m# L8 O( w
    3.ipc.Server类分析% a4 s5 h, R; c! ]6 S
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展: M+ [# q" d4 R# ~# ^4 n( Y5 e
    性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
    ! Y  w) f& E8 p6 z: S8 T目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用6 m0 m5 }2 {* W9 Q
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。. O, K: g# P0 Q5 |; X: ]! w
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性
    3 s$ z; r, b0 [; t能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。! X8 F; `, Z9 `2 l) d6 x1 b
    3-8 Reactor模式工作原理
    8 E1 T: g: k  h3 z& _" g典型的
    Reactor模式中主要包括以下几个角色。
    * O+ v: ^8 s2 i; J+ j! Z. v❑ReactorI/O事件的派发者。# q$ h; w$ u' a/ c# Z' D
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler
    2 s0 Q' E+ n# `
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
    * F7 R$ K: @5 H% I象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适8 {* @& D: ^, r2 _( A4 f) @/ e2 v
    当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断& c6 U/ c- a# w% Z7 a
    的处理。7 c& i! v; O: H# X5 E) @
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
    ! [2 b9 R8 n7 `; U3 `6 Y程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    6 I2 x' b) c- r( k; j2 F( ^应的
    ReaderSender线程处理。7 l' R' o% U! |
    ip
    5 h. Z5 B' t+ F3 yc.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
    ( }# \  g, P) _; t% i" B* m地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。8 X, N6 ^. d& {8 I3 C
    前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为! z8 O1 y- p3 a; E6 y& i9 i, q
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。% L" z, ^6 I7 t/ n6 v6 \
    3-9 Hadoop RPC Server处理流程
    ) z" p0 K2 L2 S" x  Y% v
    1) 接收请求* l8 Z' P6 v1 e
    该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue; _; y; T4 s4 a( @; h8 N6 C: D
    中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。
    3 S0 K1 h, D$ H5 ?9 @  ^整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程6 [' b9 g6 t0 p6 o4 h+ _* |7 X5 c
    池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个; P$ k* I1 g) a/ M! A2 H
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。
    & i1 K1 N% }8 M5 V. UListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。
    - @# A5 p: ~: _; f& {  Z对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    6 P8 c/ p6 L- N/ y. A' w
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call' U; N! V' q# d6 Z9 h% O
    象, 放到共享队列
    callQueue中。
    2 N0 V5 N# r! n( }
    2) 处理请求' c% N4 n; r3 z& ^" I+ ]7 g
    该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线* L5 w! Q7 q% W7 Y" s
    程完成。  {" p8 {& d1 N( ], T7 v3 w
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果
    ! m1 D4 e/ I; ]+ E2 [返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
    - `& ?) E  p% S7 }% s' Q
    Handler将尝试着将后续发送任务交给Responder线程。
    : B2 R% W1 v' W
    3) 返回结果
    ; n2 `7 x  t# w9 X前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结, l* m$ Q# u$ b# |8 S
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。
    % x' b" _* R7 n" T) R  O( e
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
    2 B% U  a% F( r4 p3 ]% E结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送! }. `5 t+ L% I1 b9 T& b
    未发送完成的结果。
    + z& v, [. O& s, k2 C! [; f( W
    3.3.6 Hadoop RPC参数调优5 R# e. }0 o2 Q# o, X4 f* q# [
    Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
    & A. \5 N5 G3 D2 ^3 m! G
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
    $ p0 W; s% p4 M7 x4 M  }- L5 ]% `: V
    Reader线程。
    # P& A0 r. h6 P
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个4 m7 K" Y3 _8 R! I" K) Z1 C
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:$ T8 q2 U2 a0 Q; ?
    100×10=1000! k( M* F9 A& c, I
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的9 q6 u- m" T) g$ n& j' s) o' a# e
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    4 I, c2 D& Z2 ]; T5 R+ \
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。
    / i9 c' N) K4 n8 e: ?" x5 a
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
    ' u+ @. N2 C. g8 R能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试; R, B6 T6 c5 O2 z9 l/ A
    10次( 每两次之间相隔1秒) 。5 G& \4 X; u- Z# s8 D" |' m# b3 k
    3.3.7 YARN RPC实现* e% }$ D0 P. b; @# T
    当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组, f, O+ N) r# C7 B% \" }2 {" q; x3 y
    成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]) H7 B0 j$ w$ d; \8 p/ A7 v
    ) 。 相比于Hadoop RPC, 它们有以下几个特点:( _; f  B  m' K
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用$ ^& e1 B$ ]/ }2 u. z! d" P
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户% h7 n" A' K4 Y8 A. N3 V7 m
    端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。
    1 I( e' M, X( @, i# ^: I) v
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,4 @: A  n  c1 T4 Q( x
    并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应. V( {8 T6 M+ [$ A' S7 b
    用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。% M6 V3 r' v- [8 g3 L$ v) R7 p
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者/ O1 w0 n( O) k. V
    删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。% C7 o  g, W9 J6 @1 G. I
    随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:- h& o, g, G6 s6 k& i+ ]
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言# P* \* C5 ^; ?  F
    读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。' z5 ~( y. {2 ^
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    ' S, S9 ]! l" f果用户企图这样做, 会抛出
    VersionMismatch异常。, E6 f9 O5 T$ k8 |
    为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之- I" p; x7 g0 Z0 t) W
    后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
    & ^8 S1 g9 h+ j( Y6 R) B" ]" K
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
    ! G# r1 f- Z7 b/ ~
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的( d# ^; O, l0 @% t
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现3 s4 _) W! ^. ?# j. r7 v0 b
    中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
    & |7 h  y+ D9 w! x% r" c
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
    % z' }* Q2 d3 `2 x: G9 z7 h; U: W协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
    / T' x( [( |/ F9 Y, M
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider
    - g9 t/ E: H( v) D5 l! v1 W' l成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服3 M  W- V: t4 H* U! {" V
    务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根; O# L1 |7 ^2 N/ Y6 U) o) m! L
    据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    3 d( {* I, z& j5 p3-10 Hadoop RPC 集成多种开源RPC 框架
    + Q, H4 Z, d- L9 `5 s
    3-11 YarnRPC 相关类图
    1 s; D$ i% g) M, y5 }
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但# B& M$ u9 g% `& P
    它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于
    $ U+ S8 y7 V: a
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前' w$ v: _: @) G( X
    "PBClientImpl") 。1 I5 B( }4 Y4 K% i
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
    1 h) E1 U0 X- k1 l(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java
    1 x; f" r+ E3 \' G& S5 g名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现8 N" t7 V' `( e5 l# y5 M$ R
    类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。5 h$ k) n' X6 [+ O, X* R3 z2 d
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以1 T4 [+ \& g* b- X# H# _
    下几个方面:
    4 E) F5 E& `8 D
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允
    : l' z- G0 V' f: {. v- w许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用. W+ ^3 }$ Y. R! u& x3 M3 C
    户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能% `8 `+ `/ Q( ?# \" r7 N& S
    方面有很大提升。0 d, f" Y% x3 O7 L
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,/ w8 C% J+ t5 F
    其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
    5 c  f( H" Q5 o' DBuffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
    # N- o' {* q3 i' I, Z
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。. D* V' A/ X/ G
    3.3.8 YARN RPC应用实例
    / V0 F" M' j+ R7 x为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。3 l/ F, P& l4 G1 L# t  {) ]
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
    , V+ S8 b. e' |0 }( r端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向
    9 R. Z6 O7 B# ?% p% B
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
    , M3 M% H: o! H' [6 d
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server! c) Y+ d) m- R
    public class ResourceTrackerService extends AbstractService implements8 E0 W( F4 o5 z) H  ^+ C
    ResourceTracker {3 g4 s3 P9 f0 U/ \
    private Server server;0 Q' _1 T: I* u
    ...
    - E! q' z8 O, ^3 g; G9 }protected void serviceStart() throws Exception {
    , V5 b7 p* |/ Z/ g4 ?3 ]super.serviceStart();
    1 U6 G+ v/ Z5 L$ WConfiguration conf = getConfig();
    . @& m( T6 g6 }YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC( i% m6 {, E8 H- X1 u. f
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
    / I6 s% |. n" s7 c# q& j# gconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
    5 ]. A% H) C; a/ X% _  kYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    % `, l# x9 ~- f4 C+ V, D2 hthis.server.start();
    4 n& p/ _! B' j}.2 j6 M  H* ]" x6 p; F
    ..1 `+ a$ X( H* E4 e8 p: Y+ c3 f
    @Override  {9 @7 ?+ r8 m6 S& A' g# q
    public RegisterNodeManagerResponse registerNodeManager(
    * R& e# ^7 `3 q' iRegisterNodeManagerRequest request) throws YarnException,( K- A! f3 h1 ^$ x
    IOException {
    ; |6 n; `$ ~7 g- }0 T6 H4 R//具体实现3 D- t' g3 B, |0 n0 z- @1 U: K
    }@
    ) r% y) m, i6 w. XOverride6 x* B4 U+ J& h
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    ' Y/ e- `: F1 l, A. @2 O5 Fthrows YarnException, IOException {; t& s7 M  N* D! n
    //具体实现6 Q1 R+ J$ v) V. M9 H  O& `
    }
    ( L6 [+ f8 \* N$ _+ O}
    1 k3 r  i( v- O( x! W6 H/ @4 u/ dNodeManager(客户端) 中的相关代码如下。
    ! ?0 j- N, N: i5 T3 Q( k
    // 该函数是从YARN源代码中简单修改而来的
    7 z. F  n8 T$ ?1 }
    protected ResourceTracker getRMClient() throws IOException {
    4 D% |% ?  e  B  p1 QConfiguration conf = getConfig();
    0 j, v# y: ^4 Z0 sInetSocketAddress rmAddress = getRMAddress(conf, protocol);
    ! m. Q) f0 J# @* x! sRetryPolicy retryPolicy = createRetryPolicy(conf);
    5 o! l% u- U- T- s; e7 IResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
    & M. P! ~  ^6 {# [LOG.info("Connecting to ResourceManager at " + rmAddress);& F* g# n. [9 R; ]/ ~
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);0 U7 z5 i/ H0 m, W
    }.
    & d4 i+ e: |. H$ P% ~! I; @..
    / o7 X1 B  ]5 ~  w. Y) Wthis.resourceTracker = getRMClient();
    * x0 t. _) q  j. @( n& L9 {, p" Y...; z  ~  T; h# h
    RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);5 V$ a& A* |' U3 Z1 F( s
    ...
    0 n9 K! }& [. g6 z& a( }response = resourceTracker.nodeHeartbeat(request);) {1 K! [" z+ q- N5 q) Y
    为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。5 L# [2 V: |8 d- q0 P0 z" H" C
    步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat
    ( C5 [+ \* a0 X. G9 X
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:, y9 b' s* P; L$ Z6 _5 ]$ d# F
    public interface ResourceTracker {
    ( {* `  \6 W7 P1 [public RegisterNodeManagerResponse registerNodeManager(, n- @% H+ L& n6 |( \+ e
    RegisterNodeManagerRequest request) throws YarnException, IOException;9 Y9 s" e; g7 j( c- V
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)! K- }- V2 A* z' j: H0 W- j
    throws YarnException, IOException;
    $ U9 G: [8 W+ Y5 d2 d}
    % O- o' ~. ?+ Y5 d步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但  m' F/ e- f' E, N; e0 c0 u
    未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
    4 n0 `- @& m- m8 j! F( s
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
    " Y$ W" U3 I1 r7 @
    option java_package = "org.apache.hadoop.yarn.proto";
    # H& F7 c) O6 \# J! Eoption java_outer_classname = "ResourceTracker";3 M$ s' ]  J  x' |% a, w
    option java_generic_services = true;2 x9 M- S; l! p  c9 a' T$ s
    option java_generate_equals_and_hash = true;" U0 d: |  R+ q! l  s( p; B
    import "yarn_server_common_service_protos.proto";5 X9 l4 A# j, G& U/ R& _0 N) h
    service ResourceTrackerService {" ]% Y7 h! I# z3 s; t
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);, R4 t3 ^7 l4 {1 W; i" S- G0 x
    rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    5 T0 u! D7 a! S* j* P" k3 @}
    % R2 R) r/ w% cResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
    # Q  Y( I! c" r& o步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
    ; a+ z; g' `7 ^( b  RBuffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest
    ) x8 p/ p2 r( r* ^2 V+ H2 _# x
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto' S/ l, L. m; {
    件) :
      J+ v' B- X( f% A$ m8 ]- A6 n+ Y
    import "yarn_protos.proto";
    ! C, l" Z" [6 [/ d" ]3 {import "yarn_server_common_protos.proto";
    4 S' w+ c3 u" c9 k9 C: p0 i* m# _message RegisterNodeManagerRequestProto {
      l, B6 n+ D; S  |4 z  eoptional NodeIdProto node_id = 1;* M" u9 l) }+ W' i% z- z9 C
    optional int32 http_port = 3;9 d' M$ ^( e) p$ f0 g) d
    optional ResourceProto resource = 4;
    4 P- v6 A3 l' [" F} m
    # V2 y# F& f  ^* _essage RegisterNodeManagerResponseProto {/ ]* q4 R  k  [$ ~
    optional MasterKeyProto container_token_master_key = 1;& d5 Y$ @  B1 ^) m" Q. A6 O( ]0 s
    optional MasterKeyProto nm_token_master_key = 2;
    1 c5 }# C6 R1 Qoptional NodeActionProto nodeAction = 3;4 B* M; b5 b- p0 Z1 }8 }
    optional int64 rm_identifier = 4;& L" O# w; ~$ F( u
    optional string diagnostics_message = 5;
    / U9 S7 U6 Z& l4 \7 Y}.
    $ F# d5 ^) T. k% y; s, J7 n% n.. //其他几个参数和返回值的定义
    + i8 r1 f! H. J; N9 H
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以8 Z+ N# a4 W1 H" \
    原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol, `: e) W( N  l; R! [( U7 [
    Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数0 c0 w8 g- g! h2 k% P0 G( e5 u
    RegisterNodeManagerRequest为例进行说明。
    7 s# X6 K  i) r6 T% \  a
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :
    , Z- z4 C8 ?- f1 O* q
    public interface RegisterNodeManagerRequest {
    6 S# R: P' x1 ^) ?; a* g* h7 ?. GNodeId getNodeId();
    6 _9 b1 K+ u, Kint getHttpPort();
    0 L$ v: q4 ]  h( l' C2 T  KResource getResource();  z0 W  n2 A. D- {( N1 F  @
    void setNodeId(NodeId nodeId);
    1 v/ K& L& L/ o/ z; Lvoid setHttpPort(int port);/ N! V- l9 K) X
    void setResource(Resource resource);& w5 B1 y6 J" O3 V3 Y) ~" y
    }
    4 J, S( ~) a/ H7 ]+ FJava封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
    1 I' m' A3 v0 \8 H+ H7 F
    public class RegisterNodeManagerRequestPBImpl extends% n9 r- X2 J2 {& I9 f. j: z. U7 I% A
    ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
    * Q4 f. d, Y7 P$ U( o+ D  Q+ w2 HRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();" G" ^+ O. Q& q- A/ @# W
    RegisterNodeManagerRequestProto.Builder builder = null;8 r1 s0 h' w" J. X
    private NodeId nodeId = null;
    0 U2 M/ x4 Q% J% V8 M...
    7 g+ e- V  G# B2 u@Override! d9 ]" _& f7 D1 w+ v: k: ?
    public NodeId getNodeId() {
    " R# f/ S6 p$ s' Q3 KRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
    ! S3 ~/ w! _# M& p8 s! Sif (this.nodeId != null) {, Z+ Q1 [7 F- T! l0 p! X
    return this.nodeId;
    2 M0 z! F7 ?) Z& J* T3 K+ v3 c9 J}i$ T9 G6 F/ F2 n% p* o" O8 s  H
    f (!p.hasNodeId()) {
    4 |; |- c! z1 V7 f0 Lreturn null;. U# a$ h% y$ W8 v* I
    } t( k, K, d( u2 }7 s# ]* d! ]
    his.nodeId = convertFromProtoFormat(p.getNodeId());
    ; V! Q1 A2 ?3 _return this.nodeId;
    ' l2 ?2 f0 B) m0 A8 d" ]} @: O* X0 R3 a5 m6 z
    Override" p. {+ y+ X  E* {- Q
    public void setNodeId(NodeId nodeId) {
    . M% W# u0 ?' u" [" h, |4 N& O' x3 nmaybeInitBuilder();, C) C# k( b6 I8 v" H
    if (nodeId == null)
    & v1 s2 d; G2 X, o& Dbuilder.clearNodeId();4 f& H* h, j+ ]5 z
    this.nodeId = nodeId;
    * N4 H# b7 x- [} .
    * k* Y- n. J7 r$ c1 f8 ~; m) g..
    2 R$ i6 u0 {* ]  Z6 X}
    ; g6 @' Z& `  g步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为
    8 P+ g" v, I+ B4 z% i* Y
    ResourceTrackerPBClientImpl, 实现如下:) f! _9 D1 f% e5 q. n& D1 W
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {/ G& X& k7 @# ]' A
    private ResourceTrackerPB proxy;
    ( u! {4 n- [6 y& O5 N( J! Apublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {% Z8 c! v6 |: ?
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);# P: ]% @7 G) o( V% Y8 \, W- v
    proxy = (ResourceTrackerPB)RPC.getProxy(/ h) f4 Y0 ]% d  h6 P- L& S3 u
    ResourceTrackerPB.class, clientVersion, addr, conf);
    1 G% O* C8 \6 W( o; o; l5 t} @+ K3 t9 t4 \* z+ A% s
    Override2 ?+ O- U3 f/ \/ t, V/ k4 {
    public RegisterNodeManagerResponse registerNodeManager(
    6 N' C7 V7 Y; f" U) ERegisterNodeManagerRequest request) throws YarnException,
    " w4 {; j1 W3 j9 o: xIOException {
    # {4 I) Q( I% H: ?RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();! _8 E% N+ F, Y3 W5 f, u
    try {) f9 v* T; T  F0 n+ _$ v; j
    return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
    / z+ R) j8 z$ d4 _* s# p} catch (ServiceException e) {* q; k4 `. Q+ a2 B; g$ o( X5 j
    RPCUtil.unwrapAndThrowException(e);. b& D  b( Q4 G: n
    return null;
    - C; B  P: N- o3 h% H: @: t# }}( _. |, P9 ]- d- B1 w7 t( D+ o' v# g
    } .
    8 K& ?% ^$ L8 l..' G6 m  [! z6 i  t
    }
    - S6 j) C9 l+ c; s4 g! S1 X) f服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
    / L& t' E/ o% K8 ?
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {1 a8 Q) |; X1 c- K# n
    private ResourceTracker real;
    : I& u( ~$ _/ V+ |4 lpublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {
    3 F2 }; s+ f. T% Cthis.real = impl;' l4 K/ {1 U- I$ ?4 t* I
    } @
    2 }* Z$ T, k. D2 t/ IOverride
    : Z" Y: h# m( y" t% O% ~public RegisterNodeManagerResponseProto registerNodeManager(# P8 s) c7 k3 D9 h
    RpcController controller, RegisterNodeManagerRequestProto proto)
    " T1 f+ h+ M7 x7 ?9 o; athrows ServiceException {: M( X' v3 M5 v; `# M  o
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);' m+ D& j) W& x7 Z* N$ ~
    try {, p& j; X9 a, v1 _5 Y7 b
    RegisterNodeManagerResponse response = real.registerNodeManager(request);
    1 x* b: P; C+ B" r) o& ?return ((RegisterNodeManagerResponsePBImpl)response).getProto();
    1 I6 H3 e7 W% ^' |: z( j* z7 l} catch (YarnException e) {
    1 w& y, U' p# x6 w* F4 `; ]throw new ServiceException(e);- y& O* T# i& q1 S0 Q, S# Z
    } catch (IOException e) {9 f' U" H# I  r: [
    throw new ServiceException(e);
    % S, j2 E3 m4 H4 S* Y; J7 ^* k}+ g7 B& E! J* g" G" q) t
    } .* I# y4 f+ Z* w: E
    ..+ U! |( b6 Q, ^: }# n4 E
    }
      l- x+ u. [1 r/ P+ `. n3 _总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列7 f( j! r/ Q8 n  e  N7 }
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
    . w, ~8 z/ J, ~2 [  E5 L3-12 YARN RPC中的Protocol Buffers封装
    : n) t0 N% ]. S) g: ^
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call& o, \3 A* {5 T+ C. t$ n, _$ u
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。
    0 P& Q4 S( t  @) H+ _
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。3 d4 R* L# D, Z9 _1 q
    [9] 参见网址http://thrift.apache.org/7 C( l5 }0 v/ j/ h
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns' e' @& D/ Y( |' t: W9 b' }5 Z# o
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。) e- _! y' B- r. H
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。
    : U2 n4 h8 G  ?3 \1 G. {. K% `
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347
    : j& q' z/ m, |' l, p0 d
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
    , o5 z3 d. Z+ b+ }使其向前迈进了一步。
      
    8 B  [4 D7 f1 s5 l* O) @, U% j8 ?* q7 N+ z7 o( G8 Q5 T
    1 \7 q5 Z7 m5 h$ p% P6 G
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-30 07:15 , Processed in 0.607175 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表