|
3.2 第三方开源库5 r& T1 H* O$ D6 I/ ?3 Y: R4 Z8 j
3.2.1 Protocol Buffers
4 c, t$ |8 H; \9 @1 M0 h% e: tProtocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或6 i5 k: }3 E- }+ F0 O/ z
RPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持; S1 I+ P2 e7 T3 k
C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。( E3 Y; l; L0 {+ T- c. K9 s
相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:5 y, t( F1 x# Z4 Z9 i
❑平台无关、 语言无关;
/ P2 s0 M" K6 F( ^3 M8 B$ N❑高性能, 解析速度是XML的20~100倍;
4 W$ g3 G$ T% F/ ]❑体积小, 文件大小仅是XML的1/10~1/3;* u1 \* X& d |" _
❑使用简单;5 m5 m" P. O; x5 f9 O6 [ s3 f
❑兼容性好。
7 C3 {& A# K+ v D4 f3 f; X) i8 [) V9 c通常编写一个Protocol Buffers应用需要以下三步:
0 f8 Z# l" z( e+ ]3 n. @1) 定义消息格式文件, 通常以proto作为扩展名;
: h0 A" O" h" J8 R q2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;
. K3 a9 U& F; C3) 使用Protocol Buffers库提供的API来编写应用程序。0 |0 }* k2 o+ l8 H$ `& w9 h( C
为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。) x% i! _+ T4 O% I" W+ F
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将8 Y) q) D1 p8 f3 f# t
一个人的信息写入文件。
S& ]7 D! d9 s6 K步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
6 O5 \+ `6 v T! D' r# r$ f1 ?5 d* tpackage tutorial; //自定义的命名空间+ |6 h, A/ j W9 `3 h- X/ v
option java_package = "com.example.tutorial"; //生成文件的包名
# m+ a+ f! P9 C" [( {option java_outer_classname = "PersonProtos"; //类名
# L) K+ z2 |3 Emessage Person { //待描述的结构化数据2 b7 @& y T- `
required string name = 1; //required表示这个字段不能为空; z: `) }) [+ F
required int32 id = 2; //数字“2”表示字段的数字别名
! P# ?; ]. e% J% Y& e% hoptional string email = 3; //optional表示该字段可以为空
U* D) g7 X! jmessage PhoneNumber { //内部message
6 A# V( h, B2 x; d# U- \required string number = 1;& P3 |3 ?2 B$ A0 x& F
optional int32 type = 2;
( T+ E8 I* z; w/ q! O- c}r
# H2 B: ?2 @1 ?, s/ j# |epeated PhoneNumber phone = 4;- x% K' y2 Q9 o; p8 S: D
}
0 g! [; a. `- r# y* o步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:1 ]$ c% Y& `" E
protoc -java_out=. person.proto
* J. ]% f2 u& Q4 E7 O注意, 上面的命令运行时的当前路径是person.proto所在目录。9 k4 @7 G' V2 Q2 s0 D
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt( L- r+ W& B( |$ w q
中, 之后又从文件中读出并打印出来。6 C" X1 Z- G. y8 s8 s& N% Z1 U; @
public class ProtocolBufferExample {- T" h/ L- z) t' H& w; |
static public void main(String[] argv) {: C- X3 K# y0 d7 h; `) i2 D
Person person1 = Person.newBuilder()
$ O: @; |' f: y) |0 y* l.setName("Dong Xicheng")
* e7 e' i9 c9 B$ k. g$ T* c0 I.setEmail("dongxicheng@yahoo.com")
) F7 d9 w y% a% ?& s6 S# C- R.setId(11111)
3 S# ], ?( b+ P.addPhone(Person.PhoneNumber.newBuilder()
7 g; B' a- W" V$ D.setNumber("15110241024")5 T' [0 R7 U$ j# H
.setType(0))/ H" `( C' Z5 x( s J$ l) _( C; P
.addPhone(Person.PhoneNumber.newBuilder()6 M/ P3 p* E( E: J) ~# I& R3 A+ D
.setNumber("01025689654")
8 |- L& _ L; q! x.setType(1)).build();& w" L) g7 T! y/ y2 H/ T6 ]
try {2 Z" E/ [, g; v- z2 i# U0 Q
FileOutputStream output = new FileOutputStream("example.txt");
; @3 H F/ J& b6 k! u" u2 F7 j1 |/ [& Rperson1.writeTo(output);
1 C: `# v. g8 h& e. voutput.close();7 l, J& O& f1 |
} catch(Exception e) { K$ M; p6 e) m( K6 k
System.out.println("Write Error! ");6 s$ e f8 c$ J5 z8 ~$ N% N
} t
* D9 `, E0 \4 y5 |' w- `* H0 lry {, X" K/ Y. g) x. Q" M4 T
FileInputStream input = new FileInputStream("example.txt");8 w$ c7 B5 z4 `7 b" T5 |6 I
Person person2 = Person.parseFrom(input);" s" X. b, U, D7 t
System.out.println("person2:" + person2);
% M, | {# E- I7 g} catch(Exception e) {% p8 n6 [6 Q" o) ~
System.out.println("Read Error!");1 X, i/ o6 Y$ h
}
6 c- `) I1 U9 X$ U7 I}0 [1 Z/ W5 l! R F9 B
}; V3 A f |. x
在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
9 K+ O- t8 L1 z& ? P入使得YARN在向后兼容性和性能方面向前迈进了一大步。
, u$ }, H, b) {6 J. w除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
' d5 Z! }( J6 K; `YARN则采用了MRv1中Hadoop RPC库, 举例如下:* L A Z9 e" R& y& c, \
service ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
% A0 x( E! q( a+ zrpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);( E# _8 O$ Z) b+ x" l, f( a5 u
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);, @/ Y( z. O: N" j) p( N
rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
* A* P, d- e( l3 d/ Y0 A}
4 `# H1 D% z9 V! o在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
( q" s# R7 |" w2 k8 s" l❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。* A/ b$ E0 ?- ?" F: H# S
❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。7 m7 a+ E0 T7 k# r% M
❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。$ G$ l; J# B* p" s% b- u
❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—
1 D% g% \/ n% M3 b: YResourceManagerAdministrationProtocol。/ W% S, z- _( M0 j
❑yarn_protos.proto: 定义了各个协议RPC的参数。) d5 @/ g. u# g7 V. U9 r
❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。- }3 i' W. K4 q$ M# E" s
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:
; k' N( g. w# E+ M1 r! g9 U❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。& y5 J$ c& o- b4 \- u3 F( S; a
❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
, F* Y0 [& `3 ^% ]+ S3.2.2 Apache Avro
' U, G) l9 A$ ?# @Apache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
) q X7 j! K) a* oAvro官网描述Avro的特性和功能如下:. }, }: I+ y* n0 T; y
❑丰富的数据结构类型;
9 L$ q, F3 o; i4 h❑快速可压缩的二进制数据形式;+ ]6 X$ x3 x3 d: E* G9 h
❑存储持久数据的文件容器;$ o( J0 D" i7 E$ j. L
❑提供远程过程调用RPC;
' y( F9 X2 f H❑简单的动态语言结合功能。3 H* q4 s0 M5 c# ^- j% d
相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:. f' z# z* Y& H7 x
❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。3 K- i0 {+ F# u. A5 x+ g
❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
! |6 m, `4 c4 J! H减少序列化后的数据大小。- b( a8 J+ _1 s8 E5 m6 o* J* h
❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域3 Z6 A7 N+ Y7 v2 u9 q
名, 该方法更加直观、 更加易扩展。
1 O" I9 \( H2 ]( K6 ]编写一个Avro应用也需如下三步:
1 l. F. |2 t: H$ k5 C! E% T; T1) 定义消息格式文件, 通常以avro作为扩展名;
9 S- J, ~* x( i/ x% ]* G2) 使用Avro编译器生成特定语言的代码文件( 可选) ;7 h& [- m, J" p! S
3) 使用Avro库提供的API来编写应用程序。. H7 Y, R) g0 U3 X
下面给出一个使用实例。
" T2 c/ M1 n6 v5 N. t" U步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:5 z$ R! B+ m0 `7 m" \9 p8 B
{"namespace": "com.example.tutorial",
$ d5 N# v, M' v: n"type": "record",: a% U* Q# k) c1 L. V
"name": "Person",0 Y6 [9 N& w0 E
"fields": [8 @/ ?* _8 K1 D8 Z7 n5 Y
{"name": "name", "type": "string"},, P% i! C& o) g, J0 ]4 n
{"name": "id", "type": "int"},
! |5 O) b. s, n# p3 e; l9 }{"name": "email", "type": ["string", "null"]},
' C" b8 B: Y u K9 i{"name": "phone", "type": {"type": "array", E- B8 w5 K) {
"items": {"type": "record", "name": "PhoneNumber",1 d0 Y" _1 M! ?9 S0 J
"fields": [
- q. A i! t, D' x{"name": "number", "type": "string"},1 m3 ^+ ~, { h7 e2 y2 Q/ T4 Q& V5 t
{"name": "type", "type": ["int", "null"]}+ s4 G6 T) m# k/ d9 r- m- i6 v
]
5 `9 B$ d- u* a}0 m: U0 `% ]6 f! z" ^( o5 {0 T0 ^
}
7 x" O" D6 c( M& r5 c( z; V0 h8 B}]
) Q$ b, m) b P! N}
/ a0 g& I; j5 A步骤2 使用Avro编译器生成Java语言, 命令如下:
7 b, D. Y" R# D# g6 ijava -jar avro-tools-1.7.4.jar compile schema person.avro .6 g2 N, {0 J$ g6 A* k
注意, 上面的命令运行时的当前路径是person.avro所在目录。
* B9 U: I, I8 e% S步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
3 E+ M8 x, R& M) p) J件中读出并打印。
g6 X7 Y% d( N7 p8 ?7 r2 Jpublic class AvroExample {7 G7 H1 Q$ p- V
static public void main(String[] argv) {! Z% L$ M$ }2 s3 _! y7 Q s- x
PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()/ C6 ]* e3 s- }* Y* [
.setNumber("15110241024")! d* R5 @/ y4 P) H2 b: N2 R$ ~
.setType(0).build();
4 H, w7 X( q5 O( s, L5 [6 WPhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
2 G7 h6 e% s; N( L! m: n.setNumber("01025689654")4 G P5 u1 p! r( s( d+ S' R
.setType(1).build();# q/ }& d, ^4 B a# ?0 i& _4 {
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
+ k9 X# ^7 g4 A+ c/ a! Z0 ZphoneNumbers.add(phoneNumber1);* ^. c9 x4 p0 c% c# I2 @1 g
phoneNumbers.add(phoneNumber2);
. T/ T0 I- Q0 L) t; W( gPerson person = Person.newBuilder()
+ S b! J! ^8 v( P. v: p- i.setName("Dong Xicheng")
: D0 [6 e% H; H& |1 Y& s! S.setEmail("dongxicheng@yahoo.com")
' ?0 `3 L. V; S) [- e( b% }, M.setId(11111); v) V9 U o4 J: N/ B# i, a
.setPhone(phoneNumbers).build();
) Q8 S" {. e+ Z; w1 \File file = new File("person.txt");
; R! L% D) x5 j6 j: D @+ ztry {
9 x- x4 L, N, P5 K1 @DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);
. v/ R2 h$ E; q3 |$ r( f6 g$ C3 H: o, `DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);3 \8 B" N5 r2 l: W5 d
dataFileWriter.create(person.getSchema(), file);
' ]2 U, b; E6 QdataFileWriter.append(person);
& u( O& S) k, h; c% ]# IdataFileWriter.close();
+ {+ E u9 ]+ G7 d$ O0 F. G; |} catch(Exception e) {" A+ u1 A+ ~+ [
System.out.println("Write Error:" + e);6 U: L+ [. R+ ?; Y" ~- k# I: E
}t
6 ~- v* x6 X( Z M& k) i4 l; Iry {
% A' e* _8 u" ?7 cDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
0 S( y& T5 s4 I& g* c& tDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
; Z9 Q! B1 g0 S- d7 W5 pperson = null;
- `$ u" z8 [3 q9 y, q1 _3 }while (dataFileReader.hasNext()) {$ ?% A6 U. g' Y4 y/ r2 ~3 n
person = dataFileReader.next(person);, x9 t2 \ _. J4 \* z: P( w1 A
System.out.println(person);0 K4 T$ U& E/ |2 w! M
}
3 f8 o2 V# M- d8 j" J} catch(Exception e) {4 r1 c7 ^3 k' _- h! J Y
System.out.println("Read Error:" + e);
$ a# Q" w. D6 Y5 v}
7 P/ H" f) k, j A0 u( |}# Z: R( P+ h6 o" j Y
}
9 t3 W$ i: L8 |8 o8 u% a, h8 X如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。
! K# ^9 p) s9 E; a: Y. ZApache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
$ Y+ v# m$ T( N- o7 I- r使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化
, @2 [8 ^+ h- j1 n: p: a均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:6 q# A0 A( W4 Q2 N+ z" I+ Z
{"namespace": "org.apache.hadoop.mapreduce.jobhistory",3 P4 |) T3 Q# G' t# p0 s0 V& D% y3 Z
"protocol": "Events",
; j+ L& j, T2 N8 F# l"types": [$ @; v( S- S' v2 |& u
…{% T) A' c3 L0 M
"type": "record", "name": "JobInfoChange",
$ R9 w. f0 p/ K# S+ h5 s1 _"fields": [
, }4 E" p6 f; m; d, A{"name": "jobid", "type": "string"},' y( l" n( L% ]! @1 \
{"name": "submitTime", "type": "long"},. Z6 v, q, p! d6 B6 Q' O
{"name": "launchTime", "type": "long"}9 f1 C2 Z+ z2 X9 p1 }
]" h4 m" A: r) I
},# N, n. u- t; r9 \
{"type": "record", "name": "JobPriorityChange",
0 g1 ~# u9 {: x: u N/ Z"fields": [
# r+ H% i9 ~. v& o/ m+ @{"name": "jobid", "type": "string"},. {' l7 L) `2 S4 O* J) Y# L5 T
{"name": "priority", "type": "string"}
- ~4 R8 J' B2 u9 v]
. _+ i e& @; x},9 K4 N, G! _2 Q. |/ T' i1 W! R
{"type": "record", "name": "JobStatusChanged",8 N- R: G9 P! G1 _- x
"fields": [
# g& y8 [) y7 i, Y/ ]4 T7 f{"name": "jobid", "type": "string"},
: i/ H% x% ]0 t9 j% V! D0 K d{"name": "jobStatus", "type": "string"}
, t5 Y6 G7 M4 }* a; ^]
. h1 M H+ d4 l},
% X6 D4 `! K) b2 t1 l" F' F# M…]
: c3 j' S# o- w, E9 s}( F9 T! h" f8 O; `
[1] 参见网址http://code.google.com/p/protobuf/。
9 Z4 h/ {# Q* h[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。9 S6 ^0 O3 ]* f: x4 Z
[3] 参见网址http://avro.apache.org/。5 _1 G$ s/ ^" T* F% r- U9 v
[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。
! _ N' P& J" w, |- w1 y0 ?% {[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。 & g% d2 |5 N' ]4 D% m2 V1 I( v1 @
9 T. Q9 F) F; Z0 F2 `
. H! D$ u& X1 B0 x; I* @. u4 v |
|