|
9.4 ChunkServer实现机制( p+ S7 h; i2 B$ p3 {3 @
ChunkServer用于存储基线数据,它由如下基本部分组成:" n- `: ^, ]3 ~1 ]7 n8 p) D
●管理子表,主动实现子表分裂,配合RootServer实现子表迁移、删除、合并;
0 P2 k9 Y$ p! _6 b! T, m●SSTable,根据主键有序存储每个子表的基线数据;2 {: Z0 l1 ]+ T
●基于LRU实现块缓存(Block cache)以及行缓存(Row cache);
" |' Y# m. p" N●实现Direct IO,磁盘IO与CPU计算并行化;; t; D# D k* `& N3 M2 p
●通过定期合并&数据分发获取UpdateServer的冻结数据,从而分散到整个集
: O2 ?9 ?. [4 l# I" B- v; J# [0 E群。
. _7 Y2 g3 y0 b. d4 E; M3 B8 v每台ChunkServer服务着几千到几万个子表的基线数据,每个子表由若干个
: W- h1 H' J0 J+ k" m ?. YSSTable组成(一般为1个)。下面从SSTable开始介绍ChunkServer的内部实现。% ^4 S. a& r" w! e0 T5 p
9.4.1 子表管理- k5 o1 f7 o& M, B# a3 u3 l k @
每台ChunkServer服务于多个子表,子表的个数一般在10000~100000之间。. X3 N. n" F+ h0 {6 h+ T
Chunk-Server内部通过ObMultiVersionTabletImage来存储每个子表的索引信息,包括数: u2 k% ]7 R/ @2 w
据行数(row_count),数据量(occupy_size),校验和(check_sum),包含的) t/ }" e+ y8 ?- |& f. s' D1 t4 E
SSTable列表,所在磁盘编号(disk_no)等,代码如下:( G" `8 ]8 a! [* g* \; D
class ObMultiVersionTabletImage
: h5 e+ Z- C5 c9 g, }, U{
. z0 r L9 G! @+ O6 T$ [- ~public:
3 ]0 y8 q0 o/ d; s//获取第一个包含指定数据范围的子表
( M% T. @ l9 A//@param[in]range数据范围
, Z: K' G: H. Q( r//@param[in]scan_direction正向扫描(默认)还是逆向扫描
# b6 {9 N4 j/ p/ S8 T0 Z. \//@param[in]version子表的版本号
5 h' q4 {- i& `8 x4 e6 T0 b7 y, q( `//@param[out]tablet获取的子表索引结构0 e- y5 r/ h. }& w# h4 f
int acquire_tablet(const ObNewRange&range,const ScanDirection- \8 j- \$ k+ G
scan_direction,const int64_t version,ObTablet*&tablet)const;
, Z! }" S7 r5 \* ]3 l8 p- S: Z//释放一个子表: @# O2 o/ N# I2 u! W4 [, U
int release_tablet(ObTablet*tablet);
; Q6 o5 i2 E" I! ?3 J//新增一个子表,load_sstable表示是否立即加载其中的SSTable文件. J) L/ R3 O* g* m
int add_tablet(ObTablet*tablet,const bool load_sstable=false);
% G% l z$ V E. A//每日合并后升级子表到新版本,load_sstable表示是否立即加载新版本的SSTable文件
) Q5 l; q6 f$ x# q1 |: G0 m9 ^int upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablet,const bool# ]6 ~- H5 D- m: B
load_sstable=false);; `$ [$ O: r" n; _
//每日合并后升级子表到新版本,且子表发生分裂,有一个变成多个。load_sstable表示是否立即加载, x- Z' z9 {# |# u1 r) _6 u
分裂后的SSTable文件0 c; N! L2 i0 |+ e. J7 S& c
int upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablets[],const int32_t: O. q; ]) B* d" t/ i
split_size,const bool load_sstable=false);
# N0 b+ h3 R- j2 g//删除一个指定数据范围和版本的子表+ O( q1 {$ [ x P
int remove_tablet(const ObNewRange&range,const int64_t version);+ Q1 }) l, l4 p9 q
//删除一个表格对应的所有子表8 O/ E. n" H( e# K& p
int delete_table(const uint64_t table_id);7 H% `% @" c4 ~; A9 v8 d
//获取下一批需要进行每日合并的子表2 S& M: Y1 V5 h
//@param[in]version子表的版本号
! H0 J1 `# T7 P1 F" y//@param[out]size下一批需要进行每日合并的子表个数8 O% O! |% \/ V. h* z* g: v
//@param[out]tablets下一批需要进行每日合并的子表索引结构6 t1 [9 N k+ @7 ? B/ Q
int get_tablets_for_merge(const int64_t version,int64_t&size,ObTablet*&: ~; G b( v" x J
tablets[])const;
0 c2 d: V* Y: W2 m};
( L: }; J* _9 N' S1 wChunkServer维护了多个版本的子表数据,每日合并后升级子表的版本号。如果* ~8 W2 R j0 o
子表发生分裂,每日合并后将由一个子表变成多个子表。子表相关的操作方法包
+ s% r! o, S! l2 Y; @0 S0 T, ]括:
% `4 i- H# r6 n1)add_tablet:新增一个子表。如果load_sstable参数为true,那么,立即加载其
; ~3 D2 a0 c4 p; f7 `中的SSTable文件。否则,使用延迟加载策略,即读取子表时再加载其中的SSTable。" R* s2 K* h" L% F$ d7 D- K
2)remove_tablet:删除一个子表。RootServer发现某个子表的副本数过多,则会
k8 w* m) m1 q% \, y通知其中某台ChunkServer删除指定的子表。* v U% ~1 j3 ]8 a; h3 K
3)delete_table:删除表格。用户执行删除表格命令时,RootServer会通知每台
M& P0 W: A& ]0 t5 `ChunkServer删除表格包含的所有子表。
+ K+ c y- i3 W& K6 E0 p4)upgrade_tablet:每日合并后升级子表的版本号。如果没有发生分裂,只需要5 n |+ N/ G6 `5 V: q
将老子表的版本号加1;否则,将老子表替换为多个范围连续的新子表,每个新子表
. r4 i& N6 b) {' {& N) {的版本号均为老子表的版本号加1。5 m3 n( [! v- |- T- q
5)acquire_tablet/release_tablet:读取时首先调用acquire_tablet获取一个子表,增
) L) {1 ^% `) q& U* J2 q加该子表的引用计数从而防止它在读取过程中被释放掉,接着读取其中的SSTable,
# C4 [7 E+ O* X! u; q, t7 B% V最后调用release_tablet释放子表。
- o" q. I. q# }' a& v+ N: Z0 Y! {6)get_tablets_for_merge:每日合并时通过调用该函数获取下一批需要进行每日
# I/ ~4 w. }0 N合并的子表。
+ ^2 g7 ?" p) G/ q# I) h( y# V9.4.2 SSTable/ l0 i8 `8 w3 [, h" D" x" ~! {4 ~4 N
如图9-8所示,SSTable中的数据按主键排序后存放在连续的数据块(Block)
' f* s) J/ C* O9 ]5 f中,Block之间也有序。接着,存放数据块索引(Block Index),由每个Block最后一! g- {7 K* w# ?. u( _# _7 H @
行的主键(End Key)组成,用于数据查询中的Block定位。接着,存放布隆过滤器6 w6 _2 }: F5 W
(Bloom Filter)和表格的Schema信息。最后,存放固定大小的Trailer以及Trailer的偏
1 e; _1 N: I7 I移位置。/ {# V2 s* L: m5 X
图 9-8 SSTable格式
9 w+ t' r9 [; M查找SSTable时,首先从子表的索引信息中读取SSTable Trailer的偏移位置,接着
- V- L$ }* [' N5 Z' H获取Trailer信息。根据Trailer中记录的信息,可以获取块索引的大小和偏移,从而将1 j, Q; `& F7 Q/ A
整个块索引加载到内存中。根据块索引记录的每个Block的最后一行的主键,可以通% H- C. K$ ^( a* G A R
过二分查找定位到查找的Block。最后将Block加载到内存中,通过二分查找Block中
( p; }6 \- ^/ E- e e9 B记录的行索引(Row Index)查找到具体某一行。本质上看,SSTable是一个两级索引
6 e' Q9 Y( Y! a% o4 W% H结构:块索引以及行索引;而整个ChunkServer是一个三级索引结构:子表索引、块
' y) i% @8 U$ m( T索引以及行索引。3 n' E7 T# }1 X4 J" f# N [
SSTable分为两种格式:稀疏格式以及稠密格式。对于稀疏格式,某些列可能存
4 V5 d; o% P* w1 Z在,也可能不存在,因此,每一行只存储包含实际值的列,每一列存储的内容为:) o$ q! N4 ^8 A7 S
<列ID,列值>(<Column ID,Column Value>);而稠密格式中每一行都需要存储8 g+ T/ _) x$ A% s$ [
所有列,每一列只需要存储列值,不需要存储列ID,这是因为列ID可以从表格
! E! Q }% S0 {4 _4 r3 M: g2 |9 d! rSchema中获取。# D$ ^7 d0 y% H R0 g
例9-4 假设有一张表格包含10列,列ID为1~10,表格中有一行的数据内容4 P% A; T+ x2 T3 ~. f: |2 ]& `
为:
7 O6 w8 e1 H5 l/ |: [' t那么,如果采用稀疏格式存储,内容为:<2,20>,<3,30>,<5,50>,
3 Q2 i% ]( I, j) b' P+ @3 W$ e; i<7,70>,<8,80>;如果采用稠密格式存储,内容为:null,20,30,null,8 r' d0 ^5 s9 i1 B5 Q( g* m, B! K% S# f
50,null,70,80,null,null。/ {; L9 {( }4 ^9 y* a2 W
ChunkServer中的SSTable为稠密格式,而UpdateServer中的SSTable为稀疏格式,
4 N4 e) S$ e. w$ @ V且存储了多张表格的数据。另外,SSTable支持列组(Column Group),将同一个列; A& w- Q4 W: j% K4 X- M$ N" j
组下的多个列的内容存储在一块。列组是一种行列混合存储模式,将每一行的所有' G& E; J1 }6 t2 ~
列分成多个组(称为列组),每个列组内部按行存储。" V$ Z; T8 B! M" r1 Q; D2 \
如图9-9所示,当一个SSTable中包含多个表格/列组时,数据按照[表格ID,列组
- d6 w6 f% @: t8 Y8 U( {* ], kID,行主键]([table_id,column group id,row_key])的形式有序存储。4 o' [! W. z3 d* A# ~' Z
图 9-9 SSTable包含多个表格/列组
* r8 c2 `- L, S另外,SSTable支持压缩功能,压缩以Block为单位。每个Block写入磁盘之前调
4 W+ j& S9 h1 C, ^# i2 y用压缩算法执行压缩,读取时需要解压缩。用户可以自定义SSTable的压缩算法,目
9 W& m& X/ c9 ?. a* H% N6 [8 S; Y$ B前支持的算法包括LZO以及Snappy。
# C! g/ ?- X0 L# d8 `- JSSTable的操作接口分为写入和读取两个部分,其中,写入类为" C2 B/ d- E: `6 K3 e' T* E
ObSSTableWriter,读取类为ObSSTableGetter(随机读取)和ObSSTableScanner(范围& Y! P7 Z8 S- q* P2 t6 D# J
查询)。代码如下:
, G% M/ t/ t8 p, X+ Uclass ObSSTableWriter; j/ G1 L% O: A) \6 W9 D
{( J$ R/ t& H! z. |7 f
public:
: A1 ]* I( C; p. k( r8 V( _//创建SSTable+ t6 v; G) |& I/ y# R) s0 m
//@param[in]schema表格schema信息9 t/ Q q x" d, t
//@param[in]path SSTable在磁盘中的路径名: \: j6 J6 |; j/ _ w6 l! y& O
//@param[in]compressor_name压缩算法名3 I4 b P7 G5 t
//@param[in]store_type SSTable格式,稀疏格式或者稠密格式
s3 U/ g+ @& W. V' p//@param[in]block_size块大小,默认64KB& c) D* m% C) f2 J0 E" R6 ]4 V
int create_sstable(const ObSSTableSchema&schema,const ObString&path,const# O5 f2 n8 L/ P* B4 g
ObString&compressor_name,const int store_type,const int64_t block_size);
* w5 v8 a; L7 f0 e. @1 U4 O//往SSTable中追加一行数据
% n+ f2 c3 q( I//@param[in]row一行SSTable数据* U' o: r, r/ @
//@param[out]space_usage追加完这一行后SSTable大致占用的磁盘空间) `0 H# A' H1 s
int append_row(const ObSSTableRow&row,int64_t&space_usage);
' N9 m: i) c- S7 s, v3 m) W//关闭SSTable,将往磁盘中写入Block Index,Bloom Filter,Schema,Trailer等信息/ x. O5 k5 Y+ |& B2 K
//@param[out]trailer_offset返回SSTable的Trailer偏移量# s9 }! U! G' D6 _/ y, B
int close_sstable(int64_t&trailer_offset);
/ S- T* X3 ]# L& v* h# h( S% Q# g, d6 D};) E: f/ L" q, T1 K
定期合并&数据分发过程将产生新的SSTable,步骤如下: m! y" w- U, |4 a/ j9 k+ A# h
1)调用create_sstable函数创建一个新的SSTable;
3 Q' s( [) b- |. v5 e# n2)不断调用append_row函数往SSTable中追加一行行数据;
; p. e0 T5 q* I' {1 |$ {3)调用close_sstable完成SSTable写入。
8 h- A* Q: y7 j8 ?与9.2.1节中的MemTableIterator一样,ObSSTableGetter和ObSSTableScanner实现了
2 Y/ @0 H& i" t1 U& W迭代器接口,通过它可以不断地获取SSTable的下一个cell。, `4 W$ T. ~4 _5 Q g
class ObIterator
- N- p9 s! s5 K3 L, T* a/ r{1 J- a, B, z' c# f: _/ y, H
public:
+ P- A0 l3 R; d0 l3 `; t//迭代器移动到下一个cell
i/ U+ b \: Y+ i5 Hint next_cell();4 F8 d# X C/ r" Z
//获取当前cell的内容
" c9 k! u1 `4 N+ w* g$ ^//@param[out]cell_info当前cell的内容,包括表名(table_id),行主键(row_key),列编号
; s" N7 o' F E% u(column_id)以及列值(column_value)" J) g. n! q/ B2 n* ^& H+ X7 v
int get_cell(ObCellInfo**cell_info);* T1 J2 {$ [5 J* T. Q+ n
//获取当前cell的内容
0 z0 m* U! q/ n7 r! s& H//@param[out]cell_info当前cell的内容
: _# u" z/ E& Z//@param is_row_changed是否迭代到下一行
4 Q E$ B5 K% n1 \/ e: |int get_cell(ObCellInfo**cell_info,bool*is_row_changed);
# o3 D9 Q" U; U# M( f" p};1 R0 M' f$ m- S+ i
OceanBase读取的数据可能来源于MemTable,也可能来源于SSTable,或者是合7 Y6 @- u( g$ I7 L. }( @" @4 c
并多个MemTable和多个SSTable生成的结果。无论底层数据来源如何变化,上层的读
7 D, }/ {) @0 Y9 I, t取接口总是ObIterator。; F' z) o ?7 E8 b! ?3 G
9.4.3 缓存实现
3 r$ o) \% m+ M/ F- s5 w9 j8 YChunkServer中包含三种缓存:块缓存(Block Cache)、行缓存(Row Cache)以! u7 L9 ^: ~ Y
及块索引缓存(Block Index Cache)。其中,块缓存中存储了SSTable中访问较热的数
/ @( E2 T7 `5 r M据块(Block),行缓存中存储了SSTable中访问较热的数据行(Row),而块索引缓
5 S3 O: b1 `2 l* A3 O3 R; w2 C/ i存中存储了最近访问过的SSTable的块索引(Block Index)。一般来说,块索引不会6 O9 u* h6 Y& p; f* R0 f7 R8 f
太大,ChunkServer中所有SSTable的块索引都是常驻内存的。不同缓存的底层采用相
& V3 d' Q4 l, D5 Z# u同的实现方式。
/ w% r' f9 g3 C# U9 m7 P1.底层实现) c" {' V( Y$ y* m
经典的LRU缓存实现包含两个部分:哈希表和LRU链表,其中,哈希表用于查找
! D& U, E5 m1 L; O+ L8 m. T' x$ ^" P缓存中的元素,LRU链表用于淘汰。每次访问LRU缓存时,需要将被访问的元素移动6 ^* c3 y [! T, p2 W
到LRU链表的头部,从而避免被很快淘汰,这个过程需要锁住LRU链表。6 ^" [4 | i5 U$ v9 R3 E9 g
如图9-10所示,块缓存和行缓存底层都是一个Key-Value Cache,实现步骤如下:
( r, z |$ k, O/ \, K" J图 9-10 Key-Value Cache的实现
4 _" {/ Q& _- ~$ }7 {1)OceanBase一次分配1MB的连续内存块(称为memblock),每个memblock包
. }+ Q; g* Q& ?$ Q3 w0 _3 U1 Q含若干缓存项(item)。添加item时,只需要简单地将item追加到memblock的尾部;
. M+ w; u# I4 Y6 v另外,缓存淘汰以memblock为单位,而不是以item为单位。
! v& I! P0 q9 q. H2)OceanBase没有维护LRU链表,而是对每个memblock都维护了访问次数和最
, l1 L; _( H/ j+ s% c近频繁访问时间。访问memblock中的item时将增加memblock的访问次数,如果最近一% v" H1 m! N$ Q6 v" y$ S
段时间之内的访问次数超过一定值,那么,更新最近频繁访问时间;淘汰memblock( `; H/ A- k4 R7 b1 o% c
时,对所有的memblock按照最近频繁访问时间排序,淘汰最近一段时间访问较少的
( m$ x- G2 }( U/ ]7 R# rmemblock。可以看出,读取时只需要更新memblock的访问次数和最近频繁访问时6 X1 O6 s. m, s/ j6 W4 Q; d
间,不需要移动LRU链表。这种实现方式通过牺牲LRU算法的精确性,来规避LRU链) Q9 Q& _$ A8 u+ [, N: {/ L3 J) _3 |2 `
表的全局锁冲突。
& s( o# q; p2 k& S5 |3)每个memblock维护了引用计数,读取缓存项时所在memblock的引用计数加
6 f( O5 I4 j a8 }! l6 ~. @1,淘汰memblock时引用计数减1,引用计数为0时memblock可以回收重用。通过引用/ V' J- X7 t- p) G4 {
计数,实现读取memblock中的缓存项不加锁。# L: _* w5 T0 q" \
2.惊群效应0 v5 Y2 V: B4 ?7 L( ?
以行缓存为例,假设ChunkServer中有一个热点行,ChunkServer中的N个工作线2 C5 C& n5 }2 c( _; K- u0 o" f. ?
程(假设为N=50)同时发现这一行的缓存失效,于是,所有工作线程同时读取这行
5 Z: Y5 p1 W1 G- [$ ~; s" D2 U数据并更新行缓存。可以看出,N-1共49个线程不仅做了无用功,还增加了锁冲突。& ^$ h! l9 f( A1 R7 z0 U
这种现象称为“惊群效应”。为了解决这个问题,第一个线程发现行缓存失效时会往/ h/ \8 Y0 H* V7 [
缓存中加入一个fake标记,其他线程发现这个标记后会等待一段时间,直到第一个线! i" {/ H9 z# E9 E3 }8 P. c
程从SSTable中读到这行数据并加入到行缓存后,再从行缓存中读取。
5 o/ g# C) Y/ Z8 c' y算法描述如下:
* m, I1 @$ `' O$ W) i$ |$ q. a调用internal_get读取一行数据;* k5 J$ f1 x9 z" `# O
if(行不存在){+ m" ]" p6 A% @% x2 b
调用internal_set往缓存中加入一个fake标记;
3 v* o/ \" U1 P1 H从SSTable中读取数据行;8 u# }# j: X' n% J
将SSTable中读到的行内容加入缓存,清除fake标记,唤醒等待线程;. C# m; y: A4 v, {2 p, U( i/ F0 A
返回读到的数据行;
3 ^8 \2 K+ o z0 s}else if(行存在且为fake标记)" t* L% J& V( D* L/ s
{
) t r, c- G3 W5 y% l" _6 }线程等待,直到清除fake标记;
! w5 u; m h: Z) \0 V4 Z/ |" e" oif(等待成功)返回行缓存中的数据;4 L& t+ |. h. r! u) T
if(等待超时)返回读取超时;5 V r; a5 B5 T) j# u9 x
} s& t6 v6 W) r; v* {
else0 ]8 P* S+ Y" O! p: p
{
7 h q* \: A" e/ l8 V& l& J' S返回行缓存中的数据;) {$ W8 ^+ U# Z% l, S1 X
}
5 Y! X) X6 q4 ]! q* q* G3.缓存预热
/ L: a" x# ]7 F" T5 VChunkServer定期合并后需要使用生成的新的SSTable提供服务,如果大量请求同
5 l3 u% p) F7 b* \/ g时读取新的SSTable文件,将使得ChunkServer的服务能力在切换SSTable瞬间大幅下
e8 M) Q) L. d. G: F" T降。因此,这里需要一个缓存预热的过程。OceanBase最初的版本实现了主动缓存预! m' K/ N& ^. [! y+ X* W9 m
热,即:扫描原来的缓存,根据每个缓存项的key读取新的SSTable并将结果加入到新
& Z- `$ K; Y) l, n" }& V* E; k" b的缓存中。例如,原来缓存数据项的主键分别为100、200、500,那么只需要从新的
& V4 [' g: L. w! X oSSTable中读取主键为100、200、500的数据并加入新的缓存。扫描完成后,原来的缓5 l& I+ ?; u" d- |( ]4 O+ p9 _
存可以丢弃。
- d. ?7 ^" g& E l线上运行一段时间后发现,定期合并基本上都安排在凌晨业务低峰期,合并完
! w$ F4 M( u# v5 `成后OceanBase集群收到的用户请求总是由少到多(早上7点之前请求很少,9点以后2 _( A! T+ ~# K% x7 v/ U3 A+ W- I# N
请求逐步增多),能够很自然地实现被动缓存预热。由于ChunkServer在主动缓存预& E* h3 q' o6 |; w. t9 F& Y, \9 Z: X
热期间需要占用两倍的内存,因此,目前的线上版本放弃了这种方式,转而采用被 O U- W9 _7 A: h) X
动缓存预热。0 u2 N, q1 }1 j( c
9.4.4 IO实现* x3 ^$ [ E5 z, H m4 i
OceanBase没有使用操作系统本身的页面缓存(page cache)机制,而是自己实现0 \" F; |& ^3 G
缓存。相应地,IO也采用Direct IO实现,并且支持磁盘IO与CPU计算并行化。/ W, I: l0 I6 k$ M' Y
ChunkServer采用Linux的Libaio [1] 实现异步IO,并通过双缓冲区机制实现磁盘预读& L: z" F9 G; @
与CPU处理并行化,实现步骤如下:% X1 E2 N8 Q' Q$ O! k7 Y1 s
1)分配当前(current)以及预读(ahead)两个缓冲区;
! y# ]9 U" t/ T8 @) W! \2)使用当前缓冲区读取数据,当前缓冲区通过Libaio发起异步读取请求,接着- d7 b) V( M* l- |; T6 K- P2 Q+ O
等待异步读取完成;# t1 W9 A6 X \) @2 O
3)异步读取完成后,将当前缓冲区返回上层执行CPU计算,同时,原来的预读2 o5 E% O# y7 l' @/ H- {5 a
缓冲区变为新的当前缓冲区,发送异步读取请求将数据读取到新的当前缓冲区。- A- y( x/ Y& G& c& W+ T
CPU计算完成后,原来的当前缓冲区变为空闲,成为新的预读缓冲区,用于下一次
) E# s; ]+ I1 v+ V预读。( F2 X( f9 g% l) ?2 l
4)重复步骤3),直到所有数据全部读完。
. u* @. y* x y4 W" X2 m例9-5 假设需要读取的数据范围为(1,150],分三次读取:(1,50],(50,5 p9 s" `' S6 L8 L" j0 V
100],(100,150],当前和预读缓冲区分别记为A和B。实现步骤如下:; t2 q; t; \8 J1 E7 ]& `, Q
1)发送异步请求将(1,50]读取到缓冲区A,等待读取完成;
. w1 @. V! k1 O0 H5 ]/ j4 L2)对缓冲区A执行CPU计算,发送异步请求,将(50,100]读取到缓冲区B;
6 h( }# ^8 M* z4 S2 F* X( A4 ]; q3)如果CPU计算先于磁盘读取完成,那么,缓冲区A变为空闲,等到(50,+ v1 r9 g' V* U# `3 p, l2 t9 [
100]读取完成后将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将 I' }( ~( ` V
(100,150]读取到缓冲区A;
( k$ z. z( a) d4)如果磁盘读取先于CPU计算完成,那么,首先等待缓冲区A上的CPU计算完3 M5 ?. q& @1 N* F2 J
成,接着,将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将(100,
) I, o+ K1 r2 D' |/ I& f150]读取到缓冲区A;
7 o+ i: ]/ ^# F) p2 x5)等待(100,150]读取完成后,将缓冲区A返回给上层执行CPU计算。
$ e3 T5 z5 |8 \5 J9 G9 B' P3 U双缓冲区广泛用于生产者/消费者模型,ChunkServer中使用了双缓冲区异步预读
4 i; Q1 R7 W q" G+ q: z$ @9 T的技术,生产者为磁盘,消费者为CPU,磁盘中生产的原始数据需要给CPU计算消费- C. d" V) t7 v& \( q& R
掉。
* j# n: c, _# c3 y# ? V所谓“双缓冲区”,顾名思义就是两个缓冲区(简称A和B)。这两个缓冲区,总
- @, q( m& n2 Z+ r0 N是一个用于生产者,另一个用于消费者。当两个缓冲区都操作完,再进行一次切5 ^6 x: P4 o' X) s
换,先前被生产者写入的被消费者读取,先前消费者读取的转为生产者写入。为了( s- x) C* H1 m0 o' z- c( y
做到不冲突,给每个缓冲区分配一把互斥锁(简称La和Lb)。生产者或者消费者如
# Z7 o7 a( y, [# z* ~果要操作某个缓冲区,必须先拥有对应的互斥锁。
4 ?5 x" S, H* Y3 m& l% f& s双缓冲区包括如下几种状态:, F5 w* U+ Q2 a
●双缓冲区都在使用的状态(并发读写)。大多数情况下,生产者和消费者都处( b3 L- V4 `3 Y# }4 P, `/ x
于并发读写状态。不妨设生产者写入A,消费者读取B。在这种状态下,生产者拥有8 v8 Q' E$ t7 _! _# Z- r4 P
锁La;同样地,消费者拥有锁Lb。由于两个缓冲区都是处于独占状态,因此每次读3 u8 |2 Z' I* ]: c o
写缓冲区中的元素都不需要再进行加锁、解锁操作。这是节约开销的主要来源。
! q; z; ] _& {4 B" g●单个缓冲区空闲状态。由于两个并发实体的速度会有差异,必然会出现一个缓/ J+ D/ r* l. X h r
冲区已经操作完,而另一个尚未操作完。不妨假设生产者快于消费者。在这种情况# C2 D$ l# h6 q# r. Y) w
下,当生产者把A写满的时候,生产者要先释放La(表示它已经不再操作A),然后5 o& R j% O% ?& D0 p" E) Q0 S
尝试获取Lb。由于B还没有被读空,Lb还被消费者持有,所以生产者进入等待; d( j6 ^7 O6 y. m
(wait)状态。6 m' o7 Q- {- t Q3 s
●缓冲区的切换。过了若干时间,消费者终于把B读完。这时候,消费者也要先
0 y9 [$ T" a% e4 @0 d, d L释放Lb,然后尝试获取La。由于La刚才已经被生产者释放,所以消费者能立即拥有
+ u" J7 W9 P* K \8 y/ oLa并开始读取A的数据。而由于Lb被消费者释放,所以刚才等待的生产者会苏醒过来
/ T3 J) n) m2 z( u5 y(wakeup)并拥有Lb,然后生产者继续往B写入数据。1 E( b/ J! |0 R m
[1]Oracle公司实现的Linux异步IO库,开源地址:https://oss.oracle.com/projects/libaio-
7 b2 T c7 i7 [8 F- l- Foracle/, D+ K7 X2 Y9 `! |
9.4.5 定期合并&数据分发, W& W2 [. S' o! u9 m, N
RootServer将UpdateServer上的版本变化信息通知ChunkServer后,ChunkServer将
p( \- G5 J2 T& T2 l2 r# \9 Y执行定期合并或者数据分发。0 c. u% A$ W& i- z
如果UpdateServer执行了大版本冻结,ChunkServer将执行定期合并。ChunkServer
s, _1 [8 R8 f4 N& I9 }唤醒若干个定期合并线程(比如10个),每个线程执行如下流程:( I4 O! G: |& V$ g) h1 }6 `! P, E
1)加锁获取下一个需要定期合并的子表;
) \$ A2 Y$ ~2 |/ t8 u' P2)根据子表的主键范围读取UpdateServer中的修改操作;
, A. n0 {2 Y$ O! W: y" W+ E3)将每行数据的基线数据和增量数据合并后,产生新的基线数据,并写入到新
5 m) A* m/ Y- w( B的SSTable中;
$ T# d- R. V# J/ e) R( q5 T4)更改子表索引信息,指向新的SSTable。7 o9 g' r: [( c
等到ChunkServer上所有的子表定期合并都执行完成后,ChunkServer会向
2 z0 [+ j1 d2 V: k4 TRootServer汇报,RootServer会更新RootTable中记录的子表版本信息。定期合并一般
( P; X- Z' Z, ]安排在每天凌晨业务低峰期(凌晨1:00开始)执行一次,因此也称为每日合并。另+ f S/ [9 j8 D4 ?" N
外,定期合并过程中ChunkServer的压力比较大,需要控制合并速度,否则可能影响1 g' }* o" ?0 I# Q+ p9 ^! s
正常的读取服务。
6 y' b" e, K" w' P如果UpdateServer执行了小版本冻结,ChunkServer将执行数据分发。与定期合并7 Q8 T1 D+ r9 Z- n8 ?; B
不同的是,数据分发只是将UpdateServer冻结的数据缓存到ChunkServer,并不会生成9 u+ S0 P5 K& \7 h& f ^1 V
新的SSTable文件。因此,数据分发对ChunkServer造成的压力不大。7 a7 @2 F9 p# U- l4 P) D
数据分发由外部读取请求驱动,当请求ChunkServer上的某个子表时,除了返回
3 a, t2 L* d* |: W" T& B1 R使用者需要的数据外,还会在后台生成这个子表的数据分发任务,这个任务会获取
c% F A# Y+ u/ L) L/ i2 D# r; {2 KUpdateServer中冻结的小版本数据,并缓存在ChunkServer的内存中。如果内存用完,2 A$ ? Z( J! c$ T6 ^
数据分发任务将不再进行。当然,这里可以做一些改进,比如除了将UpdateServer分
+ V0 i. p* q, H' \$ E8 E& O发的数据存放到ChunkServer的内存中,还可以存储到SSD磁盘中。$ V+ ?7 V: g/ N: U
例9-6 假设某台ChunkServer上有一个子表t1,t1的主键范围为(1,10],只有一
i9 t( x* a$ f) g8 t行数据:rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,404 F. E# G3 w) _& b7 Z
>)。UpdateServer的冻结版本有两行更新操作:rowkey=8=>(<2,update,30
. i( x. z: D$ K2 C& }>,<3,up-date,38>)和rowkey=20=>(<4,update,50>)。2 ]1 K2 G/ X7 S, c; L' }! r
●如果是大版本冻结,那么,ChunkServer上的子表t1执行定期合并后结果为:
& \# m: ~3 Q2 ^9 M: Y5 H2 D" N$ Wro-wkey=8=>(<2,update,30>,<3,update,38>,<4,update,40>);) B7 }6 v8 \0 w- n4 e- W, B' j8 |
●如果是小版本冻结,那么,ChunkServer上的子表t1执行数据分发后的结果为:; M) g# R6 Z. W; b' A/ a( ?
rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40>,<2,
e9 Q0 d; C- h) L% ]! n eupdate,30>,<3,update,38>)。
8 @9 s7 x: K, q% m$ Q' u2 s9.4.6 定期合并限速
) |5 P4 T- v2 V) K$ F4 o) p/ }定期合并期间系统的压力较大,需要控制定期合并的速度,避免影响正常服* |: c6 X, ^0 R& ?2 @* R' T2 {9 `
务。定期合并限速的措施包括如下步骤:
* g$ t. O# K1 S0 P2 q+ g7 o1)ChunkServer:ChunkServer定期合并过程中,每合并完成若干行(默认2000% U# k! S ?8 k* b+ p% L
行)数据,就查看本机的负载(查看Linux系统的Load值)。如果负载过高,一部分+ P& Q' Y' v3 Q5 P0 C& w! S7 |
定期合并线程转入休眠状态;如果负载过低,唤醒更多的定期合并线程。另外,
% [. G9 Y" q0 NRootServer将UpdateServer冻结的大版本通知所有的ChunkServer,每台ChunkServer会1 \3 h+ r1 X8 i$ w! D! c/ r; U& Q r
随机等待一段时间再开始执行定期合并,防止所有的ChunkServer同时将大量的请求
2 H7 a l+ T# s* y发给UpdateServer。
( k7 L7 j! _: M( U$ ]8 q$ f8 P2)UpdateServer:定期合并过程中ChunkServer需要从UpdateServer读取大量的数
! Q! A$ A+ R# n5 }% G' }6 ?- N据,为了防止定期合并任务用满带宽而阻塞用户的正常请求,UpdateServer将任务区
& V; \# X3 |. C% R1 _6 W分为高优先级(用户正常请求)和低优先级(定期合并任务),并单独统计每种任
* ^0 ]5 C5 G; L9 [' S! _务的输出带宽。如果低优先级任务的输出带宽超过上限,降低低优先级任务的处理, K, H0 d( c" |/ O; |/ E
速度;反之,适当提高低优先级任务的处理速度。
' O- h5 c% |$ C8 L- L0 W0 y如果OceanBase部署了两个集群,还能够支持主备集群在不同时间段进行“错峰
7 b; R3 c! W4 d/ C合并”:一个集群执行定期合并时,把全部或大部分读写流量切到另一个集群,该集
6 D, C" U: n/ V) o群合并完成后,把全部或大部分流量切回,以便另一个集群接着进行定期合并。两' k. u. J* f) s: S+ N" J- }4 q3 T0 \: Q
个集群都合并完成后,恢复正常的流量分配。
. o( t! V" r' q, R
, t$ `0 c$ [5 T! ^4 a3 p9 J) k6 d7 D5 a* u
|
|