java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3032|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2039

    主题

    3697

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66471

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解
    . v, Q6 A& i4 u  m. GHadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。% }. M: P, k3 M. F3 u: b
    1.ipc.RPC类分析
    ' [. _% X+ y7 W- T0 |9 N8 t
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。
    ; O- o, c- D# v$ C如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一
    - t% W6 K5 V6 m6 \; u# J( O& b8 D个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
    $ B7 U7 m8 Z4 a! p# N* p# `6 |设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用
      e1 d& P' L' X1 z/ m) w  q
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。
    ( l( T$ g9 k4 W2 e1 o0 a- j
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
    + L  r* p7 g6 O: h: R' F
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过7 L" r2 M5 z) s- G( d' k$ v
    调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。
    7 p; k1 f4 e; X* ~下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用' y1 d9 l. S  I! d$ W' ?! @7 z9 V
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可7 C* e+ v! @. j9 r5 B3 L5 J4 G
    完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
    0 p/ b& t4 K3 n0 `* v3 [, ?$ c程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    ' {' E6 o4 c" ?1 j  |* M; a# x打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,! Z* K. A. F7 I# W. e
    函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。( K5 O2 R; [; s; v4 ^( i5 O
    3-4 HadoopRPC的主要类关系图+ L; G) E9 ?0 \& I, V( p) g& e
    3-5 HadoopRPC中服务器端动态代理实现类图  Q" h* A! a0 g4 P: P5 G  k8 O9 B
    2.ipc.Client
    * G7 ^3 ~8 `% a( R- m- h  I
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
    . H4 i$ y) k8 r5 m' z1 `+ s9 @行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
    6 E2 Y9 I" b, {  u0 N. ^# T+ x
    public Writable call(Writable param, ConnectionIdremoteId)
    & F! _, T' l3 s9 M1 m( T1 nthrows InterruptedException, IOException;( y* P) K: N; f8 L% \, a8 F
    3-6 Client类图
    % b" t) H2 I1 R/ l( c8 U2 B/ q8 i: W" c
    Client内部有两个重要的内部类, 分别是CallConnection$ w$ N9 Z6 p* V4 _
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
    / H2 O# |& R0 L1 N2 D! `1 o  }! J错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
    + x9 W$ U0 ?4 G. S! C序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id
    ( @& }5 K0 g0 {' z0 G) R0 B; u- L  z" K9 X
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。' A9 Z5 J" ~9 ?, n. O. K
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    ; Q9 R. m! g- M- I! L' v信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流, j. x& S, V: ^; i, T
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    2 V2 J- m0 T0 ?, k, S* U! H
    ❍addCall—将一个Call对象添加到哈希表中;; f' P5 Y* e7 J  T! l
    ❍sendParam—向服务器端发送RPC请求;
    3 v/ x% m- @, N; ^& N
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;# S& [$ ^5 w7 L  b/ Z4 d. @
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
    ! D. C, O8 n. \当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。" z; A! ^; D/ T. [7 i3 L1 m
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;1 s8 w2 C) y: z8 G3 S# n
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;. }( L+ I) P! H
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;6 I* L# G% f, j9 F
    4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。, }1 t+ [: \: o$ s! D
    3-7 Hadoop RPC Client处理流程
    $ |7 c' K8 r# P# T6 W5 ~' j
    3.ipc.Server类分析
    , [" l/ `9 g' x+ S5 v
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展' f5 G$ I! l# Q9 F9 E) r
    性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计! [9 J- C/ s8 s' ]- M+ o
    目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
    - _. k+ \, K: X& k2 m+ u% ]( v6 l" R
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。  X  [6 O( v) ~9 v8 N5 V1 P
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性2 `6 m6 _# V1 G+ \& c) t
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。
    9 P5 x, L+ C. |+ ~" B% i- e0 F
    3-8 Reactor模式工作原理. A- n0 ^% f0 f1 v4 c
    典型的
    Reactor模式中主要包括以下几个角色。( g  |! O5 q0 [
    ❑ReactorI/O事件的派发者。) z4 U3 |2 X  ]: e9 ]; J
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler
      t+ [2 c0 O+ j1 w, p
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
    ! `6 W  H& @& n0 E! N2 ]象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适, w, R' B& U( u/ X; F
    当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断9 a# {8 Z2 Z+ B9 _. f
    的处理。
    7 j" ]# s, b4 F7 J" T7 {& r
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线2 ?4 ?0 }/ m7 O
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    4 H. P* ]; ~: e应的
    ReaderSender线程处理。. T; N! L9 L5 K
    ip* z8 O1 R/ T4 o3 P
    c.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易. a) L2 ]. m, ]" {% r6 S/ g) g
    地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。
    + V- I4 j8 f9 Q# L7 X& r, q: N2 b前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为4 P- {! E  R( e# ^+ P( r
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。6 n! T8 E, K* N5 g! |: {% P
    3-9 Hadoop RPC Server处理流程, P- b( L5 i% h: C
    1) 接收请求
    - k/ n& Y* w( b. ^2 v! b该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue
    " H/ b. a* n6 i( x( d, _中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。, ]5 F) F+ m. ?  Q% @/ q" ^2 i- {
    整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程
    . Z- t& B9 C# n1 H池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个' z. k: T8 s. m! R4 |' l$ N. T" j
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。" _4 u+ K  A1 ^1 W
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。
    - @+ F+ T- W, L9 A6 y' J对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    ( F: j8 T& N8 L  ?* U7 N& v7 b; J5 `. K
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call
    , A3 K0 w* h# r8 i象, 放到共享队列
    callQueue中。  I' h+ X7 e5 Q: t0 w! \
    2) 处理请求
    * j! U) B( ~' e* K" ~该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
    5 P; `: b/ ?, S) [程完成。
    $ b" E+ u& D0 I6 s( J
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果: Q6 I) _* A; L
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时) @2 Q- R( v! u
    Handler将尝试着将后续发送任务交给Responder线程。
    $ K% u+ t0 h$ Z! B# `- |( S
    3) 返回结果
    $ B+ h. W8 @7 [前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结
      d# x8 i5 `* x2 e# J果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。  @1 |$ E/ C) I5 K# M7 m+ w+ H4 C$ p1 k
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将, P: c8 H8 C3 r/ A/ F# @1 d
    结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送
    ! g" B- \7 A! f4 \9 j1 i* b( j# \, ]未发送完成的结果。
    # L8 B% B3 k9 d# G. u; W$ p$ `
    3.3.6 Hadoop RPC参数调优
    - M+ }$ u1 U4 V" i- O6 EHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。% v( t* U0 {9 _# s# y0 j9 H
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
    7 Y2 a' {* o' F  T
    Reader线程。* R. b1 a# ^( z; z" Q/ }
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个1 v8 i- N) u% Y  P* F
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:( I) V$ y+ m- J% _( H
    100×10=10000 P# M$ e+ m" `$ ]6 b! p
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的: E( ]2 A; r9 ~+ H7 M' Y2 s9 w$ B
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    2 O/ b- x+ ]6 ?. @  [* O4 C
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。
    2 l, H" S" q/ U4 m3 ~" l
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可" x5 m" m! n5 X7 ?7 i
    能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试; ~4 R7 B( \3 X2 l1 n! [# P; o, u6 y; w
    10次( 每两次之间相隔1秒) 。
    0 |$ ~8 X+ c: p4 x. U: D
    3.3.7 YARN RPC实现& V; T8 t5 Z& l- R
    当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组: f3 `9 V% d7 _
    成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
    5 m. H2 H( |* D& [5 L1 G) 。 相比于Hadoop RPC, 它们有以下几个特点:/ _. r2 z! u; E7 C, B) J/ `+ f$ l
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用
    - \5 ?3 K/ m* T8 ]) d+ R% _
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
    + H7 Z$ t6 _0 K( Q% |3 `# s. o端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。
    0 B" B1 Z4 ~, g/ h
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,6 z5 M( H+ V" a* a( N$ j/ n
    并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应2 p* g# k: d8 K8 H  C
    用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。
    # d1 Q& _; }/ e  V) f- F
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者, F5 m8 V; a0 ~$ U  R. C/ M
    删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。1 L+ O1 F: y. h+ c
    随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
    8 T) R( ^* i4 R  a% H9 x
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
    ( p% e6 z, b, W1 o9 M9 x& `读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。$ d2 h8 g- v* h6 E" p* Z" s
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    ' m: p1 L( R' b果用户企图这样做, 会抛出
    VersionMismatch异常。3 J: W" {! |$ I& ]1 T1 T4 o
    为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之
    : q- `6 f1 y9 e. v" e/ {7 D! V后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源0 j, l3 p5 a3 v1 f! v$ O0 e$ Z
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
    6 i7 m/ H% I8 C6 H  O& y7 m
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的# @: r! {! ^  w1 }$ V7 V: C( U
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现
    1 C: p8 y6 e9 f' W$ ^* G. J7 J中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。4 `7 [; Z# g! \: t# P
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信. h( B$ ^) x7 P3 x$ ?& l
    协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是5 `* [3 A8 C, _4 J, B1 x6 R
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider( Q$ {  R) |3 b, {8 D
    成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    5 @! J9 E/ x+ \4 q7 Z3 E! y务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
    ; b( J, V7 x4 {! a, k$ e: u" L  ?据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
    4 j$ p  E+ C1 O  ~3 O* q, Y; P3-10 Hadoop RPC 集成多种开源RPC 框架! `8 S# C* w2 F& \; `2 |
    3-11 YarnRPC 相关类图/ ^; V) x6 W! V0 U  ?5 ]
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但) }, P+ Z) m7 _8 [8 Z' R. R' N6 B
    它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于
    % ~( I, K* `' F
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
    4 L" I- u7 I! ~6 \+ g
    "PBClientImpl") 。3 A* ]- W. y+ c
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
    3 P9 ]& H: A; ?# g/ O/ m4 |, M* T! x(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java
    1 v) ~; w2 j- ]) t9 J& V0 m+ T* a! i名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
    : R- D: ]% k- _7 I. h; K类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。% f) t! ^! c- E. }/ S# Y' l1 i
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以' C+ x8 ^5 \. n; p& V
    下几个方面:' X0 w7 i, z$ m) E+ @& y- S
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允! ]& o' x* j! Q# f' O  g9 _( v
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用! o9 R6 @2 L6 `* i9 S% h  w
    户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能
    # C4 t1 @6 U7 d% L, n+ h& K: ^: B方面有很大提升。4 j' l6 L& l0 N. t" o+ p
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,
    ( E% q  E; x: t( H4 Z1 x. F其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
    / P9 I- y4 t! x2 V2 KBuffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备
    ! L4 h) ]( m% c9 h
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。$ O7 W7 v4 l2 z  s  \
    3.3.8 YARN RPC应用实例; f4 e9 N1 W; W* v9 _) C
    为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。/ @) x) K# g& Z9 Y/ F2 g7 ~; {
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户3 k: B) n' o. e6 g$ _. f+ _% i
    端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向
    + ]8 d- r$ [; I/ \* n: n! {
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:8 g5 d" k# q' F% p
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
    ; \4 r$ z/ n: F# d4 b! U' l2 Mpublic class ResourceTrackerService extends AbstractService implements) ?) H. K6 {& ^3 M. J# J
    ResourceTracker {3 r7 i( Z. B! F6 Y
    private Server server;
    ' @; T$ e& b6 ^1 C/ P/ N...  g! Y; [  J& G3 U& T
    protected void serviceStart() throws Exception {
    6 G. h! X- b5 T+ Ksuper.serviceStart();, M* J  H' d* F2 W, @" D0 E1 @- ?
    Configuration conf = getConfig();% Q8 |+ O- a' O1 a% {
    YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC1 k& M6 p1 [% [; J
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,9 N3 G' p) J0 P) M9 j$ N$ \# _
    conf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,& x* B4 N' i" N4 v. R% i8 O
    YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    $ i" R' s& Q$ o, {( Sthis.server.start();
    , ]# m: v8 k$ {6 s+ \  y# D) B0 q+ y}.
    , I" |6 c% T* V6 i2 D) a( o..& k  _6 y$ y1 ?$ h; K  e5 T0 h  X( b
    @Override+ q) \! P1 @. f% u; @' M4 V
    public RegisterNodeManagerResponse registerNodeManager(
    * |1 h) ^3 e) d9 ^' HRegisterNodeManagerRequest request) throws YarnException,+ M5 [; M0 T  J4 I  G( d8 c/ `
    IOException {" q+ }$ K* Q* ^
    //具体实现
    0 s0 F3 n. ?- G% U; t: |
    }@
    ! s+ Y0 c& U# d5 k2 \Override# u4 f8 }( Q( L# G3 ?1 ]
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    " }' [8 f5 {8 ~! F! o# Othrows YarnException, IOException {, T  l5 C: B; T( D& b; R
    //具体实现
    + e$ P* i" c2 y( Q4 h2 K
    }
    9 W* {# x! D6 n- F0 ?; l}
    ; {' n8 _4 U* C/ d0 eNodeManager(客户端) 中的相关代码如下。
    # D5 U. F9 z! _  k4 _, B- A. t
    // 该函数是从YARN源代码中简单修改而来的
    6 W/ D+ B2 m2 {/ B* o
    protected ResourceTracker getRMClient() throws IOException {
    9 j; D, r) M% eConfiguration conf = getConfig();
    5 z! i4 C: z' `, v3 S6 {2 uInetSocketAddress rmAddress = getRMAddress(conf, protocol);
    ! q8 s# [+ F  h! J- ERetryPolicy retryPolicy = createRetryPolicy(conf);* k  h: K2 G5 v& p
    ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
    8 R' V: {' R* ?5 xLOG.info("Connecting to ResourceManager at " + rmAddress);$ i7 n" H' _, K4 p3 q* z4 k
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
    # R$ k5 J0 o, p( P7 r3 S6 z}.
    4 C, o. a: f) T* L0 h, v..9 e6 U# ~: ?8 W
    this.resourceTracker = getRMClient();
    ) D- j0 d' x, z9 {& h" A...
    9 P4 v; y5 i& B8 R5 b/ O, fRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
    & T" M2 Z- F, o0 \" [...
    # W* j/ O3 S. q, U; K: _response = resourceTracker.nodeHeartbeat(request);$ ~% y' G! O6 h, M
    为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
    6 z7 m# D: W; d; n4 O- Q步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat
    ) z. E, U/ W9 v8 T( E) H% K
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
    % {+ @# y, S( R) g# s
    public interface ResourceTracker {! o/ ]* }" q7 H# \, I. B' F6 d
    public RegisterNodeManagerResponse registerNodeManager(
    5 y( r) m' r, ?8 n) h3 n) r8 WRegisterNodeManagerRequest request) throws YarnException, IOException;; e4 d/ K. J9 M/ Z
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    ) I& r: v/ s* s( g* a! n  `  ethrows YarnException, IOException;3 f3 K, _% T2 z! B! n' x( ^5 u
    }$ f! z. J, [8 m5 z8 L& x6 N# S* m: X$ Y1 ]
    步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
    6 i! S3 p. r  P% T  f4 j' p未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的5 J# D' E/ ]% X1 R9 K4 z0 R! @
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
    5 g' S) x8 |$ d6 b$ y! \9 y
    option java_package = "org.apache.hadoop.yarn.proto";5 O8 p( m( H$ X, r, I; g( P
    option java_outer_classname = "ResourceTracker";% V6 v# h: ^1 c0 N) y) x# |, r) C
    option java_generic_services = true;
    . }; v8 s# n+ E  T1 o4 U' e: }option java_generate_equals_and_hash = true;3 l1 |! z, B! Q
    import "yarn_server_common_service_protos.proto";- A6 F& E0 D7 h. R6 u" E  c& h* D
    service ResourceTrackerService {
    & n& _  W$ y# Xrpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);& a  h2 D# v* r- G# A5 ^) ^
    rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    + C6 w% t/ C1 i+ b* U}
    + f+ _: a! u& g# Z" r; ~ResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
    3 Y8 i( x, x( h; t步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
    * n6 _( T& E6 r$ Y1 V0 K, r. S( `0 pBuffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest8 Y7 ?  c( G% B$ o" N) X/ `
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto+ V/ ^4 i( Z2 N7 ?9 J
    件) :
    0 _1 c. r+ j2 [/ g, s$ |
    import "yarn_protos.proto";
    7 F4 K: S- J1 [9 q! n! A9 T5 p5 Fimport "yarn_server_common_protos.proto";3 v: F1 y2 }- X; d7 `
    message RegisterNodeManagerRequestProto {
    2 \) v2 [4 b+ C# k5 v5 T! `optional NodeIdProto node_id = 1;% b  @! o" G9 d6 j8 ^+ S% G9 I
    optional int32 http_port = 3;: X5 D# v3 s3 r9 F0 w
    optional ResourceProto resource = 4;
    + O+ n% e+ u3 @& s2 h" o} m
    : y* m1 n$ |+ J4 }* H, gessage RegisterNodeManagerResponseProto {1 N2 c% p; W/ _! Q' W+ A5 }% o1 `
    optional MasterKeyProto container_token_master_key = 1;
    8 \) u$ m) X( v) c  z* y% qoptional MasterKeyProto nm_token_master_key = 2;
    ) S# j8 v& n) o3 ~: [2 ^. Koptional NodeActionProto nodeAction = 3;
    4 G6 r% c3 a0 P: L& R+ R- I! p$ }# `optional int64 rm_identifier = 4;( k# |& ?& M1 s3 f
    optional string diagnostics_message = 5;7 G. J% K. q6 }
    }.
    $ |" R* s- N( |* h5 x, V.. //其他几个参数和返回值的定义
    8 n/ l- a1 g% i0 a8 f9 g* R9 B
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以
    * d( r' R2 P0 d原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol" P  d6 C0 ^- O. g
    Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数6 T0 A! r7 L* O. ~
    RegisterNodeManagerRequest为例进行说明。3 v! v/ n- A8 b* Q4 k
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :8 E4 h& n  x( ^# c
    public interface RegisterNodeManagerRequest {* r: l2 y' R$ T  B' p7 t
    NodeId getNodeId();
    , b$ \2 p0 u# G- o1 L3 ]4 d# xint getHttpPort();
    9 a5 u( g/ r" a: P2 J: G1 P0 T! RResource getResource();. h, R# \) r& ^: [! w! D; p
    void setNodeId(NodeId nodeId);
    8 w! d8 A* L8 w& Uvoid setHttpPort(int port);" l/ f5 w( _0 N# a" i
    void setResource(Resource resource);6 r7 j$ J- j% F5 W9 u
    }
    6 `! c- L6 i8 E3 g  V( f7 PJava封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :4 I& s; I) B, r4 G8 `1 X6 m1 G* ~
    public class RegisterNodeManagerRequestPBImpl extends
    / P# i( {& u# t1 [+ sProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
    / j' M% y7 D( x0 J! I4 O( E1 cRegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();& [  i* d5 N* H# t& N# c4 B% M
    RegisterNodeManagerRequestProto.Builder builder = null;
    $ w' B6 ~# q; R! iprivate NodeId nodeId = null;5 q9 X) d* Y4 }& i
    ...
    * S9 Y# u. F; S( X@Override( c3 G( |" g/ R7 F' a
    public NodeId getNodeId() {' X# D+ F9 ^1 [9 |' F
    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;2 R/ u4 j. W" _0 R
    if (this.nodeId != null) {) s/ n2 B# V5 r
    return this.nodeId;
    1 e! N7 x$ ^3 Q* ]0 b9 M& X}i
      c2 y% k+ B0 g0 m! ~9 p" zf (!p.hasNodeId()) {
    - J7 c, C% n0 Preturn null;
    3 }6 w9 S" W* k9 \% u" i} t
    5 O$ ?) I/ _" N+ F4 [his.nodeId = convertFromProtoFormat(p.getNodeId());
    ! t7 e# o# R9 Mreturn this.nodeId;" c- T% `7 h  k# B
    } @
    % `, E: x# C% L3 g+ _' `Override* b0 }9 g7 Z- @+ f. P- s/ j
    public void setNodeId(NodeId nodeId) {( k* ^5 W" v( O
    maybeInitBuilder();
    + @0 J+ I- t0 F7 G7 J2 h) dif (nodeId == null)
    3 ]- }- k  w2 Nbuilder.clearNodeId();0 o! q! l& g, l  P2 u4 s' T
    this.nodeId = nodeId;
    6 I8 }; g$ p+ I) M: h} .2 v9 ], a4 L0 J" p9 h; X, u2 |  b
    ..
    % c( G' C9 d* b8 N( k}5 f0 x. Y  }, A9 \/ f
    步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为* ?1 x! Y; D" O" V  c
    ResourceTrackerPBClientImpl, 实现如下:
    5 |; p4 O* S. v1 ?# B
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
    # ^: i& H5 r/ h' n4 f! A9 aprivate ResourceTrackerPB proxy;
    % H) J+ V0 q+ R' _# ?public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {8 Q$ I- ?. k( R/ ~; l
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);
    - Q5 f5 _) O) b! i: Tproxy = (ResourceTrackerPB)RPC.getProxy(/ x3 b5 ~0 d/ \+ B5 X
    ResourceTrackerPB.class, clientVersion, addr, conf);
    % Q" u/ y8 p% q' o! U7 A} @- @& T! _' f# o6 Q
    Override
    + J2 o+ _0 g# Q4 }4 o6 m5 B0 s4 ipublic RegisterNodeManagerResponse registerNodeManager(
    9 D3 a" j$ D" Z4 }8 c& |4 b9 P! gRegisterNodeManagerRequest request) throws YarnException,/ ]* b' J6 V* u5 i$ C
    IOException {% J$ V: s8 V, B& p: J, f
    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
      Z. b; K6 w4 D& rtry {3 D' x, }4 H4 {. n8 q" q6 L
    return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));0 r  i5 P4 ?& s; ~# v6 V: {
    } catch (ServiceException e) {  ]4 J, M, V7 l& e5 t. t2 T
    RPCUtil.unwrapAndThrowException(e);( n9 L5 m1 m/ w% I) i
    return null;. L( p$ `* Z: }" z
    }" w; y5 _" H8 Y, m6 F* _
    } ., ?/ y5 q; `2 w( j
    ..; L  e9 J5 _' M5 j
    }, R: T( E- q! D* n" I. ~/ e
    服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
    7 v: X' d' @1 L7 G
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {9 ~1 u9 k8 h* s- C  M
    private ResourceTracker real;- R( T1 x/ i; d+ k
    public ResourceTrackerPBServiceImpl(ResourceTracker impl) {2 t2 g& t8 R& b; X5 V& U
    this.real = impl;; K( _) g& }# Q5 i$ |& v
    } @- W( N/ F6 R9 c/ g
    Override1 o( i8 ^& x7 H7 ^2 H" d7 F' K
    public RegisterNodeManagerResponseProto registerNodeManager(  q& z2 \6 P1 x/ i: J7 ], O/ f9 N
    RpcController controller, RegisterNodeManagerRequestProto proto)
    4 E: B0 D  q4 m8 T: u+ A8 bthrows ServiceException {% F$ t! Z. p/ |  U4 @8 J
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);1 W+ D  `- C4 s- R! H# M
    try {
    - ]8 Q7 X' ]5 Z) f' P. ^7 P$ wRegisterNodeManagerResponse response = real.registerNodeManager(request);6 r' x! X. J/ A7 L3 c: q1 D
    return ((RegisterNodeManagerResponsePBImpl)response).getProto();$ \; y2 ?7 z- f. \2 y
    } catch (YarnException e) {
    ( m2 ]: X% o2 x0 e& D( P! u, o0 othrow new ServiceException(e);
    # i7 s, i2 |0 H+ @2 w# s: k} catch (IOException e) {
    ) @9 m# L& T& |throw new ServiceException(e);
    ' \: v6 y8 D8 \% o}, ~% H+ k4 m( t# w, q% S2 c
    } .
    / Z: s% ~( C6 F1 L: p..0 v/ m" p- o+ {' |* j& c3 Y+ M
    }; c" D# _* Y$ |6 c$ T
    总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列
      V# c; m& w2 a& g
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。
      \. M9 M' J1 g; x: |3-12 YARN RPC中的Protocol Buffers封装
    ) l7 B3 A; b  c1 X, _2 L
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call9 p7 M! Y6 C; Y+ |
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。
    # h4 @; R/ @8 }0 T) P
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。. P4 u6 M9 x5 c# N8 q  D$ ~- x
    [9] 参见网址http://thrift.apache.org/
    7 r$ K. i  j' m8 M
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns
    / {+ u0 z1 j, @3 n
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    * N% F- i6 x! o" Y$ Y  D8 }5 h
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。
    ) y( ^  D8 s3 O& ^% U  T0 M
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347
    / J$ v5 l# l9 m
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
    ! i9 {7 i+ k1 W% k* G使其向前迈进了一步。
      ( U9 H) S) G( o2 d

    , j) D' W5 ]8 v! Q/ @- ^$ L. D! u  i; K. I4 Q' A8 D; j- I
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-1-22 12:37 , Processed in 0.519927 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表