|
3.3.5 Hadoop RPC类详解& ], O+ @, y* E0 D' F
Hadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。
$ }& x/ a; x/ k- s1 \5 c# J4 w8 l1.ipc.RPC类分析4 k2 A& o: k+ I2 K) {0 ^" Y# I
RPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
$ r% C& |; C$ Y- B5 j如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一
; H7 B; d- b1 |6 b& T8 P S+ I4 H个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
0 G! g) b5 P' o: ?设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
`7 t1 _" v4 r6 p1 NRPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。8 z# J: w. E* {) T8 @$ l; l$ s
与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers6 M0 w+ ^. v! }1 c+ F7 |3 G
等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过3 S3 U9 A9 [' _0 G4 J# p# q. M
调用RPC.setProtocolEngine(…)修改采用的序列化方式。! ? z' u. @0 W8 Y9 V4 y' B- Y
下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用; T) ?: Y3 g( I7 u
了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
' Y" z5 O' _; v3 o4 y& Q完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机* [, Y1 x5 t G' N( d
程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
- U \0 r+ B+ b# M+ _$ r打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,
7 O e) b6 Q( l2 l% f, R. P函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。
# f+ j: ~# l+ a# k. m图3-4 HadoopRPC的主要类关系图
" q1 `( l ~5 ]- F* e' j# H$ \8 I/ S5 s图3-5 HadoopRPC中服务器端动态代理实现类图
" r: V5 N% U8 o+ Z% l# P6 U2.ipc.Client
7 z( m: e) [ N5 z4 cClient主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
( I: L8 r# ]. U" g# ^; l. i: ~行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
4 U8 _8 b- z# A. P2 ^7 p' ~1 jpublic Writable call(Writable param, ConnectionIdremoteId)
' h& F, ~0 e, C. X _- I- |throws InterruptedException, IOException;& t6 E7 y( `/ u5 l
图3-6 Client类图
D0 \. T2 ^$ u" I# W1 d0 E! O6 oClient内部有两个重要的内部类, 分别是Call和Connection。2 Q( ^& u! N4 \2 S
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
Y1 H" P* c3 q( n- R3 ]错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
/ y4 C# }6 h" p7 Y ?6 d2 ^序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和- f8 F& [% x: B' v
param两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。
; p+ S# L) c& x ~❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
. s7 v$ Q6 k: f' t信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流
& ]/ H1 h. _' X% T- |4 ~. [5 h( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:' K& v; L% g8 U% x i* J j" i
❍addCall—将一个Call对象添加到哈希表中;
7 h( g4 l, v# ?* s d- t❍sendParam—向服务器端发送RPC请求;
. t/ b$ z4 I0 d! N, R t( W ?❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
2 P; |& E- \" X. X' M❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
- ]$ m9 [, x, m5 U, ?9 x' P当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
0 z& l( H" Q9 d1 M1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
7 o! i5 B, x/ ^/ L0 Q2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
* p+ p* n0 M8 _8 \+ G3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;$ C* @4 m F' ?
4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。
$ p6 n% `+ u2 v) p图3-7 Hadoop RPC Client处理流程1 b1 {' y6 ^( G( X% ?& L3 a" q+ y% A
3.ipc.Server类分析0 u1 K- ^: ~0 Z: n. l* _
Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展
: [) W: N2 s4 U性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
. F9 C6 c/ e, E6 I% `目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用+ |& K z! _6 c, I* |: o
了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。3 C, I- K2 c' b# K3 y% c8 H
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性2 ^9 ~' ~! W3 ^ s& b
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。7 M7 }: {. m2 z6 D
图3-8 Reactor模式工作原理
& E) v' ?1 t6 L' v2 P0 U典型的Reactor模式中主要包括以下几个角色。6 f: @+ D4 o4 U% C2 d+ s
❑Reactor: I/O事件的派发者。
, J5 O; S, k1 [❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。! Z0 _. Z8 N, E* A* ^# I0 d
❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽- _* B A2 o" d
象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
& n3 z! q8 @/ K. q当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断3 Y5 d4 b0 l; \7 U# E; a7 E- A" w
的处理。, ~! |7 C5 V0 P8 U4 q$ G0 U4 C
❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线
. M) A3 h) F A程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对! Y* v9 v7 w5 J# O
应的Reader和Sender线程处理。- g: x; A' K5 M/ W: }: N2 d. l, h
ip
! P% a) u3 Q1 J9 fc.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易+ D% N, `" H6 u9 E
地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。 O) h8 d- |' c- c$ z
前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为. f/ P6 N3 i9 V6 I! O& c
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。1 S( Z" i |6 d2 G* D
图3-9 Hadoop RPC Server处理流程
P8 @- H' k8 l6 t6 G6 f) M- b( 1) 接收请求
3 D: b% d |+ g该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)
; ~: M, J8 z. }& o( p) S' d7 c$ m+ f" d中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。, x, ^! E1 I! e; r* D |1 a; f& U
整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程* f" N, T5 {# @) r6 M, b" e6 l4 ]5 w
池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
% P) o3 J1 s" [6 i9 `6 N) Z8 Z+ u* WReader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。
+ J- n% @- f7 W) M- M% ]! k7 RListener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。) V7 [) w X% @5 u( F U1 k$ E
对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于! w2 [& p. j* I8 u4 p) ^; v$ X
Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对 Q" i8 C( g% N7 o, T
象, 放到共享队列callQueue中。
+ T' T6 B. Q% H( 2) 处理请求
9 Z" E+ c% o `6 m+ @; F该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线, l3 n, Y- K! ^ I. m: f3 R
程完成。6 A; h# h; I( K; ^
Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果
, e8 B& s; u) T返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
+ D/ A! X4 ]. {3 V% S6 P$ G$ sHandler将尝试着将后续发送任务交给Responder线程。
# O- ]* h6 f- O, t% e( 3) 返回结果
# G: g6 d# w! X' \; Q5 s* Y前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结 c' F8 X# u" ?- ~- H; ~" T( K# N
果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。$ i8 B; o( y* Q( a2 L; m$ H
Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将' H+ _8 h5 o4 G! x# o
结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送1 c3 ~2 L7 B; n
未发送完成的结果。
+ K/ U1 K: y1 ~$ P; h4 U( s3.3.6 Hadoop RPC参数调优
9 w. V: X1 E- ]6 ?9 L! D! vHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。3 g2 `* L5 |) a% o* Y
❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个% T7 s6 l% w* m, o+ C7 q2 ?. p. Y
Reader线程。
3 ~9 C0 V$ E' x1 T❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
3 i n) f5 |8 E" IHandler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:& N4 W& j7 P9 C4 _% D! K5 U
100×10=1000。 W- d B% B7 T7 }
❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的
3 z. M% B" h7 i% b' v' f4 pHandler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为
$ \8 [5 H3 j3 \% b4 R8 `50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。
3 p$ d% Z9 c5 K( t5 y❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可+ `% R; j- }5 ^' R9 L: w* D, c: B! k3 Q
能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试% M) x c. y* N. u
10次( 每两次之间相隔1秒) 。
; r6 v" A4 n+ s: b) t8 j) e3.3.7 YARN RPC实现
) D( E7 V# p# M; Z/ w# M, h当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组
! p: C O$ O* j3 s2 h! x成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
' }+ v$ O/ V5 n( \4 |$ P) 。 相比于Hadoop RPC, 它们有以下几个特点:
# G0 m8 T9 ~/ ^; X& l❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用7 a$ V3 J+ l2 Z7 \. P
Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
- W3 r& L- z5 I# n+ L8 l/ j& z+ S端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。
' @4 }5 [/ R5 Q- s' y❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,
. g0 w0 Y, f( T并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应
9 i- H4 D- V( q# }, E用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
# s! o+ r5 `- O) ^; ^( ~❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
# B/ L" Y6 I. T5 ^$ }7 w删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
$ B/ `9 G- Y0 I8 j- O q0 k/ e( Z随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
7 [7 L3 B( q; E$ p9 w# n$ ~❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言* S' Y! x& X; p4 b; i
读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。7 O! x! l+ F5 {4 U+ m+ \1 f$ E$ x
❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
& c; u% p, Z+ e) b" c0 \9 p果用户企图这样做, 会抛出VersionMismatch异常。5 K0 R7 U- h j5 f; d' w8 j
为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之) m3 ?5 k0 B, J- L/ A# e
后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
' ^, Z# q# P8 M7 V% o6 d3 [+ ORPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
1 v, P+ N8 a5 @6 GRPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的( J9 U* V2 }. ?% g2 [: q- Z' N
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现" l& D% r6 F: I" d" c
中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。' w( e8 Q' y7 C% e7 y- q: r
YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信8 F# x3 B( G2 y- r- S4 C6 ^- |
协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
3 F' q9 S; j' [3 _4 korg.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生
; W2 U: ?. L& P( ^成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
" ~% y) d* u, d! m+ a1 K1 v务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
8 Q" a7 D/ C" x/ Q C据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
" d8 z5 V+ w* w7 x0 Z4 R图3-10 Hadoop RPC 集成多种开源RPC 框架
" p0 T+ {( o9 l+ m# v: p$ d, x图3-11 YarnRPC 相关类图
* M2 y1 Y2 v, k* D( \% x7 b# k❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
J/ G `% ^' k( c/ T/ [- S. R' A& |3 Q它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于& g! ^5 x1 }! O" d
Java包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前 x C* p" w: n3 x- _$ m
缀"PBClientImpl") 。$ G/ l0 x) V/ T4 V! a
❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄" A7 C- S' g5 V" d j6 h
(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包" t9 T$ M4 w/ d8 K4 J; t
名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现/ @& ]7 w7 {0 v3 d' I( a, h/ y
类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。8 q- Z! f; Z4 ^" W4 ]5 K b
Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
6 z& C7 z% |% O# D( y5 W0 x下几个方面:! n6 ^2 l& g+ l* H
❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允
6 h9 S& s: A3 Q, q& q+ A* _+ B" D许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
% n0 ~' L( D- E; Z$ n# U3 {户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能# ~* \' U( ]3 k6 t# u
方面有很大提升。- E/ g) S' p- Y1 m: [9 `3 W" f% s
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,
3 }# _* \4 `" E- e4 _7 |# y其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
0 M( X! g x0 O& kBuffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备8 W# v; k+ c1 x8 g
NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。+ i0 L0 X; X: t5 K
3.3.8 YARN RPC应用实例
5 r$ E/ k# i% M% t为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
$ B3 T2 S% Z c6 ?: L在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
3 \& w! K1 P+ J+ O5 h, i6 c, M端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向
( U* [9 {3 N2 E) r& s: RResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:8 i/ i! t) w4 w8 j x; c( V3 }3 a
// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
0 D+ g4 K( j! P F2 h: Mpublic class ResourceTrackerService extends AbstractService implements( i7 v8 K+ |8 I( a
ResourceTracker {
3 {5 I1 \7 f- O: U+ lprivate Server server;
* |+ T7 Z. L% {6 Z% l# H9 Y D.... h5 Q# L1 n3 o( j( L3 _! y; R
protected void serviceStart() throws Exception {
* }& J* ^0 e9 I; P4 s+ Z1 Qsuper.serviceStart();, ~% q) w$ k' S/ o, d. L* u
Configuration conf = getConfig();
' R6 z) G; O: h) b, I3 z) NYarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类
7 z/ b4 t+ ?/ |/ v& @- e. Hthis.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,# W/ F( R, m( G- |/ V" F0 z0 R
conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
6 x$ T; d5 J: C; V5 k' H4 e4 oYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));: W; Y5 H$ J, R4 J
this.server.start();9 A5 R# k9 f+ z! m. D: ]
}.; O n7 m/ U0 b. e t( G9 R$ N
..
% v# S- q: ]) m- ^$ q@Override
* n1 k, ~) I9 T: Y- B+ Bpublic RegisterNodeManagerResponse registerNodeManager(- M/ W: g+ X1 P) ?; r
RegisterNodeManagerRequest request) throws YarnException,5 I6 y, g! o6 `+ ^& l- ]6 H
IOException {
/ A$ T8 I. Q( W& S. {% \9 w//具体实现
0 a1 X, f9 K8 X) ^4 q* e# o}@; ]$ W8 r1 g' h# W
Override% ]: I* j* X9 C6 `6 W
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)& s y H# Y+ D; o4 t- k" G* _5 O: z
throws YarnException, IOException {
8 Q6 W r$ I2 u5 }5 { L//具体实现
" g% @; d, ^) W7 x0 g. S} B9 | t2 h, F0 o: n
}% g- m' O. _4 X3 ]* `9 @: s
NodeManager(客户端) 中的相关代码如下。" m% i3 t& Z( M, h+ i g y6 g
// 该函数是从YARN源代码中简单修改而来的
9 o: l |9 `; v6 o! X1 _4 ?protected ResourceTracker getRMClient() throws IOException {; P! Y* C7 _( w3 G% e
Configuration conf = getConfig();
+ [. y1 g* E# R+ N. O; T2 ~InetSocketAddress rmAddress = getRMAddress(conf, protocol);* b; X2 \3 T5 j; A- A2 j% V
RetryPolicy retryPolicy = createRetryPolicy(conf);
& I4 n1 ^- n2 ZResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
" z" y% ~- b7 t `* t' X+ z4 ?LOG.info("Connecting to ResourceManager at " + rmAddress);* v6 e- a' y, `6 j4 v
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
1 Y4 @1 B# [" [' |, k/ `0 R}.4 t5 \/ M% j1 {* w. B! E; d
..
% ^# C3 S2 _1 J6 |2 e6 Z! Sthis.resourceTracker = getRMClient();% J5 c# V- j9 O0 t8 M- i7 s$ L
...
- ^* h9 d1 H. I$ J+ T* N# z/ n6 jRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
1 N6 A& y8 I3 u$ [, q" C...% L) Z% e$ v, u, j& F! t Y
response = resourceTracker.nodeHeartbeat(request);
7 s g/ i+ s) m% e为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
) }. ^2 f8 O4 u/ k( o; b步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat1 d3 G( o( G- E% M e; [
两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:4 @5 N" h& X+ W3 t5 ]" A
public interface ResourceTracker {
* V' D) d0 o: M; opublic RegisterNodeManagerResponse registerNodeManager(
& z8 l H: T9 K6 r4 w" D7 dRegisterNodeManagerRequest request) throws YarnException, IOException;; c2 o e. s P" |) ]
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
4 p! t; ~& a% A# l. xthrows YarnException, IOException;
3 ^3 e4 m5 y7 W/ C) ?0 E}
3 Q. [& \% G" c/ F步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
- O( h& X$ j9 }, H未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的$ f/ p5 v$ p0 ?1 [
Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:/ j, b8 d& i- q f/ Z+ b# t
option java_package = "org.apache.hadoop.yarn.proto";* O+ P5 N8 K: q: X$ W. a# R, q1 O0 S
option java_outer_classname = "ResourceTracker";
. }, Q' _& R' Z0 s. Boption java_generic_services = true;2 A/ g0 ?8 e2 {! h- z/ w' }& i3 q
option java_generate_equals_and_hash = true;, ]- V, V$ n1 q# H7 X' U
import "yarn_server_common_service_protos.proto";$ R! D4 c% ]" p# `! W
service ResourceTrackerService {
2 O) Y3 S/ b8 S$ O+ Xrpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);4 v& P3 R2 q# s5 v7 n
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
6 ~% w* q7 x' c4 s}$ @# [3 b) O C8 B' @ t1 ~
ResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。* A# F# Z9 }1 w& W+ I9 @8 X
步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
& T! Z( D/ M; g( D1 d2 i+ HBuffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和& ~5 k6 a) W+ w" {& O
NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文
+ f7 Y3 u2 k5 d5 I! `0 t4 ?1 i/ e件) :
- p9 d# g4 H e) ^# Z/ k1 Bimport "yarn_protos.proto";
. Q% D( j8 J8 u4 I8 ~" O! Y7 zimport "yarn_server_common_protos.proto";
+ n" \7 y$ }, Q4 O3 S/ M+ ^8 amessage RegisterNodeManagerRequestProto {
; P) @5 |4 E; H8 z& ?) S' s9 f& W4 h# foptional NodeIdProto node_id = 1;
" L/ ]+ X; T. w) doptional int32 http_port = 3;9 W: [' i, {4 o+ J" k& k; q
optional ResourceProto resource = 4;! ^: U5 ~8 C8 k& L$ W
} m% Z$ F6 B5 M f4 |
essage RegisterNodeManagerResponseProto {
4 a, M- q+ T' z- E: roptional MasterKeyProto container_token_master_key = 1;
; a9 M2 m4 n* ~' |% Zoptional MasterKeyProto nm_token_master_key = 2;
( |% v2 E. F6 zoptional NodeActionProto nodeAction = 3;3 E6 V: d9 J* L6 J) A7 w5 V
optional int64 rm_identifier = 4;8 V- x7 s6 \# k9 ?
optional string diagnostics_message = 5;
/ K: `4 u6 c4 x4 {% N; p}.- V! g0 |* W) a8 }6 I( x
.. //其他几个参数和返回值的定义# x% b1 U: l" U+ t3 H C& q8 ?
步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以& j) ]1 B$ M5 f1 Z# F7 Q0 R+ Y
原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol+ ?- P+ g+ j [2 g5 U
Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
$ x! ?9 _/ u9 t! Z* k: k' \RegisterNodeManagerRequest为例进行说明。& q4 o7 ]5 S. [2 `- Y' h& g
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :, T1 `5 S" H% C* f0 B- q
public interface RegisterNodeManagerRequest {+ E0 }3 D+ M: P4 K) U
NodeId getNodeId();* T9 V! E s; k
int getHttpPort();7 k4 T- f9 H- z: ]( Q
Resource getResource();
* i8 w3 E$ ^0 A8 Z o* m: {void setNodeId(NodeId nodeId);
1 u- F/ Z- k' H5 A }. Q! ?9 mvoid setHttpPort(int port);
0 b% E6 }* l8 o) h9 N* Z: gvoid setResource(Resource resource);1 o9 g% P1 s+ F3 ?+ B
}
! M! k# b. t4 R% x$ zJava封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
/ s3 U" V0 H: s: L. _public class RegisterNodeManagerRequestPBImpl extends
5 T/ C S* [2 x& C: wProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {/ x3 j7 U7 h- c- W1 a" [% S: B6 T
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();" q3 d( i/ K D; g7 I7 d1 n
RegisterNodeManagerRequestProto.Builder builder = null;
- s& O3 H& w+ b& f7 s1 ~0 Tprivate NodeId nodeId = null;9 I8 p. F/ ^( u0 Q8 y( d4 q4 f2 b
...: [- B6 Y; X: j' z% `* N: H
@Override2 N+ R4 v( U# L" y+ E7 g* u
public NodeId getNodeId() {
% D9 p" C7 b O# J, c3 d; B7 DRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;# ~( B3 P$ u3 _- L+ n8 \
if (this.nodeId != null) {3 w+ q8 B1 ]' m. I# \" }
return this.nodeId;7 j. C' G C5 L4 `# W1 f5 o
}i
. m8 W3 \6 F/ df (!p.hasNodeId()) {
. c' ~/ I( c @- G5 |$ S9 q- m: ~return null;
. }4 D" Z) X5 X2 z3 w! a} t
/ u$ S* f% x- W; j1 u5 L& z5 U3 O2 this.nodeId = convertFromProtoFormat(p.getNodeId());& W9 e& H. z; F4 p
return this.nodeId;! J" {" e* ~. M" {1 ]
} @
1 v/ A: P3 K: c$ `% ~* ]' o$ _Override
7 u; F9 _0 _: \public void setNodeId(NodeId nodeId) {- G! e* P2 e2 m! @3 j3 @
maybeInitBuilder();8 o" [9 M6 L A+ B& N L
if (nodeId == null)0 B: u5 Y3 w7 j- `
builder.clearNodeId();, D/ K% s z# D' z9 \
this.nodeId = nodeId;
7 A; n1 e% J! @" v} .+ Z3 a; s$ y1 ]
..5 I5 \- z% S, L5 E
}
( _2 O) l7 N/ x# f8 H- ^5 T" m步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为
8 ]7 I7 P9 k( P0 {ResourceTrackerPBClientImpl, 实现如下:. ^2 a# ]9 G3 s/ E0 a8 i
public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {; B2 z r. P. |& s$ T( v8 q
private ResourceTrackerPB proxy;
6 g3 i$ I0 N3 X" ?2 H5 Gpublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
- t; Z. S' [; s8 i; ^$ V3 f; wRPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
4 ^, W3 T" J& o) E3 Yproxy = (ResourceTrackerPB)RPC.getProxy(% _' D% ~0 ?2 I* c+ v. j
ResourceTrackerPB.class, clientVersion, addr, conf);% D% [' b( {- S5 D
} @
X( U& E5 c* g! BOverride: u! \; i6 C7 P
public RegisterNodeManagerResponse registerNodeManager(
0 J/ k( N4 ^$ {8 E$ b" W* [RegisterNodeManagerRequest request) throws YarnException,( Q; ~# T- \+ Y
IOException {
. @& L: W3 F, k: R7 ]: i1 m- h8 DRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
/ y5 h4 @- G7 c2 P, [' M1 Stry {
) \$ h \% p( Jreturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
9 t: X+ B- Q9 q" `7 f* m1 P `: J% h} catch (ServiceException e) {
5 N1 w; g; ^$ }) N" cRPCUtil.unwrapAndThrowException(e);: k5 U: N- K. u& | z( i- G2 C% z
return null;
* n0 Y! b! @2 R2 G$ Y9 u3 b R}% K* @* t7 X, O
} .
! x0 `+ h6 Y3 b..
8 R+ N5 R/ j- E9 Z V}) ]) W2 D9 v0 n
服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:6 W) J# w5 T- I/ G5 C) f
public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
% k$ ~# t0 D7 J1 B: i gprivate ResourceTracker real;6 Z' T" S6 {9 ` B6 j' y
public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
1 z0 B) V% E+ t! a+ ythis.real = impl; r* q* {+ v7 h/ s1 t
} @3 ~. s7 D8 n0 J# P: h0 g. ?
Override# z" `1 \" v; p
public RegisterNodeManagerResponseProto registerNodeManager(; b* z D1 s( c' q
RpcController controller, RegisterNodeManagerRequestProto proto)
9 Q2 }- \4 `. ^1 a* d( Fthrows ServiceException {
6 C7 O+ w. `4 k/ k3 G/ \2 RRegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
6 ?- l/ F8 V* b- Etry {
' a V) a) V9 M! v5 P( zRegisterNodeManagerResponse response = real.registerNodeManager(request);
& g. I% q% d/ s' z5 J5 L% jreturn ((RegisterNodeManagerResponsePBImpl)response).getProto();
# @3 q: F; ~/ a4 G- I} catch (YarnException e) {0 L7 ? }" ]; ~" o! d
throw new ServiceException(e);
6 q9 n; W. h+ }( g; w' i$ w} catch (IOException e) {) H$ B5 r4 @" P
throw new ServiceException(e);
- ?. k( e* N# t! [/ Y}
& J; v+ l( x% ]& B8 H- w} .
B- Q% ?. c2 I' t$ a..
5 @* R$ t2 ^' F9 E}, T( D& M! X3 i g9 r8 K# [: c) Z
总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列
$ _9 E1 Z7 d' U8 j, _! jJava接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。! _! p( Q2 Z8 q1 z2 t
图3-12 YARN RPC中的Protocol Buffers封装
! \$ K8 c( j* p9 M[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。( H* F9 ]3 W3 B9 l
[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。: a! s) ]! u+ W8 }
[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。/ \* k4 V8 y+ ~) J+ }+ T
[9] 参见网址http://thrift.apache.org/。; u; ^# E+ E) w
[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。+ y+ e* |) A* W6 }# N e
[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。
" Y& \3 _3 G/ m7 l4 B3 X[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。5 ]6 D+ Y3 H$ t
[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。5 w" @) T) c2 R# L$ L8 K
[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
7 B+ B J Y3 ^+ k, [, i6 T使其向前迈进了一步。
3 U' j& ?0 O# W4 k* l3 w8 c
) [8 W+ ^+ v% z6 @. {- N" g8 u
|
|