|
9.4 ChunkServer实现机制
9 B7 ~. t& @% o4 m. L! jChunkServer用于存储基线数据,它由如下基本部分组成:
% S3 u5 e/ }, |' O●管理子表,主动实现子表分裂,配合RootServer实现子表迁移、删除、合并;
7 p+ T- i/ t' g●SSTable,根据主键有序存储每个子表的基线数据;
3 v1 P% u0 B! x, U3 T/ k' w●基于LRU实现块缓存(Block cache)以及行缓存(Row cache);
2 |/ y6 i9 }1 J! ?1 e, d●实现Direct IO,磁盘IO与CPU计算并行化;5 u1 M7 q6 A" i3 R3 S1 g. \* F& d9 h
●通过定期合并&数据分发获取UpdateServer的冻结数据,从而分散到整个集
2 s, P' P1 K T) O2 R' K群。
. }3 D+ }! I/ F, t1 R& v# W每台ChunkServer服务着几千到几万个子表的基线数据,每个子表由若干个
- q$ U+ q1 t# Q% ESSTable组成(一般为1个)。下面从SSTable开始介绍ChunkServer的内部实现。
) X/ }. R) c3 B1 W' r' q8 E9.4.1 子表管理
3 i: Z9 H- L+ K2 o1 b每台ChunkServer服务于多个子表,子表的个数一般在10000~100000之间。! p; y, O; n7 X- ~9 Y
Chunk-Server内部通过ObMultiVersionTabletImage来存储每个子表的索引信息,包括数
" m, }3 i; Y8 k0 L' j& Q% u" ?据行数(row_count),数据量(occupy_size),校验和(check_sum),包含的
) f6 `4 @1 A! V( @7 QSSTable列表,所在磁盘编号(disk_no)等,代码如下:7 r! ?, p& @! _3 }9 d
class ObMultiVersionTabletImage; D) j% L6 O+ Z
{% ]. V4 M2 J y2 d
public:
, c/ J" _2 b* y% m6 b) t j//获取第一个包含指定数据范围的子表* _$ I( g- [' q- ~( s
//@param[in]range数据范围
8 j% `+ R4 a) k* l7 @/ ]//@param[in]scan_direction正向扫描(默认)还是逆向扫描
) i% o9 h- D5 j# C& q1 @//@param[in]version子表的版本号
% S. C2 i; [" C% e& c//@param[out]tablet获取的子表索引结构7 k$ v% D- v0 w: }6 K- d) D
int acquire_tablet(const ObNewRange&range,const ScanDirection
: K& N' J9 |) J8 H# y2 Hscan_direction,const int64_t version,ObTablet*&tablet)const;" B' G* m! ^0 V, f+ H/ E
//释放一个子表$ w/ J, W$ V) t4 s3 }. s
int release_tablet(ObTablet*tablet);
: r, P* ~7 U! \$ F9 s//新增一个子表,load_sstable表示是否立即加载其中的SSTable文件) r; ~" g) ~+ v: [0 q c' i7 P6 \
int add_tablet(ObTablet*tablet,const bool load_sstable=false);8 [: P1 v! l S) M- `. }; i
//每日合并后升级子表到新版本,load_sstable表示是否立即加载新版本的SSTable文件/ W1 B' ~+ c9 \" w3 D1 t
int upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablet,const bool+ C0 m. D+ L: Q! W( b5 d) ^2 b
load_sstable=false);
; {, F( X: _8 A2 X- q0 ?//每日合并后升级子表到新版本,且子表发生分裂,有一个变成多个。load_sstable表示是否立即加载
3 n1 B; Z5 F2 y: Z1 L$ Q1 ~! [分裂后的SSTable文件
8 |5 e z5 V6 T2 O8 C. yint upgrade_tablet(ObTablet*old_tablet,ObTablet*new_tablets[],const int32_t' ~# Z/ t: u" z0 `6 h/ Q
split_size,const bool load_sstable=false);
, N7 Y! [ f9 Y, Q( c) J2 o& ]. X//删除一个指定数据范围和版本的子表 X8 |7 y' ]- r( p
int remove_tablet(const ObNewRange&range,const int64_t version);
' _$ n; t+ f7 T. T5 o" n//删除一个表格对应的所有子表: ]4 M1 p3 G; B
int delete_table(const uint64_t table_id);
( W# M) q8 g8 Q' f9 t5 H6 @//获取下一批需要进行每日合并的子表8 N: E0 l% u4 n) t- \. O
//@param[in]version子表的版本号 }8 j, Z+ `# `$ |
//@param[out]size下一批需要进行每日合并的子表个数
/ i6 `' ~6 t$ v6 ]0 d* ^//@param[out]tablets下一批需要进行每日合并的子表索引结构
7 v6 r! K- ]- K j! ^int get_tablets_for_merge(const int64_t version,int64_t&size,ObTablet*&4 B$ J5 V. P8 J. |3 z
tablets[])const;5 U# \+ ^; Y7 g8 B5 ]
};
8 m& @4 ?! z( d0 ^ChunkServer维护了多个版本的子表数据,每日合并后升级子表的版本号。如果
* F+ ?0 F' o7 X$ [! Y5 B: M. I子表发生分裂,每日合并后将由一个子表变成多个子表。子表相关的操作方法包& z! i9 c& v ]1 W
括:, O O& f# v: G1 G' C6 S
1)add_tablet:新增一个子表。如果load_sstable参数为true,那么,立即加载其4 W8 [% M" |3 a; p4 {1 O
中的SSTable文件。否则,使用延迟加载策略,即读取子表时再加载其中的SSTable。
* U4 w) V/ D3 o9 t* x2)remove_tablet:删除一个子表。RootServer发现某个子表的副本数过多,则会
& q; E7 p4 _8 a0 e0 F* ^通知其中某台ChunkServer删除指定的子表。
1 X. c$ w) `4 T; |" X3)delete_table:删除表格。用户执行删除表格命令时,RootServer会通知每台
* x4 [) K3 K6 r# r! {( j! N+ ZChunkServer删除表格包含的所有子表。
` I$ ^1 [+ |9 g/ F3 [# z" _4)upgrade_tablet:每日合并后升级子表的版本号。如果没有发生分裂,只需要( I( t3 t1 u& P" f; z, r
将老子表的版本号加1;否则,将老子表替换为多个范围连续的新子表,每个新子表3 ]. ]$ o' _0 o
的版本号均为老子表的版本号加1。
& A; H/ a; L6 K5)acquire_tablet/release_tablet:读取时首先调用acquire_tablet获取一个子表,增, L) @! t$ S' ?
加该子表的引用计数从而防止它在读取过程中被释放掉,接着读取其中的SSTable,) K# b5 j# D, X! B( n5 n( r( V0 c
最后调用release_tablet释放子表。
8 l! @# g" r0 g5 Q; j, L6)get_tablets_for_merge:每日合并时通过调用该函数获取下一批需要进行每日
- A0 g( v% a: T, N$ Y9 n) ~: N合并的子表。
" @) |- o, p7 ]7 Y h M9.4.2 SSTable
2 ~+ w: v- X$ c3 `0 n1 A如图9-8所示,SSTable中的数据按主键排序后存放在连续的数据块(Block)3 j) R* |( g( p# f0 w
中,Block之间也有序。接着,存放数据块索引(Block Index),由每个Block最后一
* x m3 i5 r9 j行的主键(End Key)组成,用于数据查询中的Block定位。接着,存放布隆过滤器& }+ z0 B7 v3 o& H2 a
(Bloom Filter)和表格的Schema信息。最后,存放固定大小的Trailer以及Trailer的偏; o0 {4 M; c5 }: m3 }8 z
移位置。
& F3 {4 k7 |5 C& u. X/ C6 O8 d& n图 9-8 SSTable格式
* x1 I# N5 H( _! ]4 u' r查找SSTable时,首先从子表的索引信息中读取SSTable Trailer的偏移位置,接着
# A! Y$ Z; [- H0 G# P: j4 q获取Trailer信息。根据Trailer中记录的信息,可以获取块索引的大小和偏移,从而将; k1 q" c; l8 K5 g2 m# y
整个块索引加载到内存中。根据块索引记录的每个Block的最后一行的主键,可以通1 d& X6 \, [5 p0 j6 H0 h0 d
过二分查找定位到查找的Block。最后将Block加载到内存中,通过二分查找Block中
. Y2 v9 H7 m- Q" F8 l/ E记录的行索引(Row Index)查找到具体某一行。本质上看,SSTable是一个两级索引7 Q0 c7 x2 \1 R/ P R: Q
结构:块索引以及行索引;而整个ChunkServer是一个三级索引结构:子表索引、块
- ]+ L* ]5 a7 Q索引以及行索引。
5 `# T. R8 {9 @, sSSTable分为两种格式:稀疏格式以及稠密格式。对于稀疏格式,某些列可能存
* s4 L+ W# k W' a在,也可能不存在,因此,每一行只存储包含实际值的列,每一列存储的内容为:, d" x! ^6 d! w" r' G# ~
<列ID,列值>(<Column ID,Column Value>);而稠密格式中每一行都需要存储5 K9 ]* l( o+ n$ n5 N
所有列,每一列只需要存储列值,不需要存储列ID,这是因为列ID可以从表格; e: o7 o( H4 m( [' C6 o
Schema中获取。
5 W5 l, Q( N7 P/ G' E例9-4 假设有一张表格包含10列,列ID为1~10,表格中有一行的数据内容( _2 L6 g) q. [
为:
! ~0 M2 |: A j4 T. k$ a那么,如果采用稀疏格式存储,内容为:<2,20>,<3,30>,<5,50>," K( A. |( D2 L1 k2 E
<7,70>,<8,80>;如果采用稠密格式存储,内容为:null,20,30,null,/ T8 C9 Z: N: O- ^# r% ^* w
50,null,70,80,null,null。3 ^. E: R8 t+ e2 R1 X9 n6 T/ Z
ChunkServer中的SSTable为稠密格式,而UpdateServer中的SSTable为稀疏格式,+ ^; O' f4 Z) [7 E
且存储了多张表格的数据。另外,SSTable支持列组(Column Group),将同一个列
. s, W4 n5 [; j. v组下的多个列的内容存储在一块。列组是一种行列混合存储模式,将每一行的所有
6 J6 |8 R- D) G' l列分成多个组(称为列组),每个列组内部按行存储。
; j- n; B5 d+ C- B/ l1 }7 K如图9-9所示,当一个SSTable中包含多个表格/列组时,数据按照[表格ID,列组
m' {5 `6 u# A& S* sID,行主键]([table_id,column group id,row_key])的形式有序存储。
* s1 g1 r- C: r图 9-9 SSTable包含多个表格/列组
5 v* q7 \1 K9 V! D2 j. [9 @) y另外,SSTable支持压缩功能,压缩以Block为单位。每个Block写入磁盘之前调
; x& ^% l7 Z! D" l! E3 j/ B用压缩算法执行压缩,读取时需要解压缩。用户可以自定义SSTable的压缩算法,目
6 b6 j1 ~2 k5 G前支持的算法包括LZO以及Snappy。
, Y, }3 L* T$ o* {# ?SSTable的操作接口分为写入和读取两个部分,其中,写入类为3 d3 X( l. Z/ Y2 F5 s
ObSSTableWriter,读取类为ObSSTableGetter(随机读取)和ObSSTableScanner(范围/ D! {3 B3 g: y) m8 g
查询)。代码如下:
0 w9 z; l6 A$ W" V* ~class ObSSTableWriter
: X0 K S8 y- `{
" ^) }6 h- T- m. K& O- H, `public:
6 L7 E# N# b( Y/ o. _7 m7 }* m//创建SSTable" i, Z: }1 U. ~) t, X& O" B
//@param[in]schema表格schema信息
5 g( o w- h# d" }! T//@param[in]path SSTable在磁盘中的路径名
+ X/ \: b; D7 \* ~' ]; ]//@param[in]compressor_name压缩算法名* D+ V$ m2 W+ m
//@param[in]store_type SSTable格式,稀疏格式或者稠密格式
$ r6 ?! X" h& n: _8 G//@param[in]block_size块大小,默认64KB
0 ~$ l6 _6 z1 ~! z2 [( g5 h$ eint create_sstable(const ObSSTableSchema&schema,const ObString&path,const! H; E- f+ c- C! w h
ObString&compressor_name,const int store_type,const int64_t block_size);
" @0 \4 v! M t3 v0 H//往SSTable中追加一行数据 C7 y! M' H0 g0 x6 w
//@param[in]row一行SSTable数据: z9 [7 z8 L, P8 Q+ d
//@param[out]space_usage追加完这一行后SSTable大致占用的磁盘空间
- X7 T: j- _* K3 u. w$ V+ \int append_row(const ObSSTableRow&row,int64_t&space_usage);" @) ]7 Q4 ]/ y$ \* C- n
//关闭SSTable,将往磁盘中写入Block Index,Bloom Filter,Schema,Trailer等信息
7 c+ L; |& E% \# U U5 e//@param[out]trailer_offset返回SSTable的Trailer偏移量
7 {$ H& @& d% o) N& W- ^! L y) Zint close_sstable(int64_t&trailer_offset);
% g, f1 F, q- N2 Z$ I7 y: t};1 @! ~2 o1 D2 h2 b8 \
定期合并&数据分发过程将产生新的SSTable,步骤如下:
9 o1 V$ b/ z; r: G7 ~9 c1 T2 f+ t1)调用create_sstable函数创建一个新的SSTable;
. }) h) g$ h* Q. U2 s: o& K9 r2)不断调用append_row函数往SSTable中追加一行行数据;
- f0 g1 d! ~% p9 V' P- u2 U6 }3)调用close_sstable完成SSTable写入。! K9 i! O' B) u" W6 o0 X
与9.2.1节中的MemTableIterator一样,ObSSTableGetter和ObSSTableScanner实现了
% N6 Y i+ @3 c- N2 d( v6 V迭代器接口,通过它可以不断地获取SSTable的下一个cell。# \' m9 z- G/ W: {7 h
class ObIterator8 [) G0 _4 Y0 L& H" ]
{" k, N, R# ^; X; }1 T( @
public:9 t4 a$ c+ ]& W4 D2 n6 |/ r
//迭代器移动到下一个cell) q" m. v$ o& V5 X" _/ A
int next_cell();
* m( W! C0 ?/ A) Q) U//获取当前cell的内容
U3 y; S! ~; C) e" T1 m" ?//@param[out]cell_info当前cell的内容,包括表名(table_id),行主键(row_key),列编号9 K" z/ [% |8 \# u; P
(column_id)以及列值(column_value)
3 P2 g. e! g2 g' _. `! L g( _int get_cell(ObCellInfo**cell_info);! F: j/ o( z0 \' S! P7 M
//获取当前cell的内容
$ a( y- }& m$ Z, Y. j L4 i//@param[out]cell_info当前cell的内容
9 U4 j4 |6 ~8 Q) C/ Q' L//@param is_row_changed是否迭代到下一行
5 m, F) X, U, b# u( E U, U/ ^+ Cint get_cell(ObCellInfo**cell_info,bool*is_row_changed);
3 B7 p) g7 z9 Q};
$ i% e; I4 ]0 d# n; T4 ?) NOceanBase读取的数据可能来源于MemTable,也可能来源于SSTable,或者是合
X, I1 I" s2 ^7 a+ u并多个MemTable和多个SSTable生成的结果。无论底层数据来源如何变化,上层的读+ ^4 A5 M5 B. v
取接口总是ObIterator。
: m/ |% I. ? W9.4.3 缓存实现7 x7 [- p; L) t" `% V+ v$ L
ChunkServer中包含三种缓存:块缓存(Block Cache)、行缓存(Row Cache)以! a3 s$ }, G+ t) O8 b8 J
及块索引缓存(Block Index Cache)。其中,块缓存中存储了SSTable中访问较热的数
9 ~: i+ |5 W- D+ \) I7 O" Q据块(Block),行缓存中存储了SSTable中访问较热的数据行(Row),而块索引缓
2 |8 i6 ^9 ~2 B7 V存中存储了最近访问过的SSTable的块索引(Block Index)。一般来说,块索引不会
4 M f" Z5 W# P6 S3 q6 B% F太大,ChunkServer中所有SSTable的块索引都是常驻内存的。不同缓存的底层采用相
, `5 ?& D6 H1 g1 Z: m# v& U同的实现方式。% Z$ w- M: F. g6 x2 O$ w
1.底层实现 o+ @% X5 ?' P; E# i
经典的LRU缓存实现包含两个部分:哈希表和LRU链表,其中,哈希表用于查找1 M0 U# Z. x& x9 k/ P
缓存中的元素,LRU链表用于淘汰。每次访问LRU缓存时,需要将被访问的元素移动
) v1 }( T2 ~! L; D T5 a到LRU链表的头部,从而避免被很快淘汰,这个过程需要锁住LRU链表。
! `5 s6 ?$ S4 ~如图9-10所示,块缓存和行缓存底层都是一个Key-Value Cache,实现步骤如下:3 P: @% X* U4 {$ N
图 9-10 Key-Value Cache的实现( v- ?+ q% Y5 Y5 D
1)OceanBase一次分配1MB的连续内存块(称为memblock),每个memblock包& s& g, x* O; @9 N) n9 ~
含若干缓存项(item)。添加item时,只需要简单地将item追加到memblock的尾部;. k; F. k; c1 ~5 t* o, I% v- S+ |
另外,缓存淘汰以memblock为单位,而不是以item为单位。9 E# k/ }: `- X: j& ?9 b$ N: N: H/ I
2)OceanBase没有维护LRU链表,而是对每个memblock都维护了访问次数和最
/ X9 `( T' O, `8 {: W% ]' C近频繁访问时间。访问memblock中的item时将增加memblock的访问次数,如果最近一, r5 K9 v8 G g
段时间之内的访问次数超过一定值,那么,更新最近频繁访问时间;淘汰memblock
# X" [; I9 ?3 E- N2 V时,对所有的memblock按照最近频繁访问时间排序,淘汰最近一段时间访问较少的% P. f9 ]- |8 }) k* S% A
memblock。可以看出,读取时只需要更新memblock的访问次数和最近频繁访问时) L' _$ N% g4 I4 ^) m9 K }
间,不需要移动LRU链表。这种实现方式通过牺牲LRU算法的精确性,来规避LRU链* s+ n" {" z1 @# o* z3 o. q' }
表的全局锁冲突。
9 B, v4 R- X$ u1 v4 C+ u% L. c1 n3)每个memblock维护了引用计数,读取缓存项时所在memblock的引用计数加& t) s8 ?5 q0 @
1,淘汰memblock时引用计数减1,引用计数为0时memblock可以回收重用。通过引用5 p4 B/ _0 u" d' s+ B; d
计数,实现读取memblock中的缓存项不加锁。
. o/ \1 r$ l4 F+ A2.惊群效应$ M5 x' L# d, X
以行缓存为例,假设ChunkServer中有一个热点行,ChunkServer中的N个工作线3 d( Q1 F+ E* ]& I0 ~( F: {
程(假设为N=50)同时发现这一行的缓存失效,于是,所有工作线程同时读取这行3 S. r9 f, Y( w0 H
数据并更新行缓存。可以看出,N-1共49个线程不仅做了无用功,还增加了锁冲突。7 T# J5 [. T3 @+ |% s4 A( e5 s: j: H
这种现象称为“惊群效应”。为了解决这个问题,第一个线程发现行缓存失效时会往
% z: I, X' f3 J1 R+ O e" l. F% G8 Q缓存中加入一个fake标记,其他线程发现这个标记后会等待一段时间,直到第一个线# C5 `1 |3 e |$ X1 u5 v/ N
程从SSTable中读到这行数据并加入到行缓存后,再从行缓存中读取。
0 J! z3 s `: a4 E4 ]7 m: Q算法描述如下:
: C9 s% z5 |5 }5 G: Y6 m调用internal_get读取一行数据;# c: W% u5 a2 w7 [. e) b: g$ q
if(行不存在){
) |3 w4 }) D. a5 p j; C+ @调用internal_set往缓存中加入一个fake标记;# s: B! T5 {& ^# x0 L' a9 [
从SSTable中读取数据行;
& d `. E+ V ?: O* p7 C9 u将SSTable中读到的行内容加入缓存,清除fake标记,唤醒等待线程;* @+ \/ z! x% \% D l5 g6 M
返回读到的数据行;* C& s/ V8 b: O3 _2 i5 A
}else if(行存在且为fake标记): v2 f' R3 Q; c9 ^1 q! W4 t, ~
{
% r O( ^6 p/ c" h7 l线程等待,直到清除fake标记;! _- S0 ^. S; h0 P7 ~& {
if(等待成功)返回行缓存中的数据;% U* b3 B/ `* d5 B9 i* k
if(等待超时)返回读取超时;
) Y/ ?! D" N% a( Q6 E0 R}
5 {% O" _# m) [! w: Ielse- O5 G7 f5 ^; i" B& L
{+ d5 a F" n7 P& G/ w! y* q
返回行缓存中的数据;
2 x( g; [2 V% P7 H# J}/ @! L9 P, o) _, B# Y& l
3.缓存预热
/ \" A7 x- A" t! W& c1 g6 H9 KChunkServer定期合并后需要使用生成的新的SSTable提供服务,如果大量请求同' u7 z8 `& F4 U! {! c! F9 g
时读取新的SSTable文件,将使得ChunkServer的服务能力在切换SSTable瞬间大幅下: d6 N9 ?2 z5 a
降。因此,这里需要一个缓存预热的过程。OceanBase最初的版本实现了主动缓存预
@9 _& G: }+ V0 ]热,即:扫描原来的缓存,根据每个缓存项的key读取新的SSTable并将结果加入到新" s" C" }* i; E0 K6 Z3 @! Y/ t
的缓存中。例如,原来缓存数据项的主键分别为100、200、500,那么只需要从新的. x; N9 b! g6 w' x6 U9 R
SSTable中读取主键为100、200、500的数据并加入新的缓存。扫描完成后,原来的缓% L f+ b5 z& p' m8 G8 j, b
存可以丢弃。8 f. j- b8 Y7 h) X, U. d
线上运行一段时间后发现,定期合并基本上都安排在凌晨业务低峰期,合并完0 A7 m8 l F7 P% E- ]7 [
成后OceanBase集群收到的用户请求总是由少到多(早上7点之前请求很少,9点以后
& T2 X, g1 s+ T, h, |请求逐步增多),能够很自然地实现被动缓存预热。由于ChunkServer在主动缓存预9 S& f9 @: G1 V' `5 M+ I
热期间需要占用两倍的内存,因此,目前的线上版本放弃了这种方式,转而采用被
N& h' M% f* g- h动缓存预热。/ L4 s v. e. N) W, H- X- U: E
9.4.4 IO实现
3 o8 |$ {$ ~' M0 ]OceanBase没有使用操作系统本身的页面缓存(page cache)机制,而是自己实现' ^) p& t6 m5 J% z# i9 z3 k
缓存。相应地,IO也采用Direct IO实现,并且支持磁盘IO与CPU计算并行化。! @ a" e' f' L7 e3 \6 _+ H/ S
ChunkServer采用Linux的Libaio [1] 实现异步IO,并通过双缓冲区机制实现磁盘预读
5 V0 p% {; V+ q. ], E与CPU处理并行化,实现步骤如下:4 `0 d% R0 M( a" r; x8 d# }
1)分配当前(current)以及预读(ahead)两个缓冲区;
f( m+ ^* \9 R1 l: d2)使用当前缓冲区读取数据,当前缓冲区通过Libaio发起异步读取请求,接着 B+ I- Y8 c! u! O! B( I+ |4 i
等待异步读取完成;
4 k$ g \. l( d4 q3)异步读取完成后,将当前缓冲区返回上层执行CPU计算,同时,原来的预读5 k" J q) S# y5 f% t, J
缓冲区变为新的当前缓冲区,发送异步读取请求将数据读取到新的当前缓冲区。
c# b7 q7 Q* x( c. lCPU计算完成后,原来的当前缓冲区变为空闲,成为新的预读缓冲区,用于下一次4 j' S2 }" l4 B+ D5 l4 x
预读。) w' `, s4 o! F' L9 `
4)重复步骤3),直到所有数据全部读完。8 _' Q# \! e" a6 H: P0 T
例9-5 假设需要读取的数据范围为(1,150],分三次读取:(1,50],(50,: Q" ~' f* \# W
100],(100,150],当前和预读缓冲区分别记为A和B。实现步骤如下:& S! H7 v! T' N8 d$ D
1)发送异步请求将(1,50]读取到缓冲区A,等待读取完成;
; i( K5 I- k5 e; S1 y6 j2)对缓冲区A执行CPU计算,发送异步请求,将(50,100]读取到缓冲区B;* d" g" C- J2 A5 Z! y
3)如果CPU计算先于磁盘读取完成,那么,缓冲区A变为空闲,等到(50,' ~* m$ F/ U! T& @5 d
100]读取完成后将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将6 h/ K' `: ^! Y. C {
(100,150]读取到缓冲区A;
. v. _% E* c0 A+ e* G) q4)如果磁盘读取先于CPU计算完成,那么,首先等待缓冲区A上的CPU计算完
% D4 Y" A) E/ a! ~成,接着,将缓冲区B返回上层执行CPU计算,同时,发送异步请求,将(100,9 i4 d P0 Z$ w E8 ^7 x0 g( o
150]读取到缓冲区A;" E- o4 _; A5 t( S: k( b
5)等待(100,150]读取完成后,将缓冲区A返回给上层执行CPU计算。
. R3 w _: L a$ u! B- `# l5 s双缓冲区广泛用于生产者/消费者模型,ChunkServer中使用了双缓冲区异步预读
7 H1 y! L. N4 Z0 U! u的技术,生产者为磁盘,消费者为CPU,磁盘中生产的原始数据需要给CPU计算消费 h: V: i( M4 p; A
掉。; g1 j ^% Q: o$ ]
所谓“双缓冲区”,顾名思义就是两个缓冲区(简称A和B)。这两个缓冲区,总
2 W: z& z/ d* f' r& z是一个用于生产者,另一个用于消费者。当两个缓冲区都操作完,再进行一次切
A4 }+ F/ n! C" H; b( C换,先前被生产者写入的被消费者读取,先前消费者读取的转为生产者写入。为了
, L$ [6 N6 R: ]/ U' X3 e+ n做到不冲突,给每个缓冲区分配一把互斥锁(简称La和Lb)。生产者或者消费者如' ?4 G8 f6 o3 @. \ J4 j0 y$ t
果要操作某个缓冲区,必须先拥有对应的互斥锁。
% P( |4 L; T& G- o [% R0 b双缓冲区包括如下几种状态:# a8 D p ^' Z7 d0 w
●双缓冲区都在使用的状态(并发读写)。大多数情况下,生产者和消费者都处
8 P! U& h2 D! {, q# r7 {: p于并发读写状态。不妨设生产者写入A,消费者读取B。在这种状态下,生产者拥有
6 A8 p/ J! Y( [) y' g9 @' s锁La;同样地,消费者拥有锁Lb。由于两个缓冲区都是处于独占状态,因此每次读
; X7 X6 G6 j: ?* u. p- z: ?写缓冲区中的元素都不需要再进行加锁、解锁操作。这是节约开销的主要来源。3 c/ s7 I M3 U3 h
●单个缓冲区空闲状态。由于两个并发实体的速度会有差异,必然会出现一个缓, s3 f4 L* \4 S/ z( X
冲区已经操作完,而另一个尚未操作完。不妨假设生产者快于消费者。在这种情况$ G, R8 ^9 q, C* p+ }5 ]3 G9 Y
下,当生产者把A写满的时候,生产者要先释放La(表示它已经不再操作A),然后
% j6 o$ W, D& F' I尝试获取Lb。由于B还没有被读空,Lb还被消费者持有,所以生产者进入等待
4 y# ?+ j* w6 e( ]% _4 |(wait)状态。7 R9 E& Q( y9 V- o
●缓冲区的切换。过了若干时间,消费者终于把B读完。这时候,消费者也要先
: v$ `2 a- r' }0 u' n释放Lb,然后尝试获取La。由于La刚才已经被生产者释放,所以消费者能立即拥有8 L% P( o" w( z$ ~
La并开始读取A的数据。而由于Lb被消费者释放,所以刚才等待的生产者会苏醒过来$ J# b* c( U' N" c+ `9 H H0 a" k7 @
(wakeup)并拥有Lb,然后生产者继续往B写入数据。
8 T. z0 N+ N3 {+ o7 r[1]Oracle公司实现的Linux异步IO库,开源地址:https://oss.oracle.com/projects/libaio-
' B; M; h2 G7 \5 W9 V1 d' ~8 Loracle/
7 @$ i9 j2 ?/ c( L1 u9.4.5 定期合并&数据分发
: R5 d- m/ ^/ L' r7 C; ^RootServer将UpdateServer上的版本变化信息通知ChunkServer后,ChunkServer将
% N9 d1 S* ]! V& A) P) W( p$ u执行定期合并或者数据分发。( O( ^( c: W! K1 x" i. |1 V4 W. ?
如果UpdateServer执行了大版本冻结,ChunkServer将执行定期合并。ChunkServer
# W5 v1 f! f* W1 S; W唤醒若干个定期合并线程(比如10个),每个线程执行如下流程:; m* W4 s. r* W3 A' s
1)加锁获取下一个需要定期合并的子表;5 T% l2 Y- b; X. z9 b- v
2)根据子表的主键范围读取UpdateServer中的修改操作;* O: O8 g3 p5 m8 @7 F# v1 }' L
3)将每行数据的基线数据和增量数据合并后,产生新的基线数据,并写入到新
0 P) b5 ^6 k" g. K! A" r的SSTable中;
3 L+ B B& B+ A- I$ X% y! A4)更改子表索引信息,指向新的SSTable。2 h, }/ o2 g& P. t: _! O/ }
等到ChunkServer上所有的子表定期合并都执行完成后,ChunkServer会向
: L6 e) p3 E, l; P$ ?. v$ n0 DRootServer汇报,RootServer会更新RootTable中记录的子表版本信息。定期合并一般
. U5 ~& y) F5 } V i安排在每天凌晨业务低峰期(凌晨1:00开始)执行一次,因此也称为每日合并。另
6 _/ [5 w. Z" b9 d3 J% I& _外,定期合并过程中ChunkServer的压力比较大,需要控制合并速度,否则可能影响 O4 }& [% C8 d
正常的读取服务。& @' F- c& i7 h, j3 h! P/ P2 X
如果UpdateServer执行了小版本冻结,ChunkServer将执行数据分发。与定期合并3 {( Q+ }" ^( ]' [$ D
不同的是,数据分发只是将UpdateServer冻结的数据缓存到ChunkServer,并不会生成
; t5 F7 C) {. b P$ q- |1 j+ F+ k新的SSTable文件。因此,数据分发对ChunkServer造成的压力不大。3 J( N8 F1 I2 Q; M5 q
数据分发由外部读取请求驱动,当请求ChunkServer上的某个子表时,除了返回1 ^( Y& p& o$ O2 O Y: p
使用者需要的数据外,还会在后台生成这个子表的数据分发任务,这个任务会获取6 q/ c* g) X+ R+ M
UpdateServer中冻结的小版本数据,并缓存在ChunkServer的内存中。如果内存用完,. T) D/ N: T r' k2 Y, y
数据分发任务将不再进行。当然,这里可以做一些改进,比如除了将UpdateServer分
' L7 [8 k" \# Y' `发的数据存放到ChunkServer的内存中,还可以存储到SSD磁盘中。# v$ d: y3 H, o; I
例9-6 假设某台ChunkServer上有一个子表t1,t1的主键范围为(1,10],只有一% ? x) V% g* z& X9 g
行数据:rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40
: r, K/ N! @: x; d8 ?>)。UpdateServer的冻结版本有两行更新操作:rowkey=8=>(<2,update,30% [/ g% H. A9 t
>,<3,up-date,38>)和rowkey=20=>(<4,update,50>)。
1 N% G' O% d( ?●如果是大版本冻结,那么,ChunkServer上的子表t1执行定期合并后结果为:; J1 ` U* r. a: ^# b4 k5 ~
ro-wkey=8=>(<2,update,30>,<3,update,38>,<4,update,40>); S. ]1 q3 P) Q" H" @+ k
●如果是小版本冻结,那么,ChunkServer上的子表t1执行数据分发后的结果为:& n. D% v! t' i( g6 `) i
rowkey=8=>(<2,update,20>,<3,update,30>,<4,update,40>,<2," L! j9 I( E: T v
update,30>,<3,update,38>)。
W; b$ J+ R/ R- N f8 j) t9.4.6 定期合并限速! |8 [$ a$ `* J3 ]7 s
定期合并期间系统的压力较大,需要控制定期合并的速度,避免影响正常服 O( S2 o5 d; k: M# M; R7 I
务。定期合并限速的措施包括如下步骤:! P& k6 c, X6 ?: Y) {
1)ChunkServer:ChunkServer定期合并过程中,每合并完成若干行(默认2000 j; n8 o, o$ z% Y& c, {- g
行)数据,就查看本机的负载(查看Linux系统的Load值)。如果负载过高,一部分; g, t# q2 U2 m. B- Q, z
定期合并线程转入休眠状态;如果负载过低,唤醒更多的定期合并线程。另外,
4 H- a5 n; w; h' A, I7 jRootServer将UpdateServer冻结的大版本通知所有的ChunkServer,每台ChunkServer会
, `2 @& i+ y) X; m$ `; D9 b随机等待一段时间再开始执行定期合并,防止所有的ChunkServer同时将大量的请求
3 H2 t: O2 K" W/ T) U. R! [ T$ h! O发给UpdateServer。
1 u( t! B0 y$ M7 e5 g2)UpdateServer:定期合并过程中ChunkServer需要从UpdateServer读取大量的数
, s1 Q8 N: l) i: L7 s/ r4 x o& l据,为了防止定期合并任务用满带宽而阻塞用户的正常请求,UpdateServer将任务区
& g/ T! m2 r7 a p: a分为高优先级(用户正常请求)和低优先级(定期合并任务),并单独统计每种任. ]) p& ]. C- e( F
务的输出带宽。如果低优先级任务的输出带宽超过上限,降低低优先级任务的处理
9 j' E9 v1 |) p d7 _) ]/ M, y' y+ {速度;反之,适当提高低优先级任务的处理速度。
! b1 t8 a# S' m6 s1 C如果OceanBase部署了两个集群,还能够支持主备集群在不同时间段进行“错峰
% l: c2 T6 ]" e1 E合并”:一个集群执行定期合并时,把全部或大部分读写流量切到另一个集群,该集
9 i: h6 u. E. [$ X( x群合并完成后,把全部或大部分流量切回,以便另一个集群接着进行定期合并。两/ u3 L Y: \& y( ~6 b
个集群都合并完成后,恢复正常的流量分配。4 G5 [% R% u/ C0 {
4 ]) A) r+ U' L7 b1 d* Q5 N
. x" A7 s+ T! ^ n0 L1 G* z
|
|