java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2380|回复: 0

《大规模分布式存储系统》第13章 大数据【13.3】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2025

    主题

    3683

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66345

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-3-20 19:37:40 | 显示全部楼层 |阅读模式
    13.3 MapReduce扩展, y6 T$ B: O3 C& y
    MapReduce框架有效地解决了海量数据的离线批处理问题,在各大互联网公司得: f* Q* \8 N9 z3 Z( h
    到广泛的应用。事实已经证明了MapReduce巨大的影响力,以至于引发了一系列的扩+ d/ B1 [8 g. {/ s: \8 L
    展和改进。这些扩展包括:" p; X" B% Q6 \" Q6 h  J
    ●Google Tenzing:基于MapReduce模型构建SQL执行引擎,使得数据分析人员可: j' k% u, |+ b8 W: N/ @# a6 L) b
    以直接通过SQL语言处理大数据。+ G* i$ Y, _# z6 Z) P9 q
    ●Microsoft Dryad:将MapReduce模型从一个简单的两步工作流扩展为任何函数% ?6 {& A+ f3 }6 A
    集的组合,并通过一个有向无环图来表示函数之间的工作流。
    2 w6 C$ [0 F, K4 n●Google Pregel:用于图模型迭代计算,这种场景下Pregel的性能远远好于
    # |# I! D3 k1 ZMapReduce。
    / V4 T( [1 F% l+ `+ `) Z$ r13.3.1 Google Tenzing4 \( N% ?" d; j; |* S/ q
    Google Tenzing是一个构建在MapReduce之上的SQL执行引擎,支持SQL查询且能
    $ l; \3 U  \7 {. g7 `够扩展到成千上万台机器,极大地方便了数据分析人员。
    . h$ U. p# f+ W& j' ?9 ?) g3 m1.整体架构
    : x9 h8 C3 H& l8 e$ E8 e8 {: J" BTenzing系统有四个主要组件:分布式Worker池、查询服务器、客户端接口和元
    - ?5 {: P0 K, a5 Q数据服务器,如图13-2所示。
    ) z* K! c7 D( t, H' v5 i* {图 13-2 Tenzing整体架构
    " m' H6 k2 d0 Q7 o; g8 g●查询服务器(Query Server):作为连接客户端和worker池的中间桥梁而存在。
    ; X  v- R, V/ H+ a, H; O. [" {查询服务器会解析客户端发送的查询请求,进行SQL优化,然后将执行计划发送给分3 r$ ~& _: g: z3 H# A( f" Y6 k
    布式Worker池执行。Tenzing支持基于规则(rule-based optimizer)以及基于开销
    & ?8 R8 \) d0 h3 i. \1 F( Y(cost-based optimizer)两种优化模式。
    ; {) ]" C0 |* t7 ~+ H5 j●分布式Worker池:作为执行系统,它会根据查询服务器生成的执行计划运行
    $ f$ W& T% g* t/ k; N5 O9 c# ]+ HMapReduce任务。为了降低查询延时,Tenzing不是每次都重新生成新进程,而是让进/ j3 H; I7 N5 @( T
    程一直处于运行状态。Worker池包含master和worker两种节点,其中,master对应$ H" s( l2 M. V; B6 r! j
    MapReduce框架中的master进程,worker对应MapReduce框架中的map和reduce进程。
    $ m! b" @" L0 F) D) C另外,还有一个称为master监听者(master watcher)的守护进程,查询服务器通过& V# z$ `$ R/ [9 B
    master监听者获取master信息。
    ( K  G" Q9 l& ^9 [1 v●元数据服务器(Metadata Server):存储和获取表格schema、访问控制列表
    6 [/ A  ^" w7 h- q+ Q8 |7 l1 C& k(Access Control List,ACL)等全局元数据。元数据服务器使用Bigtable作为持久化的- X+ c3 H7 C$ }9 L8 Z
    后台存储。; ^* a$ c7 k7 k( m! w5 F/ Z+ G
    ●客户端接口:Tenzing提供三类客户端接口,包括API、命令行客户端(CLI)以! @; k9 v; g0 k$ l
    及Web UI。+ q4 B, D  P7 M' ^8 e7 }
    ●存储(Storage):分布式worker池中的master和worker进程执行MapReduce任务
      s( [% X: D6 U% Q! x0 t+ |6 j& W7 Q时需要读写存储服务。另外,查询服务器会从存储服务获取执行结果。
    # S# K4 W* l& a: f: g' F2 ]: |7 d1 A+ ~: H2.查询流程
    7 G- R% `: Q: r2 q1)用户通过Web UI、CLI或者API向查询服务器提交查询。9 Y( @! D  u  W
    2)查询服务器将查询请求解析为一个中间语法树。0 k9 J9 t& i9 B; c/ d
    3)查询服务器从元数据服务器获取相应的元数据,然后创建一个更加完整的中1 s( G1 E% a1 {/ u- s6 ]' Y
    间格式。6 q& v- y" H) @0 r/ t: G
    4)优化器扫描该中间格式进行各种优化,生成物理查询计划。2 Q- |1 F. t0 c% W5 }& D
    5)优化后的物理查询计划由一个或多个MapReduce作业组成。对于每个
    - A: T  H" ]4 R3 n% f: dMapReduce作业,查询服务器通过master监听者找到一个可用的master,master将该作业
    $ }) o9 e/ \8 `" e" V. l2 b0 Z划分为多个任务。% q+ {7 H$ f' `2 k! _2 h2 M- y  o
    6)空闲的worker从master拉取已就绪的任务。Reduce进程会将它们的结果写入
    ; n0 S" l( ~, Q1 K9 T2 f# `1 U. r到一个中间存储区域中。  q" X. Z( f4 l; Q0 C" V: ~
    7)查询服务器监控这些中间存储区域,收集中间结果,并流失地返回给客户& W5 p2 G* X( e
    端。8 A3 X1 y4 W8 _# W- s- X# {" b4 l
    3.SQL运算符映射到MapReduce/ w" j5 X# b2 D
    查询服务器负责将用户的SQL操作转化为MapReduce作业,本节介绍各个SQL物6 n* v, {+ D6 h6 {6 b
    理运算符对应的MapReduce实现。
    ; i2 b4 K" n" l4 Z/ L. n(1)选择和投影
    4 \$ l0 c3 L% a7 c& Z- J4 _) r选择运算符σ C (R)的一种MapReduce实现如下。  Q" E0 c" ]2 Z4 v+ f  G7 B
    Map函数:对R中的每个元素t,检测它是否满足条件C。如果满足,则产生一
    , q! q- I& t, x. N# g5 Y0 ]个“键-值”对(t,t)。也就是说,键和值都是t。
    # ?5 N: T5 ^- e1 g- y( @Reduce函数:Reduce的作用类似于恒等式,它仅仅将每个“键-值”对传递到输出( ], I4 g, r. J0 W& Q7 ~) g
    部分。
    % Y8 e0 Z" z) S& O7 F4 J5 M投影运算的处理和选择运算类似,不同的是,投影运算可能会产生多个相同的
    * c* u3 Y; r, Z元组,因此Reduce函数必须要剔除冗余元组。可以采用如下方式实现投影运算符( C2 ^" B/ g% W- Q6 |
    π S (R)。# H% A3 e3 Y+ M" g0 h' ?. I
    Map函数:对R中的每个元组t,通过剔除属性不在S中的字段得到元组t',输出一
    ! L  F9 _- l/ C* ?: I  C' ~9 m个“键-值”对(t',t')。
    & \& d2 ]% ^& _$ |# GReduce函数:对任意Map任务产生的每个键t',将存在一个或多个“键-值”对) d7 F3 u  c0 f2 U+ a
    (t',t'),Reduce函数将(t',[t',t',…,t'])转换为(t',t'),以保证对该键t'只产; W$ T! F2 g4 U. n/ N5 S  N
    生一个(t',t')对。% `. o3 T9 b; w( \
    Tenzing执行时会做一些优化,例如选择运算符下移到存储层;如果存储层支持$ ~/ K/ o& j. B  z
    列式存储,Tenzing只扫描那些查询执行必须的列。
    2 p( x; K! Y, V% _(2)分组和聚合  V' X! @( P- A6 b6 {
    假定对关系R(A,B,C)按照字段A分组,并计算每个分组中所有元组的字段B
    , k3 l5 N+ ^: Y& n之和。可以采用如下方式实现γ A,SUM(B) (R)。( R2 F$ E( t! y* R# A
    Map函数:对于每个元组,生成“键-值”对(a,b)。
    $ }5 W* a% R& Y! r. K0 O0 H6 e/ _Reduce函数:每个键a代表一个分组,对与键a相关的字段B的值的列表[b 1 ,b 2 ,
    ( ]9 c8 v  j3 g0 z…,b n ]执行SUM操作,输出结果为(a,SUM(b 1 ,b 2 ,…,b n ))。
    4 f4 X8 ]( o# t2 Z3 Y0 i4 fTenzing支持基于哈希的聚合操作,首先,放松底层MapReduce框架的限制,2 f& T* U2 v0 V$ W$ o
    shuffle时保证所有键相同的“键-值”对属于同一个Reduce任务,但是并不要求按照键# `: G% I9 F9 U% G% U
    有序排列。其次,Reduce函数采用基于哈希的方法对数据分组并计算聚合结果。( k' A3 u7 W& u! @
    (3)多表连接) W2 r7 H# M4 c9 r
    大表连接是分布式数据库的难题,MapReduce模型能够有效地解决这一类问题。
    ) I  |& F* c/ @' k0 I9 v% R# J常见的连接算法包括Sort Merge Join、Hash Join以及Nested Loop Join。
    8 g$ h3 O4 ~! I% H假设需要将R(A,B)和S(B,C)进行自然连接运算,即寻找字段B相同的元7 n9 e/ ?1 @2 K& o
    组。可以通过Sort Merge Join实现如下:
    0 o2 l' F" H- X% S9 [Map函数:对于R中的每个元组(a,b),生成“键-值”对(b,(R,a)),对S中的
    + M& d0 ~; v: w每个元组(b,c),生成“键-值”对(b,(S,c))。
    6 |8 O" q+ d; ZReduce函数:每个键值b会与一系列对相关联,这些对要么来自(R,a),要么来2 k; w# m" M& V0 Q$ {* P
    自(S,c)。键b对应的输出结果是(b,[(a 1 ,b,c 1 ),(a 2 ,b,c 2 ),…]),也就是说,与b) I: X6 x, r! k( {$ W
    相关联的元组列表由来自R和S中的具有共同b值的元组组合而成。
    # m% [, O6 M( y如果两张表格都很大,且二者的大小比较接近,Join字段也没有索引,Sort7 }. l. A. }# I  f- s  m: `
    Merge Join往往比较高效。然而,如果一张表格相比另外一张表格要大很多,Hash* a/ `$ T4 _/ p& I, _$ g
    Join往往更加合适。; W$ }* R9 c* a$ [: \# X( x5 d
    假设R(A,B)比S(B,C)大很多,可以通过Hash Join实现自然连接。Tenzing中7 l# t# @& J: r
    一次Hash Join需要执行三个MapReduce任务。
    0 i6 p& H2 c. L% c; Q" O( ~MR1:将R(A,B)按照字段B划分为N个哈希分区,记为R 1 ,R 2 ,…,R N ;! K: P# U# {4 U. @
    MR2:将S(B,C)按照字段B划分为N个哈希分区,记为S 1 ,S 2 ,…,S n ;
    3 ]1 h2 H3 \+ T' JMR3:每个哈希分区<R i ,S i >对应一个Map任务,这个Map任务会将S i 加载到内0 K  U( R$ o* g7 Z& r3 b
    存中。对于R i 中的每个元组(a,b),生成(b,[(a,b,c 1 ),(a,b,c 2 ),…]),其中,, @$ E. l  ]; e
    (b,[c 1 ,c 2 ,…])是S i 中存储的元组。Reduce的作用类似于恒等式,输出每个传入
    2 S, p; F4 `) ~3 H9 B) u; N8 ?的“键-值”对。
    6 @' V; l& P8 Y5 sSort Merge Join和Hash Join适用于两张表格都不能够存放到内存中,且连接列没; `( u6 I4 V0 G1 M8 p  t
    有索引的场景。如果S(B,C)在B列有索引,可以通过Remote Lookup Join实现自然% V/ ]' l. u) |
    连接,如下:
    ; w  m$ j; p: A+ C. FMap函数:对于R中的每个元组(a,b),通过索引查询S(B,C)中所有列值为b5 u2 ^% F" t8 y
    的元组,生成(b,[(a,b,c 1 ),(a,b,c 2 ),…])。  U* S6 Y1 [' ]0 |
    Reduce函数:Reduce的作用类似于恒等式,输出每个传入的“键-值”对。
    ! a+ K6 m. c" B9 [% Q如果S(B,C)能够存放到内存中,那么,Map进程在执行map任务的过程中会将
    * ~' q6 j' J# R! Z6 Z% `8 p" bS(B,C)的所有元组缓存在本地,进一步优化执行效率。另外,同一个Map进程可; b, n5 }4 w( t. t3 w. a7 p3 N' \" ]
    能执行多个map任务,这些map任务共享一份S(B,C)的所有元组缓存。
    ; U: C3 B  w- y; Q0 S13.3.2 Microsoft Dryad
    ; {5 G+ ?- D- c, g3 sMicrosoft Dryad是微软研究院创建的研究项目,主要用来提供一个分布式并行计
    % m0 Z1 z1 o" o7 X) p, H算平台。在Dryad平台上,每个Dryad工作流被表示为一个有向无环图。图中的每个4 l( l' P. Y  |: v
    节点表示一个要执行的程序,节点之间的边表示数据通道中数据的传输方式,其可
    & G# u( R2 a) C* R1 K能是文件、管道、共享内存、网络RPC等。Dryard工作流如图13-3所示。( @/ w: z! g, i
    图 13-3 Dryad工作流5 a# ?  y% m* i8 c7 O3 }% W
    每个节点(vertices)上都有一个处理程序在运行,并且通过数据通道
    , n7 ?/ B+ E7 o% w3 F/ j5 N4 B; L(channels)的方式在它们之间传输数据。类似于Map和Reduce函数,工作流中的7 V2 M) ^1 n' q, ?8 a0 ~" B
    grep、sed、map、reduce、merge等函数可以被很多节点执行,每个节点会被分配一部
    / [9 ~7 \0 d- k分输入。Dryad的主控进程(Job Manager)负责将整个工作分割成多个任务,并分发5 s1 ]8 W5 u, c: W
    给多个节点执行。每个节点执行完任务后通知主控进程,接着,主控进程会通知后
    : P' V$ |3 N- B# S0 O  s& S* A( Y续节点获取前一个节点的输出结果。等到后续节点的输入数据全部准备好后,才可) f1 ?6 C7 n6 z* C( c
    以继续执行后续任务。) }7 ^+ U4 L( b- V
    Dryad与MapReduce具有的共同特性就是,只有任务完成之后才会将输出传递给6 X+ T- M7 T* A1 Q. O* Z& v+ y. H
    接收任务。如果某个任务失败,其结果将不会传递给它在工作流中的任何后续任
    + \' x( p6 R- }/ p" i) |务。因此,主控进程可以在其他计算节点上重启该任务,同时不用担心会将结果重
    * t3 m1 Q: M/ t3 Z复传递给以前传过的任务。* Q- f  A* G$ n) b. \9 F% D
    相比多个MapReduce作业串联模型,Dryad模型的优势在于不需要将每个/ A3 N' s7 ~$ L, m+ A# E# ^
    MapReduce作业输出的临时结果存放在分布式文件系统中。如果先存储前一个2 m3 ^9 x0 M" [& h
    MapReduce作业的结果,然后再启动新的MapReduce作业,那么,这种开销很难避* n- G) X; s9 J( z2 z
    免。. K, v9 U4 C+ \" F+ e& i$ ]' S* ]: }
    13.3.3 Google Pregel; [  ^4 E, w) Y/ M/ M4 R- L1 M
    Google Pregel用于图模型迭代计算,图中的每个节点对应一个任务,每个图节点% [( J2 |3 U5 D. M+ j3 e
    会产生输出消息给图中与它关联的后续节点,而每个节点会对从其他节点传入的输* \/ [6 g, i) w
    入消息进行处理。
    9 l' x2 s+ x( e; dPregel中将计算组织成“超步”(superstep)。在每个超步中,每个节点在上一步) x3 D. J+ M0 {$ {7 R; P- Y$ ^
    收到的所有消息将被处理,并且将处理完后的结果传递给后续节点。
    - ?6 h5 ~) a* n( j, {1 gPregel采用了BSP(Bulk Sychronous Parallel,整体同步并行计算)模型。每个“超/ J4 m! b7 B3 S- s
    步”分为三个步骤:每个节点首先执行本地计算,接着将本地计算的结果发送给图中8 B& k4 a8 G$ v, l
    相邻的节点,最后执行一次栅栏同步,等待所有节点的前两步操作结束。Pregel模型
    6 V5 v3 ?/ [  F  B% O1 f2 A会在每个超步做一次迭代运算,当某次迭代生成的结果没有比上一次更好,说明结
    7 V6 g0 G, L. I6 n果已经收敛,可以终止迭代。" ]  i7 `' r8 U- g
    图 13-4 Pregel BSP计算模型
    1 R, t2 M0 Z/ ~假设有一个带边权重的图,我们的目标是对图中的每个节点计算到其他任一节
    ) b* m3 X$ Z3 c% e: u4 c点的最短路径长度。一开始,每个图节点a都保存了诸如(b,w)对的集合,这表示a# T& Y. o, a& T6 u. N6 i+ P. Y5 x
    到b的边权重为w。$ M) Q( y! J, l3 Q' f5 q! E! P
    (1)超步
    , _7 W7 T5 p- X6 k: A% Q( d每个节点会将(a,b,w)传递给图中与它关联的后续节点。当节点c收到三元组0 s/ C( l# V* ^
    (a,b,w)时,它会重新计算c到b的最短距离,如果w+v<u(假设当前已知的c到a的- {8 r2 W& X) O1 i, z* A
    最短距离为v,c到b的最短距离为u),那么,更新c到b的最短距离为w+v。最后,消
    8 o2 A! C, v& `' V1 D8 H- A息(c,b,w+v)会传递给后续节点。! G* H9 M/ n$ Z3 V# c: _7 U
    (2)终止条件
    % Y' R3 C% M# m+ k8 y, {当所有节点在执行某个超步时都没有更新到其他节点的最短距离时,说明已经
    1 _4 t, D" i4 T计算出想要的结果,整个迭代过程可以结束。
    ; R. H/ `: f/ V. sPregel通过检查点(checkpoint)的方式进行容错处理。它在每执行完一个超步之4 E( ?& i3 E1 f
    后会记录整个计算的现场,即记录检查点情况。检查点中记录了这一轮迭代中每个  O0 L" a% ?( b1 \0 L! Y; ]
    任务的全部状态信息,一旦后续某个计算节点失效,Pregel将从最近的检查点重启整
    ' V/ C2 d7 J) P个超步。尽管上述的容错策略会重做很多并未失效的任务,但是实现简单。考虑到
    , O5 k" L' |( o! n3 _, [. D服务器故障的概率不高,这种方法在大多数时候还是令人满意的。8 P( [( g( m# S' `0 n3 B

    1 S; F1 @1 @+ c0 N
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2024-11-21 20:55 , Processed in 0.140006 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表