java自学网VIP

Java自学网

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 2694|回复: 0

《大规模分布式存储系统》第13章 大数据【13.4】

[复制链接]
  • TA的每日心情
    开心
    2021-5-25 00:00
  • 签到天数: 1917 天

    [LV.Master]出神入化

    2096

    主题

    3754

    帖子

    6万

    积分

    管理员

    Rank: 9Rank: 9Rank: 9

    积分
    66788

    宣传达人突出贡献优秀版主荣誉管理论坛元老

    发表于 2017-3-20 19:43:57 | 显示全部楼层 |阅读模式
    13.4 流式计算
    7 x5 i1 P" q# @) U4 lMapReduce及其扩展解决了离线批处理问题,但是无法保证实时性。对于实时性, Z8 ]6 G6 \/ h. \; D- G% U/ a! p
    要求高的场景,可以采用流式计算或者实时分析系统进行处理。5 r! ]5 s8 [2 i
    流式计算(Stream Processing)解决在线聚合(Online Aggregation)、在线过滤
    + n! k/ u! ~5 R& W: J' ]5 ](Online Filter)等问题,流式计算同时具有存储系统和计算系统的特点,经常应用' y/ k4 Z& \! j7 M& w. K
    在一些类似反作弊、交易异常监控等场景。流式计算的操作算子和时间相关,处理
    ' j4 W9 D. T; B) g5 f9 h最近一段时间窗口内的数据。
      ]/ A/ ]/ C- Y2 Z" v) G13.4.1 原理' }' a  v9 H' s' c. t) H: `# z) f
    流式计算强调的是数据流的实时性。MapReduce系统主要解决的是对静态数据的% W, l3 S' d2 c
    批量处理,当MapReduce作业启动时,已经准备好了输入数据,比如保存在分布式文0 f. @5 c9 b) h3 @* j
    件系统上。而流式计算系统在启动时,输入数据一般并没有完全到位,而是经由外
    . k! G6 t0 U. T) ^9 `; G, i% J' X部数据流源源不断地流入。另外,流式计算并不像批处理系统那样,重视数据处理  [! |/ L9 \8 x$ B0 a
    的总吞吐量,而是更加重视对数据处理的延迟。7 j% o% T4 ~, Z; R* [5 [
    MapReduce及其扩展采用的是一种比较静态的模型,如果用它来做数据流的处
    , m8 ^% b& z; F: q0 ?理,首先需要将数据流缓存并分块,然后放入集群计算。如果MapReduce每次处理的
    1 u7 u  B6 M8 H* I/ K  B! P- I数据量较小,缓存数据流的时间较短,但是,MapReduce框架造成的额外开销将会占. ]7 ]& ^2 R& X; [" `6 w
    很大比重;如果MapReduce每次处理的数据量较大,缓存数据流的时间会很长,无法
      v" G  Y/ X  {: M4 J) \满足实时性的要求。
    4 T0 E: E! m& d* \3 u8 W流式计算系统架构如图13-5所示。
    ; I& J& ~* R5 K0 |8 {图 13-5 流式计算系统
    ' j  q7 K) x" f2 a7 }源数据写入到流处理节点,流处理节点内部运行用户自定义的钩子函数对输入3 x% H. ?* K. @& ]
    流进行处理,处理完后根据一定的规则转发给下游的流处理节点继续处理。另外,
    9 r0 a1 X% |0 {3 a8 q8 R/ u系统中往往还有管理节点,用来管理流处理节点的状态以及节点之间的路由规则。
    3 C3 Y- Z7 H4 V& x" C" L- y+ `典型钩子函数包括:! ?$ c, z+ {- P
    ●聚合函数:计算最近一段时间窗口内数据的聚合值,如max、min、avg、sum、
    # f% B" I0 A; K; ocount等。
    , R: O5 O$ A7 o( i! O) c: p●过滤函数:过滤最近一段时间窗口内满足某些特性的数据,如过滤1秒钟内重
    ; T& s, \$ n; h; R复的点击。  t' ?+ I. q" x. S4 u9 K
    如果考虑机器故障,问题变得复杂。上游的处理节点出现故障时,下游有两种
    ' {$ Q+ n0 T! |5 E2 m5 {2 O选择:第一种选择是等待上游恢复服务,保证数据一致性;第二种选择是继续处
    2 U* q9 z. Y6 N# u- O) n4 p理,优先保证可用性,等到上游恢复后再修复错误的计算结果。6 M% n$ R& E7 b- i0 x8 y8 X4 c
    流处理节点可以通过主备同步(Master/Slave)的方式容错,即将数据强同步到& p# D7 `& w1 L# V( ~
    备机,如果主机出现故障,备机自动切换为主机继续提供服务。然而,这种方式的, _1 T" X7 V" s- f6 o
    代价很高,且流式处理系统往往对错误有一定的容忍度,实际应用时经常选择其他
    ; A; g+ w! _+ f5 r/ F; M" r代价更低的容错方式。# `  Y9 \  O, C3 g0 Z. T- [+ {
    13.4.2 Yahoo S44 H* `% V6 Y) a6 \" {+ G
    Yahoo S4最初是Yahoo为了提高搜索广告有效点击率而开发的一个流式处理系
    # `. h$ x9 A0 o$ w( |% q% Z) P5 e1 n统。S4的主要设计目标是提供一种简单的编程接口来处理数据流,使得用户可以定$ \0 {+ i) n- A4 F4 X$ [# l" d
    制流式计算的操作算子。在容错设计上,S4做得比较简单:一旦S4集群中的某个节
    , `( d  Q3 M1 p  B点故障,会自动切换到另外一个备用节点,但是原节点的内存状态将丢失。这种方! m& j1 @( I" j# h/ p$ O  O
    式虽然可能丢失一部分数据,但是成本较低。考虑到服务器故障的概率很低,能够
    ) K; s8 _. `' V, h7 Y/ d6 ~; @6 e很好地满足流式计算业务需求。7 Y, _; Z3 r6 W* P* O
    S4中每个流处理节点称为一个处理节点(Processing Node,PN),其主要工作是& c5 G+ \( o- }7 ^  g; S: A
    监听事件,当事件到达时调用合适的处理元(Processing Elements,PE)处理事件。如
    1 R8 x/ [0 H% W- \7 x果PE有输出,则还需调用通信层接口进行事件的分发和输出,如图13-6所示。5 `+ }0 G  @* q( F2 n; y
    图 13-6 S4处理节点内部模块
    ! [. R8 B* _0 f1 N5 T8 F& H事件监听器(Event Listener)负责监听事件并转交给PE容器(Processing Element
    ) [0 |7 k" D: {/ P  hContainer,PEC),由PEC交给合适的PE处理业务逻辑。配置文件中会配置PE原型/ l% ?( G* w1 _! k
    (PE prototype),包括其功能、处理的事件类型(event type)、关心的key以及关心1 K' O" s! b% U. S+ \9 n! A
    的key值。每个PE只负责处理自己所关心的事件,也就是说,只有当事件类型、key
    * s* p, ?8 i/ ^1 T9 N) H! x1 Q类型和key值都匹配时,才会交由该PE进行计算处理。PE处理完逻辑后根据其定义的
    8 s( U5 b! D& K9 c5 c% i9 R" y输出方法可以输出事件,事件交由分发器(Dispatcher)与通信层(Communication
    % F! t. v1 g; k2 F; S. R6 }Layer)进行交互并由输出器(Emitter)输出至下一个逻辑节点。输出器通过对事件
    ' K7 e, F5 a7 F: s" z2 W% P的类型、key类型、key值计算哈希值,以路由到配置文件中指定的PN。2 ?1 l3 g0 _! M* e* [
    通信层提供集群路由(Routing)、负载均衡(Load Balancing)、故障恢复管理9 j( L5 I4 p6 l! ?; `4 G
    (Failover Management)、逻辑节点到物理节点的映射(存放在Zookeeper上)。当检
    . w% O1 K% S$ P$ a测到节点故障时,会切换到备用节点,并自动更新映射关系。通信层隐藏的映射使: z1 c% o; D$ {- ]$ V
    得PN发送消息时只需要关心逻辑节点而不用关心物理节点。
    1 O1 f& j4 a* a1 l- V: ~7 S( Z13.4.3 Twitter Storm
    ' x) u# Q% x- C. N+ p* {Twitter Storm是目前广泛使用的流式计算系统,它创造性地引入了一种记录级容
    # }5 W" f1 u- O# A$ p" P错的方法。如图13-7所示,Storm系统中包含如下几种角色:! l* t7 v7 _8 V9 a* @1 K
    图 13-7 Storm集群的基本组件9 A$ [/ r; E! \# ?
    ●Nimbus:负责资源分配、任务调度、监控状态。Nimbus和supervisor之间的所
    ) I2 h& @4 Z6 P1 L9 c" J2 @' J有协调工作都是通过一个Zookeeper集群来完成。' B1 a+ b# y, @0 t7 [
    ●Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的Worker进7 O8 V& f3 E1 }) Q; E/ ~
    程。5 f% x4 t' ]- `. {7 H
    ●Worker:运行spout/bolt组件的进程。
    : M$ \3 j$ Z$ S9 V1 [●Spout:产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,
    $ k% T+ g' C/ m8 {( v然后转换为内部的数据格式。Spout是一个主动的角色,其接口中有个nextTuple()函+ X' g& }+ J* g( c3 p2 E3 E
    数,Storm框架会不停地调用此函数,用户只要在其中生成源数据即可。2 Y7 a) s! u/ b$ P3 r0 v
    ●Bolt:接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、; U/ A4 `' v  s! b* I. r2 w! {4 C
    写数据库等任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)* @0 D5 L" D# A/ f, t9 N
    函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。8 W6 a, v3 i* L* L
    每个worker上运行着spolt或者bolt组件,数据从spolt组件流入,经过一系列bolt
    ) g7 k* C+ m3 [7 M8 C9 J组件的处理直到生成用户想要的结果。) x: p" A5 ^1 {& b( f( P1 H+ a% c
    Storm中的一个记录称为tuple,用户在spout中生成一个新的源tuple时可以为其指# C% ~, c8 J4 M, ^" k# g
    定一个消息编号(message id),多个源tuple可以共用一个消息编号,表示这多个源
    % C7 H6 |$ a$ Y& J; r% f8 htuple对用户来说是同一个消息单元。Storm的记录级容错机制会告知用户由Spolt发出6 P4 E- z4 f, o! y- P7 v: h, H& e
    的每个消息单元是否在指定时间内被完全处理了,从而允许Splot重新发送出错的消+ I! H* j0 |" M* J$ `
    息。如图13-8,message1绑定的源tuple1和tuple2经过了bolt1和bolt2的处理后生成两个
    7 R* Q  P4 c9 \8 e/ v0 a' l: N新的tuple(tuple3和tuple4),并最终都流向bolt3。当这个过程全部完成时,message1
    " A0 C9 b, d$ F9 Q" H' k被完全处理了。Storm中有一个系统级组件,叫做acker。这个acker的任务就是追踪从1 V( I% V% s' v
    spout中流出来的每一个message绑定的若干tuple的处理路径。Bolt1、bolt2、bolt3每次) ?# t& v1 S; b: h) I% Z% Y6 G
    处理完成一个tuple都会通知acker,acker会判断message1是否被完全处理了,等到完全7 _# E# V5 ~1 h& V3 q* x
    处理时通知生成message1的spolt。这里存在两个问题:
    1 l$ s% R' X  W0 j: a* H" O2 g图 13-8 Storm数据流示例$ X* N" f, Y8 v- c) a
    1)如何判断message1是否被完全处理了?
    2 V( X* L; |7 ~& ~* s: a9 j: AAcker中保存了message1对应的校验值(64位整数),初始为0。每次发送或者接
    * C& x! z+ `. G9 B8 {$ z收一个message1绑定的tuple时都会将tuple的编号与校验值进行异或(XOR)运行,如$ v7 }  q6 y. s6 F( S* h
    果每个发送出去的tuple都被接受了,那么,message1对应校验值一定是0,从而认为0 h- A3 @2 A6 h" T
    message1被完全处理了。当然,这种方式有一定的误判率,然而考虑到每个tuple的编) I$ [; W) _3 i: Y6 X; W1 a& v
    号为64位整数,这种概率很低。8 g1 r% O& U5 J  w
    2)系统中有很多acker实例,如何选择将message1发给哪个实例?
    . A" A( ^8 M6 ZStorm中采用一致性哈希算法来计算message1对应的acker实例。如果acker出现性
    5 M# U0 E3 n( H' Y, L, k9 N% @: R能瓶颈,只需要往系统中加入新的acker实例即可。! L! x* Y% @$ m/ Q; C
    ) a9 L9 ]- p7 s+ b; z0 f
    4 b7 `6 ?' u& f: [/ S. u
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|Archiver|手机版|小黑屋|Java自学网

    GMT+8, 2025-4-1 14:14 , Processed in 0.361516 second(s), 31 queries .

    Powered by Javazx

    Copyright © 2012-2022, Javazx Cloud.

    快速回复 返回顶部 返回列表