|
13.4 流式计算0 e& L( h: h( Y% k
MapReduce及其扩展解决了离线批处理问题,但是无法保证实时性。对于实时性% S* {+ h7 b& V4 d* A+ h7 A
要求高的场景,可以采用流式计算或者实时分析系统进行处理。) {1 ?* b3 p8 S
流式计算(Stream Processing)解决在线聚合(Online Aggregation)、在线过滤! f, |" v! d/ |. w8 ]
(Online Filter)等问题,流式计算同时具有存储系统和计算系统的特点,经常应用1 g; c: [$ Y& c0 ], w5 r
在一些类似反作弊、交易异常监控等场景。流式计算的操作算子和时间相关,处理) }5 R- \! }3 h
最近一段时间窗口内的数据。
" }. k4 Z! A) f9 m9 `* t* }- n0 \13.4.1 原理, u7 v: b: `! R$ J. n& f: q
流式计算强调的是数据流的实时性。MapReduce系统主要解决的是对静态数据的
# R, u. U3 `) A, E* @% B z0 i批量处理,当MapReduce作业启动时,已经准备好了输入数据,比如保存在分布式文' c( H2 ] j' s( k' \: @$ W
件系统上。而流式计算系统在启动时,输入数据一般并没有完全到位,而是经由外
4 y6 L" E1 u, b" k! ~& o部数据流源源不断地流入。另外,流式计算并不像批处理系统那样,重视数据处理
6 N* X, M) ], X, a( ]6 \: {1 ~! r的总吞吐量,而是更加重视对数据处理的延迟。* x3 L! u( ~ X9 N" h; |1 b0 d3 d
MapReduce及其扩展采用的是一种比较静态的模型,如果用它来做数据流的处
! R$ o1 Y, o# J) C$ f理,首先需要将数据流缓存并分块,然后放入集群计算。如果MapReduce每次处理的/ n" |$ E# y7 U6 v! R' l
数据量较小,缓存数据流的时间较短,但是,MapReduce框架造成的额外开销将会占
9 |1 Y5 [: A+ o很大比重;如果MapReduce每次处理的数据量较大,缓存数据流的时间会很长,无法
6 V7 b7 U) g3 `满足实时性的要求。
3 h$ ^! q2 a8 s/ }( l w流式计算系统架构如图13-5所示。
! R9 G1 B; D8 L图 13-5 流式计算系统 J1 S9 i M$ I p
源数据写入到流处理节点,流处理节点内部运行用户自定义的钩子函数对输入6 o3 z5 s0 q: N& G
流进行处理,处理完后根据一定的规则转发给下游的流处理节点继续处理。另外,0 [% _4 z4 { r1 p/ R2 \" `
系统中往往还有管理节点,用来管理流处理节点的状态以及节点之间的路由规则。
! ^8 T" ^, k" I: `典型钩子函数包括: S3 ?( b* D0 K! H
●聚合函数:计算最近一段时间窗口内数据的聚合值,如max、min、avg、sum、5 w0 C9 B) G( `) m: Z- H
count等。/ F% @* o' n+ e& v" Q8 @5 e
●过滤函数:过滤最近一段时间窗口内满足某些特性的数据,如过滤1秒钟内重# y5 T8 V: t# B' X7 U4 J7 W
复的点击。$ {9 v2 T1 |) y% T
如果考虑机器故障,问题变得复杂。上游的处理节点出现故障时,下游有两种 v' P0 f, j) Y8 p# @
选择:第一种选择是等待上游恢复服务,保证数据一致性;第二种选择是继续处
) L- s7 r" W" K% _7 Y理,优先保证可用性,等到上游恢复后再修复错误的计算结果。
r+ y: B/ V O# @流处理节点可以通过主备同步(Master/Slave)的方式容错,即将数据强同步到
' ]# \9 [% A5 i2 P. e备机,如果主机出现故障,备机自动切换为主机继续提供服务。然而,这种方式的
6 k8 v9 i7 k. Z2 ^4 o9 r2 m代价很高,且流式处理系统往往对错误有一定的容忍度,实际应用时经常选择其他
! M" d0 b8 H2 T. S6 w) E代价更低的容错方式。1 p6 S* g% e& i' j
13.4.2 Yahoo S4' j! m8 u" ?) e6 v: K
Yahoo S4最初是Yahoo为了提高搜索广告有效点击率而开发的一个流式处理系
! E% e. Q# r/ B5 {0 ]" q0 E4 e5 a统。S4的主要设计目标是提供一种简单的编程接口来处理数据流,使得用户可以定- z( C+ _3 O9 ^* k* R& c
制流式计算的操作算子。在容错设计上,S4做得比较简单:一旦S4集群中的某个节
; ]3 B' u4 n, p, }2 e' V4 Q8 F9 Z点故障,会自动切换到另外一个备用节点,但是原节点的内存状态将丢失。这种方
# r$ t+ u" @: c/ {6 t式虽然可能丢失一部分数据,但是成本较低。考虑到服务器故障的概率很低,能够
+ f2 m+ G4 s8 X8 M很好地满足流式计算业务需求。# U8 A$ \# E& \3 L! k. H: ]
S4中每个流处理节点称为一个处理节点(Processing Node,PN),其主要工作是" m, I" I1 P1 L i: {; F! t8 r
监听事件,当事件到达时调用合适的处理元(Processing Elements,PE)处理事件。如) d9 D8 U( V# @3 l; @. {
果PE有输出,则还需调用通信层接口进行事件的分发和输出,如图13-6所示。2 V5 e8 }6 b# y; P9 L
图 13-6 S4处理节点内部模块
6 c" J7 H% V0 R r s& S( i事件监听器(Event Listener)负责监听事件并转交给PE容器(Processing Element
# M' D2 l/ r$ j; hContainer,PEC),由PEC交给合适的PE处理业务逻辑。配置文件中会配置PE原型
( z D+ K K, e$ \(PE prototype),包括其功能、处理的事件类型(event type)、关心的key以及关心
- C- l# ^3 x; Y0 Y* K( b的key值。每个PE只负责处理自己所关心的事件,也就是说,只有当事件类型、key
, p% `( y3 z' i8 e8 _$ ^" A5 X类型和key值都匹配时,才会交由该PE进行计算处理。PE处理完逻辑后根据其定义的4 k3 v6 L& ?/ }5 y: H5 \
输出方法可以输出事件,事件交由分发器(Dispatcher)与通信层(Communication, J5 F, e6 d g* A5 x2 y
Layer)进行交互并由输出器(Emitter)输出至下一个逻辑节点。输出器通过对事件$ [& c. k9 h) Y. ^
的类型、key类型、key值计算哈希值,以路由到配置文件中指定的PN。
N j2 O( l' d4 l6 [$ S通信层提供集群路由(Routing)、负载均衡(Load Balancing)、故障恢复管理" ^7 o2 h6 {% x7 L
(Failover Management)、逻辑节点到物理节点的映射(存放在Zookeeper上)。当检" h) c4 S4 t: L8 ]* L1 h
测到节点故障时,会切换到备用节点,并自动更新映射关系。通信层隐藏的映射使3 e: l9 l1 X3 m3 J+ I6 K" M
得PN发送消息时只需要关心逻辑节点而不用关心物理节点。" D; v" V5 ~- ?0 A
13.4.3 Twitter Storm
' ^: W9 Q$ m k1 c) M' W) qTwitter Storm是目前广泛使用的流式计算系统,它创造性地引入了一种记录级容- g7 g M1 D8 e8 _! V; s( }' _
错的方法。如图13-7所示,Storm系统中包含如下几种角色:7 U' p( `; G# z. `
图 13-7 Storm集群的基本组件" j" t7 {9 P9 K8 c b
●Nimbus:负责资源分配、任务调度、监控状态。Nimbus和supervisor之间的所
. U$ Q6 h# N1 O1 e: z, u2 ~有协调工作都是通过一个Zookeeper集群来完成。" p- J: O7 Y; d+ z- Q# A/ g
●Supervisor:负责接受nimbus分配的任务,启动和停止属于自己管理的Worker进9 ^5 y. {9 k1 U/ s
程。
0 `1 j) Z6 u: E* \0 b●Worker:运行spout/bolt组件的进程。* X- ]4 S0 R) {: L
●Spout:产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,
* J5 \. X( E) K; f0 `然后转换为内部的数据格式。Spout是一个主动的角色,其接口中有个nextTuple()函
6 v; w$ S) L- g# ^0 @* o* R数,Storm框架会不停地调用此函数,用户只要在其中生成源数据即可。
0 d) u. @/ C5 k●Bolt:接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、% _- a2 ~( p) i. w4 o6 {' B
写数据库等任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)
% @9 d4 T# H) v# q8 J6 `6 k函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。
+ h7 b" [: S2 O+ l* ~; C每个worker上运行着spolt或者bolt组件,数据从spolt组件流入,经过一系列bolt5 x5 g8 }/ O0 U
组件的处理直到生成用户想要的结果。
5 f+ M2 T- X6 g8 r) p3 }Storm中的一个记录称为tuple,用户在spout中生成一个新的源tuple时可以为其指( d- {! L5 G5 h. x
定一个消息编号(message id),多个源tuple可以共用一个消息编号,表示这多个源
0 C8 V3 a$ k8 k# Rtuple对用户来说是同一个消息单元。Storm的记录级容错机制会告知用户由Spolt发出
5 |5 `% T4 J/ C( g% w1 \的每个消息单元是否在指定时间内被完全处理了,从而允许Splot重新发送出错的消/ B2 \: T+ I" N
息。如图13-8,message1绑定的源tuple1和tuple2经过了bolt1和bolt2的处理后生成两个2 s* l! C, \. K
新的tuple(tuple3和tuple4),并最终都流向bolt3。当这个过程全部完成时,message1
2 s9 W2 j* k/ x& ^0 I+ |: P被完全处理了。Storm中有一个系统级组件,叫做acker。这个acker的任务就是追踪从2 O1 _2 B3 G, @, T9 d/ b$ ~% n
spout中流出来的每一个message绑定的若干tuple的处理路径。Bolt1、bolt2、bolt3每次
# ~; b+ \* \/ ^处理完成一个tuple都会通知acker,acker会判断message1是否被完全处理了,等到完全* b2 d; G- H( v
处理时通知生成message1的spolt。这里存在两个问题:& X5 P# H# E6 o- h( @7 ]
图 13-8 Storm数据流示例) C$ P( D' W" J! J. u, y7 m
1)如何判断message1是否被完全处理了? `( v5 u. v3 ]1 O: A; r2 y G
Acker中保存了message1对应的校验值(64位整数),初始为0。每次发送或者接
! K6 U0 j! |; f8 F5 U; s收一个message1绑定的tuple时都会将tuple的编号与校验值进行异或(XOR)运行,如1 T. E2 }& p9 q! F* ]
果每个发送出去的tuple都被接受了,那么,message1对应校验值一定是0,从而认为5 T! K/ J+ `1 d; {& p
message1被完全处理了。当然,这种方式有一定的误判率,然而考虑到每个tuple的编# O8 Q: b% I/ w: T" G
号为64位整数,这种概率很低。) s. A) _0 O3 W
2)系统中有很多acker实例,如何选择将message1发给哪个实例?
+ W) _( k) E& f, u9 ^: u" k+ hStorm中采用一致性哈希算法来计算message1对应的acker实例。如果acker出现性
/ [, [& Y, m# b( L能瓶颈,只需要往系统中加入新的acker实例即可。
/ V$ G8 k3 J5 d4 |5 h
- a; \: c( V, X! ?) c; u/ h- ?, h# O1 } d3 I" F+ I+ T+ R
|
|