《大规模分布式存储系统》第10章 数据库功能【10.2】
10.2 只读事务只读事务(SELECT语句),经过词法分析、语法分析,预处理后,转化为逻辑
查询计划和物理查询计划。以SQL语句select c1,c2 from t1 where id=1 group by c1
order by c2为例,MergeServer收到该语句后将调用ObSql类的静态方法
direct_execute,执行步骤如下:
1)调用flex、bison解析SQL语句生成一个语法树。
2)解析语法树,生成逻辑执行计划ObSelectStmt。ObSelectStmt结构中记录了
SQL语句扫描的表格名(t1),投影列(c1,c2),过滤条件(id=1),分组列
(c1)以及排序列(c2)。
3)根据逻辑执行计划生成物理执行计划。ObSelectStmt只是表达了一种意图,
但并不知道实际如何执行,ObTransformer类的generate_physical_plan将ObSelectStmt转
化为物理执行计划。
逻辑查询计划的改进以及物理查询计划的选择,即查询优化器,是关系数据库
最难的部分,OceanBase目前在这一部分的工作不多。因此,本节不会涉及太多关于
如何生成物理查询计划的内容,下面仅以两个例子说明OceanBase的物理查询计划。
例10-1 假设有一个单表SQL语句如图10-2所示。
图 10-2 单表物理查询计划示例
单表SQL语句执行过程如下:
1)调用TableScan操作符,读取子表t1中的数据,该操作符还将执行投影
(Project)和过滤(Filter),返回的结果只包含c3=10的数据行,且每行只包含c1、
c2、c3三列。
2)调用HashGroupBy操作符(假设采用基于哈希的分组算法),按照c1对数据
分组,同时计算每个分组内c2列的总和。
3)调用Filter操作符,过滤分组后生成的结果,只返回上一层sum(c2)>=10的
行。
4)调用Sort操作符将结果按照c1排序。
5)调用Project操作符,只返回c1和sum(c2)这两列数据。
6)调用Limit操作符执行分页操作,只返回前20条数据。
例10-2 假设有一个需要联表的SQL语句如图10-3所示。
图 10-3 多表物理查询计划示例
多表SQL语句执行过程如下:
1)调用TableScan分别读取t1和t2的数据。对于t1,使用条件c3=10对结果进行过
滤,t1和t2都只需要返回c1,c2,c3这三列数据。
2)假设采用基于排序的表连接算法,t1和t2分别按照t1.c2和t2.c2排序后,调用
Merge Join运算符,以t1.c2=t2.c2为条件执行等值连接。
3)调用HashGroupBy运算符(假设采用基于哈希的分组算法),按照t1.c1对数
据分组,同时计算每个分组内t2.c3列的总和。
4)调用Filter运算符,过滤分组后的生成的结果,只返回上一层sum(t2.c3)>
=10的行。
5)调用Sort操作符将结果按照t1.c1排序。
6)调用Project操作符,只返回t1.c1和sum(t2.c3)这两列数据。
7)调用Limit操作符执行分页操作,只返回前20条数据。
10.2.1 物理操作符接口
9.4.2节介绍一期分布式存储引擎中的迭代器接口为ObIterator,通过它,可以将
读到的数据以cell为单位逐个迭代出来。然而,数据库操作总是以行为单位的,因
此,二期实现数据库功能层时考虑将基于cell的迭代器修改为基于行的迭代器。
行迭代器接口如下:
//ObRow表示一行数据内容
class ObRow
{
public:
//根据表ID以及列ID获得指定cell
//@paramtable_id表格ID
//@paramcolumn_id列ID
//@paramcell读到的cell
int get_cell(const uint64_t table_id,const uint64_t column_id,ObObj*&cell);
//获取第cell_idx个cell
int raw_get_cell(const int64_t cell_idx,const ObObj*&cell,uint64_t&table_id,
uint64_t&column_id);
//获取本行的列数
int64_t get_column_num()const;
};
每一行数据(ObRow)包括多个列,每个列的内容包括所在的表
ID(table_id),列ID(column_id)以及列内容(cell)。ObRow提供两种访问方
式:根据table_id和column_id随机访问某个列,以及根据列下标(cell_idx)获取某个
指定列。
物理运算符接口如下:
//物理运算符接口
class ObPhyOperator
{
public:
//添加子运算符,所有非叶子节点物理运算符都需要调用该接口
virtual int set_child(int32_t child_idx,ObPhyOperator&child_operator);
//打开物理运算符。申请资源,打开子运算符等
virtual int open()=0;
//关闭物理运算符。释放资源,关闭子运算符等
virtual int close()=0;
//获得下一行数据内容
//@paramrow下一行数据内容的引用
//@return返回码,包括成功、迭代过程中出现错误以及迭代完成
virtual int get_next_row(const ObRow*&row)=0;
};
ObPhyOperator每次获取一行数据,使用方法如下:
ObPhyOperator root_operator=root_operator_;//根运算符
root_operator->open();
ObRow*row=NULL;
while(OB_SUCCESS==root_operator->get_next_row(row))
{
Output(row);//输出本行
}
root_operator->close();
为什么ObPhyOperator类中有一个set_child接口呢?这是因为所有的物理运算符构
成一个树,每个物理运算的输出结果都可以认为是一个临时的二维表,树中孩子节
点的输出总是作为它的父亲节点的输入。例10-1中,叶子节点为一个TableScan类型
的物理运算符(称为table_scan_op),它的父亲节点为一个HashGroupBy类型的物理
运算符(称为hash_group_by_op),接下来依次为Filter类型物理运算符filter_op,Sort
类型物理运算符sort_op,Project类型物理运算符project_op,Limit类型物理运算符
limit_op。其中,limit_op为根运算符。那么,生成物理运算符时将执行如下语句:
limit_op->set_child(0,project_op);
project_op->set_child(0,sort_op);
sort_op->set_child(0,filter_op);
filter_op->set_child(0,hash_group_by_op);
hash_group_by_op->set_child(0,table_scan_op);
root_op=limit_op;
SQL最终执行时,只需要迭代root_op(即limit_op)就能够把需要的数据依次迭
代出来。limit_op发现前一批数据迭代完成则驱动下层的project_op获取下一批数据,
project_op发现前一批数据迭代完成则驱动下层的sort_op获取下一批数据。以此类
推,直到最底层的table_scan_op不断地从原始表t1中读取数据。
10.2.2 单表操作
单表相关的物理运算符包括:
●TableScan:扫描某个表格,MergeServer将扫描请求发给请求的各个子表所在
的ChunkServer,并将ChunkServer返回的结果按照子表范围拼接起来作为输出。如果
请求涉及多个子表,TabletScan可由多台ChunkServer并发执行。
●Filter:针对每行数据,判断是否满足过滤条件。
●Projection:对输入的每一行,根据定义的输出表达式,计算输出结果行。
●GroupBy:把输入数据按照指定列进行聚集,对聚集后的每组数据可以执行计
数(count)、求和(sum)、计算最小值(min)、计算最大值(max)、计算平均值
(avg)等聚集操作。
●Sort:对输入数据进行整体排序,如果内存不够,需要使用外排序。
●Limit(offset,count):返回行号在[offset,offset+count)范围内的行。
●Distinct:消除某些列相同的重复行。
GroupBy、Distinct物理操作符可以通过基于排序的算法实现,也可以通过基于哈
希的算法实现,分别对应HashGroupBy和MergeGroupBy,以及HashDistinct和
MergeDistinct。下面分别讨论排序算法和哈希算法。
1.排序算法
MergeGroupBy、MergeDistinct以及Sort都需要使用排序算法。通用的<key,value
>排序器可以分为两个阶段:
●数据收集:在数据收集阶段,调用者将<key,value>对依次加入到排序器。如
果数据总量超过排序器的内存上限,需要首先将内存中的数据排好序,并存储到外
部磁盘中。
●迭代输出:迭代第一行数据时,内存中可能有一部分未排序的数据,磁盘中也
可能有几路已经排好序的数据。因此,首先将内存中的数据排好序。如果数据总量
不超过排序器内存上限,那么将内存中已经排好序的数据按行迭代输出(内排
序);否则,对内存和磁盘中的部分有序数据执行多路归并,一边归并一边将结果
迭代输出。
2.哈希算法
HashGroupBy以及HashDistinct都需要使用哈希算法。假设需要对<key,value>对
按照key分组,那么首先使用key计算哈希值K,并将这个<key,value>对写入到第K个
桶中。不同的key可能对应相同的哈希桶,因此,还需要对每个哈希桶内的<
key,value>对排序,这样才能使得key相同的元组能够连续迭代出来。哈希算法的难
点在于数据总量超过内存上限的处理,由于篇幅有限,请自行思考。
10.2.3 多表操作
多表相关的物理操作符主要是Join。最为常见的Join类型包括两种:内连接
(Inner Join)和左外连接(Left Outer Join),而且基本都是等值连接。如果需要连接
多张表,可以先连接前两张表,再将前两张表连接生成的结果(相当于一张临时
表)与第三张表格连接,以此类推。
两张表实现等值连接方式主要分为两类:基于排序的算法(MergeJoin)以及基
于哈希的算法(HashJoin)。对于MergeJoin,首先使用Sort运算符分别对输入表格预
处理,使得两张输入表都在连接列上排好序,接着按顺序迭代两张输入表,合并连
接列相同的行并输出;对于HashJoin,首先根据连接列计算哈希值K,并分别将两张
输入表格的数据写入到第K个桶中。接着,对每个哈希桶按照连接列排序。最后,依
次对每个哈希桶合并连接列相同的行并输出。
子查询分为两种:关联子查询和非关联子查询,其中比较常用的是使用IN子句
的非关联子查询。举例如下:
例10-3 假设有两张表格:item(商品表,包括商品号item_id,商品名
item_name,分类号category_id,),category(类别表,包括分类号category_id,分
类名category_name)。如果需要查询分类号出现在category表中商品,可以采用图10-
4左边的IN子查询,而这个子查询将被自动转化为图10-4右边的等值连接。如果
category表中的category_id列有重复,表连接之前还需要使用distinct运算符来删除重
复的记录。
图 10-4 IN子查询转化为等值连接
例10-4 例10-3中,如果category表只包含category_id为1~10的记录,那么,可
以将IN子查询写成图10-5中的常量表达式。
图 10-5 IN子查询转化为常量表达式
转化为常量表达式后,MergeServer执行SQL计算时,可以将IN后面的常量列表
发送给ChunkServer,ChunkServer只返回category_id在常量列表中的商品记录,而不是
将所有的记录返回给MergeServer过滤,从而减少二者之间传输的数据量。
OceanBase多表操作做得还很粗糙,例如不支持嵌套连接(Nested Loop Join),
不支持非等值连接,不支持查询优化等,后续将在合适的时间对这一部分代码进行
重构。
10.2.4 SQL执行本地化
MergeServer包含SQL执行模块MS-SQL,ChunkServer也包含SQL执行模块CS-
SQL,那么,如何区分二者的功能呢?多表操作由MergeServer执行,对于单表操
作,OceanBase设计的基本原则是尽量支持SQL计算本地化,保持数据节点与计算节
点一致,也就是说,只要ChunkServer能够实现的操作,原则上都应该由它来完成。
●TableScan:每个ChunkServer扫描各自子表范围内的数据,由MergeServer合并
ChunkServer返回的部分结果。
●Filter:对基本表的过滤集成在TableScan操作符中,由ChunkServer完成。对分
组后的结果执行过滤(Having)集成在GroupBy操作符中,一般情况下由MergeServer
完成;但是,如果能够确定每个分组的所有数据行只属于同一个子表,比如SQL请求
只涉及一个tablet,那么,分组以及分组后的过滤操作符可以由ChunkServer完成。
●Projection:对基本表的投影集成在TableScan操作符中,由ChunkServer完成,
对最终结果的投影由MergeServer完成。
●GroupBy:如果SQL读取的数据只在一个子表上,那么由该子表所在的
ChunkServer完成分组操作;否则,每台ChunkServer各自完成部分数据的分组操作,
执行聚合运算后得到部分结果,再由MergeServer合并所有ChunkServer返回的部分结
果,对于属于同一个分组的数据再次执行聚合运算。某些聚合运算需要做特殊处
理,比如avg,需要转化为sum和count操作发送给ChunkServer,MergeServer合并
ChunkServer返回的部分结果后计算出最终的sum和count值,并通过sum/count得到avg
的最终结果。
●Sort:如果SQL读取的数据只在一个子表上,那么由该子表所在的ChunkServer
完成排序操作;否则,每台ChunkServer各自完成部分数据的排序,并将排好序的部
分数据返回MergeServer,再由MergeServer执行多路归并。
●Limit:Limit操作一般由MergeServer完成,但是,如果请求的数据只在一个子
表上,可以由ChunkServer完成,这往往会大大减少MergeServer与ChunkServer之间传
输的数据量。
●Distinct:Distinct与GroupBy类似。ChunkServer先完成部分数据的去重,再由
MergeServer进行整体去重。
例10-5 图10-2中的SQL语句为"select c1,sum(c2)from t1 where c3=10 group
by c1 having sum(c2)>=10 order by c1 limit 0,20"。执行步骤如下:
1)ChunkServer调用TableScan操作符,读取子表t1中的数据,该操作符还将执行
投影(Project)和过滤(Filter),返回的结果只包含c3=10的数据行,且每行只包含
c1、c2、c3三列。
2)ChunkServer调用HashGroupBy操作符(假设采用基于哈希的分组算法),按
照c1对数据分组,同时计算每个分组内c2列的总和sum(c2)。
3)每个ChunkServer将分组后的部分结果返回MergeServer,MergeServer将来自不
同ChunkServer的c1列相同的行合并在一起,再次执行sum运算。
4)MergeServer调用Filter操作符,过滤第3)步生成的最终结果,只返回
sum(c2)>=10的行。
5)MergeServer调用Sort操作符将结果按照c1排序。
6)MergeServer调用Project操作符,只返回c1和sum(c2)这两列数据。
7)MergeServer调用Limit操作符执行分页操作,只返回前20条数据。
当然,如果能够确定请求的数据全部属于同一个子表,那么,所有的物理运算
符都可以由ChunkServer执行,MergeServer只需要将ChunkServer计算得到的结果转发
给客户端。
页:
[1]