java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3424|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2093

    主题

    3751

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66775

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库
    6 ^/ ^0 r2 r6 j" C1 x3.2.1 Protocol Buffers
    8 a. }; x- M9 |# l% g+ m; {
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    . O4 D- I, p) p
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持, ^: v9 L; p8 Y, }9 }8 h8 y
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers
    9 A5 ~8 @3 c" ]7 v5 F7 l相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:
    * x) [) W7 @5 C5 u4 c1 k( H
    ❑平台无关、 语言无关;
    / i" p6 ~" o6 y5 N5 n
    ❑高性能, 解析速度是XML20100倍;7 m) U0 x. W* V( `) K
    ❑体积小, 文件大小仅是XML1/101/3
    - H+ W7 q* Q+ r( ?5 V
    ❑使用简单;
    : ?; A3 D) w6 S! _, v+ b$ g9 _
    ❑兼容性好。
    % k! E# @' w, Y/ H7 x1 B0 x/ k通常编写一个
    Protocol Buffers应用需要以下三步:
    ' s3 ~+ ], j. d0 i: m+ k+ i
    1) 定义消息格式文件, 通常以proto作为扩展名;! x% r# D: p" `, K
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;* c& t) X8 M; J& Z9 b. y! y# o
    3) 使用Protocol Buffers库提供的API来编写应用程序。4 R: v) K! V. w  @- w. _/ d+ y! r4 F
    为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。" }+ v  y0 U2 Q1 }- }
    该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将
    4 o6 f/ X  s6 J% r) M" y% u# r一个人的信息写入文件。' P3 b$ n; K) }2 B% `0 |& R4 f
    步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:4 U) c& a# y# _" @
    package tutorial; //自定义的命名空间
    7 Y6 X) T% l1 I! T1 B
    option java_package = "com.example.tutorial"; //生成文件的包名
    ! O2 x& D8 K. F( T6 s, @0 [
    option java_outer_classname = "PersonProtos"; //类名/ Y( V3 u; ~$ W5 A& t! H
    message Person { //待描述的结构化数据
    - I4 q9 k1 i: b& H
    required string name = 1; //required表示这个字段不能为空
    % i2 p' R+ c; ^+ U/ @  g' V0 a
    required int32 id = 2; //数字“2”表示字段的数字别名
    8 |! X5 {; X7 I  |2 B
    optional string email = 3; //optional表示该字段可以为空
    # \5 `/ Z8 ?: h6 `* b( e) _* Q
    message PhoneNumber { //内部message
    & d$ @0 Y% O3 b$ F4 T, ]3 e7 Y( krequired string number = 1;- n# C$ C6 J" w! s: Z9 k( F2 o$ y
    optional int32 type = 2;
    % Y! {$ s+ P% u' w9 G9 @}r
    9 d) X! x6 O1 K8 hepeated PhoneNumber phone = 4;
    * K9 I( R; {# E# s- `}
    ! H* C* w( s7 V步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:9 ~7 u' P9 ?0 h
    protoc -java_out=. person.proto
    7 E; S# F6 J' S9 b# o/ m+ P. l注意, 上面的命令运行时的当前路径是person.proto所在目录。6 k2 H+ `! }; U6 t0 L
    步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
    4 W3 E& n% G0 g) b
    中, 之后又从文件中读出并打印出来。
    , i) N9 `5 H8 ^
    public class ProtocolBufferExample {
    , V$ g+ u, R, c4 ~" e, G* v# Fstatic public void main(String[] argv) {! f5 L2 z$ A' a: T+ h5 N" `
    Person person1 = Person.newBuilder()8 [/ ]( `. o) c/ [* N
    .setName("Dong Xicheng")
    8 o: i9 M1 d0 @5 y+ {.setEmail("dongxicheng@yahoo.com")
    % M5 w+ X  X3 u% K) c9 k.setId(11111)
    2 \8 O. P, Z1 S5 N. y5 D.addPhone(Person.PhoneNumber.newBuilder()* D5 [5 C. k6 I, J- P, C1 W
    .setNumber("15110241024")
    - X  r9 W# ^) r6 b* L) {.setType(0))
    : i$ {. m/ r! W" j# _9 k8 S$ O.addPhone(Person.PhoneNumber.newBuilder()% m; L& O& {& n, A- Y2 h
    .setNumber("01025689654"); k  R8 q7 `% [# G
    .setType(1)).build();
    ) v& c2 a- e4 Ltry {4 I) v- U5 X% ?% q
    FileOutputStream output = new FileOutputStream("example.txt");
    / p! n$ W7 x4 mperson1.writeTo(output);
    ' M5 a; D3 ]4 V( ^& \output.close();
    6 T. O2 O1 g5 @} catch(Exception e) {
    1 N" U! v$ \( v" h" p1 H' \7 WSystem.out.println("Write Error");  P3 o$ B0 F" `8 a
    } t* i" m- b8 ?! H2 Q) P
    ry {6 f; b$ a) _9 S& @' N" I7 h
    FileInputStream input = new FileInputStream("example.txt");
    1 Q% N3 _6 l( \# w/ ~: V- APerson person2 = Person.parseFrom(input);
    9 `9 `6 q. M1 WSystem.out.println("person2:" + person2);
    4 R9 N5 I  A* E, `5 _2 ?9 b% f( T} catch(Exception e) {! j9 |2 s8 ]  J* z
    System.out.println("Read Error!");
    1 t: c$ ]7 s3 S) d0 d- w0 G, m}! F3 K- r( R4 W0 T, c8 R, m9 Z
    }
    : k# D( a! W" v% D}( S6 j" Y' ]- u+ e! @
    YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
    & q) i) k' `, ?, E4 Z) Y: j入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。
    7 J. i( ^1 Y. d2 g7 [除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而/ ~' ?$ M% F* S* J
    YARN则采用了MRv1Hadoop RPC库, 举例如下:: J2 r6 Q- u6 C" C
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义  I4 t0 T) b  C3 Z7 q
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);1 k$ r9 t: O% B6 M% \3 ^' L
    rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);5 W, s5 W; k4 ^8 Y; p
    rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
    - }6 _% @1 X8 g; ^/ a  m. Q2 m  N}
    * k$ H% }  E' r7 y7 P在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
    2 G# P" P3 ^4 w( K
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol
    6 P6 g2 z3 K% l2 U! ]1 i
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol
    5 k1 T# u2 q0 \$ Y
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol4 s8 b& e1 l8 u( h! v, [8 \6 V% |
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议
      c. x  p& {: p" W8 ^$ tResourceManagerAdministrationProtocol
    ( ~1 m1 E, |5 w2 F: B
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。
    + d+ P8 p6 s, T! f0 X) j! n6 l
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker. \% d( e8 \: S7 g& L& r" [  [; u
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:; n) [! [2 X% c
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol
    % H: @# S8 j0 |% i# R# W7 ^0 F3 d3 i5 x3 I& }
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。; i* ~( K& ]6 i- R3 N5 s/ k$ b
    3.2.2 Apache Avro
    ' j2 g" i& C, Y$ s
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
    : b" O6 {+ J( }& w) r
    Avro官网描述Avro的特性和功能如下:
    3 z) g  \) G4 X5 }! T
    ❑丰富的数据结构类型;
    0 g, C3 U" y% u4 g❑快速可压缩的二进制数据形式;
    # A( [9 z3 J9 q# m8 ?
    ❑存储持久数据的文件容器;# _  f" ~; W. |) `% k  D" V
    ❑提供远程过程调用RPC7 c* `( R% x4 D, B0 H$ Z3 h" p
    ❑简单的动态语言结合功能。# O2 y7 `) h( G! H
    相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:& R( E/ v* }& m1 Z$ N  o5 T
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
    # X% E4 d0 H% R; c
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于* Y8 [5 W$ g! q3 ^  Y. w
    减少序列化后的数据大小。
    : l2 ~# O1 s+ q; [' E$ P
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
    # f2 u5 n, I4 k4 A- R9 o名, 该方法更加直观、 更加易扩展。; y# I0 Z& J/ P7 G) q
    编写一个
    Avro应用也需如下三步:+ q( @9 }& D; A- D% R
    1) 定义消息格式文件, 通常以avro作为扩展名;
    $ Q0 o* g. D- ]/ h
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
    % s. p* j9 x1 N1 w/ t# t  Y) i
    3) 使用Avro库提供的API来编写应用程序。: S: z2 z* g2 f
    下面给出一个使用实例。
    ( w- g/ S2 X! z4 I' Q/ j步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    2 i$ |" J7 I- |9 _, i% {8 A& G9 q
    {"namespace": "com.example.tutorial",
    0 i* r- }8 Q# j% e# ~3 L6 F1 M- s& _"type": "record",* G% [( M3 P/ _. Y4 C( Q7 P6 }% V! r! r
    "name": "Person",
    , t5 M* u* v8 c! }"fields": [
    8 H0 c* o# G  E& g% z# P{"name": "name", "type": "string"},# z; A7 D9 Y  P. u1 ?4 |, l5 T
    {"name": "id", "type": "int"}," d: O* {7 C0 A; ]/ A
    {"name": "email", "type": ["string", "null"]},
    9 r7 n/ A1 X7 e$ z  s* R{"name": "phone", "type": {"type": "array",
    ' h- S4 X; X9 o0 T7 ~0 V; v- U  e3 T"items": {"type": "record", "name": "PhoneNumber",1 `. Z+ j7 Z/ N4 O6 a
    "fields": [
    5 w1 o6 q7 g) A4 R{"name": "number", "type": "string"},2 H1 d0 ^9 V" @3 A& i" w
    {"name": "type", "type": ["int", "null"]}  w$ y' J  T% p9 f% L5 |- b
    ]/ }9 `( a8 \- M+ g/ F
    }
    8 d  x7 K( d& n" @3 \}; t/ w3 ]; F4 R0 A6 x' g& m
    }]
    - c% X9 G$ Z; n# f0 C}9 y" g( Q3 {- l$ o  F# N
    步骤2 使用Avro编译器生成Java语言, 命令如下:& N( ]# ~4 L+ f0 S" ?4 z
    java -jar avro-tools-1.7.4.jar compile schema person.avro .
    2 Q+ i  ]2 N& M- \/ I注意, 上面的命令运行时的当前路径是person.avro所在目录。
    2 K- B. V; A- o8 [0 I步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文( X! s- b/ U( z
    件中读出并打印。; t* }& A5 c1 d
    public class AvroExample {; A8 W/ f1 ~8 [! u4 W6 r
    static public void main(String[] argv) {
    ) h* A4 h+ p$ S; l  s, ]: b7 }PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    . }, P1 J# A7 [* ]: p.setNumber("15110241024")5 H3 Q  o! l0 r$ ]; i. l
    .setType(0).build();& e/ N  O+ D. q6 V; ?6 P
    PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()1 ^. d! D7 g% T) c/ E
    .setNumber("01025689654")1 X' R+ }, _6 n  M* o2 I2 j8 U
    .setType(1).build();
    4 v) K& f4 e) i+ r4 a" @$ cList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
    & ^8 Z1 @' W& m: l; q5 ophoneNumbers.add(phoneNumber1);
    " l* {& s3 e# ~) o: J4 z0 ?  mphoneNumbers.add(phoneNumber2);3 o( t8 B" C0 R6 o( ^) Q, L. u! t6 }
    Person person = Person.newBuilder()
    ; M/ t" H" }+ ~% M0 R.setName("Dong Xicheng")
    - _5 o4 w% F5 O' H# x.setEmail("dongxicheng@yahoo.com")
    # H7 w3 X0 r. _% c) S.setId(11111)
    & u, y) b- s' ?) E1 Y.setPhone(phoneNumbers).build();
    & S' _. U% q0 G/ o) rFile file = new File("person.txt");2 M; Y! _+ z* m, g/ p" g# \
    try {! a/ ~' c2 C* M8 f
    DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
    4 a3 {5 p; ~  A& G% h8 CDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
    0 E  d& b4 |5 [dataFileWriter.create(person.getSchema(), file);
    ( s4 |% p! h& z+ n! F. j) e9 [9 _dataFileWriter.append(person);
    : A8 w4 U' W. t0 g9 LdataFileWriter.close();
    % S; [+ ?. a$ i5 ]7 X/ H} catch(Exception e) {! Y- R! C. `# _
    System.out.println("Write Error:" + e);
    " |5 `; V3 T7 j/ N# q}t
    6 l. [0 r: a% \- S- kry {
    . `; L. c3 `( F0 {% s8 `DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    : Y9 I6 y6 q1 aDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);0 O4 K$ R/ ]$ C1 X8 F" {$ u
    person = null;) ?1 M8 X1 R# }/ g
    while (dataFileReader.hasNext()) {
    * ~, [4 c- d& hperson = dataFileReader.next(person);
    , z7 W9 O3 h- f/ DSystem.out.println(person);( S" @# M" a; ~% j0 N" |
    }
    2 p* i9 a* v0 ~9 U2 Q& k} catch(Exception e) {
    $ V7 U; g2 @: P, {) C; k" mSystem.out.println("Read Error:" + e);& m: B" z$ {9 N, z; s# \
    }
    ) S/ K6 F- V) ^/ j. f) b}
    * q- ]. W1 ~, S) q}
    & N: X4 b' l" Z$ S如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4]
    ) U' x# @" Y" h8 T: F8 r
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC; `% \2 P+ M! }7 c# M& y  G
    使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
    7 I" w  b6 y5 Q2 l1 h0 p5 }5 R均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:
    3 @5 c, T  ^- T4 F- q' q, o( d( ~
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",
    : J! T4 V- j3 F6 ?"protocol": "Events",8 t- H: s3 T' P+ m6 g
    "types": [; `2 y8 B- J# a7 }
    …{9 ?" W# u4 ^. }1 r+ j
    "type": "record", "name": "JobInfoChange",- L' Y9 F$ `" S
    "fields": [! i/ L+ Z0 j8 `( u0 M7 u( J
    {"name": "jobid", "type": "string"},# f2 J0 d. M$ t2 \. K% `# s
    {"name": "submitTime", "type": "long"},5 b% z9 B+ @+ ]2 `( v5 V8 e
    {"name": "launchTime", "type": "long"}
    9 T3 |% u$ t/ t. }! T2 G]* z4 f/ ~1 r& Z: L! y, z
    },0 l2 C0 U6 b$ t( ?+ a7 u
    {"type": "record", "name": "JobPriorityChange",
    0 l& n! P  j# n. |! T"fields": [7 P" B6 D$ O. \$ k5 [2 e0 U
    {"name": "jobid", "type": "string"}," A3 d" b( E' P. r; R8 g# g$ T% Q
    {"name": "priority", "type": "string"}0 `. M6 P, g: S* L" p
    ]; S0 i: o. X6 ^0 B$ B) P" ^9 k9 R
    },
    % \4 a) J: a: r- S7 C9 g{"type": "record", "name": "JobStatusChanged",
    ; D2 e$ y/ U' _: R. C$ j"fields": [2 L& S7 r1 j- j' l3 k1 u
    {"name": "jobid", "type": "string"},& U; e( [8 A4 _
    {"name": "jobStatus", "type": "string"}. x. z) j, ^7 |( s
    ]6 U% t) [7 n$ `
    },
    . T# b( ~, h" a# x+ G/ |…]2 y! S4 f% I9 x- _" P# S2 [5 ?
    }
      e7 \" l( z7 }0 N3 E; n7 ^[1] 参见网址http://code.google.com/p/protobuf/
    ' L' l$ Y6 S9 [, R% F# l
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns
    . Y# w7 I6 f3 I+ @( c
    [3] 参见网址http://avro.apache.org/
    " d+ M5 p+ p, [: H/ S2 r
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html5 @6 F$ m6 P6 M6 {
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  
    9 u2 _( B! z/ _* _3 Y0 G
    : e# w' N" q/ {* ^8 D( A, G/ U' R4 \! y$ d/ h: W7 m% T
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-30 06:43 , Processed in 0.193228 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表