|
3.2 第三方开源库
! W6 b. [4 S# Z# m$ ` u$ \ J8 G3.2.1 Protocol Buffers
) M6 k# p7 v2 h0 h8 ^% @Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
" u5 ?0 G0 K6 e. s$ T0 [( m- wRPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
) k1 P7 A5 E6 z+ h( c* qC++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。- F1 j+ X; V/ N
相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:, {0 H# p- ^9 Q* ~
❑平台无关、 语言无关;, |/ {7 [# {# G- p
❑高性能, 解析速度是XML的20~100倍;
1 ^6 N5 }) q2 E4 x, D❑体积小, 文件大小仅是XML的1/10~1/3;2 _7 L* k- e( Y; v; }+ Z
❑使用简单;* |8 |. ?0 u4 E1 C p) a4 j# s
❑兼容性好。. z6 T/ K7 w& z7 O9 K/ O) W$ y0 E) R
通常编写一个Protocol Buffers应用需要以下三步:$ g- A( @7 V9 J8 C6 ^- A8 K
1) 定义消息格式文件, 通常以proto作为扩展名;
4 S2 }7 `3 t$ v; R, }" {8 |. r2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;
9 G- g2 C: ^; H. T! r0 ]& ~1 |3) 使用Protocol Buffers库提供的API来编写应用程序。
( t. e6 |/ Z" g2 Y: U! u, w7 I为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。
0 b; p& }# C, t9 T, M该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将- ]1 c- \. L, v
一个人的信息写入文件。
9 M" ?, T! Y! ^2 i3 ^步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:4 m# {0 O" n- ^ A% S6 [
package tutorial; //自定义的命名空间
: v, n- W5 [7 Eoption java_package = "com.example.tutorial"; //生成文件的包名8 e+ ?% L% u6 N
option java_outer_classname = "PersonProtos"; //类名( Y& p: y0 R+ b+ \1 \6 W
message Person { //待描述的结构化数据
% u) b: ^& c6 m2 ?% i: Orequired string name = 1; //required表示这个字段不能为空, ?& S. M) x$ m% g$ d. n; q5 w
required int32 id = 2; //数字“2”表示字段的数字别名
3 \+ I9 _" l* u* j# w7 o8 Yoptional string email = 3; //optional表示该字段可以为空& p6 m: J& u4 k. w1 M+ U
message PhoneNumber { //内部message/ t/ U* k' k+ P2 h/ L
required string number = 1;
% s6 U* l* k0 Z" m( S3 |4 _. n: Y4 }optional int32 type = 2;
9 B1 x9 t& J0 A}r
; s- M; N% {) N7 k6 _+ f, w+ Xepeated PhoneNumber phone = 4;
- J9 Z, G! Z% c. m; \}9 F- E& y9 k2 R# f3 ^4 I2 E
步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:$ Z/ E9 @2 Q* L* S! Q1 t
protoc -java_out=. person.proto
, t' [2 W- l* `6 i) A注意, 上面的命令运行时的当前路径是person.proto所在目录。
. G& y: [( h Z* F9 [9 O$ a步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt* e# N+ J0 p: s; C! f3 \: @- Y3 S
中, 之后又从文件中读出并打印出来。' t' g1 t* \1 I1 j$ e) Y# o
public class ProtocolBufferExample {3 I% t: A$ l4 W7 C: M, W
static public void main(String[] argv) {
2 `) y" `5 u1 m4 e" R0 SPerson person1 = Person.newBuilder()- Y( [" W6 }& Q' T. P& ^
.setName("Dong Xicheng")' q1 m. \& N3 I. v8 ~6 ~# a' z
.setEmail("dongxicheng@yahoo.com")0 [- B' O0 R- v
.setId(11111)
- F5 }: z: _: k+ w0 h; r/ k" p8 N.addPhone(Person.PhoneNumber.newBuilder()
, }9 q9 _+ `0 P# q.setNumber("15110241024")* r! \3 U W: Y* O4 [4 p
.setType(0))- B* M0 N3 D" n) F" X# c
.addPhone(Person.PhoneNumber.newBuilder()1 q, A; p7 ~; l
.setNumber("01025689654")
5 f1 c; {+ k& }/ c( y( ^.setType(1)).build();5 X5 ^0 b" C' v
try {: y# z ]$ H6 Z1 Y& U
FileOutputStream output = new FileOutputStream("example.txt");
4 S) V( y2 ]4 O$ dperson1.writeTo(output); E' f/ S& d- g' O, F
output.close();5 M, J- K: h; x3 }
} catch(Exception e) {
6 ^0 ^- F+ A0 x# g fSystem.out.println("Write Error! ");
* i4 k& |, [8 [} t) ?9 B, W' j7 D2 b* c
ry {
) v/ V5 r1 {* `! f! B$ N. U$ ?FileInputStream input = new FileInputStream("example.txt");! e. G @& P% @$ I
Person person2 = Person.parseFrom(input);
8 c- x9 O4 R! g" j. ]7 {& v mSystem.out.println("person2:" + person2);
( }; [ P9 p7 E5 J5 Y* A5 G( i} catch(Exception e) {$ v6 }6 p2 J+ l/ w/ v8 }6 d' }9 v/ F
System.out.println("Read Error!");
2 e) N8 I8 q8 [}
+ _% ?& D; t$ I; [}0 x; r# m9 b- J' p" d$ x
}
5 h* J8 _$ r, ] ]5 t在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
0 a9 Q" B! M1 e2 @- h, H入使得YARN在向后兼容性和性能方面向前迈进了一大步。/ {8 Z# Z0 o2 n9 T6 H* x* J
除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而4 k) Y" }* u+ {( G8 z4 J) j" \
YARN则采用了MRv1中Hadoop RPC库, 举例如下:" W+ [" Q1 ]0 g+ _) r
service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义. V3 `" o& @5 O% d2 _! B6 ^
rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);8 Y- R5 S d( ~: K; @& z
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);9 Q6 D! c' H+ h5 \, O8 a; k- d H
rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
5 g& J+ m, c) U* s' }+ G}
1 ?5 ]( `; i( z: G7 q( {7 P2 z& V5 E在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:; }6 i, E; |) m3 u, ~0 m9 F5 m
❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
. d! T" x, D1 e% f0 h. \❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。
4 o& ?- e8 O* b( L5 g" P❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。
( H! U7 i& e) Q3 |. S❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—( L( V2 V0 g4 }' `1 P* g. S
ResourceManagerAdministrationProtocol。
& N& J' d7 p1 A3 z& ^/ }; w; R( C❑yarn_protos.proto: 定义了各个协议RPC的参数。/ U$ f, M6 U- E; j
❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。5 K7 G4 n4 j! D6 a4 O
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:
) ^4 P+ {; ^1 i❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。
. [) E& W" Y9 b❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
9 i% [3 A0 M8 Q3 g, ]7 I1 @. o& `9 z% x3.2.2 Apache Avro
$ q+ i* i/ b0 s7 X) jApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。7 ?3 y8 A# R3 B
Avro官网描述Avro的特性和功能如下:
9 K: m4 E/ M/ m, O5 g- H❑丰富的数据结构类型;
; i z8 |% S0 @% j5 y4 R2 {❑快速可压缩的二进制数据形式;" N6 o3 H# } ^
❑存储持久数据的文件容器;
7 [* a& n( w# }! B4 |0 X2 a❑提供远程过程调用RPC;
% n- M/ ~# t8 c5 C❑简单的动态语言结合功能。( R4 A8 }) n8 O6 J" R: f* l
相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:/ r+ m% K# m/ [& C
❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。, I G- f' X* G( j% A; L3 {% B
❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于. H+ G j( q4 b
减少序列化后的数据大小。
1 T3 n6 O8 o4 Z* Q6 M❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
/ n) N# ]- `/ M( W+ s名, 该方法更加直观、 更加易扩展。( k! u& `2 f% N5 n% A
编写一个Avro应用也需如下三步:
/ ^+ u" |! i3 E" T9 @' Y1) 定义消息格式文件, 通常以avro作为扩展名;
% p, e/ S6 t- T6 {( D8 b2) 使用Avro编译器生成特定语言的代码文件( 可选) ;7 c, Y! u" @* ]9 y6 d5 E
3) 使用Avro库提供的API来编写应用程序。1 }, x+ `% k8 j
下面给出一个使用实例。4 L/ A$ z# S: {, s) \* ~
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:% Q4 i8 I! N1 h! o4 c
{"namespace": "com.example.tutorial"," {6 }% P$ d9 _1 B
"type": "record",
0 `/ d3 W, @- z/ g3 c"name": "Person",
+ h& F, z8 M( m- M"fields": [
- \0 c' A, x. Y6 x" P! W! j. @{"name": "name", "type": "string"},
( l0 M2 d2 }- |: O: m8 |{"name": "id", "type": "int"},
7 D# V& d% ]2 K" `{"name": "email", "type": ["string", "null"]},6 _+ F4 o) E2 U: O* z
{"name": "phone", "type": {"type": "array",
- g. D Q( B1 S7 g4 k7 e) d( V" q"items": {"type": "record", "name": "PhoneNumber",
' T. O5 j0 R0 r6 \" I: S"fields": [$ x" C" ^- s/ C
{"name": "number", "type": "string"},
4 V* B8 R( ^' I& a9 P{"name": "type", "type": ["int", "null"]}. j6 Q9 z1 t# `) _1 L
]
5 a' t( I3 G0 ?2 o# Z}
& {) k) }* K* Q+ F" y u}
/ r) c0 ~% A! Q( o}]
) x1 N9 L) G! ~' M+ [/ ]}
" }4 M. E- v1 `步骤2 使用Avro编译器生成Java语言, 命令如下:, S2 j$ U1 ^$ g7 O- |$ `5 |
java -jar avro-tools-1.7.4.jar compile schema person.avro .
! W: e. `2 K5 s7 ~& |. a- I注意, 上面的命令运行时的当前路径是person.avro所在目录。/ p R: Z8 U- n0 F8 K0 a" |
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
" z3 {/ O& j: X件中读出并打印。
4 z9 N, o! ]0 [" E5 L+ ipublic class AvroExample {6 \# i: K% e3 C6 U: R$ M
static public void main(String[] argv) {
) H! M* C4 o8 U+ t. Y" zPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
* p- \; [7 q. f/ a4 s$ \$ O3 M( u.setNumber("15110241024")! @; ]* i! p/ H
.setType(0).build();( c% S' U; u+ [. a) e
PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
2 Q ~5 R' v. `/ r.setNumber("01025689654")! w9 g- p) V9 j+ ^: d1 j4 t
.setType(1).build();" d4 s% d7 f$ d% u( E. p( H8 A
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>(); i* b+ [/ v) f7 H- f
phoneNumbers.add(phoneNumber1);
3 j+ ]/ u% |$ A0 Y2 w( Y4 ?phoneNumbers.add(phoneNumber2);- @0 ]/ E! W% i% C8 h' y
Person person = Person.newBuilder()4 {% ~6 \$ r/ G8 u% [
.setName("Dong Xicheng")/ d6 i, x9 w/ M7 E9 U3 c: J9 n
.setEmail("dongxicheng@yahoo.com")
: K9 X" {# u' z.setId(11111)& e1 W! @$ t3 G, p% T
.setPhone(phoneNumbers).build();: ]; d9 a: u9 g
File file = new File("person.txt");6 g4 @& ?$ d8 M# f; K) U" p
try {5 [5 j0 t& L+ F9 f, X0 R
DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);) [6 m2 W% [% |6 Y9 e W, n5 ^# Y# d
DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);/ l% B* b4 H8 o+ }
dataFileWriter.create(person.getSchema(), file);9 E. l2 ?: C/ L8 Y' l; h, i$ K% Z
dataFileWriter.append(person);
2 a+ v5 P. t" T! p( E8 ]dataFileWriter.close();1 y+ U+ m* S0 k; ^
} catch(Exception e) {
- O( H* b+ O/ }System.out.println("Write Error:" + e);
9 T; w: y# O3 z; P$ J* k4 ?}t
3 Q9 b, J4 J3 [+ P B' D, |' p7 |ry {( P5 o9 B6 _3 O, w: N2 T
DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
/ L+ m8 U$ ]7 x0 B! `+ uDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
2 q3 F3 A0 b( P3 z t& zperson = null;
& n J9 F/ Z9 S3 Qwhile (dataFileReader.hasNext()) {
# A: w& \2 E+ A; x3 c& {$ d4 zperson = dataFileReader.next(person);
) K8 G( D% s) rSystem.out.println(person);; |# _+ o6 ~3 s7 J
}
; X- b) p& k' O- E} catch(Exception e) {
2 p* Z/ C& q, Q2 B0 z$ p6 ~System.out.println("Read Error:" + e);% l- v! ]( ^& ~5 C# K
}
6 F' i1 X4 W* G' R- ^2 z}
0 D* P) B0 a* u+ I}
$ F8 p9 y2 H& O5 C% n如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。
. A# J; a$ S, C$ n6 HApache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
' {9 m% J, q7 n; [: ^: `使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
# i- @( j- s. ^2 t, i: v( K均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:2 t2 M$ z& t* ]3 k, X3 X
{"namespace": "org.apache.hadoop.mapreduce.jobhistory",) |( Q# F# N' ]
"protocol": "Events",
& {& X+ L$ E6 D"types": [
. R6 O/ U8 r S# I5 x% Z8 q…{
' n/ ^$ c' g' Y$ m"type": "record", "name": "JobInfoChange",
& q+ }1 @3 ^9 w `"fields": [8 s% u$ m7 V6 F9 f9 d4 ]9 O, k
{"name": "jobid", "type": "string"}," l: P' `; K& O a- Y' ^
{"name": "submitTime", "type": "long"},# {, G: F3 q c/ x# R: W
{"name": "launchTime", "type": "long"}! e; K$ h: H Z; f. u6 ^
]
( u+ Y) i+ s# X6 F/ H. d},
# s0 o+ {# C& r' m4 M' f{"type": "record", "name": "JobPriorityChange"," C- I2 L, h5 D, c o3 O
"fields": [9 h& _9 d p" L( K
{"name": "jobid", "type": "string"}," I3 ~' L% x& [* A
{"name": "priority", "type": "string"}
# f h2 z9 z$ e7 z* x, f- J]
4 y0 ]! o& C2 h5 ~# x1 M7 J/ ^& `},5 i7 s* v+ b1 V; q# g* N9 N
{"type": "record", "name": "JobStatusChanged",
5 S, ~8 k2 `9 w. D/ @# D+ b"fields": [
+ f9 n4 a9 I; z6 r* F, m* ~{"name": "jobid", "type": "string"},) h/ z+ |: S" e' h" a' Y
{"name": "jobStatus", "type": "string"}
! Q) V$ O# r9 k/ I3 B]0 ^) x7 f$ C! N9 P" u
},' N2 w- y+ X m- z# `1 H! U6 X
…]
. V7 b; c! e: l7 e( ?: E}8 h- a% d6 M- C
[1] 参见网址http://code.google.com/p/protobuf/。
5 X$ `+ }9 }2 P& f[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。
- R, c/ C, |' C( J# A- Q7 j[3] 参见网址http://avro.apache.org/。
' J. ~7 s* J k" |" F$ o[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。& p1 r* D' K+ _# W- H0 [3 V
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
/ [$ v' a5 i, K* f5 l+ T3 U1 W+ F+ A6 D) |/ H, R
1 f/ L- F0 \7 c+ L
|
|