|
3.2 第三方开源库
# [$ Z. X* ]% R l* @% Q3.2.1 Protocol Buffers# N1 x6 s4 Z- q# S# F
Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
" i- c a( R1 LRPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持9 _: z0 j% w, t Y3 P/ i
C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。
' t3 y i7 ?7 n% b& B* }相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:! w$ V0 G6 l8 T" S' X( m
❑平台无关、 语言无关;
?; ?4 B' T% H% v❑高性能, 解析速度是XML的20~100倍;5 S6 ^ o( x7 s2 O/ u3 u
❑体积小, 文件大小仅是XML的1/10~1/3;
- x% t+ P/ q! b+ c❑使用简单;6 N: T1 k# D1 N; g% G* ~1 ~
❑兼容性好。& s0 K4 ^4 s7 q" c: @1 `; g, p
通常编写一个Protocol Buffers应用需要以下三步:8 t+ q' w# Y6 L0 ~
1) 定义消息格式文件, 通常以proto作为扩展名;
' @) A7 I$ `( C3 C8 Z6 @2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;
; ^# w# y& f) S5 ?! e' P! A3) 使用Protocol Buffers库提供的API来编写应用程序。
2 G6 `6 f( I3 B5 G+ L为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。: I( q; K- S/ M' K
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将# ~+ D" }. \* [& r
一个人的信息写入文件。
9 I' H! ]$ a3 k8 _步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:$ N/ H3 ?7 [( m* z- q0 ]: D; r
package tutorial; //自定义的命名空间
1 d; \* p% W' ]option java_package = "com.example.tutorial"; //生成文件的包名
d2 H o3 w" K2 R: Soption java_outer_classname = "PersonProtos"; //类名
6 }( X! I, n Y, n4 n6 ymessage Person { //待描述的结构化数据
4 T/ G, @2 L- i* Y$ r6 i" n' yrequired string name = 1; //required表示这个字段不能为空
; W9 d7 q- {$ }required int32 id = 2; //数字“2”表示字段的数字别名
. U* F' `% ]& b) B; ?optional string email = 3; //optional表示该字段可以为空
, p$ q0 s! T$ u5 Kmessage PhoneNumber { //内部message0 k/ l2 _2 Z2 f. ~ G# G, Z
required string number = 1;8 u; n- b- y: {
optional int32 type = 2;
& y) Y& S/ `' @! {}r
, Y+ B3 [( ?6 T- {* s! P" Vepeated PhoneNumber phone = 4;8 \5 b C& p# [. [" e3 w. `
}3 m( Z; V! \7 K1 R7 P
步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
: {- t% k6 z8 Eprotoc -java_out=. person.proto( [: F/ {9 S7 `9 w& g$ h6 I
注意, 上面的命令运行时的当前路径是person.proto所在目录。4 a# Y$ _2 ^2 y* D* P0 n" ~9 x# j
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt4 ^. {7 V3 w! j
中, 之后又从文件中读出并打印出来。; n" w' d/ Y5 C5 i5 ~ R
public class ProtocolBufferExample {
7 v4 q) A# z6 Z& W/ p9 L# A) p! gstatic public void main(String[] argv) {
% L! v1 u* E. `' {* b/ k7 iPerson person1 = Person.newBuilder()
. E# _' k& O0 _( m/ n, z/ }.setName("Dong Xicheng")" F; u& C( L- _+ L- p( t
.setEmail("dongxicheng@yahoo.com")
& q! K8 ?6 p+ x- V5 N a.setId(11111); m, z' N( ~0 \) B
.addPhone(Person.PhoneNumber.newBuilder()
& P1 k+ p e3 a: o4 N6 G+ p; H4 P.setNumber("15110241024")
" E' x# Z3 x. r; I8 [# t4 `$ h: T4 v.setType(0))
- N- R! q4 k) H& F" z4 |: m" E.addPhone(Person.PhoneNumber.newBuilder()
! Q+ q3 m( Q' R' N9 |.setNumber("01025689654")9 F+ S' t% A1 h' N1 t
.setType(1)).build();
/ N$ R" }* f! W# l3 S" _try {
c8 r& A, ` S( ~3 eFileOutputStream output = new FileOutputStream("example.txt");
" P6 i! |4 G8 S) z5 f: u5 Lperson1.writeTo(output);% K# ~2 z0 Y9 A; S+ C/ S
output.close();
; E" O4 i( l' N- S& `} catch(Exception e) {
! y7 f9 _# J1 s" h/ p' w, ~1 bSystem.out.println("Write Error! ");, C& {" ?# L- _
} t2 a4 E0 I% @" e8 M+ l/ L
ry {; m' \. j2 V: D' f9 D4 V
FileInputStream input = new FileInputStream("example.txt");
9 E) D/ j3 Z: j. H# h6 i" L6 ?, hPerson person2 = Person.parseFrom(input);; ~3 @ y7 q4 b
System.out.println("person2:" + person2);* M/ k* }8 p/ i* P
} catch(Exception e) {
+ N% `5 t- H, _- mSystem.out.println("Read Error!");$ i( L) A8 D! j/ p! ^, S
}9 }, a/ |* K [( i$ N
}
% @' h4 V; g+ O$ [2 Z}
6 ?) Q2 J$ Z; _1 N# \8 N9 k在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引) K& V, N7 {; Q; T$ |
入使得YARN在向后兼容性和性能方面向前迈进了一大步。
7 `# P* _2 g( r- B3 }) W1 H: f9 @除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
) _4 S) l4 S) o9 IYARN则采用了MRv1中Hadoop RPC库, 举例如下:
: B: M* m% Y1 Q/ @& \) f/ q9 ]service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义5 ?3 {# n5 d3 O5 q' V6 i
rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
& C+ z( o3 n$ M' `& p' m: Z9 [rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
" `5 X3 H, b9 J9 b( ^& [rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
1 @5 D. w; P) }1 e3 g}
% ~& @3 I' @- M$ O' E1 b- e9 {$ K+ i在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
9 ]+ s5 C$ A: ]0 n2 M❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
( K- k$ E/ [$ X: ?, C. C9 R* h. y❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。/ M. @& r* f' |2 u4 z
❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。' F# N. z% j5 n8 X/ \
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—# D2 {$ ]2 _$ H" N' W# S! Q7 E
ResourceManagerAdministrationProtocol。' G/ K* N+ ^ n0 t3 N9 l- g1 |
❑yarn_protos.proto: 定义了各个协议RPC的参数。
, x0 \7 W' | c' i. q6 ]1 t❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。
1 n; v/ R7 P( d. x7 ? ~除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:2 g1 T, T, f D: I8 Z' s/ `' ?* |! W
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。
) I5 C- J" p6 _' q❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
) m$ W$ e5 R; h1 k; }: u f3.2.2 Apache Avro
; C; l4 P5 o% w DApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。3 W# y5 B1 ]" J/ T, v$ L; G
Avro官网描述Avro的特性和功能如下:# @6 H! p, J7 l, s9 G
❑丰富的数据结构类型;
$ ?1 s) v4 w2 L- u6 _0 r! `) O+ y8 x❑快速可压缩的二进制数据形式;
?& G. o9 U6 ]" J; _% z: V❑存储持久数据的文件容器;
; K( ~$ f* {9 S( H% D- n" G❑提供远程过程调用RPC;
) l/ `: [3 m' H* f' I, ^( O/ X& K❑简单的动态语言结合功能。9 D8 X* \) z0 c! y( K: D' b
相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:
+ T6 q8 @' Z" ?/ D❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。$ W5 [; u7 w& l" Q) c
❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于) s0 u6 X( r5 l
减少序列化后的数据大小。 G5 z( C; }% F! ]- j7 k" T
❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域, `6 A' m5 X: G: Z9 t
名, 该方法更加直观、 更加易扩展。# n+ ~4 `! N' p# |. O: P I
编写一个Avro应用也需如下三步:
( p* {0 c: _6 U/ d9 u1) 定义消息格式文件, 通常以avro作为扩展名;% }2 X2 j7 M: |
2) 使用Avro编译器生成特定语言的代码文件( 可选) ;) `; P. v( r5 B P2 a
3) 使用Avro库提供的API来编写应用程序。
0 D4 y! \$ a8 X, Z2 X+ i# i下面给出一个使用实例。
* T" j$ U% m4 e; Z, T0 I7 [步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:" [7 B# U7 t% u7 G* H9 d
{"namespace": "com.example.tutorial",5 S I: z* R; D
"type": "record",
2 E3 y- B0 A+ A6 W. J"name": "Person",
4 a6 T6 K8 x/ _. P' S( O4 @5 G. u% i/ i% Y"fields": [
- u& Z) {$ @* `$ S. F7 E{"name": "name", "type": "string"},0 j5 @* v% ^" _' S# B& o
{"name": "id", "type": "int"},' Z& o+ H* \8 W
{"name": "email", "type": ["string", "null"]},
0 u; O) a( Y4 T; x* p$ i3 K{"name": "phone", "type": {"type": "array",
, p& T) b( g& z) s) a"items": {"type": "record", "name": "PhoneNumber",* h V+ ? B; U1 K5 G$ A+ N
"fields": [
7 }3 f+ X, q5 B7 f; }" \{"name": "number", "type": "string"},
' D# J1 e, ^0 w: i# z p{"name": "type", "type": ["int", "null"]}
6 U" W& Y" I: T4 r, |]
& k. l+ Q9 o/ A% W2 C. a}8 @6 r; `- ?- J' ?
}8 y/ V5 K$ C; D) P/ f
}]
3 f: ~4 u' z0 d# l}
0 h) K) ^9 M5 M. s- \) H0 U步骤2 使用Avro编译器生成Java语言, 命令如下:
3 u3 K5 w8 X' jjava -jar avro-tools-1.7.4.jar compile schema person.avro .
3 V: Y- N0 _0 W" G- u& D1 h3 j注意, 上面的命令运行时的当前路径是person.avro所在目录。
, O( B6 k6 Z5 ]步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文% |& l$ i, {9 E' d$ f
件中读出并打印。
! h! Y% m N# Jpublic class AvroExample {5 u/ h9 b9 b4 F" M
static public void main(String[] argv) {3 c) P5 s* q3 M+ o( e
PhoneNumber phoneNumber1 = PhoneNumber.newBuilder(); H1 _; ]0 c/ }& K) V. T- `
.setNumber("15110241024")2 T7 V. Z0 V5 s9 K+ h' U
.setType(0).build();
3 \1 V* i! K2 b! a: RPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
b$ d5 g2 x. I# X- u @! v( y! D.setNumber("01025689654")3 Y" w# T& Z, L
.setType(1).build();
, b% H5 ~6 Y( e6 vList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
5 b6 X1 M" u/ |- J* o( \phoneNumbers.add(phoneNumber1);
, O* ~2 E5 {% N& h5 L6 IphoneNumbers.add(phoneNumber2);8 W2 ?7 E4 I: R- |; @7 w
Person person = Person.newBuilder()
5 n n2 y- v1 s$ r9 Y9 _.setName("Dong Xicheng")
- {' p1 I* ]% H+ S: s) w5 N x4 G/ P.setEmail("dongxicheng@yahoo.com")* Y* F; K8 i q4 F( Y6 T
.setId(11111)
2 P2 M! \! X( Q; j- ^.setPhone(phoneNumbers).build();! G* ]+ X7 G# r: I2 [
File file = new File("person.txt");
7 y; X# h8 R1 _+ Jtry {9 |; T/ c7 f+ Y
DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
& n1 O: @) C" M$ N3 T+ y" qDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);3 \, f' _7 O; x& I& W7 @
dataFileWriter.create(person.getSchema(), file);
, i' y. M1 [# U2 TdataFileWriter.append(person);4 J" V v. t, Q
dataFileWriter.close();
3 [% q- M! g) u* B1 e& a} catch(Exception e) {
1 l: z+ f, r( i# bSystem.out.println("Write Error:" + e);$ r; k+ y" y6 |# O% v, {
}t
% T2 w: N* c! D" N* X. Yry {
6 X, p3 L/ P9 T! kDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
- j. _$ r' R, _, u4 aDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);+ y2 c' T- h) Q G, W; E$ s4 O5 ^
person = null;
. O! i2 o0 p: f! \; }0 H1 qwhile (dataFileReader.hasNext()) {3 |: e8 p- C/ I5 ~; T
person = dataFileReader.next(person);% l" d% b7 D/ Z# A) z8 e6 V
System.out.println(person);
}3 Q: ?1 _' Z# V6 V2 ?}; c) k) k8 z, j" U) q! k; \ u8 `
} catch(Exception e) {+ e' r8 O3 o# ?2 C& K
System.out.println("Read Error:" + e);
9 m/ j- z; W, X y0 D}- e+ X7 Y: i9 V) _: f
}* T1 F: w. G k0 m) |" _
}
% h. ?5 _8 Z9 D/ c4 M0 B如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。! E- i8 v3 y% y: V# ]( ?
Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
% i G; r1 |/ d& N$ [' l使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化0 u+ C; x$ M" M2 y9 ]0 _7 H
均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
: |3 u1 g7 Z' M! G9 ~; L# @{"namespace": "org.apache.hadoop.mapreduce.jobhistory",
2 g: \# {2 _+ D' R9 ]" S# z$ [) D4 x"protocol": "Events",
1 J8 o# C. P: v% Y) d& C% }% k"types": [. ]4 `& u) G( r6 f. T
…{# a7 Z0 C( _0 \# l
"type": "record", "name": "JobInfoChange",
" s) w+ c K7 p+ _1 W"fields": [: @* ^' a$ { P) D$ F2 g- Y8 m3 n
{"name": "jobid", "type": "string"},/ ?& f: m2 x3 w, i; P8 H
{"name": "submitTime", "type": "long"},: n3 Q) L9 J' m3 `( _5 W1 m
{"name": "launchTime", "type": "long"}
! U1 p! e! b( K/ @: P) b]
" W, ~0 z: t& {& B},
1 a- j: m7 E6 U8 C9 W- D' ~ q{"type": "record", "name": "JobPriorityChange",7 E) D0 V( i3 y- g( T
"fields": [7 @! c- i) }. r# G4 J# F+ P" U
{"name": "jobid", "type": "string"},& t( [- a% Z( N3 {) h: s- t
{"name": "priority", "type": "string"}
. j" d6 W+ {$ k]
8 f: B2 s+ D' [$ l4 H$ R' L},/ Z, v+ v! a6 Z/ O' t% \
{"type": "record", "name": "JobStatusChanged",8 q* G8 e$ n- U5 H. e3 k
"fields": [+ _5 k# Y% f' V) r* J
{"name": "jobid", "type": "string"},
2 D* H/ T4 Y: k: b7 t' d; G{"name": "jobStatus", "type": "string"} ]" @; R6 X+ ?, C. i/ ?8 _
]! h/ j6 `# j. s6 Q6 ~
},
6 n0 }1 x) }; w5 O…]
$ l! s, c! Y" s. D, N# ^}
g- m* l1 Q# g7 X6 f[1] 参见网址http://code.google.com/p/protobuf/。& N* |" l9 P0 S% X
[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。2 G8 Z* H2 y- M. h# A! Y, Z* B
[3] 参见网址http://avro.apache.org/。* R0 j1 v" K4 _$ }$ e3 {
[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。9 p8 L# x* O' S2 }; G5 J
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
$ y) d1 o. C3 Y: \6 G/ J% i3 v7 X& x1 K/ p. \. s( U
$ [5 h8 o4 B2 T |
|