java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3156|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.3】part2

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2100

    主题

    3758

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66834

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-17 13:56:45 | 显示全部楼层 |阅读模式
    3.3.5 Hadoop RPC类详解+ F/ K& S: \1 z1 j
    Hadoop RPC主要由三个大类组成, 即RPCClientServer, 分别对应对外编程接口、 客户端实现和服务器实现。
    8 D7 w* D& t4 y% P
    1.ipc.RPC类分析
    / ?4 F' @1 ~0 `3 S1 g) d- j
    RPC类实际上是对底层客户机服务器网络模型的封装, 以便为程序员提供一套更方便简洁的编程接口。0 G. V' H- H3 X- [
    如图
    3-4所示, RPC类定义了一系列构建和销毁RPC客户端的方法, 构建方法分为getProxywaitForProxy两类, 销毁方只有一
    ; a+ h+ [+ k# |$ E: _! s+ D( o个, 即为
    stopProxyRPC服务器的构建则由静态内部类RPC.Builder, 该类提供了一些列setXxx方法( Xxx为某个参数名称) 供用户
    3 Q" r( t+ m4 O" R& X; h, V设置一些基本的参数, 比如
    RPC协议、 RPC协议实现对象、 服务器绑定地址、 端口号等, 一旦设置完成这些参数后, 可通过调用9 W: \1 R7 `7 O$ J2 H7 h1 J+ X8 y' }/ F
    RPC.Builder.build()完成一个服务器对象的构建, 之后直接调用Server.start()方法便可以启动该服务器。2 G0 \/ m4 }5 G% L9 o
    Hadoop 1.x中的RPC仅支持基于Writable序列化方式不同, Hadoop 2.x允许用户使用其他序列化框架, 比如Protocol Buffers
    7 }5 B. I9 Y" y% E
    等, 目前提供了WritableWritableRpcEngine) 和Protocol BuffersProtobufRpcEngine) 两种, 默认实现是Writable方式, 用户可通过  D  ?3 \8 s3 K* t- X; G
    调用
    RPC.setProtocolEngine(…)修改采用的序列化方式。( t% D) h! Y8 v* E- C8 ~- ~) a
    下面以采用
    Writable序列化为例( 采用Protocol Buffers的过程类似) , 介绍Hadoop RPC的远程过程调用流程。 Hadoop RPC使用/ Y6 ~. n# Y3 z
    Java动态代理完成对远程方法的调用: 用户只需实现java.lang.reflect.InvocationHandler接口, 并按照自己需求实现invoke 方法即可  w0 r1 R. |# Z9 g- o9 s5 c" W! ?! i
    完成动态代理类对象上的方法调用。 但对于
    Hadoop RPC, 函数调用由客户端发出, 并在服务器端执行并返回, 因此不能像单机
    ; `: L) {0 b. ]' U程序那样直接在
    invoke 方法中本地调用相关函数, 它的做法是, 在invoke方法中, 将函数调用信息( 函数名, 函数参数列表等): ]* q9 X1 t1 W" m! W. p
    打包成可序列化的
    WritableRpcEngine.Invocation对象, 并通过网络发送给服务器端, 服务端收到该调用信息后, 解析出和函数名,* O# e7 \% ^9 _, m# ?" n, O+ [
    函数参数列表等信息, 利用
    Java反射机制完成函数调用, 期间涉及到的类关系如下图所示。! G& X6 F7 o: p& M* A/ F# C, e
    3-4 HadoopRPC的主要类关系图5 c5 n9 T7 Y: I/ P5 p
    3-5 HadoopRPC中服务器端动态代理实现类图
    / L7 r! P' L0 e6 y+ w3 q
    2.ipc.Client' d3 |, E1 |) B
    Client主要完成的功能是发送远程过程调用信息并接收执行结果。 它涉及到的类关系如图3-6所示。 Client类对外提供了一类执
    ; _4 ^% ]* C5 z  D% H) V行远程调用的接口, 这些接口的名称一样, 仅仅是参数列表不同, 比如其中一个的声明如下所示:
    / A, D' y$ I) F& B
    public Writable call(Writable param, ConnectionIdremoteId)
    , x8 {+ J" q8 R2 G, a4 b2 Rthrows InterruptedException, IOException;
    & c1 Z& e* L8 f. L! o9 v% b3-6 Client类图
    % [2 n/ V  R8 m, J% D. L6 Q; o. X
    Client内部有两个重要的内部类, 分别是CallConnection
    $ w% j& {: j. F+ h
    ❑Call: 封装了一个RPC请求, 它包含5个成员变量, 分别是唯一标识id、 函数调用信息param、 函数执行返回值value、 出2 X1 _, i  F9 i6 z1 v6 |
    错或者异常信息
    error和执行完成标识符done。 由于Hadoop RPC Server采用异步方式处理客户端请求, 这使远程过程调用的发生顺% {$ `( N. t! v! @0 n0 X" M( w( s
    序与结果返回顺序无直接关系, 而
    Client端正是通过id识别不同的函数调用的。 当客户端向服务器端发送请求时, 只需填充id
    2 |8 L. O! N2 Y1 z  e# b0 h
    param两个变量, 而剩下的3个变量( valueerrordone) 则由服务器端根据函数执行情况填充。6 P6 J9 S% |0 E; }# t7 T% C
    ❑ConnectionClient与每个Server之间维护一个通信连接, 与该连接相关的基本信息及操作被封装到Connection类中, 基本% ^4 Z" y9 ^" }- h' a* S% t9 M0 }
    信息主要包括通信连接唯一标识(
    remoteId) 、 与Server端通信的Socketsocket) 、 网络输入数据流( in) 、 网络输出数据流
      {. u5 v# `* e5 I- U
    out) 、 保存RPC请求的哈希表( calls) 等。 操作则包括:
    & Z8 B1 Q# {3 Z8 q4 h( u
    ❍addCall—将一个Call对象添加到哈希表中;: }7 h' b- X" ?2 P+ J; q5 w. W
    ❍sendParam—向服务器端发送RPC请求;3 Q1 k$ }- O$ i
    ❍receiveResponse —从服务器端接收已经处理完成的RPC请求;
    ! {0 \( u& W7 H2 Z! b% n+ n$ o
    ❍run—Connection是一个线程类, 它的run方法调用了receiveResponse方法, 会一直等待接收RPC返回结果。1 v5 i' q7 l, [# t( h+ E3 U2 M
    当调用
    call函数执行某个远程方法时, Client端需要进行( 如图3-7所示) 以下4个步骤。  ~3 I3 c7 S3 g8 @
    1) 创建一个Connection对象, 并将远程方法调用信息封装成Call对象, 放到Connection对象中的哈希表中;
    # _0 j5 g  @2 o1 I# G) D9 @7 |
    2) 调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
    % d# }, z. z: Q- r" G
    3Server端处理完RPC请求后, 将结果通过网络返回给Client端, Client端通过receiveRpcResponse()函数获取结果;
    + Q- a5 O! u( a# [2 ~3 ?% z& i; P0 U# t4Client检查结果处理状态( 成功还是失败) , 并将对应Call对象从哈希表中删除。
    , b% q% b* g# j" f+ P( c4 J
    3-7 Hadoop RPC Client处理流程
    8 n8 q( V% _/ R
    3.ipc.Server类分析& E; r$ F! v/ U0 {6 K
    Hadoop采用了Master/Slave结构, 其中Master是整个系统的单点, 如NameNodeJobTracker [8] , 这是制约系统性能和可扩展9 x% W; I# ]3 n
    性的最关键因素之一; 而
    Master通过ipc.Server接收并处理所有Slave发送的请求, 这就要求ipc.Server 将高并发和可扩展性作为设计
    4 d- M, J+ R) J3 c8 ]+ U* `$ \4 L目标。 为此,
    ipc.Server采用了很多提高并发处理能力的技术, 主要包括线程池、 事件驱动和Reactor设计模式等, 这些技术均采用4 b& C# {( B" Z( T6 W
    JDK自带的库实现, 这里重点分析它是如何利用Reactor设计模式提高整体性能的。8 r' v8 |9 F- g! v% Q0 l
    Reactor是并发编程中的一种基于事件驱动的设计模式, 它具有以下两个特点: 通过派发/分离I/O操作事件提高系统的并发性4 W( C9 [( Y+ Z4 v) u4 I. l
    能; 提供了粗粒度的并发控制, 使用单线程实现, 避免了复杂的同步处理。 典型的
    Reactor实现原理如图3-8所示。
    1 ~3 S) ?2 i" c- P0 G7 G
    3-8 Reactor模式工作原理0 H6 J0 J! d7 v" r
    典型的
    Reactor模式中主要包括以下几个角色。9 m% E6 U7 D- G9 f! G9 X1 @
    ❑ReactorI/O事件的派发者。
    # M# I- U* A: W7 [$ B- Q6 d
    ❑Acceptor接受来自Client的连接, 建立与Client对应的Handler, 并向Reactor注册此Handler, @& I) ?& e. }' O
    ❑Handler与一个Client通信的实体, 并按一定的过程实现业务的处理。 Handler内部往往会有更进一步的层次划分, 用来抽1 g) _' D) P2 N9 T
    象诸如
    readdecodecomputeencodesend等过程。 在Reactor模式中, 业务逻辑被分散的I/O事件所打破, 所以Handler需要有适
    8 M* j- J/ d$ Q9 a4 @- E, x  U+ l当的机制在所需的信息还不全( 读到一半) 的时候保存上下文, 并在下一次
    I/O事件到来的时候( 另一半可读) 能继续上次中断# {% u) m& o  k+ O7 l0 k5 s
    的处理。
    + b/ H3 E. }6 |5 D
    ❑Reader/Sender为了加速处理速度, Reactor模式往往构建一个存放数据处理线程的线程池, 这样数据读出后, 立即扔到线" F9 S0 q0 {0 f( |; \& Q' M
    程池中等待后续处理即可。 为此,
    Reactor模式一般分离Handler中的读和写两个过程, 分别注册成单独的读事件和写事件, 并由对
    - s/ N# Z" V2 ?2 y5 b# _应的
    ReaderSender线程处理。/ b9 O3 [3 g+ Q+ \( |& B' J
    ip
    ( u$ t/ n* ]) W  o+ \9 F; i; S* Rc.Server
    实际上实现了一个典型的Reactor设计模式, 其整体架构与上述完全一致。 一旦读者了解典型Reactor架构便可很容易8 r1 W+ g: [* F0 V% a4 {
    地学习
    ipc.Server的设计思路及实现。 接下来, 我们分析ipc.Server的实现细节。4 T1 F4 _; ~! P% I
    前面提到,
    ipc.Server的主要功能是接收来自客户端的RPC请求, 经过调用相应的函数获取结果后, 返回给对应的客户端。 为  V9 y3 _) }# B9 T' \: T" b5 ]
    此,
    ipc.Server被划分成3个阶段: 接收请求、 处理请求和返回结果, 如图3-9所示。 各阶段实现细节如下。
    & m4 X; b6 w( c
    3-9 Hadoop RPC Server处理流程
    ) G3 `8 ]; ~4 M3 r7 s- ]7 C
    1) 接收请求0 g9 ]" V7 S! D4 g$ b. G- d" H
    该阶段主要任务是接收来自各个客户端的
    RPC请求, 并将它们封装成固定的格式( Call类) 放到一个共享队列( callQueue3 H; ^" t3 B' P3 i0 E
    中, 以便进行后续处理。 该阶段内部又分为建立连接和接收请求两个子阶段, 分别由
    ListenerReader两种线程完成。# v! v3 Y3 B1 S1 b  v- G1 j
    整个
    Server只有一个Listener线程, 统一负责监听来自客户端的连接请求, 一旦有新的请求到达, 它会采用轮询的方式从线程4 {1 v$ o* V& A
    池中选择一个
    Reader线程进行处理, 而Reader线程可同时存在多个, 它们分别负责接收一部分客户端连接的RPC请求, 至于每个
    5 m. n8 H" l7 T# s" x" c
    Reader线程负责哪些客户端连接, 完全由Listener决定, 当前Listener只是采用了简单的轮询分配机制。
      v! K3 Y% g7 J! ~6 H4 fListenerReader线程内部各自包含一个Selector对象, 分别用于监听SelectionKey.OP_ACCEPTSelectionKey.OP_READ事件。8 I" w" Z' \, P, K7 O
    对于
    Listener线程, 主循环的实现体是监听是否有新的连接请求到达, 并采用轮询策略选择一个Reader线程处理新连接; 对于. }, J2 `" J: b; F, q7 _
    Reader线程, 主循环的实现体是监听( 它负责的那部分) 客户端连接中是否有新的RPC请求到达, 并将新的RPC请求封装成Call
    . z0 B: ?: g# N; s! |& N3 U/ K象, 放到共享队列
    callQueue中。
    % v) }3 [" r& V; ^
    2) 处理请求
    , ?( ~2 q  z+ ]; w* o该阶段主要任务是从共享队列
    callQueue中获取Call对象, 执行对应的函数调用, 并将结果返回给客户端, 这全部由Handler线
    9 x: L" g; s" z程完成。
    ( ?0 H! O$ O! \
    Server端可同时存在多个Handler线程, 它们并行从共享队列中读取Call对象, 经执行对应的函数调用后, 将尝试着直接将结果% ^1 A$ k8 J+ v/ h
    返回给对应的客户端。 但考虑到某些函数调用返回结果很大或者网络速度过慢, 可能难以将结果一次性发送到客户端, 此时# p8 q) T$ O6 K
    Handler将尝试着将后续发送任务交给Responder线程。
    ; l, Y! _% g: ~5 J
    3) 返回结果
    6 B( P. p- V3 C: t5 g+ v% i前面提到, 每个
    Handler线程执行完函数调用后, 会尝试着将执行结果返回给客户端, 但对于特殊情况, 比如函数调用返回结' Q( |, _! ?6 D9 Z
    果过大或者网络异常情况( 网速过慢) , 会将发送任务交给
    Responder线程。
    3 v1 \$ D. A4 L. S9 p
    Server端仅存在一个Responder线程, 它的内部包含一个Selector对象, 用于监听SelectionKey.OP_WRITE事件。 当Handler没能将7 `. H- \/ `  S3 C' ?4 d- P
    结果一次性发送到客户端时, 会向该
    Selector对象注册SelectionKey.OP_WRITE事件, 进而由Responder线程采用异步方式继续发送0 ]7 _* c9 O. c3 w4 t8 q1 g
    未发送完成的结果。
    # ]0 u7 G# @3 P1 _' B; E/ t% j1 z
    3.3.6 Hadoop RPC参数调优
    - a8 P+ a, ]1 p/ u! Z$ CHadoop RPC对外提供了一些可配置参数, 以便于用户根据业务需求和硬件环境对其进行调优。 主要的配置参数如下。9 j& P7 {& Y; ~8 ]4 I
    ❑Reader线程数目。 由参数ipc.server.read.threadpool.size配置, 默认是1, 也就是说, 默认情况下, 一个RPC Server只包含一个
    + T" W" Y# F2 ^/ q" j
    Reader线程。
    6 s2 _1 W; F! I' |
    ❑每个Handler线程对应的最大Call数目。 由参数ipc.server.handler.queue.size指定, 默认是100, 也就是说, 默认情况下, 每个: ?0 e" e7 f& q( Z( [
    Handler线程对应的Call队列长度为100。 比如, 如果Handler数目为10, 则整个Call队列( 即共享队列callQueue) 最大长度为:
    - a) `9 c+ n/ s: j+ u( J, f; J
    100×10=1000
    + p) ?6 U8 c/ @) ?; v' T! T1 {; w
    ❑Handler线程数目。 Hadoop中, ResourceManagerNameNode分别是YARNHDFS两个子系统中的RPC Server, 其对应的; G; L$ Y; x) A) u, l4 o
    Handler数目分别由参数yarn.resourcemanager.resource-tracker.client.thread-countdfs.namenode.service.handler.count指定, 默认值分别为
    . k# w/ U, N* a! G6 x7 i
    5010, 当集群规模较大时, 这两个参数值会大大影响系统性能。5 D- k/ z# A! O* e0 e% ?7 h
    ❑客户端最大重试次数。 在分布式环境下, 因网络故障或者其他原因迫使客户端重试连接是很常见的, 但尝试次数过多可) u  ^6 N6 M! }7 k+ z  o+ c! |8 u
    能不利于对实时性要求较高的应用。 客户端最大重试次数由参数
    ipc.client.connect.max.retries指定, 默认值为10, 也就是会连续尝试& ?# C4 B. s  s! u  I8 Q
    10次( 每两次之间相隔1秒) 。+ b) G6 [, k/ \6 N5 s
    3.3.7 YARN RPC实现
    7 ], R' X# c9 z3 P; a9 Q当前存在非常多的开源RPC框架, 比较有名 的有Thrift [9] Protocol BuffersAvro。 同Hadoop RPC一样, 它们均由两部分组& q' Y: F9 d, X) ]- b
    成: 对象序列化和远程过程调用(
    Protocol Buflers官方仅提供了序列化实现, 未提供远程调用相关实现, 但三 方RPC库非常多 [10]
    / z5 b8 \0 ~1 L' {0 H3 E- T3 \8 E) 。 相比于Hadoop RPC, 它们有以下几个特点:) P' U( i0 S( W; e8 L- G  B
    ❑跨语言特性 。 前面提到, RPC框架实际上是客户机服务器模型的一个应用实例, 对于Hadoop RPC而言, 由于Hadoop采用. [- q; o3 p! ~0 t
    Java语言编写, 因而其RPC客户端和服务器端仅支持Java语言; 但对于更通用的RPC框架, 如Thrift或者Protocol Buffers等, 其客户
    . q% n/ t" I! i2 {% }( T- ^! X端和服务器端可采用任何语言编写, 如
    JavaC++Python等, 这给用户编程带来极大方便。. p% y  s9 X, F; c. z3 z/ D6 ]' e0 T
    ❑引入IDL 。 开源RPC框架均提供了一套接口描述语言( Interface Description LanguageIDL) , 它提供一套通用的数据类型,
    ! Z1 A" Y# c, Z! b6 C4 {并以这些数据类型来定义更为复杂的数据类型和对外服务接口。 一旦用户按照
    IDL定义的语法编写完接口文件后, 可根据实际应
      }; m- A4 Y. J; f: x1 ?用需要生成特定编程语言( 如
    JavaC++Python等) 的客户端和服务器端代码。
    % E* A5 u( o" v( m
    ❑协议兼容性 。 开源RPC框架在设计上均考虑到了协议兼容性问题, 即当协议格式发生改变时, 比如某个类需要添加或者
    : b; g% D! @' I' ~删除一个成员变量( 字段) 后, 旧版本代码仍然能识别新格式的数据, 也就是说, 具有向后兼容性。
    ( x- T% d2 S. D4 }7 o随着
    Hadoop版本的不断演化, 研发人员发现Hadoop RPC在跨语言支持和协议兼容性两个方面存在不足, 具体表现为:
    ( o4 J/ X/ I7 B  G+ \* m
    ❑从长远发展看, Hadoop RPC应允许某些协议的客户端或者服务器端采用其他语言实现, 比如用户希望直接使用C/C++语言* Z3 A  t$ Q* H- n! D! v
    读写
    HDFS中的文件, 这就需要有C/C++语言的HDFS客户端。. x' O" Z" L8 j4 U
    ❑当前Hadoop版本较多, 而不同版本之间不能通信, 比如0.20.2版本的JobTracker不能与0.21.0版本中的TaskTracker通信, 如8 O  R) L+ ?& u2 X4 J: v0 f; v8 n
    果用户企图这样做, 会抛出
    VersionMismatch异常。7 k2 d; M- G# w, F
    为了解决以上几个问题,
    Hadoop YARNRPC中的序列化部分剥离开, 以便将现有的开源RPC框架集成进来。 经过改进之* A" u* E- V5 B
    后,
    Hadoop RPC的类关系如图3-10所示, RPC类变成了一个工厂, 它将具体的RPC实现授权给RpcEngine实现类, 而现有的开源6 m$ `2 m; a4 g: B
    RPC只要实现RpcEngine接口, 便可以集成到Hadoop RPC中。 在该图中, WritableRpcEngine是采用Hadoop自带的序列化框架实现的
    ' W6 A) H1 c2 o" E- r0 Q
    RPC, 而 AvroRpcEngine [11] ProtobufRpcEngine [12] 分别是开源RPC( 或序列化) 框架Apache AvroProtocol Buffers对应的5 \( U  p) p1 r! [( S
    RpcEngine实现, 用户可通过配置参数rpc.engine.{protocol}以指定协议{protocol}采用的序列化方式。 需要注意的是, 当前实现! F. e% \5 a& h( q
    中,
    Hadoop RPC只是采用了这些开源框架的序列化机制, 底层的函数调用机制仍采用Hadoop自带的。  }6 |3 ?0 j/ X  u
    YARN提供的对外类是YarnRPC, 用户只需使用该类便可以构建一个基于Hadoop RPC且采用Protocol Buffers序列化框架的通信
    " H! q$ T6 \0 g+ Z1 l协议。
    YarnRPC相关实现类如图3-11所示。 YarnRPC是一个抽象类, 实际的实现由参数yarn.ipc.rpc.class指定, 默认值是
    ) G/ Q( i+ y) \* M
    org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPCHadoopYarnProtoRPC通过RPC工厂生成器( 工厂设计模式) RpcFactoryProvider9 @/ j, V7 I" C! x% {
    成客户端工厂( 由参数
    yarn.ipc.client.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl) 和服
    ! f4 L' p8 r, s& N务器工厂( 由参数
    yarn.ipc.server.factory.class指定, 默认值是org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl) , 以根
    ! f$ ?# W0 y6 t* {" Q* x9 s据通信协议的
    Protocol Buffers定义生成客户端对象和服务器对象。- G+ ^4 b* V9 R/ c; E7 J
    3-10 Hadoop RPC 集成多种开源RPC 框架' b* R) b  y) x0 \" Q
    3-11 YarnRPC 相关类图, @7 Q  u: W  H, m" E) {
    ❑RpcClientFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC客户端句柄, 但
      C- J! M" O% ?9 A1 s8 b它对通信协议的存放位置和类名命有一定要求。 假设通信协议接口
    Xxx所在Java包名为XxxPackage, 则客户端实现代码必须位于6 q# d- O# h) U. ]' R
    JavaXxxPackage.impl.pb.client中( 在接口包名后面增加".impl.pb.client") , 且实现类名为PBClientImplXxx( 在接口名前面增加前
    9 G7 @) y; x. d$ c- {
    "PBClientImpl") 。
    + c) M1 ^# c2 X* h6 h6 T
    ❑RpcServerFactoryPBImpl根据通信协议接口( 实际上就是一个Java interface) 及Protocol Buffers定义构造RPC服务器句柄
    + u( \! j8 a- H6 t1 K(具体会调用前面节介绍的RPC.Server类) , 但它对通信协议的存放位置和类命名有一定要求。 假设通信协议接口Xxx所在Java' r9 k- B: O: `% P7 T) Y
    名为
    XxxPackage, 则客户端实现代码必须位于JavaXxxPackage.impl.pb.server中(在接口包名后面增加".impl.pb.server") , 且实现7 U1 P0 @9 I+ `
    类名为
    PBServiceImplXxx(在接口名前面增加前缀"PBServiceImpl") 。
    % J2 }! N2 y! ^$ H% ^8 T
    Hadoop YARN已将Protocol Buffers作为默认 的序列化机制 [13] (而不是Hadoop自带的Writable) , 这带来的好处主要表现在以
    / |  ]# B) g) n* G8 d. X下几个方面:& D, U  L' K) H3 D3 U+ Y; u
    ❑继承了Protocol Buffers的优势 Protocol Buffers已在实践中证明了其高效性、 可扩展性、 紧凑性和跨语言特性。 首先, 它允
    & `4 E7 a: W) F5 r+ r# H2 V许在保持向后兼容性的前提下修改协议, 比如为某个定义好的数据格式添加一个新的字段; 其次, 它支持多种语言, 进而方便用8 C: o; _! y- x# c" |" I
    户为某些服务(比如
    HDFSNameNode) 编写 非Java客户端 [14] ; 此外, 实验表明Protocol BuffersHadoop 自带的Writable在性能) C3 s! }( M! {/ {
    方面有很大提升。
    $ d$ M' K" E# h! @" {+ U4 u
    ❑支持升级回滚 Hadoop 2.0已经将NameNode HA方案合并进来, 在该方案中, Name-Node分为ActiveStandby两种角色,- Y% `2 m% \' C8 s7 x' n
    其中,
    Active NameNode在当前对外提供服务, 而Standby NameNode则是能够在Active NameNode出现故障时接替它。 采用Protocol
    6 z2 |8 O) F; a$ k# {4 EBuffers
    序列化机制后, 管理员能够在不停止NameNode对外服务的前提下, 通过主备NameNode之间的切换, 依次对主备4 c) a3 K) C* f  D8 S  @" Y
    NameNode进行在线升级(不用考虑版本和协议兼容性等问题) 。
    # G* N. |% q, `4 k
    3.3.8 YARN RPC应用实例
    6 ]5 |/ |& K0 p3 E: c为了进一步说明YARN RPC的使用方法, 本小节给出一个具体的应用实例。4 v( _' e3 Q5 Y- @! D
    YARN中, ResourceManagerNodeManager之间的通信协议是ResourceTracker, 其中NodeManager是该协议的客户( W  V2 Q8 |: ?# H$ d
    端,
    ResourceManager是服务端, NodeManager通过该协议中定义的两个RPC函数( registerNodeManagernodeHeartbeat) 向4 u8 Y; P: S. h; @$ x$ v/ K1 G- Z- `
    ResourceManager注册和周期性发送心跳信息。 ResourceManager(服务器端) 中的相关代码如下:  a. B6 P* {8 v: V% x6 k
    // ResourceTrackerService实现了ResourceTracker通信接口, 并启动RPC Server
    " o  `3 A8 c9 Rpublic class ResourceTrackerService extends AbstractService implements. @  u/ Q0 T6 d) G0 N+ E
    ResourceTracker {
    ! V' h! Z2 V' T" v2 mprivate Server server;
    3 q2 E" e. t4 P...
    ) |4 h9 {2 _7 T8 X0 x0 ?# cprotected void serviceStart() throws Exception {
      C0 U0 I* N. D+ n: k, X+ J# Osuper.serviceStart();7 p( X) H# ^6 C& K6 F' ~
    Configuration conf = getConfig();
    6 H* N) x1 u8 A. Y2 y- GYarnRPC rpc = YarnRPC.create(conf); //使用YarnRPC( R4 F" H" r: T: u
    this.server = rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
    " m& X1 ?/ w4 [1 i/ O* S6 }  xconf, null, conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
    - U% l) _# `+ v* U* ?" E" M2 y# c0 LYarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));2 B2 h8 H8 z+ O: ^
    this.server.start();) {) r3 @3 F4 u' `
    }.) x* w+ s7 l2 h. a+ J) H
    ..8 }: N0 ]2 x* v  @( t) N- H/ i
    @Override
    $ w. O' ^4 \# n/ d0 j  npublic RegisterNodeManagerResponse registerNodeManager(2 \  `$ [6 w& ~9 R! U5 ?
    RegisterNodeManagerRequest request) throws YarnException,! J* f7 k. ^$ U( h9 D/ u
    IOException {
    ; @% g2 _( E" c& I. K//具体实现( |2 i: Z5 G+ S- e: e
    }@
      A: [2 Q, @) e$ cOverride
    * g# p' E, n- s1 _: }! wpublic NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    1 t- O- W6 c6 I5 \throws YarnException, IOException {1 @$ }3 Z, x' m
    //具体实现
      q8 q# V* T! T0 Z4 Q( L
    }
    # m8 [, p. w! n}
    ( T( K. m* c: _& |NodeManager(客户端) 中的相关代码如下。5 I+ Z: \! n4 V: V( ^$ L1 E2 C
    // 该函数是从YARN源代码中简单修改而来的
    2 {2 t+ [$ p. A+ H' W7 f5 c
    protected ResourceTracker getRMClient() throws IOException {
    % t- h/ `8 A9 m% P9 R* U/ sConfiguration conf = getConfig();8 {! u8 ~3 I# `
    InetSocketAddress rmAddress = getRMAddress(conf, protocol);" B9 L3 E. F8 m! K- k, [2 v
    RetryPolicy retryPolicy = createRetryPolicy(conf);
    1 o2 U3 r1 B! [ResourceTracker proxy = RMProxy.<T>getProxy(conf, ResourceTracker.class, rmAddress);
    0 M1 @& C) ^8 D0 g/ wLOG.info("Connecting to ResourceManager at " + rmAddress);
    ) B  \* U+ C) q% |- Wreturn (ResourceTracker) RetryProxy.create(protocol, proxy, retryPolicy);) P" I5 N7 f8 Y, i' z# i
    }.
    * D4 e% o7 p( @- \9 G7 U* a; R9 }' B... g- j* K( `# w8 {- n: p; i
    this.resourceTracker = getRMClient();
    " e7 O9 U! `8 _...8 J9 V( t% \. T0 U" o# R$ D! z9 G: t
    RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request);
    8 F% A0 ?! L" n* p# r...
    3 w9 f0 U% K4 T. d; w6 o- Jresponse = resourceTracker.nodeHeartbeat(request);6 s2 z! N) k0 J; f; m& T$ u& q' a
    为了能够让以上代码正常工作, YARN按照以下流程实现各种功能。
    5 X4 a- ?3 N; z步骤1 定义通信协议接口( Java Interface) 。 定义通信协议接口ResourceTracker, 它包含registerNodeManagernodeHeartbeat$ Z1 w. e' s4 [: a( k+ q
    两个函数, 且每个函数包含一个参数和一个返回值, 具体如下:
      q6 I& v& s% H/ j$ Z' G1 f
    public interface ResourceTracker {
    % C: J7 q4 a& L4 ?public RegisterNodeManagerResponse registerNodeManager(
    & h# x! Z" ^* t: fRegisterNodeManagerRequest request) throws YarnException, IOException;2 m8 Q5 W9 c/ V1 P- R$ d
    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
    : H. d0 |8 i5 p3 R! `- Bthrows YarnException, IOException;* I( ~/ K0 t% R, O' J
    }
    0 v4 K6 W: k  l/ r: O& z步骤2 为通信协议ResourceTracker提供Protocol Buffers定义和Java实现。 前面提到, Protocol Buffers仅提供了序列化框架, 但
    ; K. z3 N; R% g, B7 B未提供
    RPC实现, 因此RPC部分需要由用户自己实现, 而YARN则让ResourceTrackerService类实现了ResourceTracker协议, 它的
    * X+ S# j. v2 l# b( K
    Protocol Buffers定义( 具体见文件ResourceTracker.proto) 如下:  e4 l. B6 g1 m- `: @
    option java_package = "org.apache.hadoop.yarn.proto";( c+ V: u) c9 ?7 e. g1 C2 S1 ~2 D
    option java_outer_classname = "ResourceTracker";& Z$ w2 B0 C# a9 s
    option java_generic_services = true;
    : Y( C9 Y2 z+ ?: O" _. boption java_generate_equals_and_hash = true;
    & `+ D$ c, d& @2 fimport "yarn_server_common_service_protos.proto";
    % l) f# ?5 k: d) ^* |1 ?+ Kservice ResourceTrackerService {/ }  o# Y+ l& V- M
    rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);8 U/ f2 d5 X% b
    rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
    / c3 P' w* g8 ?1 C  f' o}$ g/ m/ c! |" N8 j6 Z
    ResourceTrackerRPC函数实现是由ResourceManager中的ResourceTrackerService完成的。: d0 \" r) h7 ?: j# @* w9 g* l
    步骤
    3 RPC函数的参数和返回值提供Protocol Buffers定义。 YARN需要保证每个RPC函数的参数和返回值是采用Protocol
    4 i- d2 i8 X' y/ N5 n; eBuffers
    定义的, 因此ResourceTracker协议中RegisterNodeManagerRequestRegisterNodeManagerResponseNodeHeartbeatRequest
    $ E1 s( P3 T9 P. {5 e+ H- @; @
    NodeHeartbeatResponse四个参数或者返回值需要使用Protocol Buffers定义, 具体如下( 见yarn_server_common_service_protos.proto3 w7 ?, m2 |# D- H4 e
    件) :. P* ^- H7 I- T% \
    import "yarn_protos.proto";
    / c# J9 w( C$ O1 Gimport "yarn_server_common_protos.proto";
    ' |8 d# i) L5 {" T6 t# fmessage RegisterNodeManagerRequestProto {
    , `3 O; D/ E2 r7 }$ poptional NodeIdProto node_id = 1;
    ) {! @5 t' _4 x, P6 g" Moptional int32 http_port = 3;
    + Q8 q# K; x, m) p# D6 c3 _optional ResourceProto resource = 4;
    0 w1 T! q: P( n' r, k5 N/ _- o* V: K} m
    7 a' i% M$ D2 A4 W8 D; k+ Z3 Pessage RegisterNodeManagerResponseProto {$ o8 A* H& I1 C: K2 I3 S
    optional MasterKeyProto container_token_master_key = 1;
    3 ?/ p/ ~& f& b6 d% Noptional MasterKeyProto nm_token_master_key = 2;
    9 C* g/ k, ~) e$ H# @: [optional NodeActionProto nodeAction = 3;
    2 o$ o3 b' D- v" P' Y' H  |optional int64 rm_identifier = 4;. l! P+ L( E( s* M  T
    optional string diagnostics_message = 5;
    5 i: i- S$ N0 w' D: b/ a  t}.
    ; O+ C: V; G. m3 f7 d& H$ Y.. //其他几个参数和返回值的定义
    + i6 @; G3 M, q  @5 M3 M' r
    步骤4 RPC函数的参数和返回值提供Java定义和封装。 YARN采用了Protocol Buffers作为参数和返回值的序列化框架, 且以5 O' P6 d7 C! ]4 _) ?
    原生态
    .proto文件的方式给出了定义, 而具体的Java代码生成需在代码编写之后完成。 基于以上考虑, 为了更容易使用Protocol
    1 \3 N$ ~# u) {! l* aBuffers
    生成的( Java语言) 参数和返回值定义, YARN RPC为每个RPC函数的参数和返回值提供Java定义和封装, 以参数
    8 _7 _" o1 a& d; U2 w* t/ {5 s
    RegisterNodeManagerRequest为例进行说明。
    1 x$ Z- T9 Z' [, N
    Java接口定义如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords) :
    1 B. L( T$ j* X" M( z% ]8 y
    public interface RegisterNodeManagerRequest {
    0 m# P( W0 P/ E( ?+ A* v0 T& LNodeId getNodeId();+ Q0 O+ E4 @( d  |6 p
    int getHttpPort();
    0 E! ^) R0 y* X5 A: u' r! dResource getResource();+ t1 v* J* M; P: Y$ X& n6 l# B' W; K
    void setNodeId(NodeId nodeId);! H( N7 S& l% Y% Q2 M
    void setHttpPort(int port);6 ]4 x$ g+ @0 z5 n% `3 ^9 f. {
    void setResource(Resource resource);
    6 h7 I1 M. {# K7 m) t. N" F: j}
    : Q) S& ~3 S8 e/ c8 WJava封装如下( 见Javaorg.apache.hadoop.yarn.server.api.protocolrecords.impl.pb) :
    - K0 S1 q0 J, |. u0 ~" h
    public class RegisterNodeManagerRequestPBImpl extends
    . x1 c& O, e1 r% e; I7 aProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {9 a' ]+ x/ Y$ |8 ^# O) F7 N
    RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();' G) `$ n6 S( b" J5 Q" y# x; L
    RegisterNodeManagerRequestProto.Builder builder = null;
    ' Q1 O9 g9 r4 d/ x5 ~" Aprivate NodeId nodeId = null;
    ) ~, m3 t; H; N...
    ! J4 S5 _* ^2 K$ p0 @@Override
    , r& b( ~$ Y# E; ^4 Fpublic NodeId getNodeId() {
    8 P! h! ]2 V+ J5 iRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;4 W; ?# y7 O: f% T/ [
    if (this.nodeId != null) {
    * X5 I7 o  l  W: e4 Wreturn this.nodeId;
    6 m0 r0 X, M2 k" x3 j1 `  \3 b}i
    ! X4 _' x( }/ t/ @' S3 [f (!p.hasNodeId()) {
    5 j) t/ g4 o3 v4 Ereturn null;; P& e. ]8 A6 K4 n7 _2 Q3 w/ z; {
    } t
    3 _+ c- Z* T7 X, rhis.nodeId = convertFromProtoFormat(p.getNodeId());- |% g/ q% D5 g" d7 m6 z
    return this.nodeId;
    # N" G: a) l5 c6 h, s! I0 A7 {( d} @' T3 [" E# h1 W6 C4 H! p9 G4 ?
    Override! u3 i/ n4 C4 c$ m, w1 c2 N
    public void setNodeId(NodeId nodeId) {: A. o, l, R1 Q& \
    maybeInitBuilder();# C: ~2 ^/ B2 l
    if (nodeId == null)3 q1 c0 O- |# x. H4 q8 V
    builder.clearNodeId();
    6 j. D8 S! q' `% gthis.nodeId = nodeId;9 V/ _: x! T/ S* Y
    } .2 X2 Y" G' p1 t
    ..2 Q) i9 L( v  J
    }
    $ E& q0 L! o  f6 s步骤5 为通信协议提供客户端和服务器端实现。 客户端代码放在org.apache.hadoop.yarn.server.api.impl.pb.client包中, 且类名为2 C5 ^  ?$ b( D1 b7 {
    ResourceTrackerPBClientImpl, 实现如下:
    ) M3 A4 r: u& u+ ~
    public class ResourceTrackerPBClientImpl implements ResourceTracker, Closeable {
    & M, @8 A) f4 ^4 {% hprivate ResourceTrackerPB proxy;0 N+ v1 l8 `/ Z5 m. M8 r( o0 v
    public ResourceTrackerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {
    ( z3 F2 {6 z; R4 BRPC.setProtocolEngine(conf, ResourceTrackerPB.class, ProtobufRpcEngine.class);* S, }+ S& @: K4 r- V
    proxy = (ResourceTrackerPB)RPC.getProxy(" O2 u9 i6 {5 Z" `! `& G
    ResourceTrackerPB.class, clientVersion, addr, conf);
    ! k3 F. @: r1 g: L, s% A} @
    ; p% p/ e$ I1 |# ~Override. i/ D* L/ x2 w5 b% u( d0 ]+ R
    public RegisterNodeManagerResponse registerNodeManager(
    * r5 C2 W  }% k& y# i! I; Y3 B; GRegisterNodeManagerRequest request) throws YarnException,- N: P- r) o- j, a- L) q  _
    IOException {7 t1 H2 w/ k6 [1 W! p: ]
    RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();# e4 U% i; ?3 o8 A- \- e) [
    try {
    + L1 ^* ^) I" Zreturn new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager (null, requestProto));0 K4 u! i0 Z& M6 [# Q
    } catch (ServiceException e) {
    ( B) a# k8 M, f) C+ cRPCUtil.unwrapAndThrowException(e);! A5 \' z0 j  s; f9 R) T& j4 L
    return null;; M. V5 Z- U& X* K: n0 ~& e
    }9 q/ ^5 |( _0 [* i8 N# q
    } .
    " t$ ?8 x3 F$ t# {4 `2 B* S..; t! D$ `3 ~( l+ a8 E. t5 G
    }
    / N) _+ v7 F) C# _服务端代码放在org.apache.hadoop.yarn.server.api.impl.pb.server包中, 且类名为Resource-TrackerPBServerImpl, 实现如下:
    ) r6 b& E& L* E( P
    public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
    $ b3 I" n- D$ @2 a. G* Fprivate ResourceTracker real;
    3 U# l+ @8 Z( f( F' epublic ResourceTrackerPBServiceImpl(ResourceTracker impl) {
      \. A2 C! S- c3 ]0 D# A. R4 _this.real = impl;
    6 L  y  i- s& ^$ C} @
    & U; o3 @- d+ c( `8 e) QOverride5 U& o& ^# I6 f* l
    public RegisterNodeManagerResponseProto registerNodeManager(
    ( x* o6 V8 _& @RpcController controller, RegisterNodeManagerRequestProto proto)! q3 E0 V8 M7 Q2 S5 V8 J5 {8 `
    throws ServiceException {
    0 h$ V  [! v; U6 ]) _; o" {RegisterNodeManagerRequestPBImpl request = new RegisterNodeManagerRequestPBImpl(proto);
    ) d* Y0 p) E. q# M( Stry {
    4 `2 ]5 G2 ^% S8 z) yRegisterNodeManagerResponse response = real.registerNodeManager(request);2 t1 C' S3 [; {1 w
    return ((RegisterNodeManagerResponsePBImpl)response).getProto();% z+ n4 A. Y, ^
    } catch (YarnException e) {
    ' v# E# \1 Y& Ythrow new ServiceException(e);, o& q6 o  D2 c( b
    } catch (IOException e) {  [/ g! q/ {. `: Q% r
    throw new ServiceException(e);- H7 e7 t# U! Z+ L4 g# l$ j; r
    }" R/ Q& m, [& X* C3 E
    } .; ?* |- T& J- t3 \" X
    ..
    ; }, S% X  a$ \* v- P- p}
    # x- J2 l( |$ [" l: o5 O# r总结上面几个步骤, 为了实现基于Protocol Buffers序列化框架的YARN RPC通信协议ResourceTrackerYARN实现了一系列7 ?, j1 |  R! }5 c
    Java接口定义和Protocol Buffers封装, 具体如图3-12所示( 以服务器端实现为例) 。% D+ p, t9 e# K% H. u4 n
    3-12 YARN RPC中的Protocol Buffers封装2 T* l. @# K: T
    [6] 参见网址http://en.wikipedia.org/wiki/Remote_procedure_call5 E9 a  H5 ^8 J4 T
    [7] Doug CuttingHadoop最初设计时就是这样描述Hadoop RPC设计动机的。* _; Y* \6 ?) V
    [8] HDFS的单点故障已经在Hadoop 2.0中得到了解决, MRv1中的JobTracker的单点故障在CDH4中也得到了解决。
    5 L7 k4 r8 u& [9 V, p* n  q  G* T
    [9] 参见网址http://thrift.apache.org/
    ) g( a$ w2 c8 O
    [10] 参见网址: http://code.google.com/p/protobuf/wiki/ThirdParty/AddOns" v7 j2 Z- s- g/ O
    [11] AvroRpcEngineHadoop 0.21.0版本开始出现。
    ' i' t  o$ z9 I2 N
    [12] ProtobufRpcEngineHadoop 2.0-apha版本开始出现。
    $ U: O% P$ L/ I* W
    [13] 参见网址https://issues.apache.org/jira/browse/HADOOP-7347
    , `6 Y4 K  k  ~/ p. F0 e
    [14] Hadoop 2.0中的RFC框架是采用Java编写的, 尚不能像ThriftAvro那样支持多语言编程, 但引入Protocol Buffers序列化框架则
    - B* l0 D2 v+ K8 P使其向前迈进了一步。
      
    % ]( \" M9 w& J6 p  C. z# _' |. {4 `' a
    ) s( x0 U) F; L* ]4 y' G
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-4-20 20:45 , Processed in 0.574282 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表