|
3.2 第三方开源库
% Q7 e& ?5 C0 p* H+ m" d* I K3 Y! d3.2.1 Protocol Buffers
& G2 G" f! U/ AProtocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
/ H6 i/ y) H7 _' R6 ^RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持4 `3 J; x- S. U) w* X% Y7 U* w
C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。6 e0 O0 R& S2 q6 Z# C/ S
相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:2 n J: k7 q6 U& H/ y7 Z5 u
❑平台无关、 语言无关;
- b# \; k: x8 s4 O( U❑高性能, 解析速度是XML的20~100倍;
" W8 p8 u' q) s- `0 D1 |, Q6 A) j) v3 U. N❑体积小, 文件大小仅是XML的1/10~1/3;" ?& \& }' G( N' J& B
❑使用简单;
2 `/ P7 ?/ E O. }. @❑兼容性好。8 B/ S: W* P+ q/ K
通常编写一个Protocol Buffers应用需要以下三步:6 }: J' C5 o5 O! d1 h" `( K
1) 定义消息格式文件, 通常以proto作为扩展名;. {+ Q: @. V: Y& N9 U- P
2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;
9 g4 Y( R5 K7 K: ^$ U9 a( `: ?3) 使用Protocol Buffers库提供的API来编写应用程序。
: ~' x V9 p6 `4 @( R9 H为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。( o$ V& H: M( `1 q5 }. t
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将, g7 m, V/ \- F& V: n8 C: b
一个人的信息写入文件。
6 e4 R5 A* M# W8 m$ ]步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:( u+ K, d1 ?' H8 w: \
package tutorial; //自定义的命名空间
* \' s9 o+ A; @option java_package = "com.example.tutorial"; //生成文件的包名. d0 H- m- D5 W' t; v4 i
option java_outer_classname = "PersonProtos"; //类名+ R/ B0 P) }3 [ J$ q# H
message Person { //待描述的结构化数据" O5 Z* ~+ y7 }) d$ A
required string name = 1; //required表示这个字段不能为空
{( H' J" `; K, O4 J5 D9 v, f; x: Krequired int32 id = 2; //数字“2”表示字段的数字别名
+ p6 o7 A; I0 ?" b0 eoptional string email = 3; //optional表示该字段可以为空
4 D- {, `' a" Q% M2 ~$ lmessage PhoneNumber { //内部message
/ j8 `/ K. L) ]& \; I; urequired string number = 1;
7 k/ Z: W0 M* h& O& L# \: Yoptional int32 type = 2;) I4 w% H/ k! T( h) c3 m0 n# X
}r: x8 i7 G( Y, u2 x+ {
epeated PhoneNumber phone = 4;; {- l; H. d6 z# h, @ N$ J. h3 P
}
4 [0 p/ x$ k$ w B3 T步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:
& ?. g- r; }$ i9 }6 Tprotoc -java_out=. person.proto1 o9 D- i) D L5 v* C/ l
注意, 上面的命令运行时的当前路径是person.proto所在目录。
3 |" @; |7 B. U$ `% |步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt
. ]% d4 K N1 }0 F, j" }中, 之后又从文件中读出并打印出来。3 j$ q) Q; J* `& Y
public class ProtocolBufferExample {/ e& q% C* y6 Q: C3 G; |" ~
static public void main(String[] argv) {
! t7 }* d: _! ~4 t' vPerson person1 = Person.newBuilder()
9 s; ~/ Y% x3 l+ b.setName("Dong Xicheng")
6 b, e( H/ X8 B: Q8 E8 x.setEmail("dongxicheng@yahoo.com")
+ s. A# B! c" @% j0 w' ~% A/ j.setId(11111)
o& c. O0 }7 @( r.addPhone(Person.PhoneNumber.newBuilder()
7 L. `1 Z% `) Q: _.setNumber("15110241024")
' j; d: C% r8 b( X& {.setType(0))3 J4 r) p8 l3 u" z4 k* E
.addPhone(Person.PhoneNumber.newBuilder()
% v# y: z1 e( ?.setNumber("01025689654")
* ~- [ I9 U( P) M8 q; m.setType(1)).build();+ z0 d2 h0 V% S! g8 Y- B+ I
try {
( d- \* Y# _4 B& cFileOutputStream output = new FileOutputStream("example.txt");
# }$ r6 V J( \$ Iperson1.writeTo(output);7 }- [5 M: T# q) T6 @
output.close();% e* L1 Y6 E! e
} catch(Exception e) {( z; A2 q, p! p
System.out.println("Write Error! ");! d3 `# S' z) H
} t$ a0 G1 j {# p R
ry {
b; E' w. R) D. y; G/ r [FileInputStream input = new FileInputStream("example.txt");
" b5 ~9 s3 @* o2 H4 N- W, vPerson person2 = Person.parseFrom(input);
$ R. j' J" t- M' ~7 x& F+ o# BSystem.out.println("person2:" + person2);$ o, {: o, d5 @9 M
} catch(Exception e) {4 n& n& u4 g1 S$ C
System.out.println("Read Error!");/ K3 m6 M' _. }1 E4 v: r" y! T) j
}
% |: W* q# f9 l/ t9 a3 q6 H}* E r7 c' E- W/ m2 N
}
2 q# _) v0 z: k, |7 P3 W4 J在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
, B5 q) @& a, l1 S入使得YARN在向后兼容性和性能方面向前迈进了一大步。
$ h9 X" M" t. e7 @( L除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
5 k' S$ L4 j6 BYARN则采用了MRv1中Hadoop RPC库, 举例如下:3 E! ~; y& o) s- P0 S4 _
service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
$ `$ {* s) V8 g9 T! \, T* orpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);& L7 ^* s G5 Y i. y8 @( f
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
# ~* C+ t7 X1 ~& ]. c! ~rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
U" t& Y# f# X; [- d}
2 @# r5 }* l _$ I! l4 f在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:/ x# Z/ a! N, |2 ?# p
❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。6 U9 J8 M4 I* S: J4 {$ q- V6 P$ Q ~
❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。
% y$ ^% y' ~! |4 K& p; d6 A❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。
3 Z j9 k! _+ ]; v7 t$ B❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—
. ]2 v+ U8 f' d; z2 I2 _$ TResourceManagerAdministrationProtocol。' R9 j% a1 p1 X/ O6 y& P( z; d
❑yarn_protos.proto: 定义了各个协议RPC的参数。
6 \# n/ N% m7 j/ H6 W4 D; K! \0 f1 i❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。4 v E3 |( a# j+ o" b) F, f
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:
0 ?# V u2 s, @+ V# M1 J& [( c❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。, }2 v2 o& q/ H7 D
❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。" [+ S* V6 Z' r+ H9 \
3.2.2 Apache Avro0 k$ V3 b6 w( `/ o
Apache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。% g/ S M/ Z) _: c/ S! _
Avro官网描述Avro的特性和功能如下:, w0 ^" M! S' p, i2 q$ _
❑丰富的数据结构类型;
5 ~+ T- Z' @) _0 T❑快速可压缩的二进制数据形式;
8 E; x7 E& b; c) z {❑存储持久数据的文件容器;
# S8 Z. Y% o; x; q6 Z, [6 o5 p❑提供远程过程调用RPC;
& `- ?% O/ |& I* A- G) C7 E0 g0 ^$ m# h❑简单的动态语言结合功能。
3 @- e/ d3 M' ]6 [+ `) W; p相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:
- Z" s( S4 d* X& V: r❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
! R7 w9 b$ {9 L# S+ S! H y❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
* F6 q# A2 Q W+ t减少序列化后的数据大小。: g9 d3 i3 `& |2 i- c
❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域% Q+ F' e9 f) Y! I/ b% z: D: Z9 _
名, 该方法更加直观、 更加易扩展。1 O: ^9 r# K, o- G$ ^" N+ g
编写一个Avro应用也需如下三步:
' P) E; C6 h4 \ I P1) 定义消息格式文件, 通常以avro作为扩展名;8 L! m# |+ e6 W+ Y9 J2 O7 J, n
2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
5 T8 s9 Z! N2 P$ q3) 使用Avro库提供的API来编写应用程序。
& P6 N0 a$ s) Z" x7 w0 v% Y下面给出一个使用实例。
7 k' c3 r0 D! t$ d5 T步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:5 ?$ R1 d, W8 c+ S( l7 {9 ]
{"namespace": "com.example.tutorial",
# F: J h* g7 C) g/ l# q"type": "record",& C, D, t: }* x, S4 F* I6 w
"name": "Person",6 L! E# f$ j" X# K; L
"fields": [: Q' I/ d; k/ _& U" y/ w
{"name": "name", "type": "string"},
2 x" h. x# I0 }* s0 s& r{"name": "id", "type": "int"},
2 G! Z! p; K/ F, u5 Y9 Z u{"name": "email", "type": ["string", "null"]},7 _% A( e5 L( r r {
{"name": "phone", "type": {"type": "array",
4 ?( i* s# a3 ]: m"items": {"type": "record", "name": "PhoneNumber",
- T2 s. a1 H) m* L5 r"fields": [
8 D P! c7 V4 t- I+ a{"name": "number", "type": "string"},
* m. I/ R. }1 v* G5 X9 @: y) _- v5 Y{"name": "type", "type": ["int", "null"]}
3 G8 @/ q( ]& o1 a]
% Q. e! k* c- a b$ X( X8 }}( ~" O3 ~* P. p$ ^2 K- T! Z
}
- K: H7 S! ]7 J4 N- o% \: I}]9 K: w1 p; Z. L5 k: y& N
}7 q# L$ c/ i1 ~6 i% X6 B" g( \
步骤2 使用Avro编译器生成Java语言, 命令如下: x( v" `' h& J1 e0 h$ T l$ ~
java -jar avro-tools-1.7.4.jar compile schema person.avro .* b8 f# \, v# O4 c1 m- Z# {
注意, 上面的命令运行时的当前路径是person.avro所在目录。& K$ G0 l: S; p N X, \8 Z
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文5 \, M; [6 X" I3 D w
件中读出并打印。8 g, J/ l. r' `1 G
public class AvroExample {$ R& W, ]) @% }% R$ ~
static public void main(String[] argv) {2 V# P6 f2 T4 _" n7 A
PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
) @( r* n% y8 C H5 z4 z# j# Q.setNumber("15110241024")
# `& j& \& k. Y4 ^6 v- w.setType(0).build();2 ~0 _8 p1 F5 P2 b
PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
7 {+ i( c/ f$ H, K) p4 P.setNumber("01025689654")
& A% l: W: ?+ q/ @( _7 R.setType(1).build();# K2 l9 b+ f/ R# a' y
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();3 Y8 v% j2 o' i; l: ^0 p" y6 c& h3 X
phoneNumbers.add(phoneNumber1);9 q: S9 k: w9 x
phoneNumbers.add(phoneNumber2);5 K$ f, b' k3 A
Person person = Person.newBuilder()9 Z5 f- I2 {' g6 S( I0 [
.setName("Dong Xicheng")0 n& H: Q! L. X l% K. \- g: T
.setEmail("dongxicheng@yahoo.com")' S8 E0 q. B, d6 o+ h, z
.setId(11111)! s) ]8 U5 n% ^3 X
.setPhone(phoneNumbers).build();2 k! x1 r, b! C/ Y% @/ Y" d* J
File file = new File("person.txt");
; k( {8 e' K/ U7 E/ i: l2 Ttry {
8 ]% e: h8 [* E' s- LDatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
: T% Q- {5 ^% S8 }! LDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
. u. K" E' g9 `0 \dataFileWriter.create(person.getSchema(), file);
t3 Y/ |1 ^2 S% y1 q! }dataFileWriter.append(person);- Q# d0 V+ f0 t" k( s5 i
dataFileWriter.close();
; ?1 ]3 x$ `0 f, Y# W. I5 f} catch(Exception e) {
6 C& I6 \# l# w( t* s: v+ lSystem.out.println("Write Error:" + e);
! ?* e. g7 w& l}t7 O) R$ w! l7 S2 ]; m" H7 ^, f
ry {
/ |- t* `# f8 k3 F- gDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
9 S; ]* Z+ k5 U+ g# NDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
) J* y, U: d p+ @# ?person = null;/ M, t/ F: p6 a# F% ^
while (dataFileReader.hasNext()) {
$ j. k7 x( V2 Y* p/ ?, Hperson = dataFileReader.next(person);( Q+ ^. ]2 p0 {( |
System.out.println(person);
9 d. S0 G. r+ K2 S2 u# a/ r6 z}
+ c. ]$ h3 d* j% O0 D} catch(Exception e) {
5 y! `/ t% d; G, y7 u- @" uSystem.out.println("Read Error:" + e);2 j+ w) f1 [* e7 \3 Q
}9 d& ^, s6 {( F" h M& J
}
# i l6 N. i9 u! t' @}
; a4 ?* y: b% O1 Q+ B* _. {+ \如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。
% s6 u" V* g8 r+ [- _Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
3 {, E8 F& x, C. F, q. U" {% m使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
, S* H& o% b' G% h均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
* C- F# @4 G5 n* g# ^{"namespace": "org.apache.hadoop.mapreduce.jobhistory",! n9 s( n& Y, Z, E5 l$ `0 H
"protocol": "Events",
: f/ ~4 A' T1 N ^" R1 C"types": [
5 |, }" T" u9 C8 r5 q. z…{8 v( ?: j4 V1 s3 S
"type": "record", "name": "JobInfoChange",
, b, l+ X, Q9 K; @8 ^& m"fields": [, r- v. W7 a0 X7 I" r
{"name": "jobid", "type": "string"},
8 A% u6 o; ~7 }% r{"name": "submitTime", "type": "long"},, T* b5 p9 y5 S( e' j4 K& \
{"name": "launchTime", "type": "long"}
+ J: Q) w2 u& Z, Q {]
* z+ C; L ^: A8 w},! m0 ^, v b+ L1 o) ~
{"type": "record", "name": "JobPriorityChange",4 }$ n6 y1 @$ c: e- ~
"fields": [" D: T) C6 B, V) O' ^
{"name": "jobid", "type": "string"},
- v+ ~% g2 w$ p% m7 ~) X{"name": "priority", "type": "string"}
n+ T: c! U" E2 l7 w6 z3 o6 j4 [# d]
0 n; D" W' n: s},4 ?' B7 t3 S. F& g( `5 ?
{"type": "record", "name": "JobStatusChanged",
* Y9 Z4 f& F3 }1 }( j1 I( T"fields": [- N5 a, F! G" T) t, j
{"name": "jobid", "type": "string"},9 x& `( q+ A g6 J7 s& r2 |! d
{"name": "jobStatus", "type": "string"}
# ^- O& G$ ]1 c( i# `]
: q- F) y/ C) f h},. {# i1 \+ }3 M* \4 V7 n# K* ^
…]# C0 _8 ` m7 S9 e8 M' ?
}
7 `: d, k3 J% Q& P) \4 y( [" X[1] 参见网址http://code.google.com/p/protobuf/。% y. ^# y# ~: S1 u* v
[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。: U0 a2 T: X5 K# {) v- Q
[3] 参见网址http://avro.apache.org/。2 c7 w, g3 ~' A, E2 O1 k
[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。* L$ @3 f9 n7 g* i1 d% p% Z" M
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。 8 V9 M& M" N* L2 k! s% u
! L: @+ i) K: I7 @, D" f
; b0 e i% D6 }# c7 S. a: `8 W
|
|