|
3.2 第三方开源库/ h5 r+ X+ E0 @7 {. X J$ _9 c" D
3.2.1 Protocol Buffers$ z6 k. O$ G( f6 _5 V6 Q- |
Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
" h, M4 e7 y FRPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持. F2 _5 I$ E* t$ B8 T' z
C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。
8 Z8 `$ U5 w+ w3 _3 |8 ]2 {相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
9 n! z8 }& Z/ E2 ~2 ~+ D❑平台无关、 语言无关;: w9 g; r! W0 h" r
❑高性能, 解析速度是XML的20~100倍;* [+ S( `8 L7 O% D
❑体积小, 文件大小仅是XML的1/10~1/3;
( w, n3 \9 P+ j* [" ^4 J6 y❑使用简单;
W1 q/ P! t! j6 h! j m❑兼容性好。# I6 m- w$ b; A+ \2 d5 ]- C- Y2 |
通常编写一个Protocol Buffers应用需要以下三步:
- d4 b1 X2 l8 K! I1) 定义消息格式文件, 通常以proto作为扩展名;9 f8 }! {3 f. g7 D q4 k( ]/ e7 |
2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;6 \9 f" w, m/ z% ^4 |- X: x
3) 使用Protocol Buffers库提供的API来编写应用程序。
' X( z7 a$ D2 p& z为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。* O2 I. F' n S8 b% a: g0 F% e7 k
该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将3 j' Q3 E: Z3 y6 [! m
一个人的信息写入文件。% s* e7 W7 n( I6 K1 {7 [. o: d
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:
8 T" \1 E- m) z5 `* N. dpackage tutorial; //自定义的命名空间) w4 W8 @) m# I
option java_package = "com.example.tutorial"; //生成文件的包名) C& `% i0 F- R9 k" _8 B( ^
option java_outer_classname = "PersonProtos"; //类名
5 I9 j1 A9 Q2 {message Person { //待描述的结构化数据% \* e; l, g6 D8 K3 \
required string name = 1; //required表示这个字段不能为空
+ M9 h$ ^* W( V+ r; `required int32 id = 2; //数字“2”表示字段的数字别名' m$ L$ |, k# Q, t8 Y
optional string email = 3; //optional表示该字段可以为空
8 ~' p5 ?8 V; _# K0 s. Smessage PhoneNumber { //内部message
8 s& E9 z1 C b0 E4 ~5 O5 G2 _required string number = 1;9 a5 P6 E, W8 _( W' P9 f
optional int32 type = 2;2 J' }( ^' B( ~6 [8 ^$ r% O
}r
. W6 k8 `- \( x( D1 U! yepeated PhoneNumber phone = 4;
~ L# ~3 ~& N& A" Q}, C% G2 ^7 |9 I
步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:4 Y* {' ?. p5 h. A! X
protoc -java_out=. person.proto
# [% S7 }* l' k( E注意, 上面的命令运行时的当前路径是person.proto所在目录。
9 M: `& t3 K {8 s# ?步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt3 r* O% a& i+ h# p# H1 `& X# F
中, 之后又从文件中读出并打印出来。9 ?7 _9 r0 W% u/ W, F) T% h
public class ProtocolBufferExample {
- a' k' G y/ i( i% Xstatic public void main(String[] argv) { s" `( V z; _3 ]6 H6 h0 t
Person person1 = Person.newBuilder()& ?8 y4 H/ l, v" X z5 v
.setName("Dong Xicheng")( T8 z0 q7 P% N. N
.setEmail("dongxicheng@yahoo.com")
8 Y1 {8 d( d" Y+ k0 S.setId(11111)
2 H0 {/ T l3 l0 c: g! q; ?" {* M.addPhone(Person.PhoneNumber.newBuilder()6 D% U6 z" o/ c6 P h
.setNumber("15110241024")7 k5 B9 b( N3 Z3 H( q
.setType(0))
K- H# y( s' e, I) I. z; Y' b.addPhone(Person.PhoneNumber.newBuilder(), w$ j3 d# x. ^% g
.setNumber("01025689654")
5 s/ ?7 N2 ~* m |; ].setType(1)).build();3 C% H6 U% [0 S% ^
try {
- W# q) b# d! O, a* `+ kFileOutputStream output = new FileOutputStream("example.txt");
0 {" p. J9 W g) X& Sperson1.writeTo(output);
, L. @9 [1 h3 b* ?/ ooutput.close();
+ {; H& W% W" \7 `} catch(Exception e) {
. P5 I. X" _9 OSystem.out.println("Write Error! ");
- }2 i/ C+ T. H% Q: L* E} t
+ X. o; q1 }) L5 H$ V% m2 E6 p3 l. h* xry {1 j1 o! K) x7 f% N
FileInputStream input = new FileInputStream("example.txt");
- S6 P! U. y+ W1 a5 q% _Person person2 = Person.parseFrom(input); B! ]3 e1 ~5 x
System.out.println("person2:" + person2);8 H' m, Y* s# a$ _- s3 V
} catch(Exception e) {5 q2 x0 _2 O: g; n
System.out.println("Read Error!");
0 k& L/ `" y( i7 f* Q) I- w: f}; H9 W4 L$ [: D
}
! ~5 N8 n- Z) F}
$ }# w$ _1 c! z8 M( z6 T8 t在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引# ^ i1 w6 j: j# t8 H7 J
入使得YARN在向后兼容性和性能方面向前迈进了一大步。: d, \2 Y/ v/ O4 I$ C8 m
除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而2 q7 w& e) B0 G: G" G
YARN则采用了MRv1中Hadoop RPC库, 举例如下:
6 p! O" I+ }7 }$ z/ b3 f& qservice ContainerManagerService { //这是YARN自带的ContainerManager协议的定义
$ Y! e0 y6 H' D! J1 c8 O2 o) Irpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);0 y/ }; b8 c6 D" ?) A, D: @, T
rpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);; ]1 H( z5 g7 L# X) D
rpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);
9 W: s8 f# r( K7 A' W; j}# ^6 `9 [8 i) k! g. _
在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
* i4 g+ @3 I# e7 l❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。0 o* B4 r: n% _! m( u: `* D0 _2 [. w: \% T
❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。2 j9 n/ p* x( \5 O* k3 p
❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。
7 f+ f6 g/ o' t u5 F9 ?4 B❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—
2 v) K& \; i R% `4 vResourceManagerAdministrationProtocol。) m4 i: N1 F! j a5 P8 l; a
❑yarn_protos.proto: 定义了各个协议RPC的参数。) l3 [% q7 g2 x' s5 E
❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。
+ \$ m" U6 u. G& ?! r" K除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:4 ?2 C; j9 N1 X* P( n3 m
❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。' W( ^' J4 L" w( S) h C' P8 T
❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。
$ g8 ^2 o! m6 L3.2.2 Apache Avro! y6 K6 N4 s: T; _0 c3 j* S
Apache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。
$ W: f- b5 c6 B- N/ I9 LAvro官网描述Avro的特性和功能如下:3 z# ?8 n! C) m) ?; V% `
❑丰富的数据结构类型;8 t+ k8 {% M2 N! e
❑快速可压缩的二进制数据形式;- H) M. p# e Q- ^# ~4 p+ P
❑存储持久数据的文件容器;: w/ x' k u1 t, k! Z) b( p3 Q
❑提供远程过程调用RPC;
- R( ~6 i) a% A) f/ \0 e❑简单的动态语言结合功能。
6 G$ e. u. I I( z8 G相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:
/ q' m2 W- Z5 A3 V❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。# @: [9 C6 p- T
❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于
) F5 x' f" B6 C8 b减少序列化后的数据大小。- v8 k P$ b" V' f
❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域
$ ]: S2 A9 q5 w' T名, 该方法更加直观、 更加易扩展。
! A+ I( o. [, k( |) x编写一个Avro应用也需如下三步:
8 |6 B; ^) g: _; m0 i1) 定义消息格式文件, 通常以avro作为扩展名;
7 k6 ?$ N# \! g* k j2) 使用Avro编译器生成特定语言的代码文件( 可选) ;
) G% m8 ?' o& K! F' G3) 使用Avro库提供的API来编写应用程序。% R1 }7 Y0 H. | |
下面给出一个使用实例。# f6 F- D% i( r
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:) F7 U* d% |* g
{"namespace": "com.example.tutorial",
: y( M. A* B) G$ Q& u8 F"type": "record",( X& p, _2 X- H2 p; ^; T/ H
"name": "Person",
8 u, H5 u7 h( S( J4 r g0 ^"fields": [0 D( M( `8 }( ~9 ?. s
{"name": "name", "type": "string"}, `9 H* l5 A6 p+ o9 O
{"name": "id", "type": "int"},
* J" \$ e3 @4 U, Y+ @" `{"name": "email", "type": ["string", "null"]},! m0 v" E$ G- A0 w
{"name": "phone", "type": {"type": "array",( R; g2 z. s6 K3 ^% [+ I4 `
"items": {"type": "record", "name": "PhoneNumber",# S! ?) o e9 d$ l. R
"fields": [
3 W M+ K& H. E) ^- u/ X{"name": "number", "type": "string"},7 G2 ~5 k/ b* D$ u5 _
{"name": "type", "type": ["int", "null"]}) L. I2 ]4 n8 y0 r. N& R B" z
]
) \9 k3 M- \( f, L* Z+ ^" Q# ~5 p}
4 a% H9 H5 O, Z# T& O}
& v. W+ E, I9 L) I! z}]
1 @0 O; \/ S: s9 I2 F} y: b* C9 V; U" ^8 t, K. z
步骤2 使用Avro编译器生成Java语言, 命令如下:
! `" s& o2 w) n- ]8 i& P7 ljava -jar avro-tools-1.7.4.jar compile schema person.avro .
& H+ J/ |7 Z, d% T. U* @5 I$ U注意, 上面的命令运行时的当前路径是person.avro所在目录。0 u" l" m4 Q, k' }( P5 ^. N
步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文
* I+ @5 x& w; L件中读出并打印。
: v* X; f( I5 M+ npublic class AvroExample {
+ Q6 N+ i- ` ]: m! Hstatic public void main(String[] argv) {2 A' D# N' K! _8 K
PhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
6 j+ E- j7 f0 W.setNumber("15110241024")
6 k, `6 Y3 U. }& v1 U# _2 f- ?5 E, q.setType(0).build();
+ G$ S' J( \" T {9 |PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()
8 x; r7 i5 i3 ? `+ j" o.setNumber("01025689654")) r+ O7 a) _ U% P3 R( i
.setType(1).build();% r- h; r, V- |3 ], t a4 k
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
$ v1 n; H# N& M4 K( @3 ~( t. MphoneNumbers.add(phoneNumber1);: v. B2 P$ D8 g, @5 K
phoneNumbers.add(phoneNumber2);8 g: w) D& O X/ n
Person person = Person.newBuilder()
! z8 _% N% b5 S/ z1 x1 ~.setName("Dong Xicheng")
! }* L2 d" m- A- z% W) v.setEmail("dongxicheng@yahoo.com")
& u4 ?& ~( ]6 L+ f5 F) R3 A2 Q. ].setId(11111)
, {) V) d# U5 u* [.setPhone(phoneNumbers).build();
) y- j! g# e6 ~2 N f1 M: c1 OFile file = new File("person.txt");: l0 R' e! x- l) U0 l
try {; \8 U% u; C) Y$ m$ k6 k
DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class);- n; O5 _- ~' f5 K7 n" F9 `
DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);4 s9 n- O; `! a0 o! ~
dataFileWriter.create(person.getSchema(), file);% d0 H# ?/ f. ]
dataFileWriter.append(person);
# m4 f k3 ]$ V/ }% d- fdataFileWriter.close();( E9 R4 U* L* Z3 Z) s
} catch(Exception e) {
: c- ^0 w# p9 ]0 A/ ~. S- e: ~' r, fSystem.out.println("Write Error:" + e);
' `* l+ l0 C" Z5 t: h4 S}t
* k$ U0 X- l2 S; Qry {" l: X* T5 n G
DatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);
) T0 V' x9 M5 Y2 qDataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);
6 S: [9 k% }# r v( Qperson = null;
3 N$ L9 k1 W; |. ]$ xwhile (dataFileReader.hasNext()) {
l( ~6 H8 m7 y+ _. hperson = dataFileReader.next(person);
, G0 S. z; J, _System.out.println(person);
5 e$ G7 n% g5 A+ d5 j}- s0 c `* m" m( @5 _1 w: J
} catch(Exception e) {
) W' X/ }- {, u* }System.out.println("Read Error:" + e);
8 T! ^( t' s, G5 s}! ~4 f, V6 {0 E/ {# r2 }
}
9 x. z$ B6 ^! e) T1 O- j" K}
]8 S, h1 u4 x# u2 |如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。0 l$ K+ ^- y$ r& q4 L
Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍& y! X! m) L/ r+ A6 e: |. L' o
使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化" |) Y$ s$ Z/ l! J% B8 D3 z
均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
% z$ O5 R, i4 j; V: a0 D{"namespace": "org.apache.hadoop.mapreduce.jobhistory",& E' u! [- R9 s5 i6 p3 Y
"protocol": "Events",: W+ l z. ~- l: s( U$ [: c
"types": [: v/ v& U+ U) f
…{
5 ?0 n- g" c( k; p! H"type": "record", "name": "JobInfoChange",6 ^8 m( p& X* K7 z" N
"fields": [" Z* h! ]7 Z5 b( O0 |1 G! c
{"name": "jobid", "type": "string"},
3 t0 L/ t3 U5 n' \ c) B{"name": "submitTime", "type": "long"},
2 d1 @# y% ?* E5 M* {7 q9 L8 Z{"name": "launchTime", "type": "long"}
- P1 P$ V$ h4 l; W% W7 c" `]
" }# `' T8 @ U: }8 g- J- g},
7 y6 c- A, r) B" v{"type": "record", "name": "JobPriorityChange",; m. ?+ N) L( ~1 w; l) g% \
"fields": [3 l% m5 |2 N- N
{"name": "jobid", "type": "string"},/ h3 T) ?1 c4 j+ G
{"name": "priority", "type": "string"}9 f3 U* ]( b' M- C
]* V) _9 P" G5 D1 Y" K' G, X" g
},
9 b# ~, N( L# t* P8 ]{"type": "record", "name": "JobStatusChanged",
3 Y% m2 B. E. F! n+ J7 e"fields": [6 ?/ d/ c# [) }$ L0 M; l1 J( m
{"name": "jobid", "type": "string"},/ F" R: c* `+ [
{"name": "jobStatus", "type": "string"}1 G6 ~; H) v; I
]
' N" |/ w% q w3 M, V" `2 d* S2 U},% M1 I# T, e9 S& f: H% W
…]
+ {- {/ q8 g5 d3 C}; K8 U. O; K3 e1 l3 X1 s0 {' D. N
[1] 参见网址http://code.google.com/p/protobuf/。. Q' w8 v5 W5 Y$ `* u
[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。
( T, s" S- q( {% x# s( H. ? a[3] 参见网址http://avro.apache.org/。
& O6 A& T, i& h- t[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。
9 B. p- R" g0 V1 N! [. {, @[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
5 T2 I0 i/ W |* m8 d' [( @( Z) i
4 \1 \ `! P0 D( U9 t( u! C/ i1 a; M3 n" f) W1 ^
|
|