|
3.2 第三方开源库
6 ^/ ^0 r2 r6 j" C1 x3.2.1 Protocol Buffers
8 a. }; x- M9 |# l% g+ m; {Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
. O4 D- I, p) pRPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持, ^: v9 L; p8 Y, }9 }8 h8 y
C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。
9 A5 ~8 @3 c" ]7 v5 F7 l相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
* x) [) W7 @5 C5 u4 c1 k( H❑平台无关、 语言无关;
/ i" p6 ~" o6 y5 N5 n❑高性能, 解析速度是XML的20~100倍;7 m) U0 x. W* V( `) K
❑体积小, 文件大小仅是XML的1/10~1/3;
- H+ W7 q* Q+ r( ?5 V❑使用简单;
: ?; A3 D) w6 S! _, v+ b$ g9 _❑兼容性好。
% k! E# @' w, Y/ H7 x1 B0 x/ k通常编写一个Protocol Buffers应用需要以下三步:
' s3 ~+ ], j. d0 i: m+ k+ i1) 定义消息格式文件, 通常以proto作为扩展名;! x% r# D: p" `, K
2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;* c& t) X8 M; J& Z9 b. y! y# o
3) 使用Protocol Buffers库提供的API来编写应用程序。4 R: v) K! V. w @- w. _/ d+ y! r4 F
为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。" }+ v y0 U2 Q1 }- }
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将
4 o6 f/ X s6 J% r) M" y% u# r一个人的信息写入文件。' P3 b$ n; K) }2 B% `0 |& R4 f
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:4 U) c& a# y# _" @
package tutorial; //自定义的命名空间
7 Y6 X) T% l1 I! T1 Boption java_package = "com.example.tutorial"; //生成文件的包名
! O2 x& D8 K. F( T6 s, @0 [option java_outer_classname = "PersonProtos"; //类名/ Y( V3 u; ~$ W5 A& t! H
message Person { //待描述的结构化数据
- I4 q9 k1 i: b& Hrequired string name = 1; //required表示这个字段不能为空
% i2 p' R+ c; ^+ U/ @ g' V0 arequired int32 id = 2; //数字“2”表示字段的数字别名
8 |! X5 {; X7 I |2 Boptional string email = 3; //optional表示该字段可以为空
# \5 `/ Z8 ?: h6 `* b( e) _* Qmessage PhoneNumber { //内部message
& d$ @0 Y% O3 b$ F4 T, ]3 e7 Y( krequired string number = 1;- n# C$ C6 J" w! s: Z9 k( F2 o$ y
optional int32 type = 2;
% Y! {$ s+ P% u' w9 G9 @}r
9 d) X! x6 O1 K8 hepeated PhoneNumber phone = 4;
* K9 I( R; {# E# s- `}
! H* C* w( s7 V步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:9 ~7 u' P9 ?0 h
protoc -java_out=. person.proto
7 E; S# F6 J' S9 b# o/ m+ P. l注意, 上面的命令运行时的当前路径是person.proto所在目录。6 k2 H+ `! }; U6 t0 L
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
4 W3 E& n% G0 g) b中, 之后又从文件中读出并打印出来。
, i) N9 `5 H8 ^public class ProtocolBufferExample {
, V$ g+ u, R, c4 ~" e, G* v# Fstatic public void main(String[] argv) {! f5 L2 z$ A' a: T+ h5 N" `
Person person1 = Person.newBuilder()8 [/ ]( `. o) c/ [* N
.setName("Dong Xicheng")
8 o: i9 M1 d0 @5 y+ {.setEmail("dongxicheng@yahoo.com")
% M5 w+ X X3 u% K) c9 k.setId(11111)
2 \8 O. P, Z1 S5 N. y5 D.addPhone(Person.PhoneNumber.newBuilder()* D5 [5 C. k6 I, J- P, C1 W
.setNumber("15110241024")
- X r9 W# ^) r6 b* L) {.setType(0))
: i$ {. m/ r! W" j# _9 k8 S$ O.addPhone(Person.PhoneNumber.newBuilder()% m; L& O& {& n, A- Y2 h
.setNumber("01025689654"); k R8 q7 `% [# G
.setType(1)).build();
) v& c2 a- e4 Ltry {4 I) v- U5 X% ?% q
FileOutputStream output = new FileOutputStream("example.txt");
/ p! n$ W7 x4 mperson1.writeTo(output);
' M5 a; D3 ]4 V( ^& \output.close();
6 T. O2 O1 g5 @} catch(Exception e) {
1 N" U! v$ \( v" h" p1 H' \7 WSystem.out.println("Write Error! "); P3 o$ B0 F" `8 a
} t* i" m- b8 ?! H2 Q) P
ry {6 f; b$ a) _9 S& @' N" I7 h
FileInputStream input = new FileInputStream("example.txt");
1 Q% N3 _6 l( \# w/ ~: V- APerson person2 = Person.parseFrom(input);
9 `9 `6 q. M1 WSystem.out.println("person2:" + person2);
4 R9 N5 I A* E, `5 _2 ?9 b% f( T} catch(Exception e) {! j9 |2 s8 ] J* z
System.out.println("Read Error!");
1 t: c$ ]7 s3 S) d0 d- w0 G, m}! F3 K- r( R4 W0 T, c8 R, m9 Z
}
: k# D( a! W" v% D}( S6 j" Y' ]- u+ e! @
在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
& q) i) k' `, ?, E4 Z) Y: j入使得YARN在向后兼容性和性能方面向前迈进了一大步。
7 J. i( ^1 Y. d2 g7 [除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而/ ~' ?$ M% F* S* J
YARN则采用了MRv1中Hadoop RPC库, 举例如下:: J2 r6 Q- u6 C" C
service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义 I4 t0 T) b C3 Z7 q
rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);1 k$ r9 t: O% B6 M% \3 ^' L
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);5 W, s5 W; k4 ^8 Y; p
rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
- }6 _% @1 X8 g; ^/ a m. Q2 m N}
* k$ H% } E' r7 y7 P在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
2 G# P" P3 ^4 w( K❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
6 P6 g2 z3 K% l2 U! ]1 i❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。
5 k1 T# u2 q0 \$ Y❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。4 s8 b& e1 l8 u( h! v, [8 \6 V% |
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—
c. x p& {: p" W8 ^$ tResourceManagerAdministrationProtocol。( ~1 m1 E, |5 w2 F: B
❑yarn_protos.proto: 定义了各个协议RPC的参数。
+ d+ P8 p6 s, T! f0 X) j! n6 l❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。. \% d( e8 \: S7 g& L& r" [ [; u
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:; n) [! [2 X% c
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。
% H: @# S8 j0 |% i# R# W7 ^0 F3 d3 i5 x3 I& }❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。; i* ~( K& ]6 i- R3 N5 s/ k$ b
3.2.2 Apache Avro
' j2 g" i& C, Y$ sApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
: b" O6 {+ J( }& w) rAvro官网描述Avro的特性和功能如下:
3 z) g \) G4 X5 }! T❑丰富的数据结构类型;
0 g, C3 U" y% u4 g❑快速可压缩的二进制数据形式;
# A( [9 z3 J9 q# m8 ?❑存储持久数据的文件容器;# _ f" ~; W. |) `% k D" V
❑提供远程过程调用RPC;7 c* `( R% x4 D, B0 H$ Z3 h" p
❑简单的动态语言结合功能。# O2 y7 `) h( G! H
相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:& R( E/ v* }& m1 Z$ N o5 T
❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
# X% E4 d0 H% R; c❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于* Y8 [5 W$ g! q3 ^ Y. w
减少序列化后的数据大小。
: l2 ~# O1 s+ q; [' E$ P❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
# f2 u5 n, I4 k4 A- R9 o名, 该方法更加直观、 更加易扩展。; y# I0 Z& J/ P7 G) q
编写一个Avro应用也需如下三步:+ q( @9 }& D; A- D% R
1) 定义消息格式文件, 通常以avro作为扩展名;
$ Q0 o* g. D- ]/ h2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
% s. p* j9 x1 N1 w/ t# t Y) i3) 使用Avro库提供的API来编写应用程序。: S: z2 z* g2 f
下面给出一个使用实例。
( w- g/ S2 X! z4 I' Q/ j步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
2 i$ |" J7 I- |9 _, i% {8 A& G9 q{"namespace": "com.example.tutorial",
0 i* r- }8 Q# j% e# ~3 L6 F1 M- s& _"type": "record",* G% [( M3 P/ _. Y4 C( Q7 P6 }% V! r! r
"name": "Person",
, t5 M* u* v8 c! }"fields": [
8 H0 c* o# G E& g% z# P{"name": "name", "type": "string"},# z; A7 D9 Y P. u1 ?4 |, l5 T
{"name": "id", "type": "int"}," d: O* {7 C0 A; ]/ A
{"name": "email", "type": ["string", "null"]},
9 r7 n/ A1 X7 e$ z s* R{"name": "phone", "type": {"type": "array",
' h- S4 X; X9 o0 T7 ~0 V; v- U e3 T"items": {"type": "record", "name": "PhoneNumber",1 `. Z+ j7 Z/ N4 O6 a
"fields": [
5 w1 o6 q7 g) A4 R{"name": "number", "type": "string"},2 H1 d0 ^9 V" @3 A& i" w
{"name": "type", "type": ["int", "null"]} w$ y' J T% p9 f% L5 |- b
]/ }9 `( a8 \- M+ g/ F
}
8 d x7 K( d& n" @3 \}; t/ w3 ]; F4 R0 A6 x' g& m
}]
- c% X9 G$ Z; n# f0 C}9 y" g( Q3 {- l$ o F# N
步骤2 使用Avro编译器生成Java语言, 命令如下:& N( ]# ~4 L+ f0 S" ?4 z
java -jar avro-tools-1.7.4.jar compile schema person.avro .
2 Q+ i ]2 N& M- \/ I注意, 上面的命令运行时的当前路径是person.avro所在目录。
2 K- B. V; A- o8 [0 I步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文( X! s- b/ U( z
件中读出并打印。; t* }& A5 c1 d
public class AvroExample {; A8 W/ f1 ~8 [! u4 W6 r
static public void main(String[] argv) {
) h* A4 h+ p$ S; l s, ]: b7 }PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
. }, P1 J# A7 [* ]: p.setNumber("15110241024")5 H3 Q o! l0 r$ ]; i. l
.setType(0).build();& e/ N O+ D. q6 V; ?6 P
PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()1 ^. d! D7 g% T) c/ E
.setNumber("01025689654")1 X' R+ }, _6 n M* o2 I2 j8 U
.setType(1).build();
4 v) K& f4 e) i+ r4 a" @$ cList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
& ^8 Z1 @' W& m: l; q5 ophoneNumbers.add(phoneNumber1);
" l* {& s3 e# ~) o: J4 z0 ? mphoneNumbers.add(phoneNumber2);3 o( t8 B" C0 R6 o( ^) Q, L. u! t6 }
Person person = Person.newBuilder()
; M/ t" H" }+ ~% M0 R.setName("Dong Xicheng")
- _5 o4 w% F5 O' H# x.setEmail("dongxicheng@yahoo.com")
# H7 w3 X0 r. _% c) S.setId(11111)
& u, y) b- s' ?) E1 Y.setPhone(phoneNumbers).build();
& S' _. U% q0 G/ o) rFile file = new File("person.txt");2 M; Y! _+ z* m, g/ p" g# \
try {! a/ ~' c2 C* M8 f
DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
4 a3 {5 p; ~ A& G% h8 CDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
0 E d& b4 |5 [dataFileWriter.create(person.getSchema(), file);
( s4 |% p! h& z+ n! F. j) e9 [9 _dataFileWriter.append(person);
: A8 w4 U' W. t0 g9 LdataFileWriter.close();
% S; [+ ?. a$ i5 ]7 X/ H} catch(Exception e) {! Y- R! C. `# _
System.out.println("Write Error:" + e);
" |5 `; V3 T7 j/ N# q}t
6 l. [0 r: a% \- S- kry {
. `; L. c3 `( F0 {% s8 `DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
: Y9 I6 y6 q1 aDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);0 O4 K$ R/ ]$ C1 X8 F" {$ u
person = null;) ?1 M8 X1 R# }/ g
while (dataFileReader.hasNext()) {
* ~, [4 c- d& hperson = dataFileReader.next(person);
, z7 W9 O3 h- f/ DSystem.out.println(person);( S" @# M" a; ~% j0 N" |
}
2 p* i9 a* v0 ~9 U2 Q& k} catch(Exception e) {
$ V7 U; g2 @: P, {) C; k" mSystem.out.println("Read Error:" + e);& m: B" z$ {9 N, z; s# \
}
) S/ K6 F- V) ^/ j. f) b}
* q- ]. W1 ~, S) q}
& N: X4 b' l" Z$ S如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。
) U' x# @" Y" h8 T: F8 rApache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍; `% \2 P+ M! }7 c# M& y G
使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
7 I" w b6 y5 Q2 l1 h0 p5 }5 R均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
3 @5 c, T ^- T4 F- q' q, o( d( ~{"namespace": "org.apache.hadoop.mapreduce.jobhistory",
: J! T4 V- j3 F6 ?"protocol": "Events",8 t- H: s3 T' P+ m6 g
"types": [; `2 y8 B- J# a7 }
…{9 ?" W# u4 ^. }1 r+ j
"type": "record", "name": "JobInfoChange",- L' Y9 F$ `" S
"fields": [! i/ L+ Z0 j8 `( u0 M7 u( J
{"name": "jobid", "type": "string"},# f2 J0 d. M$ t2 \. K% `# s
{"name": "submitTime", "type": "long"},5 b% z9 B+ @+ ]2 `( v5 V8 e
{"name": "launchTime", "type": "long"}
9 T3 |% u$ t/ t. }! T2 G]* z4 f/ ~1 r& Z: L! y, z
},0 l2 C0 U6 b$ t( ?+ a7 u
{"type": "record", "name": "JobPriorityChange",
0 l& n! P j# n. |! T"fields": [7 P" B6 D$ O. \$ k5 [2 e0 U
{"name": "jobid", "type": "string"}," A3 d" b( E' P. r; R8 g# g$ T% Q
{"name": "priority", "type": "string"}0 `. M6 P, g: S* L" p
]; S0 i: o. X6 ^0 B$ B) P" ^9 k9 R
},
% \4 a) J: a: r- S7 C9 g{"type": "record", "name": "JobStatusChanged",
; D2 e$ y/ U' _: R. C$ j"fields": [2 L& S7 r1 j- j' l3 k1 u
{"name": "jobid", "type": "string"},& U; e( [8 A4 _
{"name": "jobStatus", "type": "string"}. x. z) j, ^7 |( s
]6 U% t) [7 n$ `
},
. T# b( ~, h" a# x+ G/ |…]2 y! S4 f% I9 x- _" P# S2 [5 ?
}
e7 \" l( z7 }0 N3 E; n7 ^[1] 参见网址http://code.google.com/p/protobuf/。
' L' l$ Y6 S9 [, R% F# l[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。
. Y# w7 I6 f3 I+ @( c[3] 参见网址http://avro.apache.org/。
" d+ M5 p+ p, [: H/ S2 r[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。5 @6 F$ m6 P6 M6 {
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
9 u2 _( B! z/ _* _3 Y0 G
: e# w' N" q/ {* ^8 D( A, G/ U' R4 \! y$ d/ h: W7 m% T
|
|