|
3.2 第三方开源库/ M. e' S; R: k, }- c8 F
3.2.1 Protocol Buffers' I8 d8 F/ u# n
Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或! ?& a$ x7 K6 |4 I! O
RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持
7 c/ D. M% n7 m1 @( Z( k6 D) LC++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。1 t( A3 U# j$ ]1 \
相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
5 u2 d, J/ g0 A! X❑平台无关、 语言无关;! J: I3 }4 t6 c
❑高性能, 解析速度是XML的20~100倍;
( }1 {( \# e0 V4 c& ?7 Y' a❑体积小, 文件大小仅是XML的1/10~1/3;+ Q2 Q: g! k5 t) U* E5 Q) X9 o
❑使用简单;7 f/ H6 E/ ]" B; |3 I1 m# A
❑兼容性好。) L* U( s8 Z- y& }/ G
通常编写一个Protocol Buffers应用需要以下三步:$ H- |" ~' v: Q9 m x# b4 Q
1) 定义消息格式文件, 通常以proto作为扩展名;1 _* c1 u7 c! y) ]4 Y
2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;# D. t0 i4 U8 g* o5 u2 t% d8 @7 ?
3) 使用Protocol Buffers库提供的API来编写应用程序。
+ o N' o0 E& S+ D" V为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。( }$ m" h6 U9 [5 J# o
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将
- c/ B0 V1 Y( T1 m% \) |一个人的信息写入文件。2 B1 L$ P! } A. U4 }$ Q' D
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:2 c6 Y ~% }! L j5 l
package tutorial; //自定义的命名空间- r1 I- Z+ R* D& O* N8 Q( J- d, O
option java_package = "com.example.tutorial"; //生成文件的包名
1 E( ~5 n7 O c7 \3 S' w7 U# loption java_outer_classname = "PersonProtos"; //类名
% L) w6 E4 y! H amessage Person { //待描述的结构化数据- s3 B, W/ o% h
required string name = 1; //required表示这个字段不能为空
o" `3 L: F/ wrequired int32 id = 2; //数字“2”表示字段的数字别名
! B( M7 [0 v/ Z" `1 `optional string email = 3; //optional表示该字段可以为空) O9 n; T/ h( j2 P
message PhoneNumber { //内部message
( O- T, c, z7 x, `required string number = 1;- Z) H& T H. Y# {* p: V% Y+ T! f
optional int32 type = 2;; S B8 E8 y# j( I
}r1 J4 z5 Q4 l( k3 R( [) m
epeated PhoneNumber phone = 4;
- c7 m! U# w9 ]& G4 w |}6 j/ |. r9 e6 i( V! U1 n0 B
步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:5 Q& y0 H0 b* z" B" `( @
protoc -java_out=. person.proto
0 h2 B9 v: d; c# g, y1 ?; w$ H3 J8 p注意, 上面的命令运行时的当前路径是person.proto所在目录。) t" ]& e8 ^! r! a/ t1 ?+ l+ h
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt, Q ^8 w0 {0 f! F. ~
中, 之后又从文件中读出并打印出来。
5 c8 A, u+ \7 u/ O& Xpublic class ProtocolBufferExample {
7 S0 O+ @& q j8 L$ ]; Y: cstatic public void main(String[] argv) {
/ j7 _ o0 g$ j+ ^4 sPerson person1 = Person.newBuilder()0 x9 b7 X# m$ e9 F; R- \
.setName("Dong Xicheng")" p9 p* h: g+ r9 M: @% E
.setEmail("dongxicheng@yahoo.com")4 `; d& r' x: i- X& b( o) U
.setId(11111)1 g R2 Q; r" J3 h" [/ c( g& T5 f
.addPhone(Person.PhoneNumber.newBuilder()& ~* o3 G! O9 ~# _5 k
.setNumber("15110241024")
$ [. D) ?6 M2 X# s3 e( Z( e+ V1 x.setType(0))
n7 b4 ^. \; h( k; C Y" m.addPhone(Person.PhoneNumber.newBuilder()3 U; ^) u( g) U& U
.setNumber("01025689654")( P0 [( x! P" N( l' T) z3 ^. ]' T
.setType(1)).build();9 ?1 F2 f1 K3 P
try {
* X0 J$ O- W A# m `" U- h- C5 P7 }FileOutputStream output = new FileOutputStream("example.txt");
$ N( p$ m/ c N9 J; o1 wperson1.writeTo(output);0 U% b9 i. Z+ I( `! v
output.close();
8 F9 b: W$ @$ z& C, ]$ z8 Z} catch(Exception e) {
! \: g1 l; w8 l* {System.out.println("Write Error! "); J9 M; h; o+ g0 }
} t
. X1 N/ ?! }* ?5 C' Sry {- |8 [* a4 C! O( @2 S
FileInputStream input = new FileInputStream("example.txt");) E* L+ m5 r z- w3 ^
Person person2 = Person.parseFrom(input);/ [ y7 ?! Q1 ]* v# _; Q c* N0 c
System.out.println("person2:" + person2);
" g5 F: y/ D$ Z) S" H1 n} catch(Exception e) {( ^7 a. Y3 Z2 C+ \* ~' X
System.out.println("Read Error!");6 g1 w0 r9 S. m( u/ u
}7 O# L" W/ H3 w2 b# \& J
}/ C) h2 M' e6 ^6 p& m! n
}
* ~5 _. P$ _8 `" f在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
7 H- \9 }% e; z/ M入使得YARN在向后兼容性和性能方面向前迈进了一大步。/ ?( O+ s* @1 f2 |
除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而4 L A$ X( _2 Y
YARN则采用了MRv1中Hadoop RPC库, 举例如下:
% n* @7 h8 M3 H. y/ B1 I1 o9 rservice ContainerManagerService { //这是YARN自带的ContainerManager协议的定义# V2 Y$ a2 q, P* |1 B
rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);1 `" j- {9 z% W- ?" K( p; q }2 c' q
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
+ X/ P' a1 s( R: Z9 f% D) K9 a' srpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);* {: j4 c, b6 M
}
5 E; p- y% v, W# C% ~: f: i在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:( y4 M9 O0 o5 E9 T
❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。
9 Z# O n3 X1 T6 U: z" [❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。
: E$ k; l3 K% C @3 p❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。: K4 d5 h2 O. P) \
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—3 }# n0 ~( }8 J4 m* n, [& g( D# H
ResourceManagerAdministrationProtocol。* D* U* n+ ~& n! n: G3 Y# C3 Q
❑yarn_protos.proto: 定义了各个协议RPC的参数。( L N/ P& H- T, e9 w# Q3 R
❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。$ | z0 } `' ? `; M4 z% ^; M( U
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:: j, J$ k9 A. C7 s6 x: Y% [
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。
6 o# A. f6 p8 _9 e. _( ?❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。' D8 [( D1 A8 I. v* R6 L: V- V
3.2.2 Apache Avro
; w) s, Z; F' }+ BApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。/ g% e2 x5 ~! Y% z5 `! F/ A
Avro官网描述Avro的特性和功能如下:
4 n. }1 e: i# d3 J8 q❑丰富的数据结构类型;
) [7 g8 v; N/ k6 g+ `& v; X7 C9 k❑快速可压缩的二进制数据形式;
; `: l. } c( b1 ~. ?& y0 ?: H❑存储持久数据的文件容器;' J8 [7 U: I+ m7 H: Z# U8 k
❑提供远程过程调用RPC;
* D- z- J" ]$ a5 h0 J' e" I4 X9 [❑简单的动态语言结合功能。9 c# l/ h. j" B3 J1 \$ ]
相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:( f9 Q0 B, I j9 N F, x& ^
❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。7 a, W2 t5 ~5 ]$ O
❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
% f& [5 \( a: n5 z O减少序列化后的数据大小。
1 ^7 \6 a8 y# p6 s: t❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域6 d4 x& o. u# [9 m7 _: _) _' s E
名, 该方法更加直观、 更加易扩展。
' s2 p5 B) I1 c6 Q编写一个Avro应用也需如下三步:
, _1 z/ L1 ^+ q4 i2 {9 E1) 定义消息格式文件, 通常以avro作为扩展名;
7 j, |7 J+ a1 t! A& |7 f2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
/ z: P' v9 c1 E3) 使用Avro库提供的API来编写应用程序。( `* _$ {9 B: ]6 r0 ~
下面给出一个使用实例。8 i/ y2 M2 ]6 N' y! r4 h+ @
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:
$ C, M8 P# u, b# a( x% x{"namespace": "com.example.tutorial",
7 q- h( k# f$ b+ I5 K' M, u"type": "record",
$ E) J- V! M1 S0 W' G& I+ |5 T5 K"name": "Person",* a Y, w4 T3 S4 K3 e
"fields": [
1 L7 C( V: b2 s5 u& Z# E{"name": "name", "type": "string"},
$ t$ l" y4 o2 S( K3 u{"name": "id", "type": "int"},# M" A. [. Y/ ]$ }
{"name": "email", "type": ["string", "null"]},
* |. O" {. x7 e7 N8 k{"name": "phone", "type": {"type": "array",# g p3 O" n* ~, p2 a7 e
"items": {"type": "record", "name": "PhoneNumber",
* j, X, N5 J4 \2 g4 o/ ]"fields": [, j1 n D/ X/ Q8 h
{"name": "number", "type": "string"},
, ~" u0 A8 L1 x{"name": "type", "type": ["int", "null"]}
* }& m( b2 l$ Y6 H/ V4 v# B9 c! B]
" P% j, [/ F6 P2 Z# {! |/ L( l}9 n! V3 }5 E9 F4 ?& Z7 L$ @( d9 }
}
0 ~1 _3 X$ ]0 y$ o1 p: L2 E}]
3 m% S9 ~1 _& Q7 ?9 F}5 w/ z. I1 ]' R/ ^9 _
步骤2 使用Avro编译器生成Java语言, 命令如下:
5 _, U' v' N1 ~' \, Ojava -jar avro-tools-1.7.4.jar compile schema person.avro .2 P* e& p/ P: q/ F
注意, 上面的命令运行时的当前路径是person.avro所在目录。, M* X+ V5 V* e; p8 ~
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
9 a `: n Q) ~4 t% o8 G- {" ]# \件中读出并打印。
, n: Z% B( F$ h' A: Q; T+ Rpublic class AvroExample {
! [* C6 c# n# a9 Y3 i$ Nstatic public void main(String[] argv) {% N' K. |' T6 h" X% s% z6 V
PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
1 c8 y$ X" j( U* r& h" w0 _.setNumber("15110241024")3 t% V W U; r0 j- w. ~
.setType(0).build();
9 f* q: `* T; p+ o* C. C9 yPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
; q ^4 i( k2 Z5 Q.setNumber("01025689654")
8 G [! D, \8 F( r) P" N.setType(1).build();
5 B; x* z2 B' ^- z5 g( N( PList<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();, K% E( S) X1 W, V% Q
phoneNumbers.add(phoneNumber1);# t5 [/ v! F/ M, k
phoneNumbers.add(phoneNumber2);2 A% {, ~) f# Q! M; Q
Person person = Person.newBuilder()
- h! Q, Z) P% B' U% y.setName("Dong Xicheng")" e% S" J- ?3 S' y9 F
.setEmail("dongxicheng@yahoo.com")2 T5 U) h) }1 B
.setId(11111)
; y3 y5 }; R0 u8 D1 ^.setPhone(phoneNumbers).build();
- B# U9 |7 L) b4 b! s( QFile file = new File("person.txt");
2 R1 W- H- C! _" M I, `try {
- r) Y5 f( D6 O; I% x! a0 g- [DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
6 K% ]+ W0 d7 q% y0 UDataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
# t% w* h) a' y) v: zdataFileWriter.create(person.getSchema(), file);
0 t" l1 X- t' X" S# ~8 W1 y0 p v, HdataFileWriter.append(person);4 C4 K, t3 Q0 |' U
dataFileWriter.close();
5 I3 t! j8 B: `} catch(Exception e) {7 j! O7 k! ?1 r B' o
System.out.println("Write Error:" + e);; o6 v5 A( F, ^ H+ v+ i( x
}t
& L) m0 E" I1 y8 jry {' P- f$ b& F1 E1 s6 t3 ?. A4 _
DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
- @' Q y* n0 }DataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);, |1 z0 O' B% g: {# L. G1 X
person = null;
! y3 B7 p4 ~9 t, T) fwhile (dataFileReader.hasNext()) {
* r( B2 O! l8 M, C0 W/ fperson = dataFileReader.next(person);
: K0 n/ n( }) c4 W0 u# ISystem.out.println(person);1 \/ n; X6 I. C6 O
}0 L% M7 W+ p# s6 _: {
} catch(Exception e) {
4 O1 [: }3 W2 a1 p/ }/ w' n+ RSystem.out.println("Read Error:" + e);
) V: R, N) r7 g! o& P- W* G$ l, H}: s5 c3 e4 [9 _. a% ]+ o
}
. u& w' V: k! U}; ^, W3 p6 m' M1 F0 x
如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。
' W9 H" q# v- t9 f2 e1 ?Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
; F O$ p! p/ z使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
, R5 Y& u. y/ N0 }* G7 k J均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:0 U! _2 O: K& L& {
{"namespace": "org.apache.hadoop.mapreduce.jobhistory",
" `' N, m( i f S4 k"protocol": "Events",
( j5 {/ F- e9 T"types": [
$ K( b6 u8 C* N z…{
7 d9 s7 N8 F% t5 R7 U! I% F"type": "record", "name": "JobInfoChange",
8 I6 H% x! T2 {! F% @" `4 C"fields": [. _) M9 u6 ?( g* i
{"name": "jobid", "type": "string"},8 Y m' H; {6 L
{"name": "submitTime", "type": "long"},
3 j4 J, A. [" l$ _1 \{"name": "launchTime", "type": "long"}' T' e. k# {9 g! e/ _
]- b8 L! a1 @, [7 U& m1 D
},
" u, v) N8 U( e4 w: Z- a{"type": "record", "name": "JobPriorityChange",/ V7 L W( [8 R- [7 X( e/ V3 v
"fields": [
* w' B5 n4 ~. k; A{"name": "jobid", "type": "string"},
9 {+ M, ], y+ i{"name": "priority", "type": "string"}: R2 v/ I; M) ]4 w
]
/ w3 s: S G) \! `},6 p" Y+ P! O1 K
{"type": "record", "name": "JobStatusChanged",
) F4 m" Z5 O2 i5 Z"fields": [6 T& T$ l* k- Y
{"name": "jobid", "type": "string"},
' |& f; j6 d# ^! ^) R. f. L; E4 j{"name": "jobStatus", "type": "string"}; j& [9 B/ c( g n9 i* O
]0 w2 I' }1 z1 o9 r% ` E
},4 l; G. j# b t: Z0 ^# f
…]" @# |# }+ l4 J/ V; n" H
}$ A, q4 ^ i! g! ~3 s0 u2 z
[1] 参见网址http://code.google.com/p/protobuf/。
' ^9 {3 V) N7 L7 b; [# D2 O& L[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。* l# J# I/ v8 p
[3] 参见网址http://avro.apache.org/。3 P! W3 [5 G4 w3 a2 j3 q
[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。
6 j7 K4 ]2 A, I5 m[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
) i5 Y @, U; J# K) v0 }2 z( O* t. P9 {. g
4 X9 t! r$ u+ G' \; E* x D |
|