java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3423|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2093

    主题

    3751

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66775

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库
    ! W6 b. [4 S# Z# m$ `  u$ \  J8 G3.2.1 Protocol Buffers
    ) M6 k# p7 v2 h0 h8 ^% @
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    " u5 ?0 G0 K6 e. s$ T0 [( m- w
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
    ) k1 P7 A5 E6 z+ h( c* q
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers- F1 j+ X; V/ N
    相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:, {0 H# p- ^9 Q* ~
    ❑平台无关、 语言无关;, |/ {7 [# {# G- p
    ❑高性能, 解析速度是XML20100倍;
    1 ^6 N5 }) q2 E4 x, D
    ❑体积小, 文件大小仅是XML1/101/32 _7 L* k- e( Y; v; }+ Z
    ❑使用简单;* |8 |. ?0 u4 E1 C  p) a4 j# s
    ❑兼容性好。. z6 T/ K7 w& z7 O9 K/ O) W$ y0 E) R
    通常编写一个
    Protocol Buffers应用需要以下三步:$ g- A( @7 V9 J8 C6 ^- A8 K
    1) 定义消息格式文件, 通常以proto作为扩展名;
    4 S2 }7 `3 t$ v; R, }" {8 |. r
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;
    9 G- g2 C: ^; H. T! r0 ]& ~1 |
    3) 使用Protocol Buffers库提供的API来编写应用程序。
    ( t. e6 |/ Z" g2 Y: U! u, w7 I为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。
    0 b; p& }# C, t9 T, M该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将- ]1 c- \. L, v
    一个人的信息写入文件。
    9 M" ?, T! Y! ^2 i3 ^步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:4 m# {0 O" n- ^  A% S6 [
    package tutorial; //自定义的命名空间
    : v, n- W5 [7 E
    option java_package = "com.example.tutorial"; //生成文件的包名8 e+ ?% L% u6 N
    option java_outer_classname = "PersonProtos"; //类名( Y& p: y0 R+ b+ \1 \6 W
    message Person { //待描述的结构化数据
    % u) b: ^& c6 m2 ?% i: O
    required string name = 1; //required表示这个字段不能为空, ?& S. M) x$ m% g$ d. n; q5 w
    required int32 id = 2; //数字“2”表示字段的数字别名
    3 \+ I9 _" l* u* j# w7 o8 Y
    optional string email = 3; //optional表示该字段可以为空& p6 m: J& u4 k. w1 M+ U
    message PhoneNumber { //内部message/ t/ U* k' k+ P2 h/ L
    required string number = 1;
    % s6 U* l* k0 Z" m( S3 |4 _. n: Y4 }optional int32 type = 2;
    9 B1 x9 t& J0 A}r
    ; s- M; N% {) N7 k6 _+ f, w+ Xepeated PhoneNumber phone = 4;
    - J9 Z, G! Z% c. m; \}9 F- E& y9 k2 R# f3 ^4 I2 E
    步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:$ Z/ E9 @2 Q* L* S! Q1 t
    protoc -java_out=. person.proto
    , t' [2 W- l* `6 i) A注意, 上面的命令运行时的当前路径是person.proto所在目录。
    . G& y: [( h  Z* F9 [9 O$ a步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt* e# N+ J0 p: s; C! f3 \: @- Y3 S
    中, 之后又从文件中读出并打印出来。' t' g1 t* \1 I1 j$ e) Y# o
    public class ProtocolBufferExample {3 I% t: A$ l4 W7 C: M, W
    static public void main(String[] argv) {
    2 `) y" `5 u1 m4 e" R0 SPerson person1 = Person.newBuilder()- Y( [" W6 }& Q' T. P& ^
    .setName("Dong Xicheng")' q1 m. \& N3 I. v8 ~6 ~# a' z
    .setEmail("dongxicheng@yahoo.com")0 [- B' O0 R- v
    .setId(11111)
    - F5 }: z: _: k+ w0 h; r/ k" p8 N.addPhone(Person.PhoneNumber.newBuilder()
    , }9 q9 _+ `0 P# q.setNumber("15110241024")* r! \3 U  W: Y* O4 [4 p
    .setType(0))- B* M0 N3 D" n) F" X# c
    .addPhone(Person.PhoneNumber.newBuilder()1 q, A; p7 ~; l
    .setNumber("01025689654")
    5 f1 c; {+ k& }/ c( y( ^.setType(1)).build();5 X5 ^0 b" C' v
    try {: y# z  ]$ H6 Z1 Y& U
    FileOutputStream output = new FileOutputStream("example.txt");
    4 S) V( y2 ]4 O$ dperson1.writeTo(output);  E' f/ S& d- g' O, F
    output.close();5 M, J- K: h; x3 }
    } catch(Exception e) {
    6 ^0 ^- F+ A0 x# g  fSystem.out.println("Write Error");
    * i4 k& |, [8 [} t) ?9 B, W' j7 D2 b* c
    ry {
    ) v/ V5 r1 {* `! f! B$ N. U$ ?FileInputStream input = new FileInputStream("example.txt");! e. G  @& P% @$ I
    Person person2 = Person.parseFrom(input);
    8 c- x9 O4 R! g" j. ]7 {& v  mSystem.out.println("person2:" + person2);
    ( }; [  P9 p7 E5 J5 Y* A5 G( i} catch(Exception e) {$ v6 }6 p2 J+ l/ w/ v8 }6 d' }9 v/ F
    System.out.println("Read Error!");
    2 e) N8 I8 q8 [}
    + _% ?& D; t$ I; [}0 x; r# m9 b- J' p" d$ x
    }
    5 h* J8 _$ r, ]  ]5 tYARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
    0 a9 Q" B! M1 e2 @- h, H入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。/ {8 Z# Z0 o2 n9 T6 H* x* J
    除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而4 k) Y" }* u+ {( G8 z4 J) j" \
    YARN则采用了MRv1Hadoop RPC库, 举例如下:" W+ [" Q1 ]0 g+ _) r
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义. V3 `" o& @5 O% d2 _! B6 ^
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);8 Y- R5 S  d( ~: K; @& z
    rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);9 Q6 D! c' H+ h5 \, O8 a; k- d  H
    rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
    5 g& J+ m, c) U* s' }+ G}
    1 ?5 ]( `; i( z: G7 q( {7 P2 z& V5 E在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:; }6 i, E; |) m3 u, ~0 m9 F5 m
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol
    . d! T" x, D1 e% f0 h. \
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol
    4 o& ?- e8 O* b( L5 g" P
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol
    ( H! U7 i& e) Q3 |. S
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议( L( V2 V0 g4 }' `1 P* g. S
    ResourceManagerAdministrationProtocol

    & N& J' d7 p1 A3 z& ^/ }; w; R( C
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。/ U$ f, M6 U- E; j
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker5 K7 G4 n4 j! D6 a4 O
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:
    ) ^4 P+ {; ^1 i
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol
    . [) E& W" Y9 b
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
    9 i% [3 A0 M8 Q3 g, ]7 I1 @. o& `9 z% x
    3.2.2 Apache Avro
    $ q+ i* i/ b0 s7 X) j
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。7 ?3 y8 A# R3 B
    Avro官网描述Avro的特性和功能如下:
    9 K: m4 E/ M/ m, O5 g- H
    ❑丰富的数据结构类型;
    ; i  z8 |% S0 @% j5 y4 R2 {❑快速可压缩的二进制数据形式;" N6 o3 H# }  ^
    ❑存储持久数据的文件容器;
    7 [* a& n( w# }! B4 |0 X2 a
    ❑提供远程过程调用RPC
    % n- M/ ~# t8 c5 C
    ❑简单的动态语言结合功能。( R4 A8 }) n8 O6 J" R: f* l
    相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:/ r+ m% K# m/ [& C
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。, I  G- f' X* G( j% A; L3 {% B
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于. H+ G  j( q4 b
    减少序列化后的数据大小。
    1 T3 n6 O8 o4 Z* Q6 M
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
    / n) N# ]- `/ M( W+ s名, 该方法更加直观、 更加易扩展。( k! u& `2 f% N5 n% A
    编写一个
    Avro应用也需如下三步:
    / ^+ u" |! i3 E" T9 @' Y
    1) 定义消息格式文件, 通常以avro作为扩展名;
    % p, e/ S6 t- T6 {( D8 b
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;7 c, Y! u" @* ]9 y6 d5 E
    3) 使用Avro库提供的API来编写应用程序。1 }, x+ `% k8 j
    下面给出一个使用实例。4 L/ A$ z# S: {, s) \* ~
    步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:% Q4 i8 I! N1 h! o4 c
    {"namespace": "com.example.tutorial"," {6 }% P$ d9 _1 B
    "type": "record",
    0 `/ d3 W, @- z/ g3 c"name": "Person",
    + h& F, z8 M( m- M"fields": [
    - \0 c' A, x. Y6 x" P! W! j. @{"name": "name", "type": "string"},
    ( l0 M2 d2 }- |: O: m8 |{"name": "id", "type": "int"},
    7 D# V& d% ]2 K" `{"name": "email", "type": ["string", "null"]},6 _+ F4 o) E2 U: O* z
    {"name": "phone", "type": {"type": "array",
    - g. D  Q( B1 S7 g4 k7 e) d( V" q"items": {"type": "record", "name": "PhoneNumber",
    ' T. O5 j0 R0 r6 \" I: S"fields": [$ x" C" ^- s/ C
    {"name": "number", "type": "string"},
    4 V* B8 R( ^' I& a9 P{"name": "type", "type": ["int", "null"]}. j6 Q9 z1 t# `) _1 L
    ]
    5 a' t( I3 G0 ?2 o# Z}
    & {) k) }* K* Q+ F" y  u}
    / r) c0 ~% A! Q( o}]
    ) x1 N9 L) G! ~' M+ [/ ]}
    " }4 M. E- v1 `步骤2 使用Avro编译器生成Java语言, 命令如下:, S2 j$ U1 ^$ g7 O- |$ `5 |
    java -jar avro-tools-1.7.4.jar compile schema person.avro .
    ! W: e. `2 K5 s7 ~& |. a- I注意, 上面的命令运行时的当前路径是person.avro所在目录。/ p  R: Z8 U- n0 F8 K0 a" |
    步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
    " z3 {/ O& j: X件中读出并打印。
    4 z9 N, o! ]0 [" E5 L+ i
    public class AvroExample {6 \# i: K% e3 C6 U: R$ M
    static public void main(String[] argv) {
    ) H! M* C4 o8 U+ t. Y" zPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    * p- \; [7 q. f/ a4 s$ \$ O3 M( u.setNumber("15110241024")! @; ]* i! p/ H
    .setType(0).build();( c% S' U; u+ [. a) e
    PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
    2 Q  ~5 R' v. `/ r.setNumber("01025689654")! w9 g- p) V9 j+ ^: d1 j4 t
    .setType(1).build();" d4 s% d7 f$ d% u( E. p( H8 A
    List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();  i* b+ [/ v) f7 H- f
    phoneNumbers.add(phoneNumber1);
    3 j+ ]/ u% |$ A0 Y2 w( Y4 ?phoneNumbers.add(phoneNumber2);- @0 ]/ E! W% i% C8 h' y
    Person person = Person.newBuilder()4 {% ~6 \$ r/ G8 u% [
    .setName("Dong Xicheng")/ d6 i, x9 w/ M7 E9 U3 c: J9 n
    .setEmail("dongxicheng@yahoo.com")
    : K9 X" {# u' z.setId(11111)& e1 W! @$ t3 G, p% T
    .setPhone(phoneNumbers).build();: ]; d9 a: u9 g
    File file = new File("person.txt");6 g4 @& ?$ d8 M# f; K) U" p
    try {5 [5 j0 t& L+ F9 f, X0 R
    DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);) [6 m2 W% [% |6 Y9 e  W, n5 ^# Y# d
    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);/ l% B* b4 H8 o+ }
    dataFileWriter.create(person.getSchema(), file);9 E. l2 ?: C/ L8 Y' l; h, i$ K% Z
    dataFileWriter.append(person);
    2 a+ v5 P. t" T! p( E8 ]dataFileWriter.close();1 y+ U+ m* S0 k; ^
    } catch(Exception e) {
    - O( H* b+ O/ }System.out.println("Write Error:" + e);
    9 T; w: y# O3 z; P$ J* k4 ?}t
    3 Q9 b, J4 J3 [+ P  B' D, |' p7 |ry {( P5 o9 B6 _3 O, w: N2 T
    DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    / L+ m8 U$ ]7 x0 B! `+ uDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
    2 q3 F3 A0 b( P3 z  t& zperson = null;
    & n  J9 F/ Z9 S3 Qwhile (dataFileReader.hasNext()) {
    # A: w& \2 E+ A; x3 c& {$ d4 zperson = dataFileReader.next(person);
    ) K8 G( D% s) rSystem.out.println(person);; |# _+ o6 ~3 s7 J
    }
    ; X- b) p& k' O- E} catch(Exception e) {
    2 p* Z/ C& q, Q2 B0 z$ p6 ~System.out.println("Read Error:" + e);% l- v! ]( ^& ~5 C# K
    }
    6 F' i1 X4 W* G' R- ^2 z}
    0 D* P) B0 a* u+ I}
    $ F8 p9 y2 H& O5 C% n如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4]
    . A# J; a$ S, C$ n6 H
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC
    ' {9 m% J, q7 n; [: ^: `使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
    # i- @( j- s. ^2 t, i: v( K均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:2 t2 M$ z& t* ]3 k, X3 X
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",) |( Q# F# N' ]
    "protocol": "Events",
    & {& X+ L$ E6 D"types": [
    . R6 O/ U8 r  S# I5 x% Z8 q…{
    ' n/ ^$ c' g' Y$ m"type": "record", "name": "JobInfoChange",
    & q+ }1 @3 ^9 w  `"fields": [8 s% u$ m7 V6 F9 f9 d4 ]9 O, k
    {"name": "jobid", "type": "string"}," l: P' `; K& O  a- Y' ^
    {"name": "submitTime", "type": "long"},# {, G: F3 q  c/ x# R: W
    {"name": "launchTime", "type": "long"}! e; K$ h: H  Z; f. u6 ^
    ]
    ( u+ Y) i+ s# X6 F/ H. d},
    # s0 o+ {# C& r' m4 M' f{"type": "record", "name": "JobPriorityChange"," C- I2 L, h5 D, c  o3 O
    "fields": [9 h& _9 d  p" L( K
    {"name": "jobid", "type": "string"}," I3 ~' L% x& [* A
    {"name": "priority", "type": "string"}
    # f  h2 z9 z$ e7 z* x, f- J]
    4 y0 ]! o& C2 h5 ~# x1 M7 J/ ^& `},5 i7 s* v+ b1 V; q# g* N9 N
    {"type": "record", "name": "JobStatusChanged",
    5 S, ~8 k2 `9 w. D/ @# D+ b"fields": [
    + f9 n4 a9 I; z6 r* F, m* ~{"name": "jobid", "type": "string"},) h/ z+ |: S" e' h" a' Y
    {"name": "jobStatus", "type": "string"}
    ! Q) V$ O# r9 k/ I3 B]0 ^) x7 f$ C! N9 P" u
    },' N2 w- y+ X  m- z# `1 H! U6 X
    …]
    . V7 b; c! e: l7 e( ?: E}8 h- a% d6 M- C
    [1] 参见网址http://code.google.com/p/protobuf/
    5 X$ `+ }9 }2 P& f
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns
    - R, c/ C, |' C( J# A- Q7 j
    [3] 参见网址http://avro.apache.org/
    ' J. ~7 s* J  k" |" F$ o
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html& p1 r* D' K+ _# W- H0 [3 V
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  
    / [$ v' a5 i, K* f5 l+ T3 U1 W+ F+ A6 D) |/ H, R
    1 f/ L- F0 \7 c+ L
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-3-30 05:41 , Processed in 0.097879 second(s), 28 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表