java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3255|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66375

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库/ h5 r+ X+ E0 @7 {. X  J$ _9 c" D
    3.2.1 Protocol Buffers$ z6 k. O$ G( f6 _5 V6 Q- |
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    " h, M4 e7 y  F
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持. F2 _5 I$ E* t$ B8 T' z
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers
    8 Z8 `$ U5 w+ w3 _3 |8 ]2 {相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:
    9 n! z8 }& Z/ E2 ~2 ~+ D
    ❑平台无关、 语言无关;: w9 g; r! W0 h" r
    ❑高性能, 解析速度是XML20100倍;* [+ S( `8 L7 O% D
    ❑体积小, 文件大小仅是XML1/101/3
    ( w, n3 \9 P+ j* [" ^4 J6 y
    ❑使用简单;
      W1 q/ P! t! j6 h! j  m
    ❑兼容性好。# I6 m- w$ b; A+ \2 d5 ]- C- Y2 |
    通常编写一个
    Protocol Buffers应用需要以下三步:
    - d4 b1 X2 l8 K! I
    1) 定义消息格式文件, 通常以proto作为扩展名;9 f8 }! {3 f. g7 D  q4 k( ]/ e7 |
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;6 \9 f" w, m/ z% ^4 |- X: x
    3) 使用Protocol Buffers库提供的API来编写应用程序。
    ' X( z7 a$ D2 p& z为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。* O2 I. F' n  S8 b% a: g0 F% e7 k
    该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将3 j' Q3 E: Z3 y6 [! m
    一个人的信息写入文件。% s* e7 W7 n( I6 K1 {7 [. o: d
    步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    8 T" \1 E- m) z5 `* N. d
    package tutorial; //自定义的命名空间) w4 W8 @) m# I
    option java_package = "com.example.tutorial"; //生成文件的包名) C& `% i0 F- R9 k" _8 B( ^
    option java_outer_classname = "PersonProtos"; //类名
    5 I9 j1 A9 Q2 {
    message Person { //待描述的结构化数据% \* e; l, g6 D8 K3 \
    required string name = 1; //required表示这个字段不能为空
    + M9 h$ ^* W( V+ r; `
    required int32 id = 2; //数字“2”表示字段的数字别名' m$ L$ |, k# Q, t8 Y
    optional string email = 3; //optional表示该字段可以为空
    8 ~' p5 ?8 V; _# K0 s. S
    message PhoneNumber { //内部message
    8 s& E9 z1 C  b0 E4 ~5 O5 G2 _required string number = 1;9 a5 P6 E, W8 _( W' P9 f
    optional int32 type = 2;2 J' }( ^' B( ~6 [8 ^$ r% O
    }r
    . W6 k8 `- \( x( D1 U! yepeated PhoneNumber phone = 4;
      ~  L# ~3 ~& N& A" Q}, C% G2 ^7 |9 I
    步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:4 Y* {' ?. p5 h. A! X
    protoc -java_out=. person.proto
    # [% S7 }* l' k( E注意, 上面的命令运行时的当前路径是person.proto所在目录。
    9 M: `& t3 K  {8 s# ?步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt3 r* O% a& i+ h# p# H1 `& X# F
    中, 之后又从文件中读出并打印出来。9 ?7 _9 r0 W% u/ W, F) T% h
    public class ProtocolBufferExample {
    - a' k' G  y/ i( i% Xstatic public void main(String[] argv) {  s" `( V  z; _3 ]6 H6 h0 t
    Person person1 = Person.newBuilder()& ?8 y4 H/ l, v" X  z5 v
    .setName("Dong Xicheng")( T8 z0 q7 P% N. N
    .setEmail("dongxicheng@yahoo.com")
    8 Y1 {8 d( d" Y+ k0 S.setId(11111)
    2 H0 {/ T  l3 l0 c: g! q; ?" {* M.addPhone(Person.PhoneNumber.newBuilder()6 D% U6 z" o/ c6 P  h
    .setNumber("15110241024")7 k5 B9 b( N3 Z3 H( q
    .setType(0))
      K- H# y( s' e, I) I. z; Y' b.addPhone(Person.PhoneNumber.newBuilder(), w$ j3 d# x. ^% g
    .setNumber("01025689654")
    5 s/ ?7 N2 ~* m  |; ].setType(1)).build();3 C% H6 U% [0 S% ^
    try {
    - W# q) b# d! O, a* `+ kFileOutputStream output = new FileOutputStream("example.txt");
    0 {" p. J9 W  g) X& Sperson1.writeTo(output);
    , L. @9 [1 h3 b* ?/ ooutput.close();
    + {; H& W% W" \7 `} catch(Exception e) {
    . P5 I. X" _9 OSystem.out.println("Write Error");
    - }2 i/ C+ T. H% Q: L* E} t
    + X. o; q1 }) L5 H$ V% m2 E6 p3 l. h* xry {1 j1 o! K) x7 f% N
    FileInputStream input = new FileInputStream("example.txt");
    - S6 P! U. y+ W1 a5 q% _Person person2 = Person.parseFrom(input);  B! ]3 e1 ~5 x
    System.out.println("person2:" + person2);8 H' m, Y* s# a$ _- s3 V
    } catch(Exception e) {5 q2 x0 _2 O: g; n
    System.out.println("Read Error!");
    0 k& L/ `" y( i7 f* Q) I- w: f}; H9 W4 L$ [: D
    }
    ! ~5 N8 n- Z) F}
    $ }# w$ _1 c! z8 M( z6 T8 tYARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引# ^  i1 w6 j: j# t8 H7 J
    入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。: d, \2 Y/ v/ O4 I$ C8 m
    除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而2 q7 w& e) B0 G: G" G
    YARN则采用了MRv1Hadoop RPC库, 举例如下:
    6 p! O" I+ }7 }$ z/ b3 f& q
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
    $ Y! e0 y6 H' D! J1 c8 O2 o) I
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);0 y/ }; b8 c6 D" ?) A, D: @, T
    rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);; ]1 H( z5 g7 L# X) D
    rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
    9 W: s8 f# r( K7 A' W; j}# ^6 `9 [8 i) k! g. _
    在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
    * i4 g+ @3 I# e7 l
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol0 o* B4 r: n% _! m( u: `* D0 _2 [. w: \% T
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol2 j9 n/ p* x( \5 O* k3 p
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol
    7 f+ f6 g/ o' t  u5 F9 ?4 B
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议
    2 v) K& \; i  R% `4 vResourceManagerAdministrationProtocol
    ) m4 i: N1 F! j  a5 P8 l; a
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。) l3 [% q7 g2 x' s5 E
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker
    + \$ m" U6 u. G& ?! r" K除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:4 ?2 C; j9 N1 X* P( n3 m
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol' W( ^' J4 L" w( S) h  C' P8 T
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
    $ g8 ^2 o! m6 L
    3.2.2 Apache Avro! y6 K6 N4 s: T; _0 c3 j* S
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
    $ W: f- b5 c6 B- N/ I9 L
    Avro官网描述Avro的特性和功能如下:3 z# ?8 n! C) m) ?; V% `
    ❑丰富的数据结构类型;8 t+ k8 {% M2 N! e
    ❑快速可压缩的二进制数据形式;- H) M. p# e  Q- ^# ~4 p+ P
    ❑存储持久数据的文件容器;: w/ x' k  u1 t, k! Z) b( p3 Q
    ❑提供远程过程调用RPC
    - R( ~6 i) a% A) f/ \0 e
    ❑简单的动态语言结合功能。
    6 G$ e. u. I  I( z8 G相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:
    / q' m2 W- Z5 A3 V
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。# @: [9 C6 p- T
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
    ) F5 x' f" B6 C8 b减少序列化后的数据大小。- v8 k  P$ b" V' f
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
    $ ]: S2 A9 q5 w' T名, 该方法更加直观、 更加易扩展。
    ! A+ I( o. [, k( |) x编写一个
    Avro应用也需如下三步:
    8 |6 B; ^) g: _; m0 i
    1) 定义消息格式文件, 通常以avro作为扩展名;
    7 k6 ?$ N# \! g* k  j
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
    ) G% m8 ?' o& K! F' G
    3) 使用Avro库提供的API来编写应用程序。% R1 }7 Y0 H. |  |
    下面给出一个使用实例。# f6 F- D% i( r
    步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:) F7 U* d% |* g
    {"namespace": "com.example.tutorial",
    : y( M. A* B) G$ Q& u8 F"type": "record",( X& p, _2 X- H2 p; ^; T/ H
    "name": "Person",
    8 u, H5 u7 h( S( J4 r  g0 ^"fields": [0 D( M( `8 }( ~9 ?. s
    {"name": "name", "type": "string"},  `9 H* l5 A6 p+ o9 O
    {"name": "id", "type": "int"},
    * J" \$ e3 @4 U, Y+ @" `{"name": "email", "type": ["string", "null"]},! m0 v" E$ G- A0 w
    {"name": "phone", "type": {"type": "array",( R; g2 z. s6 K3 ^% [+ I4 `
    "items": {"type": "record", "name": "PhoneNumber",# S! ?) o  e9 d$ l. R
    "fields": [
    3 W  M+ K& H. E) ^- u/ X{"name": "number", "type": "string"},7 G2 ~5 k/ b* D$ u5 _
    {"name": "type", "type": ["int", "null"]}) L. I2 ]4 n8 y0 r. N& R  B" z
    ]
    ) \9 k3 M- \( f, L* Z+ ^" Q# ~5 p}
    4 a% H9 H5 O, Z# T& O}
    & v. W+ E, I9 L) I! z}]
    1 @0 O; \/ S: s9 I2 F}  y: b* C9 V; U" ^8 t, K. z
    步骤2 使用Avro编译器生成Java语言, 命令如下:
    ! `" s& o2 w) n- ]8 i& P7 l
    java -jar avro-tools-1.7.4.jar compile schema person.avro .
    & H+ J/ |7 Z, d% T. U* @5 I$ U注意, 上面的命令运行时的当前路径是person.avro所在目录。0 u" l" m4 Q, k' }( P5 ^. N
    步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
    * I+ @5 x& w; L件中读出并打印。
    : v* X; f( I5 M+ n
    public class AvroExample {
    + Q6 N+ i- `  ]: m! Hstatic public void main(String[] argv) {2 A' D# N' K! _8 K
    PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    6 j+ E- j7 f0 W.setNumber("15110241024")
    6 k, `6 Y3 U. }& v1 U# _2 f- ?5 E, q.setType(0).build();
    + G$ S' J( \" T  {9 |PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
    8 x; r7 i5 i3 ?  `+ j" o.setNumber("01025689654")) r+ O7 a) _  U% P3 R( i
    .setType(1).build();% r- h; r, V- |3 ], t  a4 k
    List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
    $ v1 n; H# N& M4 K( @3 ~( t. MphoneNumbers.add(phoneNumber1);: v. B2 P$ D8 g, @5 K
    phoneNumbers.add(phoneNumber2);8 g: w) D& O  X/ n
    Person person = Person.newBuilder()
    ! z8 _% N% b5 S/ z1 x1 ~.setName("Dong Xicheng")
    ! }* L2 d" m- A- z% W) v.setEmail("dongxicheng@yahoo.com")
    & u4 ?& ~( ]6 L+ f5 F) R3 A2 Q. ].setId(11111)
    , {) V) d# U5 u* [.setPhone(phoneNumbers).build();
    ) y- j! g# e6 ~2 N  f1 M: c1 OFile file = new File("person.txt");: l0 R' e! x- l) U0 l
    try {; \8 U% u; C) Y$ m$ k6 k
    DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);- n; O5 _- ~' f5 K7 n" F9 `
    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);4 s9 n- O; `! a0 o! ~
    dataFileWriter.create(person.getSchema(), file);% d0 H# ?/ f. ]
    dataFileWriter.append(person);
    # m4 f  k3 ]$ V/ }% d- fdataFileWriter.close();( E9 R4 U* L* Z3 Z) s
    } catch(Exception e) {
    : c- ^0 w# p9 ]0 A/ ~. S- e: ~' r, fSystem.out.println("Write Error:" + e);
    ' `* l+ l0 C" Z5 t: h4 S}t
    * k$ U0 X- l2 S; Qry {" l: X* T5 n  G
    DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    ) T0 V' x9 M5 Y2 qDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
    6 S: [9 k% }# r  v( Qperson = null;
    3 N$ L9 k1 W; |. ]$ xwhile (dataFileReader.hasNext()) {
      l( ~6 H8 m7 y+ _. hperson = dataFileReader.next(person);
    , G0 S. z; J, _System.out.println(person);
    5 e$ G7 n% g5 A+ d5 j}- s0 c  `* m" m( @5 _1 w: J
    } catch(Exception e) {
    ) W' X/ }- {, u* }System.out.println("Read Error:" + e);
    8 T! ^( t' s, G5 s}! ~4 f, V6 {0 E/ {# r2 }
    }
    9 x. z$ B6 ^! e) T1 O- j" K}
      ]8 S, h1 u4 x# u2 |如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 0 l$ K+ ^- y$ r& q4 L
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC& y! X! m) L/ r+ A6 e: |. L' o
    使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化" |) Y$ s$ Z/ l! J% B8 D3 z
    均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:
    % z$ O5 R, i4 j; V: a0 D
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",& E' u! [- R9 s5 i6 p3 Y
    "protocol": "Events",: W+ l  z. ~- l: s( U$ [: c
    "types": [: v/ v& U+ U) f
    …{
    5 ?0 n- g" c( k; p! H"type": "record", "name": "JobInfoChange",6 ^8 m( p& X* K7 z" N
    "fields": [" Z* h! ]7 Z5 b( O0 |1 G! c
    {"name": "jobid", "type": "string"},
    3 t0 L/ t3 U5 n' \  c) B{"name": "submitTime", "type": "long"},
    2 d1 @# y% ?* E5 M* {7 q9 L8 Z{"name": "launchTime", "type": "long"}
    - P1 P$ V$ h4 l; W% W7 c" `]
    " }# `' T8 @  U: }8 g- J- g},
    7 y6 c- A, r) B" v{"type": "record", "name": "JobPriorityChange",; m. ?+ N) L( ~1 w; l) g% \
    "fields": [3 l% m5 |2 N- N
    {"name": "jobid", "type": "string"},/ h3 T) ?1 c4 j+ G
    {"name": "priority", "type": "string"}9 f3 U* ]( b' M- C
    ]* V) _9 P" G5 D1 Y" K' G, X" g
    },
    9 b# ~, N( L# t* P8 ]{"type": "record", "name": "JobStatusChanged",
    3 Y% m2 B. E. F! n+ J7 e"fields": [6 ?/ d/ c# [) }$ L0 M; l1 J( m
    {"name": "jobid", "type": "string"},/ F" R: c* `+ [
    {"name": "jobStatus", "type": "string"}1 G6 ~; H) v; I
    ]
    ' N" |/ w% q  w3 M, V" `2 d* S2 U},% M1 I# T, e9 S& f: H% W
    …]
    + {- {/ q8 g5 d3 C}; K8 U. O; K3 e1 l3 X1 s0 {' D. N
    [1] 参见网址http://code.google.com/p/protobuf/. Q' w8 v5 W5 Y$ `* u
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns
    ( T, s" S- q( {% x# s( H. ?  a
    [3] 参见网址http://avro.apache.org/
    & O6 A& T, i& h- t
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html
    9 B. p- R" g0 V1 N! [. {, @
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  
    5 T2 I0 i/ W  |* m8 d' [( @( Z) i
    4 \1 \  `! P0 D( U9 t( u! C/ i1 a; M3 n" f) W1 ^
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-12-22 09:03 , Processed in 0.111364 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表