|
3.3.5 Hadoop RPC类详解/ U) J! {0 M# [' G3 A( F, \
Hadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。2 B% }# p; ]# y" a) Y" n
1.ipc.RPC类分析; ~4 F) |% ~% w3 }
RPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。4 a9 ^$ q: b/ h' N7 J( f
如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一2 f: J) _( c/ Q( h( _/ u
个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户1 I( W$ P- J! Z( K s4 m# Q0 y
设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用9 m; k. p& K# O0 M
RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
3 e& [" A: U. }9 f. W5 W与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
, h- W8 B. v B3 r5 N: a) D" I等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
L4 `9 \7 P$ D$ @$ r; p9 i调用RPC.setProtocolEngine(…)修改采用的序列化方式。1 `8 ~/ A5 b) q" z+ x8 d6 K* X
下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用6 q/ H$ d6 ]( N: K4 _! L
了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可( I: B1 e! F# G* N+ Q" ^
完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机( B1 o. {$ \) q5 \" S" S; X
程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
0 Z5 `1 V9 O' e6 F- x" b: }, z打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
" Y2 }: L3 a3 ^- _函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
' ~* y9 x4 y8 I9 @图3-4 HadoopRPC的主要类关系图
7 E' [! N# x" r# {2 G3 r5 Q图3-5 HadoopRPC中服务器端动态代理实现类图, X2 d5 m( o0 @$ I
2.ipc.Client3 S: `! F/ k0 H1 D9 Q
Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
$ E8 g+ Q2 z0 x/ w" C9 L3 V+ }行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
- q; d. I7 S1 {$ l* u2 Fpublic Writable call(Writable param, ConnectionIdremoteId)7 _, |2 T! E) r, ` d: {
throws InterruptedException, IOException;
/ D& ], N. o# w4 ?图3-6 Client类图
2 g+ Z7 Y" f4 \' pClient内部有两个重要的内部类, 分别是Call和Connection。, O: @8 f: O/ |, X' J1 j
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
* p4 g% }: q) v9 K错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
/ ~: Q+ [& V, E. h& J' C序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和, \5 R& O4 ?0 M# w% I% J9 \( T
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。. h7 w, G; n* |/ }8 p
❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
) ~; P1 ]# N- l5 z& L1 r- O s信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流, e4 i8 K1 Y$ p$ K. F# V* t
( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
8 G6 Z# z* N+ ]$ A& p) ~! m4 }' a❍addCall—将一个Call对象添加到哈希表中;$ j" m! f, c' J8 u# n. R* C4 w
❍sendParam—向服务器端发送RPC请求;
, ~* s* U8 S; h3 P2 Z5 b ?4 d❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
. H$ N& W8 h8 S$ L" L% b❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。5 b& u# P* p& V5 q$ @/ b
当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
* k! J' ^- @* w) _) B+ n) T1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
: U) \! q* m& u m2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;. H4 s8 m7 A8 Y' R& d( \+ [1 M* `
3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
; G3 y3 t) w3 W/ Z D- W/ D! O; [2 a7 @4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。: N. R9 Q# Y+ V
图3-7 Hadoop RPC Client处理流程6 G+ E$ y$ b3 U+ G2 z
3.ipc.Server类分析8 B+ j( m3 O6 A" x5 J0 r- o
Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展
& V7 U9 f3 I6 [! s- K性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计# w2 ^9 M, x5 f- q
目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用6 R6 c4 E$ C2 H
了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。5 H; t/ A9 I9 u& X4 u
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性% p3 r0 V- U% q/ Y) g$ V
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。$ n( F; @4 H1 V* Z; |9 g$ ^. D
图3-8 Reactor模式工作原理
- s4 S' _9 s) P/ O0 a7 L典型的Reactor模式中主要包括以下几个角色。
$ p" j1 }3 R( X( B; r❑Reactor: I/O事件的派发者。
% I. ?$ Q$ T5 @1 V❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。0 ]0 s2 Y% l u( L' L$ x6 U
❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
/ g6 i7 L- G+ V象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适% I/ r1 ?- p6 d3 C n; Y( }
当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断5 ~/ }2 b# m& I5 u2 \: o8 m T
的处理。( H0 Z5 P* e- ?
❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
2 o9 n5 S9 w" ]! q( L* j程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
9 i6 L/ l' G7 W! g9 V; a, r1 [$ V1 S应的Reader和Sender线程处理。3 d7 d( L$ l+ }1 G% K% M- E
ip
9 h4 Q) x/ m @. v) O% {" Tc.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
% L1 L) `- F- O0 }7 ?地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
' q' n3 ]. ]* a* {) U% l# u' F前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为! d4 d8 e# l& q* u, S
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
, p4 ?9 ?' L& @3 Z$ c图3-9 Hadoop RPC Server处理流程9 n8 c# P( A; n" U; v8 [
( 1) 接收请求
( u9 W0 g2 b+ {7 t该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)- E* N5 }* p' N" ~. \
中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。
& K" g) G' j4 Q整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
9 m3 @2 U1 G* z4 K+ ^ O池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
( d* { E- q( b" t, RReader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。: T6 y: l/ b+ T+ \- `0 O
Listener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。) W! O! @6 r( v K
对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
- t) b I: h1 ]/ @! jReader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对
6 M8 j- T# W* |9 {) v象, 放到共享队列callQueue中。$ T W( E9 }2 D8 L1 G/ `) ^/ D+ D2 X
( 2) 处理请求( k( E5 x2 _' S3 H" N- B' D
该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线- _ a8 B* \3 c2 \9 o4 j4 i
程完成。
; V Z5 S2 h9 o' Q6 E' p( ]Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果+ H+ u, M2 M c# h
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时9 @! L( K0 Y: v9 t9 S& e
Handler将尝试着将后续发送任务交给Responder线程。
$ W+ Q7 V' |$ X4 l( 3) 返回结果' t+ Q: i o0 }" m
前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结
7 E+ N1 ]9 I* q( F# P9 K" i8 F; O7 ~果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。6 d; V0 m( ~4 k* k! z( B3 D
Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将8 C1 J3 b/ Y2 N! o/ X
结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送
; x: W) N4 n6 h2 [$ e4 G3 c2 `未发送完成的结果。3 t8 u8 N; p* X0 w0 H" B! P
3.3.6 Hadoop RPC参数调优
% S! c. M0 }+ ?; A- bHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。* R% d: h* ~' R2 a
❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
* q" U7 u- v, p" Q9 lReader线程。
, {" B6 O! `& {❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
Y! R" V, r# i0 sHandler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
& o* u$ a5 e+ \100×10=1000。" A+ \; [5 a6 e' M, C
❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的
0 H6 x0 P# ?0 U. y7 I, a& e% J0 u! D, uHandler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为5 o4 L( [* X* u' }+ v% A B
50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。- E8 T C. O$ G
❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
2 _* d4 w) r" h: H$ d/ y) b能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试% B- J! U% U( D8 ^
10次( 每两次之间相隔1秒) 。9 n# @0 ?- w3 |; ?9 H
3.3.7 YARN RPC实现
7 H" G* E% v: h8 c9 u3 h% n* h当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组
, O; w# l4 P8 @/ }9 M3 }2 O1 w成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
9 y2 Z$ C0 N0 g- L6 M- S) 。 相比于Hadoop RPC, 它们有以下几个特点:
6 l- G2 m6 F2 @2 E0 ~6 _9 ~ V' m❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用2 F1 p6 n& Q+ p% Q0 p& V
Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
3 G0 G! E: G2 J端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。( x3 G: w: s' k/ B& k( X7 x
❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,0 D7 m6 f- d0 _+ B) }* p: M4 u
并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应9 j% \7 y& M0 @
用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
+ x4 @# C# P/ _* x❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
3 [% O, D" z: v$ G) P删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
( d: ^$ C1 Z* L2 Z5 Q( D随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:; G& f' K, R( Z) U8 e! L
❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
5 w9 e0 F, ^2 Q; f读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
! W# O) ]4 `* Z0 F0 V: |1 _❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
8 r# G$ d1 S4 C0 i果用户企图这样做, 会抛出VersionMismatch异常。
/ p6 }. ?& S0 H8 ]; p; f6 j为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之3 G- k* o0 n( N2 ]9 k4 f
后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
' r1 m1 G: d& l* V" j8 @: ?' @: K- q0 xRPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
+ D# u+ \ h. cRPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的; K/ n- K, ^8 b
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现, I4 W V7 M0 G/ N
中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
+ H! o! i3 H: m- j$ p: @YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
~ U! m3 j0 P' M2 y协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是. W$ v5 I2 e4 h" D
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生
4 U) Q4 z: I+ S7 b9 \成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
: M+ _8 {! x, _' x务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根1 R. n% a' x p! D4 q r
据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
2 {% p& Q9 L6 q$ t1 K图3-10 Hadoop RPC 集成多种开源RPC 框架
: b S m5 J U$ B5 g/ W7 R4 q' P图3-11 YarnRPC 相关类图5 \ ]; s; F8 v8 w$ j7 @+ }3 l
❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但" }0 y& Y0 D7 h1 L
它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于" G) e/ S/ {' N% \( m
Java包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
- R9 H4 l1 q5 q) S1 }缀"PBClientImpl") 。
8 u, j( Y' f% }. f7 B) Y❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄3 | e r1 d$ I1 x7 X/ _' o! }* Q
(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包
9 E+ u& P7 Q& c! ?& w$ H名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现! Q3 |; ?! g* u) t$ ~2 v- G
类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。: r" M8 o% V, I
Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以6 }9 b, ~, |- K- {
下几个方面:
, l, a- Z5 b4 {9 r❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允1 [% S4 Q. W( A; Z
许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
4 R7 v' R' s* Y4 w- S户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能8 t9 p i5 X* \6 ^
方面有很大提升。: j+ L! L4 R% T, d5 h9 V1 }6 a) a
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色," I& j) Y. B. I2 u$ d0 T. T% p- k1 @
其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol" l- u( T; d2 r& _: Y
Buffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备3 j) c3 _6 t: w4 @& |& d
NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
5 L9 O: G* F/ Z3.3.8 YARN RPC应用实例
7 {0 b' \0 Z+ @2 h% j/ `为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
, J( b& e# E. T) x& U& y7 x" D在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户9 y' U" a! T9 v* I8 a, g& K
端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向
# |. T, s. l. s) W% FResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:: L- g N; w$ O) {7 P
// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server' n5 Y, r+ B" m, u! _$ F7 l2 k1 U
public class ResourceTrackerService extends AbstractService implements
% l( R0 P0 ]8 VResourceTracker {0 ?) Y5 A! D0 A6 m$ z3 }
private Server server;/ J# a5 h4 c8 [
...
" v- {* ], P- u' c5 _* Nprotected void serviceStart() throws Exception {
1 Y. N V2 m9 A/ \8 A8 nsuper.serviceStart();
: F7 o2 A: n. q- @, _: ^' iConfiguration conf = getConfig();, H& f8 N6 V; k/ Z) x0 T
YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类- u/ g4 X; ^/ Q5 T3 ]
this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress, b- \4 d) r$ S" Q6 X8 D
conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,: k1 L: D, Q- q) o5 ]% e% V
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
3 e2 C8 u/ H' C+ z9 O% T2 X' pthis.server.start();& y* e9 D$ @6 X" }8 F, F2 j: p( m1 t. q
}.
+ o: L5 O+ H4 e; Y; K" p: d..+ |$ [- o& f: O/ R3 k/ j/ C4 t
@Override( T0 Y% R& ?1 g
public RegisterNodeManagerResponse registerNodeManager(
' e* `7 Y7 ?! _& \RegisterNodeManagerRequest request) throws YarnException,
7 r6 q; }! Y+ k& wIOException {3 L- t' _+ Z0 [
//具体实现
) M0 S t- E6 J7 Q8 t( _8 C+ B}@' w6 X/ ]4 C7 K( w
Override3 g. m5 W3 b. J/ H6 n% z0 R5 y0 L
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)1 G2 R0 w" c9 l6 Y4 v
throws YarnException, IOException {
" v4 B7 w3 F8 J$ ^8 e$ \& p. C//具体实现# A4 g2 G6 F$ @" y5 `
}
' q. g2 f' E: T5 L: j: Z0 Q* D}5 z( m7 Z, Z: g$ B1 A/ r
NodeManager(客户端) 中的相关代码如下。3 d1 H/ R4 {! [. b' P
// 该函数是从YARN源代码中简单修改而来的
4 Z* `8 N; M+ z1 k3 o8 g. o% U1 Aprotected ResourceTracker getRMClient() throws IOException {+ z% Z3 E" ~! R$ w/ v
Configuration conf = getConfig();
' R2 ]1 K4 o' G& h, ~InetSocketAddress rmAddress = getRMAddress(conf, protocol);
/ z. y6 N7 m4 `% f6 L/ v% NRetryPolicy retryPolicy = createRetryPolicy(conf);* S& L6 I, A8 t% q
ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);& g7 b/ V& o/ W( q' Z
LOG.info("Connecting to ResourceManager at " + rmAddress);
3 i, V6 H& l: V* i, \8 mreturn (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);6 Q& y4 q5 y- L5 V8 j4 `" W( _1 h5 w
}., k0 O7 V3 s5 Y
..
( P+ Z4 A* ?2 G6 x9 e0 h) dthis.resourceTracker = getRMClient();9 K0 t s! A2 ?" t3 Z$ {* }0 u0 {
...
5 w- ]8 o9 M* q4 n- fRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
" P& v1 _! l' v8 [...
( \; a2 w* A1 D$ {6 h5 O* Kresponse = resourceTracker.nodeHeartbeat(request);
1 G, \- r/ P( j) d5 f为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。" u+ f, F/ I$ t% m/ B
步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat
1 J1 a* ~& K# R8 L" F( {两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:& `3 @. E) e" T! I I
public interface ResourceTracker {
0 m" I! @8 l8 M1 qpublic RegisterNodeManagerResponse registerNodeManager(4 d: _, v9 i3 @" H: X4 [4 q
RegisterNodeManagerRequest request) throws YarnException, IOException;+ p! X- \4 _3 t3 K7 K% n
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)5 G8 l9 b9 U9 g7 W# s. M
throws YarnException, IOException;! D# a+ w/ B5 ^% Q; j
}6 C4 O3 {' w, e5 ]6 A
步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但2 d+ H' M* m" o( W
未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的$ W J$ L7 J% M9 t* B
Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
: n' E6 y; t) o+ P5 uoption java_package = "org.apache.hadoop.yarn.proto";
% i( G3 M6 X7 D! zoption java_outer_classname = "ResourceTracker";( k9 J! l# Q0 I
option java_generic_services = true;0 `( s* D9 q; U6 ^ |
option java_generate_equals_and_hash = true;; H+ J8 O4 K6 E! j* p C8 V
import "yarn_server_common_service_protos.proto";
( }' B8 z1 E7 uservice ResourceTrackerService {
$ {: c z4 s# c2 q/ r# Y {rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
0 a6 u; N, l) ^rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);. d2 N4 R6 y: k. B2 T
}
- v3 C& S2 T0 N& n$ d- U2 \ _ResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。3 Y) a6 z# t! H3 B8 x2 g
步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
4 Y* {/ g" Y/ b( r: qBuffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和' J, F3 B9 p7 T( @: _6 k6 O, H
NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文
, k5 o& o( p! o3 K件) :8 m) q; j2 a2 v' z- }/ p
import "yarn_protos.proto";/ u; H6 F, R1 z+ r. @3 R8 f
import "yarn_server_common_protos.proto";
$ [9 E7 ~- M+ r6 @# |! Ymessage RegisterNodeManagerRequestProto {- r1 @; Z; Y$ X/ k
optional NodeIdProto node_id = 1;( `) r6 ?/ O1 H
optional int32 http_port = 3;0 t5 @" {* c6 @! Q
optional ResourceProto resource = 4;
/ L3 m9 s! {: [} m
0 X7 o9 Y$ J' `& vessage RegisterNodeManagerResponseProto {
: ~/ S: B, z/ t) L9 y8 z* Eoptional MasterKeyProto container_token_master_key = 1;- B# \" @9 E/ l+ c
optional MasterKeyProto nm_token_master_key = 2;
/ g% w! G @; w4 Yoptional NodeActionProto nodeAction = 3;
9 z! i' h- i, q* d3 {optional int64 rm_identifier = 4;
) e" A# T& p) y. n+ T' x! g2 M( Coptional string diagnostics_message = 5;
/ T, a: M: a) n+ O- m( |}.
+ K' m; M/ l5 W Z( h6 |. e9 [.. //其他几个参数和返回值的定义
' U ] Y0 m1 H% z8 F3 y% t4 k* m步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
) X; w, ?: R4 r+ U" S; X( s' m原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
) c, j n7 Z1 N. R+ ]Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
( O$ ^; m$ F2 U+ ?' u5 YRegisterNodeManagerRequest为例进行说明。, t' r3 v1 Q$ m2 U: Z
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :& E2 M; \% `% n, e
public interface RegisterNodeManagerRequest {; p, z/ w# o) m
NodeId getNodeId();
: ~' q a4 }4 N" z5 k: \* Fint getHttpPort();: g; O4 M) X, v- m: e5 \
Resource getResource();
3 D8 _: o3 n+ S4 F: c+ B9 evoid setNodeId(NodeId nodeId);. p, f9 l1 W7 {3 @5 l t! C* v1 l; e
void setHttpPort(int port);: g% u7 t6 m1 E/ J0 r D
void setResource(Resource resource);
B0 p- }6 M6 \( A}
' b2 L4 _/ L6 Y& K5 IJava封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
7 G0 D9 i4 [6 {$ I( n( U& |2 h$ ppublic class RegisterNodeManagerRequestPBImpl extends
/ z1 R4 p! l8 L/ T) xProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
4 c) o" ~; J4 t T% ^% eRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();$ }0 H* F, L- N8 z4 G
RegisterNodeManagerRequestProto.Builder builder = null;- f2 _' U3 S3 l- H2 X
private NodeId nodeId = null;0 j# e' D' Y, Y& T+ m E) v
...6 h' t* s% `0 k
@Override" O/ C' S/ _2 x' Y
public NodeId getNodeId() {
# K4 O7 c* S/ U' ~7 i! jRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
- a! L/ `- {9 B% Z o$ c, A: jif (this.nodeId != null) {
P/ O) h; y/ E0 [return this.nodeId;. H/ _$ q. Z# w3 T. I' f* ^2 D
}i
% k8 ?* Z% k" g5 @4 `/ f* Lf (!p.hasNodeId()) {9 z, \. v2 N* L/ {' d. }% \1 n
return null;6 F9 h0 ~* d9 ^( ~
} t) J' T2 s# l' N
his.nodeId = convertFromProtoFormat(p.getNodeId());4 q* y6 m3 ^" G
return this.nodeId;
1 z0 @& k4 M. W1 K% ?2 r} @
. Q" {% f) D: _( F( u8 Q( E/ FOverride$ I r2 `* j9 Z. Z0 ~0 o( v
public void setNodeId(NodeId nodeId) {
. J5 E% N, O! c. Q+ ]' hmaybeInitBuilder();* n( [4 f6 X% N+ b5 P) u. A; g
if (nodeId == null); Q0 ~( _6 V* ^ x# q. H2 W) `! X
builder.clearNodeId();
+ [' V# z% f1 ^) L, K4 \this.nodeId = nodeId;# F# v9 D2 J$ ^
} .$ l. M( L$ f1 f6 d: {, n
..
( V# e. T/ `! w1 c, ?' H: H7 _}
3 O. q" r- |( j6 X9 R4 S: E6 ?步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为& Z; C# e: F: X) }. W
ResourceTrackerPBClientImpl, 实现如下:; N; E1 W% e2 \" e8 |+ Q
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {4 o8 F! d6 s' I* T: E+ n8 |" y
private ResourceTrackerPB proxy;
- n- E% y5 o& ~# g0 t, W% a3 Spublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {8 N5 L" p% `) G
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
3 E& C" g6 o: {" k2 `7 o3 bproxy = (ResourceTrackerPB)RPC.getProxy(/ g6 I+ y; G9 B! P3 H! Q" [. c
ResourceTrackerPB.class, clientVersion, addr, conf);
- Z7 |8 z* I: Z8 q; |} @3 J5 l* H" g* ` W% K, k
Override, N; W( ?! }( m9 W. k9 a
public RegisterNodeManagerResponse registerNodeManager(
) j, q: P$ Q/ T$ V" n/ A4 b0 f- IRegisterNodeManagerRequest request) throws YarnException,
2 N, ^' b* n$ p, |0 cIOException {
% L8 U$ y7 z i! x5 LRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
: W, W; Z1 h4 itry {
- l: ]: R& N5 d: x$ nreturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
, Y4 A$ K: m% t3 {! ^$ K2 h} catch (ServiceException e) {
1 l0 D2 P1 t/ O0 wRPCUtil.unwrapAndThrowException(e);# y9 ^' z$ s- G( ?6 s5 E
return null;) _7 X6 Z# T. i$ T, m
}
% r: _: ~. K; X/ P$ |4 F* _} .
9 p! ~7 f( N. i2 }..
2 `8 R7 a; j6 p# G6 ~" F$ v}
; w( u7 }9 I* D( P W# m服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:6 {7 T( k5 V X" g( g: [
public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {4 x/ z U R/ Q n6 x# J
private ResourceTracker real;
1 G$ b' j) ?% m1 b/ e* r2 jpublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {
1 w2 @7 M0 f+ T$ \1 w( a# [/ xthis.real = impl;
4 A# u g! j) S. o$ E- M p; Z. ~" h} @$ h7 N5 S7 w6 O- X% i
Override
" R: T1 y9 O, }/ u3 r& g. {6 ?7 C$ Vpublic RegisterNodeManagerResponseProto registerNodeManager(2 O- F* S) r2 h) }- o3 D1 z
RpcController controller, RegisterNodeManagerRequestProto proto)
7 H) o/ a# M z$ \' A5 \8 hthrows ServiceException {9 Y" c. h3 t& E, {, l- x/ g! q
RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
, |6 V) U* ]) i0 m, utry {
+ o1 K2 a; R" l9 t7 D+ KRegisterNodeManagerResponse response = real.registerNodeManager(request);% S& T9 t7 B5 K ^2 r
return ((RegisterNodeManagerResponsePBImpl)response).getProto();5 @% Q6 k! o) j4 S% c+ X/ Y
} catch (YarnException e) {
7 d# j, @! W1 {# sthrow new ServiceException(e);
* k/ q; [9 z. j" ^} catch (IOException e) {) Y K0 M# g) O% v5 j8 c! S" N
throw new ServiceException(e);
+ r5 X* q& `% V- d- v' h( ^}
" b7 D7 e( [& y3 _/ K S5 G {! ^} .+ p3 [, j" L L" J9 U
..1 F% p' N ~; B
}. O' e( f- n6 }: s X4 e. N
总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列& @2 O+ E0 z* o
Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。( ]& ?: |7 f: K- G5 J; \
图3-12 YARN RPC中的Protocol Buffers封装
0 Y0 ? d0 X1 k: U! J ~[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。
8 U# g4 l. q& ~7 O0 p6 M) P* O[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。
# ?9 z* q L5 V/ B( ?/ f[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
- J: g: H4 T2 t6 y+ C& X( ^[9] 参见网址http://thrift.apache.org/。
# n2 s4 z& j7 n' k3 I1 q8 Z[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。
. V: p" |$ V0 v( J[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。
; O3 I6 ?( S/ a. ^[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。9 ]4 C+ r \; H, A" {2 N2 @' k
[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。 S/ [3 s8 Y2 V' P
[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则 r* g$ ~3 `$ e9 c6 P& L
使其向前迈进了一步。 ! M, F# e4 V7 E
+ D& n1 q6 ^; X+ x, C: Z# F0 P- x0 X4 \' W1 C& m1 K# S* e% u) ]
|
|