|
3.3 底层通信库
: C% d+ ]* b+ i2 _& r网络通信模块是分布式系统中最底层的模块, 它直接支撑了上层分布式环境下复杂的进程间通信( Inter-Process
2 j% l& ~( H4 N, G% e3 n# B! S: RCommunication, IPC) 逻辑, 是所有分布式系统的基础。 远程过程调用( Remote Procedure Call, RPC) 是一种常用的分布式网络
! t" l( a; c9 H# d+ H( Y6 e) V. y9 b通信协议, 它允许运行于一台计算机的程序调用另一台计算机的子程序, 同时将网络的通信细节隐藏起来, 使得用户无须额外地5 j* _9 u5 h+ O2 B0 `+ m3 [7 X
为这个交互作用编程。 由于RPC大大简化了分布式程序开发, 因此备受欢迎。
3 m8 t# a7 M( t s% L. [, x5 g作为一个分布式系统, Hadoop实现了自己的RPC通信协议, 它是上层多个分布式子系统( 如MapReduce、 YARN、 HDFS等). e/ c {0 ]8 s! G' a
公用的网络通信模块。 本节首先从框架设计及实现等方面介绍Hadoop RPC, 接着介绍RPC框架在Hadoop YARN中的应用。
6 n U9 a0 ^) s6 Q3.3.1 RPC通信模型
- o" d- O! M; |! nRPC是一种通过网络从远程计算机上请求服务, 但不需要了解底层网络技术的协议。 RPC 协议假定某些传输协议( 如TCP
7 c' B5 [$ [/ S7 R. e9 z9 T或UDP等) 已经存在, 并通过这些传输协议为通信程序之间传递访问请求或者应答信息。 在OSI 网络通信模型中, RPC 跨越了传. U1 j- u6 x9 d/ M, o: h
输层和应用层。 RPC 使得开发分布式应用程 序更加容易 [6] 。# n: d: D7 V: l- W, b! y$ q
RPC 通常采用客户机/服务器模型。 请求程序是一个客户机, 而服务提供程序则是一个服务器。 一个典型的RPC框架如图3-1- k& g% q# X, T* X1 o8 Y
所示, 主要包括以下几个部分:3 c$ [+ [4 C' t6 X0 P' _% ]
❑通信模块 。 两个相互协作的通信模块实现请求-应答协议, 它们在客户和服务器之间传递请求和应答消息, 一般不会对数
o. r; P, S2 F据包进行任何处理。 请求–应答协议的实现方式有同步方式和异步方式两种。. x! n& I4 V9 g _- v
如图3-1所示, 同步模式下客户端程序一直阻塞到服务器端发送的应答请求到达本地; 而异步模式不同, 客户端将请求发送
: Y! f8 L* Y4 N7 ?到服务器端后, 不必等待应答返回, 可以做其他事情, 待服务器端处理完请求后, 主动通知客户端。 在高并发应用场景中, 一般4 S9 C# s( {. Z: u0 M9 A5 _' g
采用异步模式以降低访问延迟和提高带宽利用率。5 v0 \0 M, h3 D- ]
图3-1 同步模式与异步模式对比
7 m5 U; R3 ? D/ h7 T. v. F' W❑Stub程序 。 客户端和服务器端均包含Stub程序, 可将之看做代理程序。 它使得远程函数调用表现得跟本地调用一样, 对用8 K* ~. W- t' ?6 G* R0 W& }
户程序完全透明。 在客户端, 它表现得就像一个本地程序, 但不直接执行本地调用, 而是将请求信息通过网络模块发送给服务器
8 s+ P! S7 L+ D2 @; s端。 此外, 当服务器发送应答后, 它会解码对应结果。 在服务器端, Stub程序依次进行解码请求消息中的参数、 调用相应的服务5 M+ \1 D: V: m
过程和编码应答结果的返回值等处理。9 ]& L5 C8 y: T: t, b! H; G
❑调度程序 。 调度程序接收来自通信模块的请求消息, 并根据其中的标识选择一个Stub程序进行处理。 通常客户端并发请求( J, g2 I* V9 _# C
量比较大时, 会采用线程池提高处理效率。% x2 V, H( }% T- k4 U9 n m4 D
❑客户程序/服务过程 。 请求的发出者和请求的处理者。 如果是单机环境, 客户程序可直接通过函数调用访问服务过程, 但
! A/ f6 B; L P2 L6 n在分布式环境下, 需要考虑网络通信, 这不得增加通信模块和Stub程序( 保证函数调用的透明性) 。* E" D8 T8 v" P: h
通常而言, 一个RPC请求从发送到获取处理结果, 所经历的步骤( 见图3-2) 下所示。) B& I! v H" P% h8 A4 m U$ G
1) 客户程序以本地方式调用系统产生的Stub程序;' G$ m2 X. k8 l+ }- L, }) k
2) 该Stub程序将函数调用信息按照网络通信模块的要求封装成消息包, 并交给通信模块发送到远程服务器端。
# p: b: r4 f3 v& o3) 远程服务器端接收此消息后, 将此消息发送给相应的Stub程序;7 m4 z @2 l& A1 a
4) Stub程序拆封消息, 形成被调过程要求的形式, 并调用对应函数;6 S, p* P i+ _ E
5) 被调用函数按照所获参数执行, 并将结果返回给Stub程序;
7 m# g' v$ b4 U/ r6) Stub程序将此结果封装成消息, 通过网络通信模块逐级地传送给客户程序。
2 C: q! @) M2 @: A图3-2 RPC通用架构+ H, G$ }! m: C1 E6 i! X
3.3.2 Hadoop RPC的特点概述
; }: r4 M( H9 C2 C* Y# m/ f2 ]# DRPC实际上是分布式计算中C/S( Client/Server) 模型的一个应用实例, 对于Hadoop RPC而言, 它具有以下几个特点。7 k' u- f8 ] k5 `4 ]: B+ P% b# @
❑透明性 。 这是所有RPC框架最根本的特点, 即当用户在一台计算机的程序调用另外一台计算机上的子程序时, 用户自身! B0 v" Y0 @- ^) C6 ?* V) o
不应感觉到其间涉及跨机器间的通信, 而是感觉像是在执行一个本地调用。
& D! a' e( I# P7 O3 | r❑高性能 。 Hadoop各个系统( 如HDFS、 YARN、 MapReduce等) 均采用了Master/Slave结构, 其中, Master实际上是一个RPC
4 J' v" g! e1 Z- b7 QServer, 它负责处理集群中所有Slave发送的服务请求, 为了保证Master的并发处理能力, RPC Server应是一个高性能服务器, 能够3 z, u$ O% E2 I! {4 p8 g- B7 n
高效地处理来自多个Client的并发RPC请求。
$ F+ |7 D9 M0 ^8 o. E❑可控性 。 JDK中已经自带了一个RPC框架—RMI( Remote Method Invocation, 远程方法调用) , 之所以不直接使用该框
) ?! H% u/ _% C3 e; Q( m( ~架, 主要是考虑到RPC是Hadoop最底层最核心的模块之一, 保证其轻量级、 高性能和可控性显得尤为重要, 而RMI重量级过大且
& e2 b0 c8 x+ n( l8 ]/ Z2 K用户可控之处太少( 如网络连接、 超时和缓冲等均 难以定制或者修改) [7] 。+ @6 ~7 I/ i) b4 E D; E
3.3.3 RPC总体架构
5 y- E! W7 A X6 @: u同其他RPC框架一样, Hadoop RPC主要分为四个部分, 分别是序列化层、 函数调用层、 网络传输层和服务器端处理框架,$ `' c& q. P1 {3 l0 N' n: L2 ?4 M
具体实现机制如下:
8 f" h. e6 g D❑序列化层 。 序列化主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储, 在RPC框架中, 它主
1 o7 B6 r- u' c要用于将用户请求中的参数或者应答转化成字节流以便跨机器传输。 前面介绍的Protocol Buffers和Apache Avro均可用在序列化9 _0 D7 m8 _9 }0 c
层, Hadoop本身也提供了一套序列化框架, 一个类只要实现Writable接口即可支持对象序列化与反序列化。
/ ~+ d/ F) K3 z- v❑函数调用层 。 函数调用层主要功能是定位要调用的函数并执行该函数, Hadoop RPC采用了Java反射机制与动态代理实现
% ^5 \, N$ ]- r了函数调用。; H: {! W; [5 J& c, B+ r; H; G
❑网络传输层 。 网络传输层描述了Client与Server之间消息传输的方式, Hadoop RPC 采用了基于TCP/IP 的Socket 机制。) m6 F1 V, X& ?! |$ E! B
❑服务器端处理框架 。 服务器端处理框架可被抽象为网络I/O模型, 它描述了客户端与服务器端间信息交互方式, 它的设计
f6 n3 i- c9 n: y M* r* W直接决定着服务器端的并发处理能力, 常见的网络I/O模型有阻塞式I/O、 非阻塞式I/O、 事件驱动I/O等, 而Hadoop RPC采用了基
1 N: O; [+ a: s& z' u于Reactor设计模式的事件驱动I/O模型。
- U# R) z2 F2 ?6 u6 ZHadoop RPC总体架构如图3-3所示, 自下而上可分为两层, 第一层是一个基于Java NIO ( New I/O) 实现的客户机–服务器6 W: Y: O( |6 `7 s' w
( C/S) 通信模型。 其中, 客户端将用户的调用方法及其参数封装成请求包后发送到服务器端。 服务器端收到请求包后, 经解
" v( \0 V& F" t2 s" q/ @包、 调用函数、 打包结果等一系列操作后, 将结果返回给客户端。 为了增强Sever端的扩展性和并发处理能力, Hadoop RPC采用! P/ H' i7 ~# I; n" F; Q
了基于事件驱动的Reactor设计模式, 在具体实现时, 用到了JDK提供的各种功能包, 主要包括java.nio( NIO) 、1 y- w4 _7 u8 V8 `0 y
java.lang.reflect( 反射机制和动态代理) 、 java.net( 网络编程库) 等。 第二层是供更上层程序直接调用的RPC接口, 这些接口底层$ H2 a) E6 C+ [% X( \) n+ v
即为C/S通信模型。- {( n: r/ N2 g: D
图3-3 Hadoop RPC总体架构% C W" U! J9 y, M* V* [7 W
3.3.4 Hadoop RPC使用方法
8 _, W. Z$ r" G. Q. THadoop RPC对外主要提供了两种接口( 见类org.apache.hadoop.ipc.RPC) , 分别是:
+ s. H1 E$ [9 U* t3 O* A❑public static <T>ProtocolProxy <T>getProxy/waitForProxy(…): 构造一个客户端代理对象( 该对象实现了某个协议) , 用于向$ \5 f7 |. d0 H1 q# u
服务器发送RPC请求。. C5 {4 ]0 h! G; j/ B) ]( |+ K: d0 N Y
❑public static Server RPC.Builder (Configuration).build(): 为某个协议( 实际上是Java接口) 实例构造一个服务器对象, 用于处理$ k& i3 u3 ?9 o3 a
客户端发送的请求。6 O/ j( E+ V# ~
通常而言, 使用Hadoop RPC可分为以下4个步骤。
3 \, ^7 m! s: R. @+ |+ s: ?' M1.定义RPC协议8 F9 o' @4 A& M
RPC协议是客户端和服务器端之间的通信接口, 它定义了服务器端对外提供的服务接口。 如下所示, 我们定义一个
9 R B$ ]9 A3 p+ X) ]& xClientProtocol通信接口, 声明了echo()和add()两个方法。 需要注意的是, Hadoop中所有自定义RPC接口都需要继承
# @& n( d4 a/ q* SVersionedProtocol接口, 它描述了协议的版本信息。
5 n, ?( S9 @& {$ ]interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {8 b7 Y0 d1 K6 {9 H; W% t
//版本号, 默认情况下, 不同版本号的RPC Client和Server之间不能相互通信. k" [( t( f9 C m8 b
public static final long versionID = 1L;
( C, U8 h% N o- n- sString echo(String value) throws IOException;
( |2 p! Z4 l' G, F( `! d& X6 `& Pint add(int v1, int v2) throws IOException;5 B, O& L7 D' ~* H [
}
( H/ n( Q, h1 g. ]! S2 A2.实现RPC协议5 V$ A! h9 b* K) G/ ]
Hadoop RPC协议通常是一个Java接口, 用户需要实现该接口。 对ClientProtocol接口进行简单的实现如下所示:/ s5 l+ I+ s" ]7 t7 |( s
public static class ClientProtocolImpl implements ClientProtocol {) Y/ _- c3 p1 o5 t8 A5 Z9 h
//重载的方法, 用于获取自定义的协议版本号,
- u, G1 O! l+ ?6 Kpublic long getProtocolVersion(String protocol, long clientVersion) {0 C3 ?+ c8 _0 n9 T2 M, u
return ClientProtocol.versionID;
2 y. G6 L- J/ k; B" s2 N* X4 x}7 l! ]$ L1 ~8 n: D1 c$ }
//重载的方法, 用于获取协议签名
. ?) U9 z+ L9 c9 P2 n( v' Q+ hpublic ProtocolSignature getProtocolSignature(String protocol, long clientVersion,5 n( g! K1 C! N, b; t$ C. H
inthashcode) {
8 _2 P' Y, } K" U% y( Ereturn new ProtocolSignature(ClientProtocol.versionID, null);) l; i$ Q6 x' O7 u
}p8 Q; v y- c# x$ }! L7 J
ublic String echo(String value) throws IOException {
2 \+ D2 x* S6 L" o! a# l7 Hreturn value;: w g; V, T! d$ K! q; `+ m
}
1 q) x' \9 V. a# ?$ `. v4 Ipublic int add(int v1, int v2) throws IOException {
( ]; [4 T9 n& I% ~5 _return v1 + v2;
S+ C! ]4 ]# Z! O0 A5 K}
% m; s N! s7 C4 _}/ c! L, j. V- u2 \0 i( j l
3.构造并启动RPC Server( b' S! M7 \ S5 h
直接使用静态类Builder构造一个RPC Server, 并调用函数start()启动该Server:, H0 f1 \$ q0 o& v, B. Y
Server server = new RPC.Builder(conf).setProtocol(ClientProtocol.class)
# u! s' m% Z4 y.setInstance(new ClientProtocolImpl()).setBindAddress(ADDRESS).setPort(0)& c& A2 O+ t( X) n" b2 i
.setNumHandlers(5).build();( k6 A+ [; b- q9 f/ E9 S
server.start();
Z8 T# p0 D k) h. F9 t; ^其中, BindAddress( 由函数setBindAddress设置) 和Port( 由函数setPort设置, 0表示由系统随机选择一个端口号) 分别表示服
' @$ q8 Z; T, A3 Z u务器的host和监听端口号, 而NnumHandlers( 由函数setNumHandlers设置) 表示服务器端处理请求的线程数目。 到此为止, 服务器
4 f G, o+ F- O. T0 E! Z处理监听状态, 等待客户端请求到达。
0 L8 ~+ k& K( o- J9 X9 k* Q! m4.构造RPC Client并发送RPC请求( A3 a$ a# T ]# R6 q( l
使用静态方法getProxy构造客户端代理对象, 直接通过代理对象调用远程端的方法, 具体如下所示:
) k. b! [. n7 {0 j* P* kproxy = (ClientProtocol)RPC.getProxy(
6 _1 R8 N: }: N! ?& qClientProtocol.class, ClientProtocol.versionID, addr, conf);4 I9 U! E& _4 R- R3 I
int result = proxy.add(5, 6);
2 U5 u' ^- F% H7 EString echoResult = proxy.echo("result");
, \2 @/ M' T( [0 B3 H经过以上四步, 我们便利用Hadoop RPC搭建了一个非常高效的客户机–服务器网络模型。 接下来, 我们将深入到Hadoop RPC
( N1 j" k; {; _% T' o# C X内部, 剖析它的设计原理及技巧。 : G; G8 l6 S5 n9 w6 z
! v. ?1 l' j8 O6 z) _; C `, Q+ a5 R! N, I: w5 H$ W5 ^# w) S4 ~6 t! c
|
|