java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3459|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2100

    主题

    3758

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66834

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库
    % Q7 e& ?5 C0 p* H+ m" d* I  K3 Y! d3.2.1 Protocol Buffers
    & G2 G" f! U/ A
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    / H6 i/ y) H7 _' R6 ^
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持4 `3 J; x- S. U) w* X% Y7 U* w
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers6 e0 O0 R& S2 q6 Z# C/ S
    相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:2 n  J: k7 q6 U& H/ y7 Z5 u
    ❑平台无关、 语言无关;
    - b# \; k: x8 s4 O( U
    ❑高性能, 解析速度是XML20100倍;
    " W8 p8 u' q) s- `0 D1 |, Q6 A) j) v3 U. N
    ❑体积小, 文件大小仅是XML1/101/3" ?& \& }' G( N' J& B
    ❑使用简单;
    2 `/ P7 ?/ E  O. }. @
    ❑兼容性好。8 B/ S: W* P+ q/ K
    通常编写一个
    Protocol Buffers应用需要以下三步:6 }: J' C5 o5 O! d1 h" `( K
    1) 定义消息格式文件, 通常以proto作为扩展名;. {+ Q: @. V: Y& N9 U- P
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;
    9 g4 Y( R5 K7 K: ^$ U9 a( `: ?
    3) 使用Protocol Buffers库提供的API来编写应用程序。
    : ~' x  V9 p6 `4 @( R9 H为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。( o$ V& H: M( `1 q5 }. t
    该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将, g7 m, V/ \- F& V: n8 C: b
    一个人的信息写入文件。
    6 e4 R5 A* M# W8 m$ ]步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:( u+ K, d1 ?' H8 w: \
    package tutorial; //自定义的命名空间
    * \' s9 o+ A; @
    option java_package = "com.example.tutorial"; //生成文件的包名. d0 H- m- D5 W' t; v4 i
    option java_outer_classname = "PersonProtos"; //类名+ R/ B0 P) }3 [  J$ q# H
    message Person { //待描述的结构化数据" O5 Z* ~+ y7 }) d$ A
    required string name = 1; //required表示这个字段不能为空
      {( H' J" `; K, O4 J5 D9 v, f; x: K
    required int32 id = 2; //数字“2”表示字段的数字别名
    + p6 o7 A; I0 ?" b0 e
    optional string email = 3; //optional表示该字段可以为空
    4 D- {, `' a" Q% M2 ~$ l
    message PhoneNumber { //内部message
    / j8 `/ K. L) ]& \; I; urequired string number = 1;
    7 k/ Z: W0 M* h& O& L# \: Yoptional int32 type = 2;) I4 w% H/ k! T( h) c3 m0 n# X
    }r: x8 i7 G( Y, u2 x+ {
    epeated PhoneNumber phone = 4;; {- l; H. d6 z# h, @  N$ J. h3 P
    }
    4 [0 p/ x$ k$ w  B3 T步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
    & ?. g- r; }$ i9 }6 T
    protoc -java_out=. person.proto1 o9 D- i) D  L5 v* C/ l
    注意, 上面的命令运行时的当前路径是person.proto所在目录。
    3 |" @; |7 B. U$ `% |步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
    . ]% d4 K  N1 }0 F, j" }
    中, 之后又从文件中读出并打印出来。3 j$ q) Q; J* `& Y
    public class ProtocolBufferExample {/ e& q% C* y6 Q: C3 G; |" ~
    static public void main(String[] argv) {
    ! t7 }* d: _! ~4 t' vPerson person1 = Person.newBuilder()
    9 s; ~/ Y% x3 l+ b.setName("Dong Xicheng")
    6 b, e( H/ X8 B: Q8 E8 x.setEmail("dongxicheng@yahoo.com")
    + s. A# B! c" @% j0 w' ~% A/ j.setId(11111)
      o& c. O0 }7 @( r.addPhone(Person.PhoneNumber.newBuilder()
    7 L. `1 Z% `) Q: _.setNumber("15110241024")
    ' j; d: C% r8 b( X& {.setType(0))3 J4 r) p8 l3 u" z4 k* E
    .addPhone(Person.PhoneNumber.newBuilder()
    % v# y: z1 e( ?.setNumber("01025689654")
    * ~- [  I9 U( P) M8 q; m.setType(1)).build();+ z0 d2 h0 V% S! g8 Y- B+ I
    try {
    ( d- \* Y# _4 B& cFileOutputStream output = new FileOutputStream("example.txt");
    # }$ r6 V  J( \$ Iperson1.writeTo(output);7 }- [5 M: T# q) T6 @
    output.close();% e* L1 Y6 E! e
    } catch(Exception e) {( z; A2 q, p! p
    System.out.println("Write Error");! d3 `# S' z) H
    } t$ a0 G1 j  {# p  R
    ry {
      b; E' w. R) D. y; G/ r  [FileInputStream input = new FileInputStream("example.txt");
    " b5 ~9 s3 @* o2 H4 N- W, vPerson person2 = Person.parseFrom(input);
    $ R. j' J" t- M' ~7 x& F+ o# BSystem.out.println("person2:" + person2);$ o, {: o, d5 @9 M
    } catch(Exception e) {4 n& n& u4 g1 S$ C
    System.out.println("Read Error!");/ K3 m6 M' _. }1 E4 v: r" y! T) j
    }
    % |: W* q# f9 l/ t9 a3 q6 H}* E  r7 c' E- W/ m2 N
    }
    2 q# _) v0 z: k, |7 P3 W4 JYARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
    , B5 q) @& a, l1 S入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。
    $ h9 X" M" t. e7 @( L除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
    5 k' S$ L4 j6 B
    YARN则采用了MRv1Hadoop RPC库, 举例如下:3 E! ~; y& o) s- P0 S4 _
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
    $ `$ {* s) V8 g9 T! \, T* o
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);& L7 ^* s  G5 Y  i. y8 @( f
    rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
    # ~* C+ t7 X1 ~& ]. c! ~rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
      U" t& Y# f# X; [- d}
    2 @# r5 }* l  _$ I! l4 f在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:/ x# Z/ a! N, |2 ?# p
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol6 U9 J8 M4 I* S: J4 {$ q- V6 P$ Q  ~
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol
    % y$ ^% y' ~! |4 K& p; d6 A
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol
    3 Z  j9 k! _+ ]; v7 t$ B
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议
    . ]2 v+ U8 f' d; z2 I2 _$ TResourceManagerAdministrationProtocol
    ' R9 j% a1 p1 X/ O6 y& P( z; d
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。
    6 \# n/ N% m7 j/ H6 W4 D; K! \0 f1 i
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker4 v  E3 |( a# j+ o" b) F, f
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:
    0 ?# V  u2 s, @+ V# M1 J& [( c
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol, }2 v2 o& q/ H7 D
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。" [+ S* V6 Z' r+ H9 \
    3.2.2 Apache Avro0 k$ V3 b6 w( `/ o
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。% g/ S  M/ Z) _: c/ S! _
    Avro官网描述Avro的特性和功能如下:, w0 ^" M! S' p, i2 q$ _
    ❑丰富的数据结构类型;
    5 ~+ T- Z' @) _0 T❑快速可压缩的二进制数据形式;
    8 E; x7 E& b; c) z  {
    ❑存储持久数据的文件容器;
    # S8 Z. Y% o; x; q6 Z, [6 o5 p
    ❑提供远程过程调用RPC
    & `- ?% O/ |& I* A- G) C7 E0 g0 ^$ m# h
    ❑简单的动态语言结合功能。
    3 @- e/ d3 M' ]6 [+ `) W; p相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:
    - Z" s( S4 d* X& V: r
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
    ! R7 w9 b$ {9 L# S+ S! H  y
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
    * F6 q# A2 Q  W+ t减少序列化后的数据大小。: g9 d3 i3 `& |2 i- c
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域% Q+ F' e9 f) Y! I/ b% z: D: Z9 _
    名, 该方法更加直观、 更加易扩展。1 O: ^9 r# K, o- G$ ^" N+ g
    编写一个
    Avro应用也需如下三步:
    ' P) E; C6 h4 \  I  P
    1) 定义消息格式文件, 通常以avro作为扩展名;8 L! m# |+ e6 W+ Y9 J2 O7 J, n
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
    5 T8 s9 Z! N2 P$ q
    3) 使用Avro库提供的API来编写应用程序。
    & P6 N0 a$ s) Z" x7 w0 v% Y下面给出一个使用实例。
    7 k' c3 r0 D! t$ d5 T步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:5 ?$ R1 d, W8 c+ S( l7 {9 ]
    {"namespace": "com.example.tutorial",
    # F: J  h* g7 C) g/ l# q"type": "record",& C, D, t: }* x, S4 F* I6 w
    "name": "Person",6 L! E# f$ j" X# K; L
    "fields": [: Q' I/ d; k/ _& U" y/ w
    {"name": "name", "type": "string"},
    2 x" h. x# I0 }* s0 s& r{"name": "id", "type": "int"},
    2 G! Z! p; K/ F, u5 Y9 Z  u{"name": "email", "type": ["string", "null"]},7 _% A( e5 L( r  r  {
    {"name": "phone", "type": {"type": "array",
    4 ?( i* s# a3 ]: m"items": {"type": "record", "name": "PhoneNumber",
    - T2 s. a1 H) m* L5 r"fields": [
    8 D  P! c7 V4 t- I+ a{"name": "number", "type": "string"},
    * m. I/ R. }1 v* G5 X9 @: y) _- v5 Y{"name": "type", "type": ["int", "null"]}
    3 G8 @/ q( ]& o1 a]
    % Q. e! k* c- a  b$ X( X8 }}( ~" O3 ~* P. p$ ^2 K- T! Z
    }
    - K: H7 S! ]7 J4 N- o% \: I}]9 K: w1 p; Z. L5 k: y& N
    }7 q# L$ c/ i1 ~6 i% X6 B" g( \
    步骤2 使用Avro编译器生成Java语言, 命令如下:  x( v" `' h& J1 e0 h$ T  l$ ~
    java -jar avro-tools-1.7.4.jar compile schema person.avro .* b8 f# \, v# O4 c1 m- Z# {
    注意, 上面的命令运行时的当前路径是person.avro所在目录。& K$ G0 l: S; p  N  X, \8 Z
    步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文5 \, M; [6 X" I3 D  w
    件中读出并打印。8 g, J/ l. r' `1 G
    public class AvroExample {$ R& W, ]) @% }% R$ ~
    static public void main(String[] argv) {2 V# P6 f2 T4 _" n7 A
    PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    ) @( r* n% y8 C  H5 z4 z# j# Q.setNumber("15110241024")
    # `& j& \& k. Y4 ^6 v- w.setType(0).build();2 ~0 _8 p1 F5 P2 b
    PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
    7 {+ i( c/ f$ H, K) p4 P.setNumber("01025689654")
    & A% l: W: ?+ q/ @( _7 R.setType(1).build();# K2 l9 b+ f/ R# a' y
    List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();3 Y8 v% j2 o' i; l: ^0 p" y6 c& h3 X
    phoneNumbers.add(phoneNumber1);9 q: S9 k: w9 x
    phoneNumbers.add(phoneNumber2);5 K$ f, b' k3 A
    Person person = Person.newBuilder()9 Z5 f- I2 {' g6 S( I0 [
    .setName("Dong Xicheng")0 n& H: Q! L. X  l% K. \- g: T
    .setEmail("dongxicheng@yahoo.com")' S8 E0 q. B, d6 o+ h, z
    .setId(11111)! s) ]8 U5 n% ^3 X
    .setPhone(phoneNumbers).build();2 k! x1 r, b! C/ Y% @/ Y" d* J
    File file = new File("person.txt");
    ; k( {8 e' K/ U7 E/ i: l2 Ttry {
    8 ]% e: h8 [* E' s- LDatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
    : T% Q- {5 ^% S8 }! LDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
    . u. K" E' g9 `0 \dataFileWriter.create(person.getSchema(), file);
      t3 Y/ |1 ^2 S% y1 q! }dataFileWriter.append(person);- Q# d0 V+ f0 t" k( s5 i
    dataFileWriter.close();
    ; ?1 ]3 x$ `0 f, Y# W. I5 f} catch(Exception e) {
    6 C& I6 \# l# w( t* s: v+ lSystem.out.println("Write Error:" + e);
    ! ?* e. g7 w& l}t7 O) R$ w! l7 S2 ]; m" H7 ^, f
    ry {
    / |- t* `# f8 k3 F- gDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    9 S; ]* Z+ k5 U+ g# NDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
    ) J* y, U: d  p+ @# ?person = null;/ M, t/ F: p6 a# F% ^
    while (dataFileReader.hasNext()) {
    $ j. k7 x( V2 Y* p/ ?, Hperson = dataFileReader.next(person);( Q+ ^. ]2 p0 {( |
    System.out.println(person);
    9 d. S0 G. r+ K2 S2 u# a/ r6 z}
    + c. ]$ h3 d* j% O0 D} catch(Exception e) {
    5 y! `/ t% d; G, y7 u- @" uSystem.out.println("Read Error:" + e);2 j+ w) f1 [* e7 \3 Q
    }9 d& ^, s6 {( F" h  M& J
    }
    # i  l6 N. i9 u! t' @}
    ; a4 ?* y: b% O1 Q+ B* _. {+ \如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4]
    % s6 u" V* g8 r+ [- _
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC
    3 {, E8 F& x, C. F, q. U" {% m使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
    , S* H& o% b' G% h均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:
    * C- F# @4 G5 n* g# ^
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",! n9 s( n& Y, Z, E5 l$ `0 H
    "protocol": "Events",
    : f/ ~4 A' T1 N  ^" R1 C"types": [
    5 |, }" T" u9 C8 r5 q. z…{8 v( ?: j4 V1 s3 S
    "type": "record", "name": "JobInfoChange",
    , b, l+ X, Q9 K; @8 ^& m"fields": [, r- v. W7 a0 X7 I" r
    {"name": "jobid", "type": "string"},
    8 A% u6 o; ~7 }% r{"name": "submitTime", "type": "long"},, T* b5 p9 y5 S( e' j4 K& \
    {"name": "launchTime", "type": "long"}
    + J: Q) w2 u& Z, Q  {]
    * z+ C; L  ^: A8 w},! m0 ^, v  b+ L1 o) ~
    {"type": "record", "name": "JobPriorityChange",4 }$ n6 y1 @$ c: e- ~
    "fields": [" D: T) C6 B, V) O' ^
    {"name": "jobid", "type": "string"},
    - v+ ~% g2 w$ p% m7 ~) X{"name": "priority", "type": "string"}
      n+ T: c! U" E2 l7 w6 z3 o6 j4 [# d]
    0 n; D" W' n: s},4 ?' B7 t3 S. F& g( `5 ?
    {"type": "record", "name": "JobStatusChanged",
    * Y9 Z4 f& F3 }1 }( j1 I( T"fields": [- N5 a, F! G" T) t, j
    {"name": "jobid", "type": "string"},9 x& `( q+ A  g6 J7 s& r2 |! d
    {"name": "jobStatus", "type": "string"}
    # ^- O& G$ ]1 c( i# `]
    : q- F) y/ C) f  h},. {# i1 \+ }3 M* \4 V7 n# K* ^
    …]# C0 _8 `  m7 S9 e8 M' ?
    }
    7 `: d, k3 J% Q& P) \4 y( [" X[1] 参见网址http://code.google.com/p/protobuf/% y. ^# y# ~: S1 u* v
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns: U0 a2 T: X5 K# {) v- Q
    [3] 参见网址http://avro.apache.org/2 c7 w, g3 ~' A, E2 O1 k
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html* L$ @3 f9 n7 g* i1 d% p% Z" M
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  8 V9 M& M" N* L2 k! s% u
    ! L: @+ i) K: I7 @, D" f
    ; b0 e  i% D6 }# c7 S. a: `8 W
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-4-21 05:16 , Processed in 0.142640 second(s), 29 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表