|
3.3.5 Hadoop RPC类详解3 a4 A3 i0 S( b( S A. E6 I6 R
Hadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。
: x: N& C# Q7 }* ?1.ipc.RPC类分析
$ j' f# G9 I2 n# ?RPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。6 X$ z. V( q. E0 g) X& C1 w
如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一# X3 j ?- }/ S9 |
个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户( \3 C1 E+ @7 D# n6 M8 ~
设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用8 t7 X5 X) r0 i4 q- |
RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。3 |+ \; s1 E5 P2 `
与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers7 t# `6 [5 ]$ W! |, y% B5 {2 G9 x: h5 u% {
等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
, a" g: s8 j5 n4 a9 v( f调用RPC.setProtocolEngine(…)修改采用的序列化方式。
' J! n z$ J2 ~: u2 e1 v/ l! V7 L0 i下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
4 w* n( Q3 m# O- O" k- S0 l C了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可' C+ V! k9 E6 @, t
完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
- Q M0 r+ D4 _- c- N+ T程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)* |1 Y0 P% p+ I; F$ W+ I
打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
( R& I6 u, B6 C* H6 r6 e9 T/ g函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。4 V( [, B ]& j/ g1 m
图3-4 HadoopRPC的主要类关系图# z z2 b1 T3 d" n7 J* x: X, _
图3-5 HadoopRPC中服务器端动态代理实现类图
4 r/ y+ D; `6 h% P/ p2.ipc.Client
; [ F. n; n- L3 Y- D0 h ]Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执8 D* s/ k$ T8 }4 }8 v ?1 a+ K
行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
/ |+ n% b5 C+ ^1 \) Dpublic Writable call(Writable param, ConnectionIdremoteId)
) ^8 m6 ?/ F, B- zthrows InterruptedException, IOException;
4 l1 X9 R! w6 [8 T, W! C2 I) _! K7 U图3-6 Client类图& G* O! P2 V0 j9 T w7 c# X( o
Client内部有两个重要的内部类, 分别是Call和Connection。+ ~7 e" h/ N" t7 r" H& w
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出! X/ |" i& x5 N1 \: d
错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
9 G5 M+ s. M3 j/ ]+ P序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和" D5 J0 m1 _5 l4 |7 z4 F0 b% @
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。; O. |: T1 _. ^6 u! X, F" W' s
❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本) ~; b C! B& t3 O
信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流& d% n1 X2 }* S1 L h
( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
, `0 P! h4 \: m& {6 z4 X( I0 x❍addCall—将一个Call对象添加到哈希表中;- A* X& C6 l1 B
❍sendParam—向服务器端发送RPC请求;
$ z& m) g9 w/ Z5 Q- b❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
$ ?( d' { D. m; [❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
: ~0 o% _6 W! N: _8 R. H当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。. y+ B# x' W/ c; W- l2 f% |
1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
8 y$ Y) @4 B& A" J. B2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;7 x9 i* S: t' ]5 a& `4 Z" G
3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
, A. G; E; d9 x4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。" `! O# t7 k! }4 v6 |9 I" O+ f4 n: z
图3-7 Hadoop RPC Client处理流程9 z% q+ w; k) V9 y5 F, ?) \0 ~
3.ipc.Server类分析
6 M1 m" ?6 C" m& n+ |/ K. W0 jHadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展
& z2 X( P! q$ C9 r Z/ C3 p5 j性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
* k! H1 x5 _9 D! b4 p目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用3 ~7 I: l& f) c0 p
了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。4 U! ~5 M: N+ p( M6 B+ N
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性
- B' Z7 A% {# h+ H; s能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。
- i( `9 s4 s( E6 P$ i图3-8 Reactor模式工作原理+ t% R+ G- d# o- ^0 |; O
典型的Reactor模式中主要包括以下几个角色。- m+ q- A1 E# s( d
❑Reactor: I/O事件的派发者。
( Q; c9 d$ Z; m! ]" Y7 w) p9 N; u❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。8 F/ i) U9 S0 r; s( z
❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽' r. U" G* X# m) J& t9 u) b& E1 _
象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适/ A3 n6 n. k% j( D! }
当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断
( x" P! O1 S+ \; @5 t的处理。
^! \3 i% ]9 h3 m( g* x❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线 a7 w% O% v: |* s6 d9 j! e" m
程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对 `/ d4 h6 l+ j2 E
应的Reader和Sender线程处理。( I( A$ M3 N$ }% R$ J0 i m2 I
ip2 X6 c6 v: L9 `+ B
c.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易5 M% Y& w6 q& n% _
地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
% B+ I. s+ A; A前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为1 y7 j1 S" f- w- y8 z2 c
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
! o w: H5 G+ p$ G图3-9 Hadoop RPC Server处理流程2 p; `: X# m) \# [( M6 q# T
( 1) 接收请求
# ~' ?2 s) {* m( Z/ i; A该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)
7 d6 p' M H/ e中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。
@, B5 [# k& A, Y t整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
- }2 {0 W& W* m! Q: C1 ^池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
5 J; S& |) H& ?: @Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。8 H. n! `+ b4 b- b* I. a9 M
Listener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。
" Q) X4 X2 C; ^" {8 Z/ X对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
6 B0 q7 k( w( H! ~5 h R* S4 kReader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对
) ` P1 p& D" i' Q9 F9 g象, 放到共享队列callQueue中。7 a& u2 i' @8 `" J
( 2) 处理请求
0 c" ~! v: b( w7 w5 V该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
& n. i$ s' a! R Q/ t$ s' d程完成。
4 w5 m& g8 R& V) aServer端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果0 L* Z& n) v6 u6 S; u/ w" P* @
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时. R7 G1 H" H0 O- ]# }0 c1 j! c+ X
Handler将尝试着将后续发送任务交给Responder线程。5 Y0 w1 r6 ~8 w' C
( 3) 返回结果# B. X% u+ b7 ?3 e* K9 x
前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结 {- v: ?0 }3 [5 g
果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。
6 j' J* ~7 v6 {3 Z* x1 N* b* ]: WServer端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
8 o1 f* U+ B! j8 q, f6 E结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送* ]7 E* L8 [, H5 o# c! E, ~+ x
未发送完成的结果。1 H" F, D' v; }$ W7 `/ @0 [- Y
3.3.6 Hadoop RPC参数调优
, M# r4 E) [4 \" f6 UHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。
- @- U% i2 y) i❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
x. G7 c- I% Q# V( r$ ^8 CReader线程。1 F$ f. T+ M; h+ a0 w+ V! {
❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
- A0 A6 k \( N2 v8 k9 G: YHandler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:+ p4 b+ b' W- @0 H) U
100×10=1000。
1 ?0 g8 ~& R* w8 G+ J& S' G* q❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的 G5 f- T; T1 o: A* a
Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为
: _6 d' A2 X6 D$ Q50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。/ L2 x& @2 X+ i
❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可; m( n7 A! @4 W6 H
能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试3 o! r, f3 ] P" j$ U+ v, R
10次( 每两次之间相隔1秒) 。5 }0 i% X4 z$ m. E
3.3.7 YARN RPC实现/ T1 w. Q0 B( X! x" U
当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组
7 o0 Q: m* m2 u/ r) f1 c4 {8 [: g成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]# \ I! U$ ?, f7 M+ d( [# ]. o7 V4 C
) 。 相比于Hadoop RPC, 它们有以下几个特点:
9 Z( ?" w9 o5 s. ~3 W❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用. G# J0 F/ \9 b: n
Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
# Q- N- e7 X' D, v! J端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。
9 {7 ?7 T2 X. k7 E3 k❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,
9 T1 t7 G# H0 p' d4 b" v8 ^# c并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应
9 ]$ @8 x- P1 e用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
) o4 }: O+ V2 X8 o, F; r❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
- U' X) N! ]& j2 t) i删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。# A" X+ G: w% M3 k7 ^: F9 p
随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
) c2 c0 b% k2 V5 S: }3 @❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言0 }; a# _" w6 v8 e" B; z1 X; Z3 a
读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
; P7 T- D% ?7 |# c0 w+ F7 u. s; ?1 u❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
1 `) D1 } K' s5 B5 _: G果用户企图这样做, 会抛出VersionMismatch异常。
, _. x' L/ C8 _3 k% }为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之- z9 g H8 q2 L9 X' S* ^
后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
1 F' k& L6 z2 Y; G/ s; Z. xRPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
+ w4 R3 B0 @& t b( |RPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的
) u! P" c' U: F& @: M% gRpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现. G+ u) R4 S, H! T: W
中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。- w, p9 a- v/ y
YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信: M7 O& L/ `! c. ^
协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
4 {! h3 h4 M( d9 t! h; a, worg.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生
$ R! R# p: b- X! l9 o成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
4 c0 B. y6 }7 i0 X& ]务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
6 }: M$ y. [5 I ]7 v- a据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
. k9 w* S; B5 O4 n5 c图3-10 Hadoop RPC 集成多种开源RPC 框架: a% q; c% \: W$ ? d0 j' r
图3-11 YarnRPC 相关类图
: h y$ x1 I$ [7 @- O" A y❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但& v J9 y1 v% S# L
它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于
3 z( A9 e1 J _8 tJava包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
6 i5 Y, T' Q, M缀"PBClientImpl") 。* @# r+ Z& m& n" ~
❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄: {% q H( b6 [0 ~& X5 x; ?2 p
(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包# x' K7 [ h E; U0 B$ v
名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
, Y8 v. i" g$ ~+ G* \: E类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
7 b2 P) D% I: V/ _9 I# ^Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以9 b0 q9 N0 v) ^( U( t ]+ J1 |
下几个方面:
; K/ k3 H: `* x n9 Z& i❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允* { _5 G s* l, v9 [3 `
许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
% A3 y6 n/ V& k) g/ R, p户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能
+ i" l# Z: b/ t方面有很大提升。4 v' B- A" R6 U4 S0 H' X
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,
* ]# M' u; C. v0 _: D其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol( v0 ]1 ^7 s9 ]+ }- M
Buffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备1 } M& V; u$ I' f( Z
NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。4 q$ R2 |0 q/ k! V. C, U0 z# D
3.3.8 YARN RPC应用实例: s, K5 R, ~4 X0 _
为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
/ X0 B0 u# R3 k. W& w* g& L在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
n) f6 s" y4 y端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向
- n# I' z' x0 t; `6 SResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:
7 Y$ b: h. W3 P// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server- h* n& L( A6 o2 t
public class ResourceTrackerService extends AbstractService implements
% A$ A' P1 I1 {/ WResourceTracker {
7 m) M+ R$ I, Q9 u }* c" Pprivate Server server;
3 x2 G- l* @+ _8 I5 ?...3 R6 m1 r( A4 v
protected void serviceStart() throws Exception {
- P; u0 t8 M* gsuper.serviceStart();
2 k+ O- ?9 d7 i7 r- L- ?0 n7 |Configuration conf = getConfig();
. U' N7 `( q5 m" RYarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类
5 `8 D& H. D0 J7 s+ v- pthis.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,3 i$ R3 Z- F6 q7 c6 O0 {( _
conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,5 a7 ~( B. a9 d y
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));* E/ `& ]# [, C5 r4 J. \) v7 m/ C
this.server.start();3 k5 |) C" S% i
}.# O- T5 H; M% S: F% h
..
- n9 H- W- j% `2 V+ z@Override5 o, n% w. Z7 Q+ _" S5 J/ n5 I
public RegisterNodeManagerResponse registerNodeManager(' C# s( P% S! B Z) p
RegisterNodeManagerRequest request) throws YarnException,
# y: q, q) B& C$ Z3 z; i' rIOException {, c5 h* _* n1 B# g" X4 V
//具体实现" i9 ], [+ o* [
}@
) C9 T) I- v9 G; [% Y& NOverride
1 Y$ O" x. L" ^7 Jpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)$ H' g' i3 Q+ }, W
throws YarnException, IOException {
* w5 Q4 A A' @% g+ P//具体实现' W; v1 L' H1 ^8 y; b+ r5 Z
}) C0 d8 z6 `% c/ P( t
}- U# I* S$ F! N- T2 y
NodeManager(客户端) 中的相关代码如下。, k9 ?2 l; k! T% U
// 该函数是从YARN源代码中简单修改而来的9 ~/ v* _' w; F
protected ResourceTracker getRMClient() throws IOException {
X, E5 q J2 \2 Q( A2 q* j2 H, PConfiguration conf = getConfig();
" b& d1 e' g; }* B, XInetSocketAddress rmAddress = getRMAddress(conf, protocol);
- s9 y7 ^4 g) ~4 V' aRetryPolicy retryPolicy = createRetryPolicy(conf);; j+ _& U# e& }6 |. Z! d. {1 Z% n
ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);. ~9 T6 d/ X4 m% D
LOG.info("Connecting to ResourceManager at " + rmAddress);4 |4 E; N" }5 K( d0 d
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
8 Z! A: K2 p1 T* z! e}.; w' b+ T. S7 c X2 ]/ F! l
..) K: e' j2 b$ V$ i) `
this.resourceTracker = getRMClient();
+ _2 Z# L2 p0 H- o..." B) S' V4 X0 f! \2 T# ^9 t9 J. e
RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);4 @0 j' Z3 g( }. x! ?
...9 U4 S3 |3 u& f, G
response = resourceTracker.nodeHeartbeat(request);
$ j. @. o) C9 ]7 f1 b* \为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
9 p3 w6 C# J9 L3 @步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat& r4 w( p0 w. |9 m- ~
两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:! B- C# ], {) s r }
public interface ResourceTracker {% ~ l2 C+ _( H% j
public RegisterNodeManagerResponse registerNodeManager(9 L) Y4 [8 H; ~
RegisterNodeManagerRequest request) throws YarnException, IOException;0 o7 @+ L6 c. o9 |0 Z# ^: _
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
# t+ V q5 T4 M4 w- p0 j2 [throws YarnException, IOException;5 @( W) p6 L% c* Q1 F% e5 M
}
7 n2 P# N6 c1 N& Y步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
; Y2 }( I1 z# R& i未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
8 V5 r b; g0 r* g' q' ~/ T6 zProtocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:& i3 G* c2 {( B1 W+ i: h( `
option java_package = "org.apache.hadoop.yarn.proto";$ ^, {7 r, G; D. G' w: [4 ?
option java_outer_classname = "ResourceTracker";! k8 f8 N# q: u, F0 }9 m
option java_generic_services = true;; `/ V* p8 q! o% u
option java_generate_equals_and_hash = true;1 E* C9 }8 g5 H7 j4 w
import "yarn_server_common_service_protos.proto";1 I" O w1 ^' o+ A. o# A: L0 J/ E
service ResourceTrackerService {
5 [, R0 F9 @; T8 d' krpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
" x1 A+ g: K. [' R$ x [; Yrpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);; g _! z% ?1 _, H) ~+ ]3 X
}
1 o3 h' W$ p# p8 `3 t+ m- yResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
+ c/ ^5 U6 q: Y% E步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol* S. W7 j5 `5 @
Buffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和* K+ y9 g3 u$ v- ]! `
NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文" b+ ^' d1 s* ?- q7 f
件) :
) q2 W3 L$ e6 u% g& l6 V+ _import "yarn_protos.proto";. m. | m' m/ B* _8 I) Z2 ]
import "yarn_server_common_protos.proto";) o6 s& f9 r! i4 s
message RegisterNodeManagerRequestProto {, t2 o! f8 Z8 ?8 F0 H- C
optional NodeIdProto node_id = 1;
7 J2 G" g# `1 Q/ Q7 P; e& C3 Toptional int32 http_port = 3; O+ p) `: I) H3 G2 F0 x4 M$ @
optional ResourceProto resource = 4;
; u1 m. H: c4 Q- T/ l7 `0 ]2 V5 p% o} m
$ d( Q& g/ W; ~essage RegisterNodeManagerResponseProto {6 P7 w; d8 O8 N, j4 S1 u$ N9 j" A
optional MasterKeyProto container_token_master_key = 1;0 d: [! k t c5 ~, l+ s* D
optional MasterKeyProto nm_token_master_key = 2;
, S) Z" e" B: w& roptional NodeActionProto nodeAction = 3;
( F- z7 P$ S; \% c/ foptional int64 rm_identifier = 4;8 d, I5 i, a; A9 J0 [
optional string diagnostics_message = 5;
! ]: W: e# H) T4 T% N}.' K) C4 v" a; g3 ^
.. //其他几个参数和返回值的定义
0 y$ p* b' a% F8 @) f9 K1 H$ g步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
% u+ K3 u* y0 C4 k# w原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
! l- h; r0 r: x5 |. p) GBuffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
" ]0 q U7 B8 \& I' sRegisterNodeManagerRequest为例进行说明。% n* v1 _# w% c$ c! U2 T
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :) n) S' n7 I6 I% _2 w
public interface RegisterNodeManagerRequest {5 d$ f6 W/ Y; |, x5 M7 ~# L
NodeId getNodeId();
. A+ I( w) x! e6 w3 y1 B4 V6 u Oint getHttpPort();. ?0 w$ Y/ Z& m! Y5 i+ J8 f% f! z
Resource getResource();2 _* U, z6 I! R( |# t$ @
void setNodeId(NodeId nodeId);
# J. U1 t5 Q, ^# F& P# |; ?void setHttpPort(int port);
9 }3 m4 Q% [+ K H9 gvoid setResource(Resource resource);
4 k+ c- e( O) {8 E B}
+ b4 {9 d. a; |& Y9 }: oJava封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :8 a1 H ? e# j' p) X
public class RegisterNodeManagerRequestPBImpl extends
- B; V& O+ L R, T. x& hProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
# c1 o, [' d! A) q& TRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();( }' d9 I; K2 b0 L9 |3 X
RegisterNodeManagerRequestProto.Builder builder = null;- J6 [' {9 O) @$ [6 ?5 q
private NodeId nodeId = null;
# K* T8 y) h* S& t- P1 N7 h...$ A: G7 w$ d E5 \4 I- D9 q. |
@Override3 V# h! L8 ?: Y }7 i; F
public NodeId getNodeId() {
; E1 J/ m( Q$ C' \2 \. tRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;. O }4 B( |! U; U
if (this.nodeId != null) { I% C6 }+ `. p1 A! ]
return this.nodeId;
3 W4 o+ O; ~9 Z}i/ c' V% ^( t, p2 h+ A) j/ s$ ]: ?
f (!p.hasNodeId()) {
4 Y. d2 Y0 t) Freturn null;
m+ W1 h4 K6 M% y} t
$ E' d& z6 X, q$ ~: v4 i6 T4 mhis.nodeId = convertFromProtoFormat(p.getNodeId());
4 l+ e" ~8 j P4 |2 z. @1 Y) `& vreturn this.nodeId;
7 |( z+ P: H* p7 q3 \% P& |} @
. S3 [( D; w3 t( a/ tOverride
4 A4 t( [* U8 N; w$ Z* Upublic void setNodeId(NodeId nodeId) {
9 c% }: Q5 w0 h- SmaybeInitBuilder();( g A. `3 Z; [5 j' I
if (nodeId == null)
- q2 N: \: [# B: |% A cbuilder.clearNodeId();
) |) J+ I( N& Q% g: F2 xthis.nodeId = nodeId; K2 r( ~2 r& T, j
} .
2 K/ p( {1 G n5 h* l5 C5 A8 s..# F& Z4 o5 Y7 [7 u
}
& {5 Q1 ^" e4 r0 o G- i' }5 s步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为3 }- N' R/ j! j
ResourceTrackerPBClientImpl, 实现如下:5 w# I8 ?4 `( h3 W% z& E
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {3 H* U2 Z- r; q$ u3 Z1 _4 P
private ResourceTrackerPB proxy;
, Z+ @- Z# {: X8 ~; F6 K- @public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {% y) X$ ] @ l; o5 J
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
/ |$ F- S" z L& g! [* b5 Qproxy = (ResourceTrackerPB)RPC.getProxy(6 r3 M! K5 R+ g' E4 {
ResourceTrackerPB.class, clientVersion, addr, conf);0 B0 H- |8 Z. @9 D% Y0 ^
} @
" H1 ?7 L5 Z4 H8 M7 \Override# T9 h* d$ H+ q' y0 _& c
public RegisterNodeManagerResponse registerNodeManager(
% S& M9 w9 m; W2 K9 O8 ]RegisterNodeManagerRequest request) throws YarnException,$ s# A6 e# q- H
IOException {% l v' I5 }. @9 O l: C! A
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();/ w+ H7 `) D5 Z
try {9 u% a3 c/ e8 |" }6 K
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));- J6 j. t# V1 T0 Y& a' I
} catch (ServiceException e) {0 S! W+ Q6 s! \+ C1 j4 F
RPCUtil.unwrapAndThrowException(e);' E9 U7 `& t& e' R0 K7 O
return null;
$ x/ K* @; C3 ]1 M5 G1 U+ s ^}0 u9 q' E3 Z1 G
} .
- `% L) h g5 t..
8 X! L2 k+ J F, s6 }( e9 i}3 h& C) t: _& d! \: f+ q
服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
, e2 E- Z# X( v4 j! Y; |public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {2 P+ A, T2 p% E+ J
private ResourceTracker real;0 s2 _( M' w; z8 ?2 ?' p/ R4 U+ s( @
public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
' X4 q [/ m# M4 Xthis.real = impl;
! \! B$ X( A" N& C+ `# y4 L} @
$ Z5 s" `2 T4 F4 o! ^) h+ C* ~ e* jOverride
% \2 o* V: N" d7 U1 \public RegisterNodeManagerResponseProto registerNodeManager(
# y1 q8 V9 z" Z' U$ sRpcController controller, RegisterNodeManagerRequestProto proto)9 v, Z# J( R6 S# ^
throws ServiceException {, R9 A' c/ O: X% f
RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);) v- _5 X: ?, z, R
try {$ Z5 Z. w$ C5 c
RegisterNodeManagerResponse response = real.registerNodeManager(request);
5 ]% g E+ c$ V) x0 {' V treturn ((RegisterNodeManagerResponsePBImpl)response).getProto();
+ l3 B4 d8 g$ |& Y$ ?: k: X} catch (YarnException e) {
4 B8 H7 ~ B& n$ Dthrow new ServiceException(e);2 V# X, h b" o6 |+ e9 a1 Y
} catch (IOException e) {+ [7 Y6 \9 Q9 L' k
throw new ServiceException(e);6 ~ ^ ^' f- K# [) ^) i- Z0 K
}
: V' P7 Z/ K5 R5 ^$ f" K2 a} .
9 y% ]5 B6 b: U0 \7 l& a..
/ v7 L) e: B. K4 s3 N/ C2 r0 {}
; ~( ~5 ?0 x# o9 w3 p) B总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列4 [' l- ]$ n) g' j8 M0 d
Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
: M- x) y1 k f0 Z0 @! A图3-12 YARN RPC中的Protocol Buffers封装 e3 [$ V5 R! ?5 h! M7 |
[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。
! d2 H- k3 f2 R/ R8 Y[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。- A" q9 f8 `* w P& C% ^% ^
[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。, \' G; j! s6 O5 A) ]
[9] 参见网址http://thrift.apache.org/。! G4 f& J- L9 w, ]4 `
[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。
; }5 t1 T; I1 H7 f% n, [: E[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。
" ~( a, u! _( e$ N" S[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。% _) t+ [+ X. U+ `9 y
[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。- i0 v: P1 i$ {0 @3 T* q
[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
: l. b+ ?% a! X! L+ _使其向前迈进了一步。 6 U2 M S6 x8 K1 r/ {7 [2 S
' O0 D( J7 s9 g$ Q! t2 c* B3 G
7 C& ^5 I; i; ` D+ `: | |
|