|
3.3.5 Hadoop RPC类详解
. v, Q6 A& i4 u m. GHadoop RPC主要由三个大类组成, 即RPC、 Client和Server, 分别对应对外编程接口、 客户端实现和服务器实现。% }. M: P, k3 M. F3 u: b
1.ipc.RPC类分析
' [. _% X+ y7 W- T0 |9 N8 tRPC类实际上是对底层客户机–服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
; O- o, c- D# v$ C如图3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxy和waitForProxy两类, 销毁方只有一
- t% W6 K5 V6 m6 \; u# J( O& b8 D个, 即为stopProxy。 RPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
$ B7 U7 m8 Z4 a! p# N* p# `6 |设置一些基本的参数, 比如RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
e1 d& P' L' X1 z/ m) w qRPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
( l( T$ g9 k4 W2 e1 o0 a- j与Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
+ L r* p7 g6 O: h: R' F等, 目前提供了Writable( WritableRpcEngine) 和Protocol Buffers( ProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过7 L" r2 M5 z) s- G( d' k$ v
调用RPC.setProtocolEngine(…)修改采用的序列化方式。
7 p; k1 f4 e; X* ~下面以采用Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用' y1 d9 l. S I! d$ W' ?! @7 z9 V
了Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可7 C* e+ v! @. j9 r5 B3 L5 J4 G
完成动态代理类对象上的方法调用。 但对于Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
0 p/ b& t4 K3 n0 `* v3 [, ?$ c程序那样直接在invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
' {' E6 o4 c" ?1 j |* M; a# x打包成可序列化的WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,! Z* K. A. F7 I# W. e
函数参数列表等信息, 利用Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。( K5 O2 R; [; s; v4 ^( i5 O
图3-4 HadoopRPC的主要类关系图+ L; G) E9 ?0 \& I, V( p) g& e
图3-5 HadoopRPC中服务器端动态代理实现类图 Q" h* A! a0 g4 P: P5 G k8 O9 B
2.ipc.Client
* G7 ^3 ~8 `% a( R- m- h IClient主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
. H4 i$ y) k8 r5 m' z1 `+ s9 @行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
6 E2 Y9 I" b, { u0 N. ^# T+ xpublic Writable call(Writable param, ConnectionIdremoteId)
& F! _, T' l3 s9 M1 m( T1 nthrows InterruptedException, IOException;( y* P) K: N; f8 L% \, a8 F
图3-6 Client类图
% b" t) H2 I1 R/ l( c8 U2 B/ q8 i: W" cClient内部有两个重要的内部类, 分别是Call和Connection。$ w$ N9 Z6 p* V4 _
❑Call类 : 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
/ H2 O# |& R0 L1 N2 D! `1 o }! J错或者异常信息error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
+ x9 W$ U0 ?4 G. S! C序与结果返回顺序无直接关系, 而Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id和
( @& }5 K0 g0 {' z0 G) R0 B; u- L z" K9 Xparam两个变量, 而剩下的3个变量( value、 error和done) 则由服务器端根据函数执行情况填充。' A9 Z5 J" ~9 ?, n. O. K
❑Connection类 : Client与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
; Q9 R. m! g- M- I! L' v信息主要包括通信连接唯一标识( remoteId) 、 与Server端通信的Socket( socket) 、 网络输入数据流( in) 、 网络输出数据流, j. x& S, V: ^; i, T
( out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
2 V2 J- m0 T0 ?, k, S* U! H❍addCall—将一个Call对象添加到哈希表中;; f' P5 Y* e7 J T! l
❍sendParam—向服务器端发送RPC请求;
3 v/ x% m- @, N; ^& N❍receiveResponse —从服务器端接收已经处理完成的RPC请求;# S& [$ ^5 w7 L b/ Z4 d. @
❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
! D. C, O8 n. \当调用call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。" z; A! ^; D/ T. [7 i3 L1 m
1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;1 s8 w2 C) y: z8 G3 S# n
2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;. }( L+ I) P! H
3) Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;6 I* L# G% f, j9 F
4) Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。, }1 t+ [: \: o$ s! D
图3-7 Hadoop RPC Client处理流程
$ |7 c' K8 r# P# T6 W5 ~' j3.ipc.Server类分析
, [" l/ `9 g' x+ S5 vHadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNode或 JobTracker [8] , 这是制约系统性能和可扩展' f5 G$ I! l# Q9 F9 E) r
性的最关键因素之一; 而Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计! [9 J- C/ s8 s' ]- M+ o
目标。 为此, ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
- _. k+ \, K: X& k2 m+ u% ]( v6 l" R了JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。 X [6 O( v) ~9 v8 N5 V1 P
Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性2 `6 m6 _# V1 G+ \& c) t
能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的Reactor实现原理如图3-8所示。
9 P5 x, L+ C. |+ ~" B% i- e0 F图3-8 Reactor模式工作原理. A- n0 ^% f0 f1 v4 c
典型的Reactor模式中主要包括以下几个角色。( g |! O5 q0 [
❑Reactor: I/O事件的派发者。) z4 U3 |2 X ]: e9 ]; J
❑Acceptor: 接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler。
t+ [2 c0 O+ j1 w, p❑Handler: 与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
! `6 W H& @& n0 E! N2 ]象诸如read、 decode、 compute、 encode和send等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适, w, R' B& U( u/ X; F
当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次I/O事件到来的时候( 另一半可读) 能继续上次中断9 a# {8 Z2 Z+ B9 _. f
的处理。
7 j" ]# s, b4 F7 J" T7 {& r❑Reader/Sender: 为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线2 ?4 ?0 }/ m7 O
程池中等待后续处理即可。 为此, Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
4 H. P* ]; ~: e应的Reader和Sender线程处理。. T; N! L9 L5 K
ip* z8 O1 R/ T4 o3 P
c.Server实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易. a) L2 ]. m, ]" {% r6 S/ g) g
地学习ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
+ V- I4 j8 f9 Q# L7 X& r, q: N2 b前面提到, ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为4 P- {! E R( e# ^+ P( r
此, ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。6 n! T8 E, K* N5 g! |: {% P
图3-9 Hadoop RPC Server处理流程, P- b( L5 i% h: C
( 1) 接收请求
- k/ n& Y* w( b. ^2 v! b该阶段主要任务是接收来自各个客户端的RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue)
" H/ b. a* n6 i( x( d, _中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由Listener和Reader两种线程完成。, ]5 F) F+ m. ? Q% @/ q" ^2 i- {
整个Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
. Z- t& B9 C# n1 H池中选择一个Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个' z. k: T8 s. m! R4 |' l$ N. T" j
Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。" _4 u+ K A1 ^1 W
Listener和Reader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。
- @+ F+ T- W, L9 A6 y' J对于Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
( F: j8 T& N8 L ?* U7 N& v7 b; J5 `. KReader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call对
, A3 K0 w* h# r8 i象, 放到共享队列callQueue中。 I' h+ X7 e5 Q: t0 w! \
( 2) 处理请求
* j! U) B( ~' e* K" ~该阶段主要任务是从共享队列callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
5 P; `: b/ ?, S) [程完成。
$ b" E+ u& D0 I6 s( JServer端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果: Q6 I) _* A; L
返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时) @2 Q- R( v! u
Handler将尝试着将后续发送任务交给Responder线程。
$ K% u+ t0 h$ Z! B# `- |( S( 3) 返回结果
$ B+ h. W8 @7 [前面提到, 每个Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结
d# x8 i5 `* x2 e# J果过大或者网络异常情况( 网速过慢) , 会将发送任务交给Responder线程。 @1 |$ E/ C) I5 K# M7 m+ w+ H4 C$ p1 k
Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将, P: c8 H8 C3 r/ A/ F# @1 d
结果一次性发送到客户端时, 会向该Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送
! g" B- \7 A! f4 \9 j1 i* b( j# \, ]未发送完成的结果。
# L8 B% B3 k9 d# G. u; W$ p$ `3.3.6 Hadoop RPC参数调优
- M+ }$ u1 U4 V" i- O6 EHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。% v( t* U0 {9 _# s# y0 j9 H
❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
7 Y2 a' {* o' F TReader线程。* R. b1 a# ^( z; z" Q/ }
❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个1 v8 i- N) u% Y P* F
Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:( I) V$ y+ m- J% _( H
100×10=1000。0 P# M$ e+ m" `$ ]6 b! p
❑Handler线程数目。 在Hadoop中, ResourceManager和NameNode分别是YARN和HDFS两个子系统中的RPC Server, 其对应的: E( ]2 A; r9 ~+ H7 M' Y2 s9 w$ B
Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-count和dfs.namenode.service.handler.count指定, 默认值分别为
2 O/ b- x+ ]6 ?. @ [* O4 C50和10, 当集群规模较大时, 这两个参数值会大大影响系统性能。
2 l, H" S" q/ U4 m3 ~" l❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可" x5 m" m! n5 X7 ?7 i
能不利于对实时性要求较高的应用。 客户端最大重试次数由参数ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试; ~4 R7 B( \3 X2 l1 n! [# P; o, u6 y; w
10次( 每两次之间相隔1秒) 。
0 |$ ~8 X+ c: p4 x. U: D3.3.7 YARN RPC实现& V; T8 t5 Z& l- R
当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] 、 Protocol Buffers和Avro。 同Hadoop RPC一样, 它们均由两部分组: f3 `9 V% d7 _
成: 对象序列化和远程过程调用( Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
5 m. H2 H( |* D& [5 L1 G) 。 相比于Hadoop RPC, 它们有以下几个特点:/ _. r2 z! u; E7 C, B) J/ `+ f$ l
❑跨语言特性 。 前面提到, RPC框架实际上是客户机–服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
- \5 ?3 K/ m* T8 ]) d+ R% _Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
+ H7 Z$ t6 _0 K( Q% |3 `# s. o端和服务器端可采用任何语言编写, 如Java、 C++、 Python等, 这给用户编程带来极大方便。
0 B" B1 Z4 ~, g/ h❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description Language, IDL) , 它提供一套通用的数据类型,6 z5 M( H+ V" a* a( N$ j/ n
并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照IDL定义的语法编写完接口文件后, 可根据实际应2 p* g# k: d8 K8 H C
用需要生成特定编程语言( 如Java、 C++、 Python等) 的客户端和服务器端代码。
# d1 Q& _; }/ e V) f- F❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者, F5 m8 V; a0 ~$ U R. C/ M
删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。1 L+ O1 F: y. h+ c
随着Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
8 T) R( ^* i4 R a% H9 x❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
( p% e6 z, b, W1 o9 M9 x& `读写HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。$ d2 h8 g- v* h6 E" p* Z" s
❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
' m: p1 L( R' b果用户企图这样做, 会抛出VersionMismatch异常。3 J: W" {! |$ I& ]1 T1 T4 o
为了解决以上几个问题, Hadoop YARN将RPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
: q- `6 f1 y9 e. v" e/ {7 D! V后, Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源0 j, l3 p5 a3 v1 f! v$ O0 e$ Z
RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
6 i7 m/ H% I8 C6 H O& y7 mRPC, 而 AvroRpcEngine [11] 和 ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache Avro和Protocol Buffers对应的# @: r! {! ^ w1 }$ V7 V: C( U
RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
1 C: p8 y6 e9 f' W$ ^* G. J7 J中, Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。4 `7 [; Z# g! \: t# P
YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信. h( B$ ^) x7 P3 x$ ?& l
协议。 YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是5 `* [3 A8 C, _4 J, B1 x6 R
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC。 HadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider生( Q$ { R) |3 b, {8 D
成客户端工厂( 由参数yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
5 @! J9 E/ x+ \4 q7 Z3 E! y务器工厂( 由参数yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
; b( J, V7 x4 {! a, k$ e: u" L ?据通信协议的Protocol Buffers定义生成客户端对象和服务器对象。
4 j$ p E+ C1 O ~3 O* q, Y; P图3-10 Hadoop RPC 集成多种开源RPC 框架! `8 S# C* w2 F& \; `2 |
图3-11 YarnRPC 相关类图/ ^; V) x6 W! V0 U ?5 ]
❑RpcClientFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但) }, P+ Z) m7 _8 [8 Z' R. R' N6 B
它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于
% ~( I, K* `' FJava包XxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
4 L" I- u7 I! ~6 \+ g缀"PBClientImpl") 。3 A* ]- W. y+ c
❑RpcServerFactoryPBImpl: 根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
3 P9 ]& H: A; ?# g/ O/ m4 |, M* T! x(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java包
1 v) ~; w2 j- ]) t9 J& V0 m+ T* a! i名为XxxPackage, 则客户端实现代码必须位于Java包XxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
: R- D: ]% k- _7 I. h; K类名为PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。% f) t! ^! c- E. }/ S# Y' l1 i
Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以' C+ x8 ^5 \. n; p& V
下几个方面:' X0 w7 i, z$ m) E+ @& y- S
❑继承了Protocol Buffers的优势 。 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允! ]& o' x* j! Q# f' O g9 _( v
许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用! o9 R6 @2 L6 `* i9 S% h w
户为某些服务(比如HDFS的NameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol Buffers比Hadoop 自带的Writable在性能
# C4 t1 @6 U7 d% L, n+ h& K: ^: B方面有很大提升。4 j' l6 L& l0 N. t" o+ p
❑支持升级回滚 。 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为Active和Standby两种角色,
( E% q E; x: t( H4 Z1 x. F其中, Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
/ P9 I- y4 t! x2 V2 KBuffers序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
! L4 h) ]( m% c9 hNameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。$ O7 W7 v4 l2 z s \
3.3.8 YARN RPC应用实例; f4 e9 N1 W; W* v9 _) C
为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。/ @) x) K# g& Z9 Y/ F2 g7 ~; {
在YARN中, ResourceManager和NodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户3 k: B) n' o. e6 g$ _. f+ _% i
端, ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManager和nodeHeartbeat) 向
+ ]8 d- r$ [; I/ \* n: n! {ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:8 g5 d" k# q' F% p
// ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
; \4 r$ z/ n: F# d4 b! U' l2 Mpublic class ResourceTrackerService extends AbstractService implements) ?) H. K6 {& ^3 M. J# J
ResourceTracker {3 r7 i( Z. B! F6 Y
private Server server;
' @; T$ e& b6 ^1 C/ P/ N... g! Y; [ J& G3 U& T
protected void serviceStart() throws Exception {
6 G. h! X- b5 T+ Ksuper.serviceStart();, M* J H' d* F2 W, @" D0 E1 @- ?
Configuration conf = getConfig();% Q8 |+ O- a' O1 a% {
YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC类1 k& M6 p1 [% [; J
this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,9 N3 G' p) J0 P) M9 j$ N$ \# _
conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,& x* B4 N' i" N4 v. R% i8 O
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
$ i" R' s& Q$ o, {( Sthis.server.start();
, ]# m: v8 k$ {6 s+ \ y# D) B0 q+ y}.
, I" |6 c% T* V6 i2 D) a( o..& k _6 y$ y1 ?$ h; K e5 T0 h X( b
@Override+ q) \! P1 @. f% u; @' M4 V
public RegisterNodeManagerResponse registerNodeManager(
* |1 h) ^3 e) d9 ^' HRegisterNodeManagerRequest request) throws YarnException,+ M5 [; M0 T J4 I G( d8 c/ `
IOException {" q+ }$ K* Q* ^
//具体实现
0 s0 F3 n. ?- G% U; t: |}@
! s+ Y0 c& U# d5 k2 \Override# u4 f8 }( Q( L# G3 ?1 ]
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
" }' [8 f5 {8 ~! F! o# Othrows YarnException, IOException {, T l5 C: B; T( D& b; R
//具体实现
+ e$ P* i" c2 y( Q4 h2 K}
9 W* {# x! D6 n- F0 ?; l}
; {' n8 _4 U* C/ d0 eNodeManager(客户端) 中的相关代码如下。
# D5 U. F9 z! _ k4 _, B- A. t// 该函数是从YARN源代码中简单修改而来的
6 W/ D+ B2 m2 {/ B* oprotected ResourceTracker getRMClient() throws IOException {
9 j; D, r) M% eConfiguration conf = getConfig();
5 z! i4 C: z' `, v3 S6 {2 uInetSocketAddress rmAddress = getRMAddress(conf, protocol);
! q8 s# [+ F h! J- ERetryPolicy retryPolicy = createRetryPolicy(conf);* k h: K2 G5 v& p
ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
8 R' V: {' R* ?5 xLOG.info("Connecting to ResourceManager at " + rmAddress);$ i7 n" H' _, K4 p3 q* z4 k
return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
# R$ k5 J0 o, p( P7 r3 S6 z}.
4 C, o. a: f) T* L0 h, v..9 e6 U# ~: ?8 W
this.resourceTracker = getRMClient();
) D- j0 d' x, z9 {& h" A...
9 P4 v; y5 i& B8 R5 b/ O, fRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
& T" M2 Z- F, o0 \" [...
# W* j/ O3 S. q, U; K: _response = resourceTracker.nodeHeartbeat(request);$ ~% y' G! O6 h, M
为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
6 z7 m# D: W; d; n4 O- Q步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManager和nodeHeartbeat
) z. E, U/ W9 v8 T( E) H% K两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
% {+ @# y, S( R) g# spublic interface ResourceTracker {! o/ ]* }" q7 H# \, I. B' F6 d
public RegisterNodeManagerResponse registerNodeManager(
5 y( r) m' r, ?8 n) h3 n) r8 WRegisterNodeManagerRequest request) throws YarnException, IOException;; e4 d/ K. J9 M/ Z
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
) I& r: v/ s* s( g* a! n ` ethrows YarnException, IOException;3 f3 K, _% T2 z! B! n' x( ^5 u
}$ f! z. J, [8 m5 z8 L& x6 N# S* m: X$ Y1 ]
步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
6 i! S3 p. r P% T f4 j' p未提供RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的5 J# D' E/ ]% X1 R9 K4 z0 R! @
Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
5 g' S) x8 |$ d6 b$ y! \9 yoption java_package = "org.apache.hadoop.yarn.proto";5 O8 p( m( H$ X, r, I; g( P
option java_outer_classname = "ResourceTracker";% V6 v# h: ^1 c0 N) y) x# |, r) C
option java_generic_services = true;
. }; v8 s# n+ E T1 o4 U' e: }option java_generate_equals_and_hash = true;3 l1 |! z, B! Q
import "yarn_server_common_service_protos.proto";- A6 F& E0 D7 h. R6 u" E c& h* D
service ResourceTrackerService {
& n& _ W$ y# Xrpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);& a h2 D# v* r- G# A5 ^) ^
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
+ C6 w% t/ C1 i+ b* U}
+ f+ _: a! u& g# Z" r; ~ResourceTracker的RPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
3 Y8 i( x, x( h; t步骤3 为RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
* n6 _( T& E6 r$ Y1 V0 K, r. S( `0 pBuffers定义的, 因此ResourceTracker协议中RegisterNodeManagerRequest、 RegisterNodeManagerResponse、 NodeHeartbeatRequest和8 Y7 ? c( G% B$ o" N) X/ `
NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto文+ V/ ^4 i( Z2 N7 ?9 J
件) :
0 _1 c. r+ j2 [/ g, s$ |import "yarn_protos.proto";
7 F4 K: S- J1 [9 q! n! A9 T5 p5 Fimport "yarn_server_common_protos.proto";3 v: F1 y2 }- X; d7 `
message RegisterNodeManagerRequestProto {
2 \) v2 [4 b+ C# k5 v5 T! `optional NodeIdProto node_id = 1;% b @! o" G9 d6 j8 ^+ S% G9 I
optional int32 http_port = 3;: X5 D# v3 s3 r9 F0 w
optional ResourceProto resource = 4;
+ O+ n% e+ u3 @& s2 h" o} m
: y* m1 n$ |+ J4 }* H, gessage RegisterNodeManagerResponseProto {1 N2 c% p; W/ _! Q' W+ A5 }% o1 `
optional MasterKeyProto container_token_master_key = 1;
8 \) u$ m) X( v) c z* y% qoptional MasterKeyProto nm_token_master_key = 2;
) S# j8 v& n) o3 ~: [2 ^. Koptional NodeActionProto nodeAction = 3;
4 G6 r% c3 a0 P: L& R+ R- I! p$ }# `optional int64 rm_identifier = 4;( k# |& ?& M1 s3 f
optional string diagnostics_message = 5;7 G. J% K. q6 }
}.
$ |" R* s- N( |* h5 x, V.. //其他几个参数和返回值的定义
8 n/ l- a1 g% i0 a8 f9 g* R9 B步骤4 为RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
* d( r' R2 P0 d原生态.proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol" P d6 C0 ^- O. g
Buffers生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数6 T0 A! r7 L* O. ~
RegisterNodeManagerRequest为例进行说明。3 v! v/ n- A8 b* Q4 k
Java接口定义如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords) :8 E4 h& n x( ^# c
public interface RegisterNodeManagerRequest {* r: l2 y' R$ T B' p7 t
NodeId getNodeId();
, b$ \2 p0 u# G- o1 L3 ]4 d# xint getHttpPort();
9 a5 u( g/ r" a: P2 J: G1 P0 T! RResource getResource();. h, R# \) r& ^: [! w! D; p
void setNodeId(NodeId nodeId);
8 w! d8 A* L8 w& Uvoid setHttpPort(int port);" l/ f5 w( _0 N# a" i
void setResource(Resource resource);6 r7 j$ J- j% F5 W9 u
}
6 `! c- L6 i8 E3 g V( f7 PJava封装如下( 见Java包org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :4 I& s; I) B, r4 G8 `1 X6 m1 G* ~
public class RegisterNodeManagerRequestPBImpl extends
/ P# i( {& u# t1 [+ sProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
/ j' M% y7 D( x0 J! I4 O( E1 cRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();& [ i* d5 N* H# t& N# c4 B% M
RegisterNodeManagerRequestProto.Builder builder = null;
$ w' B6 ~# q; R! iprivate NodeId nodeId = null;5 q9 X) d* Y4 }& i
...
* S9 Y# u. F; S( X@Override( c3 G( |" g/ R7 F' a
public NodeId getNodeId() {' X# D+ F9 ^1 [9 |' F
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;2 R/ u4 j. W" _0 R
if (this.nodeId != null) {) s/ n2 B# V5 r
return this.nodeId;
1 e! N7 x$ ^3 Q* ]0 b9 M& X}i
c2 y% k+ B0 g0 m! ~9 p" zf (!p.hasNodeId()) {
- J7 c, C% n0 Preturn null;
3 }6 w9 S" W* k9 \% u" i} t
5 O$ ?) I/ _" N+ F4 [his.nodeId = convertFromProtoFormat(p.getNodeId());
! t7 e# o# R9 Mreturn this.nodeId;" c- T% `7 h k# B
} @
% `, E: x# C% L3 g+ _' `Override* b0 }9 g7 Z- @+ f. P- s/ j
public void setNodeId(NodeId nodeId) {( k* ^5 W" v( O
maybeInitBuilder();
+ @0 J+ I- t0 F7 G7 J2 h) dif (nodeId == null)
3 ]- }- k w2 Nbuilder.clearNodeId();0 o! q! l& g, l P2 u4 s' T
this.nodeId = nodeId;
6 I8 }; g$ p+ I) M: h} .2 v9 ], a4 L0 J" p9 h; X, u2 | b
..
% c( G' C9 d* b8 N( k}5 f0 x. Y }, A9 \/ f
步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为* ?1 x! Y; D" O" V c
ResourceTrackerPBClientImpl, 实现如下:
5 |; p4 O* S. v1 ?# Bpublic class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
# ^: i& H5 r/ h' n4 f! A9 aprivate ResourceTrackerPB proxy;
% H) J+ V0 q+ R' _# ?public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {8 Q$ I- ?. k( R/ ~; l
RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
- Q5 f5 _) O) b! i: Tproxy = (ResourceTrackerPB)RPC.getProxy(/ x3 b5 ~0 d/ \+ B5 X
ResourceTrackerPB.class, clientVersion, addr, conf);
% Q" u/ y8 p% q' o! U7 A} @- @& T! _' f# o6 Q
Override
+ J2 o+ _0 g# Q4 }4 o6 m5 B0 s4 ipublic RegisterNodeManagerResponse registerNodeManager(
9 D3 a" j$ D" Z4 }8 c& |4 b9 P! gRegisterNodeManagerRequest request) throws YarnException,/ ]* b' J6 V* u5 i$ C
IOException {% J$ V: s8 V, B& p: J, f
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
Z. b; K6 w4 D& rtry {3 D' x, }4 H4 {. n8 q" q6 L
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));0 r i5 P4 ?& s; ~# v6 V: {
} catch (ServiceException e) { ]4 J, M, V7 l& e5 t. t2 T
RPCUtil.unwrapAndThrowException(e);( n9 L5 m1 m/ w% I) i
return null;. L( p$ `* Z: }" z
}" w; y5 _" H8 Y, m6 F* _
} ., ?/ y5 q; `2 w( j
..; L e9 J5 _' M5 j
}, R: T( E- q! D* n" I. ~/ e
服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
7 v: X' d' @1 L7 Gpublic class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {9 ~1 u9 k8 h* s- C M
private ResourceTracker real;- R( T1 x/ i; d+ k
public ResourceTrackerPBServiceImpl(ResourceTracker impl) {2 t2 g& t8 R& b; X5 V& U
this.real = impl;; K( _) g& }# Q5 i$ |& v
} @- W( N/ F6 R9 c/ g
Override1 o( i8 ^& x7 H7 ^2 H" d7 F' K
public RegisterNodeManagerResponseProto registerNodeManager( q& z2 \6 P1 x/ i: J7 ], O/ f9 N
RpcController controller, RegisterNodeManagerRequestProto proto)
4 E: B0 D q4 m8 T: u+ A8 bthrows ServiceException {% F$ t! Z. p/ | U4 @8 J
RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);1 W+ D `- C4 s- R! H# M
try {
- ]8 Q7 X' ]5 Z) f' P. ^7 P$ wRegisterNodeManagerResponse response = real.registerNodeManager(request);6 r' x! X. J/ A7 L3 c: q1 D
return ((RegisterNodeManagerResponsePBImpl)response).getProto();$ \; y2 ?7 z- f. \2 y
} catch (YarnException e) {
( m2 ]: X% o2 x0 e& D( P! u, o0 othrow new ServiceException(e);
# i7 s, i2 |0 H+ @2 w# s: k} catch (IOException e) {
) @9 m# L& T& |throw new ServiceException(e);
' \: v6 y8 D8 \% o}, ~% H+ k4 m( t# w, q% S2 c
} .
/ Z: s% ~( C6 F1 L: p..0 v/ m" p- o+ {' |* j& c3 Y+ M
}; c" D# _* Y$ |6 c$ T
总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTracker, YARN实现了一系列
V# c; m& w2 a& gJava接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
\. M9 M' J1 g; x: |图3-12 YARN RPC中的Protocol Buffers封装
) l7 B3 A; b c1 X, _2 L[6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call。9 p7 M! Y6 C; Y+ |
[7] Doug Cutting在Hadoop最初设计时就是这样描述Hadoop RPC设计动机的。
# h4 @; R/ @8 }0 T) P[8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。. P4 u6 M9 x5 c# N8 q D$ ~- x
[9] 参见网址http://thrift.apache.org/。
7 r$ K. i j' m8 M[10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns。
/ {+ u0 z1 j, @3 n[11] AvroRpcEngine从Hadoop 0.21.0版本开始出现。
* N% F- i6 x! o" Y$ Y D8 }5 h[12] ProtobufRpcEngine从Hadoop 2.0-apha版本开始出现。
) y( ^ D8 s3 O& ^% U T0 M[13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347。
/ J$ v5 l# l9 m[14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像Thrift和Avro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
! i9 {7 i+ k1 W% k* G使其向前迈进了一步。 ( U9 H) S) G( o2 d
, j) D' W5 ]8 v! Q/ @- ^$ L. D! u i; K. I4 Q' A8 D; j- I
|
|