|
13.3 MapReduce扩展
0 B' k& P6 U3 A: q9 t3 \1 X( ?MapReduce框架有效地解决了海量数据的离线批处理问题,在各大互联网公司得
0 B' P% ]3 b6 l3 G到广泛的应用。事实已经证明了MapReduce巨大的影响力,以至于引发了一系列的扩: a- O& U8 }0 r* [
展和改进。这些扩展包括:
) I! b, A0 |0 m- p% {' n●Google Tenzing:基于MapReduce模型构建SQL执行引擎,使得数据分析人员可
0 {6 K9 k' g+ m" a. [# c d; c以直接通过SQL语言处理大数据。+ O C% w2 ^* P( Y! ?8 u
●Microsoft Dryad:将MapReduce模型从一个简单的两步工作流扩展为任何函数
" H2 J) \8 f0 B/ ^( V; ^集的组合,并通过一个有向无环图来表示函数之间的工作流。, c2 U* i8 ?& i3 }( R: O
●Google Pregel:用于图模型迭代计算,这种场景下Pregel的性能远远好于8 i& u; A' f3 n: _; t
MapReduce。
3 c( L+ ?7 r! M- U5 k4 t13.3.1 Google Tenzing! U O1 } X; ?0 J
Google Tenzing是一个构建在MapReduce之上的SQL执行引擎,支持SQL查询且能 V1 m6 _& I' l
够扩展到成千上万台机器,极大地方便了数据分析人员。6 Z {- ], U4 L- g6 ]' Z
1.整体架构
- S; R$ w# k( t) m9 eTenzing系统有四个主要组件:分布式Worker池、查询服务器、客户端接口和元: }2 h$ N6 Y$ a
数据服务器,如图13-2所示。/ F- I& H$ F% H6 i& `* f Q
图 13-2 Tenzing整体架构
( q% f# C2 y8 z4 f8 f1 z' g# ^5 L●查询服务器(Query Server):作为连接客户端和worker池的中间桥梁而存在。8 H) g- m3 ^' @; v6 E# p b
查询服务器会解析客户端发送的查询请求,进行SQL优化,然后将执行计划发送给分
: m( p- p8 d3 _* N/ t) ^+ Z布式Worker池执行。Tenzing支持基于规则(rule-based optimizer)以及基于开销
7 f6 J' A/ `( ~9 B. Q(cost-based optimizer)两种优化模式。
+ O* d# f) u6 N* p* P4 Y●分布式Worker池:作为执行系统,它会根据查询服务器生成的执行计划运行
6 C1 o! [2 [3 n5 H: ^2 s1 dMapReduce任务。为了降低查询延时,Tenzing不是每次都重新生成新进程,而是让进9 T7 n# O3 V( o$ J
程一直处于运行状态。Worker池包含master和worker两种节点,其中,master对应$ f4 F; V; B0 e) H$ W
MapReduce框架中的master进程,worker对应MapReduce框架中的map和reduce进程。: u9 l% ?2 Q5 o A# U
另外,还有一个称为master监听者(master watcher)的守护进程,查询服务器通过
$ E& F5 E) G. D/ e; v9 Kmaster监听者获取master信息。
& A( Y: `/ f) K& v J0 P2 d●元数据服务器(Metadata Server):存储和获取表格schema、访问控制列表
; y H& K4 X' A$ H" }& B(Access Control List,ACL)等全局元数据。元数据服务器使用Bigtable作为持久化的 |, w* z2 n, d l8 |8 b7 M
后台存储。
. r) c9 N& S5 ~6 B: S●客户端接口:Tenzing提供三类客户端接口,包括API、命令行客户端(CLI)以
( V1 q% U! J( u7 F3 O5 { D$ g及Web UI。8 a* a: z/ F9 ]. _0 Q1 e
●存储(Storage):分布式worker池中的master和worker进程执行MapReduce任务
+ m/ y6 j/ T% w时需要读写存储服务。另外,查询服务器会从存储服务获取执行结果。
! A7 S# B9 x# [6 L4 L. A2.查询流程7 O- `7 i( S" q$ @2 g% b. r I# Y
1)用户通过Web UI、CLI或者API向查询服务器提交查询。: p& i3 [- T9 G: d8 u' E3 R, D# v6 Y' Y
2)查询服务器将查询请求解析为一个中间语法树。
X, J+ C2 c( F. r/ s3)查询服务器从元数据服务器获取相应的元数据,然后创建一个更加完整的中- M% b% n. q1 V2 [
间格式。' z3 g; B- M5 G! @( R6 u
4)优化器扫描该中间格式进行各种优化,生成物理查询计划。
$ W& ]* u5 |$ H+ O3 U5)优化后的物理查询计划由一个或多个MapReduce作业组成。对于每个
; z$ x" T2 z5 P8 U9 z& eMapReduce作业,查询服务器通过master监听者找到一个可用的master,master将该作业' e! N6 U: |4 M, a$ ?
划分为多个任务。; v1 d' V$ D+ f; R! s5 s
6)空闲的worker从master拉取已就绪的任务。Reduce进程会将它们的结果写入
' O" G; a+ k; R9 p到一个中间存储区域中。
7 ~( f9 K9 c. v- X8 C& a7)查询服务器监控这些中间存储区域,收集中间结果,并流失地返回给客户
4 x4 ]3 t; `; C( \" e, t" a% D端。
; D8 v5 h0 @3 b5 v2 ?3.SQL运算符映射到MapReduce; [ }" u6 X, G, E/ m
查询服务器负责将用户的SQL操作转化为MapReduce作业,本节介绍各个SQL物
6 N8 L3 K N! f理运算符对应的MapReduce实现。
6 x, x; {4 P3 x9 D$ Y(1)选择和投影+ z( J0 c' s! D" n. P \6 Q Y1 ]$ l8 R
选择运算符σ C (R)的一种MapReduce实现如下。8 G4 s j7 s, ^2 z
Map函数:对R中的每个元素t,检测它是否满足条件C。如果满足,则产生一/ x3 x/ p3 g+ n5 d5 `; M
个“键-值”对(t,t)。也就是说,键和值都是t。
q( Q, O0 l8 nReduce函数:Reduce的作用类似于恒等式,它仅仅将每个“键-值”对传递到输出
' ^( } k* K) ], K- U部分。" R! W+ \1 O; }* o* R( v
投影运算的处理和选择运算类似,不同的是,投影运算可能会产生多个相同的
( U. M. `% Y4 M0 |, I, w元组,因此Reduce函数必须要剔除冗余元组。可以采用如下方式实现投影运算符3 a- n; ]8 _" N6 F1 L0 x( {: A$ @" O
π S (R)。9 z: i# {6 S$ |: c$ T
Map函数:对R中的每个元组t,通过剔除属性不在S中的字段得到元组t',输出一
' Y% l$ e, Z3 M# E% W" O个“键-值”对(t',t')。9 a. u) {4 ~8 x1 v! D
Reduce函数:对任意Map任务产生的每个键t',将存在一个或多个“键-值”对
+ Z9 C5 L4 s# F, h) S, q/ e. n+ V(t',t'),Reduce函数将(t',[t',t',…,t'])转换为(t',t'),以保证对该键t'只产! C/ g0 S4 j8 c: H+ l, K
生一个(t',t')对。
: z7 H' S' [- D I+ LTenzing执行时会做一些优化,例如选择运算符下移到存储层;如果存储层支持& U. f6 Q8 B0 e( F& Q# y& Y1 O
列式存储,Tenzing只扫描那些查询执行必须的列。+ @4 F" z8 Z ~ r) H- A* b J2 U( q
(2)分组和聚合
4 {( B5 v2 C$ I$ a2 M( F假定对关系R(A,B,C)按照字段A分组,并计算每个分组中所有元组的字段B2 H% }& G3 U! Z. w
之和。可以采用如下方式实现γ A,SUM(B) (R)。; e; i. w" u$ E2 W8 ~. P# x" R
Map函数:对于每个元组,生成“键-值”对(a,b)。4 s5 L4 V7 t2 w% r/ }8 ~0 V! O
Reduce函数:每个键a代表一个分组,对与键a相关的字段B的值的列表[b 1 ,b 2 ,
/ }/ N7 ^% k1 V; f3 e# t5 v…,b n ]执行SUM操作,输出结果为(a,SUM(b 1 ,b 2 ,…,b n ))。. P# S! U) p2 B- m2 l/ U' L4 }0 m
Tenzing支持基于哈希的聚合操作,首先,放松底层MapReduce框架的限制,
! q/ x9 B: A: K0 X% w) gshuffle时保证所有键相同的“键-值”对属于同一个Reduce任务,但是并不要求按照键" _& N+ j) U5 {; J
有序排列。其次,Reduce函数采用基于哈希的方法对数据分组并计算聚合结果。. {1 b/ g8 d c; ]. `" B& o& D
(3)多表连接
) M4 J4 c* W" ^$ j9 i2 U& U2 i大表连接是分布式数据库的难题,MapReduce模型能够有效地解决这一类问题。2 b4 Q7 _9 Q9 z' Y4 N/ H
常见的连接算法包括Sort Merge Join、Hash Join以及Nested Loop Join。
# r B2 f% C' s' `6 ^假设需要将R(A,B)和S(B,C)进行自然连接运算,即寻找字段B相同的元
2 G' T7 o! e# P% C2 S5 X组。可以通过Sort Merge Join实现如下:' X6 Q$ z$ I$ C% _
Map函数:对于R中的每个元组(a,b),生成“键-值”对(b,(R,a)),对S中的
+ } [- o5 l% j* W每个元组(b,c),生成“键-值”对(b,(S,c))。0 l, x9 F, D& h$ U, J6 K' y3 Q
Reduce函数:每个键值b会与一系列对相关联,这些对要么来自(R,a),要么来
8 B# P& J, A" y: h自(S,c)。键b对应的输出结果是(b,[(a 1 ,b,c 1 ),(a 2 ,b,c 2 ),…]),也就是说,与b" |6 I. I( T' X! f5 R: Y
相关联的元组列表由来自R和S中的具有共同b值的元组组合而成。6 p- \: ^( k( A& ^
如果两张表格都很大,且二者的大小比较接近,Join字段也没有索引,Sort
- M3 i0 y6 ^9 { jMerge Join往往比较高效。然而,如果一张表格相比另外一张表格要大很多,Hash# q" E+ f# q8 S6 K4 ^8 X4 y3 b! T
Join往往更加合适。
% l! K, ?/ ]0 S f6 p( ], ~假设R(A,B)比S(B,C)大很多,可以通过Hash Join实现自然连接。Tenzing中
$ {: E# Y- w3 R4 q( g* f% O1 e- d一次Hash Join需要执行三个MapReduce任务。
, \. j* x. M$ }% NMR1:将R(A,B)按照字段B划分为N个哈希分区,记为R 1 ,R 2 ,…,R N ;
; e$ i& h$ L* X- A5 cMR2:将S(B,C)按照字段B划分为N个哈希分区,记为S 1 ,S 2 ,…,S n ;
@* t' ]- Y# A# ~- U" l8 n4 KMR3:每个哈希分区<R i ,S i >对应一个Map任务,这个Map任务会将S i 加载到内
* n7 g+ v- I8 _3 M存中。对于R i 中的每个元组(a,b),生成(b,[(a,b,c 1 ),(a,b,c 2 ),…]),其中,
5 S- V3 }& a# r. U(b,[c 1 ,c 2 ,…])是S i 中存储的元组。Reduce的作用类似于恒等式,输出每个传入
0 t: ]% X, ^$ T2 i) l2 k) y, Q的“键-值”对。5 e- O! ]* U* K: D1 ?
Sort Merge Join和Hash Join适用于两张表格都不能够存放到内存中,且连接列没
% t0 J+ }. \+ S& e有索引的场景。如果S(B,C)在B列有索引,可以通过Remote Lookup Join实现自然
% B1 U0 m* f$ V" t连接,如下:* k4 l" b2 }6 T2 P! ?+ R( C
Map函数:对于R中的每个元组(a,b),通过索引查询S(B,C)中所有列值为b
7 ^0 e/ {7 i/ c的元组,生成(b,[(a,b,c 1 ),(a,b,c 2 ),…])。
$ b3 Z* H4 B* g, K* P' J- ^3 wReduce函数:Reduce的作用类似于恒等式,输出每个传入的“键-值”对。
+ k8 `$ n, d: t3 H; ]1 M# `- T2 q如果S(B,C)能够存放到内存中,那么,Map进程在执行map任务的过程中会将5 H C% [5 }* e% B; E9 w
S(B,C)的所有元组缓存在本地,进一步优化执行效率。另外,同一个Map进程可+ p5 B) [4 X H1 `8 x
能执行多个map任务,这些map任务共享一份S(B,C)的所有元组缓存。
2 t4 ?; H+ t) U d! M. [# B13.3.2 Microsoft Dryad. Z- q$ T2 W7 I1 x
Microsoft Dryad是微软研究院创建的研究项目,主要用来提供一个分布式并行计% ^9 N7 ]6 }8 T$ t' @
算平台。在Dryad平台上,每个Dryad工作流被表示为一个有向无环图。图中的每个
6 ]3 G W! |4 t节点表示一个要执行的程序,节点之间的边表示数据通道中数据的传输方式,其可
2 `9 N9 s2 W& a4 f1 Q( N% @能是文件、管道、共享内存、网络RPC等。Dryard工作流如图13-3所示。- Z: D0 m* h( u/ P3 h
图 13-3 Dryad工作流$ _$ T7 {0 e7 p' J! _* |
每个节点(vertices)上都有一个处理程序在运行,并且通过数据通道
5 I- G- F& Q: t% S8 Z' L* [(channels)的方式在它们之间传输数据。类似于Map和Reduce函数,工作流中的
7 D4 [: I: M* }, o, ^+ r6 M3 Wgrep、sed、map、reduce、merge等函数可以被很多节点执行,每个节点会被分配一部3 i: N4 {) w. \( d5 I8 g$ v3 M
分输入。Dryad的主控进程(Job Manager)负责将整个工作分割成多个任务,并分发
+ P5 L$ W" Q" w- s' M给多个节点执行。每个节点执行完任务后通知主控进程,接着,主控进程会通知后
3 P; {( D$ K6 W续节点获取前一个节点的输出结果。等到后续节点的输入数据全部准备好后,才可% q9 i- T& o8 }$ j$ _+ G+ \/ ?/ B
以继续执行后续任务。5 h7 w( y1 n+ t+ c
Dryad与MapReduce具有的共同特性就是,只有任务完成之后才会将输出传递给% v: x% c/ ?3 {* i
接收任务。如果某个任务失败,其结果将不会传递给它在工作流中的任何后续任
q! h+ O: j0 [# |$ B" y% h务。因此,主控进程可以在其他计算节点上重启该任务,同时不用担心会将结果重
% a Q/ b2 S8 C6 q, q4 p复传递给以前传过的任务。
/ S: g4 q2 a4 G( k% v b: |相比多个MapReduce作业串联模型,Dryad模型的优势在于不需要将每个
" s5 u3 a, h! C8 B6 A' i% s0 k* s' RMapReduce作业输出的临时结果存放在分布式文件系统中。如果先存储前一个3 {. H2 @$ M& \5 p
MapReduce作业的结果,然后再启动新的MapReduce作业,那么,这种开销很难避# a6 W/ Z% Y% n1 }
免。
$ z7 f- n6 f Z4 a. C7 o13.3.3 Google Pregel
: C0 O! E& A7 j- v4 l7 [Google Pregel用于图模型迭代计算,图中的每个节点对应一个任务,每个图节点
' b& w. |, a; d会产生输出消息给图中与它关联的后续节点,而每个节点会对从其他节点传入的输3 m% g0 p9 L* o" c% C" L3 O% |
入消息进行处理。 `- J( y/ v+ w# `# `1 Y+ y+ V6 `
Pregel中将计算组织成“超步”(superstep)。在每个超步中,每个节点在上一步
" `6 {! f9 c+ L- W- x+ h8 D5 o" V收到的所有消息将被处理,并且将处理完后的结果传递给后续节点。" |' R2 M$ W2 @9 d i* j
Pregel采用了BSP(Bulk Sychronous Parallel,整体同步并行计算)模型。每个“超: C+ X( J- q' I# h2 q6 M9 W s) t0 _
步”分为三个步骤:每个节点首先执行本地计算,接着将本地计算的结果发送给图中- P3 m1 M- n, i3 F" p! F
相邻的节点,最后执行一次栅栏同步,等待所有节点的前两步操作结束。Pregel模型$ J7 V8 l& B7 e/ d" c" @) L- g% `- w) g
会在每个超步做一次迭代运算,当某次迭代生成的结果没有比上一次更好,说明结2 F# M) m" k; S' g/ k( ^
果已经收敛,可以终止迭代。7 d* d% _; O4 O, S5 K1 X; U
图 13-4 Pregel BSP计算模型# ^5 O9 H( ^3 E8 q; W
假设有一个带边权重的图,我们的目标是对图中的每个节点计算到其他任一节
1 @1 K8 w! m g2 c点的最短路径长度。一开始,每个图节点a都保存了诸如(b,w)对的集合,这表示a
) ~6 o7 K4 v- I# w$ L到b的边权重为w。
$ W, J4 w: k( \) l0 B(1)超步) f- |7 [( D7 T4 ?6 y0 y9 k6 W( R1 g6 k3 N$ h
每个节点会将(a,b,w)传递给图中与它关联的后续节点。当节点c收到三元组* D6 _% E% N. @
(a,b,w)时,它会重新计算c到b的最短距离,如果w+v<u(假设当前已知的c到a的
6 b1 T: ~/ r/ l5 M) A4 ?, d! B最短距离为v,c到b的最短距离为u),那么,更新c到b的最短距离为w+v。最后,消4 V7 H$ G6 J$ f- S
息(c,b,w+v)会传递给后续节点。- X; X/ a! v2 b3 o0 S4 [* D
(2)终止条件1 j$ k' h. F3 n7 O1 v
当所有节点在执行某个超步时都没有更新到其他节点的最短距离时,说明已经
' s, b; L9 x# ~" \计算出想要的结果,整个迭代过程可以结束。
/ L) k) t& S5 yPregel通过检查点(checkpoint)的方式进行容错处理。它在每执行完一个超步之! A8 w& f& k5 S) k
后会记录整个计算的现场,即记录检查点情况。检查点中记录了这一轮迭代中每个# n* L" i; ?8 l1 V) X
任务的全部状态信息,一旦后续某个计算节点失效,Pregel将从最近的检查点重启整: Y) C8 C z( ]1 [7 \) v
个超步。尽管上述的容错策略会重做很多并未失效的任务,但是实现简单。考虑到
* |3 w7 x! [; X% m6 g* W6 N. |; d# e服务器故障的概率不高,这种方法在大多数时候还是令人满意的。8 y' G5 s# Q1 a, W4 s/ } f' I( c3 U
" f# @3 c- f( c: B# r |
|