|
第9章 分布式存储引擎% d9 d+ a6 C3 A
分布式存储引擎层负责处理分布式系统中的各种问题,例如数据分布、负载均
+ Q- m) U! z6 n衡、容错、一致性协议等。与其他分布式存储系统类似,分布式存储引擎层支持根
7 a+ w' N4 ~9 o; ?" X$ n& w3 Y据主键更新、插入、删除、随机读取以及范围查找等操作,数据库功能层构建在分: |" ? Y: I1 X' s, r
布式存储引擎层之上。3 A: h) V2 {: i7 W% ?
分布式存储引擎层包含三个模块:RootServer、UpdateServer以及ChunkServer。" H# S+ q: ]# g# Q3 F. `( D7 E
其中,RootServer用于整体控制,实现子表分布、副本复制、负载均衡、机器管理以
+ m( `* _ t# t# Y! y9 _4 t及Schema管理;UpdateServer用于存储增量数据,数据结构为一个内存B树,并通过 M" t8 F/ X$ u5 @8 b+ |
主备实时同步实现高可用,另外,UpdateServer的网络框架也经过专门的优化;
: p4 l7 s0 I3 }, E8 x/ eChunkServer用于存储基线数据,基线数据按照主键有序划分为一个个子表,每个子
. b! i/ M6 K9 C9 S- @2 O表在ChunkServer上存储了一个或者多个SSTable,另外,定期合并和数据分发的主要$ \" ^) O; ~9 n" i0 d$ H( K$ |3 B
逻辑也由ChunkServer实现。& V: |$ P6 j" v( T1 V
OceanBase包含一个公共模块,包含其他模块共用的网络框架、内存池、任务队
" K2 C9 D% F* Q0 u3 s' V列、锁、基础数据结构等,本章将介绍分布式存储引擎以及公共模块的实现。
$ m% c4 d9 y' ?% M9.1 公共模块9 k- Y+ K0 _& U
OceanBase源代码中有一个公共模块,包含其他模块需要的公共类,例如公共数) v9 k! d0 Z, B$ k+ X3 w( S
据结构、内存管理、锁、任务队列、RPC框架、压缩/解压缩等。下面介绍其中部分- n8 M8 `6 c: h( v6 Y
类的设计思路。. E7 h( W1 a0 A
9.1.1 内存管理
z: ^3 ~ P, R% I$ _2 I+ n内存管理是C++高性能服务器的核心问题。一些通用的内存管理库,比如Google
) Z1 N0 `/ _1 F6 K/ K3 |TCMalloc,在内存申请/释放速度、小内存管理、锁开销等方面都已经做得相当卓越; s# ]. @* Y; ^% V# D
了,然而,我们并没有采用。这是因为,通用内存管理库在性能上毕竟不如专用的
y/ q& v5 D- O1 `9 [* A内存池,更为严重的问题是,它鼓励了开发人员忽视内存管理的陋习,比如在服务
1 W5 Y- @3 ]5 ~* z; \: T: T$ f器程序中滥用C++标准模板库(STL)。; [6 g5 `3 E, i2 |
在分布式存储系统开发初期,内存相关的Bug相当常见,比如内存越界、服务器
$ R" ?% ^ y/ J* F) s* {1 w$ [- h出现Core Dump,这些Bug都非常难以调试。因此,这个时期内存管理的首要问题并7 `& o( A" x0 @. I5 W- ]4 ~0 i* n
不是高效,而是可控性,并防止内存碎片。
1 x& O- ~' @2 E7 E2 \1 wOceanBase系统有一个全局的定长内存池,这个内存池维护了由64KB大小的定长
L4 M( e8 Q3 H) r! O* v内存块组成的空闲链表,其工作原理如下: \/ [" f3 |; d6 F- f. M
●如果申请的内存不超过64KB,尝试从空闲链表中获取一个64KB的内存块返回
# {- a% f+ s3 l( `! j8 b& i$ h给申请者;如果空闲链表为空,需要首先从操作系统中申请一批大小为64KB的内存/ [( j# u! o' x9 c7 U2 ^
块加入空闲链表。释放时将64KB的内存块加入到空闲链表中以便下次重用。
& O+ C; r$ f: h" t2 o7 O- i' h●如果申请的内存超过64KB,直接调用Glibc的内存分配(malloc)函数,向操
D# \: p8 j9 J; C7 u' b8 j' L作系统申请用户所需大小的内存块。释放时直接调用Glibc的内存释放(free)函数,
3 [$ F- t7 {* A/ L* t h- r" P将内存块归还操作系统。6 W0 Q: A8 V/ F# M) f
OceanBase的全局内存池实现简单,但内存使用率比较低,即使申请几个字节的7 v' l6 }5 }8 j) I" K
内存,也需要占用大小为64KB的内存块。因此,全局内存池不适合管理小块内存,
9 i, Q1 C" z7 T8 {5 x每个需要申请内存的模块,比如UpdateServer中的MemTable,ChunkServer中的缓存
2 H. Y) A! O# R! ^( i G等,都只能从全局内存池中申请大块内存,每个模块内部再实现专用的内存池。每
5 N( ]% F( B1 V- w; I3 R个线程处理读写请求时需要使用临时内存,为了提高效率,每个线程会缓存若干个2 C/ n0 s( P! X1 F+ }; F) ?
大小分别为64KB和2MB的内存块,每个线程总是首先尝试从线程局部缓存中申请内/ H. X( o) M: S) V7 @
存,如果申请不到,再从全局内存池中申请。
* I" V( b. F6 \7 _) g: g5 yclass ObIAllocator
4 d% a0 w; d$ @0 B) X9 N6 O* v{6 A d5 \8 Y; Q( I" m1 a
public:$ z6 ?# E$ J# q+ a! q
//内存申请接口
! w# X/ B# B+ N/ b3 Cvirtual void*alloc(const int64_t sz)=0;
+ S; \$ [- g8 J7 o. q//内存释放接口# q3 f7 F, a' \
virtual void free(void*ptr)=0;
: T/ q s) A. k1 e9 u# q$ G};
% s4 U, P# F8 I% E8 ~class ObMalloc:public ObIAllocator) i% Q" I. n6 v
{' b: H' v$ I; }; ?$ K1 x" j" q
public:( E4 }* G C1 h0 y* ?& A
//设置模块号( S7 O+ h0 j" f
void set_mod_id(int32_t mod_id);
& j5 S4 o$ t4 C2 G% i//申请大小为sz的内存块. {6 U9 z4 c% }
void*alloc(const int64_t sz);) ` P3 ~' P# q {. I* j
//释放内存
' T9 c4 {$ G& d2 N) @void free(void*ptr); J+ \. p# Q' b5 f
}1 W! h% Q/ [* Y7 B: i
class ObTCMalloc:public ObIAllocator$ x6 s9 x' s6 P+ B5 o
{
2 F8 f! p# z. s0 o4 {public:# w" `3 U, E! Y! e( H
//设置模块号2 [5 c0 z7 K& J! a
void set_mod_id(int32_t mod_id);0 o% P! v' T( ^$ a9 j
//申请大小为sz的内存块
/ V$ e3 \) }: t& C# D7 y) u7 E& a7 h% svoid*alloc(const int64_t sz);
. @, Z0 i: C0 B* p8 c& e5 h6 N//释放内存$ ^6 e6 w& B5 _( S# [! [
void free(void*ptr);
5 L- q1 h9 Q1 [0 @8 u}
% Z1 @1 f4 V9 p* e. yObIAllocator是内存管理器的接口,包含alloc和free两个方法。ObMalloc和; ^, K3 v; j% s7 I
ObTCMalloc是两个实现了ObIAllocator接口的全局内存池,不同点在于,ObMalloc不+ h3 I2 M! i& E& {2 I
支持线程缓存,ObTCMalloc支持线程缓存。ObTCMalloc首先尝试从线程局部的空闲! v: m0 y \; e
链表申请内存块,如果申请不到,再通过ObMalloc的alloc方法申请。释放内存时,
- i- m: h$ D1 |5 L6 k如果没有超出线程缓存的内存块个数限制,则将内存块还给线程局部的空闲链表;
; @6 T) T# y/ f否则,通过ObMalloc的free方法释放。另外,允许通过set_mod_id函数设置申请者所
9 ?5 _1 A7 Z1 t! ]& v( l( }在的模块编号,便于统计每个模块的内存使用情况。
, s" C. s9 Z4 C8 F全局内存池的意义如下:6 j' a. ], P/ M
●全局内存池可以统计每个模块的内存使用情况,如果出现内存泄露,可以很快# P* G- C$ Y- j. l( N G
定位到发生问题的模块。6 A9 W' ?) F( O9 v: {8 \0 r
●全局内存池可用于辅助调试。例如,可以将全局内存池中申请到的内存块按字5 v! [1 @; W: J' O* K$ `$ w x
节填充为某个非法的值(比如0xFE),当出现内存越界等问题时,服务器程序会很% }5 A7 Z, K4 G6 b/ |
快在出现问题的位置Core Dump,而不是带着错误运行一段时间后才Core Dump,从) Z8 w7 D& J# t6 p! m& v
而方便问题定位。/ V& v- V! g$ Q# [: @
总而言之,OceanBase的内存管理没有采用高深的技术,也没有做到通用或者最/ M, Z, _+ x k- I; W
优,但是很好地满足了服务器程序开发的两个最主要的需求:可控性以及没有内存
" E$ b# q1 ~4 k6 A) k& s- u' l% G碎片。% Y1 w" {7 V1 F0 B
9.1.2 基础数据结构
' J4 y4 g8 j; h) N: v5 f) m$ J) N1.哈希表
' O9 {; [4 H5 b4 O( h# T) x$ ~为了提高随机读取性能,UpdateServer支持创建哈希索引,这个哈希索引结构就8 A7 c; v$ R# ` _1 L
是LightyHashMap,代码如下:
8 |* r! z* a3 D J, I# Z" ~template<typename Key,typename Value>% `. O6 @# ~/ j9 l
class LightyHashMap! j. a- c1 r& e0 T% P. `5 v
{
9 Y$ `6 Z- a0 y: ppublic:
9 E! u+ b0 D' @( k, `. \* z4 s//插入一个<key,value>对到哈希表
8 ?8 d7 O# I6 ]; J! {9 ainline int insert(const Key&key,const Value&value);) }, q' W% k" I, Y" {* o0 J
//根据key查找value
7 W* @3 o6 r# K" M' rinline int get(const Key&key,Value&value);
. h. }: D# I; K0 y//根据key删除一个<key,value>对,如果value不为空,那么,保存删除的值到value中
! K( }0 ]+ F# Ninline int erase(const Key&key,Value*value=NULL);
5 v1 i8 h- O5 a1 j3 M; C, ?private:
1 q) z+ V9 v8 @0 D; F9 [7 kstruct Node
* t" c! F, f. ^{
d- p. t! q+ B* P6 {4 qKey key;" j2 A6 S; z& l- [0 o# E
Value value;
, T$ X8 a/ g( @& ?$ funion+ G9 |, a# w; d9 h4 _' Z( h
{
# _9 L0 n1 d5 y2 rNode*next;
; L5 V% z' C1 n8 A0 O( ]int64_t flag;
1 z" ?( C; X. M5 I+ v, a};
8 l9 U* @2 a9 w( f9 W8 }};, x4 R. i7 ~" U/ _$ I
Node*buckets_;//哈希桶指针2 P D y0 K' M' ?0 [& }" D& J f
BitLock bit_lock_;//位锁,用于保护哈希桶
; M5 H7 u* w! U) m; h: z# ]6 D};
' b3 Y5 l! s7 I$ QLightyHashMap采用链式冲突处理方法,即将所有哈希值相同的<key,value>对
# @- i9 z# c( v) y( @链到同一个哈希桶中,它包含如下三个方法:4 r5 X# K8 l& i$ r, j" V
●insert:往哈希表中插入一个<key,value>对。这个函数首先根据key的哈希值
, v6 l \ f4 Y得到桶号,接着,往哈希桶中插入一个包含key和value值的Node节点。& _$ ?. x8 G+ k4 B$ w; h: k% A3 C
●get:根据key查找value。这个函数首先根据key的哈希值得到桶号,接着,遍历
; _# a6 H" ]. i9 J" X4 l! ?, D对应的链表,找到与传入key相同的Node节点,返回其中的value值。" b& \: v5 F4 i2 [ \ |0 V
●erase:根据key删除一个<key,value>对。这个函数首先根据key的哈希值得到
( u. Y% M. H# l桶号,接着,遍历对应的链表,找到并删除与传入key相同的Node节点。; o& }! I2 ^5 L& j0 Z& u2 \
LightyHashMap设计用来存储几千万甚至几亿个元素,它与普通哈希表的不同点
! b' y, n$ [/ V4 {* ]- `. D在于以下两点:
+ Y1 K( x6 W7 K, `) ~1 y, }5 X1)位锁(BitLock):LightyHashMap通过BitLock实现哈希桶的锁结构,每个哈
; C$ W" r# _3 M! K希桶的锁结构只需要占用一个位(Bit)。如果哈希桶对应的位锁值为0,表示没有锁
; R% p5 G) a; j+ T3 U9 r% n! b/ v冲突;否则,表示出现锁冲突。需要注意的是,LightyHashMap没有区分读锁和写
: r1 { v: p8 q$ }4 n" }& @) V锁,多个get请求也是冲突的。可以对LightyHashMap的BitLock做一些改进,例如用两7 ?) @; n2 [6 L% }6 F
个位(Bit)表示哈希桶对应的锁,其中一个位表示是否有读冲突,另外一个位表示: K* Z' _$ m. {: J8 V0 F
是否有写冲突。
. {( _0 ^ V3 O, }8 B2)延迟初始化(Lazy Initialization):LightyHashMap的哈希桶个数往往特别多4 L$ f8 m6 G5 D
(默认为1000万个),即使仅仅对所有哈希桶执行一次memset操作,消耗的时间也
! o4 s2 u# s2 ]2 p. j& W1 p% t1 H4 d是相当可观的。因此,LightyHashMap采用延迟初始化策略,即将哈希桶划分为多个4 p7 n* s1 n1 @) E4 A$ N
单元,默认情况下每个单元包含65536个哈希桶。每次执行insert、get或者erase操作
* C( ?" }1 S4 p/ `时都会判断哈希桶所属的单元是否已经初始化,如果未初始化,则对该单元内的所8 ?" n5 P+ {' r; y+ D9 A, b
有哈希桶执行初始化操作。6 X8 d( s* k# u. ^# M- g% I' p6 l
2.B树
# b" M* ?3 r: A/ A% ~2 ?UpdateServer的MemTable结构底层采用B树结构索引其中的数据行,代码如下:+ |- Z. Y6 D8 K9 `% r2 k9 P* e
template<class K,class V,class Alloc>
" M' A: c( t7 F+ wclass BTreeBase
5 |7 a( C3 y: r; j{
' ~. m1 q8 s3 S3 V t; I. L1 |! wpublic:
& @( ^( H8 S6 V) S' T' \* I) V) j//把<key,value>对加到B树中,overwrite参数表示是否覆盖原有值
7 ~# F* m' o% B/ a2 Oint put(const K&key,const V&value,const bool overwrite=false);
1 e) g( n! a0 ~. c4 d2 h0 u( `//获取key对应的value! }" b: y' S# [% X2 n+ ?% X+ P
int get(const K&key,V&value);; |4 B& ]+ M' E2 _% t
//获取扫描操作描述符
3 K. P7 A7 C0 U8 E a, ^ F2 ?. Uint get_scan_handle(TScanHandle&handle);4 r% j/ T6 Z6 d: u- B
//设置扫描的数据范围
" l2 a) Q5 E O/ G; Aint set_key_range(TScanHandle&handle,const K&start_key,int32_t
1 K% s9 B5 ~1 a# ~start_exclude,const K&end_key,int32_t end_exclude);
( A# R! I) W$ u0 E//读取下一行数据6 Q5 {0 U" O+ E& k; U
int get_next(TScanHandle&handle,K&key,V&value);
6 E, H5 I( d- ~; ]/ M};
/ Y. [4 L* |6 _3 G支持的功能如下:3 L+ u1 \4 V0 i* C3 z0 P) ]
1)Put:插入一个<key,value>对。& _# m8 N7 U9 S
2)Get:根据key获取对应的value。
4 ~- l4 `' L/ W7 ~3)Scan:扫描一段范围内的数据行。首先,调用get_scan_handle获取扫描操作
( x, g$ V9 J+ Z' b: F v6 h, B描述符,其次,调用set_key_range设置扫描的数据范围,最后,不断地调用get_next7 k, n) V- e' f+ D% J$ J$ }
读取下一行数据直到全部读完。
2 _* ^+ u1 c5 u3 ^ ?# b5 q5 F" ZB树支持多线程并发修改。如图9-1所示,往MemTable插入数据行(Data)时,
' @; w7 y o( f6 J5 ~" Z0 g将修改其B树索引结构(Index),分为两种情况:
. Y: o: J8 g' n. I2 g图 9-1 并发修改B树# n( Z; g- a% @4 t2 _5 a
●两个线程分别插入Data1和Data2:由于Data1和Data2属于不同的索引节点,插; `9 |5 N: u* ~7 j" [! {* O0 T( Q
入Data1和Data2将影响B树的不同部分,两个线程可以并发执行,不会产生冲突。
9 J( ]/ ]3 U3 H( a4 @) K# U●两个线程分别插入Data2和Data3:由于Data2和Data3属于相同的索引节点,因
' O" U1 b1 ]# B$ T" T- D此,插入操作将产生冲突。其中一个线程会执行成功,另外一个线程失败后将重
/ K' ]' f7 `9 d1 {试。
! s* k3 k! K2 ^# _ D5 r7 {+ L1 |3 Q每个索引节点满了以后将分裂为两个节点,并触发对该索引节点的父亲节点的
% Z0 g" B }1 u u$ O- F+ v5 A3 }修改操作。分裂操作将增加插入线程冲突的概率,在图9-1中,如果Data1和Data2的
5 H" ]; K1 u- U% A/ F父亲节点都需要分裂,那么,两个插入线程都需要修改Data1和Data2的祖父节点,从7 T d* X: @% A6 q: }
而产生冲突。
; M+ Q0 N) X( m3 P5 n1 h另外,为了提高读写并发能力,B树实现时采用了写时复制(Copy-on-write)技; X# N' s0 K5 N" a& O
术,修改每个索引节点时首先将该节点拷贝出来,接着在拷贝出来的节点上执行修
* C/ \& Y( w9 z1 U$ e4 D0 K改操作,最后再原子地修改其父亲节点的指针使其指向拷贝出来的节点。这种实现
5 E% ^9 T$ Z/ @* j! m' @" z方式的好处在于修改操作不影响读取,读取操作永远不会被阻塞。
* y: l! D" Z l细心的读者可能会发现,这里的B树不支持更新(Update)以及删除操作,这是0 \6 L5 o( o, T" r9 y
由OceanBase MVCC存储引擎的实现机制决定的。对于更新操作,MVCC存储引擎会
" [6 ~; P i t3 g7 a$ [3 M" {在行的末尾追加一个单元记录更新的内容,而不会影响索引结构;对于删除操作,
6 ^, T( C# R5 {4 tMVCC存储引擎内部实现为标记删除,即在行的末尾追加一个单元记录行的删除时: r6 M7 F& M1 L# S4 ^) Y' \
间,而不会物理删除某行数据。
0 g4 q) {6 g& q0 I- o9.1.3 锁
- T9 b0 ~, C) ^8 ]为了实现并发控制,OceanBase需要对一行记录加共享锁或者互斥锁。为此,专
: g+ ?8 V. X4 B3 o门实现了QLock,代码如下:
0 l. B/ f0 M0 Q8 o9 D+ N$ o1 T9 S, rstruct QLock6 G& o. [) P; P/ o; B8 m R, i4 G
{( j7 a' f3 U: ^- ^( d( g
enum State
0 f) t/ B& Z& O) B% Q- ^{* x0 f4 G: X9 t* u. q
EXCLUSIVE_BIT=1UL<<31,$ z% [1 Z2 p% p7 e
UID_MASK=~EXCLUSIVE_BIT( L$ ^9 t# p r" K2 L4 x
};
& b! |, h8 L) {' R. e! Zvolatile uint32_t n_ref_;//表示持有共享锁的引用计数; x3 u: [$ O4 R: N& [
volatile uint32_t uid_;//表示持有互斥锁的用户编号( A4 F4 @. Q6 f8 t/ C3 P# x- J u3 E
//加共享锁,uid为用户编号,end_time为超时时间
* J- \% N! f* [5 J5 b. |) Mint shared_lock(const uint32_t uid,const int64_t end_time=-1);
+ \3 q0 B# D. s0 {//解除共享锁
+ ?8 m( T5 v* m, B7 Q+ ^6 rint shared_unlock();! n; s' {! x9 w' e. W* {
//加互斥锁,uid为用户编号,end_time为超时时间
. ?1 [& M) W5 {' L, @9 gint exclusive_lock(const uint32_t uid,const int64_t end_time=-1);# |+ M+ Z9 u& o2 M3 i
//解除互斥锁* v- J, @) T8 p6 P5 |) W) `' b
int exclusive_unlock(const uint32_t uid);
4 t9 u5 U! l! g$ [9 W' j//共享锁升级为互斥锁,uid为用户编号,end_time为超时时间) z' d/ P* h4 |4 ]* h
int share2exclusive_lock(const uint32_t uid,const int64_t end_time=-1);
7 D5 ]% `5 m$ r/ {, c//互斥锁降级为共享锁+ J5 _5 i1 _. p7 d' e' M8 H! b/ H( U
int exclusive2shared_lock(const uint32_t uid);
: b- X7 r; Q. @5 K; O- O};
: ]5 d# \5 ?$ |, i# ^在QLock的实现中,每把锁占用8个字节,其中4个字节为n_ref_,表示持有共享
/ b: K4 U9 P& D/ Y锁的引用计数,另外4个字节为uid_,表示持有互斥锁的用户编号(例如线程编
( ]( I0 |6 x0 I3 D8 f$ I5 P2 F" J号)。uid_的最高位(EXCLUSIVE_BIT)表示是否为互斥锁,其余31位表示用户编6 @+ X7 a4 R+ ]: m! t( k8 ~3 R$ M
号。
9 [! Y6 a% O4 _' R2 Ashare_lock用于加共享锁,实现时只需要将n_ref_原子加1;exclusive_lock用于加7 m" h) n- X% ?2 T
互斥锁,实现时需要将EXCLUSIVE_BIT置1并等待持有共享锁的所有用户解锁完成。
. d8 L) N, i# I8 D& [( l另外,为了避免新用户不断产生并持有共享锁导致无法获取互斥锁的情况,6 m, F+ J& h" ~1 |; a
exclusive_lock实现步骤如下:
. k+ X; }0 D# q" c! f6 [1)将EXCLUSIVE_BIT置为1;
6 a' \9 W ^' \+ U$ V2)等待持有共享锁的所有用户解锁完成;0 J) W( w8 q0 F8 ~/ y; u( ~
3)如果第2)步无法在超时时间内完成,加锁失败,将EXCLUSIVE_BIT重新置 d" U* R O. A: h) e1 R
为0。
" t$ W$ r& W- }4 H. v第1)步执行完成后,新产生的用户无法获取共享锁。这样,只需要等待已经持
) j" c! E: D2 s2 Z/ Q" Y2 Y, Y有共享锁的用户解锁即可,不会出现获取互斥锁时“饿死”的现象。
# H, c M- p. R# d/ Z& wshare2exclusive_lock将共享锁升级为互斥锁,实现时首先升级为互斥锁,如果获
5 t$ @/ I; F/ w a2 v. `取成功,接着再解除共享锁,即引用计数减1。; W7 @4 L1 B5 Z$ m
9.1.4 任务队列
; L: ~3 @) [- B在生产者/消费者模型中,往往有一个任务队列,生产者将任务加入到任务队
. x4 V+ [1 X. v列,消费者从任务队列中取出任务进行处理。例如,在网络框架中,网络线程接收/ _, n& l2 i! a) q8 J$ v% K
任务并加入到任务队列,工作线程不断地从任务队列取出任务进行处理。% e6 X0 N5 h4 A6 K- V5 T! G0 X" _
最为常见的场景是系统有一个全局任务队列,所有网络线程和工作线程操作全! c4 y( d, w" B# x5 B! t
局任务队列都需要首先获取独占锁,这种方式的锁冲突严重,将导致大量操作系统+ {6 a# c: q$ c# B Q( _1 {- [5 R
上下文切换(context switch)。为了解决这个问题,可以给每个工作线程分配一个任
( u7 w' o5 W" y$ R* ?7 a务队列,网络线程按照一定的策略选择一个任务队列并加入任务,例如随机选择或% S' Z9 t+ B& K+ S+ o9 _: Y; R
者选择已有任务个数最少的任务队列。- }5 K. ~$ B, D2 g/ v- v( R
将任务加入到任务队列(随机选择):4 D5 w1 \( w% A
1)将total_task_num原子加1(total_task_num为全局任务计数值);1 B. d9 K7 _- Y) L
2)通过total_task_num%工作线程数,计算出任务所属的工作线程;
d6 S$ o+ x) |0 E* l, q+ }3)将任务加入到该工作线程对应的任务队列中;
/ o8 V" E& M5 P# \: R* @8 \' n: S4)唤醒工作线程。9 V+ I4 s1 @ A$ q1 Q0 f- }( a3 j
然而,如果某个任务的处理时间很长,就有可能出现任务不均衡的情况,即某 {! I2 X6 g6 J) X0 M$ T: x
个线程的任务队列中还有很多任务未被处理,其他线程却处于空闲状态。OceanBase4 f! S+ S( V1 N8 j% U4 r/ Y. @
采取了一种很简单的策略应对这种情况:每个工作线程首先尝试从对应的任务队列$ u- F3 g. K. k: g% H2 x
中获取任务,如果获取失败(对应的任务队列为空),那么,遍历所有工作线程的2 ^7 t: d% o( j/ R
任务队列,直到获取任务成功或者遍历完成所有的任务队列为止。$ [/ ?7 }& N! c0 ~
除此之外,OceanBase还实现了LightyQueue用于解决全局任务队列锁冲突问题。
- |2 k, g+ t1 _6 b7 c3 S/ x" CLightyQueue的设计思想如下:
) \& F! e d1 C0 e4 a; }假设系统中有3个工作线程t1,t2和t3,全局任务队列中共有10个槽位。首先,# {' z* [& J" h
t1,t2和t3分别等待1号,2号以及3号槽位。网络线程将任务加入1号槽位时唤醒t1,
3 g- z7 U7 Q f6 `5 Y7 p加入2号槽位时唤醒t2,加入3号槽位时唤醒t3。接着,t2很快将任务处理完成后等待4
! e% @7 j, c3 \& L$ \号槽位,t3等待5号槽位,t1等待6号槽位。网络线程将任务加入到4,5,6号槽位时
9 d6 r' M9 L. e- s3 U3 a; P) L将分别唤醒t2,t3和t1。通过这样的方式,每个工作线程在不同的槽位上等待,避免
+ ]! L" U$ K* ]了全局锁冲突。
9 ?: L+ w+ ~$ |2 A将任务加入到工作队列(push)的操作如下:* r2 M5 D7 y! {1 H) e0 [+ Z" }; b
1)占据下一个push槽位;$ ?5 d, P1 ~: e, Y- a: O
2)将任务加入到该push槽位;
: E3 A/ I2 Y9 q: M0 v3)唤醒该push槽位上正在等待的工作线程。9 y1 x, N# Q7 O; _ Q: W
工作线程从任务队列中获取任务(pop)的操作如下: \! a% a9 N+ G' U6 K- t
1)占据下一个pop槽位;, _4 d6 |. Y' r, V$ }# J
2)如果该pop槽位上有任务,则直接返回;
3 H: c1 k& K2 \: R0 h% p3)否则,工作线程在该pop槽位上等待直到被push操作唤醒或者超时。7 O# o1 y' W, k' d2 M/ r: q
9.1.5 网络框架
& ?( y8 I+ H) U2 ?: JOceanBase的网络框架代码如下:
+ ]. r( U0 l" F' h" c" h5 Yclass ObSingleServer
( i" v: o/ }: l& s{
% L; @9 @- Q" Zpublic:/ N8 |% m5 N3 q- d9 s }% R+ {+ n, E
//设置工作线程个数& [' Z& k# ~" ?& H% |% G. w
int set_thread_count(const int thread_count);
. @2 \+ ]. F9 U. ~7 b% ?; G//设置网络IO线程个数8 F. d4 ]9 o$ E8 A
int set_io_thread_count(const int io_thread_count);
9 p1 m6 Z0 Q" @, |//设置监听端口
5 l }2 n0 T' c9 aint set_listen_port(const int listen_port);" D/ Z- W: p0 e! [/ V
public:
) C! N* c$ c4 O; k9 L# E. J//处理接收到的网络包,默认的处理逻辑是将网络包加入到全局任务队列中
$ U2 @, B1 B4 w- c$ L/ u9 ^virtual int handlePacket(ObPacket*packet);2 h/ f9 q5 a' u; {0 |
//工作线程每次从全局任务队列中取出一个网络包并调用该函数进行处理
+ _4 I/ K: n! \$ kvirtual do_request(ObPacket*packet);$ m3 p ]# d# Q* B2 Q$ _: a. G4 {2 l
};
6 \# l. D) R& ^1 i# GOceanBase服务端接收客户端发送的网络包(ObPacket),并交给handlePacket处% S7 D" D% f5 o$ Y! J
理函数进行处理。默认情况下,handlePacket会将网络包加入到全局任务队列中。接
3 s$ u A$ |/ g4 [" v9 {着,工作线程会从全局任务队列中不断获取网络包,并调用do_request进行处理,处: @4 a! b1 E7 w; O
理完成后应答客户端。可以分别通过set_thread_count以及set_io_thread_count函数来设
+ H4 k! \+ h7 |* l置工作线程以及网络线程的个数。4 O: `: d" E9 e, H) \! r6 W% _
客户端使用ObClientManager发送网络包:
8 }) {3 V( @/ ^; \class ObClientManager# M! ^: [ M5 e9 G) y: {) f
{
9 j2 Y+ O ~ R( v. p' N7 M4 g E1 T" kpublic:& Y& v( g) ^: Y. d* a, w
//异步发送请求包
2 k* C6 ` m. E4 |//@param[in]server服务器端地址
+ j6 e& M7 `1 S3 F x/ e8 v//@param[in]pcode请求包的类型(packet code)9 {7 }7 b9 r. r6 l" W. k
//@param[in]version请求包的版本' m. i4 j/ U& B' K# |
//@param[in]in_buffer请求包实际内容缓冲区% n8 v' z: A3 W6 O# U
int post_request(const ObServer&server,const int32_t pcode,const int32_t- o1 |0 A& i/ D7 n
version,const ObDataBuffer&in_buffer)const;
" b; d3 F" Y1 e! g, O3 G//同步发送请求包并等待应答' V+ @& Y; n3 U: I3 c! U
//@param[in]server服务器端地址' m9 T, ^. T5 @/ y$ U2 ~
//@param[in]pcode请求包的类型(packet code)
p! A3 o$ t" h//@param[in]version请求包的版本# g# D8 D$ K# ]5 K5 F
//@param[in]timeout请求时间
; M; `; B* i9 C5 ^! a6 W//@param[in]in_buffer请求包实际内容缓冲区. H. v7 i j, n! q# `% X9 [
//@param[out]out_buffer应答包的实际内容缓冲区
" T( C- B4 H, T% bint send_request(const ObServer&server,const int32_t pcode,const int32_t$ c" b( `+ z$ K9 B7 ~' z
version,const int64_t timeout,ObDataBuffer&in_buffer,ObDataBuffer&
" L/ b! ~: |& d& a% g/ k6 P4 q$ |out_buffer)const;4 V1 A+ y5 E$ r' x6 B. x7 D
};
% a( T+ X( I; @3 K! Z5 t5 O客户端发包分为两种情况:异步请求(post_request)以及同步请求( @1 l. P; \ L2 W5 K
(send_request)。异步请求时,客户端将请求包加入到网络发送队列后立即返回,& U) | [+ v+ Q; Q1 T, f- j# \
不等待应答。同步请求时,客户端将请求包加入到网络发送队列后开始阻塞等待,6 \+ _) D' m; y% A7 a
直到网络线程接收到服务端的应答包后才唤醒客户端,从而执行后续处理逻辑。
' S6 x, m& n: z- L1 U9.1.6 压缩与解压缩
" e. w: y) j& k }& aclass ObCompressor7 ~/ g* B, k: `% r; s
{2 b7 T9 x' h5 X$ e3 K3 b
public:
- u, D4 q; Y( D) i$ d' [& i//数据压缩与解压缩接口2 B" T# Y( D, D3 u& X/ q; u( P
//@param[in]src_buff输入数据缓冲区
8 k1 r5 ]0 g' s* l- F0 J//@param[in]src_data_size输入数据大小
. h" ?. E0 n1 B: k, k* D6 D//@param[in/out]dst_buffer输出数据缓冲区
. F) E: O& ]. i, p//@param[in]dst_buffer_size输出数据缓冲区大小7 _" A$ @; m) b7 [+ w3 h
//@param[out]dst_data_size输出数据大小
& P( F, U7 X; L3 [8 M- pvirtual compress(const char*src_buffer,const int64_t
5 r$ P0 d6 w( t. C2 ~7 G2 Nsrc_data_size,char*dst_buffer,const int64_t dst_buffer_size,int64_t&
9 _+ t8 N4 X6 b ~! x/ tdst_data_size)=0;
+ |8 e3 w. |( n* Z/ ~5 x6 Q% R' i8 Bvirtual decompress(const char*src_buffer,const int64_t
9 A$ |1 e# K! t y& O1 Z5 Ysrc_data_size,char*dst_buffer,const int64_t dst_buffer_size,int64_t&
0 F) o/ {: V' V7 y+ T7 L) |- Gdst_data_size)=0;. l% R7 I9 p$ p$ B
//获取压缩库名称
! }% e1 r3 u. D" a; Qconst char*get_compress_name()const;( B. |* x) {0 P- C1 d# `
//根据传入的大小计算压缩后最大可能的溢出大小 H) d8 U: S( u7 y+ d T' f
int64_t get_max_overflow_size(const int64_t src_data_size)const;
$ J- q4 H1 O' V};% c( X. C4 I0 Z" b% `
ObCompressor定义了压缩与解压缩的通用接口,具体的压缩库实现了这些接6 O% l i R' L/ u( C
口。压缩库以动态库(.so)的形式存在,每个工作线程第一次调用compress或者* @+ d3 d1 C# v$ P/ \* Q' T2 J1 U
decompress方法时将加载相应的动态库,这样便实现了压缩库的插件化。目前,支持" h2 e( z+ ^& H" [8 K
的压缩库包括LZO [1] 以及Snappy [2] 。
( l! a& Y+ \* ?. m[1]LZO:见http://www.oberhumer.com/opensource/lzo/* f$ ~! k& g1 P$ @- d: [3 c
[2]Snappy:Google开源的压缩库,见http://code.google.com/p/snappy/
! ]) ~" U; A* c* f U1 ]6 B
4 J' }7 q/ K: x) X1 n5 e- |$ f8 `
|
|