Skip to content

Commit

Permalink
loader: fix concurrent usage of checkpoint's DBconn. (pingcap#552)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Mar 18, 2020
1 parent 478e6c6 commit 4f6c54b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
21 changes: 20 additions & 1 deletion loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -69,6 +70,10 @@ type CheckPoint interface {
// RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
// it's not thread-safe
type RemoteCheckPoint struct {
// used to protect database operation with `conn`.
// if more operations need to be protected, add another mutex or rename this one.
connMutex sync.Mutex

db *conn.BaseDB
conn *DBConn
id string
Expand Down Expand Up @@ -118,7 +123,9 @@ func (cp *RemoteCheckPoint) prepare(tctx *tcontext.Context) error {

func (cp *RemoteCheckPoint) createSchema(tctx *tcontext.Context) error {
sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema)
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2})
cp.connMutex.Unlock()
return terror.WithScope(err, terror.ScopeDownstream)
}

Expand All @@ -137,7 +144,9 @@ func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error {
);
`
sql2 := fmt.Sprintf(createTable, tableName)
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2})
cp.connMutex.Unlock()
return terror.WithScope(err, terror.ScopeDownstream)
}

Expand All @@ -149,7 +158,9 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
}()

query := fmt.Sprintf("SELECT `filename`,`cp_schema`,`cp_table`,`offset`,`end_pos` from `%s`.`%s` where `id`=?", cp.schema, cp.table)
cp.connMutex.Lock()
rows, err := cp.conn.querySQL(tctx, query, cp.id)
cp.connMutex.Unlock()
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down Expand Up @@ -277,7 +288,7 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos

// fields[0] -> db name, fields[1] -> table name
sql2 := fmt.Sprintf("INSERT INTO `%s`.`%s` (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.schema, cp.table)
cp.logCtx.L().Debug("initial checkpoint record",
cp.logCtx.L().Info("initial checkpoint record",
zap.String("sql", sql2),
zap.String("id", cp.id),
zap.String("filename", filename),
Expand All @@ -286,7 +297,9 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
zap.Int64("offset", 0),
zap.Int64("end position", endPos))
args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos}
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2}, args)
cp.connMutex.Unlock()
if err != nil {
if isErrDupEntry(err) {
cp.logCtx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
Expand All @@ -299,6 +312,8 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos

// ResetConn implements CheckPoint.ResetConn
func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error {
cp.connMutex.Lock()
defer cp.connMutex.Unlock()
return cp.conn.resetConn(tctx)
}

Expand All @@ -320,14 +335,18 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
// Clear implements CheckPoint.Clear
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id)
cp.connMutex.Lock()
err := cp.conn.executeSQL(tctx, []string{sql2})
cp.connMutex.Unlock()
return terror.WithScope(err, terror.ScopeDownstream)
}

// Count implements CheckPoint.Count
func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) {
query := fmt.Sprintf("SELECT COUNT(id) FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table)
cp.connMutex.Lock()
rows, err := cp.conn.querySQL(tctx, query, cp.id)
cp.connMutex.Unlock()
if err != nil {
return 0, terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down
6 changes: 3 additions & 3 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
baseFile := filepath.Base(file)
err = w.checkPoint.Init(w.tctx.WithContext(ctx), baseFile, finfo.Size())
if err != nil {
w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), log.ShortError(err))
w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err))
return err
}

Expand All @@ -273,7 +273,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
for {
select {
case <-ctx.Done():
w.tctx.L().Info("sql dispatcher is ready to quit.", zap.String("data file", file))
w.tctx.L().Info("sql dispatcher is ready to quit.", zap.String("data file", file), zap.Int64("offset", offset))
return nil
default:
// do nothing
Expand All @@ -282,7 +282,7 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
cur += int64(len(line))

if err == io.EOF {
w.tctx.L().Info("data are scanned finished.", zap.String("data file", file))
w.tctx.L().Info("data are scanned finished.", zap.String("data file", file), zap.Int64("offset", offset))
break
}

Expand Down

0 comments on commit 4f6c54b

Please sign in to comment.