java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3235|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库/ M. e' S; R: k, }- c8 F
    3.2.1 Protocol Buffers' I8 d8 F/ u# n
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或! ?& a$ x7 K6 |4 I! O
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
    7 c/ D. M% n7 m1 @( Z( k6 D) L
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers1 t( A3 U# j$ ]1 \
    相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:
    5 u2 d, J/ g0 A! X
    ❑平台无关、 语言无关;! J: I3 }4 t6 c
    ❑高性能, 解析速度是XML20100倍;
    ( }1 {( \# e0 V4 c& ?7 Y' a
    ❑体积小, 文件大小仅是XML1/101/3+ Q2 Q: g! k5 t) U* E5 Q) X9 o
    ❑使用简单;7 f/ H6 E/ ]" B; |3 I1 m# A
    ❑兼容性好。) L* U( s8 Z- y& }/ G
    通常编写一个
    Protocol Buffers应用需要以下三步:$ H- |" ~' v: Q9 m  x# b4 Q
    1) 定义消息格式文件, 通常以proto作为扩展名;1 _* c1 u7 c! y) ]4 Y
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;# D. t0 i4 U8 g* o5 u2 t% d8 @7 ?
    3) 使用Protocol Buffers库提供的API来编写应用程序。
    + o  N' o0 E& S+ D" V为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。( }$ m" h6 U9 [5 J# o
    该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将
    - c/ B0 V1 Y( T1 m% \) |一个人的信息写入文件。2 B1 L$ P! }  A. U4 }$ Q' D
    步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:2 c6 Y  ~% }! L  j5 l
    package tutorial; //自定义的命名空间- r1 I- Z+ R* D& O* N8 Q( J- d, O
    option java_package = "com.example.tutorial"; //生成文件的包名
    1 E( ~5 n7 O  c7 \3 S' w7 U# l
    option java_outer_classname = "PersonProtos"; //类名
    % L) w6 E4 y! H  a
    message Person { //待描述的结构化数据- s3 B, W/ o% h
    required string name = 1; //required表示这个字段不能为空
      o" `3 L: F/ w
    required int32 id = 2; //数字“2”表示字段的数字别名
    ! B( M7 [0 v/ Z" `1 `
    optional string email = 3; //optional表示该字段可以为空) O9 n; T/ h( j2 P
    message PhoneNumber { //内部message
    ( O- T, c, z7 x, `required string number = 1;- Z) H& T  H. Y# {* p: V% Y+ T! f
    optional int32 type = 2;; S  B8 E8 y# j( I
    }r1 J4 z5 Q4 l( k3 R( [) m
    epeated PhoneNumber phone = 4;
    - c7 m! U# w9 ]& G4 w  |}6 j/ |. r9 e6 i( V! U1 n0 B
    步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:5 Q& y0 H0 b* z" B" `( @
    protoc -java_out=. person.proto
    0 h2 B9 v: d; c# g, y1 ?; w$ H3 J8 p注意, 上面的命令运行时的当前路径是person.proto所在目录。) t" ]& e8 ^! r! a/ t1 ?+ l+ h
    步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt, Q  ^8 w0 {0 f! F. ~
    中, 之后又从文件中读出并打印出来。
    5 c8 A, u+ \7 u/ O& X
    public class ProtocolBufferExample {
    7 S0 O+ @& q  j8 L$ ]; Y: cstatic public void main(String[] argv) {
    / j7 _  o0 g$ j+ ^4 sPerson person1 = Person.newBuilder()0 x9 b7 X# m$ e9 F; R- \
    .setName("Dong Xicheng")" p9 p* h: g+ r9 M: @% E
    .setEmail("dongxicheng@yahoo.com")4 `; d& r' x: i- X& b( o) U
    .setId(11111)1 g  R2 Q; r" J3 h" [/ c( g& T5 f
    .addPhone(Person.PhoneNumber.newBuilder()& ~* o3 G! O9 ~# _5 k
    .setNumber("15110241024")
    $ [. D) ?6 M2 X# s3 e( Z( e+ V1 x.setType(0))
      n7 b4 ^. \; h( k; C  Y" m.addPhone(Person.PhoneNumber.newBuilder()3 U; ^) u( g) U& U
    .setNumber("01025689654")( P0 [( x! P" N( l' T) z3 ^. ]' T
    .setType(1)).build();9 ?1 F2 f1 K3 P
    try {
    * X0 J$ O- W  A# m  `" U- h- C5 P7 }FileOutputStream output = new FileOutputStream("example.txt");
    $ N( p$ m/ c  N9 J; o1 wperson1.writeTo(output);0 U% b9 i. Z+ I( `! v
    output.close();
    8 F9 b: W$ @$ z& C, ]$ z8 Z} catch(Exception e) {
    ! \: g1 l; w8 l* {System.out.println("Write Error");  J9 M; h; o+ g0 }
    } t
    . X1 N/ ?! }* ?5 C' Sry {- |8 [* a4 C! O( @2 S
    FileInputStream input = new FileInputStream("example.txt");) E* L+ m5 r  z- w3 ^
    Person person2 = Person.parseFrom(input);/ [  y7 ?! Q1 ]* v# _; Q  c* N0 c
    System.out.println("person2:" + person2);
    " g5 F: y/ D$ Z) S" H1 n} catch(Exception e) {( ^7 a. Y3 Z2 C+ \* ~' X
    System.out.println("Read Error!");6 g1 w0 r9 S. m( u/ u
    }7 O# L" W/ H3 w2 b# \& J
    }/ C) h2 M' e6 ^6 p& m! n
    }
    * ~5 _. P$ _8 `" fYARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
    7 H- \9 }% e; z/ M入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。/ ?( O+ s* @1 f2 |
    除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而4 L  A$ X( _2 Y
    YARN则采用了MRv1Hadoop RPC库, 举例如下:
    % n* @7 h8 M3 H. y/ B1 I1 o9 r
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义# V2 Y$ a2 q, P* |1 B
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);1 `" j- {9 z% W- ?" K( p; q  }2 c' q
    rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
    + X/ P' a1 s( R: Z9 f% D) K9 a' srpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);* {: j4 c, b6 M
    }
    5 E; p- y% v, W# C% ~: f: i在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:( y4 M9 O0 o5 E9 T
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol
    9 Z# O  n3 X1 T6 U: z" [
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol
    : E$ k; l3 K% C  @3 p
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol: K4 d5 h2 O. P) \
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议3 }# n0 ~( }8 J4 m* n, [& g( D# H
    ResourceManagerAdministrationProtocol
    * D* U* n+ ~& n! n: G3 Y# C3 Q
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。( L  N/ P& H- T, e9 w# Q3 R
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker$ |  z0 }  `' ?  `; M4 z% ^; M( U
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:: j, J$ k9 A. C7 s6 x: Y% [
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol
    6 o# A. f6 p8 _9 e. _( ?
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。' D8 [( D1 A8 I. v* R6 L: V- V
    3.2.2 Apache Avro
    ; w) s, Z; F' }+ B
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。/ g% e2 x5 ~! Y% z5 `! F/ A
    Avro官网描述Avro的特性和功能如下:
    4 n. }1 e: i# d3 J8 q
    ❑丰富的数据结构类型;
    ) [7 g8 v; N/ k6 g+ `& v; X7 C9 k❑快速可压缩的二进制数据形式;
    ; `: l. }  c( b1 ~. ?& y0 ?: H
    ❑存储持久数据的文件容器;' J8 [7 U: I+ m7 H: Z# U8 k
    ❑提供远程过程调用RPC
    * D- z- J" ]$ a5 h0 J' e" I4 X9 [
    ❑简单的动态语言结合功能。9 c# l/ h. j" B3 J1 \$ ]
    相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:( f9 Q0 B, I  j9 N  F, x& ^
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。7 a, W2 t5 ~5 ]$ O
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
    % f& [5 \( a: n5 z  O减少序列化后的数据大小。
    1 ^7 \6 a8 y# p6 s: t
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域6 d4 x& o. u# [9 m7 _: _) _' s  E
    名, 该方法更加直观、 更加易扩展。
    ' s2 p5 B) I1 c6 Q编写一个
    Avro应用也需如下三步:
    , _1 z/ L1 ^+ q4 i2 {9 E
    1) 定义消息格式文件, 通常以avro作为扩展名;
    7 j, |7 J+ a1 t! A& |7 f
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
    / z: P' v9 c1 E
    3) 使用Avro库提供的API来编写应用程序。( `* _$ {9 B: ]6 r0 ~
    下面给出一个使用实例。8 i/ y2 M2 ]6 N' y! r4 h+ @
    步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
    $ C, M8 P# u, b# a( x% x
    {"namespace": "com.example.tutorial",
    7 q- h( k# f$ b+ I5 K' M, u"type": "record",
    $ E) J- V! M1 S0 W' G& I+ |5 T5 K"name": "Person",* a  Y, w4 T3 S4 K3 e
    "fields": [
    1 L7 C( V: b2 s5 u& Z# E{"name": "name", "type": "string"},
    $ t$ l" y4 o2 S( K3 u{"name": "id", "type": "int"},# M" A. [. Y/ ]$ }
    {"name": "email", "type": ["string", "null"]},
    * |. O" {. x7 e7 N8 k{"name": "phone", "type": {"type": "array",# g  p3 O" n* ~, p2 a7 e
    "items": {"type": "record", "name": "PhoneNumber",
    * j, X, N5 J4 \2 g4 o/ ]"fields": [, j1 n  D/ X/ Q8 h
    {"name": "number", "type": "string"},
    , ~" u0 A8 L1 x{"name": "type", "type": ["int", "null"]}
    * }& m( b2 l$ Y6 H/ V4 v# B9 c! B]
    " P% j, [/ F6 P2 Z# {! |/ L( l}9 n! V3 }5 E9 F4 ?& Z7 L$ @( d9 }
    }
    0 ~1 _3 X$ ]0 y$ o1 p: L2 E}]
    3 m% S9 ~1 _& Q7 ?9 F}5 w/ z. I1 ]' R/ ^9 _
    步骤2 使用Avro编译器生成Java语言, 命令如下:
    5 _, U' v' N1 ~' \, O
    java -jar avro-tools-1.7.4.jar compile schema person.avro .2 P* e& p/ P: q/ F
    注意, 上面的命令运行时的当前路径是person.avro所在目录。, M* X+ V5 V* e; p8 ~
    步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
    9 a  `: n  Q) ~4 t% o8 G- {" ]# \件中读出并打印。
    , n: Z% B( F$ h' A: Q; T+ R
    public class AvroExample {
    ! [* C6 c# n# a9 Y3 i$ Nstatic public void main(String[] argv) {% N' K. |' T6 h" X% s% z6 V
    PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    1 c8 y$ X" j( U* r& h" w0 _.setNumber("15110241024")3 t% V  W  U; r0 j- w. ~
    .setType(0).build();
    9 f* q: `* T; p+ o* C. C9 yPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
    ; q  ^4 i( k2 Z5 Q.setNumber("01025689654")
    8 G  [! D, \8 F( r) P" N.setType(1).build();
    5 B; x* z2 B' ^- z5 g( N( PList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();, K% E( S) X1 W, V% Q
    phoneNumbers.add(phoneNumber1);# t5 [/ v! F/ M, k
    phoneNumbers.add(phoneNumber2);2 A% {, ~) f# Q! M; Q
    Person person = Person.newBuilder()
    - h! Q, Z) P% B' U% y.setName("Dong Xicheng")" e% S" J- ?3 S' y9 F
    .setEmail("dongxicheng@yahoo.com")2 T5 U) h) }1 B
    .setId(11111)
    ; y3 y5 }; R0 u8 D1 ^.setPhone(phoneNumbers).build();
    - B# U9 |7 L) b4 b! s( QFile file = new File("person.txt");
    2 R1 W- H- C! _" M  I, `try {
    - r) Y5 f( D6 O; I% x! a0 g- [DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
    6 K% ]+ W0 d7 q% y0 UDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
    # t% w* h) a' y) v: zdataFileWriter.create(person.getSchema(), file);
    0 t" l1 X- t' X" S# ~8 W1 y0 p  v, HdataFileWriter.append(person);4 C4 K, t3 Q0 |' U
    dataFileWriter.close();
    5 I3 t! j8 B: `} catch(Exception e) {7 j! O7 k! ?1 r  B' o
    System.out.println("Write Error:" + e);; o6 v5 A( F, ^  H+ v+ i( x
    }t
    & L) m0 E" I1 y8 jry {' P- f$ b& F1 E1 s6 t3 ?. A4 _
    DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
    - @' Q  y* n0 }DataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);, |1 z0 O' B% g: {# L. G1 X
    person = null;
    ! y3 B7 p4 ~9 t, T) fwhile (dataFileReader.hasNext()) {
    * r( B2 O! l8 M, C0 W/ fperson = dataFileReader.next(person);
    : K0 n/ n( }) c4 W0 u# ISystem.out.println(person);1 \/ n; X6 I. C6 O
    }0 L% M7 W+ p# s6 _: {
    } catch(Exception e) {
    4 O1 [: }3 W2 a1 p/ }/ w' n+ RSystem.out.println("Read Error:" + e);
    ) V: R, N) r7 g! o& P- W* G$ l, H}: s5 c3 e4 [9 _. a% ]+ o
    }
    . u& w' V: k! U}; ^, W3 p6 m' M1 F0 x
    如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4]
    ' W9 H" q# v- t9 f2 e1 ?
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC
    ; F  O$ p! p/ z使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
    , R5 Y& u. y/ N0 }* G7 k  J均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:0 U! _2 O: K& L& {
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",
    " `' N, m( i  f  S4 k"protocol": "Events",
    ( j5 {/ F- e9 T"types": [
    $ K( b6 u8 C* N  z…{
    7 d9 s7 N8 F% t5 R7 U! I% F"type": "record", "name": "JobInfoChange",
    8 I6 H% x! T2 {! F% @" `4 C"fields": [. _) M9 u6 ?( g* i
    {"name": "jobid", "type": "string"},8 Y  m' H; {6 L
    {"name": "submitTime", "type": "long"},
    3 j4 J, A. [" l$ _1 \{"name": "launchTime", "type": "long"}' T' e. k# {9 g! e/ _
    ]- b8 L! a1 @, [7 U& m1 D
    },
    " u, v) N8 U( e4 w: Z- a{"type": "record", "name": "JobPriorityChange",/ V7 L  W( [8 R- [7 X( e/ V3 v
    "fields": [
    * w' B5 n4 ~. k; A{"name": "jobid", "type": "string"},
    9 {+ M, ], y+ i{"name": "priority", "type": "string"}: R2 v/ I; M) ]4 w
    ]
    / w3 s: S  G) \! `},6 p" Y+ P! O1 K
    {"type": "record", "name": "JobStatusChanged",
    ) F4 m" Z5 O2 i5 Z"fields": [6 T& T$ l* k- Y
    {"name": "jobid", "type": "string"},
    ' |& f; j6 d# ^! ^) R. f. L; E4 j{"name": "jobStatus", "type": "string"}; j& [9 B/ c( g  n9 i* O
    ]0 w2 I' }1 z1 o9 r% `  E
    },4 l; G. j# b  t: Z0 ^# f
    …]" @# |# }+ l4 J/ V; n" H
    }$ A, q4 ^  i! g! ~3 s0 u2 z
    [1] 参见网址http://code.google.com/p/protobuf/
    ' ^9 {3 V) N7 L7 b; [# D2 O& L
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns* l# J# I/ v8 p
    [3] 参见网址http://avro.apache.org/3 P! W3 [5 G4 w3 a2 j3 q
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html
    6 j7 K4 ]2 A, I5 m
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  
    ) i5 Y  @, U; J# K) v0 }2 z( O* t. P9 {. g

    4 X9 t! r$ u+ G' \; E* x  D
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 17:47 , Processed in 0.122416 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表