java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3234|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库
    # [$ Z. X* ]% R  l* @% Q3.2.1 Protocol Buffers# N1 x6 s4 Z- q# S# F
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    " i- c  a( R1 L
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持9 _: z0 j% w, t  Y3 P/ i
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers
    ' t3 y  i7 ?7 n% b& B* }相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:! w$ V0 G6 l8 T" S' X( m
    ❑平台无关、 语言无关;
      ?; ?4 B' T% H% v
    ❑高性能, 解析速度是XML20100倍;5 S6 ^  o( x7 s2 O/ u3 u
    ❑体积小, 文件大小仅是XML1/101/3
    - x% t+ P/ q! b+ c
    ❑使用简单;6 N: T1 k# D1 N; g% G* ~1 ~
    ❑兼容性好。& s0 K4 ^4 s7 q" c: @1 `; g, p
    通常编写一个
    Protocol Buffers应用需要以下三步:8 t+ q' w# Y6 L0 ~
    1) 定义消息格式文件, 通常以proto作为扩展名;
    ' @) A7 I$ `( C3 C8 Z6 @
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;
    ; ^# w# y& f) S5 ?! e' P! A
    3) 使用Protocol Buffers库提供的API来编写应用程序。
    2 G6 `6 f( I3 B5 G+ L为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。: I( q; K- S/ M' K
    该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将# ~+ D" }. \* [& r
    一个人的信息写入文件。
    9 I' H! ]$ a3 k8 _步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:$ N/ H3 ?7 [( m* z- q0 ]: D; r
    package tutorial; //自定义的命名空间
    1 d; \* p% W' ]
    option java_package = "com.example.tutorial"; //生成文件的包名
      d2 H  o3 w" K2 R: S
    option java_outer_classname = "PersonProtos"; //类名
    6 }( X! I, n  Y, n4 n6 y
    message Person { //待描述的结构化数据
    4 T/ G, @2 L- i* Y$ r6 i" n' y
    required string name = 1; //required表示这个字段不能为空
    ; W9 d7 q- {$ }
    required int32 id = 2; //数字“2”表示字段的数字别名
    . U* F' `% ]& b) B; ?
    optional string email = 3; //optional表示该字段可以为空
    , p$ q0 s! T$ u5 K
    message PhoneNumber { //内部message0 k/ l2 _2 Z2 f. ~  G# G, Z
    required string number = 1;8 u; n- b- y: {
    optional int32 type = 2;
    & y) Y& S/ `' @! {}r
    , Y+ B3 [( ?6 T- {* s! P" Vepeated PhoneNumber phone = 4;8 \5 b  C& p# [. [" e3 w. `
    }3 m( Z; V! \7 K1 R7 P
    步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
    : {- t% k6 z8 E
    protoc -java_out=. person.proto( [: F/ {9 S7 `9 w& g$ h6 I
    注意, 上面的命令运行时的当前路径是person.proto所在目录。4 a# Y$ _2 ^2 y* D* P0 n" ~9 x# j
    步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt4 ^. {7 V3 w! j
    中, 之后又从文件中读出并打印出来。; n" w' d/ Y5 C5 i5 ~  R
    public class ProtocolBufferExample {
    7 v4 q) A# z6 Z& W/ p9 L# A) p! gstatic public void main(String[] argv) {
    % L! v1 u* E. `' {* b/ k7 iPerson person1 = Person.newBuilder()
    . E# _' k& O0 _( m/ n, z/ }.setName("Dong Xicheng")" F; u& C( L- _+ L- p( t
    .setEmail("dongxicheng@yahoo.com")
    & q! K8 ?6 p+ x- V5 N  a.setId(11111); m, z' N( ~0 \) B
    .addPhone(Person.PhoneNumber.newBuilder()
    & P1 k+ p  e3 a: o4 N6 G+ p; H4 P.setNumber("15110241024")
    " E' x# Z3 x. r; I8 [# t4 `$ h: T4 v.setType(0))
    - N- R! q4 k) H& F" z4 |: m" E.addPhone(Person.PhoneNumber.newBuilder()
    ! Q+ q3 m( Q' R' N9 |.setNumber("01025689654")9 F+ S' t% A1 h' N1 t
    .setType(1)).build();
    / N$ R" }* f! W# l3 S" _try {
      c8 r& A, `  S( ~3 eFileOutputStream output = new FileOutputStream("example.txt");
    " P6 i! |4 G8 S) z5 f: u5 Lperson1.writeTo(output);% K# ~2 z0 Y9 A; S+ C/ S
    output.close();
    ; E" O4 i( l' N- S& `} catch(Exception e) {
    ! y7 f9 _# J1 s" h/ p' w, ~1 bSystem.out.println("Write Error");, C& {" ?# L- _
    } t2 a4 E0 I% @" e8 M+ l/ L
    ry {; m' \. j2 V: D' f9 D4 V
    FileInputStream input = new FileInputStream("example.txt");
    9 E) D/ j3 Z: j. H# h6 i" L6 ?, hPerson person2 = Person.parseFrom(input);; ~3 @  y7 q4 b
    System.out.println("person2:" + person2);* M/ k* }8 p/ i* P
    } catch(Exception e) {
    + N% `5 t- H, _- mSystem.out.println("Read Error!");$ i( L) A8 D! j/ p! ^, S
    }9 }, a/ |* K  [( i$ N
    }
    % @' h4 V; g+ O$ [2 Z}
    6 ?) Q2 J$ Z; _1 N# \8 N9 kYARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引) K& V, N7 {; Q; T$ |
    入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。
    7 `# P* _2 g( r- B3 }) W1 H: f9 @除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
    ) _4 S) l4 S) o9 I
    YARN则采用了MRv1Hadoop RPC库, 举例如下:
    : B: M* m% Y1 Q/ @& \) f/ q9 ]
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义5 ?3 {# n5 d3 O5 q' V6 i
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
    & C+ z( o3 n$ M' `& p' m: Z9 [rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
    " `5 X3 H, b9 J9 b( ^& [rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
    1 @5 D. w; P) }1 e3 g}
    % ~& @3 I' @- M$ O' E1 b- e9 {$ K+ i在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
    9 ]+ s5 C$ A: ]0 n2 M
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol
    ( K- k$ E/ [$ X: ?, C. C9 R* h. y
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol/ M. @& r* f' |2 u4 z
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol' F# N. z% j5 n8 X/ \
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议# D2 {$ ]2 _$ H" N' W# S! Q7 E
    ResourceManagerAdministrationProtocol
    ' G/ K* N+ ^  n0 t3 N9 l- g1 |
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。
    , x0 \7 W' |  c' i. q6 ]1 t
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker
    1 n; v/ R7 P( d. x7 ?  ~除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:2 g1 T, T, f  D: I8 Z' s/ `' ?* |! W
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol
    ) I5 C- J" p6 _' q
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
    ) m$ W$ e5 R; h1 k; }: u  f
    3.2.2 Apache Avro
    ; C; l4 P5 o% w  D
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。3 W# y5 B1 ]" J/ T, v$ L; G
    Avro官网描述Avro的特性和功能如下:# @6 H! p, J7 l, s9 G
    ❑丰富的数据结构类型;
    $ ?1 s) v4 w2 L- u6 _0 r! `) O+ y8 x❑快速可压缩的二进制数据形式;
      ?& G. o9 U6 ]" J; _% z: V
    ❑存储持久数据的文件容器;
    ; K( ~$ f* {9 S( H% D- n" G
    ❑提供远程过程调用RPC
    ) l/ `: [3 m' H* f' I, ^( O/ X& K
    ❑简单的动态语言结合功能。9 D8 X* \) z0 c! y( K: D' b
    相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:
    + T6 q8 @' Z" ?/ D
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。$ W5 [; u7 w& l" Q) c
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于) s0 u6 X( r5 l
    减少序列化后的数据大小。  G5 z( C; }% F! ]- j7 k" T
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域, `6 A' m5 X: G: Z9 t
    名, 该方法更加直观、 更加易扩展。# n+ ~4 `! N' p# |. O: P  I
    编写一个
    Avro应用也需如下三步:
    ( p* {0 c: _6 U/ d9 u
    1) 定义消息格式文件, 通常以avro作为扩展名;% }2 X2 j7 M: |
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;) `; P. v( r5 B  P2 a
    3) 使用Avro库提供的API来编写应用程序。
    0 D4 y! \$ a8 X, Z2 X+ i# i下面给出一个使用实例。
    * T" j$ U% m4 e; Z, T0 I7 [步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:" [7 B# U7 t% u7 G* H9 d
    {"namespace": "com.example.tutorial",5 S  I: z* R; D
    "type": "record",
    2 E3 y- B0 A+ A6 W. J"name": "Person",
    4 a6 T6 K8 x/ _. P' S( O4 @5 G. u% i/ i% Y"fields": [
    - u& Z) {$ @* `$ S. F7 E{"name": "name", "type": "string"},0 j5 @* v% ^" _' S# B& o
    {"name": "id", "type": "int"},' Z& o+ H* \8 W
    {"name": "email", "type": ["string", "null"]},
    0 u; O) a( Y4 T; x* p$ i3 K{"name": "phone", "type": {"type": "array",
    , p& T) b( g& z) s) a"items": {"type": "record", "name": "PhoneNumber",* h  V+ ?  B; U1 K5 G$ A+ N
    "fields": [
    7 }3 f+ X, q5 B7 f; }" \{"name": "number", "type": "string"},
    ' D# J1 e, ^0 w: i# z  p{"name": "type", "type": ["int", "null"]}
    6 U" W& Y" I: T4 r, |]
    & k. l+ Q9 o/ A% W2 C. a}8 @6 r; `- ?- J' ?
    }8 y/ V5 K$ C; D) P/ f
    }]
    3 f: ~4 u' z0 d# l}
    0 h) K) ^9 M5 M. s- \) H0 U步骤2 使用Avro编译器生成Java语言, 命令如下:
    3 u3 K5 w8 X' j
    java -jar avro-tools-1.7.4.jar compile schema person.avro .
    3 V: Y- N0 _0 W" G- u& D1 h3 j注意, 上面的命令运行时的当前路径是person.avro所在目录。
    , O( B6 k6 Z5 ]步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文% |& l$ i, {9 E' d$ f
    件中读出并打印。
    ! h! Y% m  N# J
    public class AvroExample {5 u/ h9 b9 b4 F" M
    static public void main(String[] argv) {3 c) P5 s* q3 M+ o( e
    PhoneNumber phoneNumber1 = PhoneNumber.newBuilder(); H1 _; ]0 c/ }& K) V. T- `
    .setNumber("15110241024")2 T7 V. Z0 V5 s9 K+ h' U
    .setType(0).build();
    3 \1 V* i! K2 b! a: RPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
      b$ d5 g2 x. I# X- u  @! v( y! D.setNumber("01025689654")3 Y" w# T& Z, L
    .setType(1).build();
    , b% H5 ~6 Y( e6 vList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
    5 b6 X1 M" u/ |- J* o( \phoneNumbers.add(phoneNumber1);
    , O* ~2 E5 {% N& h5 L6 IphoneNumbers.add(phoneNumber2);8 W2 ?7 E4 I: R- |; @7 w
    Person person = Person.newBuilder()
    5 n  n2 y- v1 s$ r9 Y9 _.setName("Dong Xicheng")
    - {' p1 I* ]% H+ S: s) w5 N  x4 G/ P.setEmail("dongxicheng@yahoo.com")* Y* F; K8 i  q4 F( Y6 T
    .setId(11111)
    2 P2 M! \! X( Q; j- ^.setPhone(phoneNumbers).build();! G* ]+ X7 G# r: I2 [
    File file = new File("person.txt");
    7 y; X# h8 R1 _+ Jtry {9 |; T/ c7 f+ Y
    DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
    & n1 O: @) C" M$ N3 T+ y" qDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);3 \, f' _7 O; x& I& W7 @
    dataFileWriter.create(person.getSchema(), file);
    , i' y. M1 [# U2 TdataFileWriter.append(person);4 J" V  v. t, Q
    dataFileWriter.close();
    3 [% q- M! g) u* B1 e& a} catch(Exception e) {
    1 l: z+ f, r( i# bSystem.out.println("Write Error:" + e);$ r; k+ y" y6 |# O% v, {
    }t
    % T2 w: N* c! D" N* X. Yry {
    6 X, p3 L/ P9 T! kDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    - j. _$ r' R, _, u4 aDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);+ y2 c' T- h) Q  G, W; E$ s4 O5 ^
    person = null;
    . O! i2 o0 p: f! \; }0 H1 qwhile (dataFileReader.hasNext()) {3 |: e8 p- C/ I5 ~; T
    person = dataFileReader.next(person);% l" d% b7 D/ Z# A) z8 e6 V
    System.out.println(person);
      }3 Q: ?1 _' Z# V6 V2 ?}; c) k) k8 z, j" U) q! k; \  u8 `
    } catch(Exception e) {+ e' r8 O3 o# ?2 C& K
    System.out.println("Read Error:" + e);
    9 m/ j- z; W, X  y0 D}- e+ X7 Y: i9 V) _: f
    }* T1 F: w. G  k0 m) |" _
    }
    % h. ?5 _8 Z9 D/ c4 M0 B如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] ! E- i8 v3 y% y: V# ]( ?
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC
    % i  G; r1 |/ d& N$ [' l使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化0 u+ C; x$ M" M2 y9 ]0 _7 H
    均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:
    : |3 u1 g7 Z' M! G9 ~; L# @
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",
    2 g: \# {2 _+ D' R9 ]" S# z$ [) D4 x"protocol": "Events",
    1 J8 o# C. P: v% Y) d& C% }% k"types": [. ]4 `& u) G( r6 f. T
    …{# a7 Z0 C( _0 \# l
    "type": "record", "name": "JobInfoChange",
    " s) w+ c  K7 p+ _1 W"fields": [: @* ^' a$ {  P) D$ F2 g- Y8 m3 n
    {"name": "jobid", "type": "string"},/ ?& f: m2 x3 w, i; P8 H
    {"name": "submitTime", "type": "long"},: n3 Q) L9 J' m3 `( _5 W1 m
    {"name": "launchTime", "type": "long"}
    ! U1 p! e! b( K/ @: P) b]
    " W, ~0 z: t& {& B},
    1 a- j: m7 E6 U8 C9 W- D' ~  q{"type": "record", "name": "JobPriorityChange",7 E) D0 V( i3 y- g( T
    "fields": [7 @! c- i) }. r# G4 J# F+ P" U
    {"name": "jobid", "type": "string"},& t( [- a% Z( N3 {) h: s- t
    {"name": "priority", "type": "string"}
    . j" d6 W+ {$ k]
    8 f: B2 s+ D' [$ l4 H$ R' L},/ Z, v+ v! a6 Z/ O' t% \
    {"type": "record", "name": "JobStatusChanged",8 q* G8 e$ n- U5 H. e3 k
    "fields": [+ _5 k# Y% f' V) r* J
    {"name": "jobid", "type": "string"},
    2 D* H/ T4 Y: k: b7 t' d; G{"name": "jobStatus", "type": "string"}  ]" @; R6 X+ ?, C. i/ ?8 _
    ]! h/ j6 `# j. s6 Q6 ~
    },
    6 n0 }1 x) }; w5 O…]
    $ l! s, c! Y" s. D, N# ^}
      g- m* l1 Q# g7 X6 f[1] 参见网址http://code.google.com/p/protobuf/& N* |" l9 P0 S% X
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns2 G8 Z* H2 y- M. h# A! Y, Z* B
    [3] 参见网址http://avro.apache.org/* R0 j1 v" K4 _$ }$ e3 {
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html9 p8 L# x* O' S2 }; G5 J
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  
    $ y) d1 o. C3 Y: \6 G/ J% i3 v7 X& x1 K/ p. \. s( U

    $ [5 h8 o4 B2 T
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:45 , Processed in 0.197307 second(s), 30 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表