|
9.4 ChunkServer实现机制
7 J, W( w( W4 U1 G$ J/ d, DChunkServer用于存储基线数据,它由如下基本部分组成:6 v( I" o* p& K8 V F" @# }
●管理子表,主动实现子表分裂,配合RootServer实现子表迁移、删除、合并;
5 \' T9 [3 R+ O' B1 v0 w0 ~4 g9 z●SSTable,根据主键有序存储每个子表的基线数据;
- ^/ F) K9 U7 I$ X' v+ |9 F; [3 Z# k●基于LRU实现块缓存(Block cache)以及行缓存(Row cache);8 q! o) P( b& F# [; ^( ?# c' Q4 J
●实现Direct IO,磁盘IO与CPU计算并行化;
6 E, f: u# c# s. W7 o# C( z, a$ B●通过定期合并&数据分发获取UpdateServer的冻结数据,从而分散到整个集# N5 s. s6 x( z; L2 B$ s2 I3 L# z
群。
3 f4 D t4 V- j: r, r) p每台ChunkServer服务着几千到几万个子表的基线数据,每个子表由若干个
& K1 J9 C% G$ X1 a7 eSSTable组成(一般为1个)。下面从SSTable开始介绍ChunkServer的内部实现。, g( _/ ?2 X% Y4 P
9.4.1 子表管理
& K$ R! P0 Q: j8 X0 e/ F8 v. ~每台ChunkServer服务于多个子表,子表的个数一般在10000~100000之间。
{3 o+ s) E/ B& r# b) h) ~. tChunk-Server内部通过ObMultiVersionTabletImage来存储每个子表的索引信息,包括数
0 k4 I. V; `: s据行数(row_count),数据量(occupy_size),校验和(check_sum),包含的1 Q) @8 E; R( e( g0 F$ a3 l
SSTable列表,所在磁盘编号(disk_no)等,代码如下:5 p T( {) I z: T: {
class ObMultiVersionTabletImage4 f; q+ n v. E7 p
{
9 F' e1 _1 _$ upublic:0 Z( I2 @, j' m
//获取第一个包含指定数据范围的子表- s- X# V9 ~: }, o
//@param[in]range数据范围+ }$ _% A- d+ r8 d+ D/ S$ r; Q
//@param[in]scan_direction正向扫描(默认)还是逆向扫描* s, V% |1 E% W
//@param[in]version子表的版本号 S2 m2 |- N7 O
//@param[out]tablet获取的子表索引结构 U, k4 `0 z0 u$ y
int acquire_tablet(const ObNewRange&range,const ScanDirection& d: t9 }$ Q$ O1 W
scan_direction,const int64_t version,ObTablet*&tablet)const;
5 m1 N: W0 v0 o; }& E//释放一个子表, U2 Q; t/ y$ l+ m; Q* A
int release_tablet(ObTablet*tablet);0 p$ x7 n$ ^7 q* e$ b# H
//新增一个子表,load_sstable表示是否立即加载其中的SSTable文件6 E- m: Q9 D1 v& V; S$ o: b3 s7 L
int add_tablet(ObTablet*tablet,const bool load_sstable=false);' p& W3 d# q4 d+ B0 q7 d
//每日合并后升级子表到新版本,load_sstable表示是否立即加载新版本的SSTable文件
) K; M( L$ f" Q: }, J. g" c. Sint upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablet,const bool
9 | \- J$ \) Lload_sstable=false);! E; H2 d$ c0 W. b
//每日合并后升级子表到新版本,且子表发生分裂,有一个变成多个。load_sstable表示是否立即加载" h( H) n' R' Y! z p$ m
分裂后的SSTable文件
" }2 n0 z1 [4 t% A$ G6 t/ Sint upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablets[],const int32_t
& T9 X3 o4 H! m% w; ~split_size,const bool load_sstable=false);4 u& Q f4 L( ]' Y
//删除一个指定数据范围和版本的子表
. M+ ~' [8 `& d, o$ M* wint remove_tablet(const ObNewRange&range,const int64_t version);
6 `. J) T/ e# q1 p3 l( l; S; |//删除一个表格对应的所有子表
1 p- U$ T+ t N: t5 V2 aint delete_table(const uint64_t table_id);
: K6 c2 S& @# j5 e//获取下一批需要进行每日合并的子表, m9 x7 U4 t, l
//@param[in]version子表的版本号+ \9 K% ~ W0 P y
//@param[out]size下一批需要进行每日合并的子表个数8 g9 F/ o/ m( ]" X
//@param[out]tablets下一批需要进行每日合并的子表索引结构
+ x* i- H1 v0 L2 @9 Z: Bint get_tablets_for_merge(const int64_t version,int64_t&size,ObTablet*&
/ ~& e" M/ y* a( A5 d+ ttablets[])const;
& V9 }7 p) I/ ]6 ? ?3 ` C) v};
1 i5 F2 z; [6 R, G7 {2 vChunkServer维护了多个版本的子表数据,每日合并后升级子表的版本号。如果
8 y# k2 T) L+ i2 J子表发生分裂,每日合并后将由一个子表变成多个子表。子表相关的操作方法包
' l" O$ s! y# C& v2 s5 x括:
& u) H- |3 j) r7 J0 [$ b1)add_tablet:新增一个子表。如果load_sstable参数为true,那么,立即加载其
% B2 A/ w1 `0 H& T中的SSTable文件。否则,使用延迟加载策略,即读取子表时再加载其中的SSTable。2 l, q; O( K0 n" z5 k
2)remove_tablet:删除一个子表。RootServer发现某个子表的副本数过多,则会
4 n7 z- x0 \# e' G6 f( J通知其中某台ChunkServer删除指定的子表。* ]+ f. N8 m" \. h' O$ g0 p
3)delete_table:删除表格。用户执行删除表格命令时,RootServer会通知每台4 J* k5 R0 S* U5 `+ R7 s3 x
ChunkServer删除表格包含的所有子表。
8 J( s8 Z( ^1 O4 m4)upgrade_tablet:每日合并后升级子表的版本号。如果没有发生分裂,只需要
5 j" f, G" `2 m! p$ N# u5 x9 T将老子表的版本号加1;否则,将老子表替换为多个范围连续的新子表,每个新子表2 R$ R) o: [# L+ N: Q
的版本号均为老子表的版本号加1。
& v* k4 g0 N& C3 ^: B* g* @/ S6 d5)acquire_tablet/release_tablet:读取时首先调用acquire_tablet获取一个子表,增
, _' i2 W N0 q3 q! s: Y加该子表的引用计数从而防止它在读取过程中被释放掉,接着读取其中的SSTable,% M# J* k% _9 u$ Y
最后调用release_tablet释放子表。
, }$ R- X, B4 w& w' T0 P6)get_tablets_for_merge:每日合并时通过调用该函数获取下一批需要进行每日' O& o% v# E7 Y/ `# t
合并的子表。2 }# S. ^3 Y8 l4 a
9.4.2 SSTable! Y; r5 C9 Z' d. \ W, f: x
如图9-8所示,SSTable中的数据按主键排序后存放在连续的数据块(Block)
% ]! O3 S2 F8 K中,Block之间也有序。接着,存放数据块索引(Block Index),由每个Block最后一
3 M, Z& e5 D: j/ ?行的主键(End Key)组成,用于数据查询中的Block定位。接着,存放布隆过滤器
/ b1 p% N+ Z, T2 U3 J- z(Bloom Filter)和表格的Schema信息。最后,存放固定大小的Trailer以及Trailer的偏
& J. P, Z' R- O移位置。( Q) c* y. h. r' a0 k" ?
图 9-8 SSTable格式
# q; ]! }$ ^3 H# v* C. O查找SSTable时,首先从子表的索引信息中读取SSTable Trailer的偏移位置,接着
) v3 }8 r- ~7 u2 J8 b7 t4 n获取Trailer信息。根据Trailer中记录的信息,可以获取块索引的大小和偏移,从而将
) w& S0 k& l; H# X整个块索引加载到内存中。根据块索引记录的每个Block的最后一行的主键,可以通+ |! [1 X; u' ], j f
过二分查找定位到查找的Block。最后将Block加载到内存中,通过二分查找Block中& e( `8 m' }/ e( b1 n
记录的行索引(Row Index)查找到具体某一行。本质上看,SSTable是一个两级索引; c2 L8 j+ {; b7 b$ `6 l4 ?
结构:块索引以及行索引;而整个ChunkServer是一个三级索引结构:子表索引、块
2 [! ] p: a/ `9 y索引以及行索引。
) r2 ]* i, A$ [% W9 ~8 Z; z/ `SSTable分为两种格式:稀疏格式以及稠密格式。对于稀疏格式,某些列可能存& x( h" S R6 w' V6 s3 P
在,也可能不存在,因此,每一行只存储包含实际值的列,每一列存储的内容为:
( q" n8 o4 B. `0 W Y% r<列ID,列值>(<Column ID,Column Value>);而稠密格式中每一行都需要存储. H8 S- Y. {1 f% a+ B8 | k E" N ~
所有列,每一列只需要存储列值,不需要存储列ID,这是因为列ID可以从表格
/ w: w) X+ R& QSchema中获取。/ v/ D8 M8 U/ ]$ L+ f" G) S2 A
例9-4 假设有一张表格包含10列,列ID为1~10,表格中有一行的数据内容+ B6 j- Q: W' n3 l" s( f% j' L
为:% a h. U1 G# {; q
那么,如果采用稀疏格式存储,内容为:<2,20>,<3,30>,<5,50>,
7 k# D6 I; u; R3 h$ \ ^% g<7,70>,<8,80>;如果采用稠密格式存储,内容为:null,20,30,null,
: p* k& {6 P1 R2 i4 h50,null,70,80,null,null。+ ?+ {* v, C- j, o: m; |
ChunkServer中的SSTable为稠密格式,而UpdateServer中的SSTable为稀疏格式,' N7 R# @# C! p2 A; S
且存储了多张表格的数据。另外,SSTable支持列组(Column Group),将同一个列
9 Q7 D! k7 |/ }! A2 Z组下的多个列的内容存储在一块。列组是一种行列混合存储模式,将每一行的所有
# @# E+ j5 u2 n, |列分成多个组(称为列组),每个列组内部按行存储。
9 A! J# z, l% @$ g& Q如图9-9所示,当一个SSTable中包含多个表格/列组时,数据按照[表格ID,列组) u" S: W" x% w
ID,行主键]([table_id,column group id,row_key])的形式有序存储。, ~9 O6 P- b: W, n
图 9-9 SSTable包含多个表格/列组3 p9 B' Q5 W0 K# |: |
另外,SSTable支持压缩功能,压缩以Block为单位。每个Block写入磁盘之前调
: Z% r! ?! v# {6 }( c3 N: O3 `$ v用压缩算法执行压缩,读取时需要解压缩。用户可以自定义SSTable的压缩算法,目/ [) y& x9 {6 z" i* X* N
前支持的算法包括LZO以及Snappy。
2 Y6 C# G" U) C- i; R, R2 I. N$ jSSTable的操作接口分为写入和读取两个部分,其中,写入类为- e1 L) [; a. W
ObSSTableWriter,读取类为ObSSTableGetter(随机读取)和ObSSTableScanner(范围
0 d( Y, o0 `0 K* ]查询)。代码如下:
: s A* k( ~! U% Kclass ObSSTableWriter
6 h$ D; f7 [) v! ]0 N8 f* G4 ^3 b I{( C5 _4 ]7 @$ P! s) p
public:, {# G9 k3 V$ ~2 x) W
//创建SSTable
5 D4 G9 ?, ^) r6 e% `; d' r: s//@param[in]schema表格schema信息% e: @; @6 Z: u+ w7 J7 W
//@param[in]path SSTable在磁盘中的路径名
$ k ]# ~6 X, {0 Q( ^//@param[in]compressor_name压缩算法名- {, B. ?! }0 s! X
//@param[in]store_type SSTable格式,稀疏格式或者稠密格式
% g* Y" s$ l |$ T z//@param[in]block_size块大小,默认64KB1 w. R. F+ ]: ?3 v( Y4 `$ b3 T& @
int create_sstable(const ObSSTableSchema&schema,const ObString&path,const I6 }& X9 w/ o4 P
ObString&compressor_name,const int store_type,const int64_t block_size);4 q. a+ Y, |9 o$ Z% ?
//往SSTable中追加一行数据" I y4 R5 h* x5 t( K, M
//@param[in]row一行SSTable数据
! s% e2 l8 z( i z7 w" f9 Y//@param[out]space_usage追加完这一行后SSTable大致占用的磁盘空间
: v, G' E6 B& f6 Uint append_row(const ObSSTableRow&row,int64_t&space_usage);/ B) S% S8 h( Q0 ~. M- T9 q6 Y
//关闭SSTable,将往磁盘中写入Block Index,Bloom Filter,Schema,Trailer等信息, g3 R8 g% [- r5 k7 F+ p+ t
//@param[out]trailer_offset返回SSTable的Trailer偏移量' {! O$ @! ^: `; I
int close_sstable(int64_t&trailer_offset);7 A8 z9 n+ q3 b; i, {4 {
};, X; j+ [0 F* k% g0 [7 M+ f
定期合并&数据分发过程将产生新的SSTable,步骤如下:
. a1 q: a( f6 ], Y6 @, a5 S: x1)调用create_sstable函数创建一个新的SSTable;
' l4 ~" k" @3 \% e+ b M5 }2)不断调用append_row函数往SSTable中追加一行行数据;. w) o8 U) G/ K5 L; V7 m" J
3)调用close_sstable完成SSTable写入。
2 L1 D P0 v* C L0 p与9.2.1节中的MemTableIterator一样,ObSSTableGetter和ObSSTableScanner实现了
( J1 e3 z' o6 T4 e6 L迭代器接口,通过它可以不断地获取SSTable的下一个cell。
& d2 W) ~2 b) r% l, @& g/ P# D4 Eclass ObIterator
* c! e5 e* Y' d8 E5 i1 s: \{% b! Z& m( l; s& L3 Q! k
public:; R# E ]: V9 m3 N' F. f9 s- L( K& G" s
//迭代器移动到下一个cell2 S9 B4 U H9 m6 C6 @
int next_cell();
: e# N6 R) I+ ]. S) H//获取当前cell的内容 G6 _: T3 J4 D* l; K5 h! `2 r
//@param[out]cell_info当前cell的内容,包括表名(table_id),行主键(row_key),列编号
, b& W+ F/ [' C, t(column_id)以及列值(column_value)4 B: z" f# Z1 X1 @( {
int get_cell(ObCellInfo**cell_info);
: x, t- V0 Y8 D8 G) H2 l. d: N/ H//获取当前cell的内容' ]. B' S/ [8 I8 v
//@param[out]cell_info当前cell的内容
Y& W7 ~, c6 v1 O' X% Y3 P- ~4 _//@param is_row_changed是否迭代到下一行$ p' U$ X/ _: T2 n, S: I2 d
int get_cell(ObCellInfo**cell_info,bool*is_row_changed);
. ? d6 M% u. v7 O};
0 q+ D5 ^5 g. Q$ G. oOceanBase读取的数据可能来源于MemTable,也可能来源于SSTable,或者是合, a; N2 J- }, Y) N2 I: d4 Z5 ^
并多个MemTable和多个SSTable生成的结果。无论底层数据来源如何变化,上层的读0 e. X* P; K) A) J7 ^0 a
取接口总是ObIterator。- d! b5 Q0 _/ y' t
9.4.3 缓存实现
: E) X: x) V7 FChunkServer中包含三种缓存:块缓存(Block Cache)、行缓存(Row Cache)以6 [1 W1 V1 x9 S
及块索引缓存(Block Index Cache)。其中,块缓存中存储了SSTable中访问较热的数
' [! _; o! H. h" C3 Q据块(Block),行缓存中存储了SSTable中访问较热的数据行(Row),而块索引缓
, \9 T; G6 F2 o; N* B8 \2 k% m存中存储了最近访问过的SSTable的块索引(Block Index)。一般来说,块索引不会
" N* b B* @0 V+ S' g U太大,ChunkServer中所有SSTable的块索引都是常驻内存的。不同缓存的底层采用相
6 f* O( y1 p: m8 `* J& M1 i: u$ v同的实现方式。
e+ K5 p; [# [' A: }1.底层实现& A' G# v2 `3 o. X- Q2 [/ `
经典的LRU缓存实现包含两个部分:哈希表和LRU链表,其中,哈希表用于查找
; D: L8 z+ l& ?( f1 Q& P缓存中的元素,LRU链表用于淘汰。每次访问LRU缓存时,需要将被访问的元素移动' a3 L2 @/ ~" |0 D: V2 D
到LRU链表的头部,从而避免被很快淘汰,这个过程需要锁住LRU链表。
& L. O' B; @ d0 Y如图9-10所示,块缓存和行缓存底层都是一个Key-Value Cache,实现步骤如下:
; k. b- G, h- @4 B; [图 9-10 Key-Value Cache的实现3 B; t9 X* V) |8 P6 x! F9 u2 x+ x
1)OceanBase一次分配1MB的连续内存块(称为memblock),每个memblock包
/ E' }! W$ F2 B# w: k: f) A; J含若干缓存项(item)。添加item时,只需要简单地将item追加到memblock的尾部;
; y- C9 e( e" f& w另外,缓存淘汰以memblock为单位,而不是以item为单位。
5 j h# n/ P) i/ V1 M8 K" z5 k2)OceanBase没有维护LRU链表,而是对每个memblock都维护了访问次数和最
$ y+ I& k$ E+ U8 t2 F$ q0 _近频繁访问时间。访问memblock中的item时将增加memblock的访问次数,如果最近一0 J- V( j0 ~4 K8 L; M
段时间之内的访问次数超过一定值,那么,更新最近频繁访问时间;淘汰memblock
/ ^9 m3 i6 y* S+ S- r时,对所有的memblock按照最近频繁访问时间排序,淘汰最近一段时间访问较少的) j, A8 m% {' y7 [% u
memblock。可以看出,读取时只需要更新memblock的访问次数和最近频繁访问时
6 b1 X6 v' Q) P间,不需要移动LRU链表。这种实现方式通过牺牲LRU算法的精确性,来规避LRU链
. l, ]9 [* E) Y! U$ b- _& |3 d( ^表的全局锁冲突。 G9 I1 P/ H* _8 u
3)每个memblock维护了引用计数,读取缓存项时所在memblock的引用计数加; \ L1 d1 E7 O0 y- {
1,淘汰memblock时引用计数减1,引用计数为0时memblock可以回收重用。通过引用
! U1 `' G( r% i7 `! C, M计数,实现读取memblock中的缓存项不加锁。, ^& e1 H. k" w5 f# v
2.惊群效应4 E: i3 B+ C0 d& w
以行缓存为例,假设ChunkServer中有一个热点行,ChunkServer中的N个工作线3 o6 }- I7 R& C8 Y7 I$ R5 ~' s4 i
程(假设为N=50)同时发现这一行的缓存失效,于是,所有工作线程同时读取这行
; h, ~/ v: U* @6 g数据并更新行缓存。可以看出,N-1共49个线程不仅做了无用功,还增加了锁冲突。6 [6 X( Y7 ~/ B$ s4 K+ F( t
这种现象称为“惊群效应”。为了解决这个问题,第一个线程发现行缓存失效时会往0 }2 Z* M; s9 |3 ]: i5 [, p( T/ j+ I
缓存中加入一个fake标记,其他线程发现这个标记后会等待一段时间,直到第一个线
/ m6 `: W' C2 a9 W* Z4 C程从SSTable中读到这行数据并加入到行缓存后,再从行缓存中读取。
# s. Z; E$ T2 K算法描述如下:
- C. b4 B) ]/ I1 u* K* z$ ?调用internal_get读取一行数据;$ i$ {( a+ c7 ~
if(行不存在){
+ s$ r$ E: B- @" t1 T调用internal_set往缓存中加入一个fake标记;
& ^, l( l6 M+ y从SSTable中读取数据行;
$ u, K' x5 k& o* {) l. s0 k将SSTable中读到的行内容加入缓存,清除fake标记,唤醒等待线程;
% {, f: I {5 L3 G/ ^; w返回读到的数据行;7 N. Y2 J1 Z& R2 J: N
}else if(行存在且为fake标记)
/ r. h+ k: j( ]) o{+ I# U/ @/ {8 [; C- p% _
线程等待,直到清除fake标记;
, B6 U9 q; y( w9 s" |if(等待成功)返回行缓存中的数据;' p3 Q- |- j8 r) n0 Q
if(等待超时)返回读取超时;
# d9 t7 p X$ y% p& y}
3 n7 _4 V6 P9 i. ]4 I; s3 Helse7 h- w0 l7 N; d( e U& i O
{0 r8 O, q2 V) _5 _. O9 O
返回行缓存中的数据;1 I! [1 L' |7 ]9 }# e& T8 J5 C
}; p% T( s& W, D, x' T
3.缓存预热
0 |- m% X% Q% v. i7 OChunkServer定期合并后需要使用生成的新的SSTable提供服务,如果大量请求同
7 F9 f8 N0 o2 D) T& K& m4 _1 e. H0 b时读取新的SSTable文件,将使得ChunkServer的服务能力在切换SSTable瞬间大幅下+ K5 P7 P6 \7 {/ s
降。因此,这里需要一个缓存预热的过程。OceanBase最初的版本实现了主动缓存预% o2 a _7 Z, d C+ |
热,即:扫描原来的缓存,根据每个缓存项的key读取新的SSTable并将结果加入到新
- c" c0 ?6 B4 r5 I* Q的缓存中。例如,原来缓存数据项的主键分别为100、200、500,那么只需要从新的
: c0 A: O j2 T$ \SSTable中读取主键为100、200、500的数据并加入新的缓存。扫描完成后,原来的缓
" r- R4 L, o5 K/ d; i存可以丢弃。8 T5 }# i3 D, A: |3 h: ?6 S5 u+ \
线上运行一段时间后发现,定期合并基本上都安排在凌晨业务低峰期,合并完( T7 k6 z1 e6 e# M, W
成后OceanBase集群收到的用户请求总是由少到多(早上7点之前请求很少,9点以后* D% @8 t$ C3 t# I+ K
请求逐步增多),能够很自然地实现被动缓存预热。由于ChunkServer在主动缓存预
! ]/ r' u% R; n: V9 T0 |1 y8 E6 V热期间需要占用两倍的内存,因此,目前的线上版本放弃了这种方式,转而采用被/ ? {) t! J3 F8 x1 o8 x2 V
动缓存预热。+ b* y0 w& c# k# N: `0 I: u9 H+ U9 W
9.4.4 IO实现8 E& S Y0 Y- s% H5 h0 l+ r% w
OceanBase没有使用操作系统本身的页面缓存(page cache)机制,而是自己实现
8 i6 w' i2 X* n0 J/ x! P缓存。相应地,IO也采用Direct IO实现,并且支持磁盘IO与CPU计算并行化。8 J$ Z( R3 X1 e' O2 G3 y: K+ ]0 Y
ChunkServer采用Linux的Libaio [1] 实现异步IO,并通过双缓冲区机制实现磁盘预读
+ ?: W; x- k# c与CPU处理并行化,实现步骤如下:
. Q" E% z! p1 K1)分配当前(current)以及预读(ahead)两个缓冲区;
/ r& X+ y1 H& O2)使用当前缓冲区读取数据,当前缓冲区通过Libaio发起异步读取请求,接着
" m4 F. b6 k* ^) y+ v等待异步读取完成;6 { |0 x7 y# [( f2 L
3)异步读取完成后,将当前缓冲区返回上层执行CPU计算,同时,原来的预读
$ V3 A7 `. I5 ?1 U3 W: V缓冲区变为新的当前缓冲区,发送异步读取请求将数据读取到新的当前缓冲区。
" F- r! G5 v+ Q: \: fCPU计算完成后,原来的当前缓冲区变为空闲,成为新的预读缓冲区,用于下一次0 A3 S; G) X1 J) i9 o1 Q. `0 B
预读。
4 \+ I0 o) q9 Q2 g4)重复步骤3),直到所有数据全部读完。" `* G" I% r$ {( J# T
例9-5 假设需要读取的数据范围为(1,150],分三次读取:(1,50],(50,+ H. @& I/ j* D
100],(100,150],当前和预读缓冲区分别记为A和B。实现步骤如下:$ \# [: @4 X0 j: R
1)发送异步请求将(1,50]读取到缓冲区A,等待读取完成;
! j0 I% X+ E4 V( H) Z$ O2)对缓冲区A执行CPU计算,发送异步请求,将(50,100]读取到缓冲区B;0 J: \; a: n r- X6 U* N8 q) H
3)如果CPU计算先于磁盘读取完成,那么,缓冲区A变为空闲,等到(50,
; Y6 U% X3 N$ a- W, r! m100]读取完成后将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将
) H8 W3 N+ c7 \8 \( j! R4 ~; ~(100,150]读取到缓冲区A;
3 I3 a, L ]) b6 G; N H4)如果磁盘读取先于CPU计算完成,那么,首先等待缓冲区A上的CPU计算完2 u5 v$ S6 \3 H) L% C; C
成,接着,将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将(100," b8 C- X: X8 h7 W: g9 a
150]读取到缓冲区A;8 C4 D; e! }8 I+ R% `& u7 T: c
5)等待(100,150]读取完成后,将缓冲区A返回给上层执行CPU计算。
+ q( m) T U/ J( M M6 U) r双缓冲区广泛用于生产者/消费者模型,ChunkServer中使用了双缓冲区异步预读4 c7 ?1 D9 G/ I! L! C) v' H
的技术,生产者为磁盘,消费者为CPU,磁盘中生产的原始数据需要给CPU计算消费
8 Y: W b5 y, V掉。' t2 K2 ~9 h" R: f5 Q3 ?
所谓“双缓冲区”,顾名思义就是两个缓冲区(简称A和B)。这两个缓冲区,总& n% ?' u" M. ]% P
是一个用于生产者,另一个用于消费者。当两个缓冲区都操作完,再进行一次切
2 p! }# i9 b7 L# J换,先前被生产者写入的被消费者读取,先前消费者读取的转为生产者写入。为了" A* f3 f' C& ~( {
做到不冲突,给每个缓冲区分配一把互斥锁(简称La和Lb)。生产者或者消费者如
6 |0 E, `/ g+ O- A果要操作某个缓冲区,必须先拥有对应的互斥锁。
2 I% d& w! k/ @0 ]' _% d& v- @* r双缓冲区包括如下几种状态:- e2 y" \$ C$ x1 P7 j, r' O+ J
●双缓冲区都在使用的状态(并发读写)。大多数情况下,生产者和消费者都处
4 ~1 f0 E8 \* B- ?' j3 g4 _于并发读写状态。不妨设生产者写入A,消费者读取B。在这种状态下,生产者拥有! r# B- |1 q& T
锁La;同样地,消费者拥有锁Lb。由于两个缓冲区都是处于独占状态,因此每次读- ?- I. S$ ]" B% u# `6 ~. q/ x, v
写缓冲区中的元素都不需要再进行加锁、解锁操作。这是节约开销的主要来源。9 ]/ S) [0 }9 d; ]! X/ z
●单个缓冲区空闲状态。由于两个并发实体的速度会有差异,必然会出现一个缓& v T6 F6 }! B9 X# p
冲区已经操作完,而另一个尚未操作完。不妨假设生产者快于消费者。在这种情况; Q; ?) w. q! t8 @0 Q
下,当生产者把A写满的时候,生产者要先释放La(表示它已经不再操作A),然后, @5 D! J+ u9 k/ u" |8 [& z
尝试获取Lb。由于B还没有被读空,Lb还被消费者持有,所以生产者进入等待. _/ B3 [" {3 I6 P8 [$ _0 i
(wait)状态。5 t S- R d7 c1 m5 s7 E" K
●缓冲区的切换。过了若干时间,消费者终于把B读完。这时候,消费者也要先! ^6 @2 K3 K6 t" `+ V
释放Lb,然后尝试获取La。由于La刚才已经被生产者释放,所以消费者能立即拥有8 J5 A7 i5 ]6 E) t, G
La并开始读取A的数据。而由于Lb被消费者释放,所以刚才等待的生产者会苏醒过来; X& L6 B# k7 _( @ q6 z! t- g
(wakeup)并拥有Lb,然后生产者继续往B写入数据。
2 H% [+ [$ ?! M# D[1]Oracle公司实现的Linux异步IO库,开源地址:https://oss.oracle.com/projects/libaio-
; i$ D0 u9 q# u( Coracle/, U3 P7 M7 Z" |; @! ?
9.4.5 定期合并&数据分发
- m* N* x' W8 f# m ~RootServer将UpdateServer上的版本变化信息通知ChunkServer后,ChunkServer将
9 ~! I, M/ C& G7 v" ]; w$ Z' D执行定期合并或者数据分发。
5 g: s9 O6 Q% c) b" u2 s如果UpdateServer执行了大版本冻结,ChunkServer将执行定期合并。ChunkServer& a2 A# p& F- X0 F3 C
唤醒若干个定期合并线程(比如10个),每个线程执行如下流程: b9 H) H+ l7 {; f
1)加锁获取下一个需要定期合并的子表;- C6 H2 ~% @4 Y9 r6 i
2)根据子表的主键范围读取UpdateServer中的修改操作;
! o; Q o' n/ v5 y3)将每行数据的基线数据和增量数据合并后,产生新的基线数据,并写入到新
( L. x4 }& _; P f6 ^9 `+ `的SSTable中;
* l0 N' ]& K7 @% m+ v: O1 {( N4)更改子表索引信息,指向新的SSTable。2 f6 r6 J) P8 k8 e
等到ChunkServer上所有的子表定期合并都执行完成后,ChunkServer会向
* `4 O) ~, J; b0 C/ I4 i9 gRootServer汇报,RootServer会更新RootTable中记录的子表版本信息。定期合并一般: F3 w7 G: ^6 e) j! t# ]. B0 s
安排在每天凌晨业务低峰期(凌晨1:00开始)执行一次,因此也称为每日合并。另$ }/ U, N. b/ y: w0 t( p8 o- l$ c
外,定期合并过程中ChunkServer的压力比较大,需要控制合并速度,否则可能影响. p2 M5 }4 P5 U$ r
正常的读取服务。
3 a( l w$ r% `7 z( r8 u如果UpdateServer执行了小版本冻结,ChunkServer将执行数据分发。与定期合并
9 G0 g" o4 {' r9 w不同的是,数据分发只是将UpdateServer冻结的数据缓存到ChunkServer,并不会生成
% @& e# K" E: D' ?3 `$ O: {新的SSTable文件。因此,数据分发对ChunkServer造成的压力不大。
" Y4 ?9 r" k4 c8 q) C数据分发由外部读取请求驱动,当请求ChunkServer上的某个子表时,除了返回* d7 A- P* i, G/ D% l* i
使用者需要的数据外,还会在后台生成这个子表的数据分发任务,这个任务会获取8 m" b/ ^5 m8 r d; r I1 y0 O* X
UpdateServer中冻结的小版本数据,并缓存在ChunkServer的内存中。如果内存用完,
2 t3 z2 a& Y. o' C+ i% l1 Y数据分发任务将不再进行。当然,这里可以做一些改进,比如除了将UpdateServer分
2 ]4 K: {8 R5 J: G. J$ |: @2 K发的数据存放到ChunkServer的内存中,还可以存储到SSD磁盘中。' d6 ~& k+ K, Q( F4 e
例9-6 假设某台ChunkServer上有一个子表t1,t1的主键范围为(1,10],只有一 R' h- i6 w" E, x2 \9 e
行数据:rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40) H, ^ G$ Q, \
>)。UpdateServer的冻结版本有两行更新操作:rowkey=8=>(<2,update,30$ Z' P: I9 Y5 I1 M- u0 D
>,<3,up-date,38>)和rowkey=20=>(<4,update,50>)。
4 R9 m& ~8 b# T) {●如果是大版本冻结,那么,ChunkServer上的子表t1执行定期合并后结果为:
3 V$ v% A4 H6 v. | `ro-wkey=8=>(<2,update,30>,<3,update,38>,<4,update,40>);
; D7 V1 e4 X7 N" b7 t" ^8 R●如果是小版本冻结,那么,ChunkServer上的子表t1执行数据分发后的结果为:: w6 [3 v2 G/ Q/ h! r1 N
rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40>,<2,7 x; e0 z5 Y8 Q* b' a6 ~
update,30>,<3,update,38>)。2 ^- i2 H0 U0 k
9.4.6 定期合并限速6 [. I& r3 B) z& ^8 W7 d# G
定期合并期间系统的压力较大,需要控制定期合并的速度,避免影响正常服
& c( d2 s% ~- F2 x+ F务。定期合并限速的措施包括如下步骤:$ c/ j. @; Q a$ Y; b" S$ }
1)ChunkServer:ChunkServer定期合并过程中,每合并完成若干行(默认2000
4 X/ E4 }9 ?/ s8 O" M' ]行)数据,就查看本机的负载(查看Linux系统的Load值)。如果负载过高,一部分
2 L: p5 a+ j0 n, B5 S4 l) p定期合并线程转入休眠状态;如果负载过低,唤醒更多的定期合并线程。另外,/ `+ z4 K1 F5 ?
RootServer将UpdateServer冻结的大版本通知所有的ChunkServer,每台ChunkServer会% d$ k, @0 A4 a* f
随机等待一段时间再开始执行定期合并,防止所有的ChunkServer同时将大量的请求
+ f2 R7 o/ m) r8 G" R8 a p发给UpdateServer。$ f- @& Y) |5 y4 k( f* q, V
2)UpdateServer:定期合并过程中ChunkServer需要从UpdateServer读取大量的数2 W7 S5 F8 _( A) Y9 j
据,为了防止定期合并任务用满带宽而阻塞用户的正常请求,UpdateServer将任务区
2 t# g) m% Z4 r, c: ?$ a, J% Z分为高优先级(用户正常请求)和低优先级(定期合并任务),并单独统计每种任
( h& \$ G. \$ g- S务的输出带宽。如果低优先级任务的输出带宽超过上限,降低低优先级任务的处理
4 w# w( Q! Q1 a& }+ a- X! ^速度;反之,适当提高低优先级任务的处理速度。
( @% g- P3 Q1 }8 C( e9 l" B如果OceanBase部署了两个集群,还能够支持主备集群在不同时间段进行“错峰7 f: l; j: F8 V8 Y' j
合并”:一个集群执行定期合并时,把全部或大部分读写流量切到另一个集群,该集; T/ q! {. Y: W) J
群合并完成后,把全部或大部分流量切回,以便另一个集群接着进行定期合并。两
, S W5 r3 l, ?- B9 N# w: ?4 _个集群都合并完成后,恢复正常的流量分配。
8 S3 O; m0 W% {" r2 i& P4 w0 J/ U
i# \( U0 k+ {( r
$ g+ x# N; B8 P. q |
|