java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3082|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2062

    主题

    3720

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66592

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解
    0 p0 n& Y! P6 ?4 h! v, IHadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。
    7 S& E3 v( J# ~% U
    1.ipc.RPC类分析
    2 b7 |4 J' [2 |8 n% N/ z' E  c
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。1 Z6 Q. K( T( p% K( e
    如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一
    ! N8 V9 ]0 @' r- E, n' S# U个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户" |" r! }- u4 _4 ^6 Z, x
    设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
    9 S7 t. K& ]" C0 J2 n* o
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。. q6 K" d. y) ^) X8 t+ m4 z
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers$ s6 t- e/ Z/ _
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
    1 {- A( [) S/ s: i" P调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。
    + y( D; {& ?6 v9 u  r下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用5 s8 b& H! N8 v. N3 v! l8 p
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
    8 c) C+ @; b) `8 e完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机* r8 \, u9 R- D- t' M! l
    程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)9 p! Y+ F; t, L' e3 I
    打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
    1 {- {4 ^* E! P6 ?函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
    1 i" R; j" t  L, h
    3-4 HadoopRPC的主要类关系图2 p- A1 \% m" E$ s4 `; h2 j1 h' H
    3-5 HadoopRPC中服务器端动态代理实现类图
    / B7 ]2 v/ ~8 f; t
    2.ipc.Client
    8 Y, z( ?. ^6 E% M4 I/ \4 P- p( X4 c; D
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
    & ^  l& \+ h4 ?7 x行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:7 [* h' _) s: N8 T0 k5 r
    public Writable call(Writable param, ConnectionIdremoteId)
    . k: H  |4 F+ X1 Ythrows InterruptedException, IOException;
    : F' _9 P4 R# L' N8 l' S1 K- w1 f3-6 Client类图
    3 v1 t5 T* {8 z# x+ P  {/ Z
    Client内部有两个重要的内部类, 分别是CallConnection6 `& B# v6 k# Q" l# D/ x# }
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出8 b7 j* F( @" x. H
    错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺/ \, B/ G; F- n& o
    序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id* Q% T' d, k/ `) x4 {1 j2 n8 [* @
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。" D+ h; j* U: C1 {' }
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    ' L3 [1 f' s. y! s2 \3 M/ g信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流
    4 ^* U& f& {; ~+ s' B1 L6 I
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    ; n# g; W: ^% v4 q% C/ ]
    ❍addCall—将一个Call对象添加到哈希表中;
    4 J) v  @2 X9 ^6 W9 L
    ❍sendParam—向服务器端发送RPC请求;3 w3 r: E" I. [& G5 P: w) d
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    9 `" z8 r) @2 C3 K& g1 K
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
    7 j" I. h2 e, i; n8 H# S当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
    4 u  e' s% |0 A
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;1 G0 W" K( l- J) c( ]
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
    2 G4 N$ v! V/ }4 @( v' A: s
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
    $ R& @- j3 f% q9 U' [  V% B4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。: J4 z, X4 H9 S5 I7 n) |
    3-7 Hadoop RPC Client处理流程5 O$ q6 C/ l4 G1 A
    3.ipc.Server类分析$ M" ^) T" `# T0 d! t' R. M) v
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展3 P3 N0 s2 t) \. c
    性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计8 w6 x3 g7 A+ x, k. i, W
    目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
    + k% i/ U* ]% z" q
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。2 D: o5 _0 V( s5 E. P
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性2 a& q7 ?& _) X* Q2 ]# p$ g  O
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。) _: \; z  \* }5 s( M! g+ d8 W2 I
    3-8 Reactor模式工作原理3 t. O1 ]! _3 Z1 I2 I
    典型的
    Reactor模式中主要包括以下几个角色。
    2 x/ K* o6 s$ D; T" {7 [❑ReactorI/O事件的派发者。2 `/ R1 e, y4 t: l* X  Z$ q, H) k/ s
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler6 m; _" w1 r6 a6 ]( i# t! d: r& x
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
    ! J6 k. A5 j  A7 w) d象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
    . o6 l; S; S( ]' q8 @/ m  j当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断' V: z- ?- T+ o- b- x! H1 @
    的处理。
    8 [% g6 A8 A8 Z5 P. G
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线4 c  w) m+ j, v& w+ {6 X4 Y
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对( s5 k* @% u' b7 }  K) ]1 s
    应的
    ReaderSender线程处理。  U5 z1 f0 G2 A, x) r
    ip( o2 F2 |9 `- H- l; \
    c.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易5 y5 I, p0 o/ q' x1 V5 t. _
    地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
    2 h) ?7 H7 m& f+ r8 F前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为
    5 w8 k1 \- o$ R; r+ D此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
    2 W6 t/ _+ U( Y  r" a6 h
    3-9 Hadoop RPC Server处理流程5 e5 M* t; ^- c
    1) 接收请求
    5 T2 x# c! y8 O! A该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue
    5 S: ^) k) {4 h1 P& s: B中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。/ {2 b3 O  U" o  n! |0 r4 \1 q
    整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
    # p# F8 L* y8 _0 [7 V% C池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
    ) h7 b: {6 Z, Q- l. o: |! L& x/ O1 i
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。; @2 F9 \( J6 ~2 X7 t
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。. n5 G3 Y, q! z, G9 |
    对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于' d, P0 H9 E+ A, M# W1 u4 ?% {0 f6 M6 z
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call
    1 G4 F8 q5 E. o5 V3 V象, 放到共享队列
    callQueue中。
    3 `8 U6 X; M" l% o
    2) 处理请求# \$ e/ a+ A: Z. S
    该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
    : `+ b7 V, H6 h2 ^程完成。% N# y2 G3 Q  M; N0 \
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果2 b: f' o/ A" r# c8 ^; A
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
    ; j1 Y- j+ w6 [% m
    Handler将尝试着将后续发送任务交给Responder线程。
    1 p* S( E9 Q4 j. y, p4 l- ]
    3) 返回结果
    5 x; U  Y5 A$ ~前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结  F, ?9 o3 {* s$ I2 ?- d( ^
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。% @) F7 \5 @" t9 [' ?) X: F' V$ {& O
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
    $ r7 l% o/ O$ T) a/ r0 [: p结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送, C7 g  u7 u1 v, K1 M6 Y" X8 d$ s
    未发送完成的结果。
    ! y; Q& i% a. N( l+ e
    3.3.6 Hadoop RPC参数调优4 i% _$ C0 W" x3 L* U2 C  O
    Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
    : B/ r. a3 E* }2 q. |4 Z4 R; \
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个. w. B7 B" E* L  H+ V" {
    Reader线程。
    ! t2 r: I9 H/ G5 l1 {
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
    , b- p, F' I5 o6 t
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:% e4 ~& t9 S7 c: a( K# |
    100×10=1000
    ) J: \, ~! E* `* U0 B7 U
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的, O% D8 t# I+ o* s  g. S
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    8 Q# G+ n0 E; m
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。$ O: h! t$ l: V' E  L
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可+ j1 I! m( G: T
    能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
    7 K0 t. l! z5 |9 E6 J$ r' J
    10次( 每两次之间相隔1秒) 。
    0 _0 G9 R! k5 v+ c
    3.3.7 YARN RPC实现
    . @2 P5 @2 P+ t当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组1 o* V7 H2 Q5 v9 Z+ p
    成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]9 |' N4 e: J9 i+ e: i
    ) 。 相比于Hadoop RPC, 它们有以下几个特点:
      I" ?% h% c- B
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用5 @- L1 N2 i% a, p
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
    2 }' Y& ]) w# u端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。
    ( ?( m9 a% A! `  l7 h( a
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,
    : }, z5 y  t' K8 J4 C: y并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应
    ( @, _+ m- R9 F+ m2 X$ @用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。& L+ A6 y  s/ `
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
    # w4 F1 ?4 h8 S- e0 f3 U删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
    " [. {* }( W9 l. ]' ^, a9 k. m5 y$ ^随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
    5 k* {3 V9 @7 }3 V: U. J
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
    : O; ^! o6 o8 L+ N+ W# u6 j读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。, c  p1 j: H% U! ]
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    : @* c" W$ t& A1 ~0 t果用户企图这样做, 会抛出
    VersionMismatch异常。5 C; I) _" l" Z+ @
    为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
    ) l' {$ u/ R: a后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源. U3 O! u+ E, V- T- R
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
    2 V) M& F- d. H1 D) \0 ~
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的2 A) M0 I1 k; S! S4 y$ M& ~- v
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
    " j$ I, g, j+ P3 H8 R中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
    * N4 y" ?# ]$ i" d/ X( w
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
    : @7 r! p# u: d2 j; V3 `协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
    1 {0 V" l( [( h+ f' i4 L6 R
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider
      V# ]3 T& v  E# U) _成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服, r! l4 x% ]0 P9 F! J5 }
    务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
    . O' L) Z& b' L  \" l+ X4 ?6 c& j据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。: P8 {1 J  @' ^! z# ~
    3-10 Hadoop RPC 集成多种开源RPC 框架" A. Q& P7 L4 {  z4 ?8 ^
    3-11 YarnRPC 相关类图0 \8 b; `* Z* D: E
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但' K+ h6 U* D0 Z2 g- x+ g+ ~
    它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于: g+ a! m6 X" E, p/ _4 k8 k8 C
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
    # _/ f( k0 K3 ~0 ^: ^7 B. B, ~$ ?8 W
    "PBClientImpl") 。
    5 [0 B2 e8 x0 W+ G! \
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄# |# k$ m" A3 y' N+ I; i( E
    (具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java
    8 r% [# t  T( w% o6 U5 `0 ]' M% K名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
    ) L0 k9 B/ s; V3 s- p类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。# K. A: @) x9 z) D9 G
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以4 ?! d. N, k! T) d% r4 G
    下几个方面:/ q  e5 N$ X' R, M
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允% P2 o( Q2 U  k) x% I
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
    3 u- m% M: f: P4 N+ }( O: b6 `户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能
    ; I7 ~4 G; n1 F! w+ e方面有很大提升。
    * d9 Z( n1 E) S
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,- D6 R6 L: X$ A; z
    其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
    # h7 m5 `6 {$ r; qBuffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
    : ?8 C! O3 C! r  h& b+ D4 v
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
    , C; Y0 U- g$ p: z: ]8 B  x
    3.3.8 YARN RPC应用实例* ^+ z- `, W$ b! |
    为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
    - F( F; ~. E5 X/ O3 D, l. g9 r- D
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户2 C: F# Q! ]/ K3 W* {& U5 [4 H6 O
    端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向- q8 ^9 B* F& N* X, r. [7 g+ M/ h
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
    # x' n8 z/ C" n7 S) T% G& A
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
    : \5 {& R; n2 |9 N; D8 Gpublic class ResourceTrackerService extends AbstractService implements
    ! V' w5 R) V, A+ YResourceTracker {
    " Z& |/ K* V0 y' ~  I) X7 {private Server server;
    / h5 Z4 i$ m) u- N( W! n...
    * c( `, ]4 N5 @) [7 [2 l: h" Eprotected void serviceStart() throws Exception {
    ! d6 |( _! F) k5 O# ysuper.serviceStart();# W/ s  j) m% O6 Q, d  o! b9 f
    Configuration conf = getConfig();7 v/ p( A6 h$ [; |% E
    YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC
    - i+ x0 U( m  E- y
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
    2 Q0 j7 Y+ D( ^3 C; L3 b+ econf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,9 g- r  A( O( H+ J5 Q$ Q
    YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));  J; P; p9 m8 }1 A" A/ E
    this.server.start();4 ^! y. w# u% b! o9 c
    }.- X- V8 f" V" O. _+ }
    ..
    ' T" b3 f. i5 |2 P5 _4 [" c@Override
    , W8 |& M* d6 h2 o7 j" d+ rpublic RegisterNodeManagerResponse registerNodeManager(
    & t% |$ i6 }- URegisterNodeManagerRequest request) throws YarnException,
      }7 R5 N: t4 T) ?$ L* oIOException {
    + ~0 z: [& V! }, q5 y$ e1 c) x4 B4 _//具体实现4 _; ?: J% l  W
    }@0 }# T1 u. g& Q
    Override& y! Y, }, U8 N+ W8 v- q& ?
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    8 c. s) k0 M) _: d7 I& F) Uthrows YarnException, IOException {
    : ?4 T! ^+ _9 p; C//具体实现
    6 j2 ?/ d- ^: W7 v: I8 S
    }$ c* P, ~- Q! C/ z& T
    }/ p+ z0 d8 k8 x" O& }( M0 p
    NodeManager(客户端) 中的相关代码如下。" ?. ?2 j5 i. C; h: m; ]% A
    // 该函数是从YARN源代码中简单修改而来的
    + I( K  j$ z2 g9 E2 b$ j) P, `
    protected ResourceTracker getRMClient() throws IOException {5 D% {/ p( d) r% }
    Configuration conf = getConfig();; G+ B) W* C; e
    InetSocketAddress rmAddress = getRMAddress(conf, protocol);6 Z. ~8 E: ]4 }" M
    RetryPolicy retryPolicy = createRetryPolicy(conf);
    6 i/ H# E. `% P, @. d4 p/ u9 ~ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);  T: P/ `8 N8 V4 J. d& B5 R, ?9 {1 I
    LOG.info("Connecting to ResourceManager at " + rmAddress);4 Q& h% n& {5 a7 j+ \( y
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);5 o  G- O1 r4 u: M6 d
    }.
    / K/ |7 D- ^6 X..
    : B4 e" u+ `) l7 i* Sthis.resourceTracker = getRMClient();8 X2 X8 e9 D2 `- T2 n6 M3 i
    ...
    6 }9 Q$ W+ u; k) ~+ Z! q) \: TRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);# ^9 Q) j7 r& ?: C5 z% F2 O) W
    ...# ~- i/ }' r  B3 F& F
    response = resourceTracker.nodeHeartbeat(request);
    8 S2 g. @( `& j# ^9 ?& j9 f为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。0 J( V! A% t' W+ R
    步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat
    * Z- q7 q' L( B& D' {" U% r5 j
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
    1 l/ D8 d: L, k. f3 e  H
    public interface ResourceTracker {1 o" E, v0 p" U
    public RegisterNodeManagerResponse registerNodeManager(
    : }4 t, @8 @. R2 `9 C8 B, h3 tRegisterNodeManagerRequest request) throws YarnException, IOException;1 ~! z* _8 H4 T+ ^3 c+ v
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    + ^" y9 A& d7 B( ~# b3 s. \$ C9 `  \throws YarnException, IOException;
    6 [& _3 [* N. a- F( W6 P$ N}" A6 M5 L& n6 S; e% U7 c7 x
    步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但6 m2 j/ f. J; l( d; M
    未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的6 [$ ]5 T& g" b, b, d, [
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:9 e0 R# U. s* l$ X( d7 r, k/ t
    option java_package = "org.apache.hadoop.yarn.proto";0 B0 ~2 }2 S  ~3 ~
    option java_outer_classname = "ResourceTracker";
    * z* _  P: Q4 d% S" c# |option java_generic_services = true;: Q3 p; d! t% N$ u5 t
    option java_generate_equals_and_hash = true;
    3 {6 ^! R* w4 F8 v* S3 [5 Ximport "yarn_server_common_service_protos.proto";1 _4 J! W/ T/ l, ~/ L
    service ResourceTrackerService {5 V) R2 f  E0 r) }7 A! i) I
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
    5 r3 I) s* R1 X" V! {+ w8 A  {rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    " N& ]- D3 c2 X/ a}, S0 Y0 \7 b' I5 T3 q% s& N
    ResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。' d1 F6 ]4 ^& i5 t; b0 ?- L
    步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol" S6 T$ K3 k0 h% A
    Buffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest
    ) @% k5 ^* g  z% k* K
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto
    4 @( o% ]. s3 ^4 Q# M+ b件) :. B$ y- \7 c0 x7 k
    import "yarn_protos.proto";
    - s- p* s, C1 Qimport "yarn_server_common_protos.proto";5 L/ Q5 C0 b( p6 d/ o" g! E  e4 Y
    message RegisterNodeManagerRequestProto {
    % e% e) c) t3 }' soptional NodeIdProto node_id = 1;4 Z0 q: O% i, J, q
    optional int32 http_port = 3;5 o  b7 g3 }1 r2 X: T
    optional ResourceProto resource = 4;
    1 K- r2 q5 @# e3 E* F/ w} m  _0 Q5 |, s5 G8 y. _7 D2 J
    essage RegisterNodeManagerResponseProto {
    7 |# `3 W7 n! H% K. Q5 S5 eoptional MasterKeyProto container_token_master_key = 1;/ O6 L! a- O+ ]! H. X( b
    optional MasterKeyProto nm_token_master_key = 2;% d- [; r; n. C: ]9 C$ U
    optional NodeActionProto nodeAction = 3;
    * `4 b" n& p/ n- s  Z7 T. Loptional int64 rm_identifier = 4;* V" y1 v: P# S1 D! ?
    optional string diagnostics_message = 5;
    . H' y( g6 f7 [5 O$ K$ Z}.
    * A2 I- n( S5 R& F1 v.. //其他几个参数和返回值的定义
    ( d# X; s' M) c" p
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以/ @8 ^0 F! T6 v1 y# U; Q9 y
    原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol$ |' T! a9 N% v8 h& I
    Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数" |8 @2 d8 x$ X' _3 j
    RegisterNodeManagerRequest为例进行说明。1 M7 S6 `% _3 y* G
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :( B" Y* {* ]; n
    public interface RegisterNodeManagerRequest {
    7 F1 f, W  x1 ^  Y  \( hNodeId getNodeId();
    . g9 x4 e1 G3 T0 L( u& Vint getHttpPort();
    3 O* ^% r( h+ m" qResource getResource();
    & d) z% |! Y" V4 K( wvoid setNodeId(NodeId nodeId);
      r3 i; j5 B5 b8 lvoid setHttpPort(int port);( }1 ]- a' h# m/ }! ]
    void setResource(Resource resource);
    $ ]- A. @) q/ ~: A& S8 F4 O; C}/ L0 X9 Z/ J6 _+ K6 h6 N5 ^
    Java封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :" r; }; C3 G$ l5 V
    public class RegisterNodeManagerRequestPBImpl extends  b" c. G" f1 z- q" g4 U" n  B
    ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {5 a8 @* f: s  u
    RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();0 S8 c) a; O% \- m! n' ?
    RegisterNodeManagerRequestProto.Builder builder = null;; v+ D' M7 r/ F$ i6 U
    private NodeId nodeId = null;5 n/ ?; U) D$ R" y+ z7 ?
    ...! b& d* g& z4 h+ n
    @Override
    + A+ y, T8 M) n0 S3 l0 Ipublic NodeId getNodeId() {& ?4 J8 K: ~( v$ U' o
    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
    0 l' @6 s1 S8 v) Q7 q+ oif (this.nodeId != null) {* w' G9 J2 }0 }. W5 L! y+ \( G2 e4 Z$ j
    return this.nodeId;0 {9 c) _5 g2 t1 b. E0 I/ c
    }i1 C" j3 c  {6 q2 {9 j+ ~3 K: D
    f (!p.hasNodeId()) {
    ! {7 ^0 q: `( R6 B. v! G" rreturn null;6 }- u4 M2 s7 Q9 i6 Y9 Q
    } t' Y, j  M1 I3 z( _- ?2 Z6 s9 O
    his.nodeId = convertFromProtoFormat(p.getNodeId());6 S# ~( g. M- G; _" D
    return this.nodeId;
    + I2 v7 P: l5 Y* H0 B' ^} @
    # T$ O5 Z( f  D5 F7 \) f, `) VOverride+ o2 z3 }, q1 N8 n/ }* z
    public void setNodeId(NodeId nodeId) {
    5 `3 Z: o' V( H9 E. R1 f% WmaybeInitBuilder();1 O2 J. y- N6 V# U
    if (nodeId == null)
    5 ~2 P) E' X# K; b0 Hbuilder.clearNodeId();5 m0 C- a3 [% [8 R/ a% y6 G9 B+ P3 y
    this.nodeId = nodeId;
    ! [  ?/ Z6 @7 S% Q' Q} .
    " W6 T4 G$ ^$ Q9 l& y. m( Y..) u4 v. x, _+ D; H9 S7 b
    }
    ; b# ]/ G  ^9 H8 x步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为7 x# Y7 r% q; M
    ResourceTrackerPBClientImpl, 实现如下:
    ( T! [( N3 P$ ~4 G, G3 m
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {/ r, _6 ~4 |1 d: ~+ I& Q
    private ResourceTrackerPB proxy;* M8 h/ p6 P7 |* ~& D
    public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {9 A# V7 m: y; Z6 Y
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
    - i4 u' {; Y# y& \! gproxy = (ResourceTrackerPB)RPC.getProxy(
    ( w9 W/ L3 m% jResourceTrackerPB.class, clientVersion, addr, conf);# \1 z: K0 S! |( N
    } @. _0 Y1 ^% e5 @% i4 o5 }9 m
    Override
    ) @9 O- @$ A. ]% {3 c. @public RegisterNodeManagerResponse registerNodeManager(
      u; ~  E  I' aRegisterNodeManagerRequest request) throws YarnException,
    : z+ U" n% o7 X" k: M1 Z; UIOException {7 t" h# R7 X7 B3 i& D
    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
    ) s; a/ c9 B) F; Ctry {
    ( o7 `1 S" N" ]- L6 Ireturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
    4 N, O$ n  k* p# Z4 `} catch (ServiceException e) {
    9 F. s1 I( w, gRPCUtil.unwrapAndThrowException(e);
    - ^6 v3 W* Y3 \0 ^: preturn null;5 y& D  a# G8 i5 m
    }
    2 J" V, Z' i* R( }$ w3 d1 L1 I} .
    7 |& H' B8 n  c- S% n..
    $ g: {# ?5 A5 k. e/ [) L6 ]) o}
    3 T. \$ g& b) b7 |. [2 ^& e: c服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:7 x$ L* U% s0 Q- O" N  W$ C
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
      Z0 H2 q' A) Z6 d; w: \: v$ kprivate ResourceTracker real;
    ; {2 Z% B5 Z( Cpublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {, U1 D/ r4 Q# R. C% o7 T5 I$ Y
    this.real = impl;
    0 \, N0 _3 x+ A; Z% F} @8 I: O) E" Q) t6 m$ h/ W
    Override& C$ _: H. p% |
    public RegisterNodeManagerResponseProto registerNodeManager(& B& l, Q( C/ s8 g
    RpcController controller, RegisterNodeManagerRequestProto proto)
    + y5 _9 O0 z/ }* O9 N3 Wthrows ServiceException {2 n2 W" m& P5 P4 T) ]0 B
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
    5 g1 E' Q) W0 C5 Ltry {; y% I9 [3 ]. t/ C! n# m$ y4 o
    RegisterNodeManagerResponse response = real.registerNodeManager(request);
    7 b# }! {1 O* V7 Q# \; @8 d% x! ^return ((RegisterNodeManagerResponsePBImpl)response).getProto();
    " \4 w9 }; q, ]6 s} catch (YarnException e) {& ~' d# {" a' f+ L  F- b
    throw new ServiceException(e);3 B/ ^8 u6 N$ n! d
    } catch (IOException e) {0 _. w5 y  @1 ~  _9 l2 H
    throw new ServiceException(e);
    " e, _% U2 V+ R; A}
    " E( q  e) |* P* A+ @& v% u; h0 H) U} .9 [! }4 R9 l+ |: x& m% Z8 m
    ..
    1 O$ D. q/ K! \/ |" z; R( Y5 r}/ }0 q( d% E+ g
    总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列
    ) Z" f' b& d; |6 o
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。1 ]( e- c8 d4 |# {, K/ l6 a, Q
    3-12 YARN RPC中的Protocol Buffers封装
    % ]0 k4 \) t) X" k
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call
    8 q7 x+ w" `1 U, m# B+ ~# i
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。
    : c5 _; j7 c* {0 o# t
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。' x; y' L$ r& j3 \
    [9] 参见网址http://thrift.apache.org/
    ( O/ Y& r5 l8 m4 X1 u
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns
    1 W! c0 Z% e, z: i
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    3 n( E1 |: J1 s$ ~/ e/ e
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。! Z% N* c) K- f7 B3 Y
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347% W% W3 z! E" [' H3 L' k9 d) J
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
    # n8 b, P4 M8 b, Q! f9 C6 J9 g使其向前迈进了一步。
      , E1 W2 b* T% f! f6 M7 B4 Z

    - p' ~, d  i4 W6 x( r" Y
    ; s8 p, x5 h0 M6 T/ p
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-2-23 04:26 , Processed in 0.161959 second(s), 28 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表