|
10.2 只读事务 ]% ~+ U# ~; V
只读事务(SELECT语句),经过词法分析、语法分析,预处理后,转化为逻辑( H$ M( b0 Q- i/ y1 H! S* N
查询计划和物理查询计划。以SQL语句select c1,c2 from t1 where id=1 group by c10 G3 Y8 K$ I7 c
order by c2为例,MergeServer收到该语句后将调用ObSql类的静态方法8 C9 v+ f# o5 O5 f
direct_execute,执行步骤如下:
7 l( B3 g% e1 k1 l5 {' W% _1)调用flex、bison解析SQL语句生成一个语法树。
. R, S5 t" e/ r2)解析语法树,生成逻辑执行计划ObSelectStmt。ObSelectStmt结构中记录了
* b) X/ A; ]; M# T- }8 g/ USQL语句扫描的表格名(t1),投影列(c1,c2),过滤条件(id=1),分组列; N6 n( w4 f4 j& Z1 @
(c1)以及排序列(c2)。# l( ^6 N& N6 m- y! p
3)根据逻辑执行计划生成物理执行计划。ObSelectStmt只是表达了一种意图,
% Z( a% x9 U* S2 X但并不知道实际如何执行,ObTransformer类的generate_physical_plan将ObSelectStmt转
! o6 @4 P1 Z- h; x2 M# t+ S化为物理执行计划。
2 p9 I' m, u+ Z逻辑查询计划的改进以及物理查询计划的选择,即查询优化器,是关系数据库
' w( q1 t' u4 m9 e" Z c最难的部分,OceanBase目前在这一部分的工作不多。因此,本节不会涉及太多关于
' d9 H, q3 R: C2 W7 E8 F9 @如何生成物理查询计划的内容,下面仅以两个例子说明OceanBase的物理查询计划。 h, ~* D, K$ b7 L
例10-1 假设有一个单表SQL语句如图10-2所示。
- n2 @; ]; {% `$ ]. S5 F% G图 10-2 单表物理查询计划示例
2 P( Z" [1 c5 a3 b. a" V3 S单表SQL语句执行过程如下:
* `7 i9 D% z Q% m1)调用TableScan操作符,读取子表t1中的数据,该操作符还将执行投影4 A+ }& c; \3 d. \! P! E
(Project)和过滤(Filter),返回的结果只包含c3=10的数据行,且每行只包含c1、0 `- X5 z: W7 r+ Z9 K( i
c2、c3三列。) P; ~7 q* B- I/ U1 |% ^5 d# k
2)调用HashGroupBy操作符(假设采用基于哈希的分组算法),按照c1对数据
' R- u9 g' _. ~分组,同时计算每个分组内c2列的总和。$ ~7 [1 |3 a# x4 y
3)调用Filter操作符,过滤分组后生成的结果,只返回上一层sum(c2)>=10的# z$ L$ \0 ~7 Z# t8 P
行。, T- U. q- U$ Q. ?- A
4)调用Sort操作符将结果按照c1排序。+ {: _* ?. t- F d$ u) h
5)调用Project操作符,只返回c1和sum(c2)这两列数据。
* L- w1 Z) i# P# J: P& m9 Q6)调用Limit操作符执行分页操作,只返回前20条数据。
5 q9 x2 v+ N& Q8 w例10-2 假设有一个需要联表的SQL语句如图10-3所示。
& ?3 \9 L5 z4 N, _7 C2 ^图 10-3 多表物理查询计划示例
! Z* \2 [3 ~/ g- K. W多表SQL语句执行过程如下:
$ |3 }, r' n d' F* k* V* E2 ~8 {1)调用TableScan分别读取t1和t2的数据。对于t1,使用条件c3=10对结果进行过
% o& }/ m6 i( m k0 R* p滤,t1和t2都只需要返回c1,c2,c3这三列数据。* v o0 U$ ?0 ~- Z' u
2)假设采用基于排序的表连接算法,t1和t2分别按照t1.c2和t2.c2排序后,调用
0 U7 h) E B5 M8 p$ ]Merge Join运算符,以t1.c2=t2.c2为条件执行等值连接。& Y/ x9 P' t6 @. K; u
3)调用HashGroupBy运算符(假设采用基于哈希的分组算法),按照t1.c1对数
- x; G v$ n, T* H据分组,同时计算每个分组内t2.c3列的总和。
0 Z4 u7 N# q* N8 x4)调用Filter运算符,过滤分组后的生成的结果,只返回上一层sum(t2.c3)>5 ^% y5 E8 R |" s1 T7 g% ]% {
=10的行。
4 `- O% W( W( H0 J, t5)调用Sort操作符将结果按照t1.c1排序。9 ?/ L% K1 a: H- X( e; j
6)调用Project操作符,只返回t1.c1和sum(t2.c3)这两列数据。' X. I6 a3 ^( [# f0 H6 j
7)调用Limit操作符执行分页操作,只返回前20条数据。; Y7 o% }; J7 P2 I3 l& x
10.2.1 物理操作符接口
& p) q! ], v7 n2 i4 {( h+ s9.4.2节介绍一期分布式存储引擎中的迭代器接口为ObIterator,通过它,可以将! B# i; q. p4 J
读到的数据以cell为单位逐个迭代出来。然而,数据库操作总是以行为单位的,因+ a: X" v, I" W2 N
此,二期实现数据库功能层时考虑将基于cell的迭代器修改为基于行的迭代器。2 p, ^1 I; g- T8 ]- E
行迭代器接口如下:
; N7 }9 x& X6 G% ^9 s2 P! d" f6 f( R//ObRow表示一行数据内容1 ]2 ?5 O" s" u
class ObRow# A: l1 B4 }! \- K L* p/ a
{' [0 e- ? Q5 i* {- Q8 f
public:
5 a8 v" ?* c3 l. n+ V9 o' d//根据表ID以及列ID获得指定cell
( R* M: s3 L D( |//@param[in]table_id表格ID7 R e( u" g- u5 L2 N! i
//@param[in]column_id列ID
2 W/ V" ~4 l$ B/ ~6 L e//@param[out]cell读到的cell
, O7 x% a4 n& f! wint get_cell(const uint64_t table_id,const uint64_t column_id,ObObj*&cell);
- p! O/ ?( H* e$ ~+ A; J- {7 n//获取第cell_idx个cell9 t7 D. H: J! m L% G
int raw_get_cell(const int64_t cell_idx,const ObObj*&cell,uint64_t&table_id,+ C1 h! V4 U, O' @
uint64_t&column_id);5 R9 Z0 O6 b4 X, Y
//获取本行的列数
- |8 V( y. w( w& B4 M, Aint64_t get_column_num()const;
/ M$ A# t& Z, X- C$ N# {, x# d};
/ V3 C$ u# u7 J, L# l每一行数据(ObRow)包括多个列,每个列的内容包括所在的表0 t4 T) c' m) R9 C
ID(table_id),列ID(column_id)以及列内容(cell)。ObRow提供两种访问方
- ?) q* [# e8 z6 A4 v式:根据table_id和column_id随机访问某个列,以及根据列下标(cell_idx)获取某个. e& s: u, R! p5 l8 {6 q: k
指定列。1 B6 d" U8 S4 [: n2 K6 ^4 Y6 ~( G6 o1 f
物理运算符接口如下:
! a: j* Q& G" L//物理运算符接口9 V/ l. v9 U# A1 A' E8 ?$ s' I1 `
class ObPhyOperator
2 }( }3 Q+ z5 M. \3 K{
! {0 M% N' h4 r& O* Apublic:
8 d" L. H. E3 n& s; e! y//添加子运算符,所有非叶子节点物理运算符都需要调用该接口
: y( s* h! H7 vvirtual int set_child(int32_t child_idx,ObPhyOperator&child_operator);# S* w, z8 v. I4 d: [
//打开物理运算符。申请资源,打开子运算符等
$ b$ x* `( B) s# u. u- e! x x) Wvirtual int open()=0;
9 W. p( \/ z) C2 _//关闭物理运算符。释放资源,关闭子运算符等+ M N7 K T* p2 V6 a, i: F
virtual int close()=0;2 k/ P/ S) x) l/ z5 s( ^6 E
//获得下一行数据内容
* g6 S9 ], g, o3 N6 W//@param[out]row下一行数据内容的引用
, j0 B5 T) k& Y# x: G1 v% y//@return返回码,包括成功、迭代过程中出现错误以及迭代完成# z" ^2 L0 K# {; p! N X
virtual int get_next_row(const ObRow*&row)=0;
1 G2 b s; K4 W};/ G6 @1 P4 e* _; }
ObPhyOperator每次获取一行数据,使用方法如下:
+ ` `3 D) e8 o* p8 L& yObPhyOperator root_operator=root_operator_;//根运算符/ T% I; W3 p3 N% V) p7 B0 [* v9 q
root_operator->open();
- y3 X8 }: ]/ M6 ?ObRow*row=NULL;
) w- h4 y; ~% w$ G, u0 A+ [5 Fwhile(OB_SUCCESS==root_operator->get_next_row(row)), ^/ n: l% w; [9 G! o X
{
8 Z, x+ S N. Q- k7 b+ _% U( aOutput(row);//输出本行
+ `# A! ~0 a8 k}: h1 d) O/ C1 J+ b2 G; ^
root_operator->close();
9 ^ A3 g" u5 L! X$ ?* {% `为什么ObPhyOperator类中有一个set_child接口呢?这是因为所有的物理运算符构
% o+ `, X; w2 X4 c5 l成一个树,每个物理运算的输出结果都可以认为是一个临时的二维表,树中孩子节+ ~- d' A4 ? H, ^! x1 X- u, y' d6 }' g
点的输出总是作为它的父亲节点的输入。例10-1中,叶子节点为一个TableScan类型
" i/ f: a9 ^/ {5 v3 c的物理运算符(称为table_scan_op),它的父亲节点为一个HashGroupBy类型的物理
' ~ |& |' m0 [) }5 P7 `运算符(称为hash_group_by_op),接下来依次为Filter类型物理运算符filter_op,Sort7 X+ e/ j7 _" P4 Z6 r/ e: f
类型物理运算符sort_op,Project类型物理运算符project_op,Limit类型物理运算符
( C8 p2 d2 e! X- l0 a) v. @# Flimit_op。其中,limit_op为根运算符。那么,生成物理运算符时将执行如下语句:! j9 q1 ?5 q+ }
limit_op->set_child(0,project_op);6 t0 S& w( j9 i l, M
project_op->set_child(0,sort_op);
9 O& J. |0 i/ t- jsort_op->set_child(0,filter_op);
! P" d0 M Y) i. y/ ?7 u# s7 Efilter_op->set_child(0,hash_group_by_op);& z I' s3 ]9 \( o& m/ F& u/ ]
hash_group_by_op->set_child(0,table_scan_op);! t- R. D* z5 t) @/ U$ k
root_op=limit_op;" ^9 M8 \( X+ S8 W
SQL最终执行时,只需要迭代root_op(即limit_op)就能够把需要的数据依次迭
7 i9 q" d0 Y7 {) R1 I3 Q代出来。limit_op发现前一批数据迭代完成则驱动下层的project_op获取下一批数据,
. s4 G7 J ^4 f; I! u# r' wproject_op发现前一批数据迭代完成则驱动下层的sort_op获取下一批数据。以此类1 D. U f0 D* L2 i( i+ i
推,直到最底层的table_scan_op不断地从原始表t1中读取数据。
% W# Q7 a( s8 B {# c10.2.2 单表操作
; ?. }# _9 T4 {* E- C单表相关的物理运算符包括:
9 y/ ?9 f9 {0 S: h0 [( J. w●TableScan:扫描某个表格,MergeServer将扫描请求发给请求的各个子表所在
4 M$ Q5 s+ `5 z3 l的ChunkServer,并将ChunkServer返回的结果按照子表范围拼接起来作为输出。如果! s& T# |7 L$ ?) ?
请求涉及多个子表,TabletScan可由多台ChunkServer并发执行。
* g$ {* e }7 ]2 z●Filter:针对每行数据,判断是否满足过滤条件。
% g+ Z5 c, J5 c+ L! Y6 v●Projection:对输入的每一行,根据定义的输出表达式,计算输出结果行。! ]& t2 M0 l# c7 f5 ~
●GroupBy:把输入数据按照指定列进行聚集,对聚集后的每组数据可以执行计
/ b0 E9 C! D! b% u7 d$ r数(count)、求和(sum)、计算最小值(min)、计算最大值(max)、计算平均值$ O6 N b2 b5 d! L3 P6 ]$ x6 ?
(avg)等聚集操作。
" b8 W/ C( t6 ^, S. C●Sort:对输入数据进行整体排序,如果内存不够,需要使用外排序。
. \/ Z+ g/ I: L0 `- p●Limit(offset,count):返回行号在[offset,offset+count)范围内的行。" x9 w3 H$ X g
●Distinct:消除某些列相同的重复行。2 C0 ~- e: y, \( f
GroupBy、Distinct物理操作符可以通过基于排序的算法实现,也可以通过基于哈9 E# L3 S# z, V8 O6 `' J I9 T# q
希的算法实现,分别对应HashGroupBy和MergeGroupBy,以及HashDistinct和
& V- v3 K+ k& k @! X0 SMergeDistinct。下面分别讨论排序算法和哈希算法。
; {6 L+ S3 V3 J, R1.排序算法& @$ X: h* L1 S8 t; B8 p
MergeGroupBy、MergeDistinct以及Sort都需要使用排序算法。通用的<key,value% U5 y% ]! o$ [- X
>排序器可以分为两个阶段:2 w, L1 I0 {. F% S
●数据收集:在数据收集阶段,调用者将<key,value>对依次加入到排序器。如
1 m/ B' @+ y F9 C8 o& p0 {果数据总量超过排序器的内存上限,需要首先将内存中的数据排好序,并存储到外6 A& o9 u, m! p
部磁盘中。
: X! X/ f1 j3 l●迭代输出:迭代第一行数据时,内存中可能有一部分未排序的数据,磁盘中也
$ r9 \0 X5 i5 y* p( n可能有几路已经排好序的数据。因此,首先将内存中的数据排好序。如果数据总量
0 B7 d7 K( _/ y* f不超过排序器内存上限,那么将内存中已经排好序的数据按行迭代输出(内排! { [+ n+ p7 V& q1 _3 u1 ?
序);否则,对内存和磁盘中的部分有序数据执行多路归并,一边归并一边将结果
/ ~+ {. t. Z+ ^: C# R迭代输出。3 ~- f& K5 R5 I% q8 p8 n3 ?
2.哈希算法
: w6 D2 }5 E5 e/ m7 \; FHashGroupBy以及HashDistinct都需要使用哈希算法。假设需要对<key,value>对& }+ Q C9 [% x& m9 N, E
按照key分组,那么首先使用key计算哈希值K,并将这个<key,value>对写入到第K个
: w3 a2 S' f# k1 @桶中。不同的key可能对应相同的哈希桶,因此,还需要对每个哈希桶内的<
. S+ a% H) L4 H( o3 V' Lkey,value>对排序,这样才能使得key相同的元组能够连续迭代出来。哈希算法的难
: C8 M: p0 X8 w, d& p& h& L点在于数据总量超过内存上限的处理,由于篇幅有限,请自行思考。
+ f) @0 A, p; D7 P) K/ B s2 S10.2.3 多表操作
' v% L; f0 j2 y" s: e多表相关的物理操作符主要是Join。最为常见的Join类型包括两种:内连接
) l$ A O2 I6 C1 k(Inner Join)和左外连接(Left Outer Join),而且基本都是等值连接。如果需要连接
2 f% x- r L7 H4 h( I多张表,可以先连接前两张表,再将前两张表连接生成的结果(相当于一张临时0 `7 d0 n1 v2 R. j2 d
表)与第三张表格连接,以此类推。% D, S% Z5 B* p+ X( o0 d
两张表实现等值连接方式主要分为两类:基于排序的算法(MergeJoin)以及基. O$ v3 m0 t6 C! N; v
于哈希的算法(HashJoin)。对于MergeJoin,首先使用Sort运算符分别对输入表格预6 h% f d1 x" I1 _5 W+ X
处理,使得两张输入表都在连接列上排好序,接着按顺序迭代两张输入表,合并连
7 Q D" A+ Y$ r) T接列相同的行并输出;对于HashJoin,首先根据连接列计算哈希值K,并分别将两张
8 U/ P" b7 [- @输入表格的数据写入到第K个桶中。接着,对每个哈希桶按照连接列排序。最后,依. _7 e* O3 H: u9 k. `" J
次对每个哈希桶合并连接列相同的行并输出。' e5 y) R+ n0 P$ K3 G
子查询分为两种:关联子查询和非关联子查询,其中比较常用的是使用IN子句
1 [2 z- h9 g' N2 \" r/ c的非关联子查询。举例如下:+ m" a5 g. g0 j8 ?
例10-3 假设有两张表格:item(商品表,包括商品号item_id,商品名
! w/ x6 D7 ~$ g+ Eitem_name,分类号category_id,),category(类别表,包括分类号category_id,分( P0 O5 h7 U1 }# C; l! I9 o4 o* ]; n
类名category_name)。如果需要查询分类号出现在category表中商品,可以采用图10-. W- u2 u! d: r; r9 I. j3 x
4左边的IN子查询,而这个子查询将被自动转化为图10-4右边的等值连接。如果
: F) P$ l( g9 I% P4 Vcategory表中的category_id列有重复,表连接之前还需要使用distinct运算符来删除重+ L8 L# N7 {, R2 P$ r
复的记录。
! W5 |: E0 ^2 l0 a# |2 {0 L" a! E4 {图 10-4 IN子查询转化为等值连接
# O* Y% u/ w( C7 r例10-4 例10-3中,如果category表只包含category_id为1~10的记录,那么,可/ w2 b+ i. d+ o' ?4 Q
以将IN子查询写成图10-5中的常量表达式。4 t+ Z2 m6 o8 t6 u0 W4 {1 _
图 10-5 IN子查询转化为常量表达式
7 y0 R5 d2 R p+ q% X( G转化为常量表达式后,MergeServer执行SQL计算时,可以将IN后面的常量列表7 Z$ T8 P2 ~7 O( c
发送给ChunkServer,ChunkServer只返回category_id在常量列表中的商品记录,而不是
* F: {) n& B5 P0 w1 L将所有的记录返回给MergeServer过滤,从而减少二者之间传输的数据量。
% @. ]* d' L5 y1 \OceanBase多表操作做得还很粗糙,例如不支持嵌套连接(Nested Loop Join),
+ w! j5 P& z: A& h/ n; A3 `不支持非等值连接,不支持查询优化等,后续将在合适的时间对这一部分代码进行
* n8 A% [9 ] x r4 S重构。
! a) g n* j) W2 ^10.2.4 SQL执行本地化- W; E8 u- R6 [" Q, s7 ?& g
MergeServer包含SQL执行模块MS-SQL,ChunkServer也包含SQL执行模块CS-
3 c$ V/ Y# P0 a' Q; Z8 Q3 @SQL,那么,如何区分二者的功能呢?多表操作由MergeServer执行,对于单表操
7 B5 m% n3 L, H; g3 G作,OceanBase设计的基本原则是尽量支持SQL计算本地化,保持数据节点与计算节
) Q1 I$ a# T* e8 _! C2 v点一致,也就是说,只要ChunkServer能够实现的操作,原则上都应该由它来完成。
8 V8 { b o0 Q/ r; \8 x8 T●TableScan:每个ChunkServer扫描各自子表范围内的数据,由MergeServer合并: Y; G) g7 T( i5 @
ChunkServer返回的部分结果。
1 d& R0 N( ~0 \) K; s●Filter:对基本表的过滤集成在TableScan操作符中,由ChunkServer完成。对分2 Y% b( d. |' H9 D
组后的结果执行过滤(Having)集成在GroupBy操作符中,一般情况下由MergeServer
* @2 C* Y# b& E9 \. B* W& k! ^1 D完成;但是,如果能够确定每个分组的所有数据行只属于同一个子表,比如SQL请求! i1 O+ m* b* G$ _' R
只涉及一个tablet,那么,分组以及分组后的过滤操作符可以由ChunkServer完成。8 X9 T7 Z# ?2 ^( _5 h2 H ?8 O
●Projection:对基本表的投影集成在TableScan操作符中,由ChunkServer完成,
8 O. v5 ], Z: ]对最终结果的投影由MergeServer完成。
d% _9 K0 h% v* `●GroupBy:如果SQL读取的数据只在一个子表上,那么由该子表所在的
# e6 k* `/ j" s4 Y% GChunkServer完成分组操作;否则,每台ChunkServer各自完成部分数据的分组操作,( f7 ]. q" l- ^" l5 W4 F
执行聚合运算后得到部分结果,再由MergeServer合并所有ChunkServer返回的部分结0 t0 D% z9 ^+ _1 n2 v: m
果,对于属于同一个分组的数据再次执行聚合运算。某些聚合运算需要做特殊处/ y' y0 A0 p; v: o& p
理,比如avg,需要转化为sum和count操作发送给ChunkServer,MergeServer合并
3 U" ?' L/ W) rChunkServer返回的部分结果后计算出最终的sum和count值,并通过sum/count得到avg @- Z- r6 m; s$ T. S
的最终结果。
: U F# `: `/ U5 k+ f1 x$ D●Sort:如果SQL读取的数据只在一个子表上,那么由该子表所在的ChunkServer# d* d5 L+ O* u9 R, O
完成排序操作;否则,每台ChunkServer各自完成部分数据的排序,并将排好序的部 F9 |2 g3 i* r* M1 r9 _* X
分数据返回MergeServer,再由MergeServer执行多路归并。
% o$ S4 B+ L6 J●Limit:Limit操作一般由MergeServer完成,但是,如果请求的数据只在一个子! z0 u$ @& \: e0 e/ D: n0 g
表上,可以由ChunkServer完成,这往往会大大减少MergeServer与ChunkServer之间传( T" S- m" V$ x9 B' u
输的数据量。
% {' O. l0 p$ y. a( b●Distinct:Distinct与GroupBy类似。ChunkServer先完成部分数据的去重,再由
% ~0 H" D1 F# r$ [" y: i+ OMergeServer进行整体去重。2 l2 Y& G& f4 x7 W
例10-5 图10-2中的SQL语句为"select c1,sum(c2)from t1 where c3=10 group
9 r7 i' d9 a# L+ ~" m1 vby c1 having sum(c2)>=10 order by c1 limit 0,20"。执行步骤如下:
7 k7 j# x8 {( r l1 _' g1)ChunkServer调用TableScan操作符,读取子表t1中的数据,该操作符还将执行& l' H9 w- W) j
投影(Project)和过滤(Filter),返回的结果只包含c3=10的数据行,且每行只包含# y; [. l" M3 W2 a' Z$ a
c1、c2、c3三列。7 Q. G( K- {% E0 _. E+ Q
2)ChunkServer调用HashGroupBy操作符(假设采用基于哈希的分组算法),按8 ~& _4 B3 g- I1 F
照c1对数据分组,同时计算每个分组内c2列的总和sum(c2)。
& Q2 x& q2 B, T. z% Z3)每个ChunkServer将分组后的部分结果返回MergeServer,MergeServer将来自不 j# J6 W% L) {7 I; L* m& z0 p
同ChunkServer的c1列相同的行合并在一起,再次执行sum运算。 r5 J, a6 C9 I- @
4)MergeServer调用Filter操作符,过滤第3)步生成的最终结果,只返回
, G0 [7 J6 W' E; f) ]sum(c2)>=10的行。& _+ }- F6 d7 J' K) s# ~5 r' o* f9 X
5)MergeServer调用Sort操作符将结果按照c1排序。
0 N0 C2 H+ @3 q2 c p6)MergeServer调用Project操作符,只返回c1和sum(c2)这两列数据。
; o# k2 {! {7 q7)MergeServer调用Limit操作符执行分页操作,只返回前20条数据。
5 C& b( y( \3 u/ d3 v当然,如果能够确定请求的数据全部属于同一个子表,那么,所有的物理运算- x( G; Q! d' C! ?) W& y* J
符都可以由ChunkServer执行,MergeServer只需要将ChunkServer计算得到的结果转发
P2 e9 M m/ F2 N" C给客户端。" C/ n/ Q) F& c0 G5 S- F4 s
, \9 g8 ?5 G# S7 Y% p8 Y8 K
9 b. S" x3 y. q9 D
|
|