java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2986|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66375

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解/ U) J! {0 M# [' G3 A( F, \
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。2 B% }# p; ]# y" a) Y" n
    1.ipc.RPC类分析; ~4 F) |% ~% w3 }
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。4 a9 ^$ q: b/ h' N7 J( f
    如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一2 f: J) _( c/ Q( h( _/ u
    个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户1 I( W$ P- J! Z( K  s4 m# Q0 y
    设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用9 m; k. p& K# O0 M
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
    3 e& [" A: U. }9 f. W5 W
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
    , h- W8 B. v  B3 r5 N: a) D" I
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
      L4 `9 \7 P$ D$ @$ r; p9 i调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。1 `8 ~/ A5 b) q" z+ x8 d6 K* X
    下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用6 q/ H$ d6 ]( N: K4 _! L
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可( I: B1 e! F# G* N+ Q" ^
    完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机( B1 o. {$ \) q5 \" S" S; X
    程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    0 Z5 `1 V9 O' e6 F- x" b: }, z打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
    " Y2 }: L3 a3 ^- _函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
    ' ~* y9 x4 y8 I9 @
    3-4 HadoopRPC的主要类关系图
    7 E' [! N# x" r# {2 G3 r5 Q3-5 HadoopRPC中服务器端动态代理实现类图, X2 d5 m( o0 @$ I
    2.ipc.Client3 S: `! F/ k0 H1 D9 Q
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
    $ E8 g+ Q2 z0 x/ w" C9 L3 V+ }行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
    - q; d. I7 S1 {$ l* u2 F
    public Writable call(Writable param, ConnectionIdremoteId)7 _, |2 T! E) r, `  d: {
    throws InterruptedException, IOException;
    / D& ], N. o# w4 ?3-6 Client类图
    2 g+ Z7 Y" f4 \' p
    Client内部有两个重要的内部类, 分别是CallConnection, O: @8 f: O/ |, X' J1 j
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
    * p4 g% }: q) v9 K错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
    / ~: Q+ [& V, E. h& J' C序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id, \5 R& O4 ?0 M# w% I% J9 \( T
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。. h7 w, G; n* |/ }8 p
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    ) ~; P1 ]# N- l5 z& L1 r- O  s信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流, e4 i8 K1 Y$ p$ K. F# V* t
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    8 G6 Z# z* N+ ]$ A& p) ~! m4 }' a
    ❍addCall—将一个Call对象添加到哈希表中;$ j" m! f, c' J8 u# n. R* C4 w
    ❍sendParam—向服务器端发送RPC请求;
    , ~* s* U8 S; h3 P2 Z5 b  ?4 d
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    . H$ N& W8 h8 S$ L" L% b
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。5 b& u# P* p& V5 q$ @/ b
    当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
    * k! J' ^- @* w) _) B+ n) T
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    : U) \! q* m& u  m
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;. H4 s8 m7 A8 Y' R& d( \+ [1 M* `
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
    ; G3 y3 t) w3 W/ Z  D- W/ D! O; [2 a7 @4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。: N. R9 Q# Y+ V
    3-7 Hadoop RPC Client处理流程6 G+ E$ y$ b3 U+ G2 z
    3.ipc.Server类分析8 B+ j( m3 O6 A" x5 J0 r- o
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展
    & V7 U9 f3 I6 [! s- K性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计# w2 ^9 M, x5 f- q
    目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用6 R6 c4 E$ C2 H
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。5 H; t/ A9 I9 u& X4 u
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性% p3 r0 V- U% q/ Y) g$ V
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。$ n( F; @4 H1 V* Z; |9 g$ ^. D
    3-8 Reactor模式工作原理
    - s4 S' _9 s) P/ O0 a7 L典型的
    Reactor模式中主要包括以下几个角色。
    $ p" j1 }3 R( X( B; r❑ReactorI/O事件的派发者。
    % I. ?$ Q$ T5 @1 V
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler0 ]0 s2 Y% l  u( L' L$ x6 U
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
    / g6 i7 L- G+ V象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适% I/ r1 ?- p6 d3 C  n; Y( }
    当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断5 ~/ }2 b# m& I5 u2 \: o8 m  T
    的处理。( H0 Z5 P* e- ?
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
    2 o9 n5 S9 w" ]! q( L* j程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    9 i6 L/ l' G7 W! g9 V; a, r1 [$ V1 S应的
    ReaderSender线程处理。3 d7 d( L$ l+ }1 G% K% M- E
    ip
    9 h4 Q) x/ m  @. v) O% {" Tc.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
    % L1 L) `- F- O0 }7 ?地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
    ' q' n3 ]. ]* a* {) U% l# u' F前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为! d4 d8 e# l& q* u, S
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
    , p4 ?9 ?' L& @3 Z$ c
    3-9 Hadoop RPC Server处理流程9 n8 c# P( A; n" U; v8 [
    1) 接收请求
    ( u9 W0 g2 b+ {7 t该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue- E* N5 }* p' N" ~. \
    中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。
    & K" g) G' j4 Q整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
    9 m3 @2 U1 G* z4 K+ ^  O池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
    ( d* {  E- q( b" t, R
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。: T6 y: l/ b+ T+ \- `0 O
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。) W! O! @6 r( v  K
    对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    - t) b  I: h1 ]/ @! j
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call
    6 M8 j- T# W* |9 {) v象, 放到共享队列
    callQueue中。$ T  W( E9 }2 D8 L1 G/ `) ^/ D+ D2 X
    2) 处理请求( k( E5 x2 _' S3 H" N- B' D
    该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线- _  a8 B* \3 c2 \9 o4 j4 i
    程完成。
    ; V  Z5 S2 h9 o' Q6 E' p( ]
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果+ H+ u, M2 M  c# h
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时9 @! L( K0 Y: v9 t9 S& e
    Handler将尝试着将后续发送任务交给Responder线程。
    $ W+ Q7 V' |$ X4 l
    3) 返回结果' t+ Q: i  o0 }" m
    前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结
    7 E+ N1 ]9 I* q( F# P9 K" i8 F; O7 ~果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。6 d; V0 m( ~4 k* k! z( B3 D
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将8 C1 J3 b/ Y2 N! o/ X
    结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送
    ; x: W) N4 n6 h2 [$ e4 G3 c2 `未发送完成的结果。3 t8 u8 N; p* X0 w0 H" B! P
    3.3.6 Hadoop RPC参数调优
    % S! c. M0 }+ ?; A- bHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。* R% d: h* ~' R2 a
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
    * q" U7 u- v, p" Q9 l
    Reader线程。
    , {" B6 O! `& {
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
      Y! R" V, r# i0 s
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
    & o* u$ a5 e+ \
    100×10=1000" A+ \; [5 a6 e' M, C
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的
    0 H6 x0 P# ?0 U. y7 I, a& e% J0 u! D, u
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为5 o4 L( [* X* u' }+ v% A  B
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。- E8 T  C. O$ G
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
    2 _* d4 w) r" h: H$ d/ y) b能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试% B- J! U% U( D8 ^
    10次( 每两次之间相隔1秒) 。9 n# @0 ?- w3 |; ?9 H
    3.3.7 YARN RPC实现
    7 H" G* E% v: h8 c9 u3 h% n* h当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组
    , O; w# l4 P8 @/ }9 M3 }2 O1 w成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
    9 y2 Z$ C0 N0 g- L6 M- S) 。 相比于Hadoop RPC, 它们有以下几个特点:
    6 l- G2 m6 F2 @2 E0 ~6 _9 ~  V' m
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用2 F1 p6 n& Q+ p% Q0 p& V
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
    3 G0 G! E: G2 J端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。( x3 G: w: s' k/ B& k( X7 x
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,0 D7 m6 f- d0 _+ B) }* p: M4 u
    并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应9 j% \7 y& M0 @
    用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。
    + x4 @# C# P/ _* x
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
    3 [% O, D" z: v$ G) P删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
    ( d: ^$ C1 Z* L2 Z5 Q( D随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:; G& f' K, R( Z) U8 e! L
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
    5 w9 e0 F, ^2 Q; f读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
    ! W# O) ]4 `* Z0 F0 V: |1 _
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    8 r# G$ d1 S4 C0 i果用户企图这样做, 会抛出
    VersionMismatch异常。
    / p6 }. ?& S0 H8 ]; p; f6 j为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之3 G- k* o0 n( N2 ]9 k4 f
    后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
    ' r1 m1 G: d& l* V" j8 @: ?' @: K- q0 x
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
    + D# u+ \  h. c
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的; K/ n- K, ^8 b
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现, I4 W  V7 M0 G/ N
    中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
    + H! o! i3 H: m- j$ p: @
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
      ~  U! m3 j0 P' M2 y协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是. W$ v5 I2 e4 h" D
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider
    4 U) Q4 z: I+ S7 b9 \成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    : M+ _8 {! x, _' x务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根1 R. n% a' x  p! D4 q  r
    据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    2 {% p& Q9 L6 q$ t1 K3-10 Hadoop RPC 集成多种开源RPC 框架
    : b  S  m5 J  U$ B5 g/ W7 R4 q' P
    3-11 YarnRPC 相关类图5 \  ]; s; F8 v8 w$ j7 @+ }3 l
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但" }0 y& Y0 D7 h1 L
    它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于" G) e/ S/ {' N% \( m
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
    - R9 H4 l1 q5 q) S1 }
    "PBClientImpl") 。
    8 u, j( Y' f% }. f7 B) Y
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄3 |  e  r1 d$ I1 x7 X/ _' o! }* Q
    (具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java
    9 E+ u& P7 Q& c! ?& w$ H名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现! Q3 |; ?! g* u) t$ ~2 v- G
    类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。: r" M8 o% V, I
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以6 }9 b, ~, |- K- {
    下几个方面:
    , l, a- Z5 b4 {9 r
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允1 [% S4 Q. W( A; Z
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
    4 R7 v' R' s* Y4 w- S户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能8 t9 p  i5 X* \6 ^
    方面有很大提升。: j+ L! L4 R% T, d5 h9 V1 }6 a) a
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色," I& j) Y. B. I2 u$ d0 T. T% p- k1 @
    其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol" l- u( T; d2 r& _: Y
    Buffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备3 j) c3 _6 t: w4 @& |& d
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
    5 L9 O: G* F/ Z
    3.3.8 YARN RPC应用实例
    7 {0 b' \0 Z+ @2 h% j/ `为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
    , J( b& e# E. T) x& U& y7 x" D
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户9 y' U" a! T9 v* I8 a, g& K
    端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向
    # |. T, s. l. s) W% F
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:: L- g  N; w$ O) {7 P
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server' n5 Y, r+ B" m, u! _$ F7 l2 k1 U
    public class ResourceTrackerService extends AbstractService implements
    % l( R0 P0 ]8 VResourceTracker {0 ?) Y5 A! D0 A6 m$ z3 }
    private Server server;/ J# a5 h4 c8 [
    ...
    " v- {* ], P- u' c5 _* Nprotected void serviceStart() throws Exception {
    1 Y. N  V2 m9 A/ \8 A8 nsuper.serviceStart();
    : F7 o2 A: n. q- @, _: ^' iConfiguration conf = getConfig();, H& f8 N6 V; k/ Z) x0 T
    YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC- u/ g4 X; ^/ Q5 T3 ]
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,  b- \4 d) r$ S" Q6 X8 D
    conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,: k1 L: D, Q- q) o5 ]% e% V
    YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    3 e2 C8 u/ H' C+ z9 O% T2 X' pthis.server.start();& y* e9 D$ @6 X" }8 F, F2 j: p( m1 t. q
    }.
    + o: L5 O+ H4 e; Y; K" p: d..+ |$ [- o& f: O/ R3 k/ j/ C4 t
    @Override( T0 Y% R& ?1 g
    public RegisterNodeManagerResponse registerNodeManager(
    ' e* `7 Y7 ?! _& \RegisterNodeManagerRequest request) throws YarnException,
    7 r6 q; }! Y+ k& wIOException {3 L- t' _+ Z0 [
    //具体实现
    ) M0 S  t- E6 J7 Q8 t( _8 C+ B
    }@' w6 X/ ]4 C7 K( w
    Override3 g. m5 W3 b. J/ H6 n% z0 R5 y0 L
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)1 G2 R0 w" c9 l6 Y4 v
    throws YarnException, IOException {
    " v4 B7 w3 F8 J$ ^8 e$ \& p. C//具体实现# A4 g2 G6 F$ @" y5 `
    }
    ' q. g2 f' E: T5 L: j: Z0 Q* D}5 z( m7 Z, Z: g$ B1 A/ r
    NodeManager(客户端) 中的相关代码如下。3 d1 H/ R4 {! [. b' P
    // 该函数是从YARN源代码中简单修改而来的
    4 Z* `8 N; M+ z1 k3 o8 g. o% U1 A
    protected ResourceTracker getRMClient() throws IOException {+ z% Z3 E" ~! R$ w/ v
    Configuration conf = getConfig();
    ' R2 ]1 K4 o' G& h, ~InetSocketAddress rmAddress = getRMAddress(conf, protocol);
    / z. y6 N7 m4 `% f6 L/ v% NRetryPolicy retryPolicy = createRetryPolicy(conf);* S& L6 I, A8 t% q
    ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);& g7 b/ V& o/ W( q' Z
    LOG.info("Connecting to ResourceManager at " + rmAddress);
    3 i, V6 H& l: V* i, \8 mreturn (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);6 Q& y4 q5 y- L5 V8 j4 `" W( _1 h5 w
    }., k0 O7 V3 s5 Y
    ..
    ( P+ Z4 A* ?2 G6 x9 e0 h) dthis.resourceTracker = getRMClient();9 K0 t  s! A2 ?" t3 Z$ {* }0 u0 {
    ...
    5 w- ]8 o9 M* q4 n- fRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
    " P& v1 _! l' v8 [...
    ( \; a2 w* A1 D$ {6 h5 O* Kresponse = resourceTracker.nodeHeartbeat(request);
    1 G, \- r/ P( j) d5 f为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。" u+ f, F/ I$ t% m/ B
    步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat
    1 J1 a* ~& K# R8 L" F( {
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:& `3 @. E) e" T! I  I
    public interface ResourceTracker {
    0 m" I! @8 l8 M1 qpublic RegisterNodeManagerResponse registerNodeManager(4 d: _, v9 i3 @" H: X4 [4 q
    RegisterNodeManagerRequest request) throws YarnException, IOException;+ p! X- \4 _3 t3 K7 K% n
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)5 G8 l9 b9 U9 g7 W# s. M
    throws YarnException, IOException;! D# a+ w/ B5 ^% Q; j
    }6 C4 O3 {' w, e5 ]6 A
    步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但2 d+ H' M* m" o( W
    未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的$ W  J$ L7 J% M9 t* B
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
    : n' E6 y; t) o+ P5 u
    option java_package = "org.apache.hadoop.yarn.proto";
    % i( G3 M6 X7 D! zoption java_outer_classname = "ResourceTracker";( k9 J! l# Q0 I
    option java_generic_services = true;0 `( s* D9 q; U6 ^  |
    option java_generate_equals_and_hash = true;; H+ J8 O4 K6 E! j* p  C8 V
    import "yarn_server_common_service_protos.proto";
    ( }' B8 z1 E7 uservice ResourceTrackerService {
    $ {: c  z4 s# c2 q/ r# Y  {rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
    0 a6 u; N, l) ^rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);. d2 N4 R6 y: k. B2 T
    }
    - v3 C& S2 T0 N& n$ d- U2 \  _ResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。3 Y) a6 z# t! H3 B8 x2 g
    步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
    4 Y* {/ g" Y/ b( r: qBuffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest' J, F3 B9 p7 T( @: _6 k6 O, H
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto
    , k5 o& o( p! o3 K件) :8 m) q; j2 a2 v' z- }/ p
    import "yarn_protos.proto";/ u; H6 F, R1 z+ r. @3 R8 f
    import "yarn_server_common_protos.proto";
    $ [9 E7 ~- M+ r6 @# |! Ymessage RegisterNodeManagerRequestProto {- r1 @; Z; Y$ X/ k
    optional NodeIdProto node_id = 1;( `) r6 ?/ O1 H
    optional int32 http_port = 3;0 t5 @" {* c6 @! Q
    optional ResourceProto resource = 4;
    / L3 m9 s! {: [} m
    0 X7 o9 Y$ J' `& vessage RegisterNodeManagerResponseProto {
    : ~/ S: B, z/ t) L9 y8 z* Eoptional MasterKeyProto container_token_master_key = 1;- B# \" @9 E/ l+ c
    optional MasterKeyProto nm_token_master_key = 2;
    / g% w! G  @; w4 Yoptional NodeActionProto nodeAction = 3;
    9 z! i' h- i, q* d3 {optional int64 rm_identifier = 4;
    ) e" A# T& p) y. n+ T' x! g2 M( Coptional string diagnostics_message = 5;
    / T, a: M: a) n+ O- m( |}.
    + K' m; M/ l5 W  Z( h6 |. e9 [.. //其他几个参数和返回值的定义
    ' U  ]  Y0 m1 H% z8 F3 y% t4 k* m
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
    ) X; w, ?: R4 r+ U" S; X( s' m原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
    ) c, j  n7 Z1 N. R+ ]Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
    ( O$ ^; m$ F2 U+ ?' u5 Y
    RegisterNodeManagerRequest为例进行说明。, t' r3 v1 Q$ m2 U: Z
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :& E2 M; \% `% n, e
    public interface RegisterNodeManagerRequest {; p, z/ w# o) m
    NodeId getNodeId();
    : ~' q  a4 }4 N" z5 k: \* Fint getHttpPort();: g; O4 M) X, v- m: e5 \
    Resource getResource();
    3 D8 _: o3 n+ S4 F: c+ B9 evoid setNodeId(NodeId nodeId);. p, f9 l1 W7 {3 @5 l  t! C* v1 l; e
    void setHttpPort(int port);: g% u7 t6 m1 E/ J0 r  D
    void setResource(Resource resource);
      B0 p- }6 M6 \( A}
    ' b2 L4 _/ L6 Y& K5 IJava封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
    7 G0 D9 i4 [6 {$ I( n( U& |2 h$ p
    public class RegisterNodeManagerRequestPBImpl extends
    / z1 R4 p! l8 L/ T) xProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
    4 c) o" ~; J4 t  T% ^% eRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();$ }0 H* F, L- N8 z4 G
    RegisterNodeManagerRequestProto.Builder builder = null;- f2 _' U3 S3 l- H2 X
    private NodeId nodeId = null;0 j# e' D' Y, Y& T+ m  E) v
    ...6 h' t* s% `0 k
    @Override" O/ C' S/ _2 x' Y
    public NodeId getNodeId() {
    # K4 O7 c* S/ U' ~7 i! jRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
    - a! L/ `- {9 B% Z  o$ c, A: jif (this.nodeId != null) {
      P/ O) h; y/ E0 [return this.nodeId;. H/ _$ q. Z# w3 T. I' f* ^2 D
    }i
    % k8 ?* Z% k" g5 @4 `/ f* Lf (!p.hasNodeId()) {9 z, \. v2 N* L/ {' d. }% \1 n
    return null;6 F9 h0 ~* d9 ^( ~
    } t) J' T2 s# l' N
    his.nodeId = convertFromProtoFormat(p.getNodeId());4 q* y6 m3 ^" G
    return this.nodeId;
    1 z0 @& k4 M. W1 K% ?2 r} @
    . Q" {% f) D: _( F( u8 Q( E/ FOverride$ I  r2 `* j9 Z. Z0 ~0 o( v
    public void setNodeId(NodeId nodeId) {
    . J5 E% N, O! c. Q+ ]' hmaybeInitBuilder();* n( [4 f6 X% N+ b5 P) u. A; g
    if (nodeId == null); Q0 ~( _6 V* ^  x# q. H2 W) `! X
    builder.clearNodeId();
    + [' V# z% f1 ^) L, K4 \this.nodeId = nodeId;# F# v9 D2 J$ ^
    } .$ l. M( L$ f1 f6 d: {, n
    ..
    ( V# e. T/ `! w1 c, ?' H: H7 _}
    3 O. q" r- |( j6 X9 R4 S: E6 ?步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为& Z; C# e: F: X) }. W
    ResourceTrackerPBClientImpl, 实现如下:; N; E1 W% e2 \" e8 |+ Q
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {4 o8 F! d6 s' I* T: E+ n8 |" y
    private ResourceTrackerPB proxy;
    - n- E% y5 o& ~# g0 t, W% a3 Spublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {8 N5 L" p% `) G
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
    3 E& C" g6 o: {" k2 `7 o3 bproxy = (ResourceTrackerPB)RPC.getProxy(/ g6 I+ y; G9 B! P3 H! Q" [. c
    ResourceTrackerPB.class, clientVersion, addr, conf);
    - Z7 |8 z* I: Z8 q; |} @3 J5 l* H" g* `  W% K, k
    Override, N; W( ?! }( m9 W. k9 a
    public RegisterNodeManagerResponse registerNodeManager(
    ) j, q: P$ Q/ T$ V" n/ A4 b0 f- IRegisterNodeManagerRequest request) throws YarnException,
    2 N, ^' b* n$ p, |0 cIOException {
    % L8 U$ y7 z  i! x5 LRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
    : W, W; Z1 h4 itry {
    - l: ]: R& N5 d: x$ nreturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
    , Y4 A$ K: m% t3 {! ^$ K2 h} catch (ServiceException e) {
    1 l0 D2 P1 t/ O0 wRPCUtil.unwrapAndThrowException(e);# y9 ^' z$ s- G( ?6 s5 E
    return null;) _7 X6 Z# T. i$ T, m
    }
    % r: _: ~. K; X/ P$ |4 F* _} .
    9 p! ~7 f( N. i2 }..
    2 `8 R7 a; j6 p# G6 ~" F$ v}
    ; w( u7 }9 I* D( P  W# m服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:6 {7 T( k5 V  X" g( g: [
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {4 x/ z  U  R/ Q  n6 x# J
    private ResourceTracker real;
    1 G$ b' j) ?% m1 b/ e* r2 jpublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {
    1 w2 @7 M0 f+ T$ \1 w( a# [/ xthis.real = impl;
    4 A# u  g! j) S. o$ E- M  p; Z. ~" h} @$ h7 N5 S7 w6 O- X% i
    Override
    " R: T1 y9 O, }/ u3 r& g. {6 ?7 C$ Vpublic RegisterNodeManagerResponseProto registerNodeManager(2 O- F* S) r2 h) }- o3 D1 z
    RpcController controller, RegisterNodeManagerRequestProto proto)
    7 H) o/ a# M  z$ \' A5 \8 hthrows ServiceException {9 Y" c. h3 t& E, {, l- x/ g! q
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
    , |6 V) U* ]) i0 m, utry {
    + o1 K2 a; R" l9 t7 D+ KRegisterNodeManagerResponse response = real.registerNodeManager(request);% S& T9 t7 B5 K  ^2 r
    return ((RegisterNodeManagerResponsePBImpl)response).getProto();5 @% Q6 k! o) j4 S% c+ X/ Y
    } catch (YarnException e) {
    7 d# j, @! W1 {# sthrow new ServiceException(e);
    * k/ q; [9 z. j" ^} catch (IOException e) {) Y  K0 M# g) O% v5 j8 c! S" N
    throw new ServiceException(e);
    + r5 X* q& `% V- d- v' h( ^}
    " b7 D7 e( [& y3 _/ K  S5 G  {! ^} .+ p3 [, j" L  L" J9 U
    ..1 F% p' N  ~; B
    }. O' e( f- n6 }: s  X4 e. N
    总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列& @2 O+ E0 z* o
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。( ]& ?: |7 f: K- G5 J; \
    3-12 YARN RPC中的Protocol Buffers封装
    0 Y0 ?  d0 X1 k: U! J  ~
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call
    8 U# g4 l. q& ~7 O0 p6 M) P* O
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。
    # ?9 z* q  L5 V/ B( ?/ f
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
    - J: g: H4 T2 t6 y+ C& X( ^
    [9] 参见网址http://thrift.apache.org/
    # n2 s4 z& j7 n' k3 I1 q8 Z
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns
    . V: p" |$ V0 v( J
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    ; O3 I6 ?( S/ a. ^
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。9 ]4 C+ r  \; H, A" {2 N2 @' k
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347  S/ [3 s8 Y2 V' P
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则  r* g$ ~3 `$ e9 c6 P& L
    使其向前迈进了一步。
      ! M, F# e4 V7 E

    + D& n1 q6 ^; X+ x, C: Z# F0 P- x0 X4 \' W1 C& m1 K# S* e% u) ]
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-12-22 09:39 , Processed in 0.739089 second(s), 33 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表