java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2512|回复: 0

《大规模分布式存储系统》第13章 大数据【13.3】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2096

    主题

    3754

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66788

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-3-20 19:37:40 | 显示全部楼层 |阅读模式
    13.3 MapReduce扩展
    ) Q' O3 }% M* [$ E, A9 b& @) WMapReduce框架有效地解决了海量数据的离线批处理问题,在各大互联网公司得
    1 I% A3 Q$ ^) U2 Z到广泛的应用。事实已经证明了MapReduce巨大的影响力,以至于引发了一系列的扩: Y- u# W6 a- d6 u* F; R, c3 d
    展和改进。这些扩展包括:
    : a. ?5 l. f# }2 S, g●Google Tenzing:基于MapReduce模型构建SQL执行引擎,使得数据分析人员可# s% }. l6 _8 v7 x0 v& V6 {0 P
    以直接通过SQL语言处理大数据。
    . A6 V' O/ c, P2 u, r6 F7 F●Microsoft Dryad:将MapReduce模型从一个简单的两步工作流扩展为任何函数
    . @4 t& f7 p, Q; P; q集的组合,并通过一个有向无环图来表示函数之间的工作流。
    : A& Z  v& i8 w( `; r; V●Google Pregel:用于图模型迭代计算,这种场景下Pregel的性能远远好于3 A) Q& p/ I( O5 h: ^5 Z
    MapReduce。6 h5 X4 H( c- o
    13.3.1 Google Tenzing
    7 b% I0 g6 y: r3 x3 z. B- VGoogle Tenzing是一个构建在MapReduce之上的SQL执行引擎,支持SQL查询且能' N8 ^( H! k2 _6 k. a
    够扩展到成千上万台机器,极大地方便了数据分析人员。% ^4 M5 u& D* q/ G8 }( Z$ o
    1.整体架构& q4 R! V, z& a( h& K- R
    Tenzing系统有四个主要组件:分布式Worker池、查询服务器、客户端接口和元
    3 Y8 D; h4 D' c9 q4 J数据服务器,如图13-2所示。
    ! J. H* d5 V8 G* ]' M5 ]  p, U图 13-2 Tenzing整体架构7 {; b3 ?- B: i
    ●查询服务器(Query Server):作为连接客户端和worker池的中间桥梁而存在。
    " J+ g$ K/ A( I: {& p  O查询服务器会解析客户端发送的查询请求,进行SQL优化,然后将执行计划发送给分; K3 ?+ O" k" P3 ?- u( g$ w, w& G
    布式Worker池执行。Tenzing支持基于规则(rule-based optimizer)以及基于开销
    . [/ G3 S3 I! N. ]0 v6 p' ~# i! _(cost-based optimizer)两种优化模式。
    ) K- F. C$ ~$ P% B2 K●分布式Worker池:作为执行系统,它会根据查询服务器生成的执行计划运行( Q0 N  n8 E  o7 X# Q) X
    MapReduce任务。为了降低查询延时,Tenzing不是每次都重新生成新进程,而是让进% G$ n1 z9 {  O2 I# _' n
    程一直处于运行状态。Worker池包含master和worker两种节点,其中,master对应' A- g/ R) U5 d, ]  T2 E! Y4 @$ S
    MapReduce框架中的master进程,worker对应MapReduce框架中的map和reduce进程。
    4 [( M, r* C6 f( v0 P: J另外,还有一个称为master监听者(master watcher)的守护进程,查询服务器通过
    2 z1 B4 Y1 V2 b9 emaster监听者获取master信息。
    4 C8 L& S2 K- a●元数据服务器(Metadata Server):存储和获取表格schema、访问控制列表6 _  B9 x- F7 R, g
    (Access Control List,ACL)等全局元数据。元数据服务器使用Bigtable作为持久化的5 N1 j9 x4 v) L* j, j* A
    后台存储。6 U* B8 [) S1 `. M0 G7 J
    ●客户端接口:Tenzing提供三类客户端接口,包括API、命令行客户端(CLI)以
    $ q4 I% i# W, s6 o- H$ o. T及Web UI。* c8 a* I8 O% R3 n0 y$ e/ e
    ●存储(Storage):分布式worker池中的master和worker进程执行MapReduce任务" X2 Q+ P# H" t2 H4 [8 d; ~
    时需要读写存储服务。另外,查询服务器会从存储服务获取执行结果。4 A* ?, b5 O! T' L. O
    2.查询流程
    ' \3 S& t/ W( i5 F# @1)用户通过Web UI、CLI或者API向查询服务器提交查询。7 Y# W  Z1 x, q* E! D
    2)查询服务器将查询请求解析为一个中间语法树。
    7 y( V  S9 q5 B3 M. }8 l3 O3)查询服务器从元数据服务器获取相应的元数据,然后创建一个更加完整的中
    " F$ x2 x/ o4 e间格式。
    , N1 y$ b5 P2 d8 a- c4 C+ M2 Q2 u4)优化器扫描该中间格式进行各种优化,生成物理查询计划。
    # P8 l/ T" @- U1 Y4 c  R4 L5)优化后的物理查询计划由一个或多个MapReduce作业组成。对于每个' b/ z& Z) M2 B+ @" W( H+ d
    MapReduce作业,查询服务器通过master监听者找到一个可用的master,master将该作业
    5 h# @. B7 a: b9 m- P; D  K+ R划分为多个任务。0 m# m# o3 I  G" B! U8 S
    6)空闲的worker从master拉取已就绪的任务。Reduce进程会将它们的结果写入# T# p7 r7 Q* u* @" p2 v
    到一个中间存储区域中。
    ! ?  E/ B' v* [( `7)查询服务器监控这些中间存储区域,收集中间结果,并流失地返回给客户' h6 d) n6 _9 K: C# h& B  B9 m
    端。
    , d! e' S6 c/ T- n  Z3 R3.SQL运算符映射到MapReduce) z8 l/ E4 E( l# r
    查询服务器负责将用户的SQL操作转化为MapReduce作业,本节介绍各个SQL物
    3 u) W. G6 h. |3 U理运算符对应的MapReduce实现。
    * ?  _" J0 }8 U4 x" u& I0 g, h- `(1)选择和投影4 D, S9 b8 p' f% B% V2 v0 ^
    选择运算符σ C (R)的一种MapReduce实现如下。
    ' r3 G6 L" o+ M! u3 vMap函数:对R中的每个元素t,检测它是否满足条件C。如果满足,则产生一! c' |! x; u* |1 G8 M
    个“键-值”对(t,t)。也就是说,键和值都是t。- S' p% e9 o' N% D) L- m. O( E
    Reduce函数:Reduce的作用类似于恒等式,它仅仅将每个“键-值”对传递到输出
    ( c4 b7 G8 C' {, s2 v  b部分。
    + l9 ~% X$ W$ b2 w投影运算的处理和选择运算类似,不同的是,投影运算可能会产生多个相同的5 m+ R( ]1 V" Y7 l
    元组,因此Reduce函数必须要剔除冗余元组。可以采用如下方式实现投影运算符
    - b& W/ ~  D7 D6 Q/ jπ S (R)。
    : N4 r: F+ y0 X  p9 KMap函数:对R中的每个元组t,通过剔除属性不在S中的字段得到元组t',输出一3 m  Z5 `8 t5 ?; ^' \
    个“键-值”对(t',t')。3 Q3 U0 D) A& H3 D( u$ P
    Reduce函数:对任意Map任务产生的每个键t',将存在一个或多个“键-值”对6 [( L2 ^% J" {1 O* j
    (t',t'),Reduce函数将(t',[t',t',…,t'])转换为(t',t'),以保证对该键t'只产* i9 \3 @. P* m6 q
    生一个(t',t')对。
      Y; E% B2 P5 Y1 Y7 E; V: w/ WTenzing执行时会做一些优化,例如选择运算符下移到存储层;如果存储层支持
    3 z1 l# |4 `$ v  p" l: N- E列式存储,Tenzing只扫描那些查询执行必须的列。
    & h4 j% W5 m9 W6 e' |(2)分组和聚合
    0 P8 V/ g" s( P3 L7 m- z  [; m假定对关系R(A,B,C)按照字段A分组,并计算每个分组中所有元组的字段B
    : u# q0 v5 \4 X/ r; h" o; |7 ~8 N之和。可以采用如下方式实现γ A,SUM(B) (R)。4 u% Y" \1 n# L4 W1 q9 m
    Map函数:对于每个元组,生成“键-值”对(a,b)。
    9 w  n0 k! }& aReduce函数:每个键a代表一个分组,对与键a相关的字段B的值的列表[b 1 ,b 2 ,6 p, S4 M3 q1 |$ T  y- z
    …,b n ]执行SUM操作,输出结果为(a,SUM(b 1 ,b 2 ,…,b n ))。- k7 r) |6 h5 P/ P
    Tenzing支持基于哈希的聚合操作,首先,放松底层MapReduce框架的限制,3 H" X6 `% Y/ U! X3 q
    shuffle时保证所有键相同的“键-值”对属于同一个Reduce任务,但是并不要求按照键
    * C1 I# Q" I: H3 `8 Y6 x有序排列。其次,Reduce函数采用基于哈希的方法对数据分组并计算聚合结果。; ?3 P0 x1 U5 m# \- w! W5 g9 `
    (3)多表连接- w8 {$ ~' A) W
    大表连接是分布式数据库的难题,MapReduce模型能够有效地解决这一类问题。4 N: @) `& x% M' |
    常见的连接算法包括Sort Merge Join、Hash Join以及Nested Loop Join。* \# n" Z* G% J  y
    假设需要将R(A,B)和S(B,C)进行自然连接运算,即寻找字段B相同的元" O$ D% A- e! L( ]9 _
    组。可以通过Sort Merge Join实现如下:
    & _( s# y# D" cMap函数:对于R中的每个元组(a,b),生成“键-值”对(b,(R,a)),对S中的
    " P4 o+ Q: m* [' l$ z, M每个元组(b,c),生成“键-值”对(b,(S,c))。* x7 e3 @# Y2 w/ T
    Reduce函数:每个键值b会与一系列对相关联,这些对要么来自(R,a),要么来! M+ \, s, F7 a0 r$ J
    自(S,c)。键b对应的输出结果是(b,[(a 1 ,b,c 1 ),(a 2 ,b,c 2 ),…]),也就是说,与b0 H0 d, x+ B9 Z' E: I
    相关联的元组列表由来自R和S中的具有共同b值的元组组合而成。3 ?& X3 W" [7 A$ K8 U
    如果两张表格都很大,且二者的大小比较接近,Join字段也没有索引,Sort0 F+ @3 ?% n# F- z* t& Y8 T
    Merge Join往往比较高效。然而,如果一张表格相比另外一张表格要大很多,Hash% _: Z$ M% \  Z& @( a
    Join往往更加合适。
    7 d  l2 c9 ~$ {6 f3 O. b假设R(A,B)比S(B,C)大很多,可以通过Hash Join实现自然连接。Tenzing中! l3 _7 M% h; \! J
    一次Hash Join需要执行三个MapReduce任务。
    / ~$ n. I3 W. }( j% T+ `MR1:将R(A,B)按照字段B划分为N个哈希分区,记为R 1 ,R 2 ,…,R N ;
    ( S: i+ G: f. E" k+ W  z9 fMR2:将S(B,C)按照字段B划分为N个哈希分区,记为S 1 ,S 2 ,…,S n ;
    2 x+ i7 Y6 [. @7 o0 uMR3:每个哈希分区<R i ,S i >对应一个Map任务,这个Map任务会将S i 加载到内
    / j6 m6 E& A+ b7 J( h+ ~; m存中。对于R i 中的每个元组(a,b),生成(b,[(a,b,c 1 ),(a,b,c 2 ),…]),其中,
    3 Z4 p/ o% B: z: W7 M! [6 I(b,[c 1 ,c 2 ,…])是S i 中存储的元组。Reduce的作用类似于恒等式,输出每个传入
    ' E! e% m1 n4 Q0 P" F/ X的“键-值”对。# M2 ~2 U2 [+ D7 B' q
    Sort Merge Join和Hash Join适用于两张表格都不能够存放到内存中,且连接列没/ M, Z, t. v4 R
    有索引的场景。如果S(B,C)在B列有索引,可以通过Remote Lookup Join实现自然
    " g1 a& V" s8 V' A! }连接,如下:& G( G0 M$ }0 [  p
    Map函数:对于R中的每个元组(a,b),通过索引查询S(B,C)中所有列值为b
    ! Y; e. R1 E+ F的元组,生成(b,[(a,b,c 1 ),(a,b,c 2 ),…])。! L% |6 E" Q9 R, n6 A% G+ _
    Reduce函数:Reduce的作用类似于恒等式,输出每个传入的“键-值”对。! |4 }; v5 E0 J  z4 ~& B/ G
    如果S(B,C)能够存放到内存中,那么,Map进程在执行map任务的过程中会将
    " D# N  D5 {8 @+ j  @) J: DS(B,C)的所有元组缓存在本地,进一步优化执行效率。另外,同一个Map进程可0 s& `( H& x: h
    能执行多个map任务,这些map任务共享一份S(B,C)的所有元组缓存。! n  D1 d1 D6 L
    13.3.2 Microsoft Dryad7 ^1 n1 K& ?% s( ?- }! [3 V. F
    Microsoft Dryad是微软研究院创建的研究项目,主要用来提供一个分布式并行计# ~  X3 A* e: x
    算平台。在Dryad平台上,每个Dryad工作流被表示为一个有向无环图。图中的每个
      e: Q. @; E" e: w% b, X; g0 [) E/ z% i节点表示一个要执行的程序,节点之间的边表示数据通道中数据的传输方式,其可  K% V( P0 t/ W* J0 S
    能是文件、管道、共享内存、网络RPC等。Dryard工作流如图13-3所示。* E2 f3 f( u1 i' \' r  B5 s- p
    图 13-3 Dryad工作流
    8 r# m, I; g' \  G( y) u每个节点(vertices)上都有一个处理程序在运行,并且通过数据通道
    / l& S7 ]0 O+ f! }- n(channels)的方式在它们之间传输数据。类似于Map和Reduce函数,工作流中的8 B8 B& D/ t" T. c" X0 ?! F6 s/ c
    grep、sed、map、reduce、merge等函数可以被很多节点执行,每个节点会被分配一部  L- J; u- o6 K# V; l; o" o0 d
    分输入。Dryad的主控进程(Job Manager)负责将整个工作分割成多个任务,并分发, V! i7 b5 k. Z" r$ O1 a
    给多个节点执行。每个节点执行完任务后通知主控进程,接着,主控进程会通知后. z5 f: X# v1 [2 _0 q
    续节点获取前一个节点的输出结果。等到后续节点的输入数据全部准备好后,才可. N( ~+ C& r% _9 w5 C1 a0 d7 q) E
    以继续执行后续任务。( E  D0 s/ `+ R* J) `# w
    Dryad与MapReduce具有的共同特性就是,只有任务完成之后才会将输出传递给
    3 U; g2 [$ O7 g" s( ]7 j接收任务。如果某个任务失败,其结果将不会传递给它在工作流中的任何后续任4 x# ]2 r; F) X4 ?, G: N
    务。因此,主控进程可以在其他计算节点上重启该任务,同时不用担心会将结果重
    1 }1 |" y- U" P( Q: X8 e9 w7 B, I复传递给以前传过的任务。' Q* \# v* z: J4 O+ @) B! Q
    相比多个MapReduce作业串联模型,Dryad模型的优势在于不需要将每个
    9 f  w% m2 O; Z% k0 M& `MapReduce作业输出的临时结果存放在分布式文件系统中。如果先存储前一个
    / n- |* U* m& A& s5 b& H4 XMapReduce作业的结果,然后再启动新的MapReduce作业,那么,这种开销很难避6 @# f' a/ V. j1 N, Q. b( j
    免。1 F0 Q2 H' F% M1 [  C' f
    13.3.3 Google Pregel
    2 L, s  ?8 F( b2 m( vGoogle Pregel用于图模型迭代计算,图中的每个节点对应一个任务,每个图节点
    : g" Y: H% {8 ?/ A1 Q会产生输出消息给图中与它关联的后续节点,而每个节点会对从其他节点传入的输- A' m( B- z) Q5 x- H
    入消息进行处理。
    1 J" A& K/ B. U. {" [8 y' S4 [Pregel中将计算组织成“超步”(superstep)。在每个超步中,每个节点在上一步; a8 d+ k3 ?) j0 C7 d
    收到的所有消息将被处理,并且将处理完后的结果传递给后续节点。
    % ?: t2 {% W5 [4 w* g. qPregel采用了BSP(Bulk Sychronous Parallel,整体同步并行计算)模型。每个“超
    . ]& Q9 u5 f* I" H% u步”分为三个步骤:每个节点首先执行本地计算,接着将本地计算的结果发送给图中! E. b7 P3 g: x( N" X
    相邻的节点,最后执行一次栅栏同步,等待所有节点的前两步操作结束。Pregel模型
    - N3 K! }% L; t. l! A0 h8 B会在每个超步做一次迭代运算,当某次迭代生成的结果没有比上一次更好,说明结
    8 @& N3 E0 k  r! D) z: J8 b6 G# t+ q. k- J果已经收敛,可以终止迭代。3 U; }7 [0 f/ N6 Q
    图 13-4 Pregel BSP计算模型
      p1 J7 V6 i. G1 V- ^5 q5 }& ^假设有一个带边权重的图,我们的目标是对图中的每个节点计算到其他任一节
    8 i" t+ C# C+ K' i9 J- f9 Y% `点的最短路径长度。一开始,每个图节点a都保存了诸如(b,w)对的集合,这表示a5 D8 b; w( u/ }' W
    到b的边权重为w。2 @! a1 ~/ V3 i# N7 K
    (1)超步
    ! `- k6 B( p5 R2 ]! Q每个节点会将(a,b,w)传递给图中与它关联的后续节点。当节点c收到三元组% e9 ]+ j5 d' b( J* s; {4 J/ a8 F! |
    (a,b,w)时,它会重新计算c到b的最短距离,如果w+v<u(假设当前已知的c到a的
    . S9 {' E/ v$ {- F) L最短距离为v,c到b的最短距离为u),那么,更新c到b的最短距离为w+v。最后,消" o$ O+ j. U: _7 U+ a2 t5 s  ]
    息(c,b,w+v)会传递给后续节点。! B) ^& r9 }0 B) P/ e
    (2)终止条件
    7 {( l6 R! _. q当所有节点在执行某个超步时都没有更新到其他节点的最短距离时,说明已经/ q9 L3 P% H- N# q4 `+ r) ?
    计算出想要的结果,整个迭代过程可以结束。
    2 l; x' D+ h8 U* R0 kPregel通过检查点(checkpoint)的方式进行容错处理。它在每执行完一个超步之7 `' \8 W' I. A! ]5 L+ L1 s! M' \0 o
    后会记录整个计算的现场,即记录检查点情况。检查点中记录了这一轮迭代中每个
    4 A6 \6 b: ]) j任务的全部状态信息,一旦后续某个计算节点失效,Pregel将从最近的检查点重启整/ U3 R0 A0 e6 s% _
    个超步。尽管上述的容错策略会重做很多并未失效的任务,但是实现简单。考虑到3 o$ n1 g- i  ^9 R% H) [0 t; u
    服务器故障的概率不高,这种方法在大多数时候还是令人满意的。
    1 U% C: j, \( {( F+ t; n) f/ q! g( K4 K
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-4-1 14:35 , Processed in 0.284079 second(s), 32 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表