Skip to content

Commit

Permalink
importinto: support import from stale read (#50852)
Browse files Browse the repository at this point in the history
ref #49883
  • Loading branch information
D3Hunter authored Feb 1, 2024
1 parent efe8523 commit 9bad202
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e exec.Executor) (
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
switch e.(type) {
case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec:
case *DeleteExec, *InsertExec, *UpdateExec, *ReplaceExec, *LoadDataExec, *DDLExec, *ImportIntoExec:
snapshotTS := sctx.GetSessionVars().SnapshotTS
if snapshotTS != 0 {
return nil, errors.New("can not execute write statement when 'tidb_snapshot' is set")
Expand Down
6 changes: 4 additions & 2 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1016,13 +1016,15 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) exec.Executor {
}

func (b *executorBuilder) buildImportInto(v *plannercore.ImportInto) exec.Executor {
tbl, ok := b.is.TableByID(v.Table.TableInfo.ID)
// see planBuilder.buildImportInto for detail why we use the latest schema here.
latestIS := b.ctx.GetDomainInfoSchema().(infoschema.InfoSchema)
tbl, ok := latestIS.TableByID(v.Table.TableInfo.ID)
if !ok {
b.err = errors.Errorf("Can not get table %d", v.Table.TableInfo.ID)
return nil
}
if !tbl.Meta().IsBaseTable() {
b.err = plannererrors.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "LOAD")
b.err = plannererrors.ErrNonUpdatableTable.GenWithStackByArgs(tbl.Meta().Name.O, "IMPORT")
return nil
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/executor/import_into.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ func (e *ImportIntoExec) doImport(ctx context.Context, se sessionctx.Context, di

func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
e.dataFilled = true
// must use a new session to pre-check, else the stmt in show processlist will be changed.
// must use a new session as:
// - pre-check will execute other sql, the stmt in show processlist will be changed.
// - userSctx might be in stale read, we cannot do write.
newSCtx, err2 := CreateSession(e.userSctx)
if err2 != nil {
return err2
Expand Down Expand Up @@ -334,7 +336,7 @@ func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
return err
}

if err2 = flushStats(ctx, e.userSctx, e.importPlan.TableInfo.ID, importResult); err2 != nil {
if err2 = flushStats(ctx, newSCtx, e.importPlan.TableInfo.ID, importResult); err2 != nil {
logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
}

Expand Down
14 changes: 14 additions & 0 deletions pkg/planner/core/plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import (
"bytes"
"fmt"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand Down Expand Up @@ -720,4 +722,16 @@ func TestImportIntoBuildPlan(t *testing.T) {
plannererrors.ErrWrongValueCountOnRow)
require.ErrorIs(t, tk.ExecToErr("IMPORT INTO t1(a) FROM select * from t2;"),
plannererrors.ErrWrongValueCountOnRow)

time.Sleep(100 * time.Millisecond)
now := tk.MustQuery("select now(6)").Rows()[0][0].(string)
time.Sleep(100 * time.Millisecond)
tk.MustExec("create table t3 (a int, b int);")
// set tidb_snapshot will fail without this
tk.MustExec(`replace into mysql.tidb(variable_name, variable_value) values ('tikv_gc_safe_point', '20240131-00:00:00.000 +0800')`)
tk.MustExec("set tidb_snapshot = '" + now + "'")
require.ErrorContains(t, tk.ExecToErr("IMPORT INTO t1 FROM select * from t2"),
"can not execute write statement when 'tidb_snapshot' is set")
require.ErrorIs(t, tk.ExecToErr("IMPORT INTO t3 FROM select * from t2"),
infoschema.ErrTableNotExists)
}
16 changes: 15 additions & 1 deletion pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4217,8 +4217,22 @@ func (b *PlanBuilder) buildImportInto(ctx context.Context, ld *ast.ImportIntoStm
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.FilePriv, "", "", "", plannererrors.ErrSpecificAccessDenied.GenWithStackByArgs("FILE"))
}
tableInfo := p.Table.TableInfo
tableInPlan, ok := b.is.TableByID(tableInfo.ID)
// we use the latest IS to support IMPORT INTO dst FROM SELECT * FROM src AS OF TIMESTAMP '2020-01-01 00:00:00'
// Note: we need to get p.Table when preprocessing, at that time, IS of session
// transaction is used, if the session ctx is already in snapshot read using tidb_snapshot, we might
// not get the schema or get a stale schema of the target table, so we don't
// support set 'tidb_snapshot' first and then import into the target table.
//
// tidb_read_staleness can be used to do stale read too, it's allowed as long as
// tableInfo.ID matches with the latest schema.
latestIS := b.ctx.GetDomainInfoSchema().(infoschema.InfoSchema)
tableInPlan, ok := latestIS.TableByID(tableInfo.ID)
if !ok {
// adaptor.handleNoDelayExecutor has a similar check, but we want to give
// a more specific error message here.
if b.ctx.GetSessionVars().SnapshotTS != 0 {
return nil, errors.New("can not execute IMPORT statement when 'tidb_snapshot' is set")
}
db := b.ctx.GetSessionVars().CurrentDB
return nil, infoschema.ErrTableNotExists.GenWithStackByArgs(db, tableInfo.Name.O)
}
Expand Down
66 changes: 66 additions & 0 deletions tests/realtikvtest/importintotest2/from_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
"fmt"
"slices"
"strings"
"time"

"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
)
Expand Down Expand Up @@ -88,3 +90,67 @@ func (s *mockGCSSuite) TestWriteAfterImportFromSelect() {
s.tk.MustExec("insert into dt values(4, 'aaaaaa'), (5, 'bbbbbb'), (6, 'cccccc'), (7, 'dddddd')")
s.testWriteAfterImport(`import into t FROM select * from from_select.dt`, importer.DataSourceTypeQuery)
}

func (s *mockGCSSuite) TestImportFromSelectStaleRead() {
s.prepareAndUseDB("from_select")
// set tidb_snapshot might fail without this, not familiar about this part.
s.tk.MustExec(`replace into mysql.tidb(variable_name, variable_value) values ('tikv_gc_safe_point', '20240131-00:00:00.000 +0800')`)
s.tk.MustExec("create table src(id int, v varchar(64))")
s.tk.MustExec("insert into src values(1, 'a')")
time.Sleep(100 * time.Millisecond)
now := s.tk.MustQuery("select now(6)").Rows()[0][0].(string)
time.Sleep(100 * time.Millisecond)
s.tk.MustExec("insert into src values(2, 'b')")
s.tk.MustQuery("select * from src").Check(testkit.Rows("1 a", "2 b"))
staleReadSQL := fmt.Sprintf("select * from src as of timestamp '%s'", now)
s.tk.MustQuery(staleReadSQL).Check(testkit.Rows("1 a"))
s.tk.MustExec("create table dst(id int, v varchar(64))")

//
// in below cases, dst table not exists at time 'now'
//
// using set tidb_snapshot
s.tk.MustExec("set tidb_snapshot = '" + now + "'")
s.ErrorIs(s.tk.ExecToErr("import into dst from "+staleReadSQL), infoschema.ErrTableNotExists)
s.ErrorIs(s.tk.ExecToErr("import into dst from select * from src"), infoschema.ErrTableNotExists)
// using AS OF TIMESTAMP
s.tk.MustExec("set tidb_snapshot = ''")
s.tk.MustExec("import into dst from " + staleReadSQL)
s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 a"))

//
// in below cases, table exists at time 'now', and it's the latest version too.
//
s.tk.MustExec("truncate table dst")
time.Sleep(100 * time.Millisecond)
now = s.tk.MustQuery("select now(6)").Rows()[0][0].(string)
time.Sleep(100 * time.Millisecond)
staleReadSQL = fmt.Sprintf("select * from src as of timestamp '%s'", now)
s.tk.MustExec("insert into src values(3, 'c')")
s.tk.MustQuery("select * from src").Check(testkit.Rows("1 a", "2 b", "3 c"))
// using set tidb_snapshot
s.tk.MustExec("set tidb_snapshot = '" + now + "'")
s.ErrorContains(s.tk.ExecToErr("import into dst from "+staleReadSQL),
"can not execute write statement when 'tidb_snapshot' is set")
s.ErrorContains(s.tk.ExecToErr("import into dst from select * from src"),
"can not execute write statement when 'tidb_snapshot' is set")
// using AS OF TIMESTAMP
s.tk.MustExec("set tidb_snapshot = ''")
s.tk.MustExec("import into dst from " + staleReadSQL)
s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 a", "2 b"))

//
// in below cases, table exists at time 'now', and it's NOT the latest version.
//
s.tk.MustExec("truncate table dst")
// using set tidb_snapshot
s.tk.MustExec("set tidb_snapshot = '" + now + "'")
s.ErrorContains(s.tk.ExecToErr("import into dst from "+staleReadSQL),
"can not execute IMPORT statement when 'tidb_snapshot' is set")
s.ErrorContains(s.tk.ExecToErr("import into dst from select * from src"),
"can not execute IMPORT statement when 'tidb_snapshot' is set")
// using AS OF TIMESTAMP
s.tk.MustExec("set tidb_snapshot = ''")
s.tk.MustExec("import into dst from " + staleReadSQL)
s.tk.MustQuery("select * from dst").Check(testkit.Rows("1 a", "2 b"))
}

0 comments on commit 9bad202

Please sign in to comment.