Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: fix context usage for SQL operation #377

Merged
merged 18 commits into from
Dec 9, 2019
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 32 additions & 28 deletions loader/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type CheckPoint interface {
// Load loads all checkpoints recorded before.
// because of no checkpoints updated in memory when error occurred
// when resuming, Load will be called again to load checkpoints
Load() error
Load(tctx *tcontext.Context) error

// GetRestoringFileInfo get restoring data files for table
GetRestoringFileInfo(db, table string) map[string][]int64
Expand All @@ -48,19 +48,19 @@ type CheckPoint interface {
CalcProgress(allFiles map[string]Tables2DataFiles) error

// Init initialize checkpoint data in tidb
Init(filename string, endpos int64) error
Init(tctx *tcontext.Context, filename string, endpos int64) error

// ResetConn resets database connections owned by the Checkpoint
ResetConn() error
ResetConn(tctx *tcontext.Context) error

// Close closes the CheckPoint
Close()

// Clear clears all recorded checkpoints
Clear() error
Clear(tctx *tcontext.Context) error

// Count returns recorded checkpoints' count
Count() (int, error)
Count(tctx *tcontext.Context) (int, error)

// GenSQL generates sql to update checkpoint to DB
GenSQL(filename string, offset int64) string
Expand All @@ -80,13 +80,14 @@ type RemoteCheckPoint struct {
}

func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id string) (CheckPoint, error) {
db, dbConns, err := createConns(tctx, cfg, 1)
tctx2, cancel := tctx.WithTimeout(defaultDBContextTimeout)
defer cancel()

db, dbConns, err := createConns(tctx2, cfg, 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does context with timeout affect resetBaseConnFn in createConns?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, tctx in resetBaseConnFn is passed in (it's a function argument) when reseting connetion like in func (conn *DBConn) resetConn(tctx *tcontext.Context) error.

this timeout context only used to GetBaseConn and log something.

if err != nil {
return nil, err
}

newtctx := tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint")))

cp := &RemoteCheckPoint{
db: db,
conn: dbConns[0],
Expand All @@ -95,36 +96,36 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
finishedTables: make(map[string]struct{}),
schema: cfg.MetaSchema,
table: fmt.Sprintf("%s_loader_checkpoint", cfg.Name),
tctx: newtctx,
tctx: tctx.WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))),
}

err = cp.prepare()
err = cp.prepare(tctx2)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is defaultDBContextTimeout(10 seconds) enough?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sometimes tidb is not fast, espicallly DDL

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, I'm not sure, but this timeout only used for some checkpoint operations and they may not expensive (and if timeout, they can be retried through restring the task). Are there any suggestions for me?

BTW, "better improvement for these operations can be done when moving them to etcd" in the PR's description, and it's better to add context support for all I/O (network) operations later, then we can control the timeout in the upper caller.

if err != nil {
return nil, err
}

return cp, nil
}

