Quick Start: MatrixOne Database Kernel Front-End

Posted by Cao Kai

Part 1 Background

First, let's begin with a brief introduction to the structure of a database kernel. The execution of a SQL statement mainly involves the following components:

  • Parser - Parses SQL statements into an Abstract Syntax Tree (AST).
  • Binder - Performs semantic analysis to ensure the statement is logically correct: checks that the tables, columns, and functions referenced in the SQL exist, and binds the logical names to the actual objects in the database.
  • Planner - After the binding phase, the planner generates a concrete execution plan for the query. The execution plan specifies the sequence of steps required to retrieve, filter, join, and aggregate data to produce the query result.
  • Executor - Executes the query according to the execution plan generated by the planner.
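
Put together as code, this pipeline looks roughly like the following (a minimal sketch with illustrative type and method names, not MatrixOne's actual interfaces):

// A minimal sketch of the pipeline above; all names are illustrative.
type AST struct{}       // parsed statement tree
type BoundStmt struct{} // AST with names resolved against the catalog
type Plan struct{}      // steps to scan, filter, join and aggregate data

type Parser interface {
    Parse(sql string) ([]*AST, error)
}
type Binder interface {
    Bind(ast *AST) (*BoundStmt, error)
}
type Planner interface {
    Plan(stmt *BoundStmt) (*Plan, error)
}
type Executor interface {
    Run(plan *Plan) error
}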

Based on past learning experience, I believe the quickest way to become familiar with a codebase one has never encountered before is to follow the most common execution path: combine code inspection with step-by-step debugging, clarify the main logic, and then gradually expand from the mainline to the various components and features until the entire project is covered. In this article, we will start from the most common SQL statement select … from … join … where … group by … order by … limit … to quickly familiarize ourselves with the code of the MatrixOne kernel front-end (mainly the binder, planner, and optimizer).

Part 2 Implementation Process

The MatrixOne server handles requests from the client; ignoring how the request is passed along in between, it eventually enters the MysqlCmdExecutor.doComQuery() method, whose general logic is:

  1. The parser parses the SQL into a statement list, also known as an AST list;
  2. Each statement is executed.
    // pkg/frontend/mysql_cmd_executor.go
    doComQuery() {
        cws = GetComputationWrapper()
        for cw in cws {
            // Execute Statement
            executeStmt(cw)
        }
    }
    
    GetComputationWrapper() {
        // Parse the SQL statements
        asts = parsers.Parse(sql)
        return [InitTxnComputationWrapper(stmt) for stmt in asts]
    }
    

MatrixOne is compatible with the MySQL protocol. According to the protocol, MO first returns the column information, then sends the row data, and finally returns an EOF packet to end the communication. The column information is available once the execution plan is generated, and the row data is sent after the runner produces the results.

// pkg/frontend/mysql_cmd_executor.go
executeStmt(cw) {
    // Generate an execution plan
    compile = cw.Compile()
    // Send column information
    for c in cw.columns {
        SendColumnDefinitionPacket(c)
    }
    SendEOFPacketIf()
    // Execute the query
    runner = compile.(ComputationRunner)
    runner.Run()
    // End communication
    sendEOFOrOkPacket()
}

Part 3 Generating an Execution Plan

The Compile method mentioned above does two main things: it generates the execution plan and compiles the operators needed by the executor.

// pkg/frontend/computation_wrapper.go
Compile() {
    // Build and optimize the execution plan
    cw.plan = buildPlan(..)
    // Compile the execution code
    cw.compile = compile.New(...)
    cw.compile.Compile()
}

Let's look at the buildPlan method first. Since much of the code along the way consists of simple wrapper calls, the call path is: buildPlan (mysql_cmd_executor.go) -> BaseOptimizer.Optimize -> BuildPlan (build.go) -> runBuildSelectByBinder. We'll skip the intermediate steps and go straight to the runBuildSelectByBinder method:

// pkg/sql/plan/build.go
runBuildSelectByBinder(stmt) {
    // Generate the execution plan
    builder := NewQueryBuilder(SelectType...)
    bindCtx := NewBindContext(builder)
    rootId := builder.buildSelect(stmt, bindCtx, true)
    builder.qry.Steps = append(builder.qry.Steps, rootId)
    // Optimizing the execution plan
    query := builder.createQuery()
    return &Plan{query}
}

As described above, the main responsibility of the binder is to bind the database/table/column names in the query statement to the corresponding objects in the database. The method that binds select statements in MatrixOne is buildSelect, and its code is roughly as follows:

// pkg/sql/plan/query_builder.go
buildSelect() {
    preprocess WITH
    build FROM
    unfold stars and generate headings
    bind WHERE
    bind GROUP BY
    bind HAVING
    bind PROJECTION
    bind TIME WINDOW
    bind ORDER BY
    bind LIMIT/OFFSET

    append AGGREGATION node
    append TIME WINDOW node
    append WINDOW node
    append PROJECT node
    append DISTINCT node
    append SORT node (include LIMIT/OFFSET)
    append result PROJECT node
}

You can see that the buildSelect method binds each part of the select statement (WITH, FROM, WHERE (HAVING), GROUP BY, TIME WINDOW, ORDER BY, and LIMIT/OFFSET) in order. The following introduces the most commonly used parts:

1. FROM

The method in MatrixOne that binds the FROM clause is buildTable.

// pkg/sql/plan/query_builder.go
buildTable(stmt) {
    switch tbl := stmt.(type) {
    case *tree.Select:
    case *tree.TableName:
        // Get database, table name
        schema := tbl.SchemaName
        table := tbl.ObjectName
        // check db
        schema, err = databaseIsValid(schema, builder.compCtx)
        if err != nil {
            return 0, err
        }
        // Get tableDef
        obj, tableDef := builder.compCtx.Resolve(schema, table)
        if tableDef == nil {
            return 0, moerr.NewParseError(builder.GetContext(), "table %q does not exist", table)
        }
        // Generate node
        nodeID = builder.appendNode(&plan.Node{
            NodeType:    nodeType,
            Stats:       nil,
            ObjRef:      obj,
            TableDef:    tableDef,
            BindingTags: []int32{builder.genNewTag()},
        }, ctx)

    case *tree.JoinTableExpr:
        // If there is no right table, just bind the left table
        if tbl.Right == nil {
            return builder.buildTable(tbl.Left, ctx, preNodeId, leftCtx)
        }
        // Else, buildJoinTable
        return builder.buildJoinTable(tbl, ctx)

    case *tree.TableFunction:
    case *tree.ParenTableExpr:
    case *tree.AliasedTableExpr:
        // bind table
        nodeID = builder.buildTable(tbl.Expr)
        // addBinding
        builder.addBinding(nodeID, ctx)
    }
}

According to SQL syntax, the types of expressions that can follow FROM are:

  • select expressions
  • table names
  • join table expressions
  • table functions
  • table expressions with parentheses
  • table expressions with aliases

Table expressions include table names (TableName), aliased table expressions (AliasedTableExpr), and join table expressions (JoinTableExpr). To maintain formal uniformity, MatrixOne's SQL parser wraps each TableName in an AliasedTableExpr, whose As field is set to nil if there is no alias; each AliasedTableExpr is in turn wrapped in a JoinTableExpr, whose Right field is set to nil if it is just a single table without a join.

As an example, for the SQL statement SELECT * FROM t1, parsing produces a structure for t1 similar to this:

JoinTableExpr {
    Left: AliasedTableExpr {
        Expr: t1
        As: nil
    }
    Right: nil
}
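
In Go terms, the node shapes involved look roughly like this (a simplified sketch; fields are trimmed to those used in this article, and the real definitions live in pkg/sql/parsers/tree):

// Simplified shapes of the FROM-clause tree nodes discussed here.
type TableExpr interface{}

type TableName struct {
    SchemaName string
    ObjectName string
}

type AliasClause struct {
    Alias string
}

type AliasedTableExpr struct {
    Expr TableExpr
    As   *AliasClause // nil when the table has no alias
}

type JoinTableExpr struct {
    Left  TableExpr
    Right TableExpr // nil for a single table without a join
}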

The process of binding the FROM clause of this SQL is roughly as follows: since the right table of the JoinTableExpr is nil, only the left table is bound; the left table is an AliasedTableExpr, so its Expr (here a TableName) is bound first, and the binding information is then added to the BindContext (ctx) via the addBinding method.

The logic of binding a TableName is roughly as follows: first verify that the database exists; if it does, fetch the table's metadata and column information (the Resolve method, which returns nil if the table does not exist), then wrap them into a plan.Node and return it.

The addBinding method wraps information about the bound node, such as the bindingTag, nodeID, table information (tableName, tableID), and column information (colName, colType, and whether the column is hidden), into a Binding structure and adds it to the ctx. In the subsequent bind steps, table/column names can be resolved against this binding information.

// pkg/sql/plan/query_builder.go
addBinding(nodeID, ctx) {
    // Get node
    node := getNode(nodeID)
    // Generate col information
    colLength := len(node.TableDef.Cols)
    cols := make([]string, colLength)
    colIsHidden := make([]bool, colLength)
    types := make([]*plan.Type, colLength)
    tag := node.BindingTags[0]
    for i, col := range node.TableDef.Cols {
        if i < len(alias.Cols) {
            cols[i] = string(alias.Cols[i])
        } else {
            cols[i] = col.Name
        }
        colIsHidden[i] = col.Hidden
        types[i] = col.Typ
        name := table + "." + cols[i]
        builder.nameByColRef[[2]int32{tag, int32(i)}] = name
    }
    // Generate Binding structure
    binding = NewBinding(tag, nodeID, table, node.TableDef.TblId, cols, colIsHidden, types)
    // Add Binding to BindContext
    ctx.bindings = append(ctx.bindings, binding)
    ctx.bindingByTag[binding.tag] = binding
    ctx.bindingByTable[binding.table] = binding
}

2. WHERE / HAVING

The bind process of the WHERE clause is: first split the filter condition using AND as the separator, then bind each resulting expression recursively. For example, the filter condition A AND B AND C is split into A, B, and C, which are then bound as bind(A), bind(B), and bind(C) respectively. After the filters are bound, statements containing subqueries need to be flattened, converting each subquery into an equivalent join; for instance, select * from t1 where t1.a in (select b from t2) can be rewritten as a semi join of t1 and t2 on t1.a = t2.b. Subqueries execute extremely inefficiently and slow down the whole query, so converting them to joins can greatly improve system performance.

How to convert a subquery to a join in the general case is a more complex problem; interested readers can refer to the relevant papers on subquery unnesting.

The HAVING clause, like the WHERE clause, acts as a filter, but syntactically it does not support subqueries. Its bind process is the same as the first half of WHERE's, so I won't repeat it.

// pkg/sql/plan/query_builder.go
// Split filter condition
whereList := splitAndBindCondition(clause.Where.Expr, NoAlias, ctx)

var newFilterList []*plan.Expr
var expr *plan.Expr
for _, cond := range whereList {
    // Optimizing subqueries
    nodeID, expr, err = builder.flattenSubqueries(nodeID, cond, ctx)
    newFilterList = append(newFilterList, expr)
}
// Generate node and add to ctx
nodeID = builder.appendNode(&plan.Node{
    NodeType:     plan.Node_FILTER,
    Children:     []int32{nodeID},
    FilterList:   newFilterList,
    NotCacheable: notCacheable,
}, ctx)
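
For illustration, here is a minimal sketch of the AND-splitting step, using a hypothetical expression tree (the real splitAndBindCondition also binds each resulting expression, as its name suggests):

// Expr and AndExpr are hypothetical stand-ins for the real expression tree.
type Expr interface{}

type AndExpr struct {
    Left, Right Expr
}

// splitConjunction turns A AND B AND C into [A, B, C].
func splitConjunction(e Expr) []Expr {
    if and, ok := e.(*AndExpr); ok {
        return append(splitConjunction(and.Left), splitConjunction(and.Right)...)
    }
    return []Expr{e}
}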

3. GROUP BY

The bind process for the GROUP BY clause is a little simpler than HAVING: there is no need to split the expression, and each GROUP BY sub-expression is directly bound recursively.

The bind processes for the following PROJECTION, ORDER BY, and LIMIT/OFFSET clauses are basically similar to GROUP BY, apart from some extra information that needs to be set up (such as the sort order) and checked (such as the limit count not being allowed to be negative), so I won't repeat the description.

// pkg/sql/plan/query_builder.go
groupBinder := NewGroupBinder(builder, ctx)
for _, group := range clause.GroupBy {
    // Qualify column names: col1 -> t1.col1
    group = ctx.qualifyColumnNames(group, AliasAfterColumn)
    // Bind subexpression
    groupBinder.BindExpr(group, 0, true)
}
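
The qualifyColumnNames call above completes a bare column name into a table-qualified one by searching the bindings collected by addBinding. Here is a minimal sketch of the idea, using a hypothetical simplification of the Binding structure (the real method also handles aliases and reports errors with MatrixOne's error codes):

import "fmt"

// Binding is a hypothetical simplification of the structure built by addBinding.
type Binding struct {
    Table string
    Cols  map[string]bool // column names of the bound table
}

// qualifyColumn completes col1 -> t1.col1 using the collected bindings.
func qualifyColumn(col string, bindings []*Binding) (string, error) {
    var found *Binding
    for _, b := range bindings {
        if b.Cols[col] {
            if found != nil {
                return "", fmt.Errorf("column %q is ambiguous", col)
            }
            found = b
        }
    }
    if found == nil {
        return "", fmt.Errorf("column %q does not exist", col)
    }
    return found.Table + "." + col, nil
}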

4. PROJECTION

// pkg/sql/plan/query_builder.go
for i := range selectList {
    // Bind subexpression
    expr := projectionBinder.BindExpr(selectList[i].Expr, 0, true)
    ctx.projects = append(ctx.projects, expr)
}

5. ORDER BY

// pkg/sql/plan/query_builder.go
orderBinder := NewOrderBinder(projectionBinder, selectList)
orderBys := make([]*plan.OrderBySpec, 0, len(astOrderBy))

for _, order := range astOrderBy {
    // Bind subexpression
    expr := orderBinder.BindExpr(order.Expr)

    orderBy := &plan.OrderBySpec{
        Expr: expr,
        Flag: plan.OrderBySpec_INTERNAL,
    }
    // Set Flag
    set orderBy.Flag
    orderBys = append(orderBys, orderBy)
}

6. LIMIT/OFFSET

// pkg/sql/plan/query_builder.go
limitBinder := NewLimitBinder(builder, ctx)
if astLimit.Offset != nil {
    // Bind subexpression
    offsetExpr := limitBinder.BindExpr(astLimit.Offset, 0, true)
    // Check non-negativity
    if ifNegative(offsetExpr) {
        return 0, moerr.NewSyntaxError(builder.GetContext(), "offset value must be nonnegative")
    }
}
if astLimit.Count != nil {
    // Bind subexpression
    limitExpr := limitBinder.BindExpr(astLimit.Count, 0, true)
    // Check non-negativity
    if ifNegative(limitExpr) {
        return 0, moerr.NewSyntaxError(builder.GetContext(), "limit value must be nonnegative")
    }
}

Part 4 Optimizing the Execution Plan

In addition to calling the buildSelect method, the runBuildSelectByBinder method also does part of the query optimization work, namely in the createQuery method:

// pkg/sql/plan/query_builder.go
createQuery() {
    for i, rootID := range builder.qry.Steps {
        // rule 1
        builder.rewriteDistinctToAGG(rootID)
        // rule 2
        builder.rewriteEffectlessAggToProject(rootID)
        // rule 3
        rootID = builder.pushdownFilters(rootID, nil, false)
        // rule 4
        err := foldTableScanFilters(builder.compCtx.GetProcess(), builder.qry, rootID)
        if err != nil {
            return nil, err
        }
        // rule 5
        builder.pushdownLimit(rootID)
        // rule 6
        builder.removeSimpleProjections(rootID, plan.Node_UNKNOWN, false, make(map[[2]int32]int))
        ...
        // rule n
    }
}

From the code, the logic of the createQuery method is simple: it applies the optimization rules one by one to the previously generated execution plan. The most common optimization rules currently are: predicate pushdown, column pruning, projection elimination, outer join elimination, aggregation pushdown/pullup, subquery optimization (as mentioned above, MatrixOne already does this during plan generation), and so on.

Here we present one common and important strategy: predicate pushdown.

A predicate is also known as a filter condition. Predicate pushdown is an intuitively easy optimization to understand: the closer to the data source the filtering is done, the less data the subsequent operators need to process, and the higher the overall query efficiency.

Take this SQL as an example: select * from t1, t2 where t1.col1 > 1 and t2.col2 > 2. Assume t1 and t2 each have 10,000 rows, of which 100 rows per table satisfy the filter conditions. If you first compute the Cartesian product of t1 and t2 and then filter, 10,000 × 10,000 = 100 million rows have to be processed. If you first filter t1 and t2 and then compute the Cartesian product, only 100 × 100 = 10,000 rows need to be processed, which greatly reduces the amount of data handled.
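
Conceptually, pushdown rewrites the plan for this query roughly as follows (an illustrative plan tree, not MatrixOne's actual EXPLAIN output):

Before pushdown:

FILTER (t1.col1 > 1 AND t2.col2 > 2)
  JOIN (Cartesian product)
    SCAN t1
    SCAN t2

After pushdown:

JOIN (Cartesian product)
  SCAN t1 (filter: col1 > 1)
  SCAN t2 (filter: col2 > 2)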

The predicate pushdown optimization in MatrixOne is implemented in the pushdownFilters method:

// pkg/sql/plan/opt_misc.go
// filters are the filters that are pushed down to the current node
pushdownFilters(nodeID, filters) {
    node := builder.qry.Nodes[nodeID]
    var canPushdown, cantPushdown []*plan.Expr
    // Filters cannot be pushed below a node with LIMIT
    if node.Limit != nil {
        cantPushdown = filters
        filters = nil
    }

    switch node.NodeType {
    case plan.Node_FILTER:
        canPushdown = filters
        // Collect the filter conditions for the current node
        for _, filter := range node.FilterList {
            canPushdown = append(canPushdown, splitPlanConjunction(filter)...)
        }
        // Push down to child nodes
        childID, cantPushdownChild := pushdownFilters(node.Children[0], canPushdown)
        // If some filters cannot be pushed down, keep them on this FILTER node;
        // otherwise the node has no filters left and is deleted (replaced by its child)
        if len(cantPushdownChild) > 0 {
            node.Children[0] = childID
            node.FilterList = cantPushdownChild
        } else {
            nodeID = childID
        }
    case plan.Node_JOIN:
        var leftPushdown, rightPushdown []*plan.Expr
        for i, filter := range filters {
            joinSides[i] = getJoinSide(filter, leftTags, rightTags, markTag)
            // Check whether the left join can be converted to an inner join
            if node.JoinType == plan.Node_LEFT && joinSides[i]&JoinSideRight != 0 && rejectsNull(filter, builder.compCtx.GetProcess()) {
                node.JoinType = plan.Node_INNER
                break
            }
        }

        node.OnList = splitPlanConjunctions(node.OnList)

        if node.JoinType == plan.Node_INNER {
            // If converted to an inner join, merge the ON conditions into the filters and recompute the joinSides
            for _, cond := range node.OnList {
                // Collecting filter conditions for ON clauses
                filters = append(filters, splitPlanConjunction(cond)...)
            }
            node.OnList = nil

            joinSides = make([]int8, len(filters))
            for i, filter := range filters {
                joinSides[i] = getJoinSide(filter, leftTags, rightTags, markTag)
            }
        } else if node.JoinType == plan.Node_LEFT {
            // If it cannot be converted to an inner join, collect the ON conditions that can be pushed down to the right table
            var newOnList []*plan.Expr
            for _, cond := range node.OnList {
                conj := splitPlanConjunction(cond)
                for _, conjElem := range conj {
                    side := getJoinSide(conjElem, leftTags, rightTags, markTag)
                    if side&JoinSideLeft == 0 {
                        rightPushdown = append(rightPushdown, conjElem)
                    } else {
                        newOnList = append(newOnList, conjElem)
                    }
                }
            }
            node.OnList = newOnList
        }

        // Collect the conditions that can be pushed down to the left and right tables, respectively
        for i, filter := range filters {
            switch joinSides[i] {
            case JoinSideLeft:
                if node.JoinType != plan.Node_OUTER {
                    leftPushdown = append(leftPushdown, filter)
                } else {
                    cantPushdown = append(cantPushdown, filter)
                }
            case JoinSideRight:
                if node.JoinType == plan.Node_INNER {
                    rightPushdown = append(rightPushdown, filter)
                } else {
                    cantPushdown = append(cantPushdown, filter)
                }
            // ... case XXXX
            default:
                cantPushdown = append(cantPushdown, filter)
            }
        }

        // Push down to the left child node
        childID, cantPushdownChild := pushdownFilters(node.Children[0], leftPushdown)
        // If there is a condition that cannot be pushed down, the left child node adds a FILTER node
        if len(cantPushdownChild) > 0 {
            childID = builder.appendNode(&plan.Node{
                NodeType:   plan.Node_FILTER,
                Children:   []int32{node.Children[0]},
                FilterList: cantPushdownChild,
            }, nil)
        }
        node.Children[0] = childID
        // Push down to the right child node, handle as above
        childID, cantPushdownChild = pushdownFilters(node.Children[1], rightPushdown)
        if len(cantPushdownChild) > 0 {
            childID = builder.appendNode(&plan.Node{
                NodeType:   plan.Node_FILTER,
                Children:   []int32{node.Children[1]},
                FilterList: cantPushdownChild,
            }, nil)
        }
        node.Children[1] = childID

    case plan.Node_PROJECT:
    case plan.Node_AGG:
    case plan.Node_WINDOW:
    case plan.Node_TABLE_SCAN, plan.Node_EXTERNAL_SCAN:
    case plan.Node_FUNCTION_SCAN:
    ...
    }
}

Filters cannot be pushed below a node with a LIMIT, because applying a LIMIT and then a FILTER gives a different result from applying a FILTER and then a LIMIT: the former keeps the qualifying rows among the first N rows, while the latter keeps the first N qualifying rows.

pushdownFilters then performs the corresponding processing according to the node type. Here are the two most common cases, FILTER nodes and JOIN nodes:

FILTER Node

The FILTER type is the simplest case: merge the filter conditions of the current node with those passed down from above and try to push them all down to the child node. If every condition can be pushed down, the current node becomes a FILTER node with no filter conditions and can be deleted directly; any conditions that cannot be pushed down are kept and processed at the current node.

JOIN Node

First, we introduce a small optimization that can transform a left join into an inner join. The result of a left join contains all rows of the left table; if a left-table row has no matching row in the right table, the right table's columns are filled with NULL values. If a filter condition on the right table's columns rejects NULL (that is, it never evaluates to true on NULL input, which is what rejectsNull checks in the code above), the NULL-padded rows would be filtered out anyway, so the left join produces exactly the same result as an inner join. For example, in select * from t1 left join t2 on t1.a = t2.a where t2.b > 2, the condition t2.b > 2 rejects NULL, so the left join can be converted to an inner join.

The benefit of this conversion is that for an inner join, any filter condition in the ON clause that involves only the left table's or only the right table's columns can be pushed down directly to the corresponding child node.

To summarize, the predicate pushdown strategy at a JOIN node first tries to transform a left join into an inner join, then classifies the filter conditions by which side of the join they reference, and recursively pushes them down to the left and right children. For conditions that cannot be pushed into a child, a FILTER node is added above that child to handle them.
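
The classification relies on the getJoinSide call seen in the code above, which determines which side's columns a filter references based on the binding tags collected during bind. Here is a minimal sketch of the idea with a simplified signature (the real function walks plan.Expr trees and also handles mark joins):

// Bitmask of the join sides referenced by a filter condition.
const (
    JoinSideNone  int8 = 0
    JoinSideLeft  int8 = 1 << 0
    JoinSideRight int8 = 1 << 1
)

// colTags are the binding tags of the columns referenced by one filter.
func joinSideOf(colTags []int32, leftTags, rightTags map[int32]bool) int8 {
    side := JoinSideNone
    for _, tag := range colTags {
        switch {
        case leftTags[tag]:
            side |= JoinSideLeft
        case rightTags[tag]:
            side |= JoinSideRight
        }
    }
    return side
}

A filter classified as JoinSideLeft references only left-table columns and can be pushed to the left child (unless that side must be NULL-padded by an outer join); a filter that references both sides must stay at the JOIN node.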

Part 5 Summary

This article briefly introduces the front-end part of the MatrixOne kernel code from a beginner's perspective, starting from the most common select statement and covering the three components of binder, planner, and optimizer. It also introduces the bind and plan generation process for common SQL clauses, the code framework of the rule-based optimizer, and the predicate pushdown optimization strategy.

The main goal of this article is to sort out MatrixOne's main execution flow; the actual MatrixOne source code is much more complex, involving many details and all kinds of corner cases. Engineers who want to take a closer look at MatrixOne, as well as readers interested in digging deeper into the details, can download the source code and read it carefully.