|
3.3.5 Hadoop RPC类详解
0 p0 n& Y! P6 ?4 h! v, IHadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。
7 S& E3 v( J# ~% U1.ipc.RPC类分析
2 b7 |4 J' [2 |8 n% N/ z' E cRPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。1 Z6 Q. K( T( p% K( e
如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一
! N8 V9 ]0 @' r- E, n' S# U个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户" |" r! }- u4 _4 ^6 Z, x
设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
9 S7 t. K& ]" C0 J2 n* oRPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。. q6 K" d. y) ^) X8 t+ m4 z
与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers$ s6 t- e/ Z/ _
等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
1 {- A( [) S/ s: i" P调用RPC.setProtocolEngine(…)修改采用的序列化方式。
+ y( D; {& ?6 v9 u r下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用5 s8 b& H! N8 v. N3 v! l8 p
了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
8 c) C+ @; b) `8 e完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机* r8 \, u9 R- D- t' M! l
程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)9 p! Y+ F; t, L' e3 I
打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
1 {- {4 ^* E! P6 ?函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
1 i" R; j" t L, h图3-4 HadoopRPC的主要类关系图2 p- A1 \% m" E$ s4 `; h2 j1 h' H
图3-5 HadoopRPC中服务器端动态代理实现类图
/ B7 ]2 v/ ~8 f; t2.ipc.Client
8 Y, z( ?. ^6 E% M4 I/ \4 P- p( X4 c; DClient主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
& ^ l& \+ h4 ?7 x行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:7 [* h' _) s: N8 T0 k5 r
public Writable call(Writable param, ConnectionIdremoteId)
. k: H |4 F+ X1 Ythrows InterruptedException, IOException;
: F' _9 P4 R# L' N8 l' S1 K- w1 f图3-6 Client类图
3 v1 t5 T* {8 z# x+ P {/ ZClient内部有两个重要的内部类, 分别是Call和Connection。6 `& B# v6 k# Q" l# D/ x# }
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出8 b7 j* F( @" x. H
错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺/ \, B/ G; F- n& o
序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和* Q% T' d, k/ `) x4 {1 j2 n8 [* @
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。" D+ h; j* U: C1 {' }
❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
' L3 [1 f' s. y! s2 \3 M/ g信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流
4 ^* U& f& {; ~+ s' B1 L6 I( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
; n# g; W: ^% v4 q% C/ ]❍addCall—将一个Call对象添加到哈希表中;
4 J) v @2 X9 ^6 W9 L❍sendParam—向服务器端发送RPC请求;3 w3 r: E" I. [& G5 P: w) d
❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
9 `" z8 r) @2 C3 K& g1 K❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
7 j" I. h2 e, i; n8 H# S当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
4 u e' s% |0 A1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;1 G0 W" K( l- J) c( ]
2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
2 G4 N$ v! V/ }4 @( v' A: s3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
$ R& @- j3 f% q9 U' [ V% B4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。: J4 z, X4 H9 S5 I7 n) |
图3-7 Hadoop RPC Client处理流程5 O$ q6 C/ l4 G1 A
3.ipc.Server类分析$ M" ^) T" `# T0 d! t' R. M) v
Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展3 P3 N0 s2 t) \. c
性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计8 w6 x3 g7 A+ x, k. i, W
目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
+ k% i/ U* ]% z" q了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。2 D: o5 _0 V( s5 E. P
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性2 a& q7 ?& _) X* Q2 ]# p$ g O
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。) _: \; z \* }5 s( M! g+ d8 W2 I
图3-8 Reactor模式工作原理3 t. O1 ]! _3 Z1 I2 I
典型的Reactor模式中主要包括以下几个角色。
2 x/ K* o6 s$ D; T" {7 [❑Reactor: I/O事件的派发者。2 `/ R1 e, y4 t: l* X Z$ q, H) k/ s
❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。6 m; _" w1 r6 a6 ]( i# t! d: r& x
❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
! J6 k. A5 j A7 w) d象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
. o6 l; S; S( ]' q8 @/ m j当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断' V: z- ?- T+ o- b- x! H1 @
的处理。
8 [% g6 A8 A8 Z5 P. G❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线4 c w) m+ j, v& w+ {6 X4 Y
程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对( s5 k* @% u' b7 } K) ]1 s
应的Reader和Sender线程处理。 U5 z1 f0 G2 A, x) r
ip( o2 F2 |9 `- H- l; \
c.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易5 y5 I, p0 o/ q' x1 V5 t. _
地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
2 h) ?7 H7 m& f+ r8 F前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为
5 w8 k1 \- o$ R; r+ D此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
2 W6 t/ _+ U( Y r" a6 h图3-9 Hadoop RPC Server处理流程5 e5 M* t; ^- c
( 1) 接收请求
5 T2 x# c! y8 O! A该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)
5 S: ^) k) {4 h1 P& s: B中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。/ {2 b3 O U" o n! |0 r4 \1 q
整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
# p# F8 L* y8 _0 [7 V% C池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
) h7 b: {6 Z, Q- l. o: |! L& x/ O1 iReader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。; @2 F9 \( J6 ~2 X7 t
Listener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。. n5 G3 Y, q! z, G9 |
对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于' d, P0 H9 E+ A, M# W1 u4 ?% {0 f6 M6 z
Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对
1 G4 F8 q5 E. o5 V3 V象, 放到共享队列callQueue中。
3 `8 U6 X; M" l% o( 2) 处理请求# \$ e/ a+ A: Z. S
该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
: `+ b7 V, H6 h2 ^程完成。% N# y2 G3 Q M; N0 \
Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果2 b: f' o/ A" r# c8 ^; A
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
; j1 Y- j+ w6 [% mHandler将尝试着将后续发送任务交给Responder线程。
1 p* S( E9 Q4 j. y, p4 l- ]( 3) 返回结果
5 x; U Y5 A$ ~前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结 F, ?9 o3 {* s$ I2 ?- d( ^
果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。% @) F7 \5 @" t9 [' ?) X: F' V$ {& O
Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
$ r7 l% o/ O$ T) a/ r0 [: p结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送, C7 g u7 u1 v, K1 M6 Y" X8 d$ s
未发送完成的结果。
! y; Q& i% a. N( l+ e3.3.6 Hadoop RPC参数调优4 i% _$ C0 W" x3 L* U2 C O
Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
: B/ r. a3 E* }2 q. |4 Z4 R; \❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个. w. B7 B" E* L H+ V" {
Reader线程。
! t2 r: I9 H/ G5 l1 {❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
, b- p, F' I5 o6 tHandler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:% e4 ~& t9 S7 c: a( K# |
100×10=1000。
) J: \, ~! E* `* U0 B7 U❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的, O% D8 t# I+ o* s g. S
Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为
8 Q# G+ n0 E; m50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。$ O: h! t$ l: V' E L
❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可+ j1 I! m( G: T
能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
7 K0 t. l! z5 |9 E6 J$ r' J10次( 每两次之间相隔1秒) 。
0 _0 G9 R! k5 v+ c3.3.7 YARN RPC实现
. @2 P5 @2 P+ t当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组1 o* V7 H2 Q5 v9 Z+ p
成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]9 |' N4 e: J9 i+ e: i
) 。 相比于Hadoop RPC, 它们有以下几个特点:
I" ?% h% c- B❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用5 @- L1 N2 i% a, p
Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
2 }' Y& ]) w# u端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。
( ?( m9 a% A! ` l7 h( a❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,
: }, z5 y t' K8 J4 C: y并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应
( @, _+ m- R9 F+ m2 X$ @用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。& L+ A6 y s/ `
❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
# w4 F1 ?4 h8 S- e0 f3 U删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
" [. {* }( W9 l. ]' ^, a9 k. m5 y$ ^随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
5 k* {3 V9 @7 }3 V: U. J❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
: O; ^! o6 o8 L+ N+ W# u6 j读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。, c p1 j: H% U! ]
❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
: @* c" W$ t& A1 ~0 t果用户企图这样做, 会抛出VersionMismatch异常。5 C; I) _" l" Z+ @
为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
) l' {$ u/ R: a后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源. U3 O! u+ E, V- T- R
RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
2 V) M& F- d. H1 D) \0 ~RPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的2 A) M0 I1 k; S! S4 y$ M& ~- v
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
" j$ I, g, j+ P3 H8 R中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
* N4 y" ?# ]$ i" d/ X( wYARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
: @7 r! p# u: d2 j; V3 `协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
1 {0 V" l( [( h+ f' i4 L6 Rorg.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生
V# ]3 T& v E# U) _成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服, r! l4 x% ]0 P9 F! J5 }
务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
. O' L) Z& b' L \" l+ X4 ?6 c& j据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。: P8 {1 J @' ^! z# ~
图3-10 Hadoop RPC 集成多种开源RPC 框架" A. Q& P7 L4 { z4 ?8 ^
图3-11 YarnRPC 相关类图0 \8 b; `* Z* D: E
❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但' K+ h6 U* D0 Z2 g- x+ g+ ~
它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于: g+ a! m6 X" E, p/ _4 k8 k8 C
Java包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
# _/ f( k0 K3 ~0 ^: ^7 B. B, ~$ ?8 W缀"PBClientImpl") 。
5 [0 B2 e8 x0 W+ G! \❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄# |# k$ m" A3 y' N+ I; i( E
(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包
8 r% [# t T( w% o6 U5 `0 ]' M% K名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
) L0 k9 B/ s; V3 s- p类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。# K. A: @) x9 z) D9 G
Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以4 ?! d. N, k! T) d% r4 G
下几个方面:/ q e5 N$ X' R, M
❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允% P2 o( Q2 U k) x% I
许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
3 u- m% M: f: P4 N+ }( O: b6 `户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能
; I7 ~4 G; n1 F! w+ e方面有很大提升。
* d9 Z( n1 E) S❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,- D6 R6 L: X$ A; z
其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
# h7 m5 `6 {$ r; qBuffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
: ?8 C! O3 C! r h& b+ D4 vNameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
, C; Y0 U- g$ p: z: ]8 B x3.3.8 YARN RPC应用实例* ^+ z- `, W$ b! |
为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
- F( F; ~. E5 X/ O3 D, l. g9 r- D在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户2 C: F# Q! ]/ K3 W* {& U5 [4 H6 O
端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向- q8 ^9 B* F& N* X, r. [7 g+ M/ h
ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
# x' n8 z/ C" n7 S) T% G& A// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
: \5 {& R; n2 |9 N; D8 Gpublic class ResourceTrackerService extends AbstractService implements
! V' w5 R) V, A+ YResourceTracker {
" Z& |/ K* V0 y' ~ I) X7 {private Server server;
/ h5 Z4 i$ m) u- N( W! n...
* c( `, ]4 N5 @) [7 [2 l: h" Eprotected void serviceStart() throws Exception {
! d6 |( _! F) k5 O# ysuper.serviceStart();# W/ s j) m% O6 Q, d o! b9 f
Configuration conf = getConfig();7 v/ p( A6 h$ [; |% E
YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类
- i+ x0 U( m E- ythis.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
2 Q0 j7 Y+ D( ^3 C; L3 b+ econf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,9 g- r A( O( H+ J5 Q$ Q
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT)); J; P; p9 m8 }1 A" A/ E
this.server.start();4 ^! y. w# u% b! o9 c
}.- X- V8 f" V" O. _+ }
..
' T" b3 f. i5 |2 P5 _4 [" c@Override
, W8 |& M* d6 h2 o7 j" d+ rpublic RegisterNodeManagerResponse registerNodeManager(
& t% |$ i6 }- URegisterNodeManagerRequest request) throws YarnException,
}7 R5 N: t4 T) ?$ L* oIOException {
+ ~0 z: [& V! }, q5 y$ e1 c) x4 B4 _//具体实现4 _; ?: J% l W
}@0 }# T1 u. g& Q
Override& y! Y, }, U8 N+ W8 v- q& ?
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
8 c. s) k0 M) _: d7 I& F) Uthrows YarnException, IOException {
: ?4 T! ^+ _9 p; C//具体实现
6 j2 ?/ d- ^: W7 v: I8 S}$ c* P, ~- Q! C/ z& T
}/ p+ z0 d8 k8 x" O& }( M0 p
NodeManager(客户端) 中的相关代码如下。" ?. ?2 j5 i. C; h: m; ]% A
// 该函数是从YARN源代码中简单修改而来的
+ I( K j$ z2 g9 E2 b$ j) P, `protected ResourceTracker getRMClient() throws IOException {5 D% {/ p( d) r% }
Configuration conf = getConfig();; G+ B) W* C; e
InetSocketAddress rmAddress = getRMAddress(conf, protocol);6 Z. ~8 E: ]4 }" M
RetryPolicy retryPolicy = createRetryPolicy(conf);
6 i/ H# E. `% P, @. d4 p/ u9 ~ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress); T: P/ `8 N8 V4 J. d& B5 R, ?9 {1 I
LOG.info("Connecting to ResourceManager at " + rmAddress);4 Q& h% n& {5 a7 j+ \( y
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);5 o G- O1 r4 u: M6 d
}.
/ K/ |7 D- ^6 X..
: B4 e" u+ `) l7 i* Sthis.resourceTracker = getRMClient();8 X2 X8 e9 D2 `- T2 n6 M3 i
...
6 }9 Q$ W+ u; k) ~+ Z! q) \: TRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);# ^9 Q) j7 r& ?: C5 z% F2 O) W
...# ~- i/ }' r B3 F& F
response = resourceTracker.nodeHeartbeat(request);
8 S2 g. @( `& j# ^9 ?& j9 f为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。0 J( V! A% t' W+ R
步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat
* Z- q7 q' L( B& D' {" U% r5 j两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
1 l/ D8 d: L, k. f3 e Hpublic interface ResourceTracker {1 o" E, v0 p" U
public RegisterNodeManagerResponse registerNodeManager(
: }4 t, @8 @. R2 `9 C8 B, h3 tRegisterNodeManagerRequest request) throws YarnException, IOException;1 ~! z* _8 H4 T+ ^3 c+ v
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ ^" y9 A& d7 B( ~# b3 s. \$ C9 ` \throws YarnException, IOException;
6 [& _3 [* N. a- F( W6 P$ N}" A6 M5 L& n6 S; e% U7 c7 x
步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但6 m2 j/ f. J; l( d; M
未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的6 [$ ]5 T& g" b, b, d, [
Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:9 e0 R# U. s* l$ X( d7 r, k/ t
option java_package = "org.apache.hadoop.yarn.proto";0 B0 ~2 }2 S ~3 ~
option java_outer_classname = "ResourceTracker";
* z* _ P: Q4 d% S" c# |option java_generic_services = true;: Q3 p; d! t% N$ u5 t
option java_generate_equals_and_hash = true;
3 {6 ^! R* w4 F8 v* S3 [5 Ximport "yarn_server_common_service_protos.proto";1 _4 J! W/ T/ l, ~/ L
service ResourceTrackerService {5 V) R2 f E0 r) }7 A! i) I
rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
5 r3 I) s* R1 X" V! {+ w8 A {rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
" N& ]- D3 c2 X/ a}, S0 Y0 \7 b' I5 T3 q% s& N
ResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。' d1 F6 ]4 ^& i5 t; b0 ?- L
步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol" S6 T$ K3 k0 h% A
Buffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和
) @% k5 ^* g z% k* KNodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文
4 @( o% ]. s3 ^4 Q# M+ b件) :. B$ y- \7 c0 x7 k
import "yarn_protos.proto";
- s- p* s, C1 Qimport "yarn_server_common_protos.proto";5 L/ Q5 C0 b( p6 d/ o" g! E e4 Y
message RegisterNodeManagerRequestProto {
% e% e) c) t3 }' soptional NodeIdProto node_id = 1;4 Z0 q: O% i, J, q
optional int32 http_port = 3;5 o b7 g3 }1 r2 X: T
optional ResourceProto resource = 4;
1 K- r2 q5 @# e3 E* F/ w} m _0 Q5 |, s5 G8 y. _7 D2 J
essage RegisterNodeManagerResponseProto {
7 |# `3 W7 n! H% K. Q5 S5 eoptional MasterKeyProto container_token_master_key = 1;/ O6 L! a- O+ ]! H. X( b
optional MasterKeyProto nm_token_master_key = 2;% d- [; r; n. C: ]9 C$ U
optional NodeActionProto nodeAction = 3;
* `4 b" n& p/ n- s Z7 T. Loptional int64 rm_identifier = 4;* V" y1 v: P# S1 D! ?
optional string diagnostics_message = 5;
. H' y( g6 f7 [5 O$ K$ Z}.
* A2 I- n( S5 R& F1 v.. //其他几个参数和返回值的定义
( d# X; s' M) c" p步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以/ @8 ^0 F! T6 v1 y# U; Q9 y
原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol$ |' T! a9 N% v8 h& I
Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数" |8 @2 d8 x$ X' _3 j
RegisterNodeManagerRequest为例进行说明。1 M7 S6 `% _3 y* G
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :( B" Y* {* ]; n
public interface RegisterNodeManagerRequest {
7 F1 f, W x1 ^ Y \( hNodeId getNodeId();
. g9 x4 e1 G3 T0 L( u& Vint getHttpPort();
3 O* ^% r( h+ m" qResource getResource();
& d) z% |! Y" V4 K( wvoid setNodeId(NodeId nodeId);
r3 i; j5 B5 b8 lvoid setHttpPort(int port);( }1 ]- a' h# m/ }! ]
void setResource(Resource resource);
$ ]- A. @) q/ ~: A& S8 F4 O; C}/ L0 X9 Z/ J6 _+ K6 h6 N5 ^
Java封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :" r; }; C3 G$ l5 V
public class RegisterNodeManagerRequestPBImpl extends b" c. G" f1 z- q" g4 U" n B
ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {5 a8 @* f: s u
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();0 S8 c) a; O% \- m! n' ?
RegisterNodeManagerRequestProto.Builder builder = null;; v+ D' M7 r/ F$ i6 U
private NodeId nodeId = null;5 n/ ?; U) D$ R" y+ z7 ?
...! b& d* g& z4 h+ n
@Override
+ A+ y, T8 M) n0 S3 l0 Ipublic NodeId getNodeId() {& ?4 J8 K: ~( v$ U' o
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
0 l' @6 s1 S8 v) Q7 q+ oif (this.nodeId != null) {* w' G9 J2 }0 }. W5 L! y+ \( G2 e4 Z$ j
return this.nodeId;0 {9 c) _5 g2 t1 b. E0 I/ c
}i1 C" j3 c {6 q2 {9 j+ ~3 K: D
f (!p.hasNodeId()) {
! {7 ^0 q: `( R6 B. v! G" rreturn null;6 }- u4 M2 s7 Q9 i6 Y9 Q
} t' Y, j M1 I3 z( _- ?2 Z6 s9 O
his.nodeId = convertFromProtoFormat(p.getNodeId());6 S# ~( g. M- G; _" D
return this.nodeId;
+ I2 v7 P: l5 Y* H0 B' ^} @
# T$ O5 Z( f D5 F7 \) f, `) VOverride+ o2 z3 }, q1 N8 n/ }* z
public void setNodeId(NodeId nodeId) {
5 `3 Z: o' V( H9 E. R1 f% WmaybeInitBuilder();1 O2 J. y- N6 V# U
if (nodeId == null)
5 ~2 P) E' X# K; b0 Hbuilder.clearNodeId();5 m0 C- a3 [% [8 R/ a% y6 G9 B+ P3 y
this.nodeId = nodeId;
! [ ?/ Z6 @7 S% Q' Q} .
" W6 T4 G$ ^$ Q9 l& y. m( Y..) u4 v. x, _+ D; H9 S7 b
}
; b# ]/ G ^9 H8 x步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为7 x# Y7 r% q; M
ResourceTrackerPBClientImpl, 实现如下:
( T! [( N3 P$ ~4 G, G3 mpublic class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {/ r, _6 ~4 |1 d: ~+ I& Q
private ResourceTrackerPB proxy;* M8 h/ p6 P7 |* ~& D
public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {9 A# V7 m: y; Z6 Y
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
- i4 u' {; Y# y& \! gproxy = (ResourceTrackerPB)RPC.getProxy(
( w9 W/ L3 m% jResourceTrackerPB.class, clientVersion, addr, conf);# \1 z: K0 S! |( N
} @. _0 Y1 ^% e5 @% i4 o5 }9 m
Override
) @9 O- @$ A. ]% {3 c. @public RegisterNodeManagerResponse registerNodeManager(
u; ~ E I' aRegisterNodeManagerRequest request) throws YarnException,
: z+ U" n% o7 X" k: M1 Z; UIOException {7 t" h# R7 X7 B3 i& D
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
) s; a/ c9 B) F; Ctry {
( o7 `1 S" N" ]- L6 Ireturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
4 N, O$ n k* p# Z4 `} catch (ServiceException e) {
9 F. s1 I( w, gRPCUtil.unwrapAndThrowException(e);
- ^6 v3 W* Y3 \0 ^: preturn null;5 y& D a# G8 i5 m
}
2 J" V, Z' i* R( }$ w3 d1 L1 I} .
7 |& H' B8 n c- S% n..
$ g: {# ?5 A5 k. e/ [) L6 ]) o}
3 T. \$ g& b) b7 |. [2 ^& e: c服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:7 x$ L* U% s0 Q- O" N W$ C
public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
Z0 H2 q' A) Z6 d; w: \: v$ kprivate ResourceTracker real;
; {2 Z% B5 Z( Cpublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {, U1 D/ r4 Q# R. C% o7 T5 I$ Y
this.real = impl;
0 \, N0 _3 x+ A; Z% F} @8 I: O) E" Q) t6 m$ h/ W
Override& C$ _: H. p% |
public RegisterNodeManagerResponseProto registerNodeManager(& B& l, Q( C/ s8 g
RpcController controller, RegisterNodeManagerRequestProto proto)
+ y5 _9 O0 z/ }* O9 N3 Wthrows ServiceException {2 n2 W" m& P5 P4 T) ]0 B
RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
5 g1 E' Q) W0 C5 Ltry {; y% I9 [3 ]. t/ C! n# m$ y4 o
RegisterNodeManagerResponse response = real.registerNodeManager(request);
7 b# }! {1 O* V7 Q# \; @8 d% x! ^return ((RegisterNodeManagerResponsePBImpl)response).getProto();
" \4 w9 }; q, ]6 s} catch (YarnException e) {& ~' d# {" a' f+ L F- b
throw new ServiceException(e);3 B/ ^8 u6 N$ n! d
} catch (IOException e) {0 _. w5 y @1 ~ _9 l2 H
throw new ServiceException(e);
" e, _% U2 V+ R; A}
" E( q e) |* P* A+ @& v% u; h0 H) U} .9 [! }4 R9 l+ |: x& m% Z8 m
..
1 O$ D. q/ K! \/ |" z; R( Y5 r}/ }0 q( d% E+ g
总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列
) Z" f' b& d; |6 oJava接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。1 ]( e- c8 d4 |# {, K/ l6 a, Q
图3-12 YARN RPC中的Protocol Buffers封装
% ]0 k4 \) t) X" k[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。
8 q7 x+ w" `1 U, m# B+ ~# i[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。
: c5 _; j7 c* {0 o# t[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。' x; y' L$ r& j3 \
[9] 参见网址http://thrift.apache.org/。
( O/ Y& r5 l8 m4 X1 u[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。
1 W! c0 Z% e, z: i[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。
3 n( E1 |: J1 s$ ~/ e/ e[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。! Z% N* c) K- f7 B3 Y
[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。% W% W3 z! E" [' H3 L' k9 d) J
[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
# n8 b, P4 M8 b, Q! f9 C6 J9 g使其向前迈进了一步。 , E1 W2 b* T% f! f6 M7 B4 Z
- p' ~, d i4 W6 x( r" Y
; s8 p, x5 h0 M6 T/ p |
|