1. Learning Objectives
Understand how MOserver and a MySQL client interact over the network, from establishing a connection to executing commands.
2. Basics of MySQL Network Protocol
Basic Structure of Protocol Packet
MySQL protocol packet is the basic unit of communication between the client and server. Each protocol packet contains the following parts:
- Packet Length
  - Length: 3 bytes
  - Description: The length of the payload, excluding the 4-byte header (3-byte packet length plus 1-byte sequence ID). The maximum value is 2^24 - 1 (16,777,215 bytes, i.e., 16MB - 1 byte).
- Sequence ID
  - Length: 1 byte
  - Description: Identifies the order of packets within an exchange; the client and server take turns incrementing it. It starts at 0, increases by 1 with each packet sent, wraps around to 0 after 255, and is reset to 0 at the start of each new command.
- Payload Data
  - Length: Variable
  - Description: The actual data being transmitted; its length is given by the packet length field.
Because a single packet carries at most 16,777,215 bytes (about 16MB), larger data must be split across several packets. MySQL handles this with a mechanism referred to here as "fragmented packets": each packet has its own packet length and sequence ID fields, every packet except the last carries a full 0xFFFFFF bytes, and the payloads are concatenated to form the complete data. If the data length is an exact multiple of 0xFFFFFF, an empty packet marks the end.
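The framing rules above can be sketched in Go. This is a minimal illustration, not MO's code. `splitIntoPackets`, `readPayload` and `maxPayload` are hypothetical names. The sketch writes the 3-byte little-endian length and the 1-byte sequence ID in front of each chunk, and it appends an empty packet when the data is an exact multiple of the maximum size.

```go
package main

import "fmt"

// maxPayload is the largest payload one MySQL packet can carry (2^24 - 1).
const maxPayload = 1<<24 - 1

// splitIntoPackets frames payload as MySQL packets: a 3-byte little-endian
// length, a 1-byte sequence ID, then up to maxPayload bytes of data.
// A payload whose length is an exact multiple of maxPayload is followed by
// a zero-length packet so the receiver knows the data is complete.
func splitIntoPackets(payload []byte, seq uint8) [][]byte {
	var packets [][]byte
	for {
		n := len(payload)
		if n > maxPayload {
			n = maxPayload
		}
		pkt := make([]byte, 4+n)
		pkt[0] = byte(n)
		pkt[1] = byte(n >> 8)
		pkt[2] = byte(n >> 16)
		pkt[3] = seq
		copy(pkt[4:], payload[:n])
		packets = append(packets, pkt)
		seq++ // uint8 arithmetic wraps from 255 back to 0
		payload = payload[n:]
		if n < maxPayload {
			return packets
		}
	}
}

// readPayload reassembles the data, stopping at the first packet whose
// length is below maxPayload.
func readPayload(packets [][]byte) []byte {
	var out []byte
	for _, p := range packets {
		n := int(p[0]) | int(p[1])<<8 | int(p[2])<<16
		out = append(out, p[4:4+n]...)
		if n < maxPayload {
			break
		}
	}
	return out
}

func main() {
	pkts := splitIntoPackets([]byte("select 1"), 0)
	fmt.Printf("% x\n", pkts[0][:4]) // 08 00 00 00
}
```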
MySQL Protocol Encoding Types in MO
The basic data types in MySQL protocol are Integer and String. Integers can be either fixed-length or variable-length. Fixed lengths are 1, 2, 3, 4, 6, and 8 bytes, while variable lengths use length encoding to determine the overall length of the integer, providing flexibility and efficiency.
func (mp *MysqlProtocolImpl) readIntLenEnc(data []byte, pos int) (uint64, int, bool) {
if pos >= len(data) {
return 0, 0, false
}
// A first byte of 0xfb or greater is a marker that determines how the integer is encoded
switch data[pos] {
case 0xfb:
// 0xfb denotes NULL in the MySQL protocol; here it is decoded as 0 and consumes one byte
return 0, pos + 1, true
case 0xfc:
// Integer in two bytes
if pos+2 >= len(data) {
return 0, 0, false
}
value := uint64(data[pos+1]) |
uint64(data[pos+2])<<8
return value, pos + 3, true
case 0xfd:
// Integer in three bytes
if pos+3 >= len(data) {
return 0, 0, false
}
value := uint64(data[pos+1]) |
uint64(data[pos+2])<<8 |
uint64(data[pos+3])<<16
return value, pos + 4, true
case 0xfe:
// Integer in eight bytes
if pos+8 >= len(data) {
return 0, 0, false
}
value := uint64(data[pos+1]) |
uint64(data[pos+2])<<8 |
uint64(data[pos+3])<<16 |
uint64(data[pos+4])<<24 |
uint64(data[pos+5])<<32 |
uint64(data[pos+6])<<40 |
uint64(data[pos+7])<<48 |
uint64(data[pos+8])<<56
return value, pos + 9, true
}
// Values between 0 and 250 occupy 1 byte and are returned directly
return uint64(data[pos]), pos + 1, true
}
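The encoding side is not shown above. The following is a minimal Go sketch based on the MySQL length-encoded integer rules: values below 251 take one byte, larger values use a 0xfc, 0xfd or 0xfe marker followed by 2, 3 or 8 little-endian bytes. `appendIntLenEnc` is a hypothetical name. The decoder is a standalone copy that follows the same logic as `readIntLenEnc` above, including treating 0xfb as 0.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendIntLenEnc appends v to buf as a MySQL length-encoded integer.
func appendIntLenEnc(buf []byte, v uint64) []byte {
	switch {
	case v < 251:
		// Values 0..250 are stored directly in one byte
		return append(buf, byte(v))
	case v < 1<<16:
		// 0xfc marker + 2-byte little-endian value
		return append(buf, 0xfc, byte(v), byte(v>>8))
	case v < 1<<24:
		// 0xfd marker + 3-byte little-endian value
		return append(buf, 0xfd, byte(v), byte(v>>8), byte(v>>16))
	default:
		// 0xfe marker + 8-byte little-endian value
		var b [8]byte
		binary.LittleEndian.PutUint64(b[:], v)
		return append(append(buf, 0xfe), b[:]...)
	}
}

// readIntLenEnc decodes a length-encoded integer (0xfb is treated as 0).
func readIntLenEnc(data []byte, pos int) (uint64, int, bool) {
	if pos >= len(data) {
		return 0, 0, false
	}
	var size int
	switch data[pos] {
	case 0xfb:
		return 0, pos + 1, true
	case 0xfc:
		size = 2
	case 0xfd:
		size = 3
	case 0xfe:
		size = 8
	default:
		return uint64(data[pos]), pos + 1, true
	}
	if pos+size >= len(data) {
		return 0, 0, false
	}
	var v uint64
	for i := 0; i < size; i++ {
		v |= uint64(data[pos+1+i]) << (8 * i)
	}
	return v, pos + 1 + size, true
}

func main() {
	for _, v := range []uint64{250, 251, 65536, 1 << 24} {
		enc := appendIntLenEnc(nil, v)
		dec, _, _ := readIntLenEnc(enc, 0)
		fmt.Printf("%d -> % x -> %d\n", v, enc, dec)
	}
}
```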
The string types are mainly categorized as follows:
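// FixedLengthString: a fixed number of bytes, e.g. the SQL state in ERR_Packet is always 5 bytes
func (mp *MysqlProtocolImpl) readStringFix(data []byte, pos int, length int) (string, int, bool) {
var sdata []byte
var ok bool
sdata, pos, ok = mp.readCountOfBytes(data, pos, length)
if !ok {
return "", 0, false
}
return string(sdata), pos, true
}
// NullTerminatedString: ends at the first 0x00 byte, which is consumed but not returned
func (mp *MysqlProtocolImpl) readStringNUL(data []byte, pos int) (string, int, bool) {
zeroPos := bytes.IndexByte(data[pos:], 0)
if zeroPos == -1 {
return "", 0, false
}
return string(data[pos : pos+zeroPos]), pos + zeroPos + 1, true
}
// VariableLengthString (length-encoded string)
func (mp *MysqlProtocolImpl) readStringLenEnc(data []byte, pos int) (string, int, bool) {
var value uint64
var ok bool
// First read the string length as a LengthEncodedInteger, then read that many bytes
value, pos, ok = mp.readIntLenEnc(data, pos)
if !ok {
return "", 0, false
}
sLength := int(value)
if pos+sLength > len(data) {
return "", 0, false
}
return string(data[pos : pos+sLength]), pos + sLength, true
}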
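Below is a self-contained sketch of the three string readers, with bounds checks on every read. The function names mirror the MO snippets, but these free-function signatures are assumptions. To keep the sketch short, the length-encoded reader only handles the 1-byte and 0xfc (2-byte) length prefixes.

```go
package main

import (
	"bytes"
	"fmt"
)

// readStringFix reads exactly n bytes, e.g. the 5-byte SQL state in ERR_Packet.
func readStringFix(data []byte, pos, n int) (string, int, bool) {
	if n < 0 || pos < 0 || pos+n > len(data) {
		return "", 0, false
	}
	return string(data[pos : pos+n]), pos + n, true
}

// readStringNUL reads up to the next 0x00 byte and skips the terminator.
func readStringNUL(data []byte, pos int) (string, int, bool) {
	if pos < 0 || pos > len(data) {
		return "", 0, false
	}
	zero := bytes.IndexByte(data[pos:], 0)
	if zero < 0 {
		return "", 0, false // no terminator: malformed input
	}
	return string(data[pos : pos+zero]), pos + zero + 1, true
}

// readStringLenEnc reads a length prefix, then that many bytes.
func readStringLenEnc(data []byte, pos int) (string, int, bool) {
	if pos < 0 || pos >= len(data) {
		return "", 0, false
	}
	var n int
	switch first := data[pos]; {
	case first < 0xfb:
		n, pos = int(first), pos+1
	case first == 0xfc && pos+2 < len(data):
		n, pos = int(data[pos+1])|int(data[pos+2])<<8, pos+3
	default:
		return "", 0, false // NULL (0xfb) or a longer form not covered here
	}
	return readStringFix(data, pos, n)
}

func main() {
	buf := []byte("28000root\x00\x06testdb")
	state, pos, _ := readStringFix(buf, 0, 5)
	user, pos, _ := readStringNUL(buf, pos)
	db, pos, _ := readStringLenEnc(buf, pos)
	fmt.Println(state, user, db, pos) // 28000 root testdb 17
}
```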
3. Connection Phase
Listen-Accept
MOserver accepts connections on TCP ports and UNIX domain sockets using Go's standard net package. Each accepted connection is handed to handleConn, which runs in its own goroutine.
// Core code
func (mo *MOServer) startListener() {
// Supports both TCP and UNIX
for _, listener := range mo.listeners {
go mo.startAccept(listener)
}
}
func (mo *MOServer) startAccept(listener net.Listener) {
for {
conn, err := listener.Accept()
if err != nil {
return
}
// Each connection is handled in a separate goroutine
go mo.handleConn(conn)
}
}
Handshake Preparation and Sending
handleConn is responsible for authenticating the client, i.e., the handshake. In the MySQL protocol, the server first sends an initial handshake packet; the client answers with a handshake response carrying its credentials; the server verifies them and replies with an OK or ERR packet. Once this succeeds, the connection enters the command phase.
Core code in MO:
func (mo *MOServer) handleConn(conn net.Conn) {
// NewIOSession creates a wrapper for net.Conn, managing network buffers and read/write operations
rs, err := NewIOSession(conn)
if err != nil {
mo.Closed(rs)
return
}
err = mo.handshake(rs)
if err != nil {
mo.rm.Closed(rs)
return
}
// If handshake is successful, returns nil error and enters the command phase
mo.handleLoop(rs)
}
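func (mo *MOServer) handshake(rs *Conn) error {
// Build and send the server's initial handshake packet (protocol version 10)
hsV10pkt := makeHandshakeV10Payload()
writePackets(hsV10pkt)
// ... then read and process the client's handshake response
return nil
}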
func makeHandshakeV10Payload() []byte {
// ... protocol version (10), server version and connection ID are written first (omitted)
// Write the first part of the salt (fixed 8 bytes)
pos = mp.writeCountOfBytes(data, pos, mp.GetSalt()[0:8])
// ... filler byte, lower capability flags, charset, status flags and upper capability flags (omitted)
// If plugin authentication is supported, write the length of auth-plugin-data; otherwise write 0
if (DefaultCapability & CLIENT_PLUGIN_AUTH) != 0 {
// MO's salt is always 20 bytes, so the length is 20 + 1 (trailing NUL) = 21
pos = mp.io.WriteUint8(data, pos, uint8(len(mp.GetSalt())+1))
} else {
pos = mp.io.WriteUint8(data, pos, 0)
}
// ... 10 reserved bytes (omitted)
// Secure connection is enabled by default: write the second part of the salt (12 bytes) and a NUL terminator
if (DefaultCapability & CLIENT_SECURE_CONNECTION) != 0 {
pos = mp.writeCountOfBytes(data, pos, mp.GetSalt()[8:])
pos = mp.io.WriteUint8(data, pos, 0)
}
// Write auth_plugin_name, MO is fixed as mysql_native_password
if (DefaultCapability & CLIENT_PLUGIN_AUTH) != 0 {
pos = mp.writeStringNUL(data, pos, AuthNativePassword)
}
return data
}
Example of a server handshake packet captured using Wireshark:
// Server Request
Server Greeting
Protocol: 10
Version: 8.0.30-MatrixOne-v286829
Thread ID: 893
Salt: \x12_"pID~\x11
Server Capabilities: 0xa68f
Server Language: utf8mb4 COLLATE utf8mb4_bin (46)
Server Status: 0x0002
Extended Server Capabilities: 0x013b
Authentication Plugin Length: 21
Unused: 00000000000000000000
Salt: \x1E\!\x1Cku(2#\x06T~
Authentication Plugin: mysql_native_password
HandshakeResponse Analysis and Processing
After receiving the handshake, the client processes it and returns a handshake response packet:
// Client Request
MySQL Protocol
Packet Length: 195
Packet Number: 1
Login Request
Client Capabilities: 0xa685
Extended Client Capabilities: 0x19ff
MAX Packet: 16777216
Charset: utf8mb4 COLLATE utf8mb4_0900_ai_ci (255)
Unused: 0000000000000000000000000000000000000000000000
Username: dump
Password: 4c1978397152b08d31bfded38c0322fb8602a116
Client Auth Plugin: mysql_native_password
Connection Attributes
Connection Attributes length: 114
Connection Attribute - _pid: 27998
Connection Attribute - _platform: x86_64
Connection Attribute - _os: Linux
Connection Attribute - _client_name: libmysql
Connection Attribute - os_user: cjk
Connection Attribute - _client_version: 8.0.33
Connection Attribute - program_name: mysql
The response mainly contains the client capability flags, the maximum packet size the client accepts, the character set, the username and authentication data (for non-SSL connections), and the connection attributes. After sending the handshake packet, MOserver waits for the client's handshake response. Until the handshake completes, the IsEstablished flag is false, which tells the server that the next packet it reads is the handshake response to be parsed.
func (mo *MOServer) handshake(rs *Conn) error {
// Begin processing the handshake response
// Determine if the information being processed is the handshakeResponse based on the Established flag
if !protocol.IsEstablished() {
// Core analysis code, MO supports version 4.1 protocol
var resp41 response41
resp41 = analyseHandshakeResponse41()
// After parsing, proceed with identity authentication
}
// Returning nil error indicates successful handshake
return nil
}
// Analyze the payload of the response
func (mp *MysqlProtocolImpl) analyseHandshakeResponse41(data []byte) (bool, response41, error) {
// For protocol 4.1, read the 4-byte capability flags in one go; a client without 4.1 support is rejected with an error
info.capabilities, pos, ok = mp.io.ReadUint32(data, pos)
if (info.capabilities & CLIENT_PROTOCOL_41) == 0 {
// ... return an error: the client does not support protocol 4.1
}
// Read the 4-byte maximum packet size, 1-byte charset, and skip the zero-fill bytes
info.maxPacketSize, pos, ok = mp.io.ReadUint32(data, pos)
info.collationID, pos, ok = mp.io.ReadUint8(data, pos)
pos += 23
// SSLRequest: if the packet ends here and CLIENT_SSL is set, return and wait for the TLS handshake
if pos == len(data) && (info.capabilities&CLIENT_SSL) != 0 {
info.isAskForTlsHeader = true
return true, info, nil
}
// Otherwise this is a full handshake response: read the NUL-terminated username
// The capabilities decide how the auth response (password) is encoded
info.username, pos, ok = mp.readStringNUL(data, pos)
if (info.capabilities & CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA) != 0 {
// auth response is a length-encoded string
} else if (info.capabilities & CLIENT_SECURE_CONNECTION) != 0 {
// 1-byte length followed by that many bytes
} else {
// NUL-terminated string
}
// If the database is specified
if (info.capabilities & CLIENT_CONNECT_WITH_DB) != 0 {
info.database, pos, ok = mp.readStringNUL(data, pos)
}
// MO only supports the mysql_native_password auth plugin
info.clientPluginName, pos, ok = mp.readStringNUL(data, pos)
info.connectAttrs = make(map[string]string)
if info.capabilities&CLIENT_CONNECT_ATTRS != 0 {
// Read connectAttrs with variable-length encoding
}
return true, info, nil
}
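The connection-attribute block read at the end of the function has a simple layout. It starts with a length-encoded total size, followed by key/value pairs, each stored as a length-encoded string. The Go sketch below encodes the seven attributes from the capture and parses them back. All names are hypothetical, and only 1-byte and 0xfc length prefixes are handled. The total comes to 114 bytes, which matches "Connection Attributes length: 114" in the capture.

```go
package main

import "fmt"

// capturedAttrs are the seven attributes from the capture above.
var capturedAttrs = [][2]string{
	{"_pid", "27998"}, {"_platform", "x86_64"}, {"_os", "Linux"},
	{"_client_name", "libmysql"}, {"os_user", "cjk"},
	{"_client_version", "8.0.33"}, {"program_name", "mysql"},
}

// readLenEnc decodes a length-encoded integer (1-byte and 0xfc forms only).
func readLenEnc(data []byte, pos int) (int, int, bool) {
	switch {
	case pos < len(data) && data[pos] < 0xfb:
		return int(data[pos]), pos + 1, true
	case pos+2 < len(data) && data[pos] == 0xfc:
		return int(data[pos+1]) | int(data[pos+2])<<8, pos + 3, true
	}
	return 0, 0, false
}

// appendLenEnc writes a length-encoded integer (1-byte and 0xfc forms only).
func appendLenEnc(b []byte, n int) []byte {
	if n < 251 {
		return append(b, byte(n))
	}
	return append(b, 0xfc, byte(n), byte(n>>8))
}

// encodeConnectAttrs builds the block a client sends when CLIENT_CONNECT_ATTRS is set.
func encodeConnectAttrs(pairs [][2]string) []byte {
	var body []byte
	for _, kv := range pairs {
		for _, s := range kv {
			body = appendLenEnc(body, len(s))
			body = append(body, s...)
		}
	}
	return append(appendLenEnc(nil, len(body)), body...)
}

// parseConnectAttrs reads the total length, then key/value string pairs.
func parseConnectAttrs(data []byte, pos int) (map[string]string, int, bool) {
	total, pos, ok := readLenEnc(data, pos)
	if !ok || pos+total > len(data) {
		return nil, 0, false
	}
	end := pos + total
	attrs := make(map[string]string)
	for pos < end {
		var kv [2]string
		for i := range kv {
			var n int
			n, pos, ok = readLenEnc(data, pos)
			if !ok || pos+n > end {
				return nil, 0, false
			}
			kv[i] = string(data[pos : pos+n])
			pos += n
		}
		attrs[kv[0]] = kv[1]
	}
	return attrs, pos, true
}

func main() {
	block := encodeConnectAttrs(capturedAttrs)
	attrs, _, _ := parseConnectAttrs(block, 0)
	fmt.Println(len(block)-1, len(attrs)) // 114 7
}
```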
Authentication
After parsing the handshake response, the server verifies the authentication information. If verification succeeds, the Established flag is set and the connection is complete. The server then returns an OK packet, and from then on it handles regular commands.
if err = protocol.Authenticate(); err != nil {
return err
}
protocol.SetEstablished()
// Server Response
MySQL Protocol - response OK
Packet Length: 8
Packet Number: 2
Response Code: OK Packet (0x00)
Affected Rows: 0
Server Status: 0x0810
Warnings: 0
If authentication fails, for example because of an incorrect password, the server returns an ERR packet.
// Server Response
MySQL Protocol - response ERROR
Packet Length: 75
Packet Number: 2
Response Code: ERR Packet (0xff)
Error Code: 1045
SQL state: 28000
Error message: Access denied for user dump. internal error: check password failed
Authenticate consists of two core parts. The first is AuthenticateUser on Session: it verifies the user identity, checks the account status and the user's roles, and verifies that the database exists, all by running SQL queries in a background session. The second is checkPassword, which verifies the password using SHA-1 and the salt.
// Get the SQL query statement
func getSqlFor***() (string, error) {
err := inputIsInvalid(ctx, tenant)
if err != nil {
return "", err
}
return fmt.Sprintf(check***ormat, tenant), nil
}
// Execute SQL query
func executeSQLInBackgroundSession(sql string) ([]ExecResult, error) {
bh := NewBackgroundExec(reqCtx, upstream, mp)
defer bh.Close()
err := bh.Exec(reqCtx, sql)
if err != nil {
return nil, err
}
return getResultSet(reqCtx, bh)
}
// AuthenticateUser mainly verifies the user's information in the database through SQL queries
func (ses *Session) AuthenticateUser( /* ... */ ) ([]byte, error) {
// Set tenant information on the session
ses.SetTenantInfo(tenant)
// check that the tenant exists
// check account status
// check user:role
// GetPassWord converts the stored password hash (a string) into a byte slice
return GetPassWord(pwd)
}
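// Validate the auth data sent by the client against the stored password hash.
// pwd is the stored SHA-1(SHA-1(password)), salt is the 20-byte salt from the handshake,
// and auth is the client's scramble: SHA-1(password) XOR SHA-1(salt + SHA-1(SHA-1(password))).
func checkPassword(pwd, salt, auth []byte) bool {
// Calculate SHA-1(salt + pwd); hash.Hash.Write never returns an error
sha := sha1.New()
sha.Write(salt)
sha.Write(pwd)
hash1 := sha.Sum(nil)
// auth and hash1 must have equal lengths (20 bytes)
if len(auth) != len(hash1) {
return false
}
// XOR with auth recovers the client's SHA-1(password)
for i := range hash1 {
hash1[i] ^= auth[i]
}
// Calculate SHA-1(hash1), i.e., SHA-1(SHA-1(password))
hash2 := HashSha1(hash1)
// Compare the recovered value with the stored password hash
return bytes.Equal(pwd, hash2)
}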
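To show why the XOR in checkPassword recovers something the server can verify, here is a self-contained round trip in Go. It assumes the server stores SHA1(SHA1(password)), as MySQL's mysql_native_password does. `scramble` and `storedHash` are hypothetical helper names, and the password and salt values are made up.

```go
package main

import (
	"bytes"
	"crypto/sha1"
	"fmt"
)

// storedHash is what the server is assumed to keep: SHA1(SHA1(password)).
func storedHash(password string) []byte {
	stage1 := sha1.Sum([]byte(password))
	stage2 := sha1.Sum(stage1[:])
	return stage2[:]
}

// scramble computes the client's auth response for mysql_native_password:
// SHA1(password) XOR SHA1(salt + SHA1(SHA1(password))).
func scramble(password string, salt []byte) []byte {
	stage1 := sha1.Sum([]byte(password))
	h := sha1.New()
	h.Write(salt)
	h.Write(storedHash(password))
	out := h.Sum(nil)
	for i := range out {
		out[i] ^= stage1[i]
	}
	return out
}

// checkPassword follows the same steps as the server-side function above.
func checkPassword(stored, salt, auth []byte) bool {
	h := sha1.New()
	h.Write(salt)
	h.Write(stored)
	hash1 := h.Sum(nil)
	if len(auth) != len(hash1) {
		return false
	}
	for i := range hash1 {
		hash1[i] ^= auth[i] // recovers SHA1(password)
	}
	hash2 := sha1.Sum(hash1)
	return bytes.Equal(stored, hash2[:])
}

func main() {
	salt := []byte("0123456789abcdefghij") // 20 bytes; a real server uses random bytes
	auth := scramble("secret", salt)
	fmt.Println(checkPassword(storedHash("secret"), salt, auth)) // true
	fmt.Println(checkPassword(storedHash("other"), salt, auth))  // false
}
```

Because the password only travels as a salted, one-time scramble, an eavesdropper never sees SHA1(password) directly.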
SSL Connection
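Under SSL, the client first replies to the server greeting with a short SSL request (the fixed-length start of the handshake response, with the CLIENT_SSL flag set). Both sides then perform a TLS handshake, and the client sends its full handshake response over the encrypted channel.
In the server handshake packet captured by Wireshark, the CLIENT_SSL flag (0x0800) is set in Server Capabilities: 0xae8f here versus 0xa68f in the non-SSL capture above.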
// Server request
Server Greeting
Protocol: 10
Version: 8.0.30-MatrixOne-v286829
Thread ID: 707
Salt: \x05H\nz@[7O
Server Capabilities: 0xae8f
Server Language: utf8mb4 COLLATE utf8mb4_bin (46)
Server Status: 0x0002
Extended Server Capabilities: 0x013b
Authentication Plugin Length: 21
Unused: 00000000000000000000
Salt: -E`\x13c/L2GxcB
Authentication Plugin: mysql_native_password
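In the client's response, the packet ends right after the fixed-length fields, so Wireshark shows an empty Username. Everything that follows, including the full handshake response, is encrypted by TLS.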
// Client Response
Login Request
Client Capabilities: 0xae85
Extended Client Capabilities: 0x19ff
MAX Packet: 16777216
Charset: utf8mb4 COLLATE utf8mb4_0900_ai_ci (255)
Unused: 0000000000000000000000000000000000000000000000
Username:
The specific code for establishing an SSL connection is as follows:
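func (mp *MysqlProtocolImpl) analyseHandshakeResponse41(data []byte) (bool, response41, error) {
// Return immediately when CLIENT_SSL is set and the packet ends after the fixed 32-byte prefix (an SSL request)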
if pos == len(data) && (info.capabilities&CLIENT_SSL) != 0 {
info.isAskForTlsHeader = true
return true, info, nil
}
}
func Handler() {
if isTlsHeader {
// Go standard library crypto/tls
tlsConn := tls.Server(rs.RawConn(), rm.getTlsConfig())
protocol.SetTlsEstablished()
}
}
4. Command Phase
MO Network Buffer
Before describing how commands are executed, let's briefly look at how MOserver reads from and writes to the network. MOserver wraps the original net.Conn connection, mainly adding a buffer designed for the MySQL protocol. The core idea is to reuse a fixed-size buffer (1MB by default) to reduce the number of memory allocations. When the data exceeds the fixed buffer, writing continues into a dynamic buffer made of a linked list of blocks. Below is the main structure of the wrapped Conn:
// The basic unit of the buffer, consisting of a data slice and a write pointer
type ListBlock struct {
data []byte
writeIndex int
}
type Conn struct {
// Unique ID
id uint64
conn net.Conn
// Maintain the correct SeqID
sequenceId uint8
// Fixed buffer
fixBuf *ListBlock
// Dynamic buffer, implemented by a linked list
dynamicBuf *list.List
// curBuf and curHeader mark the current write block and Packet's header
curBuf *ListBlock
curHeader []byte
// Current buffer data volume and current Packet data volume
bufferLength int
packetLength int
}
Except for the status packets (OK, EOF, ERR), which are sent immediately, packets are first stored in the buffer and sent together with the next status packet, which reduces the number of write system calls. The size of a packet is not known in advance. So when a packet begins, 4 header bytes are always reserved, and when the packet is finished, the payload length and sequence ID are written back into them.
func (c *Conn) Append(elems ...byte) error {
// Besides an extra judgment for >16MB, the core method is AppendPart
err = c.AppendPart(elems)
return err
}
func (c *Conn) AppendPart(elems []byte) error {
var err error
// Calculate the remaining space in the current block
curBufRemainSpace := len(c.curBuf.data) - c.curBuf.writeIndex
if len(elems) > curBufRemainSpace {
// The current block does not have enough room: fill it up, then allocate a new block for the rest
copy(c.curBuf.data[c.curBuf.writeIndex:], elems[:curBufRemainSpace])
c.curBuf.writeIndex += curBufRemainSpace
curElemsRemainSpace := len(elems) - curBufRemainSpace
// PushNewBlock allocates new memory, appends it to the dynamic buffer, and makes it the new c.curBuf
err = c.PushNewBlock(curElemsRemainSpace)
if err != nil {
return err
}
copy(c.curBuf.data[c.curBuf.writeIndex:], elems[curBufRemainSpace:])
c.curBuf.writeIndex += len(elems[curBufRemainSpace:])
} else {
// Otherwise, continue writing to the end of the current block
copy(c.curBuf.data[c.curBuf.writeIndex:], elems)
c.curBuf.writeIndex += len(elems)
}
return err
}
func (c *Conn) BeginPacket() error {
// Record the header position of the current Packet
c.curHeader = c.curBuf.data[c.curBuf.writeIndex : c.curBuf.writeIndex+HeaderLengthOfTheProtocol]
// Skip the header
c.curBuf.writeIndex += HeaderLengthOfTheProtocol
c.bufferLength += HeaderLengthOfTheProtocol
return nil
}
func (c *Conn) FinishedPacket() error {
// At the end of the current packet, write the PacketSize and Seq ID to the reserved header position
binary.LittleEndian.PutUint32(c.curHeader, uint32(c.packetLength))
c.curHeader[3] = c.sequenceId
return nil
}
func (c *Conn) FlushIfFull() error {
// FlushIfFull only checks whether Flush needs to be called
}
func (c *Conn) Flush() error {
// WriteToConn safely writes data to the network
err := c.WriteToConn(c.fixBuf.data[:c.fixBuf.writeIndex])
if err != nil {
return err
}
// After the fixed buffer is written, any data in the dynamic buffer is written as well
for node := c.dynamicBuf.Front(); node != nil; node = node.Next() {
block := node.Value.(*ListBlock)
if err = c.WriteToConn(block.data[:block.writeIndex]); err != nil {
return err
}
}
return nil
}
// Only the status packets OK, EOF and ERR are sent directly through Write
func (c *Conn) Write(payload []byte) error {
// First, send everything in the buffer
if err := c.Flush(); err != nil {
return err
}
// Build the header of the status packet and send it together with the payload
var header [4]byte
length := len(payload)
binary.LittleEndian.PutUint32(header[:], uint32(length))
header[3] = c.sequenceId
c.sequenceId += 1
return c.WriteToConn(append(header[:], payload...))
}
Query Structure
After the connection is established, the client sends commands and the server processes them. The most common is the query command (COM_QUERY), whose structure, as captured by Wireshark, is as follows:
// Client Request
Request Command Query
Command: Query (3)
Statement: select * from t
The first byte of the payload identifies the command type. Taking the most common command, the query, as an example and ignoring special cases, a query packet consists of just two parts:
- command type (1 byte, 0x03 for COM_QUERY)
- query string (the rest of the packet; it is not NUL-terminated)
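The following minimal Go parser shows the two-part layout in practice. It takes one complete packet (4-byte header plus payload) and returns the SQL text. As the MySQL protocol documentation specifies, the query runs to the end of the packet (string<EOF>). `parseComQuery` and `comQuery` are hypothetical names, and query attributes and multi-packet queries are ignored.

```go
package main

import (
	"errors"
	"fmt"
)

const comQuery = 0x03 // COM_QUERY command byte

// parseComQuery returns the SQL text of a single-packet COM_QUERY.
func parseComQuery(packet []byte) (string, error) {
	if len(packet) < 5 {
		return "", errors.New("packet too short")
	}
	n := int(packet[0]) | int(packet[1])<<8 | int(packet[2])<<16
	payload := packet[4:]
	if len(payload) != n {
		return "", errors.New("length field does not match payload")
	}
	if payload[0] != comQuery {
		return "", fmt.Errorf("unexpected command 0x%02x", payload[0])
	}
	return string(payload[1:]), nil // everything after the command byte
}

func main() {
	sql := "select * from t"
	pkt := append([]byte{byte(len(sql) + 1), 0, 0, 0, comQuery}, sql...)
	q, err := parseComQuery(pkt)
	fmt.Println(q, err) // select * from t <nil>
}
```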
The incoming packet is first received as a byte slice by the server's Read function and then passed to handleRequest. After several layers of calls, handleRequest eventually reaches the core ExecuteStmt function. The following sections describe how common statements are executed.
SELECT Statement
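For a SELECT statement, the client sends a query command. The server responds with, in order:
- a column count packet
- one column definition packet per column, followed by an EOF packet
- one packet per result row
- a final EOF or OK status packet

After the SELECT statement reaches the server and is parsed and classified, it is executed by calling executeStmtWithResponse.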
func Handler(msg interface{}) {
// Confirm the connection is established
if !protocol.IsEstablished() {}
// Deconstruct the Payload data stream into a Request structure
req := protocol.GetRequest(payload)
// Core code
routine.handleRequest(req)
}
func handleRequest(req *Request) {
// The query plan ultimately enters this function for execution and sending results
executeStmtWithResponse()
}
Execution eventually reaches the executeResultRowStmt function. This function parses the SQL, generates the plan and compiles it, sends the column count and column definitions before running, and finally sends the result rows in text format. We focus on how the results are encoded and sent after execution.
func executeStmtWithResponse() {
// Core function, including execution and sending result data
executeStmt()
// Respond to the client, send the status packet
respClientWhenSuccess()
}
func respClientWhenSuccess() {
// The final response function, ending the query by sending the result status packet
switch stmt.RespType() {
// The return function executed under select
case tree.RESP_STREAM_RESULT_ROW:
respStreamResultRow()
}
}
func executeStmt() {
// The data of the result row ultimately enters this function
executeResultRowStmt()
}
func executeResultRowStmt() {
// Send the column count and definition before executing the query plan
respColumnDefsWithoutFlush()
Run()
}
Sending the result consists of three main parts: ColumnCount, Columns, and TextRow. Next, we look at what each of these functions does internally.
The ColumnCount is encoded as a length-encoded integer and sent as a single packet.
func SendColumnCountPacket(count uint64) {
// Send ColumnCount as LengthEncodedInteger
pos = writeIntLenEnc(data, pos, count)
// appendPacket internally calls Conn's BeginPacket, Append, FinishedPacket, etc.
appendPackets(data)
}
Based on the ColumnCount, each column definition is sent as its own packet, and an EOF packet follows the last one.
func sendColumns(set ResultSet) {
// Send each column definition in sequence
for i := uint64(0); i < set.GetColumnCount(); i++ {
col := set.GetColumn(i)
SendColumnDefinitionPacket(col)
}
// End with EOF
sendEOFPacket()
}
func SendColumnDefinitionPacket(column Column) {
// Core code to generate column definition packets
data := makeColumnDefinition(column)
appendPackets(data)
}
makeColumnDefinition builds a column definition packet in the format described in the MySQL protocol documentation, as in the capture below. Its Flags field encodes column properties such as NOT NULL and primary key.
// Server Response
MySQL Protocol - field packet
Packet Length: 38
Packet Number: 3
Catalog
Catalog: def
Database
Database: testdb
Table
Table: t
Original table
Original table: t
Name
Name: name
Original name
Original name: name
Charset number: utf8 COLLATE utf8_general_ci (33)
Length: 4294967295
Type: FIELD_TYPE_VAR_STRING (253)
Flags: 0x0000
Decimals: 0
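The layout of a column definition packet can be checked by rebuilding the captured one. Below is a minimal Go sketch of a ColumnDefinition41 payload per the MySQL protocol documentation. It has six length-encoded strings (catalog "def", schema, table, original table, name, original name), then the byte 0x0c, then 12 bytes of fixed-size fields. `columnDef` and `makeColumnDefinition41` are hypothetical names, not MO's makeColumnDefinition. The result is 38 bytes, matching the captured Packet Length.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// columnDef holds the fields of one column.
type columnDef struct {
	Schema, Table, OrgTable, Name, OrgName string
	Charset                                uint16
	Length                                 uint32
	Type                                   byte
	Flags                                  uint16
	Decimals                               byte
}

// appendShortLenEncStr writes a length-encoded string shorter than 251 bytes,
// so a 1-byte length prefix suffices (true for typical identifiers).
func appendShortLenEncStr(b []byte, s string) []byte {
	return append(append(b, byte(len(s))), s...)
}

// makeColumnDefinition41 lays out a ColumnDefinition41 payload.
func makeColumnDefinition41(c columnDef) []byte {
	var b []byte
	for _, s := range []string{"def", c.Schema, c.Table, c.OrgTable, c.Name, c.OrgName} {
		b = appendShortLenEncStr(b, s)
	}
	var fixed [12]byte
	binary.LittleEndian.PutUint16(fixed[0:], c.Charset)
	binary.LittleEndian.PutUint32(fixed[2:], c.Length)
	fixed[6] = c.Type
	binary.LittleEndian.PutUint16(fixed[7:], c.Flags)
	fixed[9] = c.Decimals
	// fixed[10:12] is a 2-byte filler and stays zero
	b = append(b, 0x0c) // length of the fixed-size fields (always 12)
	return append(b, fixed[:]...)
}

func main() {
	p := makeColumnDefinition41(columnDef{
		Schema: "testdb", Table: "t", OrgTable: "t", Name: "name", OrgName: "name",
		Charset: 33, Length: 4294967295, Type: 253, // FIELD_TYPE_VAR_STRING
	})
	fmt.Println(len(p)) // 38, the Packet Length in the capture
}
```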
Statements such as INSERT and UPDATE, whose result is only an execution status, go through executeStatusStmt instead of executeResultRowStmt. No column definitions or row data are returned; only the final status packet is sent by respClientWhenSuccess. This path is not covered further here.
During execution, each result batch eventually reaches the RespResult function, which calls WriteResultSetRow to send the rows. Column definitions and status packets are first assembled as separate packets. Rows are handled differently: their bytes are appended directly into the connection's network buffer, which is more efficient. The implementation is as follows:
// bat holds a batch of result rows; n is the number of rows in it
func Write(bat *batch.Batch) {
for j := 0; j < n; j++ {
// Extract row data from column vectors row by row
extractRowFromEveryVector(bat, j, mrs.Data[0])
// Send row data
WriteResultSetRow(&mrs, 1)
}
}
func WriteResultSetRow(mrs *MysqlResultSet, cnt uint64) {
for i := uint64(0); i < cnt; i++ {
// Call Conn's BeginPacket to start a new protocol packet
beginPacket()
// Append the row's column values to the buffer
appendResultSetTextRow(mrs, i)
// End the protocol packet once the row has been written
FinishedPacket()
}
}
func appendResultSetTextRow(data []byte, r uint64) []byte {
for i := uint64(0); i < GetColumnCount(); i++ {
column, err := set.GetColumn(i)
// NULL is encoded as the single byte 0xfb
if isNil(column) {
appendUint8(data, 0xFB)
} else {
// Every other value is converted to text and added to the buffer as string<lenenc>
switch column.ColumnType() {
// ... per-type cases that format the value as text (omitted)
default:
appendStringLenEnc(value)
}
}
}
return data
}
func appendStringLenEnc(value string) {
// The length is written first as a length-encoded integer (omitted here);
// Append then calls Conn.Append to write the string bytes into the buffer
Conn.Append([]byte(value[:length])...)
}
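The following Go sketch shows the per-row encoding on its own, following the text-protocol rules. NULL becomes the single byte 0xfb. Every other value, including numbers, is sent as its text form in a length-encoded string. `appendTextRow` and `appendLenEncString` are hypothetical names, and a nil pointer stands in for a NULL column value.

```go
package main

import "fmt"

// appendLenEncString writes a length-encoded string (assumes len(s) < 2^24).
func appendLenEncString(b []byte, s string) []byte {
	n := len(s)
	switch {
	case n < 251:
		b = append(b, byte(n))
	case n < 1<<16:
		b = append(b, 0xfc, byte(n), byte(n>>8))
	default:
		b = append(b, 0xfd, byte(n), byte(n>>8), byte(n>>16))
	}
	return append(b, s...)
}

// appendTextRow encodes one text-protocol row: a nil entry (NULL) becomes
// 0xfb; every other value is sent as a length-encoded string.
func appendTextRow(b []byte, row []*string) []byte {
	for _, v := range row {
		if v == nil {
			b = append(b, 0xfb)
			continue
		}
		b = appendLenEncString(b, *v)
	}
	return b
}

func main() {
	id, name := "1", "xx" // in the text protocol even numbers travel as strings
	fmt.Printf("% x\n", appendTextRow(nil, []*string{&id, nil, &name}))
	// 01 31 fb 02 78 78
}
```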
Prepare/Execute
In the CLI, a PREPARE statement is given a custom name and is followed by SET @var and EXECUTE. With JDBC, prepared statements are used directly through the API without explicitly naming them. The protocol packets differ between the two cases, as described below.
Prepare (CLI)
In the CLI, Prepare, Set and Execute are all sent with the query command type. Prepare and Set only return an OK packet, while Execute returns the same result as executing the corresponding statement directly. Below is the structure of the Prepare query:
// Client Request
Request Command Query
Command: Query (3)
Statement: PREPARE stmt FROM 'select * from t where name'
After the server receives the request, it enters executeStmt, just like a normal query:
func executeStmt() {
// Prepare is executed directly in frontend
execInFrontend()
}
func execInFrontend() {
// Depending on stmt type, CLI's Prepare enters doPrepareString
switch st := execCtx.stmt.(type) {
case *tree.PrepareString:
// Core code
doPrepareString()
}
}
func doPrepareString() {
// The SQL after prepare is parsed and the plan is built in this function
stmts = mysql.Parse(Sql)
preparePlan = buildPlan()
// Name, stmt, plan, etc., are packed into prepareStmt
SetPrepareStmt(name, prepareStmt)
}
func SetPrepareStmt(name, prepareStmt) {
// Saved in the map
ses.prepareStmts[name] = prepareStmt
}
After successful processing, the server returns an OK packet to the client, again through the respClientWhenSuccess function:
// Server Response
MySQL Protocol - response OK
Packet Length: 8
Packet Number: 1
Response Code: OK Packet (0x00)
Affected Rows: 0
Server Status: 0x0812
Warnings: 0
Core code:
func respClientWhenSuccess() {
respClientWithoutFlush()
}
func respClientWithoutFlush() {
switch execCtx.stmt.StmtKind().RespType() {
// Return as status type
case tree.RESP_STATUS:
respStatus(ses, execCtx)
}
}
func respStatus() {
switch st := execCtx.stmt.(type) {
case *tree.PrepareStmt, *tree.PrepareString:
// Command type is query, not PREPARE
if ses.GetCmd() == COM_STMT_PREPARE {
} else {
// Prepare and send OK packet
resp := setResponse(ses, execCtx.isLastStmt, rspLen)
SendResponse(execCtx.reqCtx, resp)
}
}
}
Set (CLI)
After Prepare, the CLI usually uses Set to assign variables before running Execute. When the Set statement enters executeStmt, it is likewise executed in execInFrontend and falls into the SetVar case. The value is computed by a SELECT against the dual table and stored in a map:
func execInFrontend() {
// Depending on the stmt type, the CLI's SET enters doSetVar
switch st := execCtx.stmt.(type) {
case *tree.SetVar:
// Core code
doSetVar()
}
}
func doSetVar() {
value := getExprValue()
SetUserDefinedVar(value)
}
func getExprValue() {
// Compose an AST from dual table select
compositedSelect = ...
tempExecCtx := ExecCtx{
reqCtx: execCtx.reqCtx,
ses: ses,
}
// Execute in temporary context
executeStmtInSameSession(tempExecCtx.reqCtx, ses, &tempExecCtx, compositedSelect)
// Extract execution result, i.e., the actual value of the variable
batches := ses.GetResultBatches()
}
func SetUserDefinedVar(value interface{}) {
// User-defined variables are stored in the map
ses.userDefinedVars[strings.ToLower(name)] = &UserDefinedVar{Value: value, Sql: sql}
}
Execute (CLI)
After Prepare and Set, Execute runs the prepared statement with the variables; in the CLI its command type is also query:
// Client Request
Request Command Query
Command: Query (3)
Statement: EXECUTE stmt using @name
Upon receiving the request, the server first registers the function used to resolve variables in doComQuery:
func doComQuery() {
// Set the function used for parameter extraction
proc.SetResolveVariableFunc(ResolveVariable)
}
func ResolveVariable(varName string) {
// Extract user-defined parameters
GetUserDefinedVar(varName)
}
func GetUserDefinedVar(varName string) {
// Extract and return from the map
val, ok := userDefinedVars[strings.ToLower(varName)]
return val
}
On entering executeStmt, the server compiles the actual EXECUTE statement, retrieves the prepared plan from the map, and looks up the actual parameter values in userDefinedVars by variable name. It then executes the prepared statement with those values.
func Compile() {
// Regular build plan
plan := buildPlan()
// If execute type, replace the plan
if _, ok := cwft.stmt.(*tree.Execute); ok {
plan, stmt, sql := replacePlan(plan)
}
// Continue with subsequent execution after replacement
}
func replacePlan(plan) {
// Read stmt from the map based on the extracted stmtName
stmtName := execPlan.GetName()
prepareStmt := GetPrepareStmt(stmtName)
if prepareStmt.params != nil {
// Params not nil for JDBC case, already extracted during parse
} else {
// CLI execute extracts param values during replacePlan stage
for i, arg := range Plan.Args {
// Call function decided in doComQuery phase to extract params
param = GetResolveVariableFunc()(varName)
paramVals[i] = param
}
SetPrepareParams(prepareStmt.params)
// Bind to compile results
cwft.paramVals = paramVals
return prepareStmt.PreparePlan
}
}
After replacement, the resulting plan is processed normally in executeStmt. Depending on the actual statement, it enters either executeResultRowStmt or executeStatusStmt. In the CLI, executeResultRowStmt behaves the same as executing a SELECT directly. It sends the column count, the column definitions, and then every result row, with values encoded as length-encoded strings.
func executeResultRowStmt() {
// Send column count and column definitions
respColumnDefsWithoutFlush()
// After execution, send all row data
Run()
}
Prepare (JDBC)
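In JDBC, the packet sent for Prepare no longer uses the query command type; it uses COM_STMT_PREPARE instead. The interaction changes accordingly:
- The client sends COM_STMT_PREPARE.
- The server answers with a prepare response: an OK packet carrying the statement ID and the parameter and column counts, followed by the parameter definitions and then the column definitions.
- The client later runs the statement with COM_STMT_EXECUTE.

Figure 4: PREPARE interaction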
The structure of the Prepare command is as follows:
// Client Request
Request Command Prepare Statement
Command: Prepare Statement (22)
Statement: select * from t where name = ?
After receiving the data, the server behaves much as in the CLI case and ultimately falls into execInFrontend, but it executes *tree.PrepareStmt instead of *tree.PrepareString.
func execInFrontend() {
// Depending on stmt type, JDBC's Prepare enters doPrepareStmt
switch st := execCtx.stmt.(type) {
case *tree.PrepareStmt:
// Core code
doPrepareStmt()
}
}
func doPrepareStmt() {
// No need to parse again
preparePlan = buildPlan()
SetPrepareStmt(name, prepareStmt)
}
func SetPrepareStmt(name, prepareStmt) {
// Save to map
ses.prepareStmts[name] = prepareStmt
}
After successful processing, respStatus calls the SendPrepareResponse function, which sends the entire PrepareResponse:
func respStatus() {
switch st := execCtx.stmt.(type) {
case *tree.PrepareStmt, *tree.PrepareString:
// Command type is PREPARE, not query
if ses.GetCmd() == COM_STMT_PREPARE {
SendPrepareResponse()
} else {
}
}
}
func SendPrepareResponse() {
// PrepareResponse first sends an OK packet carrying the statement ID and the parameter and column counts
SendOKPacket()
// Then one definition per parameter (named "?") and one per result column,
// each group ending with an EOF packet
for i := 0; i < numParams; i++ {
column := new(MysqlColumn)
column.SetName("?")
SendColumnDefinitionPacket()
}
SendEOFPacket()
for i := 0; i < numColumns; i++ {
column := new(MysqlColumn)
column.SetName(columns[i].Name)
SendColumnDefinitionPacket()
}
SendEOFPacket()
}
Execute (JDBC)
In JDBC, the major difference when running Execute is that results use the binary protocol instead of the text protocol, for efficiency. The request looks as follows:
// Client Request
Request Command Execute Statement
Command: Execute Statement (23)
Statement ID: 1
Flags: Defaults (0)
Iterations (unused): 1
New parameter bound flag: First call or rebound (1)
Parameter
Type: FIELD_TYPE_STRING (254)
Unsigned: 0
Value (String): xx
When the server receives a command of type Execute, it first calls parseStmtExecute, before doComQuery. This parses the binary-encoded actual parameters of the execute statement and binds them to the session's prepareStmts.
func ExecRequest() {
	switch req.GetCmd() {
	case COM_STMT_EXECUTE:
		// Read prepareStmt structure and SQL string
		sql, prepareStmt := parseStmtExecute(data)
		doComQuery(&UserInput{sql: sql})
	}
}
func parseStmtExecute(data) {
	// Read stmtID
	stmtID := binary.LittleEndian.Uint32(data[0:4])
	// Compose stmtName generated during Prepare
	stmtName := fmt.Sprintf("%s_%d", prefixPrepareStmtName, stmtID)
	// Get stmt's plan, SQL, etc.
	preStmt := GetPrepareStmt(stmtName)
	sql := fmt.Sprintf("execute %s", stmtName)
	// Extract params and bind
	ParseExecuteData()
	return sql, preStmt
}
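func ParseExecuteData() {
	// Get the number of params from the prepared plan
	numParams := len(preStmt.PreparePlan.ParamTypes)
	// Read null bitmap: bit i set means param i is NULL
	var nullBitmaps []byte
	nullBitmapLen := (numParams + 7) >> 3
	nullBitmaps = readCountOfBytes(nullBitmapLen)
	// New-params-bound flag: when it is 1, the param types follow,
	// 2 bytes per param (type code, unsigned flag)
	if readCountOfBytes(1)[0] == 1 {
		stmt.ParamTypes = readCountOfBytes(numParams << 1)
	}
	// Read each variable based on different parameter types
	for i := 0; i < numParams; i++ {
		if nullBitmaps[i>>3]&(1<<(uint(i)%8)) != 0 {
			// NULL params carry no value bytes
			continue
		}
		tp := stmt.ParamTypes[i<<1]
		switch defines.MysqlType(tp) {
		// For example, varchar, read length-encoded string
		case defines.MYSQL_TYPE_VARSTRING:
			val := readStringLenEnc(data)
			// Bind to stmt's vector
			SetAnyToStringVector(val, vector)
		}
	}
}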
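The parameter section that ParseExecuteData walks through can be decoded with plain byte slices. The sketch below is a simplified stand-in, not MO's implementation: it handles only string-typed parameters (MySQL type codes 15, 253 and 254), requires the new-params-bound flag to be 1, and its helpers readLenEncInt and parseStringParams are named for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

var errShort = errors.New("packet too short")

// readLenEncInt decodes a length-encoded integer at pos and returns the
// value and the position just after it.
func readLenEncInt(data []byte, pos int) (uint64, int, error) {
	if pos >= len(data) {
		return 0, 0, errShort
	}
	var size int
	switch first := data[pos]; {
	case first < 0xfb:
		return uint64(first), pos + 1, nil
	case first == 0xfc:
		size = 2
	case first == 0xfd:
		size = 3
	case first == 0xfe:
		size = 8
	default:
		return 0, 0, fmt.Errorf("invalid length-encoded prefix %#x", first)
	}
	if pos+1+size > len(data) {
		return 0, 0, errShort
	}
	var v uint64
	for i := 0; i < size; i++ { // little-endian
		v |= uint64(data[pos+1+i]) << (8 * i)
	}
	return v, pos + 1 + size, nil
}

// parseStringParams decodes the param section of COM_STMT_EXECUTE (the
// bytes after the iteration count) and reports which params are NULL.
func parseStringParams(data []byte, numParams int) ([]string, []bool, error) {
	vals := make([]string, numParams)
	nulls := make([]bool, numParams)
	if numParams == 0 {
		return vals, nulls, nil
	}
	pos := (numParams + 7) >> 3 // NULL bitmap length
	if pos+1+2*numParams > len(data) {
		return nil, nil, errShort
	}
	bitmap := data[:pos]
	if data[pos] != 1 {
		return nil, nil, errors.New("sketch requires new-params-bound flag = 1")
	}
	pos++
	types := data[pos : pos+2*numParams] // type code + unsigned flag per param
	pos += 2 * numParams
	for i := 0; i < numParams; i++ {
		if bitmap[i>>3]&(1<<(uint(i)&7)) != 0 {
			nulls[i] = true // NULL params have no value bytes
			continue
		}
		switch types[i<<1] {
		case 15, 253, 254: // VARCHAR, VAR_STRING, STRING
			n, next, err := readLenEncInt(data, pos)
			if err != nil {
				return nil, nil, err
			}
			if next+int(n) > len(data) {
				return nil, nil, errShort
			}
			vals[i] = string(data[next : next+int(n)])
			pos = next + int(n)
		default:
			return nil, nil, fmt.Errorf("type %d not handled in this sketch", types[i<<1])
		}
	}
	return vals, nulls, nil
}

func main() {
	// Param section of the request above: bitmap, flag, type 254, "xx"
	vals, nulls, err := parseStringParams([]byte{0x00, 0x01, 0xfe, 0x00, 0x02, 'x', 'x'}, 1)
	fmt.Println(vals, nulls, err)
}
```

Because a NULL parameter is marked only in the bitmap and has no bytes in the value area, the NULL check must happen before reading a value; otherwise every later parameter would be read from the wrong offset.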
As with the CLI, the replacePlan function substitutes the parameters before the plan is executed. If the executed statement returns data rows (e.g., select), the result is sent in Binary format. The rest of the logic is similar; the specific code is as follows:
func SendResultSetTextBatchRowSpeedup() {
	// Despite the function name, for COM_STMT_EXECUTE each row's result data
	// is sent using the binary protocol
	for i := uint64(0); i < cnt; i++ {
		// Each row is wrapped in its own packet
		beginPacket()
		appendResultSetBinaryRow(resultSet, i)
		finishedPacket()
	}
}
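func appendResultSetBinaryRow() error {
	// A binary row starts with a 1-byte header fixed at 0x00
	data = mp.append(data, defines.OKHeader)
	// NULL bitmap size: column count plus a 2-bit offset, rounded up to bytes
	numBytes4Null := (columnsLength + 7 + 2) / 8
	buffer := make([]byte, numBytes4Null)
	// Check if each column is null
	for i := uint64(0); i < columnsLength; i++ {
		isNil := ColumnIsNull(i)
		if isNil {
			// Find the position in the bitmap; the first 2 bits are reserved
			bytePos := (i + 2) / 8
			bitPos := byte((i + 2) % 8)
			idx := int(bytePos)
			// Set the bit to 1 to indicate null value
			buffer[idx] |= 1 << bitPos
		}
	}
	data = mp.append(data, buffer...)
	// Add each non-NULL column sequentially; NULLs appear only in the bitmap
	for i := uint64(0); i < columnsLength; i++ {
		if ColumnIsNull(i) {
			continue
		}
		column, err := GetColumn(i)
		if err != nil {
			return err
		}
		// Choose encoding based on the data type; only string types are shown
		switch column.ColumnType() {
		case defines.MYSQL_TYPE_VARCHAR, defines.MYSQL_TYPE_STRING:
			// String values are written as length-encoded strings
			data = mp.appendStringLenEnc(data, column.GetString())
		}
	}
	return nil
}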
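To make the bitmap arithmetic concrete, this Go sketch encodes one binary result row whose columns are all strings, then frames it the way beginPacket and finishedPacket are assumed to (3-byte length plus sequence ID). binaryRow and framePacket are hypothetical names, and the sketch assumes every value is shorter than 251 bytes:

```go
package main

import "fmt"

// binaryRow encodes one binary-protocol result row whose columns are all
// strings; a nil entry is SQL NULL. Layout: 0x00 | NULL bitmap | values.
func binaryRow(cols []*string) []byte {
	// Column count plus the 2 reserved bits, rounded up to whole bytes
	bitmap := make([]byte, (len(cols)+7+2)/8)
	for i, c := range cols {
		if c == nil {
			pos := i + 2 // skip the 2 reserved bits
			bitmap[pos/8] |= 1 << uint(pos%8)
		}
	}
	row := append([]byte{0x00}, bitmap...)
	for _, c := range cols {
		if c == nil {
			continue // NULL columns appear only in the bitmap
		}
		if len(*c) >= 251 {
			panic("sketch only handles values shorter than 251 bytes")
		}
		row = append(row, byte(len(*c))) // 1-byte length prefix
		row = append(row, *c...)
	}
	return row
}

// framePacket adds the 4-byte packet header (3-byte little-endian length
// and sequence ID) to a payload shorter than 2^24 - 1 bytes.
func framePacket(seq byte, payload []byte) []byte {
	n := len(payload)
	if n >= 1<<24-1 {
		panic("payloads of 2^24 - 1 bytes or more must be split")
	}
	return append([]byte{byte(n), byte(n >> 8), byte(n >> 16), seq}, payload...)
}

func main() {
	id, name := "1", "xx"
	fmt.Printf("% x\n", framePacket(1, binaryRow([]*string{&id, nil, &name})))
}
```

For the row `(1, NULL, 'xx')` the row payload is `00 08 01 31 02 78 78`: the NULL in column index 1 lands on bit 3 (1 + 2) of the bitmap, and the NULL column contributes no value bytes.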
LOAD DATA LOCAL INFILE
The LOAD DATA LOCAL INFILE command loads data from a file on the client machine into a MySQL database table. In the network interaction, the client first sends a LOAD DATA LOCAL INFILE query command that includes the file path, target table, delimiters, and other options. The server responds with a LOCAL INFILE packet containing the filename. After receiving this packet, the client starts sending the file data, and an empty packet marks the end of the data. The overall interaction process is as follows:
Figure 6: LOAD DATA interaction
The protocol packet structure is as follows:
// Client Request
MySQL Protocol
Packet Length: 153
Packet Number: 0
Request Command Query
Command: Query (3)
Statement: LOAD DATA LOCAL INFILE '/home/test_script/users.csv' INTO TABLE t FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' IGNORE 1 LINES (id, name, email)
// Server Response
MySQL Protocol - local infile
Packet Length: 32
Packet Number: 1
Response Code: LOCAL INFILE Packet (0xfb)
LOCAL INFILE Filename: /home/test_script/users.csv
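The exchange above follows a simple framing rule on both sides. The Go sketch below (all function names are hypothetical) builds the server's LOCAL INFILE request, the 0xfb marker followed by the file name, and the client's reply: the file content as a series of packets with increasing sequence IDs, terminated by an empty packet. The chunk size is an arbitrary parameter of the sketch; real clients choose their own.

```go
package main

import "fmt"

// packet prepends the 4-byte header: 3-byte little-endian length + sequence ID.
func packet(seq byte, payload []byte) []byte {
	n := len(payload)
	return append([]byte{byte(n), byte(n >> 8), byte(n >> 16), seq}, payload...)
}

// localInfileRequest is the server's answer to LOAD DATA LOCAL INFILE:
// a 0xfb marker followed by the file name, which runs to the end of the packet.
func localInfileRequest(seq byte, filename string) []byte {
	return packet(seq, append([]byte{0xfb}, filename...))
}

// clientFileData is the client's reply: the file content in chunks of at
// most chunkSize bytes with increasing sequence IDs, then an empty packet
// that marks the end of the data.
func clientFileData(seq byte, content []byte, chunkSize int) [][]byte {
	if chunkSize <= 0 {
		panic("chunkSize must be positive")
	}
	var out [][]byte
	for len(content) > 0 {
		n := chunkSize
		if n > len(content) {
			n = len(content)
		}
		out = append(out, packet(seq, content[:n]))
		seq++
		content = content[n:]
	}
	return append(out, packet(seq, nil))
}

func main() {
	// Sequence 0 was the client's query; the server answers with 1
	req := localInfileRequest(1, "/home/test_script/users.csv")
	fmt.Printf("marker=%#x name=%s\n", req[4], req[5:])
	// The client continues the sequence with 2, 3, ...
	for _, p := range clientFileData(2, []byte("id,name,email\n1,a,a@x\n"), 16) {
		fmt.Printf("seq=%d len=%d\n", p[3], len(p)-4)
	}
}
```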
Upon receiving the request, the server processes it in executeStmt, which builds and compiles the statement and recognizes it as a LOAD LOCAL statement. The server first creates an io.Pipe() for streaming the data that will be read from the client. It then calls executeStatusStmt, which checks whether the statement is a LOAD statement and, if so, hands off to the processLoadLocal function to complete the LOAD LOCAL process:
func executeStmt() {
	switch st := stmt.(type) {
	case *tree.Load:
		if st.Local {
			// Create an io.Pipe() for streaming the data read from the client;
			// the read end is consumed by the reading logic in external.go
			loadLocalReader, loadLocalWriter = io.Pipe()
		}
	}
	executeStatusStmt()
}
func executeStatusStmt() {
	// Hand LOAD ... LOCAL statements to processLoadLocal
	if st, ok := stmt.(*tree.Load); ok && st.Local {
		processLoadLocal()
	}
}
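func processLoadLocal() {
	// TCP read/write interface
	mysqlRrWr := ses.GetResponser().MysqlRrWr()
	// writer is the write end of the pipe created in executeStmt; closing it
	// signals end-of-file to the reader in external.go
	writer := loadLocalWriter
	defer writer.Close()
	// Initialize file-related parameters, such as path format
	InitInfileParam()
	// Send file path to client
	WriteLocalInfileRequest(Filepath)
	for {
		// Continuously read data sent by the client until an empty packet is received
		msg, err := mysqlRrWr.Read()
		if err != nil {
			break
		}
		if len(msg) == 0 {
			// An empty packet marks the end of the file data
			break
		}
		// Forward the file content into the pipe
		writer.Write(msg)
	}
}
func WriteLocalInfileRequest(filename string) {
	// Payload: the 0xfb header byte, then the file name as a fixed-length
	// string whose length is implied by the packet length
	req := append([]byte{0xfb}, writeStringFix(filename, len(filename))...)
	// Write the packet to the client
	writePackets(req)
}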