|
3.3.5 Hadoop RPC类详解: @- }, f0 C7 r% W4 J" S4 A
Hadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。+ X9 \8 g M9 @+ h
1.ipc.RPC类分析% j( X2 b! q+ T" s, o f
RPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。4 M/ Q( ~3 ~& J
如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一
( y" d* i3 E5 \2 j个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
. i. m! N. ?( O8 L设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用' s4 o' D4 H$ {9 [
RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。. g3 G' h$ `# D) F3 f
与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers6 d& J7 x" k& ~ y# |6 r: ?# v
等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
8 K9 l; O7 q7 ?2 `$ ~6 e调用RPC.setProtocolEngine(…)修改采用的序列化方式。: V5 B9 B4 E |0 {% q1 B$ b
下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
7 o3 k( M2 P# Y6 H% p! m了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
; Y3 O. _& p' g2 |+ W完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机7 g A! w' N4 `) _
程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
9 e: W% ~3 I6 g' n8 z% s3 z0 \打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,. [" t& L, L/ A( @% T# u
函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。- m3 [3 Z5 H3 G+ J9 y3 r0 `
图3-4 HadoopRPC的主要类关系图* p" v- T; @+ M. l$ |0 x; C+ r( _1 `
图3-5 HadoopRPC中服务器端动态代理实现类图
, S1 a/ R, p+ m' F e2.ipc.Client3 M6 P3 N5 S. N ]" S
Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
9 Z5 c% q$ q, U2 \ J+ V行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:' `4 R0 O! @" y; G0 \& p$ w
public Writable call(Writable param, ConnectionIdremoteId)9 p" p1 Z9 u* w/ Z
throws InterruptedException, IOException;
0 y T, {- q% Y图3-6 Client类图8 c2 S3 F& D! C5 p
Client内部有两个重要的内部类, 分别是Call和Connection。6 C3 ~0 r& M$ I; }7 Z" O/ P! N
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
. d# _8 Q! s7 t4 A0 o8 c错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
. ?) ]: z) T; F% d ?5 N, _& d序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和9 w6 S2 I3 j9 E6 W, t1 A
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。0 P" |( y) M' u3 k F
❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
4 G# K4 I E. F4 X% O8 t信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流; {. w" N& o0 O. k% w( k; m
( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:/ h1 Z0 f: [6 f4 C) o
❍addCall—将一个Call对象添加到哈希表中;" f0 S$ J8 Q/ E* x
❍sendParam—向服务器端发送RPC请求;
, J6 Y5 f5 A4 ~& }% G❍receiveResponse —从服务器端接收已经处理完成的RPC请求;3 |5 h/ t/ T* N7 G6 c: r
❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
* D0 m, j( A; D) v+ k当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。% z# k+ V: k8 r2 s% G
1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;2 F! l0 u0 U4 t' G3 B4 e
2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;3 w" R5 i" Y( P* d# m, }# B( p
3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;! J, n! e( ]% J9 @$ u% ~ L
4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。
+ l% P- c% L! n( a- I图3-7 Hadoop RPC Client处理流程0 ~+ q6 r: ]& u) \' o P1 i4 B; {
3.ipc.Server类分析
4 L' I2 v x& o: M' ^Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展
' k1 Q7 x! A2 J0 R% `性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
' Y5 J' Z5 G; ^! z目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
* Z. @( z; T4 \+ o" V了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。
) t# S2 y$ n: |9 B! {Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性; S9 A1 T% C$ z V% a& ]
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。8 w6 H; E) z, F$ k7 V4 d# P* Q
图3-8 Reactor模式工作原理7 e5 k% P" T# T# S' K7 B }/ c5 f
典型的Reactor模式中主要包括以下几个角色。
8 d9 D i, B2 {7 y9 d& W$ A* b❑Reactor: I/O事件的派发者。
. x8 E: D- l8 d" J& j: s1 U❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。
2 X3 T, I) X7 N❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
: k4 |5 n, } E+ G/ ]象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
3 v6 F5 y0 O3 u7 h当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断' g9 |" ?" q( J2 q' S, T
的处理。
+ C8 }, G+ |# p3 R8 y+ R) J2 @❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线/ i/ j* V. S" e( z4 O! F2 a
程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
5 y: t; B4 |$ J% L, U应的Reader和Sender线程处理。) I5 Q# D; ~) W5 g
ip
. V" \, ?/ z. m9 M; Dc.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
- @4 C) a& ]6 q5 w3 k地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。# F* v6 P! _+ h# C/ l
前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为2 [: ^* }6 x- z$ o: Z7 }
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。1 y$ S; w8 c. Y" B' A# }
图3-9 Hadoop RPC Server处理流程
0 r& E/ e/ c( }# h' [8 q( 1) 接收请求
' V; a( V+ R, [( |2 @该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)" F+ V1 t+ y+ @; Z
中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。
; T% {8 K& `! u4 p! ~! s3 v5 D整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程7 P# c4 V! o* j' `0 U5 p" O
池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个) X" |- z6 M7 n, w2 v) ^, Y) c
Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。2 o; W& q3 G% z `3 Q
Listener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。
e3 O9 J/ }- c对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
0 u' b1 i) M' q2 x, jReader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对
, q; S" }& r4 ~7 R象, 放到共享队列callQueue中。
8 x7 K+ Z% g8 Y T( 2) 处理请求 {' B0 q4 V0 R$ [$ ?5 D
该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线! C- u! B1 S0 F4 R% u
程完成。) A/ }- G9 y" ^3 ^' G/ R
Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果# q2 B2 w8 I( B2 H
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
6 l2 T3 m b- K y. L( UHandler将尝试着将后续发送任务交给Responder线程。
6 W& ~$ s: @5 k* u8 u( 3) 返回结果1 K! I, k! ^' k9 {6 | y
前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结7 l( A8 _7 I/ ]/ C2 N2 s; O( A
果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。
9 N$ C6 V- T9 q/ i9 M) AServer端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
8 A. }0 k Q$ [# g结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送4 O4 V0 u6 {- u2 x d
未发送完成的结果。
( |2 G+ R9 L- `# _% q* F+ p3.3.6 Hadoop RPC参数调优4 b+ E- b0 d- {6 J. C
Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。/ o+ c/ e7 `6 W* ^ }
❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个* _/ `5 {) ]/ Z6 d& h
Reader线程。+ l+ a# w. p' f, X% P
❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
6 S5 }0 M- e% a8 m+ xHandler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
8 W$ u* w* Y2 w. c100×10=1000。
& v- [3 ~ x3 J; H❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的% Q& J- M/ m0 Z4 a K
Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为5 h$ ]# @- g8 m2 y8 y- J
50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。2 ^' C O8 V* T. c- Q3 j
❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
/ t9 W6 K$ B& B. f& S; u: L4 N能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
\0 Q" F/ d( q1 w1 l6 X/ O1 A8 l10次( 每两次之间相隔1秒) 。/ B( s: P2 q0 c7 `" g' {: c2 p; ~
3.3.7 YARN RPC实现
& O0 R- [) C: U" Z; O' ^当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组
4 {" Z5 z+ @4 r( z5 B% i [4 q% D成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]5 O4 q/ }- X" c* j0 ], l$ D$ l
) 。 相比于Hadoop RPC, 它们有以下几个特点:
$ P- @; b* Q& a❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用, H" e0 {' I ?0 J3 }( b E
Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
3 Q$ j% I) `$ U! w! J端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。
' C& i: O* X/ Z5 M9 z1 C4 l k4 Q❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,
1 p# v, R% U2 ~( h并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应
" Q2 E+ f# _1 R K用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
% U) A( O. l* f❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
. u2 Z+ Q, l% e5 t% x5 V/ H* l删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
* h( @6 t/ `! } @8 U: c% ^* G. h随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:8 u! N! k9 E) f; b k( D1 P% G
❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
2 a4 ~7 |* z4 c# @8 q: `读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
3 ^* ~0 x! \- J❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
* V5 X6 n8 G4 }$ \- w" x5 B果用户企图这样做, 会抛出VersionMismatch异常。
& E& x/ m" b& i/ I/ X! i为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之, ^. |: x$ C6 S5 V: D( n
后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
. k; W0 E7 P+ Y6 iRPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的0 j, ]4 N3 g# o% y' ^9 ?
RPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的. S" r$ X: V* w2 F; t# }: K
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现( G% U7 U; e/ h. s$ [+ B
中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。 M$ Y0 z4 N+ @4 E5 w: i; W& E
YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信/ S& W9 W" K6 Q2 v
协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是, h6 k) h# c4 ^' R0 O1 a+ \: c& F
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生8 @/ f+ k& }0 l6 w' S2 ~
成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
0 X: z9 q6 a' B0 I务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根1 v. V: j& F( J; k
据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
` x. h7 O) R) w图3-10 Hadoop RPC 集成多种开源RPC 框架 K1 O C: Y$ Y, ]# @
图3-11 YarnRPC 相关类图. S8 \; ]6 M) r9 x* V2 b/ g! F& w, P
❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
' M5 B: S8 N3 w0 u2 t+ R" |它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于% }; ?) W7 k$ {& t
Java包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前$ w; s. V& G5 `5 g4 d) ?* W/ j
缀"PBClientImpl") 。
: U& E9 R4 @6 U; u& O! l1 x0 r9 l❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄* z0 j" v1 F* q" S
(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包% `5 c6 C3 o9 R H" J) h
名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
8 v, U- p3 y ]' C类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。) L1 S: M3 Y" a
Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
9 A. b$ |% k3 H下几个方面:
% G' e' P' q6 B❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允+ D. y6 y# N9 m, [5 H# \# X
许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用/ R' [( s; m/ q* J0 @2 |' f2 Q
户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能" n% p+ w" L% o
方面有很大提升。( @' g# L$ |! E
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,
5 `& u: C! K" s6 ^ D其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol* P- U _* J; O2 d! q; L
Buffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备& X" h/ z( c0 F/ H% H, j
NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。3 f+ O. R7 @, ^. I7 [
3.3.8 YARN RPC应用实例0 I' z8 v6 H' O ~3 B2 L
为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。( b; q' o8 [. g
在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
1 F! g0 R7 `" I端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向, `% \1 x# h4 H, b; W
ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:0 f) B: a# w1 ~6 o7 h
// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
2 b2 M) ?' N$ ]; ^; y& h+ `; Jpublic class ResourceTrackerService extends AbstractService implements
7 Z; E6 I% B7 Z* T5 cResourceTracker {
+ f% k( J. q9 y" zprivate Server server;! S$ E0 y6 h' ]" W n w
...
8 K, E1 {2 j0 O/ l& l+ e/ `# Lprotected void serviceStart() throws Exception {8 \. j, ^2 Y) h' u; a
super.serviceStart();: ?$ d% @* r( H2 Z( n
Configuration conf = getConfig();
0 \2 U% C6 _: t( G, I0 ?YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类
) d6 V3 w4 x( p$ D0 ]" Dthis.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
) a" q3 }& N" [# s, sconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
5 o* f8 d, ]1 J+ K/ MYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
: j/ P% Q) Y2 g% }0 t' @this.server.start();
" k- N5 }* E- i- ]) `, E1 t}.
5 l- m- Y( X0 x. Q0 \..) F8 ?& f4 |, e# \
@Override% q' x' m* [8 B8 Q" p
public RegisterNodeManagerResponse registerNodeManager(
4 T) _6 R9 {2 [: IRegisterNodeManagerRequest request) throws YarnException,! P' C5 y) u, r' n9 H f
IOException {
( @: W, G5 s$ @2 g//具体实现; ^! e- [3 i' N9 e
}@; C- X9 Q, V' I
Override) U$ }; x z; S9 ?% h
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
8 k4 C8 s3 ] Kthrows YarnException, IOException {
. | G; M: T* {# }; |* j. Y9 E//具体实现
6 [. j: ~8 A' o) r: a; C( h}
9 f. q; p( F8 i3 Z7 _8 Q+ v}0 q; B6 }$ Z4 d" s' D
NodeManager(客户端) 中的相关代码如下。
0 a* |/ e$ l4 v3 e5 p) [* e1 h5 H// 该函数是从YARN源代码中简单修改而来的$ \; l* C3 o) e" |0 S9 ?9 q( n
protected ResourceTracker getRMClient() throws IOException {0 {1 v/ L9 N1 k' J( B
Configuration conf = getConfig();
: b$ E0 b- u3 r8 T/ N$ Q$ C" CInetSocketAddress rmAddress = getRMAddress(conf, protocol);
7 @- _5 I& v9 d: s0 |. XRetryPolicy retryPolicy = createRetryPolicy(conf);
' }( ~& ?! B+ xResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);# s! U6 ~/ q, M3 I/ ]: n
LOG.info("Connecting to ResourceManager at " + rmAddress);' N& _( B/ N8 c) j* c, Y- U4 A& Y
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
. X3 x9 X1 T* a* k8 W& u}.& u* \8 B+ u, A' ?0 W
..
8 t/ ~+ e% U9 p; u0 Mthis.resourceTracker = getRMClient();2 s" z; R' x) v9 i* u
...
" P9 `8 ]* [: pRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
1 b, z% a( Y2 _8 R7 i/ q3 u- x# V...
5 K; w8 W7 j9 O1 D" [2 U& jresponse = resourceTracker.nodeHeartbeat(request);) U4 H* p" s: s+ ~0 a
为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
9 ]: \* L; t4 N ~# s# g3 `- P4 s6 {步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat
8 K* j6 T2 \! B- K/ z两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
% r0 B% a8 N. {% e1 ^9 I8 Zpublic interface ResourceTracker {
2 v) ?0 s0 C1 Fpublic RegisterNodeManagerResponse registerNodeManager(/ t* y( Z' F" t8 S9 J+ f+ c( g8 H" s- z
RegisterNodeManagerRequest request) throws YarnException, IOException;
9 F* F. o8 o* c# [) epublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
" ]; d" R; z p; T n" X: fthrows YarnException, IOException;- R# C2 I2 J8 [: h% u7 W3 K
}6 p% |- d ]& i7 O- v( P
步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但: ]" `9 A- w+ [* Z+ r5 J2 D4 _
未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
5 j6 |% q T% u/ ~( L* u- OProtocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
* v: p# a0 I5 Z1 S& Q& }option java_package = "org.apache.hadoop.yarn.proto";
. v7 B$ P* L3 p. |$ J7 O8 Zoption java_outer_classname = "ResourceTracker";
% d" S0 e5 r! Y& T7 soption java_generic_services = true;. g# @6 r+ I' ]( x. O
option java_generate_equals_and_hash = true;
+ `, h/ D8 C& W- R `1 [; d0 }* Gimport "yarn_server_common_service_protos.proto";
" r* Y" M: H9 I qservice ResourceTrackerService {* N: E; X5 _; C0 U' J. ~+ `& S& ~. z
rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
8 ~- [- U9 `2 G( vrpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
" {. k$ G6 M4 `( |}
8 }- t+ X5 l+ `, u% c& VResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
4 }- ]$ V) t: H3 [4 {步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
0 d: w7 S. M6 ^# L9 p+ uBuffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和
' N$ l. B/ _% c) `NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文
5 {. G! n7 ?4 s" J件) :- e4 `! ?5 g: q
import "yarn_protos.proto";
6 l$ t) I7 W4 q- bimport "yarn_server_common_protos.proto";3 s1 n, e' p; j4 M5 I* ]. l
message RegisterNodeManagerRequestProto {8 Q. f" v& B% e8 F1 k! G1 Y$ Y
optional NodeIdProto node_id = 1;$ ~$ Y) G0 _' I4 l V
optional int32 http_port = 3;( i/ T) O5 t0 x8 W+ D
optional ResourceProto resource = 4;& a" e. ~# E" E8 }
} m+ B. R. Z: s& Q& C1 h. q1 g* m1 y2 H
essage RegisterNodeManagerResponseProto {
. w! L* ]- K( |! j" t8 roptional MasterKeyProto container_token_master_key = 1;% O) C& w% C" s. |
optional MasterKeyProto nm_token_master_key = 2;
' N! |; d' u8 zoptional NodeActionProto nodeAction = 3;
9 Z4 [2 w. G' F7 o! Doptional int64 rm_identifier = 4;
; J: R( Y0 f% c- J$ w, R J5 doptional string diagnostics_message = 5;
0 q1 a) }( q6 B}.* _- o1 N$ X6 c4 _. y3 K
.. //其他几个参数和返回值的定义5 t k, ?: V9 e/ ~5 X4 ^ d
步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以8 d+ u9 J0 u! P6 y
原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol7 z- F* s$ @; O g
Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
: B3 Z- `- @* \RegisterNodeManagerRequest为例进行说明。
" J. `5 h+ b3 K* P3 t; \% _! c/ BJava接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :
( W7 u0 y1 z/ } e" ]7 ~public interface RegisterNodeManagerRequest {! I; {9 K1 D) e6 m& ?. X. q
NodeId getNodeId();
0 e6 C. I6 D, t& b% q) z( _) oint getHttpPort();
: W6 |/ x" Q$ F" U( ?Resource getResource();
, i! z& F/ [1 i% }9 g- Qvoid setNodeId(NodeId nodeId);
+ i4 z4 H2 C/ u9 C# Fvoid setHttpPort(int port);% Q- B6 ]3 G" R, G& T4 P# z' X1 d) f- R
void setResource(Resource resource);
; C# Q% Y8 P* T8 a: [}- A1 x3 B- R6 K B- }
Java封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :* }/ j7 _4 T/ ?, a$ v
public class RegisterNodeManagerRequestPBImpl extends
/ _$ d% Q3 F0 p# \0 E7 ^/ o9 n: [ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {/ V1 d1 N% ?/ A% T1 d. @
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); B/ R1 _; {; |% `3 M
RegisterNodeManagerRequestProto.Builder builder = null;9 n1 Q& v0 \8 E, o
private NodeId nodeId = null;6 l; ?( x3 G' f- W& c$ k7 h
...* g g" o: f7 K* K" i; M# L8 U9 y
@Override: ^. K; n' `+ @8 Q4 s5 }0 l7 R
public NodeId getNodeId() {
# X2 e, H, g5 [% T, V& f; CRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; ?: b, R q! v! y, f1 T
if (this.nodeId != null) {* o' T: r, q* h, E- N. T6 p
return this.nodeId;
. ?$ q8 A! s. k/ a. [: ]}i# o. k& _9 a, p) P
f (!p.hasNodeId()) {4 u7 q9 o% F S9 S& { x
return null;
) e8 ]* q, C6 L7 i$ d} t
4 S( k k+ R. ]/ Khis.nodeId = convertFromProtoFormat(p.getNodeId());1 Y2 Q9 t8 l) M8 e& P; h
return this.nodeId;/ E3 R6 A: Y7 y) l: M& {4 h& x# K
} @
- X6 S/ G4 m: D9 F3 i# jOverride0 K5 z" Z' ]7 B$ B7 Z
public void setNodeId(NodeId nodeId) {- S+ f. W/ C; d# p% c
maybeInitBuilder();
% ]5 a' Q, J9 |& ^+ h2 [* Fif (nodeId == null)2 f! [! X9 \ u. E" w" J) e
builder.clearNodeId();# a: O: J# g3 U* {
this.nodeId = nodeId;8 ^; q3 z. a; {! A- h" \$ q& T/ g1 Y
} .. Z9 S7 J) r5 o- b8 u
..
4 v( P6 j" I3 p' w: Z3 X0 N7 a6 E9 v/ B}" `- f# @. Y& P" G0 ?4 c
步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为, x s, w# I: {' p: D
ResourceTrackerPBClientImpl, 实现如下:% M" B1 s! M3 N2 w* }
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {! K% t% k1 d$ s, n
private ResourceTrackerPB proxy;
% L* P* J0 `& |) b( h! p7 Vpublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {+ g/ X R3 A; |6 t
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);& p- H) V' d/ i7 G6 M% X
proxy = (ResourceTrackerPB)RPC.getProxy(* E* p( w+ X6 [5 N* u
ResourceTrackerPB.class, clientVersion, addr, conf);
2 r. s. x2 i/ S. ^+ B& I0 ^} @% W6 ^' ]5 s F& Z8 _5 ^" K
Override
0 f# {, O K3 Tpublic RegisterNodeManagerResponse registerNodeManager(
+ U; U. l) i# b) \1 M: \) l! SRegisterNodeManagerRequest request) throws YarnException,
/ f/ ?; ?9 w& t# BIOException {
2 `; O" R/ t0 ~/ TRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
5 [, o9 m0 U/ z" _3 a" Htry {
2 i3 A0 }' C/ B6 V: nreturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
- ~4 A( @4 J' ]; U+ o} catch (ServiceException e) {
z; U3 {* w5 u6 n: CRPCUtil.unwrapAndThrowException(e);
/ z3 ?" ~, ^7 I5 u& rreturn null;
+ C8 N' v3 |' M- }" R}
" r) ~0 X5 G+ D} .- s: T6 B, e B. u; R8 |5 X, C
..0 M5 B( C2 [& d) L7 E
}1 r b# s8 E9 A( T$ S! D; e6 }
服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:, `& U# p' M# u. i, C+ z
public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {/ x# C% H% G8 l
private ResourceTracker real;5 |- ~, u9 M" t3 C& `
public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
$ k" i1 p9 t3 z, Z0 i2 _this.real = impl;
: N b& \+ s( ~} @
; T& M' c/ k# KOverride$ h0 a! S, ~& \: t( W
public RegisterNodeManagerResponseProto registerNodeManager(
; A4 m) r6 c% ?. h0 E. sRpcController controller, RegisterNodeManagerRequestProto proto), Q+ `! e Z5 _: F7 ]
throws ServiceException {$ e* H. L) ~) p
RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
% O( i6 l6 D1 r6 C2 Utry {
3 y, K" o# w/ h$ o. gRegisterNodeManagerResponse response = real.registerNodeManager(request);
# P( b7 x5 k' m& e8 h1 C9 Q. j; F# \return ((RegisterNodeManagerResponsePBImpl)response).getProto();, n1 P4 F' U: z- T9 L2 u
} catch (YarnException e) {8 K6 M% x/ W6 K! @' h) j
throw new ServiceException(e);
; x" U5 y" w1 R5 ]- o3 h Y7 B} catch (IOException e) {9 ]) v" a" Z! ?* ^* G O: C; [
throw new ServiceException(e);
' L8 A# g& `% g4 ^! P}
# [1 u: q; D4 ]. N3 c& b} .2 R, d G" c' v
..
& a# ^' l3 y, u3 g& F" ?" z3 N}
6 ~) g' c/ j! T3 u9 J1 b; R总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列5 R. D K& P. C; j- K
Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。* [9 c* \0 d( Y7 Z3 P! [0 b& u6 H& Z
图3-12 YARN RPC中的Protocol Buffers封装
K' W" S0 T( e. W[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。
; S" P( y' v5 y) N[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。% g1 c( s! A) y6 P: E* d5 Q
[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
" t: E0 q7 ?; x* C& [0 c7 K, j[9] 参见网址http://thrift.apache.org/。 t! H* }% h( Y' W5 W
[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。
% L+ j, |; p! B+ [' N[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。
, y# O; t9 p9 b[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。" _3 z! x9 C4 w9 A- E- X; z
[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。; Q# `2 I- w: T( W9 Y5 G+ |4 P; [
[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则4 ~% O( N: q% t- P, Q- u
使其向前迈进了一步。 4 `$ E) I3 |/ F: l% i- @
D. }; @& f* d' x* C) j3 ]) x; a$ z% o# H6 w* K3 q
|
|