|
13.3 MapReduce扩展
- g% N6 R8 T+ q( _$ [7 ]; K# aMapReduce框架有效地解决了海量数据的离线批处理问题,在各大互联网公司得
, [" E ]$ T Z( u, i到广泛的应用。事实已经证明了MapReduce巨大的影响力,以至于引发了一系列的扩
: [9 I+ s" l" n9 D7 F/ x展和改进。这些扩展包括:4 D* Y6 {+ ?% r) [" a! _/ Q
●Google Tenzing:基于MapReduce模型构建SQL执行引擎,使得数据分析人员可+ x$ \. D+ d' d+ j
以直接通过SQL语言处理大数据。
% [& I7 O% u. R y, B. P' v●Microsoft Dryad:将MapReduce模型从一个简单的两步工作流扩展为任何函数
. |! z; P8 j. o L6 |) C集的组合,并通过一个有向无环图来表示函数之间的工作流。% ?7 n* m x( @# Z, k" U2 [
●Google Pregel:用于图模型迭代计算,这种场景下Pregel的性能远远好于1 L3 N4 x- H& ?' V# P
MapReduce。$ U8 N% ~: E0 K, M/ g
13.3.1 Google Tenzing
2 o9 l: k( d3 g/ x' M }* SGoogle Tenzing是一个构建在MapReduce之上的SQL执行引擎,支持SQL查询且能
4 r! D6 }: V/ @' {8 P够扩展到成千上万台机器,极大地方便了数据分析人员。4 w- X1 r4 @. d i
1.整体架构2 u8 O8 L5 X! c, i
Tenzing系统有四个主要组件:分布式Worker池、查询服务器、客户端接口和元6 v- F& C8 E: K( T3 b
数据服务器,如图13-2所示。4 Y- P, D- v+ K+ N) R, {
图 13-2 Tenzing整体架构- u! R/ x% L8 z( Y7 N3 q: ~
●查询服务器(Query Server):作为连接客户端和worker池的中间桥梁而存在。+ x) i7 s4 P l
查询服务器会解析客户端发送的查询请求,进行SQL优化,然后将执行计划发送给分( m! R1 f @' |" F$ _, `$ d7 {
布式Worker池执行。Tenzing支持基于规则(rule-based optimizer)以及基于开销# S& k3 ~) ?' [+ j1 X
(cost-based optimizer)两种优化模式。
( |9 `* `$ W/ E% b: A$ K●分布式Worker池:作为执行系统,它会根据查询服务器生成的执行计划运行* G/ b! H1 {' ?
MapReduce任务。为了降低查询延时,Tenzing不是每次都重新生成新进程,而是让进% V# a A/ _" l) ~
程一直处于运行状态。Worker池包含master和worker两种节点,其中,master对应4 D+ R( z K& L
MapReduce框架中的master进程,worker对应MapReduce框架中的map和reduce进程。8 D) x! A) J& L3 I
另外,还有一个称为master监听者(master watcher)的守护进程,查询服务器通过
% d( N# o0 }2 `! a: ?' ~" fmaster监听者获取master信息。) J3 u$ g g4 n1 t% V. F
●元数据服务器(Metadata Server):存储和获取表格schema、访问控制列表
6 c- x0 ~+ ^% ^1 f" w(Access Control List,ACL)等全局元数据。元数据服务器使用Bigtable作为持久化的9 f. `" B+ z J |; Q8 ~0 [
后台存储。! V& Z' O4 N9 t j5 L( P( q
●客户端接口:Tenzing提供三类客户端接口,包括API、命令行客户端(CLI)以
1 j6 O7 s; V# z8 e/ o% V及Web UI。2 q" n9 t( p& o" f
●存储(Storage):分布式worker池中的master和worker进程执行MapReduce任务# ^/ z3 I* m8 T% k- r5 }( h6 ~
时需要读写存储服务。另外,查询服务器会从存储服务获取执行结果。+ b) `% W( t" g, S& \
2.查询流程! r `5 a$ m+ Q4 Q" c# V( k0 ~ i
1)用户通过Web UI、CLI或者API向查询服务器提交查询。) O. w. H K2 ~7 \
2)查询服务器将查询请求解析为一个中间语法树。5 k6 j7 S+ A0 Z! w2 U! |6 c2 `
3)查询服务器从元数据服务器获取相应的元数据,然后创建一个更加完整的中, c6 `! ~& {2 r& B
间格式。
5 V n; j, h4 g' s) F* T4)优化器扫描该中间格式进行各种优化,生成物理查询计划。: n# ~' ], w' x, r5 y7 _
5)优化后的物理查询计划由一个或多个MapReduce作业组成。对于每个
% G' ?9 i, k; z( N* e! q0 t1 \MapReduce作业,查询服务器通过master监听者找到一个可用的master,master将该作业0 l2 c* Q2 Q+ i$ ~/ \3 t d
划分为多个任务。
4 B% h! o6 N2 ]3 p Y7 c) [4 ?' j& K6)空闲的worker从master拉取已就绪的任务。Reduce进程会将它们的结果写入! S8 N2 [4 e2 z) T
到一个中间存储区域中。
# P* v( D. Z6 a& L! p4 p. K+ [ p7)查询服务器监控这些中间存储区域,收集中间结果,并流失地返回给客户
, h3 Q" v0 q8 G& T端。6 }7 n4 e; W( Y# y
3.SQL运算符映射到MapReduce
# s8 e8 ?3 X& B5 O. M- u查询服务器负责将用户的SQL操作转化为MapReduce作业,本节介绍各个SQL物0 m- `* h I5 n" Q1 K
理运算符对应的MapReduce实现。5 p& y' O- ~! A
(1)选择和投影
( |. B# P+ O9 r+ M8 I6 i0 K选择运算符σ C (R)的一种MapReduce实现如下。6 E# i7 ]; T9 |
Map函数:对R中的每个元素t,检测它是否满足条件C。如果满足,则产生一
1 ^& ~( v: m: ~个“键-值”对(t,t)。也就是说,键和值都是t。6 p+ m8 @, b( o0 H* f) ]
Reduce函数:Reduce的作用类似于恒等式,它仅仅将每个“键-值”对传递到输出
0 v' b/ }; Z3 S0 k2 _部分。
% k; r2 S- ]$ ]* U; O3 {投影运算的处理和选择运算类似,不同的是,投影运算可能会产生多个相同的9 k. Y. s( m, o2 Q
元组,因此Reduce函数必须要剔除冗余元组。可以采用如下方式实现投影运算符7 b! t! Q. r: X! g
π S (R)。) u) Y3 `; h. T4 s5 q% R+ g
Map函数:对R中的每个元组t,通过剔除属性不在S中的字段得到元组t',输出一0 E5 o2 ?/ r( {
个“键-值”对(t',t')。* M3 J2 e( r% l& F
Reduce函数:对任意Map任务产生的每个键t',将存在一个或多个“键-值”对- v- q$ f1 m% T7 Y! `# A
(t',t'),Reduce函数将(t',[t',t',…,t'])转换为(t',t'),以保证对该键t'只产
* i( @5 Q0 o( ?( k! T) A& C* R, q生一个(t',t')对。 c1 E2 ]/ V9 P0 o7 F* R
Tenzing执行时会做一些优化,例如选择运算符下移到存储层;如果存储层支持2 @" G3 l2 _2 a8 e( b
列式存储,Tenzing只扫描那些查询执行必须的列。
# ]2 U; V; H; f R! p7 W(2)分组和聚合4 H# M9 j' J4 {* b( L
假定对关系R(A,B,C)按照字段A分组,并计算每个分组中所有元组的字段B
( _' u4 U5 K, e( n之和。可以采用如下方式实现γ A,SUM(B) (R)。
1 ^5 U0 r+ ^+ o DMap函数:对于每个元组,生成“键-值”对(a,b)。
' k7 }" v7 f; E7 FReduce函数:每个键a代表一个分组,对与键a相关的字段B的值的列表[b 1 ,b 2 ,
" e9 B$ f5 r: U% A: M4 b…,b n ]执行SUM操作,输出结果为(a,SUM(b 1 ,b 2 ,…,b n ))。' m4 |! ]$ L$ s7 F/ g% f8 ~) n
Tenzing支持基于哈希的聚合操作,首先,放松底层MapReduce框架的限制,. s0 y6 l; Z( C
shuffle时保证所有键相同的“键-值”对属于同一个Reduce任务,但是并不要求按照键6 Y7 j7 Y- N% e
有序排列。其次,Reduce函数采用基于哈希的方法对数据分组并计算聚合结果。
+ U7 E, X2 B$ E- V7 K(3)多表连接
1 \- a! i. {1 k" Z6 A+ v/ w3 u大表连接是分布式数据库的难题,MapReduce模型能够有效地解决这一类问题。
e! l# e, O7 v0 c常见的连接算法包括Sort Merge Join、Hash Join以及Nested Loop Join。4 d! @" @3 T/ z
假设需要将R(A,B)和S(B,C)进行自然连接运算,即寻找字段B相同的元( X/ y$ a3 }6 y/ d0 E0 _+ B0 h
组。可以通过Sort Merge Join实现如下:
& {7 }6 H5 G3 z* ZMap函数:对于R中的每个元组(a,b),生成“键-值”对(b,(R,a)),对S中的$ z" d9 W/ O. g( O
每个元组(b,c),生成“键-值”对(b,(S,c))。
# \- c) V4 S& r/ J" A0 z/ w9 j0 bReduce函数:每个键值b会与一系列对相关联,这些对要么来自(R,a),要么来* C0 n: G8 @; L& q- n3 e# r
自(S,c)。键b对应的输出结果是(b,[(a 1 ,b,c 1 ),(a 2 ,b,c 2 ),…]),也就是说,与b
0 h/ W) p4 x+ w) f4 o相关联的元组列表由来自R和S中的具有共同b值的元组组合而成。
" R2 b# J9 a9 Y5 u8 V8 C7 w* X6 w如果两张表格都很大,且二者的大小比较接近,Join字段也没有索引,Sort
4 l! }0 A5 j m, KMerge Join往往比较高效。然而,如果一张表格相比另外一张表格要大很多,Hash8 k) ~4 E9 o% N& @7 @$ v8 }8 P
Join往往更加合适。
F. |6 s& |* |4 }' D' {假设R(A,B)比S(B,C)大很多,可以通过Hash Join实现自然连接。Tenzing中
4 S: V, l& G/ C. Z) e0 G$ P$ Z一次Hash Join需要执行三个MapReduce任务。
" H/ S3 H- z: B' Q7 UMR1:将R(A,B)按照字段B划分为N个哈希分区,记为R 1 ,R 2 ,…,R N ;
8 I9 a4 i2 b* H' r* S; ^" H9 x1 UMR2:将S(B,C)按照字段B划分为N个哈希分区,记为S 1 ,S 2 ,…,S n ;. m# R; C, z6 z
MR3:每个哈希分区<R i ,S i >对应一个Map任务,这个Map任务会将S i 加载到内9 U7 @4 g, A) J$ }+ t k6 I: I7 F+ ?
存中。对于R i 中的每个元组(a,b),生成(b,[(a,b,c 1 ),(a,b,c 2 ),…]),其中,. q* c5 v2 Z* ^) M. ~- j9 U
(b,[c 1 ,c 2 ,…])是S i 中存储的元组。Reduce的作用类似于恒等式,输出每个传入
1 ]% h6 z9 ~- f( D: U: Q的“键-值”对。
0 N$ K, {! R: K# ?Sort Merge Join和Hash Join适用于两张表格都不能够存放到内存中,且连接列没
- J( j3 a. ?! `' Y" L: ]+ z有索引的场景。如果S(B,C)在B列有索引,可以通过Remote Lookup Join实现自然* R0 i3 R2 k# U+ @
连接,如下:- A7 [9 ?% F8 s: r$ C- `4 U
Map函数:对于R中的每个元组(a,b),通过索引查询S(B,C)中所有列值为b
5 U! u; @. x0 z( c的元组,生成(b,[(a,b,c 1 ),(a,b,c 2 ),…])。/ n( W' F- k. ?' a4 t; U8 D
Reduce函数:Reduce的作用类似于恒等式,输出每个传入的“键-值”对。) j9 F- |! W# s& I
如果S(B,C)能够存放到内存中,那么,Map进程在执行map任务的过程中会将% O3 ?# q2 J6 o9 D
S(B,C)的所有元组缓存在本地,进一步优化执行效率。另外,同一个Map进程可- ~+ b# M% I. O4 P* a) i& f+ e
能执行多个map任务,这些map任务共享一份S(B,C)的所有元组缓存。
" u% T8 ^5 D( o7 s( d: i+ ^) _. e13.3.2 Microsoft Dryad
I7 e. I$ e' h* L4 Z6 l3 IMicrosoft Dryad是微软研究院创建的研究项目,主要用来提供一个分布式并行计; `4 i% c2 f. \# `" t5 W& q8 J. N
算平台。在Dryad平台上,每个Dryad工作流被表示为一个有向无环图。图中的每个0 v. m( _. T t
节点表示一个要执行的程序,节点之间的边表示数据通道中数据的传输方式,其可
9 c u9 p& U7 \能是文件、管道、共享内存、网络RPC等。Dryard工作流如图13-3所示。+ H8 R( B. f6 b) U& {
图 13-3 Dryad工作流
: a4 f) \9 f. o# ?每个节点(vertices)上都有一个处理程序在运行,并且通过数据通道
+ q+ O6 q! O8 r' w2 A(channels)的方式在它们之间传输数据。类似于Map和Reduce函数,工作流中的% h4 t4 V, e, R( t+ K9 L; l
grep、sed、map、reduce、merge等函数可以被很多节点执行,每个节点会被分配一部
8 ~/ T/ m1 Z8 Q, e& @0 k2 M0 O* u分输入。Dryad的主控进程(Job Manager)负责将整个工作分割成多个任务,并分发) i* j2 D8 S; D
给多个节点执行。每个节点执行完任务后通知主控进程,接着,主控进程会通知后
" ?6 L$ ?8 E+ R7 I2 p续节点获取前一个节点的输出结果。等到后续节点的输入数据全部准备好后,才可% a% X7 m& X, R2 R( g
以继续执行后续任务。" M( ]) }; q+ S( T; A+ j* o. R% w+ G
Dryad与MapReduce具有的共同特性就是,只有任务完成之后才会将输出传递给
( M# [4 T4 X- \" W8 K- O: J接收任务。如果某个任务失败,其结果将不会传递给它在工作流中的任何后续任: T7 z+ r) K7 A
务。因此,主控进程可以在其他计算节点上重启该任务,同时不用担心会将结果重
. d# U0 K( c5 v) ]7 b0 A7 D复传递给以前传过的任务。
$ z+ r4 j5 w* V3 N+ v$ z相比多个MapReduce作业串联模型,Dryad模型的优势在于不需要将每个
7 [$ k' w1 X8 r' p& K2 Q5 j: iMapReduce作业输出的临时结果存放在分布式文件系统中。如果先存储前一个7 i3 O% V. `0 L/ u$ i& {
MapReduce作业的结果,然后再启动新的MapReduce作业,那么,这种开销很难避* I& }* i5 Y( x- O, a% ?
免。
8 M! h) b8 f' ^7 @* ?( c0 g13.3.3 Google Pregel
- c1 o2 x8 ]. M' D! P6 vGoogle Pregel用于图模型迭代计算,图中的每个节点对应一个任务,每个图节点
* A1 r" M, J: I会产生输出消息给图中与它关联的后续节点,而每个节点会对从其他节点传入的输/ ]- F: P" `( C2 B$ ^
入消息进行处理。* J, B3 u4 m0 ^- T, n( k' t
Pregel中将计算组织成“超步”(superstep)。在每个超步中,每个节点在上一步 W; Y) S; u" {7 u4 @0 Q) \; X
收到的所有消息将被处理,并且将处理完后的结果传递给后续节点。
& L/ Q2 p" c& sPregel采用了BSP(Bulk Sychronous Parallel,整体同步并行计算)模型。每个“超
+ ]$ Y3 I" A t" H) i: X步”分为三个步骤:每个节点首先执行本地计算,接着将本地计算的结果发送给图中2 m/ P& H/ F$ Z- H3 ^8 L
相邻的节点,最后执行一次栅栏同步,等待所有节点的前两步操作结束。Pregel模型
9 x% T3 ]6 s7 d7 N会在每个超步做一次迭代运算,当某次迭代生成的结果没有比上一次更好,说明结0 z4 b- V6 V- k
果已经收敛,可以终止迭代。
6 U; }# k! Y" M, Z) u图 13-4 Pregel BSP计算模型
; S% u/ `. k- G. L- }# M假设有一个带边权重的图,我们的目标是对图中的每个节点计算到其他任一节
% b; P( a/ g8 \3 D# D. I' }点的最短路径长度。一开始,每个图节点a都保存了诸如(b,w)对的集合,这表示a
& x9 I7 m% p1 u7 y8 t. H到b的边权重为w。, p- @* l6 V, }5 L4 U
(1)超步& m; Y* Z2 Q0 g2 P; h2 m( h. u9 B
每个节点会将(a,b,w)传递给图中与它关联的后续节点。当节点c收到三元组8 e+ O* m6 ^! y. O0 @7 J+ f2 a/ d+ t
(a,b,w)时,它会重新计算c到b的最短距离,如果w+v<u(假设当前已知的c到a的
7 a R8 T2 `6 h最短距离为v,c到b的最短距离为u),那么,更新c到b的最短距离为w+v。最后,消
3 v" ]- A; ~1 F8 X- ^2 w7 x息(c,b,w+v)会传递给后续节点。2 L- ~: M+ V/ h( B# L, ~, T4 j
(2)终止条件 n k5 q4 a9 [* { {
当所有节点在执行某个超步时都没有更新到其他节点的最短距离时,说明已经
5 a7 {: J, V1 m. \; H0 G# a( G2 p计算出想要的结果,整个迭代过程可以结束。
3 X( R a6 h6 sPregel通过检查点(checkpoint)的方式进行容错处理。它在每执行完一个超步之
) y+ @5 e( u$ v8 b后会记录整个计算的现场,即记录检查点情况。检查点中记录了这一轮迭代中每个2 L- i, F7 @5 V t- @$ J' e
任务的全部状态信息,一旦后续某个计算节点失效,Pregel将从最近的检查点重启整7 Z4 H+ d Y8 d' q
个超步。尽管上述的容错策略会重做很多并未失效的任务,但是实现简单。考虑到
$ a( f2 ~$ L. g# g1 H0 E服务器故障的概率不高,这种方法在大多数时候还是令人满意的。
l. P% u5 L3 ^: q, I
( B" _4 i1 J# e |
|