|
10.3 写事务
* [& } l0 ]" v写事务,包括更新(UPDATE)、插入(INSERT)、删除(DELETE)、替换
& p) `( N2 ~2 \4 \( n. |) a(REPLACE,插入或者更新,如果行不存在则插入新行;否则,更新已有行),由$ U/ S6 a4 O- N3 ^! {2 h+ S; c
MergeServer解析后生成物理执行计划,这个物理执行计划最终将发给UpdateServer执 r6 H' Q* E- ?
行。写事务可能需要读取基线数据,用于判断更新或者插入的数据行是否存在,判# B" f$ ?' d8 w6 E1 m! c
断某个条件是否满足,等等,这些基线数据也会由MergeServer传给UpdateServer。
2 D+ p! x; d+ O5 ~& h' I& z10.3.1 写事务执行流程
7 ~4 G3 e+ }9 \9 A大部分写事务都是针对单行的操作,如果单行事务不带其他条件:
: {- M# h1 a: [$ b●REPLACE:REPLACE事务不关心写入行是否已经存在,因此,MergeServer直1 ?) i0 [' k- g! Q
接将修改操作发送给UpdateServer执行。
9 S( K* n. t* s+ X●INSERT:MergeServer首先读取ChunkServer中的基线数据,并将基线数据中行: N8 q# F/ B6 V8 P
是否存在信息发送给UpdateServer,UpdateServer接着查看增量数据中行是否被删除或7 C+ k9 h2 T5 a/ I9 z/ H, G
者有新的修改操作,融合基线数据和增量数据后,如果行不存在,则执行插入操
+ O4 i6 u4 T' E4 H! x作;否则,返回行已存在错误。! \# d6 M# ]: T3 ]& _
●UPDATE:与INSERT事务执行步骤类似,不同点在于,行已存在则执行更新操* C+ x1 \7 M7 ?9 B/ v
作;否则,什么也不做。0 V- g" g! @& d' n5 G4 r
●DELETE:与UPDATE事务执行步骤类似。如果行已存在则执行删除操作;否& @' \. w7 Z! P
则,什么也不做。! J9 l- n' R* e S" x
如果单行写事务带有其他条件:
m; n4 N1 J) |( O8 x3 F/ G; V7 D●UPDATE:如果UPDATE事务带有其他条件,那么,MergeServer除了从基线数
' T/ b3 Y0 J3 E3 k9 a据中读取行是否存在,还需要读取用于条件判断的基线数据,并传给UpdateServer。
4 X$ B: l+ F7 EUpdateServer融合基线数据和增量数据后,将会执行条件判断,如果行存在且判断条
& ]" m9 O/ T( Z# e% Q, L件成立则执行更新操作。否则,返回行已存在或者条件不成立错误。
9 Q* J% T$ o6 V+ Q' j5 t( l: e' [$ \●DELETE:与UPDATE事务执行步骤类似。, ^* j2 W. D# _! n9 y
例10-6 有一张表格item(user_id,item_id,item_status,item_name),其中,<
- P. i# c% W2 o5 s0 X3 ]2 ^user_id,item_id>为联合主键。
. R, ^( B, n. c5 s3 u' _+ }. yMergeServer首先解析图10-6的SQL语句产生执行计划,确定待修改行的主键为
; A* U' o* i8 N# q, j+ e5 f<1,2>,接着,请求主键<1,2>所在的ChunkServer,获取基线数据中行是否存3 n" \5 m, O: V# b. @* N; ` n
在,最后,将SQL执行计划和基线数据中行是否存在一起发送给UpdateServer。
/ k/ L, S. ~* }+ ]3 jUpdateServer融合基线数据和增量数据,如果行已存在且未被删除,UPDATE和
$ O+ T/ V/ j- f* M mDELETE语句执行成功,INSERT语句执行返回“行已存在”;如果行不存在或者最后
( z1 g- ]. u) y% E4 E% a) z被删除,INSERT语句执行成功,UPDATE和DELETE语句返回“行不存在”。: ?, A, r3 ~; j, i$ e+ m
图 10-6 单行写事务(不带条件)$ q& N; b! U- c: |5 T4 k( ]
图10-7中的UPDATE和DELETE语句还带有item_name="item1"的条件,; i, t1 S6 p4 ~7 b4 g1 _7 Q
MergeServer除了请求ChunkServer获取基线数据中行是否存在,还需要获取item_name6 w+ Z- [: N( v( f
的内容,并将这些信息一起发送给UpdateServer。UpdateServer融合基线数据和增量
p+ [1 X/ ~, o# I5 x. W数据,判断最终结果中行是否存在,以及item_name的内容是否为"item1",只有两个
, G0 X& `. e- e0 c9 A9 p" ~, k条件同时成立,UPDATE和DELETE语句才能够执行成功;否则,返回“行不存在或者
9 t/ \& l+ ^8 i9 }1 l7 `% t# B) J0 Z2 vitem_name列的内容不符合预期”。- M! O( L4 @1 G
图 10-7 单行写事务(带条件): K& X! F: J( K* x4 V4 R2 }8 Z
当然,并不是所有的写事务都这么简单。复杂的写事务可能需要修改多行数& D. K U! @$ s7 N
据,事务执行过程也可能比较复杂。
# h4 P# t9 b0 B9 B% T' y0 |例10-7 有两张表格item(user_id,item_id,item_status,item_name)以及
' D" E* D0 h- buser(user_id,user_name)。其中,<user_id,item_id>为item表格的联合主键,! Q; z8 N7 L9 r0 {9 z& n; ?$ r" X
user_id为user表格的主键。; E6 l3 \$ X9 C( R R
图10-8的UPDATE语句可能会更新多行。MergeServer首先从ChunkServer获取编
) z: R1 U; }- X, @号为1的用户包含的全部item(可能包含多行),并传给UpdateServer。接着,
- Y+ r. o+ B! b/ x* C0 r' j9 RUpdateServer融合基线数据和增量数据,更新每个存在且未被删除的item的item_status7 {8 k* J6 P* W. z- \
列。
1 d8 M% V2 R Z- e3 E图 10-8 复杂写事务举例% ?$ F% l" o/ v; A9 C9 w6 c! A7 R
图10-8的DELETE语句更加复杂,执行时需要首先获取user_name为“张三”的用户4 ~/ H' Q0 P/ ?
的user_id,考虑到事务隔离级别,这里可能需要锁住user_name为“张三”的数据行 I& c: J8 h5 s4 Q0 b- z
(防止user_name被修改为其他值)甚至锁住整张user表(防止其他行的user_name修
, B* ?# q! r T5 c3 P& C改为“张三”或者插入user_name为“张三”的新行)。接着,获取用户名为“张三”的所
1 r/ V- N9 ^0 n6 q& o- w有用户的所有item,最后,删除这些item。这条语句执行的难点在于如何降低锁粒度2 u2 o$ L1 E$ X: H' u
以及锁占用时间,具体的做法请读者自行思考。7 l+ H2 ~1 |2 U
10.3.2 多版本并发控制- @" l \/ N9 \8 i9 ?
OceanBase的MemTable包含两个部分:索引结构及行操作链。其中,索引结构存1 ?( f( P4 S- |5 i# k l& o) e6 f# l
储行头信息,采用9.1.2节中的内存B树实现;行操作链表中存储了不同版本的修改操
: k+ g& d/ z& C% k. U; V$ V作,从而支持多版本并发控制。3 @) G% s, b; A' R6 L
OceanBase支持多线程并发修改,写操作拆分为两个阶段:
3 ]6 t- L& H- Y) z% _●预提交(多线程执行):事务执行线程首先锁住待更新数据行,接着,将事务
: ?( q' f& r" _中针对数据行的操作追加到该行的未提交行操作链表中,最后,往提交任务队列中7 [3 K2 m3 J4 e2 I! Y; C5 {' }* D3 r7 X
加入一个提交任务。5 w. k7 [4 g. f* q( }
●提交(单线程执行):提交线程不断地扫描并取出提交任务队列中的提交任' O7 e5 Z1 Z$ } w- Y7 G, z
务,将这些任务的操作日志追加到日志缓冲区中。如果日志缓冲区到达一定大小,
; k4 C3 g j0 d7 y [将日志缓冲区中的数据同步到备机,同时写入主机的磁盘日志文件。操作日志写成1 J: \; c! b8 V- Y" ?/ P9 |
功后,将未提交行操作链表中的cell操作追加到已提交行操作链表的末尾,释放锁并
8 M, D9 ]4 j e' J$ p6 N5 H回复客户端写操作成功。' f' @$ f: C% u. `% `7 C
如图10-9所示,MemTable行操作链表包含两个部分:已提交部分和未提交部, v( d8 A' O! H& v! I
分。另外,每个事务管理结构记录了当前事务正在操作的数据行的行头,每个数据
4 W9 N( K4 J' W. _' o3 D4 V行的行头包含已提交和未提交行操作链表的头部指针。在预提交阶段,每个事务会% c& |( f- _8 [, s( n
将cell操作追加到未提交行操作链表中,并在行头保存未提交行操作链表的头部指针
* R6 h; U; m" M以及锁信息,同时,将行头信息记录到事务管理结构中;在提交阶段,根据事务管
. F) c9 F/ q% ?2 J) K理结构中记录的行头信息找到未提交行操作链表,链接到已提交行操作链表的末6 d3 h% `/ @7 u6 e) u5 Q4 U" A
尾,并释放行头记录的锁。' f, O4 z6 `; w( U# [* f
Class ObTransExecutor
4 u2 s \; Y1 g* w{& ?. I% y6 w8 b2 M6 f1 X+ ?1 w0 V
public:. u: {$ G0 u$ D$ ?( U! i% k
//处理预提交任务! g: e/ p- c4 ?, n) ^- k3 Y
void handle_trans(void*ptask,void*pdata);( H% M8 g: @3 [" d5 m
//处理提交任务
1 L L# J3 K7 ^; R" n9 g( l$ Wvoid handle_commit(void*ptask,void*pdata);4 i2 K0 J; g$ R( O! P( p* C- {
};
$ e0 v4 S/ V' v) `图 10-9 MemTable实现MVCC
& W& f$ B5 ~* O* p7 D' AObTransExecutor是UpdateServer读写事务处理的入口类,它主要包含两个方法:
6 z! x) _$ m$ Phandle_trans以及handle_commit。其中,handle_trans处理预提交任务,handle_commit
: E/ e9 Y3 u$ S9 e$ R7 m9 M: X2 F处理提交任务。handle_trans首先将写事务预提交到MemTable中,接着将写事务加入
/ d! u& K% g* B2 G到提交任务队列。提交线程不断地从提交任务队列中取出提交任务,并调用# B# ]; }; W; J( p; u
handle_commit进行处理。
9 M5 @! t. a: b. J2 F* n每个写事务会根据提交时的系统时间生成一个事务版本,读事务只会读取在它, b. Q+ m, @, b; X6 f: w+ z
之前提交的写事务的修改操作。- K- @; P4 p1 J U: ?
如图10-10所示,对主键为1的商品有2个写事务,事务T1(提交版本号为2)将( w% S9 b% P, f6 D+ u; s1 q6 ^# I
商品购买人数修改为100,事务T2(提交版本号为4)将商品购买人数修改为50。那8 `+ r+ ]0 b. C& @) \9 A
么,事务T2预提交时,T1已经提交,该商品的已提交行操作链包含一个cell:<" O% m0 F. t' j* w
update,购买人数,100>,未提交操作链包含一个cell:<update,购买人数,507 ~; E$ V0 f: P, Z5 n9 ?( s/ ^
>。事务T2成功提交后,该商品的已提交行操作链将包含两个cell:<update,购买
4 e M/ |7 G1 p* S! E* {* \) t人数,100>以及<update,购买人数,50>,未提交行操作链为空。对于只读事
2 s6 {1 O" n$ S+ K$ a5 J务:7 r4 K! X ]/ ^
图 10-10 读写事务并发执行实例, Z: \# W) s1 U
●T3:事务版本号为1,T1和T2均未提交,该行数据为空。4 ?, g* V7 I$ D
●T4:事务版本号为3,T1已提交,T2未提交,读取到<update,购买人数, 1000 K A( U0 f) y6 g) e4 Z+ J, |" k- t
>。尽管T2在T4执行过程中将购买人数修改为50,T4第二次读取时会过滤掉T2的修: v9 T9 m/ {9 N$ |. e+ c; _# _
改操作,因而两次读取将得到相同的结果。
' K3 w5 w% b) `% W& | }●T5:事务版本号为5,T1和T2均已提交,读取到<update,购买人数,100>以+ }/ |) I/ K5 T) m1 ~
及<update,购买人数,50>,购买人数最终值为50。
/ A9 s' v% Z9 q1.锁机制0 b& ?/ E+ h w8 X
OceanBase锁定粒度为行锁,默认情况下的隔离级别为读取已提交(read
Y' _# ^$ u0 z2 W) Rcommitted)。另外,读操作总是读取某个版本的快照数据,不需要加锁。+ w0 G$ @2 b. k- N
●只写事务(修改单行):事务预提交时对待修改的数据行加写锁,事务提交时
/ t K& R6 z, \. ]释放写锁。! I* q g$ {, J. n3 t
●只写事务(修改多行):事务预提交时对待修改的多个数据行加写锁,事务提. |! p# w/ V3 @0 j+ R" O
交时释放写锁。为了保证一致性,采用两阶段锁的方式实现,即需要在事务预提交
. \7 J! `% a' _" V0 O+ `6 M阶段获取所有数据行的写锁,如果获取某行写锁失败,整个事务执行失败。
* y1 l; ^* `: Q+ l! s6 ]2 {●读写事务(read committed):读写事务中的读操作读取某个版本的快照,写操
g0 g6 L K* y# s% J作的加锁方式与只写事务相同。/ F; B2 _- Q7 a% D" \( a6 z. ]: W
为了保证系统并发性能,OceanBase暂时不支持更高的隔离级别。另外,为了支: g' `( j/ b5 V& e
持对一致性要求很高的业务,OceanBase允许用户显式锁住某个数据行。例如,有一0 t8 `9 b$ G+ X# f$ Q- U& t# j
张账务表account(account_id,balance),其中account_id为主键。假设需要从A账户1 n; ?: G$ F% ^& t4 T l! A( C& W+ `
(account_id=1)向B账户(account_id=2)转账100元,那么,A账户需要减少100
7 w' w+ \5 t2 H元,B账户需要增加100元,整个转账操作是一个事务,执行过程中需要防止A账户+ C/ W, \% v+ A
和B账户被其他事务并发修改。& c6 _$ T! a& K" s7 X
如图10-11所示,OceanBase提供了"select...for update"语句用于显示锁住A账户或
. D6 [ l( x6 I3 ^. k) L者B账户,防止转账过程中被其他事务并发修改。
' g0 V2 j' `0 L' X' F" h6 v! @9 B. Z图 10-11 select……for update示例6 F' ^: Y' {! {: [2 o9 z# B
事务执行过程中可能会发生死锁,例如事务T1持有账户A的写锁并尝试获取账户# F: C; L' M7 j* F; Y* R
B的写锁,事务T2持有账户B的写锁并尝试获取账户A的写锁,这两个事务因为循环) M) _* \( ~% e' c9 P3 \; o, E
等待而出现死锁。OceanBase目前处理死锁的方式很简单,事务执行过程中如果超过
/ w* n M$ h2 R: d) N) L一定时间无法获取写锁,则自动回滚。
2 ^; f6 B0 k/ ?! K! c2.多线程并发日志回放$ r# E1 n0 @0 h1 D! C6 y' Y. _' {
9.2.3节介绍了主备同步原理,引入多版本并发控制机制后,UpdateServer备机支
, V9 ?1 D$ q, z% a7 T* Y2 p' j持多线程并发回放日志功能。如图10-12所示,有一个日志分发线程每次从日志源读6 s; C3 a- Z/ x- G$ s7 s. l
取一批日志,拆分为单独的日志回放任务交给不同的日志回放线程处理。一批日志) W: \* ~) ^; ?
回放完成时,日志提交线程会将对应的事务提交到内存表并将日志内容持久化到日, I: K0 d: M4 H) y
志文件。2 r- \2 h3 g( y: H
图 10-12 备机多线程并发日志回放3 `2 T+ Z2 m4 N
Class ObLogReplayWorker5 Z% u% o6 I$ }0 Q
{+ y2 N: ~! l/ ?5 K5 s' Z7 z
public:
. D/ a0 X% x& [5 t1 q2 P+ x( y//提交一批待回放的操作日志
. u6 K8 i- b3 F1 v2 d//@param[out]task_id最后一条操作日志的编号: k; k3 L7 R: w% B7 j! g- P) b
//@param[in]buf日志缓冲区
$ U1 Q6 E% X7 ~' Z; m9 {* o) C1 n9 a! O//@param[in]len日志缓冲区的大小
6 N) @4 }' ^1 Q- [: J//@param[in]replay_type日志回放类型,包括RT_LOCAL(回放本地日志)和RT_APPLY(回放通过7 K9 J* G3 e4 ?' C) s, y
网络接收到的日志)
3 z3 Z1 h# {" l6 F# Q Mint submit_batch(int64_t&task_id,const char*buf,int64_t len,const ReplayType! y3 P9 i, H% T5 |
replay_type);
- A4 A2 y7 h, N) c hpublic:
& T6 m; H7 e4 P# o+ _) h) W//回放一条操作日志
. K- i0 h/ |* _! W7 }int handle_apply(ObLogTask*task);
6 g3 d, [# @( f$ V- l};
$ A( n5 _# \; X6 _! r' K" m在9.3.3节中提到,备UpdateServer有专门的日志回放线程不断地调用ObUpsLog-
+ {7 `4 b7 S2 ?2 q# ^Mgr中的replay_log函数获取并回放操作日志。UpdateServer支持多线程并发写事务
, K) @' D" ]3 d% d( q$ I" S后,replay_log函数实现成调用ObLogReplayWorker中的submit_batch,将一批待回放
* b ?" p9 {% ?* ~$ W) J- ]的操作日志加入到回放任务队列中。多个日志回放线程会取出回放任务并不断地调. X% @6 h; u) i7 J8 B u
用handle_apply回放操作日志,即首先将操作日志预提交到MemTable中,接着加入到& Q. F) G2 y3 y* C* j5 b. S% D1 l, R
提交任务队列。另外,还有一个单独的提交线程会从提交任务队列中一次取出一批
* ]9 S9 I) V5 @% j任务,提交到MemTable并持久化到日志文件中。
8 f. F; l; t6 G& T& P$ l9 I6 c% @7 I0 S1 S) {* B4 ^
1 Z7 t' F% P) @- K @3 K9 R2 |$ C |
|