Skip to content

Commit

Permalink
sink(ticdc): enable genUpdateSQLFast in TiDB conn (pingcap#8191),(pin…
Browse files Browse the repository at this point in the history
  • Loading branch information
amyangfei committed Apr 3, 2023
1 parent 12bc280 commit 4ae8d27
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 5 deletions.
43 changes: 43 additions & 0 deletions cdc/sink/mysql/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mysql

import (
"context"
"database/sql"

dmysql "github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tmysql "github.com/pingcap/tidb/parser/mysql"
"go.uber.org/zap"
)

// CheckIsTiDB checks if the db connects to a TiDB.
func CheckIsTiDB(ctx context.Context, db *sql.DB) (bool, error) {
var tidbVer string
row := db.QueryRowContext(ctx, "select tidb_version()")
err := row.Scan(&tidbVer)
if err != nil {
log.Error("check tidb version error", zap.Error(err))
// downstream is not TiDB, do nothing
if mysqlErr, ok := errors.Cause(err).(*dmysql.MySQLError); ok &&
(mysqlErr.Number == tmysql.ErrNoDB ||
mysqlErr.Number == tmysql.ErrSpDoesNotExist) {
return false, nil
}
return false, errors.Trace(err)
}
return true, nil
}
27 changes: 22 additions & 5 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ func NewMySQLSink(
return nil, err
}

isTiDB, err := CheckIsTiDB(ctx, db)
if err != nil {
return nil, err
}
params.isTiDB = isTiDB

log.Info("Start mysql sink")

db.SetMaxIdleConns(params.workerCount)
Expand Down Expand Up @@ -916,11 +922,22 @@ func (s *mysqlSink) batchSingleTxnDmls(
// handle update
if len(updateRows) > 0 {
// TODO: use sql.GenUpdateSQL to generate update sql after we optimize the func.
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
if s.params.isTiDB {
for _, rows := range updateRows {
s, v := s.genUpdateSQL(rows...)
sqls = append(sqls, s...)
values = append(values, v...)
}
} else {
// The behavior of batch update statement differs between TiDB and MySQL.
// So we don't use batch update statement when downstream is MySQL.
// Ref:https://docs.pingcap.com/tidb/stable/sql-statement-update#mysql-compatibility
for _, rows := range updateRows {
for _, row := range rows {
sql, value := row.GenSQL(sqlmodel.DMLUpdate)
sqls = append(sqls, sql)
values = append(values, value)
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/mysql/mysql_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ type sinkParams struct {
tls string
batchDMLEnable bool
batchUpdateRowCount int
isTiDB bool
}

func (s *sinkParams) Clone() *sinkParams {
Expand Down

0 comments on commit 4ae8d27

Please sign in to comment.