java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2958|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解: @- }, f0 C7 r% W4 J" S4 A
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。+ X9 \8 g  M9 @+ h
    1.ipc.RPC类分析% j( X2 b! q+ T" s, o  f
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。4 M/ Q( ~3 ~& J
    如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一
    ( y" d* i3 E5 \2 j个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
    . i. m! N. ?( O8 L设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用' s4 o' D4 H$ {9 [
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。. g3 G' h$ `# D) F3 f
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers6 d& J7 x" k& ~  y# |6 r: ?# v
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过
    8 K9 l; O7 q7 ?2 `$ ~6 e调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。: V5 B9 B4 E  |0 {% q1 B$ b
    下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用
    7 o3 k( M2 P# Y6 H% p! m
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可
    ; Y3 O. _& p' g2 |+ W完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机7 g  A! w' N4 `) _
    程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等)
    9 e: W% ~3 I6 g' n8 z% s3 z0 \打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,. [" t& L, L/ A( @% T# u
    函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。- m3 [3 Z5 H3 G+ J9 y3 r0 `
    3-4 HadoopRPC的主要类关系图* p" v- T; @+ M. l$ |0 x; C+ r( _1 `
    3-5 HadoopRPC中服务器端动态代理实现类图
    , S1 a/ R, p+ m' F  e
    2.ipc.Client3 M6 P3 N5 S. N  ]" S
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
    9 Z5 c% q$ q, U2 \  J+ V行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:' `4 R0 O! @" y; G0 \& p$ w
    public Writable call(Writable param, ConnectionIdremoteId)9 p" p1 Z9 u* w/ Z
    throws InterruptedException, IOException;
    0 y  T, {- q% Y3-6 Client类图8 c2 S3 F& D! C5 p
    Client内部有两个重要的内部类, 分别是CallConnection6 C3 ~0 r& M$ I; }7 Z" O/ P! N
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出
    . d# _8 Q! s7 t4 A0 o8 c错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺
    . ?) ]: z) T; F% d  ?5 N, _& d序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id9 w6 S2 I3 j9 E6 W, t1 A
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。0 P" |( y) M' u3 k  F
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本
    4 G# K4 I  E. F4 X% O8 t信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流; {. w" N& o0 O. k% w( k; m
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:/ h1 Z0 f: [6 f4 C) o
    ❍addCall—将一个Call对象添加到哈希表中;" f0 S$ J8 Q/ E* x
    ❍sendParam—向服务器端发送RPC请求;
    , J6 Y5 f5 A4 ~& }% G
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;3 |5 h/ t/ T* N7 G6 c: r
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。
    * D0 m, j( A; D) v+ k当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。% z# k+ V: k8 r2 s% G
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;2 F! l0 u0 U4 t' G3 B4 e
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;3 w" R5 i" Y( P* d# m, }# B( p
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;! J, n! e( ]% J9 @$ u% ~  L
    4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。
    + l% P- c% L! n( a- I
    3-7 Hadoop RPC Client处理流程0 ~+ q6 r: ]& u) \' o  P1 i4 B; {
    3.ipc.Server类分析
    4 L' I2 v  x& o: M' ^
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展
    ' k1 Q7 x! A2 J0 R% `性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
    ' Y5 J' Z5 G; ^! z目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用
    * Z. @( z; T4 \+ o" V
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。
    ) t# S2 y$ n: |9 B! {
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性; S9 A1 T% C$ z  V% a& ]
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。8 w6 H; E) z, F$ k7 V4 d# P* Q
    3-8 Reactor模式工作原理7 e5 k% P" T# T# S' K7 B  }/ c5 f
    典型的
    Reactor模式中主要包括以下几个角色。
    8 d9 D  i, B2 {7 y9 d& W$ A* b❑ReactorI/O事件的派发者。
    . x8 E: D- l8 d" J& j: s1 U
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler
    2 X3 T, I) X7 N
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽
    : k4 |5 n, }  E+ G/ ]象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
    3 v6 F5 y0 O3 u7 h当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断' g9 |" ?" q( J2 q' S, T
    的处理。
    + C8 }, G+ |# p3 R8 y+ R) J2 @
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线/ i/ j* V. S" e( z4 O! F2 a
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    5 y: t; B4 |$ J% L, U应的
    ReaderSender线程处理。) I5 Q# D; ~) W5 g
    ip
    . V" \, ?/ z. m9 M; Dc.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易
    - @4 C) a& ]6 q5 w3 k地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。# F* v6 P! _+ h# C/ l
    前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为2 [: ^* }6 x- z$ o: Z7 }
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。1 y$ S; w8 c. Y" B' A# }
    3-9 Hadoop RPC Server处理流程
    0 r& E/ e/ c( }# h' [8 q
    1) 接收请求
    ' V; a( V+ R, [( |2 @该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue" F+ V1 t+ y+ @; Z
    中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。
    ; T% {8 K& `! u4 p! ~! s3 v5 D整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程7 P# c4 V! o* j' `0 U5 p" O
    池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个) X" |- z6 M7 n, w2 v) ^, Y) c
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。2 o; W& q3 G% z  `3 Q
    ListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。
      e3 O9 J/ }- c对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于
    0 u' b1 i) M' q2 x, j
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call
    , q; S" }& r4 ~7 R象, 放到共享队列
    callQueue中。
    8 x7 K+ Z% g8 Y  T
    2) 处理请求  {' B0 q4 V0 R$ [$ ?5 D
    该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线! C- u! B1 S0 F4 R% u
    程完成。) A/ }- G9 y" ^3 ^' G/ R
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果# q2 B2 w8 I( B2 H
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时
    6 l2 T3 m  b- K  y. L( U
    Handler将尝试着将后续发送任务交给Responder线程。
    6 W& ~$ s: @5 k* u8 u
    3) 返回结果1 K! I, k! ^' k9 {6 |  y
    前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结7 l( A8 _7 I/ ]/ C2 N2 s; O( A
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。
    9 N$ C6 V- T9 q/ i9 M) A
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将
    8 A. }0 k  Q$ [# g结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送4 O4 V0 u6 {- u2 x  d
    未发送完成的结果。
    ( |2 G+ R9 L- `# _% q* F+ p
    3.3.6 Hadoop RPC参数调优4 b+ E- b0 d- {6 J. C
    Hadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。/ o+ c/ e7 `6 W* ^  }
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个* _/ `5 {) ]/ Z6 d& h
    Reader线程。+ l+ a# w. p' f, X% P
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个
    6 S5 }0 M- e% a8 m+ x
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
    8 W$ u* w* Y2 w. c
    100×10=1000
    & v- [3 ~  x3 J; H
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的% Q& J- M/ m0 Z4 a  K
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为5 h$ ]# @- g8 m2 y8 y- J
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。2 ^' C  O8 V* T. c- Q3 j
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可
    / t9 W6 K$ B& B. f& S; u: L4 N能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试
      \0 Q" F/ d( q1 w1 l6 X/ O1 A8 l
    10次( 每两次之间相隔1秒) 。/ B( s: P2 q0 c7 `" g' {: c2 p; ~
    3.3.7 YARN RPC实现
    & O0 R- [) C: U" Z; O' ^当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组
    4 {" Z5 z+ @4 r( z5 B% i  [4 q% D成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]5 O4 q/ }- X" c* j0 ], l$ D$ l
    ) 。 相比于Hadoop RPC, 它们有以下几个特点:
    $ P- @; b* Q& a
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用, H" e0 {' I  ?0 J3 }( b  E
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
    3 Q$ j% I) `$ U! w! J端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。
    ' C& i: O* X/ Z5 M9 z1 C4 l  k4 Q
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,
    1 p# v, R% U2 ~( h并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应
    " Q2 E+ f# _1 R  K用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。
    % U) A( O. l* f
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
    . u2 Z+ Q, l% e5 t% x5 V/ H* l删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
    * h( @6 t/ `! }  @8 U: c% ^* G. h随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:8 u! N! k9 E) f; b  k( D1 P% G
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言
    2 a4 ~7 |* z4 c# @8 q: `读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。
    3 ^* ~0 x! \- J
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如
    * V5 X6 n8 G4 }$ \- w" x5 B果用户企图这样做, 会抛出
    VersionMismatch异常。
    & E& x/ m" b& i/ I/ X! i为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之, ^. |: x$ C6 S5 V: D( n
    后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源
    . k; W0 E7 P+ Y6 i
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的0 j, ]4 N3 g# o% y' ^9 ?
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的. S" r$ X: V* w2 F; t# }: K
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现( G% U7 U; e/ h. s$ [+ B
    中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。  M$ Y0 z4 N+ @4 E5 w: i; W& E
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信/ S& W9 W" K6 Q2 v
    协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是, h6 k) h# c4 ^' R0 O1 a+ \: c& F
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider8 @/ f+ k& }0 l6 w' S2 ~
    成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    0 X: z9 q6 a' B0 I务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根1 v. V: j& F( J; k
    据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。
      `  x. h7 O) R) w3-10 Hadoop RPC 集成多种开源RPC 框架  K1 O  C: Y$ Y, ]# @
    3-11 YarnRPC 相关类图. S8 \; ]6 M) r9 x* V2 b/ g! F& w, P
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
    ' M5 B: S8 N3 w0 u2 t+ R" |它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于% }; ?) W7 k$ {& t
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前$ w; s. V& G5 `5 g4 d) ?* W/ j
    "PBClientImpl") 。
    : U& E9 R4 @6 U; u& O! l1 x0 r9 l
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄* z0 j" v1 F* q" S
    (具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java% `5 c6 C3 o9 R  H" J) h
    名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现
    8 v, U- p3 y  ]' C类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。) L1 S: M3 Y" a
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
    9 A. b$ |% k3 H下几个方面:
    % G' e' P' q6 B
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允+ D. y6 y# N9 m, [5 H# \# X
    许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用/ R' [( s; m/ q* J0 @2 |' f2 Q
    户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能" n% p+ w" L% o
    方面有很大提升。( @' g# L$ |! E
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,
    5 `& u: C! K" s6 ^  D其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol* P- U  _* J; O2 d! q; L
    Buffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备& X" h/ z( c0 F/ H% H, j
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。3 f+ O. R7 @, ^. I7 [
    3.3.8 YARN RPC应用实例0 I' z8 v6 H' O  ~3 B2 L
    为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。( b; q' o8 [. g
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户
    1 F! g0 R7 `" I端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向, `% \1 x# h4 H, b; W
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:0 f) B: a# w1 ~6 o7 h
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
    2 b2 M) ?' N$ ]; ^; y& h+ `; Jpublic class ResourceTrackerService extends AbstractService implements
    7 Z; E6 I% B7 Z* T5 cResourceTracker {
    + f% k( J. q9 y" zprivate Server server;! S$ E0 y6 h' ]" W  n  w
    ...
    8 K, E1 {2 j0 O/ l& l+ e/ `# Lprotected void serviceStart() throws Exception {8 \. j, ^2 Y) h' u; a
    super.serviceStart();: ?$ d% @* r( H2 Z( n
    Configuration conf = getConfig();
    0 \2 U% C6 _: t( G, I0 ?YarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC
    ) d6 V3 w4 x( p$ D0 ]" D
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
    ) a" q3 }& N" [# s, sconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
    5 o* f8 d, ]1 J+ K/ MYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
    : j/ P% Q) Y2 g% }0 t' @this.server.start();
    " k- N5 }* E- i- ]) `, E1 t}.
    5 l- m- Y( X0 x. Q0 \..) F8 ?& f4 |, e# \
    @Override% q' x' m* [8 B8 Q" p
    public RegisterNodeManagerResponse registerNodeManager(
    4 T) _6 R9 {2 [: IRegisterNodeManagerRequest request) throws YarnException,! P' C5 y) u, r' n9 H  f
    IOException {
    ( @: W, G5 s$ @2 g//具体实现; ^! e- [3 i' N9 e
    }@; C- X9 Q, V' I
    Override) U$ }; x  z; S9 ?% h
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    8 k4 C8 s3 ]  Kthrows YarnException, IOException {
    . |  G; M: T* {# }; |* j. Y9 E//具体实现
    6 [. j: ~8 A' o) r: a; C( h
    }
    9 f. q; p( F8 i3 Z7 _8 Q+ v}0 q; B6 }$ Z4 d" s' D
    NodeManager(客户端) 中的相关代码如下。
    0 a* |/ e$ l4 v3 e5 p) [* e1 h5 H
    // 该函数是从YARN源代码中简单修改而来的$ \; l* C3 o) e" |0 S9 ?9 q( n
    protected ResourceTracker getRMClient() throws IOException {0 {1 v/ L9 N1 k' J( B
    Configuration conf = getConfig();
    : b$ E0 b- u3 r8 T/ N$ Q$ C" CInetSocketAddress rmAddress = getRMAddress(conf, protocol);
    7 @- _5 I& v9 d: s0 |. XRetryPolicy retryPolicy = createRetryPolicy(conf);
    ' }( ~& ?! B+ xResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);# s! U6 ~/ q, M3 I/ ]: n
    LOG.info("Connecting to ResourceManager at " + rmAddress);' N& _( B/ N8 c) j* c, Y- U4 A& Y
    return (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);
    . X3 x9 X1 T* a* k8 W& u}.& u* \8 B+ u, A' ?0 W
    ..
    8 t/ ~+ e% U9 p; u0 Mthis.resourceTracker = getRMClient();2 s" z; R' x) v9 i* u
    ...
    " P9 `8 ]* [: pRegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
    1 b, z% a( Y2 _8 R7 i/ q3 u- x# V...
    5 K; w8 W7 j9 O1 D" [2 U& jresponse = resourceTracker.nodeHeartbeat(request);) U4 H* p" s: s+ ~0 a
    为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
    9 ]: \* L; t4 N  ~# s# g3 `- P4 s6 {步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat
    8 K* j6 T2 \! B- K/ z
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
    % r0 B% a8 N. {% e1 ^9 I8 Z
    public interface ResourceTracker {
    2 v) ?0 s0 C1 Fpublic RegisterNodeManagerResponse registerNodeManager(/ t* y( Z' F" t8 S9 J+ f+ c( g8 H" s- z
    RegisterNodeManagerRequest request) throws YarnException, IOException;
    9 F* F. o8 o* c# [) epublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    " ]; d" R; z  p; T  n" X: fthrows YarnException, IOException;- R# C2 I2 J8 [: h% u7 W3 K
    }6 p% |- d  ]& i7 O- v( P
    步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但: ]" `9 A- w+ [* Z+ r5 J2 D4 _
    未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
    5 j6 |% q  T% u/ ~( L* u- O
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:
    * v: p# a0 I5 Z1 S& Q& }
    option java_package = "org.apache.hadoop.yarn.proto";
    . v7 B$ P* L3 p. |$ J7 O8 Zoption java_outer_classname = "ResourceTracker";
    % d" S0 e5 r! Y& T7 soption java_generic_services = true;. g# @6 r+ I' ]( x. O
    option java_generate_equals_and_hash = true;
    + `, h/ D8 C& W- R  `1 [; d0 }* Gimport "yarn_server_common_service_protos.proto";
    " r* Y" M: H9 I  qservice ResourceTrackerService {* N: E; X5 _; C0 U' J. ~+ `& S& ~. z
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
    8 ~- [- U9 `2 G( vrpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    " {. k$ G6 M4 `( |}
    8 }- t+ X5 l+ `, u% c& VResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。
    4 }- ]$ V) t: H3 [4 {步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
    0 d: w7 S. M6 ^# L9 p+ uBuffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest
    ' N$ l. B/ _% c) `
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto
    5 {. G! n7 ?4 s" J件) :- e4 `! ?5 g: q
    import "yarn_protos.proto";
    6 l$ t) I7 W4 q- bimport "yarn_server_common_protos.proto";3 s1 n, e' p; j4 M5 I* ]. l
    message RegisterNodeManagerRequestProto {8 Q. f" v& B% e8 F1 k! G1 Y$ Y
    optional NodeIdProto node_id = 1;$ ~$ Y) G0 _' I4 l  V
    optional int32 http_port = 3;( i/ T) O5 t0 x8 W+ D
    optional ResourceProto resource = 4;& a" e. ~# E" E8 }
    } m+ B. R. Z: s& Q& C1 h. q1 g* m1 y2 H
    essage RegisterNodeManagerResponseProto {
    . w! L* ]- K( |! j" t8 roptional MasterKeyProto container_token_master_key = 1;% O) C& w% C" s. |
    optional MasterKeyProto nm_token_master_key = 2;
    ' N! |; d' u8 zoptional NodeActionProto nodeAction = 3;
    9 Z4 [2 w. G' F7 o! Doptional int64 rm_identifier = 4;
    ; J: R( Y0 f% c- J$ w, R  J5 doptional string diagnostics_message = 5;
    0 q1 a) }( q6 B}.* _- o1 N$ X6 c4 _. y3 K
    .. //其他几个参数和返回值的定义5 t  k, ?: V9 e/ ~5 X4 ^  d
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以8 d+ u9 J0 u! P6 y
    原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol7 z- F* s$ @; O  g
    Buffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
    : B3 Z- `- @* \
    RegisterNodeManagerRequest为例进行说明。
    " J. `5 h+ b3 K* P3 t; \% _! c/ B
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :
    ( W7 u0 y1 z/ }  e" ]7 ~
    public interface RegisterNodeManagerRequest {! I; {9 K1 D) e6 m& ?. X. q
    NodeId getNodeId();
    0 e6 C. I6 D, t& b% q) z( _) oint getHttpPort();
    : W6 |/ x" Q$ F" U( ?Resource getResource();
    , i! z& F/ [1 i% }9 g- Qvoid setNodeId(NodeId nodeId);
    + i4 z4 H2 C/ u9 C# Fvoid setHttpPort(int port);% Q- B6 ]3 G" R, G& T4 P# z' X1 d) f- R
    void setResource(Resource resource);
    ; C# Q% Y8 P* T8 a: [}- A1 x3 B- R6 K  B- }
    Java封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :* }/ j7 _4 T/ ?, a$ v
    public class RegisterNodeManagerRequestPBImpl extends
    / _$ d% Q3 F0 p# \0 E7 ^/ o9 n: [ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {/ V1 d1 N% ?/ A% T1 d. @
    RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();  B/ R1 _; {; |% `3 M
    RegisterNodeManagerRequestProto.Builder builder = null;9 n1 Q& v0 \8 E, o
    private NodeId nodeId = null;6 l; ?( x3 G' f- W& c$ k7 h
    ...* g  g" o: f7 K* K" i; M# L8 U9 y
    @Override: ^. K; n' `+ @8 Q4 s5 }0 l7 R
    public NodeId getNodeId() {
    # X2 e, H, g5 [% T, V& f; CRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;  ?: b, R  q! v! y, f1 T
    if (this.nodeId != null) {* o' T: r, q* h, E- N. T6 p
    return this.nodeId;
    . ?$ q8 A! s. k/ a. [: ]}i# o. k& _9 a, p) P
    f (!p.hasNodeId()) {4 u7 q9 o% F  S9 S& {  x
    return null;
    ) e8 ]* q, C6 L7 i$ d} t
    4 S( k  k+ R. ]/ Khis.nodeId = convertFromProtoFormat(p.getNodeId());1 Y2 Q9 t8 l) M8 e& P; h
    return this.nodeId;/ E3 R6 A: Y7 y) l: M& {4 h& x# K
    } @
    - X6 S/ G4 m: D9 F3 i# jOverride0 K5 z" Z' ]7 B$ B7 Z
    public void setNodeId(NodeId nodeId) {- S+ f. W/ C; d# p% c
    maybeInitBuilder();
    % ]5 a' Q, J9 |& ^+ h2 [* Fif (nodeId == null)2 f! [! X9 \  u. E" w" J) e
    builder.clearNodeId();# a: O: J# g3 U* {
    this.nodeId = nodeId;8 ^; q3 z. a; {! A- h" \$ q& T/ g1 Y
    } .. Z9 S7 J) r5 o- b8 u
    ..
    4 v( P6 j" I3 p' w: Z3 X0 N7 a6 E9 v/ B}" `- f# @. Y& P" G0 ?4 c
    步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为, x  s, w# I: {' p: D
    ResourceTrackerPBClientImpl, 实现如下:% M" B1 s! M3 N2 w* }
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {! K% t% k1 d$ s, n
    private ResourceTrackerPB proxy;
    % L* P* J0 `& |) b( h! p7 Vpublic ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {+ g/ X  R3 A; |6 t
    RPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);& p- H) V' d/ i7 G6 M% X
    proxy = (ResourceTrackerPB)RPC.getProxy(* E* p( w+ X6 [5 N* u
    ResourceTrackerPB.class, clientVersion, addr, conf);
    2 r. s. x2 i/ S. ^+ B& I0 ^} @% W6 ^' ]5 s  F& Z8 _5 ^" K
    Override
    0 f# {, O  K3 Tpublic RegisterNodeManagerResponse registerNodeManager(
    + U; U. l) i# b) \1 M: \) l! SRegisterNodeManagerRequest request) throws YarnException,
    / f/ ?; ?9 w& t# BIOException {
    2 `; O" R/ t0 ~/ TRegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
    5 [, o9 m0 U/ z" _3 a" Htry {
    2 i3 A0 }' C/ B6 V: nreturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));
    - ~4 A( @4 J' ]; U+ o} catch (ServiceException e) {
      z; U3 {* w5 u6 n: CRPCUtil.unwrapAndThrowException(e);
    / z3 ?" ~, ^7 I5 u& rreturn null;
    + C8 N' v3 |' M- }" R}
    " r) ~0 X5 G+ D} .- s: T6 B, e  B. u; R8 |5 X, C
    ..0 M5 B( C2 [& d) L7 E
    }1 r  b# s8 E9 A( T$ S! D; e6 }
    服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:, `& U# p' M# u. i, C+ z
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {/ x# C% H% G8 l
    private ResourceTracker real;5 |- ~, u9 M" t3 C& `
    public ResourceTrackerPBServiceImpl(ResourceTracker impl) {
    $ k" i1 p9 t3 z, Z0 i2 _this.real = impl;
    : N  b& \+ s( ~} @
    ; T& M' c/ k# KOverride$ h0 a! S, ~& \: t( W
    public RegisterNodeManagerResponseProto registerNodeManager(
    ; A4 m) r6 c% ?. h0 E. sRpcController controller, RegisterNodeManagerRequestProto proto), Q+ `! e  Z5 _: F7 ]
    throws ServiceException {$ e* H. L) ~) p
    RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
    % O( i6 l6 D1 r6 C2 Utry {
    3 y, K" o# w/ h$ o. gRegisterNodeManagerResponse response = real.registerNodeManager(request);
    # P( b7 x5 k' m& e8 h1 C9 Q. j; F# \return ((RegisterNodeManagerResponsePBImpl)response).getProto();, n1 P4 F' U: z- T9 L2 u
    } catch (YarnException e) {8 K6 M% x/ W6 K! @' h) j
    throw new ServiceException(e);
    ; x" U5 y" w1 R5 ]- o3 h  Y7 B} catch (IOException e) {9 ]) v" a" Z! ?* ^* G  O: C; [
    throw new ServiceException(e);
    ' L8 A# g& `% g4 ^! P}
    # [1 u: q; D4 ]. N3 c& b} .2 R, d  G" c' v
    ..
    & a# ^' l3 y, u3 g& F" ?" z3 N}
    6 ~) g' c/ j! T3 u9 J1 b; R总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列5 R. D  K& P. C; j- K
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。* [9 c* \0 d( Y7 Z3 P! [0 b& u6 H& Z
    3-12 YARN RPC中的Protocol Buffers封装
      K' W" S0 T( e. W
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call
    ; S" P( y' v5 y) N
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。% g1 c( s! A) y6 P: E* d5 Q
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
    " t: E0 q7 ?; x* C& [0 c7 K, j
    [9] 参见网址http://thrift.apache.org/  t! H* }% h( Y' W5 W
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns
    % L+ j, |; p! B+ [' N
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    , y# O; t9 p9 b
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。" _3 z! x9 C4 w9 A- E- X; z
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347; Q# `2 I- w: T( W9 Y5 G+ |4 P; [
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则4 ~% O( N: q% t- P, Q- u
    使其向前迈进了一步。
      4 `$ E) I3 |/ F: l% i- @

      D. }; @& f* d' x* C) j3 ]) x; a$ z% o# H6 w* K3 q
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 18:00 , Processed in 0.318296 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表