|
13.3 MapReduce扩展
! O% B% y$ E# L xMapReduce框架有效地解决了海量数据的离线批处理问题,在各大互联网公司得8 _& M7 ?4 F3 z" o5 `0 G3 s
到广泛的应用。事实已经证明了MapReduce巨大的影响力,以至于引发了一系列的扩
# [$ {8 F$ T) u: a- |( ^- w展和改进。这些扩展包括:0 Q9 w6 `0 j7 Z7 e% ~) H& Z4 N
●Google Tenzing:基于MapReduce模型构建SQL执行引擎,使得数据分析人员可, Q% F0 M6 a2 u) `& w) I# F! u
以直接通过SQL语言处理大数据。) R' Z' F: l* x! K) |; t+ Q6 o- z
●Microsoft Dryad:将MapReduce模型从一个简单的两步工作流扩展为任何函数: f# p0 b9 ]+ x) l+ Z
集的组合,并通过一个有向无环图来表示函数之间的工作流。$ w2 _1 b2 r0 ]4 K
●Google Pregel:用于图模型迭代计算,这种场景下Pregel的性能远远好于1 l0 V9 t8 c5 U
MapReduce。
+ p4 M% h3 @% I$ [; b' P7 u13.3.1 Google Tenzing- p: p/ Y4 a ^9 `% g
Google Tenzing是一个构建在MapReduce之上的SQL执行引擎,支持SQL查询且能
/ o, k* H" X/ A+ F9 f7 T* H% E6 p够扩展到成千上万台机器,极大地方便了数据分析人员。9 q. I- f! k5 s/ k# w
1.整体架构! W: H0 W9 }* J( O* z* o
Tenzing系统有四个主要组件:分布式Worker池、查询服务器、客户端接口和元
. ~+ L3 }/ [ Q* ]0 u0 f数据服务器,如图13-2所示。
3 B4 q( a9 t9 M5 O4 G: @4 [' n图 13-2 Tenzing整体架构' V; k& {, q0 k
●查询服务器(Query Server):作为连接客户端和worker池的中间桥梁而存在。
/ v+ Q: t: j: z6 M6 Y& d查询服务器会解析客户端发送的查询请求,进行SQL优化,然后将执行计划发送给分
2 f! h* M) `% ~+ u5 C0 L' g4 L布式Worker池执行。Tenzing支持基于规则(rule-based optimizer)以及基于开销" L) u6 H( b/ p( G$ I
(cost-based optimizer)两种优化模式。5 [7 L/ `) n, ^* s" b9 i
●分布式Worker池:作为执行系统,它会根据查询服务器生成的执行计划运行$ ?7 Z: k6 Y+ ?& V2 A' N) ^
MapReduce任务。为了降低查询延时,Tenzing不是每次都重新生成新进程,而是让进 d! Z9 c& m/ P! j
程一直处于运行状态。Worker池包含master和worker两种节点,其中,master对应2 q. g2 t9 r. b9 W4 g$ x
MapReduce框架中的master进程,worker对应MapReduce框架中的map和reduce进程。
( ]. I+ o8 L$ W* J' P! L3 R! M1 f另外,还有一个称为master监听者(master watcher)的守护进程,查询服务器通过3 M5 ~& C f6 \
master监听者获取master信息。* j! x O( S8 T. ^- J* \
●元数据服务器(Metadata Server):存储和获取表格schema、访问控制列表6 S- W# v* k8 @) Y8 k7 [
(Access Control List,ACL)等全局元数据。元数据服务器使用Bigtable作为持久化的. W3 c1 B) q1 V" R# `7 q, N
后台存储。6 t1 @' \5 {1 m/ F, }
●客户端接口:Tenzing提供三类客户端接口,包括API、命令行客户端(CLI)以
3 F. S0 g& e3 S, k/ l# W) V) Y及Web UI。
; ?' n, f! B p; R7 t: i0 n●存储(Storage):分布式worker池中的master和worker进程执行MapReduce任务- j8 D7 E& r& e$ y# L
时需要读写存储服务。另外,查询服务器会从存储服务获取执行结果。
# C1 Q: v) D1 B# E/ b2 y0 n2.查询流程5 {2 a& Q0 P7 w! `3 |! ^8 Y6 g) D ~
1)用户通过Web UI、CLI或者API向查询服务器提交查询。
0 y0 Q4 ?6 S! i7 |+ t, l! k2)查询服务器将查询请求解析为一个中间语法树。: `, L8 x) f- G5 Q
3)查询服务器从元数据服务器获取相应的元数据,然后创建一个更加完整的中
9 H% y; e9 s @间格式。( y$ K5 A% z% @3 c. d3 R
4)优化器扫描该中间格式进行各种优化,生成物理查询计划。" {3 k1 L2 }# K
5)优化后的物理查询计划由一个或多个MapReduce作业组成。对于每个% e+ p9 j; ?, Q
MapReduce作业,查询服务器通过master监听者找到一个可用的master,master将该作业' `5 v- l) _, t* K
划分为多个任务。
) |* H' o% |0 x" w6)空闲的worker从master拉取已就绪的任务。Reduce进程会将它们的结果写入
. o& Y' r. B, R( K$ L1 L到一个中间存储区域中。 Q( y( |; _" H {# D) `$ l' O2 p
7)查询服务器监控这些中间存储区域,收集中间结果,并流失地返回给客户9 ]) g3 z2 L! z- M, O: V, x$ i
端。
/ Y" W5 g5 q& z& q3.SQL运算符映射到MapReduce
* Z8 j7 d' ]' p: J* H查询服务器负责将用户的SQL操作转化为MapReduce作业,本节介绍各个SQL物! ]% `2 g Y$ W8 l% o+ J" ?' H& o
理运算符对应的MapReduce实现。
2 o& F+ \7 M1 ]2 b/ ](1)选择和投影
- J: q A# ]& R% X选择运算符σ C (R)的一种MapReduce实现如下。
. e3 R! w7 R( ^5 u) uMap函数:对R中的每个元素t,检测它是否满足条件C。如果满足,则产生一
( h7 A* w, p# L$ {个“键-值”对(t,t)。也就是说,键和值都是t。2 i' k/ X0 b* w. H/ w5 A- b7 p. Q
Reduce函数:Reduce的作用类似于恒等式,它仅仅将每个“键-值”对传递到输出
8 B) ?. P7 z3 r0 j2 \: n( t( ~: ^. y部分。
3 T7 w0 o3 d$ C, v) F) C投影运算的处理和选择运算类似,不同的是,投影运算可能会产生多个相同的- a0 w! U8 h7 N" H. n. F' b! \. u
元组,因此Reduce函数必须要剔除冗余元组。可以采用如下方式实现投影运算符
* [7 Z5 U3 O+ p; c7 Gπ S (R)。% n" A. h. B- l! i' q! H$ E
Map函数:对R中的每个元组t,通过剔除属性不在S中的字段得到元组t',输出一 p$ h' ?8 ~+ m) Z/ `
个“键-值”对(t',t')。
7 ~* p1 F& t x$ [% q; GReduce函数:对任意Map任务产生的每个键t',将存在一个或多个“键-值”对; @& b8 o) Y: E4 T+ d
(t',t'),Reduce函数将(t',[t',t',…,t'])转换为(t',t'),以保证对该键t'只产
( l# y5 |( Q6 U7 q2 b7 P$ z生一个(t',t')对。7 ^6 h7 h! }8 J6 o
Tenzing执行时会做一些优化,例如选择运算符下移到存储层;如果存储层支持4 f3 N0 ^+ P w& p4 f" ]
列式存储,Tenzing只扫描那些查询执行必须的列。3 K8 w2 H& }7 i$ N- `5 y1 O& b0 m
(2)分组和聚合1 p5 x$ G) e7 r8 L" V4 p, I
假定对关系R(A,B,C)按照字段A分组,并计算每个分组中所有元组的字段B
A2 o4 N4 M* u0 n( w之和。可以采用如下方式实现γ A,SUM(B) (R)。5 C. \6 M E( U( b* e5 O
Map函数:对于每个元组,生成“键-值”对(a,b)。3 d8 Y, R9 u' K# c" F* Q, k) g* j
Reduce函数:每个键a代表一个分组,对与键a相关的字段B的值的列表[b 1 ,b 2 ,/ f' t. [9 [2 J% g4 _
…,b n ]执行SUM操作,输出结果为(a,SUM(b 1 ,b 2 ,…,b n ))。
' s. f1 |* E* m! S. OTenzing支持基于哈希的聚合操作,首先,放松底层MapReduce框架的限制,, w7 U& ?# Z7 _* K
shuffle时保证所有键相同的“键-值”对属于同一个Reduce任务,但是并不要求按照键! \7 G2 E- n' p; t B
有序排列。其次,Reduce函数采用基于哈希的方法对数据分组并计算聚合结果。7 d$ Y$ o9 J" `
(3)多表连接
6 i) N; i" T$ ]大表连接是分布式数据库的难题,MapReduce模型能够有效地解决这一类问题。
3 v) y5 b( o: G# D常见的连接算法包括Sort Merge Join、Hash Join以及Nested Loop Join。# R# s( z. g% W0 D- ]4 w8 v1 t, \4 n
假设需要将R(A,B)和S(B,C)进行自然连接运算,即寻找字段B相同的元
2 B$ J! @; ]( N$ M) r5 ^) ~" M4 |1 Q组。可以通过Sort Merge Join实现如下:' f' l( U, z b) m
Map函数:对于R中的每个元组(a,b),生成“键-值”对(b,(R,a)),对S中的# b9 A2 ]$ q7 T9 |2 ]
每个元组(b,c),生成“键-值”对(b,(S,c))。
2 |1 {9 c' c* `1 F0 EReduce函数:每个键值b会与一系列对相关联,这些对要么来自(R,a),要么来
6 H Y: y' k' G$ T* o/ t自(S,c)。键b对应的输出结果是(b,[(a 1 ,b,c 1 ),(a 2 ,b,c 2 ),…]),也就是说,与b9 A" W3 E6 }( ]8 D" k9 f6 Q9 k
相关联的元组列表由来自R和S中的具有共同b值的元组组合而成。
9 S* Y2 l% r) r1 e) O1 ~如果两张表格都很大,且二者的大小比较接近,Join字段也没有索引,Sort9 F( J7 S- H) L2 ~
Merge Join往往比较高效。然而,如果一张表格相比另外一张表格要大很多,Hash
2 z/ k3 _( ?0 W* @Join往往更加合适。
: l# f' E3 M( T% ~1 |假设R(A,B)比S(B,C)大很多,可以通过Hash Join实现自然连接。Tenzing中
3 V! n/ s. ~3 s- Y9 h4 s" X, G) t1 p一次Hash Join需要执行三个MapReduce任务。
( k& b2 l# k9 @6 L7 G3 g4 \, o: dMR1:将R(A,B)按照字段B划分为N个哈希分区,记为R 1 ,R 2 ,…,R N ;* L2 w7 _5 j: {% Z6 ?0 g
MR2:将S(B,C)按照字段B划分为N个哈希分区,记为S 1 ,S 2 ,…,S n ;
- \1 w. j$ R( XMR3:每个哈希分区<R i ,S i >对应一个Map任务,这个Map任务会将S i 加载到内
9 t. H- B2 [4 Y: [存中。对于R i 中的每个元组(a,b),生成(b,[(a,b,c 1 ),(a,b,c 2 ),…]),其中,' P, g& s+ q, f3 ~( B6 w
(b,[c 1 ,c 2 ,…])是S i 中存储的元组。Reduce的作用类似于恒等式,输出每个传入8 G0 c4 G& F& S% i: A
的“键-值”对。
% Y+ x6 j" [# Y( ?. [/ P6 |Sort Merge Join和Hash Join适用于两张表格都不能够存放到内存中,且连接列没
. s6 F, g5 F# c& Y ~7 t有索引的场景。如果S(B,C)在B列有索引,可以通过Remote Lookup Join实现自然; T) K) r+ q$ \9 {% Q, x5 f
连接,如下:
* w% U( n3 S- n& C4 uMap函数:对于R中的每个元组(a,b),通过索引查询S(B,C)中所有列值为b
# M! n* f% C/ O* r9 L的元组,生成(b,[(a,b,c 1 ),(a,b,c 2 ),…])。" g2 W3 _/ g; Z* g3 N
Reduce函数:Reduce的作用类似于恒等式,输出每个传入的“键-值”对。
1 c) {5 Y; {+ n% ?如果S(B,C)能够存放到内存中,那么,Map进程在执行map任务的过程中会将
: S2 B$ u; ~" jS(B,C)的所有元组缓存在本地,进一步优化执行效率。另外,同一个Map进程可) T6 U% t2 }- l
能执行多个map任务,这些map任务共享一份S(B,C)的所有元组缓存。
2 g/ e2 p6 {8 n6 S0 P5 e13.3.2 Microsoft Dryad4 F8 _: U x3 s5 r' P
Microsoft Dryad是微软研究院创建的研究项目,主要用来提供一个分布式并行计
4 |# C0 B* e' P# _算平台。在Dryad平台上,每个Dryad工作流被表示为一个有向无环图。图中的每个
2 \" ?5 I7 D# o1 g) F- P* F节点表示一个要执行的程序,节点之间的边表示数据通道中数据的传输方式,其可
# j8 l( Q, w/ y1 _6 A7 X能是文件、管道、共享内存、网络RPC等。Dryard工作流如图13-3所示。
" L% f7 k9 U' \2 w7 b% ?+ Y* Y1 U图 13-3 Dryad工作流
5 K1 n7 ^( H! w每个节点(vertices)上都有一个处理程序在运行,并且通过数据通道% Q; @# ^( l5 o! I* E6 U3 v# \# d( ^
(channels)的方式在它们之间传输数据。类似于Map和Reduce函数,工作流中的8 g: w& c1 W8 {" O d, t$ r
grep、sed、map、reduce、merge等函数可以被很多节点执行,每个节点会被分配一部1 @; B* p$ f4 j8 Q
分输入。Dryad的主控进程(Job Manager)负责将整个工作分割成多个任务,并分发
+ G5 F( r$ i$ Y+ ^给多个节点执行。每个节点执行完任务后通知主控进程,接着,主控进程会通知后
+ }: E& K5 f' ]9 ]续节点获取前一个节点的输出结果。等到后续节点的输入数据全部准备好后,才可
( H/ t- P! ^5 v7 K2 \4 d以继续执行后续任务。
- y( d# }3 n( D$ O* qDryad与MapReduce具有的共同特性就是,只有任务完成之后才会将输出传递给. C! a4 r) f2 s$ f5 |0 V+ X
接收任务。如果某个任务失败,其结果将不会传递给它在工作流中的任何后续任1 ~2 ?0 Y( X/ U
务。因此,主控进程可以在其他计算节点上重启该任务,同时不用担心会将结果重
6 H: B# W$ G/ X8 T复传递给以前传过的任务。
* w. ^' ?* l5 @相比多个MapReduce作业串联模型,Dryad模型的优势在于不需要将每个
3 ^5 `/ k3 \! [9 d' XMapReduce作业输出的临时结果存放在分布式文件系统中。如果先存储前一个8 E- c) q# I+ J; ^- T) g+ t7 a
MapReduce作业的结果,然后再启动新的MapReduce作业,那么,这种开销很难避# e2 ~% D7 c3 M2 q! k
免。9 k- p y) J- G) M4 R
13.3.3 Google Pregel6 ^, r/ g; a# r- ~
Google Pregel用于图模型迭代计算,图中的每个节点对应一个任务,每个图节点
0 e; I) a. V- T' c5 i9 S, R会产生输出消息给图中与它关联的后续节点,而每个节点会对从其他节点传入的输* f1 Z8 b5 w4 p; d1 o0 v# N% I
入消息进行处理。
" d( X& w2 `4 t# g; l1 u8 v& qPregel中将计算组织成“超步”(superstep)。在每个超步中,每个节点在上一步 [* d- e) U% B3 ~8 M
收到的所有消息将被处理,并且将处理完后的结果传递给后续节点。
: x5 B3 ^# l/ ]7 c5 O8 U, wPregel采用了BSP(Bulk Sychronous Parallel,整体同步并行计算)模型。每个“超0 u( X$ J t+ x, j4 j6 H1 T5 s
步”分为三个步骤:每个节点首先执行本地计算,接着将本地计算的结果发送给图中) S% S$ g& e$ ?6 w4 x2 e4 L2 C$ d# f
相邻的节点,最后执行一次栅栏同步,等待所有节点的前两步操作结束。Pregel模型. R \9 M& Y' R" e5 C9 \
会在每个超步做一次迭代运算,当某次迭代生成的结果没有比上一次更好,说明结+ r B4 u# k7 j. O7 T, d9 B- D6 x
果已经收敛,可以终止迭代。
, b2 L6 l! E4 r! k2 R图 13-4 Pregel BSP计算模型
; L8 R* E1 Y+ Q假设有一个带边权重的图,我们的目标是对图中的每个节点计算到其他任一节6 o2 j \6 X0 O: B& b; u6 |
点的最短路径长度。一开始,每个图节点a都保存了诸如(b,w)对的集合,这表示a
0 j- X! |7 D( I, x; Q/ n2 r X到b的边权重为w。
5 S) G2 Q* X7 Y! P(1)超步# f2 u4 y' g# a. m; _# y
每个节点会将(a,b,w)传递给图中与它关联的后续节点。当节点c收到三元组7 Y! [: S" b6 n
(a,b,w)时,它会重新计算c到b的最短距离,如果w+v<u(假设当前已知的c到a的$ X# ^' M# @. X
最短距离为v,c到b的最短距离为u),那么,更新c到b的最短距离为w+v。最后,消 o3 r5 o7 c' q! T2 v' |; N0 o
息(c,b,w+v)会传递给后续节点。2 X, e' m! ?. y9 G% j% p$ o) p
(2)终止条件6 f- }' i3 K, \6 \4 R8 @7 y" _
当所有节点在执行某个超步时都没有更新到其他节点的最短距离时,说明已经
9 o! n" f: }" l7 D! J& k/ v8 a计算出想要的结果,整个迭代过程可以结束。$ C8 U$ i; r% a4 l
Pregel通过检查点(checkpoint)的方式进行容错处理。它在每执行完一个超步之 [9 x* x& `* C
后会记录整个计算的现场,即记录检查点情况。检查点中记录了这一轮迭代中每个5 G0 v; V* ]* j8 [6 B6 @; ^
任务的全部状态信息,一旦后续某个计算节点失效,Pregel将从最近的检查点重启整# t, k6 R3 C; L
个超步。尽管上述的容错策略会重做很多并未失效的任务,但是实现简单。考虑到. A/ M0 O7 s* ~2 {2 Q
服务器故障的概率不高,这种方法在大多数时候还是令人满意的。
9 s7 t, M4 b. |. S( P: @" L, n4 d: K
|
|