java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2928|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66319

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解- Y4 C" S3 A1 z0 b/ G1 H' S  M
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。
    6 _- [! _, y( Y' G' \9 A3 f2 _
    1.ipc.RPC类分析
    5 v/ g, ?/ z5 d  e4 E) ]5 e
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。& T" H  ?" i+ O! D9 E/ `
    如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一
    3 T* i/ B* `2 b- H1 h& L! V6 y; g个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
    : ]. K4 _: C3 A& Q2 Y# l设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用) P& {; e9 b  @/ N
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
    ; M5 n- e$ K* h8 l, G8 D
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers3 I/ n, {4 M& w
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过" j' }! c  F' O; U8 d2 S3 U' ?) M
    调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。, C2 ]9 x, r* @+ x9 \0 S* c' x( u
    下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
    . I2 V, s' e( e! r0 f
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
    4 o; x" |3 H$ i+ ]5 i4 w完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机/ d1 `- d! L# [; V6 k# ?
    程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    , j' L3 E$ A' W0 l3 K* F/ M$ U打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,4 N: G8 G7 f! p! I, q, u" h
    函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。) Z6 x- r# k6 k; L; Z
    3-4 HadoopRPC的主要类关系图0 |* y" g8 h- J8 s1 h9 D
    3-5 HadoopRPC中服务器端动态代理实现类图" l5 m7 D; c) f% t1 J* i8 A! p
    2.ipc.Client% J3 r/ u3 Z6 \
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执1 X( I, P6 W3 f8 a3 B! m4 \! m
    行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:% ?/ s! w' t1 k% z" V* x/ ^. D
    public Writable call(Writable param, ConnectionIdremoteId)
    9 g9 X( e0 f6 Z  X( Ithrows InterruptedException, IOException;: v6 S" q' E+ g6 K$ l4 X
    3-6 Client类图
    * [+ @$ U% ?. C6 p
    Client内部有两个重要的内部类, 分别是CallConnection
    1 r9 U/ `# {6 W, ?
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
    9 a; [5 b  e: B2 f; e0 D错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺5 d. D* X3 `! C) g0 V; V
    序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id/ m# o' {0 g3 W& H4 Q
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。
    ) }% I2 ^- `  h5 I2 A* D# k9 U8 A
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    2 ?, v0 t$ h) C* G5 Z% G& W信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流6 L- r: G( v* K$ a) R: K
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:5 ]5 `' e+ b. O5 d& Z( [
    ❍addCall—将一个Call对象添加到哈希表中;8 g! `( B- ~6 ]$ p
    ❍sendParam—向服务器端发送RPC请求;' j6 y+ W$ P* u/ I7 j
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    - M3 K7 q- W. B! H# [9 D, F( d* H
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。7 p4 Z+ d: ^  t- Y
    当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。
    + s' J% h  l' ^$ x" C+ M+ p# }8 M
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    . ~, H9 j+ I" ?! f
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;4 ?# `- l) G* d3 p
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;( F: B! Z1 c/ q8 a3 Q* H
    4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。
    ! P0 L! `# l9 Z# {3 D- [* v
    3-7 Hadoop RPC Client处理流程
    & H8 L, w2 W+ f/ D2 t
    3.ipc.Server类分析7 G  j: }, B, j0 k# ^9 z
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展9 G# S) ^4 Z; L$ V7 x
    性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
    " _8 ^# x7 z- e8 k2 T. \  c目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
    # Y8 X5 t7 E1 x* O
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。
    / k4 B2 B% ]- W* m% W
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性, j& r9 }/ D/ P3 h3 Q: J) }
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。' E0 R# L& h) J$ j! p) B" ?1 j
    3-8 Reactor模式工作原理
    2 W$ H% ^6 k% l: M典型的
    Reactor模式中主要包括以下几个角色。
    $ j! n9 E% ^& b3 b: T' P❑ReactorI/O事件的派发者。4 H" n5 r+ N" [6 ]$ J( q$ x3 P0 a
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler) n* X3 k$ k6 [% Z8 i
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽6 O- U- o! k. l4 F! P% A/ j8 ~7 V. e
    象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
    , W8 s1 x# X* E2 [- a1 j/ ^3 g当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断) n8 E0 M1 Y$ X6 B" z
    的处理。
    . x9 Q  \7 h/ g# ]( H
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线  m+ }9 b% k% V
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    9 {4 R5 u9 o/ S2 m; t应的
    ReaderSender线程处理。$ G/ B3 H6 Y% f( _
    ip: Y0 h3 x; S; C4 y
    c.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
    5 Q8 Q, d9 ~- d地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。9 G/ |8 }& @6 U* w& g: n
    前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为' y5 k; [0 F; H
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。9 `+ |9 E  `3 V, D' F1 @
    3-9 Hadoop RPC Server处理流程
    ; V6 \; l! `; G* H+ {. g
    1) 接收请求
    9 `0 t8 K8 b, |* o8 Q" `# @该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue
    ; R, `% @5 ~; ^$ [中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。
    7 B' ^3 \0 P* A# P- \整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
    & q4 |+ F5 a, f3 V2 p4 Y% E池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
    : p! b3 G3 H. b+ y% Q
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。- i2 C1 R5 r" ?+ p# t$ k# V" P
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。. P1 a3 h! a, K
    对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于8 `2 |& Q' V' W' L3 S
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call2 ?5 |2 R2 W8 Q  p- s& N
    象, 放到共享队列
    callQueue中。
    6 J% _' f; Y1 d' W* S9 _
    2) 处理请求
    . _" g% b) q  i. {% i. ~+ t. B该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线# g! r- n; N. p& L. l
    程完成。
    $ }# _. L  d" q* ~
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果
    ' Y& M) {- \+ a: l% P) r0 w返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时! V2 k% O' k: O' Y8 Y
    Handler将尝试着将后续发送任务交给Responder线程。( F/ s7 A( H% i1 m: J3 M' l2 c: E0 a: m
    3) 返回结果* j! A# Z! A) L3 b. R1 i3 x
    前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结
    6 y. J, a# f) X% q. S7 f果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。: i3 Q( d4 i& _! h
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
    7 w6 N  K  \: ^: i结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送
    / i6 D6 n# q; @' A% _未发送完成的结果。
    ' u, A1 C1 S* Y$ V7 ^
    3.3.6 Hadoop RPC参数调优
    & z$ z" n# Z4 n: h$ tHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。1 [3 u  O" G' t6 c
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个# R  [6 C: {6 k; V# J' U7 u
    Reader线程。# m+ \- g& t5 [5 |
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
    4 w: `: L( C( ?5 W4 m
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
    ; F3 e8 Y  D( L* r! e% m. Q0 v
    100×10=10005 g1 K& C% [% {- [& a
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的/ R1 f  M6 k/ u5 z- W7 Z. y/ k6 U
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    8 J( l( e3 i) t
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。
    9 o& _& Q: S; ]7 M
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可* |6 |" V6 c+ x" l- E! O
    能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
    9 t3 r7 ~5 _' h4 b4 S
    10次( 每两次之间相隔1秒) 。* C& `' k" ?. Z2 O
    3.3.7 YARN RPC实现
    ; @" I9 p% I. [/ K" c+ J6 W当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组
    6 w8 r. A7 I- n- [+ v成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
    ) P" a& Y" m$ P9 c; C# F9 B) 。 相比于Hadoop RPC, 它们有以下几个特点:
    2 Z! A" W4 ^8 L8 K( q
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
    , V. E& H+ I% D# ~  o: Q) P
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户3 @) N( R7 q4 m4 I
    端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。0 ?% d9 c" r" p
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,
    , J( ?0 V0 b: Z: [% w并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应! i3 J; Y5 R1 G) Y
    用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。/ |$ D- X0 i" E6 q$ h) p) f$ `- u, W
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者! G! Z' P& P6 m1 m2 j7 e: K
    删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。8 g/ G( e# c, \2 R+ }/ K1 a' S( B
    随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
    6 S5 F9 l. x3 m4 e8 _' ^$ V; |9 ~0 _
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言9 M% N" }- A1 H) E
    读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
    * u# X: `$ b; T) _+ q
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    : `: }0 v) e6 H7 F% A- |1 t果用户企图这样做, 会抛出
    VersionMismatch异常。. k" n% B% k- j% u
    为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之: [0 a' o# f6 T: z2 ?/ b
    后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源2 G7 a3 P7 Z. `' h$ a0 Z6 N% g. r
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的# g4 {& i2 G5 T
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的
    2 ^3 C9 d% v1 l! A: I8 T
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现+ ]1 W( K( [) e7 o
    中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。
    9 o6 f7 W, X( t' `+ ?3 j
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
    ! J4 s9 B' k7 y9 n2 h协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是3 _  x7 U9 M) ]& d: G: B
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider
    / r7 d; _% A* ~) N0 s" _成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服- D, ~8 s# w  q. E1 N- [; T, s2 w
    务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根8 S) z, \# p* M0 E7 N# p
    据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    9 c1 H* n  M. H6 z. Q3-10 Hadoop RPC 集成多种开源RPC 框架
    ; W2 Y9 c- `7 A% t9 x
    3-11 YarnRPC 相关类图# T. G# Y1 d% c' ^: H) s0 ?0 m
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
    + ^& h8 y# A# ]它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于2 w% S+ c% }( I2 A$ L# ]- m0 C6 h
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前! f! u( S  \- Y
    "PBClientImpl") 。& C5 M% C5 P  {" y4 M* P3 L
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄" ^4 y: F, R( l( q; n$ Q6 F
    (具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java
    9 Z( _: A+ g. ?  D7 p6 o2 l; N( Y0 j名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现4 v  _  G+ x2 u- ]7 m% D, f/ d
    类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
    : {+ ^7 u' j  S/ V  Z; A, n
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
    6 [  a8 ?/ M  h+ V0 [  M下几个方面:
    - V3 E: {" u" _! i- Q% `
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允9 {6 ^3 ~, E' T
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用
    * k2 L# L  C, O户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能
    # C( u- w$ _3 G# D, J2 ~  z方面有很大提升。# ^) ^! s; R5 K5 k5 v0 m$ e
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,' l& v9 K. r3 Y8 @; ^
    其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
    1 D' `$ i- s4 B4 @+ zBuffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
    ' F2 n( i" p1 T" _$ t; O8 L
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。3 e* f0 D! d. V3 ?. Q
    3.3.8 YARN RPC应用实例; i4 `; q. w* I$ ~: ]  y" ?7 c
    为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。
    2 O9 ]2 R- Y; P- V; c6 L
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户5 J  H+ E" s7 I
    端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向
    # O  D8 G3 C) e1 p/ V
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:& D( s- n3 N% Q  Y" v
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
    2 c% `  Z0 `( N. spublic class ResourceTrackerService extends AbstractService implements( T7 C: n; O" z% R  C3 c( a
    ResourceTracker {
    3 p* Z& _1 ~0 N5 aprivate Server server;" g" L. ^: {3 m3 }
    ...: \- K! V8 \9 i4 ?
    protected void serviceStart() throws Exception {! B; n# M8 T1 ^
    super.serviceStart();
    5 p: N  l6 l1 D, X( D8 h% zConfiguration conf = getConfig();
    1 n8 x4 A, k. y& G( FYarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC
    7 u7 @/ x; |; ~8 L4 x
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
    " r3 |6 I. ?% j/ b# Hconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
    8 N. a7 A& H( E2 [0 hYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    & G% _# j" c: N) p7 S0 cthis.server.start();! W+ N! ]! X! g* B
    }.
      n6 K: B- m# Z1 m* Y; o+ O4 c..
    $ f7 P! t0 [* S3 t& o@Override$ v1 {0 A7 W* j' n* ^$ [( ^5 V
    public RegisterNodeManagerResponse registerNodeManager() R: X$ h% @+ c# k
    RegisterNodeManagerRequest request) throws YarnException,8 l* ], I  R( A7 X4 K6 I( n
    IOException {' k" N2 B7 C) j5 @6 ~8 {
    //具体实现. `; Z3 ]' x* H( ~
    }@1 J' ?1 r; m. x
    Override
    8 T6 E& M8 {- T2 V& S3 Xpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)! B; r. {! F% h* n" p) m6 G
    throws YarnException, IOException {) T. [! P' i0 [1 q
    //具体实现6 |* {( x* x, I7 d# P
    }
    . x, X  J# g/ |; d' Z' w}
    8 n& G( D: [" b% {. rNodeManager(客户端) 中的相关代码如下。& [. d! K6 V+ d
    // 该函数是从YARN源代码中简单修改而来的
    : o. h' \9 A8 H  L. D! v0 l
    protected ResourceTracker getRMClient() throws IOException {2 `. v3 h! k* @7 n) T  P
    Configuration conf = getConfig();
      M) S: F. c5 rInetSocketAddress rmAddress = getRMAddress(conf, protocol);
    ! ?, p" F8 g* Y! @% ?+ IRetryPolicy retryPolicy = createRetryPolicy(conf);% |# m: l' u3 D4 h! j
    ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
    * ~- Z0 e3 x% L  qLOG.info("Connecting to ResourceManager at " + rmAddress);2 V$ I4 }$ ?, n: ~5 H
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
    3 e9 O8 K, {9 v" B/ ~$ Y}.
    % Q6 m4 L2 e% v1 G5 \  u9 m5 g9 Z/ l..
    9 \3 U- b3 T0 I  y) ^: ^( K7 m# xthis.resourceTracker = getRMClient();; c  l! j3 ~1 Q6 O# E: {+ I) }
    ...+ ]8 b2 x4 f- v# j' a+ _
    RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);9 q' Q6 q7 f4 Y; o' s8 W" o4 Z
    ...
    & f* i" I% l5 ^& ^response = resourceTracker.nodeHeartbeat(request);$ ]0 b( k' c3 g5 D/ M, T# v5 X
    为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。8 t. l* w' K$ H' `+ A5 a
    步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat9 h, t: N$ \' e% h+ o: G9 a5 t
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:5 b+ L+ n& |$ z8 ]* W
    public interface ResourceTracker {
    / x( s+ B4 g$ k5 T: Wpublic RegisterNodeManagerResponse registerNodeManager(
    & }8 v2 M7 v; k; k+ \RegisterNodeManagerRequest request) throws YarnException, IOException;  K* `( Y! j+ ^" q7 {
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    % g; D7 X. [& f2 fthrows YarnException, IOException;$ g7 b; F+ S& A  z  I0 j
    }
    9 k, s! O: |+ V步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但" {+ o& ?& Z, q: x3 x
    未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的5 V3 M- t+ |3 W# V1 a- c' z
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:6 u3 Z  m( t6 j3 ^% n$ T" U
    option java_package = "org.apache.hadoop.yarn.proto";
    # b& q6 t2 F' k' Q' `& ^5 Z9 o% k  n' toption java_outer_classname = "ResourceTracker";/ p) S/ p( K' k! ?
    option java_generic_services = true;
    # q* a# _; X+ Q/ Q' ?4 Yoption java_generate_equals_and_hash = true;
    : S! A/ Y) ~- N( rimport "yarn_server_common_service_protos.proto";
    / H* r) Z4 m6 O' Cservice ResourceTrackerService {# R0 V4 Q8 K: _
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);: m$ \# o" G  e% y( Y
    rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    % F) E  Z: |! k7 J( r2 p}
    3 _& _: ]( ]: [% H: [7 G2 wResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
    1 O9 |, y7 H9 b0 l0 `/ S步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol% o- h+ @( @. x- G
    Buffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest
    : C# F2 ^7 c' {( T- x* E; a, O. D) j
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto9 O% w8 l5 p: m: p/ d6 I1 Y
    件) :# \" c- ^: R& r7 g0 H/ B  s, j
    import "yarn_protos.proto";6 d& _" y9 L7 W# o
    import "yarn_server_common_protos.proto";
    " [/ i* I% b5 W5 T9 K6 `) @7 k  t( Gmessage RegisterNodeManagerRequestProto {
      ~: @" ^+ G# s1 k7 u# b- Voptional NodeIdProto node_id = 1;
    * ^' M3 Z* T5 N2 poptional int32 http_port = 3;
    6 L; p( ~9 x+ l8 R$ Goptional ResourceProto resource = 4;
    * J7 j$ y) @( L7 E( V* r} m
    2 o+ D0 q2 u: h6 O* Ressage RegisterNodeManagerResponseProto {
    6 `7 s0 Y, E) G" s1 m" n. z$ zoptional MasterKeyProto container_token_master_key = 1;9 K& ~& q3 c$ N7 C* H3 H: @
    optional MasterKeyProto nm_token_master_key = 2;7 M, p: l& \. \& m: u
    optional NodeActionProto nodeAction = 3;
    4 T4 c+ t6 @4 X2 d# V' \( Ioptional int64 rm_identifier = 4;' N/ e# K, Q4 s
    optional string diagnostics_message = 5;5 P! W% h  L6 M4 s7 X- N
    }.4 b% l" U0 _* A, k
    .. //其他几个参数和返回值的定义
    1 e) W& ^. g/ A5 H. ]# H. `
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
    5 _5 r* Y+ _, q" S; f原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol  C4 |- G& G1 T! K2 {: U8 b1 y+ Y
    Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数/ m- k* G; T4 f  M: a6 R/ d  A! B/ I
    RegisterNodeManagerRequest为例进行说明。
      h( g/ k9 @6 x: L1 k
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :8 N, Q# x' u6 C* x
    public interface RegisterNodeManagerRequest {2 x7 _5 ~. u' y- V6 R1 }4 i
    NodeId getNodeId();# P' q8 ^: N; p+ o  ~, u
    int getHttpPort();. d4 u" m  [" t  d  o' O( ~7 q
    Resource getResource();7 k0 u7 @2 u# d! j* z( O( u, \
    void setNodeId(NodeId nodeId);
    ' w0 |* ?9 ^5 x* ^( j# ^# ^, zvoid setHttpPort(int port);3 k# I& q- K( ~$ d% M0 |
    void setResource(Resource resource);
    7 b* L9 k1 e8 o6 ~' ^0 D}. U% o$ [  f6 W& f: {3 _9 s& a" G
    Java封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :& Z! |8 ~; F8 d
    public class RegisterNodeManagerRequestPBImpl extends/ F+ @9 z1 r4 }5 g
    ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {2 I% S; _+ ]0 v
    RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
    : v# Q5 a6 k$ d* `/ qRegisterNodeManagerRequestProto.Builder builder = null;; N( j( n2 g( v* s6 f! s
    private NodeId nodeId = null;
      L$ ?7 s4 r. D3 v/ v$ Y# H...% }1 x0 ~% `8 m7 F5 m2 ^* y! f
    @Override
    & i$ a. z9 F4 k7 jpublic NodeId getNodeId() {' u+ F: J8 p5 Y& G
    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;; d' ?4 N6 `7 q
    if (this.nodeId != null) {
    8 v5 S, `% Q, h) ^return this.nodeId;9 k7 M7 u) s6 v0 E3 g4 T  ^1 z: i
    }i
    + Y$ a# F: f$ D5 u0 Ef (!p.hasNodeId()) {
    2 M0 i; j- s& {; c1 n& @% v, M2 \return null;9 z, F& y( ^% {$ Q0 B7 Y8 M
    } t
    $ b! Q- e6 k, `/ l. V+ g3 E4 B8 khis.nodeId = convertFromProtoFormat(p.getNodeId());
    # m% `5 ~6 |$ t- E- jreturn this.nodeId;- g6 C0 B* L2 H5 U- V$ t# S* {
    } @
    - F. ^, L# a: I: C: kOverride% T% r6 f+ W9 x7 M3 X1 d: [+ D
    public void setNodeId(NodeId nodeId) {4 V+ q& |7 S' I+ j5 o
    maybeInitBuilder();
    ; @5 p5 n2 \2 ?9 wif (nodeId == null)
    % R  A( p7 O; o% R- L$ _/ U# ?builder.clearNodeId();& O' W1 s3 u1 h2 Q- P6 k3 w" k( S' G
    this.nodeId = nodeId;6 l3 k! L6 _; J0 n
    } .5 z6 h. b- C$ I5 ~  b
    ..
    2 }4 _! C& ]5 I7 [5 x3 k}
    3 E. p' h' m% M. t& `步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为1 n0 V, F8 s; J, [, L
    ResourceTrackerPBClientImpl, 实现如下:/ r" a. [+ g' `- }$ q
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {8 F) T, t  p6 k5 b2 d
    private ResourceTrackerPB proxy;6 ~! ~& u2 |. y- n. D6 m8 _4 T
    public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {- G; l& j: I% i: M; ?1 O( w
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);2 A9 O% ?& [' f; x: d% P
    proxy = (ResourceTrackerPB)RPC.getProxy(- C, n; r7 D0 r9 V# k0 g2 ^
    ResourceTrackerPB.class, clientVersion, addr, conf);2 O# b5 l3 Y& {/ @/ ^1 ~+ V
    } @* \: V* h/ W: o5 P( k
    Override) b) F' x3 M9 b7 `
    public RegisterNodeManagerResponse registerNodeManager(/ }) A9 r+ H7 W8 p0 S+ f
    RegisterNodeManagerRequest request) throws YarnException," @' R! Z0 K4 M# s# \
    IOException {+ M% F) ~4 `9 h5 N
    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();2 V( i: p) y3 q/ |  X! B( S" B
    try {" ]" f5 s! m( k0 e
    return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
    5 \2 l) v: R2 R} catch (ServiceException e) {+ c! s0 x* z% A
    RPCUtil.unwrapAndThrowException(e);& V3 W: d( H/ ]* {0 v6 {
    return null;. W$ z( ^4 v2 `/ e9 J
    }' o8 ^, M6 f5 I: @- c
    } .
    ' l" }; D9 g7 W1 O; Z. ]; s..
    1 w* ^! i+ W) z& f}; d; Z# p& S- i; g! p
    服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
    ( T* s  i4 g3 i' s; F( |7 x
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {! N; x0 u6 O  b/ j: x
    private ResourceTracker real;
    ! N2 {, S; v8 S% L, ~public ResourceTrackerPBServiceImpl(ResourceTracker impl) {* K( j+ A' f8 E2 N: Q7 d% o+ R; Z! V. C
    this.real = impl;: K) I# f9 \. I# s0 x
    } @# p* B1 t% t1 K1 B) k
    Override% ^5 b3 R8 r& A5 U% J8 D
    public RegisterNodeManagerResponseProto registerNodeManager(
    5 f# w1 y% L1 W  Y+ cRpcController controller, RegisterNodeManagerRequestProto proto)
    + s) r# }+ H) b8 O- H! M. Lthrows ServiceException {: p' l3 P" y9 Q8 ^! f& G# f# o( ^
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);3 j6 ]5 E4 n( ]5 L1 s2 W
    try {
    ; \. P5 G' }8 ^5 y1 CRegisterNodeManagerResponse response = real.registerNodeManager(request);
    0 H9 w9 z4 }4 f' Z  oreturn ((RegisterNodeManagerResponsePBImpl)response).getProto();) X* R+ s- O8 ?4 q& b5 j
    } catch (YarnException e) {/ L& T: J1 Q! R9 t2 I
    throw new ServiceException(e);& `5 x9 I6 y2 l. ]+ G! _. L" V6 U6 }- |
    } catch (IOException e) {
    2 P" H0 p- U( `: m$ dthrow new ServiceException(e);
    6 f4 A; Q/ b  S! o) y8 k}7 O( M' h5 O/ t7 i9 m) ?
    } .
    , z; S7 u* @3 Q) ^( o..
    5 ]5 s% j1 A( H& ?8 x}
      N, D' l" p1 }9 x$ K1 Z4 b. @5 D- E总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列$ \5 t( \) G8 f% w
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
    - N8 A. g9 ?# u& D3-12 YARN RPC中的Protocol Buffers封装
      V. G. s9 {8 {! z4 t& R' y/ C
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call
    6 W, k( ?4 E) E. J9 ?: w0 h4 n
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。
    1 @/ h& M* C+ Q$ z
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
    * d  ]5 j( Q; \# ~
    [9] 参见网址http://thrift.apache.org/3 D6 L  x2 A0 v
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns
    8 R1 ^; ?+ P+ l, f/ I/ O8 j, d6 f
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    8 Z  @4 n' E8 B: m( A
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。' N+ L- e9 m' p" K
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347/ p) h7 @' N- l0 ^
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则! u% `: t5 {$ M2 q
    使其向前迈进了一步。
      
    9 C3 w3 y6 c0 ~, q4 c- t3 a5 m) g) A; s* `' ]
    # I4 M1 ^4 t" C. f
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-10-18 13:53 , Processed in 0.117483 second(s), 34 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表