Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
backend: update mysql backend to tidb backend (#228)
Browse files Browse the repository at this point in the history
* update mysql backend to tidb backend

* fix test
  • Loading branch information
WangXiangUSTC authored and kennytm committed Aug 21, 2019
1 parent 1577f31 commit ce5fa5f
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 41 deletions.
64 changes: 32 additions & 32 deletions lightning/backend/mysql.go → lightning/backend/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,34 +32,34 @@ import (
"github.com/pingcap/tidb-lightning/lightning/verification"
)

type mysqlRow string
type tidbRow string

type mysqlRows []mysqlRow
type tidbRows []tidbRow

type mysqlEncoder struct {
type tidbEncoder struct {
mode mysql.SQLMode
}

type mysqlBackend struct {
type tidbBackend struct {
db *sql.DB
}

// NewMySQLBackend creates a new MySQL backend using the given database.
// NewTiDBBackend creates a new TiDB backend using the given database.
//
// The backend does not take ownership of `db`. Caller should close `db`
// manually after the backend expired.
func NewMySQLBackend(db *sql.DB) Backend {
return MakeBackend(&mysqlBackend{db: db})
func NewTiDBBackend(db *sql.DB) Backend {
return MakeBackend(&tidbBackend{db: db})
}

func (row mysqlRow) ClassifyAndAppend(data *Rows, checksum *verification.KVChecksum, _ *Rows, _ *verification.KVChecksum) {
rows := (*data).(mysqlRows)
*data = mysqlRows(append(rows, row))
func (row tidbRow) ClassifyAndAppend(data *Rows, checksum *verification.KVChecksum, _ *Rows, _ *verification.KVChecksum) {
rows := (*data).(tidbRows)
*data = tidbRows(append(rows, row))
cs := verification.MakeKVChecksum(uint64(len(row)), 1, 0)
checksum.Add(&cs)
}

func (rows mysqlRows) SplitIntoChunks(splitSize int) []Rows {
func (rows tidbRows) SplitIntoChunks(splitSize int) []Rows {
if len(rows) == 0 {
return nil
}
Expand All @@ -80,11 +80,11 @@ func (rows mysqlRows) SplitIntoChunks(splitSize int) []Rows {
return append(res, rows[i:])
}

func (rows mysqlRows) Clear() Rows {
func (rows tidbRows) Clear() Rows {
return rows[:0]
}

func (enc mysqlEncoder) appendSQLBytes(sb *strings.Builder, value []byte) {
func (enc tidbEncoder) appendSQLBytes(sb *strings.Builder, value []byte) {
sb.Grow(2 + len(value))
sb.WriteByte('\'')
if enc.mode.HasNoBackslashEscapesMode() {
Expand Down Expand Up @@ -124,7 +124,7 @@ func (enc mysqlEncoder) appendSQLBytes(sb *strings.Builder, value []byte) {

// appendSQL appends the SQL representation of the Datum into the string builder.
// Note that we cannot use Datum.ToString since it doesn't perform SQL escaping.
func (enc mysqlEncoder) appendSQL(sb *strings.Builder, datum *types.Datum) error {
func (enc tidbEncoder) appendSQL(sb *strings.Builder, datum *types.Datum) error {
switch datum.Kind() {
case types.KindNull:
sb.WriteString("NULL")
Expand Down Expand Up @@ -192,9 +192,9 @@ func (enc mysqlEncoder) appendSQL(sb *strings.Builder, datum *types.Datum) error
return nil
}

func (mysqlEncoder) Close() {}
func (tidbEncoder) Close() {}

func (enc mysqlEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, _ []int) (Row, error) {
func (enc tidbEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, _ []int) (Row, error) {
var encoded strings.Builder
encoded.Grow(8 * len(row))
encoded.WriteByte('(')
Expand All @@ -203,7 +203,7 @@ func (enc mysqlEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, _
encoded.WriteByte(',')
}
if err := enc.appendSQL(&encoded, &field); err != nil {
logger.Error("mysql encode failed",
logger.Error("tidb encode failed",
zap.Array("original", rowArrayMarshaler(row)),
zap.Int("originalCol", i),
log.ShortError(err),
Expand All @@ -212,52 +212,52 @@ func (enc mysqlEncoder) Encode(logger log.Logger, row []types.Datum, _ int64, _
}
}
encoded.WriteByte(')')
return mysqlRow(encoded.String()), nil
return tidbRow(encoded.String()), nil
}

func (be *mysqlBackend) Close() {
func (be *tidbBackend) Close() {
// *Not* going to close `be.db`. The db object is normally borrowed from a
// TidbManager, so we let the manager to close it.
}

func (be *mysqlBackend) MakeEmptyRows() Rows {
return mysqlRows(nil)
func (be *tidbBackend) MakeEmptyRows() Rows {
return tidbRows(nil)
}

func (be *mysqlBackend) RetryImportDelay() time.Duration {
func (be *tidbBackend) RetryImportDelay() time.Duration {
return 0
}

func (be *mysqlBackend) MaxChunkSize() int {
func (be *tidbBackend) MaxChunkSize() int {
return 1048576
}

func (be *mysqlBackend) ShouldPostProcess() bool {
func (be *tidbBackend) ShouldPostProcess() bool {
return false
}

func (be *mysqlBackend) NewEncoder(_ table.Table, mode mysql.SQLMode) Encoder {
return mysqlEncoder{mode: mode}
func (be *tidbBackend) NewEncoder(_ table.Table, mode mysql.SQLMode) Encoder {
return tidbEncoder{mode: mode}
}

func (be *mysqlBackend) OpenEngine(context.Context, uuid.UUID) error {
func (be *tidbBackend) OpenEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) CloseEngine(context.Context, uuid.UUID) error {
func (be *tidbBackend) CloseEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) CleanupEngine(context.Context, uuid.UUID) error {
func (be *tidbBackend) CleanupEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) ImportEngine(context.Context, uuid.UUID) error {
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID) error {
return nil
}

func (be *mysqlBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName string, columnNames []string, _ uint64, r Rows) error {
rows := r.(mysqlRows)
func (be *tidbBackend) WriteRows(ctx context.Context, _ uuid.UUID, tableName string, columnNames []string, _ uint64, r Rows) error {
rows := r.(tidbRows)
if len(rows) == 0 {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s *mysqlSuite) SetUpTest(c *C) {
c.Assert(err, IsNil)

s.mockDB = mock
s.backend = kv.NewMySQLBackend(db)
s.backend = kv.NewTiDBBackend(db)
}

func (s *mysqlSuite) TearDownTest(c *C) {
Expand Down
6 changes: 3 additions & 3 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ const (
// NormalMode defines mode of normal for tikv.
NormalMode = "normal"

// BackendMySQL is a constant for choosing the "MySQL" backend in the configuration.
BackendMySQL = "mysql"
// BackendTiDB is a constant for choosing the "TiDB" backend in the configuration.
BackendTiDB = "tidb"
// BackendImporter is a constant for choosing the "Importer" backend in the configuration.
BackendImporter = "importer"

Expand Down Expand Up @@ -308,7 +308,7 @@ func (cfg *Config) Adjust() error {

cfg.TikvImporter.Backend = strings.ToLower(cfg.TikvImporter.Backend)
switch cfg.TikvImporter.Backend {
case BackendMySQL, BackendImporter:
case BackendTiDB, BackendImporter:
default:
return errors.Errorf("invalid config: unsupported `tikv-importer.backend` (%s)", cfg.TikvImporter.Backend)
}
Expand Down
4 changes: 2 additions & 2 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
if err != nil {
return nil, err
}
case config.BackendMySQL:
backend = kv.NewMySQLBackend(tidbMgr.db)
case config.BackendTiDB:
backend = kv.NewTiDBBackend(tidbMgr.db)
default:
return nil, errors.New("unknown backend: " + cfg.TikvImporter.Backend)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/csv/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

set -eu

for BACKEND in importer mysql; do
for BACKEND in importer tidb; do

run_sql 'DROP DATABASE IF EXISTS csv'

Expand Down
2 changes: 1 addition & 1 deletion tests/various_types/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

set -eu

for BACKEND in importer mysql; do
for BACKEND in importer tidb; do

run_sql 'DROP DATABASE IF EXISTS vt;'
run_lightning config --backend $BACKEND
Expand Down
2 changes: 1 addition & 1 deletion tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ driver = "file"
#keep-after-success = false

[tikv-importer]
# Delivery backend, can be "importer" or "mysql".
# Delivery backend, can be "importer" or "tidb".
backend = "importer"
# Address of tikv-importer when the backend is 'importer'
addr = "127.0.0.1:8287"
Expand Down

0 comments on commit ce5fa5f

Please sign in to comment.