|
3.2 第三方开源库" Y, r3 S9 ~* k+ A
3.2.1 Protocol Buffers% a; ~0 g$ ?1 J5 R8 ~ y
Protocol Buffers [1] 是一种轻便高效的结构化数据存储格式, 可以用于结构化数据序列化/反序列化。 它很适合做数据存储或
1 c& S! _3 C$ ]6 D7 sRPC 的数据交换格式, 常用作通信协议、 数据存储等领域的与语言无关、 平台无关、 可扩展的序列化结构数据格式。 目前支持1 M- ?. y0 ]1 o& s
C++、 Java、 Python三种语言。 在Google内部, 几乎所有的RPC协议和文件格式都是采用Protocol Buffers。
" Z$ v9 J, e8 `/ u8 B# ^相比于常见的XML格式, Protocol Buffers官方网站这样描述它的优点:
5 g9 E5 x o& x( U* s! C❑平台无关、 语言无关; n$ J4 r; A' `( ~/ I+ T
❑高性能, 解析速度是XML的20~100倍;9 j9 f, a* {% J1 B* [/ d% t8 w
❑体积小, 文件大小仅是XML的1/10~1/3;- N5 d. ?% e, h: t9 l9 k- `7 C) y- _
❑使用简单;* y) t' \* e x1 N! ^
❑兼容性好。
3 Y. ?1 s9 S# d" S通常编写一个Protocol Buffers应用需要以下三步:
6 p6 x6 \$ U. e8 ~, K v. g1) 定义消息格式文件, 通常以proto作为扩展名;
4 w& p! q9 c# B; ~: E2 b! q! _4 ?2) 使用Google提供的Protocol Buffers编译器生成特定语言( 目前支持C++、 Java、 Python三类语言) 的代码文件;$ B# _7 _. t) N7 t; ]) w: G
3) 使用Protocol Buffers库提供的API来编写应用程序。
n1 v8 \) t& G为了说明Protocol Buffers的使用方法, 下面给出一个简单的实例。
. {7 ^9 k" n- k该实例中首先定义一个消息格式文件person.proto, 描述了通讯录中一个人的基本信息, 接着用Protocol Buffers提供的方法将# t) M) l) p8 ^5 r [/ U* E
一个人的信息写入文件。9 y& B3 Y! |8 c& g- F9 X
步骤1 定义消息格式文件person.proto, 该文件描述了通讯录中某个人的基本信息, 内容如下:' I# u+ D {% a0 K
package tutorial; //自定义的命名空间7 I6 Y5 ~2 s5 k. F+ y
option java_package = "com.example.tutorial"; //生成文件的包名
+ k9 @6 ?# Z: zoption java_outer_classname = "PersonProtos"; //类名
2 [* K( ^! x i9 Y; a( }- P# e* E- nmessage Person { //待描述的结构化数据
7 g. l0 C, E7 mrequired string name = 1; //required表示这个字段不能为空2 L0 R. b7 h+ V5 V, \ ~* q
required int32 id = 2; //数字“2”表示字段的数字别名
& |% P$ U$ F/ ~ ^) P2 Ooptional string email = 3; //optional表示该字段可以为空 A/ R) l: p' F, X9 Q! i; S G
message PhoneNumber { //内部message4 e* M& z" e& Y# D! |3 v* E
required string number = 1;
9 N- o( k [, ]: e" f; N" aoptional int32 type = 2;
3 ^0 q; O1 o/ w, ~}r- `% t; k2 H- m
epeated PhoneNumber phone = 4;1 Q4 @9 ~* ?8 f5 s
}
; i+ D5 ]2 W* m: ~步骤2 使用Google提供的Protocol Buffers编译器生成Java语言, 命令如下:, K" T$ x: K F' \
protoc -java_out=. person.proto
' t. f8 Y" Y5 ~; H/ |7 c注意, 上面的命令运行时的当前路径是person.proto所在目录。, `3 \% R* H' e: p/ N2 M& r( U9 s
步骤3 使用Protocol Buffers库提供的API编写应用程序。 该例子创建了一个Person对象, 先将该对象保存到文件example.txt) H0 E. W M% J
中, 之后又从文件中读出并打印出来。
: u$ L$ W9 z6 O3 K& V& J* [public class ProtocolBufferExample {5 r! o) V9 M" Z( R
static public void main(String[] argv) {
2 F- A7 S. Y' `/ JPerson person1 = Person.newBuilder(), g5 V5 Z. m/ n3 J9 M- d
.setName("Dong Xicheng")
" s- d9 p! H" |. ]4 d.setEmail("dongxicheng@yahoo.com")1 E' y6 m3 ^- [% Q& U. H
.setId(11111)
+ q. l2 I9 o6 k6 h; U5 n9 `. ?% H" ^.addPhone(Person.PhoneNumber.newBuilder()
. p" z* f" F! b$ z( l' s.setNumber("15110241024")
: k3 q( [$ I; M5 u) ?) r, g+ ?.setType(0))
4 P. N5 P* \+ j2 E.addPhone(Person.PhoneNumber.newBuilder()
' K: Q4 y" L& w `" i1 Q.setNumber("01025689654")
+ S% k) c2 q' f.setType(1)).build();, [2 d1 {/ N0 I
try {
" B" ~6 k3 |9 \+ B5 V* `FileOutputStream output = new FileOutputStream("example.txt");0 c1 X3 j! A4 n" b i, i3 E
person1.writeTo(output);. w, K: h# g4 w( M( ?, X5 T
output.close();$ T9 I: w. Q9 R0 v c! |& }
} catch(Exception e) {
, g( n- w6 U" b' a# I- \7 |0 m XSystem.out.println("Write Error! ");8 _) U! C2 j& \( R5 K" g; ` V
} t- C1 N; V0 b/ s
ry {8 R3 X7 t# A3 W0 m- f
FileInputStream input = new FileInputStream("example.txt");* U( l$ `6 b6 M
Person person2 = Person.parseFrom(input);
! K+ f- I, w4 n. q& {System.out.println("person2:" + person2);; z1 Q" }- t2 n3 n2 [
} catch(Exception e) {
& G6 N1 X3 M7 F8 Y, YSystem.out.println("Read Error!");3 }% T' l6 i0 B2 r
}' j3 w$ l( i8 q4 Q8 G% I' y, m
} T$ K! L- ]4 v9 g7 T! z
}' g0 d: h) y1 }6 S& a: x: y) y b
在YARN中, 所有RPC函数的参数均采用Protocol Buffers定义的, 相比MRv1中基于Writable序列化的方法, Protocol Buffers的引
9 ~: t( P/ v* e. J入使得YARN在向后兼容性和性能方面向前迈进了一大步。/ B! Z6 @* E* C) k* l* f
除序列化/反序列化之外, Protocol Buffers也提供了RPC函数的定义方法, 但并未给出具体实现, 这需 要用户自行实现 [2] , 而
/ z3 r$ e& P- u# Q) DYARN则采用了MRv1中Hadoop RPC库, 举例如下:
2 F) @# r9 |: J5 Q! ?. i8 Sservice ContainerManagerService { //这是YARN自带的ContainerManager协议的定义# o, h7 X# k7 {; S8 x9 u
rpc startContainer(StartContainerRequestProto) returns (StartContainerResponseProto);
7 h" {8 e0 L+ E1 D: a4 K! W+ Grpc stopContainer(StopContainerRequestProto) returns (StopContainerResponseProto);
& @. ~9 F9 o% Z7 D; Q* Y& m/ Zrpc getContainerStatus(GetContainerStatusRequestProto) returns (GetContainerStatusResponseProto);& j# ^# M9 ~. w; f
}
5 P& R5 H- O/ V, h在第2章中, 介绍了YARN中的所有RPC协议, 而这些协议全是使用Protocol Buffers定义的, 具体如下:
+ ~3 M$ Q( x" h ? J❑applicationmaster_protocol.proto: 定义了AM与RM之间的协议—Application-MasterProtocol。, E$ W6 g" B8 d! J% \( ?
❑applicationclient_protocol.proto: 定义了JobClient( 作业提交客户端) 与RM之间的协议—ApplicationClientProtocol。
' y. U. M) J j❑containermanagement_protocol.proto: 定义了AM与NM之间的协议—Container-ManagementProtocol。
" I. G6 A/ g2 ]❑resourcemanager_administration_protocol.proto: 定义了Admin( 管理员) 与RM之间的通信协议—* s1 J0 h1 w* W/ m
ResourceManagerAdministrationProtocol。3 T9 u- i, s% ~' R9 x/ I
❑yarn_protos.proto: 定义了各个协议RPC的参数。
5 f% R* A' {. N5 x3 P4 q a5 ~❑ResourceTracker.proto: 定义了NM与RM之间的协议—ResourceTracker。+ Y. G g0 n: T
除了以上几个内核中的协议, YARN还使用Protocol Buffers对MapReduce中的协议进行了重新定义:
& s* P4 A+ @# S9 \$ y❑MRClientProtocol.proto: 定义了JobClient( 作业提交客户端) 与MRAppMaster之间的协议—MRClientProtocol。
G; T9 W9 y& `3 L: h8 a❑mr_protos.proto: 定义了MRClientProtocol协议的各个参数。! T6 _2 `3 D' \
3.2.2 Apache Avro
1 E, |: I& e# O0 {# P! oApache Avro [3] 是Hadoop下的一个子项目。 它本身既是一个序列化框架, 同时也实现了RPC的功能。, ]" z/ \; |1 ^7 X: D
Avro官网描述Avro的特性和功能如下:
# P) o e, D; s# |❑丰富的数据结构类型;
- y: C" {( R. I❑快速可压缩的二进制数据形式;0 T% ^5 D. ]. ^& C2 C# l( k
❑存储持久数据的文件容器;, Z1 \: M( h1 K; m1 \4 x
❑提供远程过程调用RPC;
+ j: i$ z5 S- h. E; H+ i$ d❑简单的动态语言结合功能。
! N* R. j& {' o) I相比于Apache Thrift 和Google的Protocol Buffers, Apache Avro具有以下特点:
# I4 Y9 D& v0 B- y- p6 ^❑支持动态模式 。 Avro不需要生成代码, 这有利于搭建通用的数据处理系统, 同时避免了代码入侵。
( g9 z6 W- b b m❑数据无须加标签 。 读取数据前, Avro能够获取模式定义, 这使得Avro在数据编码时只需要保留更少的类型信息, 有利于1 G: U4 A) ^/ r& v" [ J
减少序列化后的数据大小。
* g! k4 u5 [% v- D❑无须手工分配的域标识 。 Thrift和Protocol Buffers使用一个用户添加的整型域唯一性定义一个字段, 而Avro则直接使用域$ J' M( v! c8 E- O. M: y
名, 该方法更加直观、 更加易扩展。
6 T) E+ R1 M( O, A! l3 V编写一个Avro应用也需如下三步:* e e/ z& K6 F3 A3 o! X
1) 定义消息格式文件, 通常以avro作为扩展名;* ?7 a- j: H1 ?' |
2) 使用Avro编译器生成特定语言的代码文件( 可选) ; f+ o+ o$ }' {2 `8 \4 n/ p
3) 使用Avro库提供的API来编写应用程序。
7 m3 a+ P8 x+ V x8 a下面给出一个使用实例。, T7 T% ]" z1 ^( \3 o9 r& o
步骤1 定义消息格式文件person.avro, 该文件描述了通讯录中某个人的基本信息, 内容如下:( v0 |" J: q2 e4 w! Y2 l7 O3 ~
{"namespace": "com.example.tutorial",
8 T$ {/ O* ^) K" S, a"type": "record",7 ?5 V: \) Y* ~: F+ _+ d
"name": "Person",
: T: |+ P- X2 K8 Y7 V1 r"fields": [; O" s. g! ^( T' \* v
{"name": "name", "type": "string"},
: {! |! ^) T* F{"name": "id", "type": "int"},
: m2 Q$ L/ H$ B0 U- p0 I |! h{"name": "email", "type": ["string", "null"]},
$ ?. z% b8 o8 {/ V9 C* m$ H$ ]{"name": "phone", "type": {"type": "array",, y& F& u7 ~, G
"items": {"type": "record", "name": "PhoneNumber",$ n9 _$ E; k% [1 J2 C2 }
"fields": [
) f( I5 k: k0 r$ J. |{"name": "number", "type": "string"},
6 F: K- P- H' v{"name": "type", "type": ["int", "null"]}
8 x4 Y o( y( `' j+ G) Q0 ?]/ E- {( {/ R8 K4 m0 a m
}
4 K c3 l# _/ `. ]0 o9 J}, z; [! I! F" |! k
}]* J% T& o u+ {( d) {
}
% y4 z: [9 ?' F, B& J步骤2 使用Avro编译器生成Java语言, 命令如下:; ?1 n1 C; ], G1 s
java -jar avro-tools-1.7.4.jar compile schema person.avro .& p, F2 n- P! C$ C
注意, 上面的命令运行时的当前路径是person.avro所在目录。
3 l; x! i5 w. x; S2 M1 Q+ \步骤3 使用Avro库提供的API来编写应用程序。 该例子创建一个Person对象, 先将该对象保存到文件example.txt中, 之后从文3 ]6 H' i7 R$ J6 k2 G% y4 E O
件中读出并打印。
' W, m' @* v, Y$ R+ hpublic class AvroExample {
1 b% t3 d( s# L" o* i) Dstatic public void main(String[] argv) {
- ?5 U4 y7 W! N& s. d+ s/ ^) ZPhoneNumber phoneNumber1 = PhoneNumber.newBuilder()
+ k+ p1 b* a6 T2 a+ S1 }.setNumber("15110241024") k( |6 l3 X( @) ~5 S
.setType(0).build();: U. K5 f+ G( d; X2 m3 o# |) w
PhoneNumber phoneNumber2 = PhoneNumber.newBuilder()' V, Q) H/ W; g: C" ~
.setNumber("01025689654")
, f( d# q, ]* a7 E.setType(1).build();# I1 R. J' f1 O/ c! r4 m
List<PhoneNumber> phoneNumbers = new ArrayList<PhoneNumber>();
& q' g+ B: N; ]9 `$ o; j. }% ^2 pphoneNumbers.add(phoneNumber1);
3 Q. _8 {, q4 {3 u) i/ l. r7 D0 [' U4 i/ IphoneNumbers.add(phoneNumber2);% N/ Y; j1 Y! N2 z F+ _$ M
Person person = Person.newBuilder()
3 N/ j. w6 i; s6 ?& [.setName("Dong Xicheng")
* x! w1 v" l' A) A: w0 y.setEmail("dongxicheng@yahoo.com")) E6 y* O. Q# f4 P" k
.setId(11111)
) a, m& E4 E6 L* O1 n- @.setPhone(phoneNumbers).build();: u# m# e9 T9 k
File file = new File("person.txt");
* F: e( o" d" u' ^ @try {! j7 t, C9 L$ E& t& B; _
DatumWriter<Person> personDatumWriter = new SpecificDatumWriter<Person>(Person.class); ]6 Q! f0 T8 H0 R9 D
DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personDatumWriter);
& B. m3 v- X- h3 P; A' P' hdataFileWriter.create(person.getSchema(), file);
8 X6 d9 A5 S) b( x- ndataFileWriter.append(person);
* {" i* l6 {2 g! `+ [dataFileWriter.close();' w* L4 x- K9 }; C% t5 M8 @
} catch(Exception e) {
/ `0 E( g% I9 j) o5 @5 A8 ~System.out.println("Write Error:" + e);
8 N6 L5 ?% g0 i/ b+ f* |5 y}t- J+ C( s X7 _9 }
ry {
- z2 S7 o5 Q/ ^- JDatumReader<Person> userDatumReader = new SpecificDatumReader<Person>(Person.class);. A7 R5 H+ f4 R" j* g2 H- c. {' z
DataFileReader<Person> dataFileReader = new DataFileReader<Person>(file, userDatumReader);& [6 L1 @: j8 D! R
person = null;
4 r/ j0 E; g+ j7 B) F% ~9 ], O% cwhile (dataFileReader.hasNext()) {
( y7 D9 Q/ k5 jperson = dataFileReader.next(person);
/ z" g$ P( g# k6 l+ H) o' p# OSystem.out.println(person);
2 c# S2 ^( `2 c6 X% f f4 l}; Z# A. R( z: q2 N
} catch(Exception e) {# n3 m2 ]! \1 [0 `/ F( d4 U
System.out.println("Read Error:" + e);
4 Y1 w2 p5 P- \3 s. d' z8 v. ?}" \1 z4 r+ J# W! X6 t& o! ~( C5 S
}) i5 T$ J* ^- V+ l) N
}
/ ~3 s8 ~; ~0 {! k5 ~% Y3 Q$ F如果不想编译person.avro文件, 需要使用另外一套应用程序接口, 具体可参 考官方文档 [4] 。' \7 m, o d1 f
Apache Avro最初是为Hadoop量身打造的RPC框架, 考虑到 稳定性 [5] , YARN暂时采用Protocol Buffers作为序列化库, RPC仍
. I v; ?" `+ M4 l+ i* d使用MRv1中的RPC, 而Avro则作为日志序列化库使用( 将在第8章介绍) 。 在YARN MapReduce中, 所有事件的序列化/反序列化: U4 W: _5 c) ~% x8 d$ L
均采用Avro完成, 相关定义在Events.avpr文件中, 举例如下:
0 j4 J0 q2 \* V. m{"namespace": "org.apache.hadoop.mapreduce.jobhistory",3 z* \5 E9 ?( S. P
"protocol": "Events",4 v7 g) H; I2 z- K* _' `5 o0 ` g
"types": [1 y6 }, {' E% u. z0 W7 q4 G% u
…{
j, \1 R3 J0 ?' H- ~) t"type": "record", "name": "JobInfoChange",2 B% O" o- w0 n7 Y1 O( g
"fields": [# {( [/ G! M( n+ R- ]
{"name": "jobid", "type": "string"},
) Q4 g" P% a4 ]) l{"name": "submitTime", "type": "long"},
8 {4 Z# m1 R$ P! d7 T4 F# M{"name": "launchTime", "type": "long"}6 j! x R" Z9 I! O, n' L* `3 [5 X* q8 x
]: {8 E4 W% \$ |$ V/ ~" H" q% e
},
% R2 Z0 k. X& o3 |# ~) b9 L$ {$ V{"type": "record", "name": "JobPriorityChange",7 ]) a- u) o' L$ `; ?+ @
"fields": [
+ h0 a# ?# ~4 G1 I- u- P{"name": "jobid", "type": "string"},: j" Z! c! j4 v+ ?' e: c
{"name": "priority", "type": "string"}
1 X/ D( _8 |! M# z: K8 U, C; v" X]+ C6 f5 ~: {4 v8 z- G
},/ \0 l: a l# b7 Y |. i$ Y
{"type": "record", "name": "JobStatusChanged",1 D; b+ h& t) ?
"fields": [# C: N" o4 b% p2 q7 @% X( b* p
{"name": "jobid", "type": "string"},) ?/ \( N) Y n) N9 v5 V
{"name": "jobStatus", "type": "string"}8 y* ?% d: ]) z1 W# j
]5 N: `2 u9 z3 A8 M# o$ x, H. v
},
2 W# P E" L6 _7 E/ |…]
/ X7 Y3 e7 K! O* M e}/ P& R( U& M4 G' K* ]! A1 q0 S
[1] 参见网址http://code.google.com/p/protobuf/。+ x, S2 q0 G* U2 G+ N% n
[2] 可参考第三方开源实现, 网址为http://code.google.com/p/protobuf/wiki/ThirdPartyAddOns。, }1 U* M) q3 m" y9 ^
[3] 参见网址http://avro.apache.org/。
: c4 c0 q S1 H$ k& X[4] 参见网址http://avro.apache.org/docs/current/gettingstartedjava.html。- m/ E! f' ?2 l( h& f9 t: w
[5] YARN项目启动时, Apache Avro尚不成熟, 存在各种问题。
& k8 F5 Z. `% k7 k2 l& A
2 ^+ o+ R6 o! y( N V' Y. q" h# m4 s$ r' s
|
|