Part 1 Background
First, let's start with a brief introduction to the structure of a database kernel. The execution of a SQL statement mainly involves the following components (a conceptual code sketch follows the list):
- Parser - Parse sql statements to an Abstract Syntax Tree (AST).
- Binder - Perform semantic analysis to ensure that the statement is logically correct: check that the tables, columns and functions referenced in the SQL exist, and bind the logical names to the actual objects in the database.
- Planner - After the binding phase, the planner generates a concrete execution plan for the query. The execution plan specifies the sequence of steps required to retrieve, filter, join, and aggregate data to produce the query result.
- Executor - Execute the query according to the execution plan generated by the planner.
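To make the division of labor concrete, here is a minimal conceptual sketch of these four stages as Go interfaces. This is purely illustrative; none of these types are MatrixOne's actual definitions.

// Conceptual sketch only -- not MatrixOne's real interfaces.
// Placeholder types so the sketch is self-contained.
type (
	Stmt      interface{} // an AST node produced by the parser
	BoundStmt interface{} // an AST whose names are resolved to catalog objects
	Catalog   interface{} // metadata: databases, tables, columns, functions
	PlanNode  interface{} // a node of the execution plan tree
	ResultSet interface{} // query results
)

type Parser interface {
	Parse(sql string) ([]Stmt, error) // SQL text -> AST list
}

type Binder interface {
	Bind(stmt Stmt, cat Catalog) (BoundStmt, error) // logical names -> objects
}

type Planner interface {
	Plan(stmt BoundStmt) (PlanNode, error) // bound AST -> execution plan
}

type Executor interface {
	Run(plan PlanNode) (ResultSet, error) // execute the plan, produce results
}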
Based on previous learning experience, I believe the quickest way to become familiar with a codebase one has never encountered before is to follow the most common execution path: combine code inspection with step-by-step debugging, clarify the main logic first, and then gradually expand from that mainline to the various components and features until the entire project is covered. In this article, we will start from the most common SQL statement, select … from … join … where … group by … order by … limit …, to quickly familiarize ourselves with the code of the MatrixOne kernel front-end (mainly the binder, planner and optimizer).
Part 2 Implementation Process
When the MatrixOne server handles a request from a client (we will skip how the request is passed along in between), the request finally enters the MysqlCmdExecutor.doComQuery() method. The general logic of this method is:
- Parse the sql into a statement list, also known as an AST list;
- Execute each statement.
// pkg/frontend/mysql_cmd_executor.go
doComQuery() {
	cws = GetComputationWrapper()
	for cw in cws {
		// Execute Statement
		executeStmt(cw)
	}
}

GetComputationWrapper() {
	// Parse SQL Statement
	asts = parsers.Parse(sql)
	return [InitTxnComputationWrapper(stmt) for stmt in asts]
}
MatrixOne is compatible with the MySQL protocol. According to the protocol, MO first returns the column information, then sends the row data, and finally returns an EOF packet to end the communication. The column information is available once the execution plan has been generated, and the row data is sent after the runner produces the results.
// pkg/frontend/mysql_cmd_executor.go
executeStmt(cw) {
// Generate an execution plan
compile = cw.Compile()
// Send column information
for c in cw.columns {
SendColumnDefinitionPacket(c)
}
SendEOFPacketIf()
// Execute the query
runner = compile.(ComputationRunner)
runner.Run()
// End communication
sendEOFOrOkPacket()
}
Part 3 Generating an Execution Plan
The Compile method mentioned in the previous part does two main things: it generates the execution plan and compiles the operators used by the executor.
// pkg/frontend/computation_wrapper.go
Compile() {
// Build and optimize the execution plan
cw.plan = buildPlan(..)
// Compile the execution code
cw.compile = compile.New(...)
cw.compile.Compile()
}
Let's look at the buildPlan method first. Since much of the code along the way is simple wrapper calls, the call path is: buildPlan (mysql_cmd_executor.go) -> BaseOptimizer.Optimize -> BuildPlan (build.go) -> runBuildSelectByBinder. We'll skip the middle part and go straight to the runBuildSelectByBinder method:
// pkg/sql/plan/build.go
runBuildSelectByBinder(stmt) {
// Generate the execution plan
builder := NewQueryBuilder(SelectType...)
bindCtx := NewBindContext(builder)
rootId := builder.buildSelect(stmt, bindCtx, true)
builder.qry.Steps = append(builder.qry.Steps, rootId)
// Optimize the execution plan
query := builder.createQuery()
return &Plan{query}
}
As described above, the main responsibility of the binder is to bind the database/table/column names in the query statement to the corresponding objects in the database. The method that binds select statements in MatrixOne is buildSelect, and its code is roughly as follows (a rough illustration of the resulting plan shape follows the listing):
// pkg/sql/plan/query_builder.go
buildSelect() {
preprocess WITH
build FROM
unfold stars and generate headings
bind WHERE
bind GROUP BY
bind HAVING
bind PROJECTION
bind TIME WINDOW
bind ORDER BY
bind LIMIT/OFFSET
append AGGREGATION node
append TIME WINDOW node
append WINDOW node
append PROJECT node
append DISTINCT node
append SORT node (include LIMIT/OFFSET)
append result PROJECT node
}
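Before looking at each clause, here is my own rough sketch of the node chain such a build could produce for a typical query; the actual plan MatrixOne generates may differ in its details.

For select a, sum(b) from t1 where c > 0 group by a order by a limit 10:

TABLE_SCAN(t1)
  -> FILTER(c > 0)
  -> AGG(group by: a; agg: sum(b))
  -> PROJECT(a, sum(b))
  -> SORT(a ASC, limit 10)
  -> PROJECT(result columns)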
You can see that the buildSelect method binds each part of the select statement (WITH, FROM, WHERE (HAVING), GROUP BY, TIME WINDOW, ORDER BY and LIMIT/OFFSET) in order. The following is an introduction to the most commonly used parts:
1. FROM
The method in MatrixOne that binds the FROM clause is buildTable.
// pkg/sql/plan/query_builder.go
buildTable(stmt) {
switch tbl := stmt.(type) {
case *tree.Select:
case *tree.TableName:
// Get database, table name
schema := tbl.SchemaName
table := tbl.ObjectName
// check db
schema, err = databaseIsValid(schema, builder.compCtx)
if err != nil {
return 0, err
}
// Get tableDef
obj, tableDef := builder.compCtx.Resolve(schema, table)
if tableDef == nil {
return 0, moerr.NewParseError("table %q does not exist", table)
}
// Generate node
nodeID = builder.appendNode(&plan.Node{
NodeType: nodeType,
Stats: nil,
ObjRef: obj,
TableDef: tableDef,
BindingTags: []int32{builder.genNewTag()},
}, ctx)
case *tree.JoinTableExpr:
// If there is only a single table, bind the left table
if tbl.Right == nil {
return builder.buildTable(tbl.Left, ctx, preNodeId, leftCtx)
}
// Else, buildJoinTable
return builder.buildJoinTable(tbl, ctx)
case *tree.TableFunction:
case *tree.ParenTableExpr:
case *tree.AliasedTableExpr:
// bind table
nodeID = builder.buildTable(tbl.Expr)
// addBinding
builder.addBinding(nodeID, ctx)
}
}
According to SQL syntax, the types of expressions that can follow FROM are:
- select expression
- table name
- join table expression
- table function
- table expression with parentheses
- table expression with aliases
Table expressions include the table name (TableName), the aliased table expression (AliasedTableExpr) and the join table expression (JoinTableExpr). To keep the form uniform, MatrixOne's SQL parser wraps each TableName in an AliasedTableExpr, whose As field is set to nil if there is no alias; each AliasedTableExpr is in turn wrapped into a JoinTableExpr, whose Right field is set to nil if it is just a single table without a join.
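The shapes of these AST types, simplified for illustration (the real definitions live in MatrixOne's parser package, pkg/sql/parsers/tree, and carry more fields):

// Simplified for illustration; not the actual definitions.
type TableExpr interface{}

type TableName struct {
	SchemaName string // database name, may be empty
	ObjectName string // table name
}

type AliasClause struct {
	Alias string
	Cols  []string // optional column aliases
}

type AliasedTableExpr struct {
	Expr TableExpr    // the wrapped expression, e.g. a *TableName
	As   *AliasClause // nil when the table has no alias
}

type JoinTableExpr struct {
	Left  TableExpr
	Right TableExpr // nil for a single table without a join
}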
As an example, parsing the SQL statement SELECT * FROM t1 yields a structure for t1 similar to this:
JoinTableExpr {
Left: AliasedTableExpr {
Expr: t1
As: nil
}
Right: nil
}
The process of binding the FROM clause of this sql is roughly as follows: since the right table of the JoinTableExpr is nil, only the left table is bound; the left table is an AliasedTableExpr structure, so its Expr (here a TableName structure) is bound first, and then the binding information is added to the BindContext (ctx) via the addBinding method.
The logic of binding a TableName is roughly as follows: first verify that the database exists; if it does, fetch the table's metadata and column information (the Resolve method, which returns nil if the table does not exist), then wrap the result into a plan.Node and return it.
The addBinding method wraps the information of the bound node, such as the bindingTag, nodeID, table information (tableName, tableID), and column information (colName, colType, and whether the column is hidden), into a Binding structure and adds it to the ctx. In the subsequent bind steps, table/column names can be resolved based on this binding information (a small sketch of such a lookup follows the listing below).
// pkg/sql/plan/query_builder.go
addBinding(nodeID, ctx) {
// Get node
node := getNode(nodeID)
// Generate col information
colLength := len(node.TableDef.Cols)
cols := make([]string, colLength)
colIsHidden := make([]bool, colLength)
types := make([]*plan.Type, colLength)
tag := node.BindingTags[0]
for i, col := range node.TableDef.Cols {
if i < len(alias.Cols) {
cols[i] = string(alias.Cols[i])
} else {
cols[i] = col.Name
}
colIsHidden[i] = col.Hidden
types[i] = col.Typ
name := table + "." + cols[i]
builder.nameByColRef[[2]int32{tag, int32(i)}] = name
}
// Generate Binding structure
binding = NewBinding(tag, nodeID, table, node.TableDef.TblId, cols, colIsHidden, types)
// Add Binding to BindContext
ctx.bindings = append(ctx.bindings, binding)
ctx.bindingByTag[binding.tag] = binding
ctx.bindingByTable[binding.table] = binding
}
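To make the role of these bindings concrete, here is a hedged sketch of how a later bind step could resolve a qualified name such as t1.col1 through the context. The helper and the simplified types are mine; MatrixOne's real resolution logic also handles aliases, ambiguity and correlated contexts.

// Simplified stand-ins for the sketch; not MatrixOne's definitions.
type Binding struct {
	tag  int32
	cols []string
}

type BindContext struct {
	bindingByTable map[string]*Binding
}

// resolveColumn maps "table.col" to a (tag, column index) pair, which is how
// columns are referenced inside the plan.
func resolveColumn(ctx *BindContext, table, col string) (int32, int32, bool) {
	binding, ok := ctx.bindingByTable[table]
	if !ok {
		return 0, 0, false // unknown table: reported as a bind error upstream
	}
	for i, name := range binding.cols {
		if name == col {
			return binding.tag, int32(i), true
		}
	}
	return 0, 0, false // unknown column: reported as a bind error upstream
}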
2. WHERE / HAVING
The bind process of the WHERE clause is: first split the filter condition with AND as the separator, then bind each sub-expression recursively. For example, the filter condition A AND B AND C is split into A, B and C, which are then bound as bind(A), bind(B) and bind(C) respectively. After the filters are bound, statements containing subqueries need to be flattened, i.e. the subqueries are converted into equivalent join statements. Subqueries are extremely inefficient to execute and would slow down the entire query, so converting them to joins can greatly improve system performance.
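The splitting step is essentially a recursive walk over the AND tree of the expression. A minimal self-contained sketch of the idea (the types and names here are illustrative, not MatrixOne's):

package main

import "fmt"

// Illustrative AST types; MatrixOne's real expression nodes live in
// pkg/sql/parsers/tree.
type Expr interface{}

type AndExpr struct{ Left, Right Expr }

type Cond string // a leaf filter condition such as "A"

// splitConjunction flattens "A AND B AND C" into [A, B, C], mirroring in
// spirit what splitAndBindCondition does before each part is bound.
func splitConjunction(e Expr, out []Expr) []Expr {
	if and, ok := e.(*AndExpr); ok {
		out = splitConjunction(and.Left, out)
		return splitConjunction(and.Right, out)
	}
	return append(out, e)
}

func main() {
	// (A AND B) AND C
	e := &AndExpr{
		Left:  &AndExpr{Left: Cond("A"), Right: Cond("B")},
		Right: Cond("C"),
	}
	fmt.Println(splitConjunction(e, nil)) // prints [A B C]
}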
How to convert a subquery into a join is a more complex topic; interested readers can refer to the literature on subquery unnesting.
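As a simple illustration of flattening (my example, not taken from the MatrixOne docs): a query like select * from t1 where t1.a in (select b from t2) can be rewritten as an equivalent semi join, roughly select t1.* from t1 semi join t2 on t1.a = t2.b, so the inner query is evaluated once as a join instead of once per outer row.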
The HAVING clause, like the WHERE clause, acts as a filter, but syntactically it does not support subqueries. Its bind process is the same as the first half of WHERE, so I won't repeat it.
// pkg/sql/plan/query_builder.go
// Split filter condition
whereList := splitAndBindCondition(clause.Where.Expr, NoAlias, ctx)
var newFilterList []*plan.Expr
var expr *plan.Expr
for _, cond := range whereList {
// Optimizing subqueries
nodeID, expr, err = builder.flattenSubqueries(nodeID, cond, ctx)
newFilterList = append(newFilterList, expr)
}
// Generate node and add to ctx
nodeID = builder.appendNode(&plan.Node{
NodeType: plan.Node_FILTER,
Children: []int32{nodeID},
FilterList: newFilterList,
NotCacheable: notCacheable,
}, ctx)
3. GROUP BY
The bind process for the GROUP BY clause is a little simpler than HAVING: like HAVING, the expression list is split, and each GROUP BY sub-expression is then bound recursively.
The bind processes of the following PROJECTION, ORDER BY and LIMIT/OFFSET clauses are basically similar to GROUP BY, except that some extra information needs to be set (such as the sort order) and checked (such as the limit count not being negative), so I won't repeat the description.
// pkg/sql/plan/query_builder.go
groupBinder := NewGroupBinder(builder, ctx)
for _, group := range clause.GroupBy {
// Qualify column names, col1 -> t1.col1
group = ctx.qualifyColumnNames(group, AliasAfterColumn)
// Bind subexpression
groupBinder.BindExpr(group, 0, true)
}
4. PROJECTION
// pkg/sql/plan/query_builder.go
for i := range selectList {
// Bind subexpression
expr := projectionBinder.BindExpr(selectList[i].Expr, 0, true)
ctx.projects = append(ctx.projects, expr)
}
5. ORDER BY
// pkg/sql/plan/query_builder.go
orderBinder := NewOrderBinder(projectionBinder, selectList)
orderBys := make([]*plan.OrderBySpec, 0, len(astOrderBy))
for _, order := range astOrderBy {
// Bind subexpression
expr := orderBinder.BindExpr(order.Expr)
orderBy := &plan.OrderBySpec{
Expr: expr,
Flag: plan.OrderBySpec_INTERNAL,
}
// Set Flag
set orderBy.Flag
orderBys = append(orderBys, orderBy)
}
6. LIMIT/OFFSET
// pkg/sql/plan/query_builder.go
limitBinder := NewLimitBinder(builder, ctx)
if astLimit.Offset != nil {
// Bind subexpression
offsetExpr := limitBinder.BindExpr(astLimit.Offset, 0, true)
// Check non-negativity
if ifNegative(offsetExpr) {
return 0, moerr.NewSyntaxError(builder.GetContext(), "offset value must be nonnegative")
}
}
if astLimit.Count != nil {
// Bind subexpression
limitExpr := limitBinder.BindExpr(astLimit.Count, 0, true)
// Check non-negativity
if ifNegative(limitExpr) {
return 0, moerr.NewSyntaxError(builder.GetContext(), "limit value must be nonnegative")
}
}
Part 4 Optimizing the Execution Plan
Besides calling the buildSelect method, the runBuildSelectByBinder method also does some query optimization work, namely in the createQuery method:
// pkg/sql/plan/query_builder.go
createQuery() {
for i, rootID := range builder.qry.Steps {
// rule 1
builder.rewriteDistinctToAGG(rootID)
// rule 2
builder.rewriteEffectlessAggToProject(rootID)
// rule 3
rootID = builder.pushdownFilters(rootID, nil, false)
// rule 4
err := foldTableScanFilters(builder.compCtx.GetProcess(), builder.qry, rootID)
if err != nil {
return nil, err
}
// rule 5
builder.pushdownLimit(rootID)
// rule 6
builder.removeSimpleProjections(rootID, plan.Node_UNKNOWN, false, make(map[[2]int32]int))
...
// rule n
}
}
From the code, the logic of the createQuery method is simple: it applies all the optimization rules, one by one, to the previously generated execution plan. Currently the most common optimization rules are: predicate pushdown, column pruning, projection elimination, outer join elimination, aggregation pushdown/pullup, subquery optimization (which, as mentioned above, MatrixOne already performs during plan generation), and so on.
A common and important strategy is presented here: predicate pushdown.
A predicate is just a filter condition, and predicate pushdown is an intuitively easy optimization to understand: the closer the filtering happens to where the data is read, the less data the subsequent operators need to process, and the higher the overall query efficiency.
For example, take the sql select * from t1, t2 where t1.col1 > 1 and t2.col2 > 2. Assume t1 and t2 each have 10,000 records, of which 100 rows in each table satisfy the filter conditions. If you first take the Cartesian product of t1 and t2 and then filter, 10,000 × 10,000 = 100 million rows have to be processed. If you filter t1 and t2 first and then take the Cartesian product, only 100 × 100 = 10,000 rows have to be processed, which greatly reduces the amount of work.
The predicate pushdown optimization in MatrixOne lives in the pushdownFilters method:
// pkg/sql/plan/opt_misc.go
// filters are the filters that are pushed down to the current node
pushdownFilters(nodeID, filters) {
node := builder.qry.Nodes[nodeID]
var canPushdown, cantPushdown []*plan.Expr
// Filters cannot be pushed down past a LIMIT
if node.Limit != nil {
cantPushdown = filters
filters = nil
}
switch node.NodeType {
case plan.Node_FILTER:
canPushdown = filters
// Collect the filter conditions for the current node
for _, filter := range node.FilterList {
canPushdown = append(canPushdown, splitPlanConjunction(filter)...)
}
// Push down to child nodes
childID, cantPushdownChild := pushdownFilters(node.Children[0], canPushdown)
// If all filters are pushed down, the current node can be deleted
if len(cantPushdownChild) > 0 {
node.Children[0] = childID
node.FilterList = cantPushdownChild
} else {
nodeID = childID
}
case plan.Node_JOIN:
var leftPushdown, rightPushdown []*plan.Expr
joinSides := make([]int8, len(filters))
for i, filter := range filters {
joinSides[i] = getJoinSide(filter, leftTags, rightTags, markTag)
// Check whether the left join can be converted to an inner join
if node.JoinType == plan.Node_LEFT && joinSides[i]&JoinSideRight != 0 && rejectsNull(filter, builder.compCtx.GetProcess()) {
node.JoinType = plan.Node_INNER
break
}
}
node.OnList = splitPlanConjunctions(node.OnList)
if node.JoinType == plan.Node_INNER {
// If it can be converted to an inner join, recalculate the joinSide
for _, cond := range node.OnList {
// Collect the filter conditions from the ON clause
filters = append(filters, splitPlanConjunction(cond)...)
}
node.OnList = nil
joinSides = make([]int8, len(filters))
for i, filter := range filters {
joinSides[i] = getJoinSide(filter, leftTags, rightTags, markTag)
}
} else if node.JoinType == plan.Node_LEFT {
// If it cannot be converted to an inner join, collect the ON conditions that can be pushed down to the right table
var newOnList []*plan.Expr
for _, cond := range node.OnList {
conj := splitPlanConjunction(cond)
for _, conjElem := range conj {
side := getJoinSide(conjElem, leftTags, rightTags, markTag)
if side&JoinSideLeft == 0 {
rightPushdown = append(rightPushdown, conjElem)
} else {
newOnList = append(newOnList, conjElem)
}
}
}
node.OnList = newOnList
}
// Collect the conditions that can be pushed down to the left and right tables respectively
for i, filter := range filters {
switch joinSides[i] {
case JoinSideLeft:
if node.JoinType != plan.Node_OUTER {
leftPushdown = append(leftPushdown, filter)
} else {
cantPushdown = append(cantPushdown, filter)
}
case JoinSideRight:
if node.JoinType == plan.Node_INNER {
rightPushdown = append(rightPushdown, filter)
} else {
cantPushdown = append(cantPushdown, filter)
}
// ... case XXXX
default:
cantPushdown = append(cantPushdown, filter)
}
}
// Push down to the left child node
childID, cantPushdownChild := pushdownFilters(node.Children[0], leftPushdown)
// If some conditions cannot be pushed down, add a FILTER node above the left child
if len(cantPushdownChild) > 0 {
childID = builder.appendNode(&plan.Node{
NodeType: plan.Node_FILTER,
Children: []int32{node.Children[0]},
FilterList: cantPushdownChild,
}, nil)
}
node.Children[0] = childID
// Push down to the right child node, handle as above
childID, cantPushdownChild = pushdownFilters(node.Children[1], rightPushdown)
if len(cantPushdownChild) > 0 {
childID = builder.appendNode(&plan.Node{
NodeType: plan.Node_FILTER,
Children: []int32{node.Children[1]},
FilterList: cantPushdownChild,
}, nil)
}
node.Children[1] = childID
}
case plan.Node_PROJECT:
case plan.Node_AGG:
case plan.Node_WINDOW:
case plan.Node_TABLE_SCAN, plan.Node_EXTERNAL_SCAN:
case plan.Node_FUNCTION_SCAN:
...
}
Statements with a LIMIT cannot have predicates pushed down past them, because applying a LIMIT and then a FILTER gives a different result from applying a FILTER and then a LIMIT.
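A tiny example of why the two do not commute (my own illustration): with rows 1 through 10 scanned in order and the predicate x > 5, LIMIT 3 followed by the filter returns nothing (rows 1, 2, 3 are taken and then all filtered out), while the filter followed by LIMIT 3 returns rows 6, 7 and 8.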
After that, the method processes the node according to its type. Here are the two common cases, FILTER nodes and JOIN nodes:
FILTER Node
The FILTER type is the simplest case: merge the filter conditions of the current node with those passed down from the upper level and try to push them down to the child node. If all the conditions can be pushed down, the current node becomes a FILTER node with no filter conditions and can be deleted directly; any conditions that cannot be pushed down are kept and processed at the current node.
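A before/after sketch of the plan shape (my own illustration): when every condition of a FILTER node can be pushed down, the node disappears entirely.

Before pushdown:
FILTER(t1.a > 1)
  JOIN
    SCAN(t1)
    SCAN(t2)

After pushdown (the FILTER node is gone; the condition now sits on the scan):
JOIN
  SCAN(t1, filter: t1.a > 1)
  SCAN(t2)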
JOIN Node
First, we introduce a small optimization that transforms a left join into an inner join. The result of a left join contains all rows of the left table; if a left-table row has no matching row in the right table, the right table's columns are filled with NULL values. If a pushed-down filter on the right table's columns rejects NULL (the rejectsNull check in the code above), those NULL-padded rows would be filtered out anyway, so the left join is equivalent to an inner join.
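For example (my illustration): in select * from t1 left join t2 on t1.a = t2.a where t2.b > 1, every NULL-padded row fails the condition t2.b > 1, so the query returns the same result as an inner join, and t2.b > 1 can then be pushed all the way down to the scan of t2.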
The benefit of this transformation is that for an inner join, if a filter condition from the ON clause only involves columns of the left or the right table, it can be pushed down directly to the corresponding child node.
To summarize, the predicate pushdown strategy on a JOIN node first tries to transform a left join into an inner join, then classifies the filter conditions by which side of the join they reference, and recursively pushes them down to the left and right children. For conditions that cannot be pushed down into a child, a FILTER node is added above that child to handle them.
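The side classification is the job of getJoinSide in the code above. Conceptually it computes a bitmask from the binding tags referenced by a filter; here is a hedged, self-contained sketch of the idea (the constants and types are illustrative, not MatrixOne's definitions):

// Illustrative sketch of the idea behind getJoinSide: classify a filter by
// which side(s) of the join its column references belong to.
const (
	JoinSideNone  int8 = 0
	JoinSideLeft  int8 = 1 << 0
	JoinSideRight int8 = 1 << 1
)

// ColRef identifies a column by the binding tag of the node that produced it.
type ColRef struct{ Tag, ColIdx int32 }

func joinSide(cols []ColRef, leftTags, rightTags map[int32]bool) int8 {
	side := JoinSideNone
	for _, c := range cols {
		if leftTags[c.Tag] {
			side |= JoinSideLeft // references a left-side column
		}
		if rightTags[c.Tag] {
			side |= JoinSideRight // references a right-side column
		}
	}
	// JoinSideLeft|JoinSideRight means the filter spans both sides and
	// cannot be pushed below the join.
	return side
}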
Part 5 Summary
This article briefly introduces the front-end part of the MatrixOne kernel code from a beginner's perspective, starting from the most common select statement and covering the three components of the binder, planner and optimizer. It also introduces the bind and plan-generation process for common sql clauses, the code framework of the rule-based optimizer, and the predicate pushdown optimization strategy.
The main goal of this article is to sort out MatrixOne's main execution flow; the actual MatrixOne source code is much more complex and involves many details and all kinds of corner cases. Engineers who want a closer look at MatrixOne, as well as anyone interested in digging deeper into the details, can download the source code and read it carefully.