java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 3361|回复: 0

《深入解析YARN架构设计与实现原理》第3章 YARN基础库【3.2】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2062

    主题

    3720

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66592

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-4-14 11:11:39 | 显示全部楼层 |阅读模式
    3.2 第三方开源库" Y, r3 S9 ~* k+ A
    3.2.1 Protocol Buffers% a; ~0 g$ ?1 J5 R8 ~  y
    Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
    1 c& S! _3 C$ ]6 D7 s
    RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持1 M- ?. y0 ]1 o& s
    C++JavaPython三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers
    " Z$ v9 J, e8 `/ u8 B# ^相比于常见的
    XML格式, Protocol Buffers官方网站这样描述它的优点:
    5 g9 E5 x  o& x( U* s! C
    ❑平台无关、 语言无关;  n$ J4 r; A' `( ~/ I+ T
    ❑高性能, 解析速度是XML20100倍;9 j9 f, a* {% J1 B* [/ d% t8 w
    ❑体积小, 文件大小仅是XML1/101/3- N5 d. ?% e, h: t9 l9 k- `7 C) y- _
    ❑使用简单;* y) t' \* e  x1 N! ^
    ❑兼容性好。
    3 Y. ?1 s9 S# d" S通常编写一个
    Protocol Buffers应用需要以下三步:
    6 p6 x6 \$ U. e8 ~, K  v. g
    1) 定义消息格式文件, 通常以proto作为扩展名;
    4 w& p! q9 c# B; ~: E2 b! q! _4 ?
    2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++JavaPython三类语言) 的代码文件;$ B# _7 _. t) N7 t; ]) w: G
    3) 使用Protocol Buffers库提供的API来编写应用程序。
      n1 v8 \) t& G为了说明
    Protocol Buffers的使用方法, 下面给出一个简单的实例。
    . {7 ^9 k" n- k该实例中首先定义一个消息格式文件
    person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将# t) M) l) p8 ^5 r  [/ U* E
    一个人的信息写入文件。9 y& B3 Y! |8 c& g- F9 X
    步骤
    1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:' I# u+ D  {% a0 K
    package tutorial; //自定义的命名空间7 I6 Y5 ~2 s5 k. F+ y
    option java_package = "com.example.tutorial"; //生成文件的包名
    + k9 @6 ?# Z: z
    option java_outer_classname = "PersonProtos"; //类名
    2 [* K( ^! x  i9 Y; a( }- P# e* E- n
    message Person { //待描述的结构化数据
    7 g. l0 C, E7 m
    required string name = 1; //required表示这个字段不能为空2 L0 R. b7 h+ V5 V, \  ~* q
    required int32 id = 2; //数字“2”表示字段的数字别名
    & |% P$ U$ F/ ~  ^) P2 O
    optional string email = 3; //optional表示该字段可以为空  A/ R) l: p' F, X9 Q! i; S  G
    message PhoneNumber { //内部message4 e* M& z" e& Y# D! |3 v* E
    required string number = 1;
    9 N- o( k  [, ]: e" f; N" aoptional int32 type = 2;
    3 ^0 q; O1 o/ w, ~}r- `% t; k2 H- m
    epeated PhoneNumber phone = 4;1 Q4 @9 ~* ?8 f5 s
    }
    ; i+ D5 ]2 W* m: ~步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:, K" T$ x: K  F' \
    protoc -java_out=. person.proto
    ' t. f8 Y" Y5 ~; H/ |7 c注意, 上面的命令运行时的当前路径是person.proto所在目录。, `3 \% R* H' e: p/ N2 M& r( U9 s
    步骤
    3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt) H0 E. W  M% J
    中, 之后又从文件中读出并打印出来。
    : u$ L$ W9 z6 O3 K& V& J* [
    public class ProtocolBufferExample {5 r! o) V9 M" Z( R
    static public void main(String[] argv) {
    2 F- A7 S. Y' `/ JPerson person1 = Person.newBuilder(), g5 V5 Z. m/ n3 J9 M- d
    .setName("Dong Xicheng")
    " s- d9 p! H" |. ]4 d.setEmail("dongxicheng@yahoo.com")1 E' y6 m3 ^- [% Q& U. H
    .setId(11111)
    + q. l2 I9 o6 k6 h; U5 n9 `. ?% H" ^.addPhone(Person.PhoneNumber.newBuilder()
    . p" z* f" F! b$ z( l' s.setNumber("15110241024")
    : k3 q( [$ I; M5 u) ?) r, g+ ?.setType(0))
    4 P. N5 P* \+ j2 E.addPhone(Person.PhoneNumber.newBuilder()
    ' K: Q4 y" L& w  `" i1 Q.setNumber("01025689654")
    + S% k) c2 q' f.setType(1)).build();, [2 d1 {/ N0 I
    try {
    " B" ~6 k3 |9 \+ B5 V* `FileOutputStream output = new FileOutputStream("example.txt");0 c1 X3 j! A4 n" b  i, i3 E
    person1.writeTo(output);. w, K: h# g4 w( M( ?, X5 T
    output.close();$ T9 I: w. Q9 R0 v  c! |& }
    } catch(Exception e) {
    , g( n- w6 U" b' a# I- \7 |0 m  XSystem.out.println("Write Error");8 _) U! C2 j& \( R5 K" g; `  V
    } t- C1 N; V0 b/ s
    ry {8 R3 X7 t# A3 W0 m- f
    FileInputStream input = new FileInputStream("example.txt");* U( l$ `6 b6 M
    Person person2 = Person.parseFrom(input);
    ! K+ f- I, w4 n. q& {System.out.println("person2:" + person2);; z1 Q" }- t2 n3 n2 [
    } catch(Exception e) {
    & G6 N1 X3 M7 F8 Y, YSystem.out.println("Read Error!");3 }% T' l6 i0 B2 r
    }' j3 w$ l( i8 q4 Q8 G% I' y, m
    }  T$ K! L- ]4 v9 g7 T! z
    }' g0 d: h) y1 }6 S& a: x: y) y  b
    YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
    9 ~: t( P/ v* e. J入使得
    YARN在向后兼容性和性能方面向前迈进了一大步。/ B! Z6 @* E* C) k* l* f
    除序列化
    /反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
    / z3 r$ e& P- u# Q) D
    YARN则采用了MRv1Hadoop RPC库, 举例如下:
    2 F) @# r9 |: J5 Q! ?. i8 S
    service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义# o, h7 X# k7 {; S8 x9 u
    rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
    7 h" {8 e0 L+ E1 D: a4 K! W+ Grpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
    & @. ~9 F9 o% Z7 D; Q* Y& m/ Zrpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);& j# ^# M9 ~. w; f
    }
    5 P& R5 H- O/ V, h在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
    + ~3 M$ Q( x" h  ?  J
    ❑applicationmaster_protocol.proto: 定义了AMRM之间的协议—Application-MasterProtocol, E$ W6 g" B8 d! J% \( ?
    ❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol
    ' y. U. M) J  j
    ❑containermanagement_protocol.proto: 定义了AMNM之间的协议—Container-ManagementProtocol
    " I. G6 A/ g2 ]
    ❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议* s1 J0 h1 w* W/ m
    ResourceManagerAdministrationProtocol
    3 T9 u- i, s% ~' R9 x/ I
    ❑yarn_protos.proto: 定义了各个协议RPC的参数。
    5 f% R* A' {. N5 x3 P4 q  a5 ~
    ❑ResourceTracker.proto: 定义了NMRM之间的协议—ResourceTracker+ Y. G  g0 n: T
    除了以上几个内核中的协议,
    YARN还使用Protocol BuffersMapReduce中的协议进行了重新定义:
    & s* P4 A+ @# S9 \$ y
    ❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol
      G; T9 W9 y& `3 L: h8 a
    ❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。! T6 _2 `3 D' \
    3.2.2 Apache Avro
    1 E, |: I& e# O0 {# P! o
    Apache Avro [3] Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。, ]" z/ \; |1 ^7 X: D
    Avro官网描述Avro的特性和功能如下:
    # P) o  e, D; s# |
    ❑丰富的数据结构类型;
    - y: C" {( R. I❑快速可压缩的二进制数据形式;0 T% ^5 D. ]. ^& C2 C# l( k
    ❑存储持久数据的文件容器;, Z1 \: M( h1 K; m1 \4 x
    ❑提供远程过程调用RPC
    + j: i$ z5 S- h. E; H+ i$ d
    ❑简单的动态语言结合功能。
    ! N* R. j& {' o) I相比于
    Apache Thrift GoogleProtocol BuffersApache Avro具有以下特点:
    # I4 Y9 D& v0 B- y- p6 ^
    ❑支持动态模式 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
    ( g9 z6 W- b  b  m
    ❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于1 G: U4 A) ^/ r& v" [  J
    减少序列化后的数据大小。
    * g! k4 u5 [% v- D
    ❑无须手工分配的域标识 ThriftProtocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域$ J' M( v! c8 E- O. M: y
    名, 该方法更加直观、 更加易扩展。
    6 T) E+ R1 M( O, A! l3 V编写一个
    Avro应用也需如下三步:* e  e/ z& K6 F3 A3 o! X
    1) 定义消息格式文件, 通常以avro作为扩展名;* ?7 a- j: H1 ?' |
    2) 使用Avro编译器生成特定语言的代码文件( 可选) ;  f+ o+ o$ }' {2 `8 \4 n/ p
    3) 使用Avro库提供的API来编写应用程序。
    7 m3 a+ P8 x+ V  x8 a下面给出一个使用实例。, T7 T% ]" z1 ^( \3 o9 r& o
    步骤
    1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:( v0 |" J: q2 e4 w! Y2 l7 O3 ~
    {"namespace": "com.example.tutorial",
    8 T$ {/ O* ^) K" S, a"type": "record",7 ?5 V: \) Y* ~: F+ _+ d
    "name": "Person",
    : T: |+ P- X2 K8 Y7 V1 r"fields": [; O" s. g! ^( T' \* v
    {"name": "name", "type": "string"},
    : {! |! ^) T* F{"name": "id", "type": "int"},
    : m2 Q$ L/ H$ B0 U- p0 I  |! h{"name": "email", "type": ["string", "null"]},
    $ ?. z% b8 o8 {/ V9 C* m$ H$ ]{"name": "phone", "type": {"type": "array",, y& F& u7 ~, G
    "items": {"type": "record", "name": "PhoneNumber",$ n9 _$ E; k% [1 J2 C2 }
    "fields": [
    ) f( I5 k: k0 r$ J. |{"name": "number", "type": "string"},
    6 F: K- P- H' v{"name": "type", "type": ["int", "null"]}
    8 x4 Y  o( y( `' j+ G) Q0 ?]/ E- {( {/ R8 K4 m0 a  m
    }
    4 K  c3 l# _/ `. ]0 o9 J}, z; [! I! F" |! k
    }]* J% T& o  u+ {( d) {
    }
    % y4 z: [9 ?' F, B& J步骤2 使用Avro编译器生成Java语言, 命令如下:; ?1 n1 C; ], G1 s
    java -jar avro-tools-1.7.4.jar compile schema person.avro .& p, F2 n- P! C$ C
    注意, 上面的命令运行时的当前路径是person.avro所在目录。
    3 l; x! i5 w. x; S2 M1 Q+ \步骤
    3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文3 ]6 H' i7 R$ J6 k2 G% y4 E  O
    件中读出并打印。
    ' W, m' @* v, Y$ R+ h
    public class AvroExample {
    1 b% t3 d( s# L" o* i) Dstatic public void main(String[] argv) {
    - ?5 U4 y7 W! N& s. d+ s/ ^) ZPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
    + k+ p1 b* a6 T2 a+ S1 }.setNumber("15110241024")  k( |6 l3 X( @) ~5 S
    .setType(0).build();: U. K5 f+ G( d; X2 m3 o# |) w
    PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()' V, Q) H/ W; g: C" ~
    .setNumber("01025689654")
    , f( d# q, ]* a7 E.setType(1).build();# I1 R. J' f1 O/ c! r4 m
    List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
    & q' g+ B: N; ]9 `$ o; j. }% ^2 pphoneNumbers.add(phoneNumber1);
    3 Q. _8 {, q4 {3 u) i/ l. r7 D0 [' U4 i/ IphoneNumbers.add(phoneNumber2);% N/ Y; j1 Y! N2 z  F+ _$ M
    Person person = Person.newBuilder()
    3 N/ j. w6 i; s6 ?& [.setName("Dong Xicheng")
    * x! w1 v" l' A) A: w0 y.setEmail("dongxicheng@yahoo.com")) E6 y* O. Q# f4 P" k
    .setId(11111)
    ) a, m& E4 E6 L* O1 n- @.setPhone(phoneNumbers).build();: u# m# e9 T9 k
    File file = new File("person.txt");
    * F: e( o" d" u' ^  @try {! j7 t, C9 L$ E& t& B; _
    DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);  ]6 Q! f0 T8 H0 R9 D
    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
    & B. m3 v- X- h3 P; A' P' hdataFileWriter.create(person.getSchema(), file);
    8 X6 d9 A5 S) b( x- ndataFileWriter.append(person);
    * {" i* l6 {2 g! `+ [dataFileWriter.close();' w* L4 x- K9 }; C% t5 M8 @
    } catch(Exception e) {
    / `0 E( g% I9 j) o5 @5 A8 ~System.out.println("Write Error:" + e);
    8 N6 L5 ?% g0 i/ b+ f* |5 y}t- J+ C( s  X7 _9 }
    ry {
    - z2 S7 o5 Q/ ^- JDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);. A7 R5 H+ f4 R" j* g2 H- c. {' z
    DataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);& [6 L1 @: j8 D! R
    person = null;
    4 r/ j0 E; g+ j7 B) F% ~9 ], O% cwhile (dataFileReader.hasNext()) {
    ( y7 D9 Q/ k5 jperson = dataFileReader.next(person);
    / z" g$ P( g# k6 l+ H) o' p# OSystem.out.println(person);
    2 c# S2 ^( `2 c6 X% f  f4 l}; Z# A. R( z: q2 N
    } catch(Exception e) {# n3 m2 ]! \1 [0 `/ F( d4 U
    System.out.println("Read Error:" + e);
    4 Y1 w2 p5 P- \3 s. d' z8 v. ?}" \1 z4 r+ J# W! X6 t& o! ~( C5 S
    }) i5 T$ J* ^- V+ l) N
    }
    / ~3 s8 ~; ~0 {! k5 ~% Y3 Q$ F如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] ' \7 m, o  d1 f
    Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] YARN暂时采用Protocol Buffers作为序列化库, RPC
    . I  v; ?" `+ M4 l+ i* d使用
    MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化: U4 W: _5 c) ~% x8 d$ L
    均采用
    Avro完成, 相关定义在Events.avpr文件中, 举例如下:
    0 j4 J0 q2 \* V. m
    {"namespace": "org.apache.hadoop.mapreduce.jobhistory",3 z* \5 E9 ?( S. P
    "protocol": "Events",4 v7 g) H; I2 z- K* _' `5 o0 `  g
    "types": [1 y6 }, {' E% u. z0 W7 q4 G% u
    …{
      j, \1 R3 J0 ?' H- ~) t"type": "record", "name": "JobInfoChange",2 B% O" o- w0 n7 Y1 O( g
    "fields": [# {( [/ G! M( n+ R- ]
    {"name": "jobid", "type": "string"},
    ) Q4 g" P% a4 ]) l{"name": "submitTime", "type": "long"},
    8 {4 Z# m1 R$ P! d7 T4 F# M{"name": "launchTime", "type": "long"}6 j! x  R" Z9 I! O, n' L* `3 [5 X* q8 x
    ]: {8 E4 W% \$ |$ V/ ~" H" q% e
    },
    % R2 Z0 k. X& o3 |# ~) b9 L$ {$ V{"type": "record", "name": "JobPriorityChange",7 ]) a- u) o' L$ `; ?+ @
    "fields": [
    + h0 a# ?# ~4 G1 I- u- P{"name": "jobid", "type": "string"},: j" Z! c! j4 v+ ?' e: c
    {"name": "priority", "type": "string"}
    1 X/ D( _8 |! M# z: K8 U, C; v" X]+ C6 f5 ~: {4 v8 z- G
    },/ \0 l: a  l# b7 Y  |. i$ Y
    {"type": "record", "name": "JobStatusChanged",1 D; b+ h& t) ?
    "fields": [# C: N" o4 b% p2 q7 @% X( b* p
    {"name": "jobid", "type": "string"},) ?/ \( N) Y  n) N9 v5 V
    {"name": "jobStatus", "type": "string"}8 y* ?% d: ]) z1 W# j
    ]5 N: `2 u9 z3 A8 M# o$ x, H. v
    },
    2 W# P  E" L6 _7 E/ |…]
    / X7 Y3 e7 K! O* M  e}/ P& R( U& M4 G' K* ]! A1 q0 S
    [1] 参见网址http://code.google.com/p/protobuf/+ x, S2 q0 G* U2 G+ N% n
    [2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns, }1 U* M) q3 m" y9 ^
    [3] 参见网址http://avro.apache.org/
    : c4 c0 q  S1 H$ k& X
    [4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html- m/ E! f' ?2 l( h& f9 t: w
    [5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。  
    & k8 F5 Z. `% k7 k2 l& A
    2 ^+ o+ R6 o! y( N  V' Y. q" h# m4 s$ r' s
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-2-23 04:46 , Processed in 0.069228 second(s), 26 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表