func (cp *RemoteCheckPoint) prepare() error {
func (cp *RemoteCheckPoint) prepare(tctx *tcontext.Context) error {
// create schema
if err := cp.createSchema(); err != nil {
if err := cp.createSchema(tctx); err != nil {
return err
}
// create table
if err := cp.createTable(); err != nil {
if err := cp.createTable(tctx); err != nil {
return err
}
return nil
}

func (cp *RemoteCheckPoint) createSchema() error {
func (cp *RemoteCheckPoint) createSchema(tctx *tcontext.Context) error {
sql2 := fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS `%s`", cp.schema)
err := cp.conn.executeSQL(cp.tctx, []string{sql2})
err := cp.conn.executeSQL(tctx, []string{sql2})
return terror.WithScope(err, terror.ScopeDownstream)
}

func (cp *RemoteCheckPoint) createTable() error {
func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error {
tableName := fmt.Sprintf("`%s`.`%s`", cp.schema, cp.table)
createTable := `CREATE TABLE IF NOT EXISTS %s (
id char(32) NOT NULL,
Expand All @@ -139,19 +140,19 @@ func (cp *RemoteCheckPoint) createTable() error {
);
`
sql2 := fmt.Sprintf(createTable, tableName)
err := cp.conn.executeSQL(cp.tctx, []string{sql2})
err := cp.conn.executeSQL(tctx, []string{sql2})
return terror.WithScope(err, terror.ScopeDownstream)
}

// Load implements CheckPoint.Load
func (cp *RemoteCheckPoint) Load() error {
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
begin := time.Now()
defer func() {
cp.tctx.L().Info("load checkpoint", zap.Duration("cost time", time.Since(begin)))
}()

query := fmt.Sprintf("SELECT `filename`,`cp_schema`,`cp_table`,`offset`,`end_pos` from `%s`.`%s` where `id`=?", cp.schema, cp.table)
rows, err := cp.conn.querySQL(cp.tctx, query, cp.id)
rows, err := cp.conn.querySQL(tctx, query, cp.id)
if err != nil {
return terror.WithScope(err, terror.ScopeDownstream)
}
Expand Down Expand Up @@ -266,7 +267,7 @@ func (cp *RemoteCheckPoint) allFilesFinished(files map[string][]int64) bool {
}

// Init implements CheckPoint.Init
func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos int64) error {
idx := strings.Index(filename, ".sql")
if idx < 0 {
return terror.ErrCheckpointInvalidTableFile.Generate(filename)
Expand All @@ -288,7 +289,7 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
zap.Int64("offset", 0),
zap.Int64("end position", endPos))
args := []interface{}{cp.id, filename, fields[0], fields[1], 0, endPos}
err := cp.conn.executeSQL(cp.tctx, []string{sql2}, args)
err := cp.conn.executeSQL(tctx, []string{sql2}, args)
if err != nil {
if isErrDupEntry(err) {
cp.tctx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
Expand All @@ -300,8 +301,8 @@ func (cp *RemoteCheckPoint) Init(filename string, endPos int64) error {
}

// ResetConn implements CheckPoint.ResetConn
func (cp *RemoteCheckPoint) ResetConn() error {
return cp.conn.resetConn(cp.tctx)
func (cp *RemoteCheckPoint) ResetConn(tctx *tcontext.Context) error {
return cp.conn.resetConn(tctx)
}

// Close implements CheckPoint.Close
Expand All @@ -320,16 +321,16 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
}

// Clear implements CheckPoint.Clear
func (cp *RemoteCheckPoint) Clear() error {
func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
sql2 := fmt.Sprintf("DELETE FROM `%s`.`%s` WHERE `id` = '%s'", cp.schema, cp.table, cp.id)
err := cp.conn.executeSQL(cp.tctx, []string{sql2})
err := cp.conn.executeSQL(tctx, []string{sql2})
return terror.WithScope(err, terror.ScopeDownstream)
}

// Count implements CheckPoint.Count
func (cp *RemoteCheckPoint) Count() (int, error) {
func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) {
query := fmt.Sprintf("SELECT COUNT(id) FROM `%s`.`%s` WHERE `id` = ?", cp.schema, cp.table)
rows, err := cp.conn.querySQL(cp.tctx, query, cp.id)
rows, err := cp.conn.querySQL(tctx, query, cp.id)
if err != nil {
return 0, terror.WithScope(err, terror.ScopeDownstream)
}
Expand All @@ -349,7 +350,10 @@ func (cp *RemoteCheckPoint) Count() (int, error) {
}

func (cp *RemoteCheckPoint) String() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can refine this function later, it used to log, but need visit the database, maybe we can just use information saved in RemoteCheckPoint

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree.

if err := cp.Load(); err != nil {
tctx2, cancel := cp.tctx.WithTimeout(defaultDBContextTimeout)
defer cancel()

if err := cp.Load(tctx2); err != nil {
return err.Error()
}

Expand Down
22 changes: 11 additions & 11 deletions loader/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,26 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(err, IsNil)
defer cp.Close()

cp.Clear()
cp.Clear(tctx)

// no checkpoint exist
err = cp.Load()
err = cp.Load(tctx)
c.Assert(err, IsNil)

infos := cp.GetAllRestoringFileInfo()
c.Assert(len(infos), Equals, 0)

count, err := cp.Count()
count, err := cp.Count(tctx)
c.Assert(err, IsNil)
c.Assert(count, Equals, 0)

// insert default checkpoints
for _, cs := range cases {
err = cp.Init(cs.filename, cs.endPos)
err = cp.Init(tctx, cs.filename, cs.endPos)
c.Assert(err, IsNil)
}

err = cp.Load()
err = cp.Load(tctx)
c.Assert(err, IsNil)

infos = cp.GetAllRestoringFileInfo()
Expand All @@ -108,7 +108,7 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(info[1], Equals, cs.endPos)
}

count, err = cp.Count()
count, err = cp.Count(tctx)
c.Assert(err, IsNil)
c.Assert(count, Equals, len(cases))

Expand All @@ -126,7 +126,7 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(err, IsNil)
}

err = cp.Load()
err = cp.Load(tctx)
c.Assert(err, IsNil)

infos = cp.GetAllRestoringFileInfo()
Expand All @@ -139,22 +139,22 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
c.Assert(info[1], Equals, cs.endPos)
}

count, err = cp.Count()
count, err = cp.Count(tctx)
c.Assert(err, IsNil)
c.Assert(count, Equals, len(cases))

// clear all
cp.Clear()
cp.Clear(tctx)

// no checkpoint exist
err = cp.Load()
err = cp.Load(tctx)
c.Assert(err, IsNil)

infos = cp.GetAllRestoringFileInfo()
c.Assert(len(infos), Equals, 0)

// obtain count again
count, err = cp.Count()
count, err = cp.Count(tctx)
c.Assert(err, IsNil)
c.Assert(count, Equals, 0)
}
Loading