From 7c3a22e48db9c866f1d5f983a81b4e183daa00e1 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Mon, 13 Mar 2023 22:24:39 +0800 Subject: [PATCH] *: implement SHOW LOAD DATA JOB (#42118) ref pingcap/tidb#40499 --- errno/errcode.go | 1 + errno/errname.go | 1 + errors.toml | 5 + executor/BUILD.bazel | 2 + executor/asyncloaddata/BUILD.bazel | 2 + executor/asyncloaddata/show_test.go | 178 +++++++++++++++++++++++++++- executor/asyncloaddata/util.go | 70 +++++++++-- executor/asyncloaddata/util_test.go | 114 +++++++++++------- executor/builder.go | 1 + executor/importer/import.go | 9 +- executor/importer/import_test.go | 2 +- executor/load_data.go | 17 ++- executor/show.go | 70 +++++++++++ planner/core/logical_plans.go | 2 + planner/core/planbuilder.go | 10 ++ util/dbterror/exeerrors/errors.go | 1 + 16 files changed, 414 insertions(+), 71 deletions(-) diff --git a/errno/errcode.go b/errno/errcode.go index f32d2a2ec28a6..d17b676129975 100644 --- a/errno/errcode.go +++ b/errno/errcode.go @@ -1058,6 +1058,7 @@ const ( ErrInvalidOptionVal = 8164 ErrDuplicateOption = 8165 ErrLoadDataUnsupportedOption = 8166 + ErrLoadDataJobNotFound = 8170 // Error codes used by TiDB ddl package ErrUnsupportedDDLOperation = 8200 diff --git a/errno/errname.go b/errno/errname.go index b3ad4ae690384..a4359be68cb2f 100644 --- a/errno/errname.go +++ b/errno/errname.go @@ -1052,6 +1052,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{ ErrInvalidOptionVal: mysql.Message("Invalid option value for %s", nil), ErrDuplicateOption: mysql.Message("Option %s specified more than once", nil), ErrLoadDataUnsupportedOption: mysql.Message("Unsupported option %s for %s import mode", nil), + ErrLoadDataJobNotFound: mysql.Message("Job ID %d doesn't exist", nil), ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil), ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil), diff --git a/errors.toml b/errors.toml index 
b41e3b05286a8..9d7c899f4c69e 100644 --- a/errors.toml +++ b/errors.toml @@ -1731,6 +1731,11 @@ error = ''' Unsupported option %s for %s import mode ''' +["executor:8170"] +error = ''' +Job ID %d doesn't exist +''' + ["executor:8212"] error = ''' Failed to split region ranges: %s diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index a72502a96b2b3..21f274b9ad4af 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -103,6 +103,7 @@ go_library( "//br/pkg/lightning/mydump", "//br/pkg/storage", "//br/pkg/task", + "//br/pkg/utils", "//config", "//ddl", "//ddl/label", @@ -212,6 +213,7 @@ go_library( "//util/topsql/state", "//util/tracing", "@com_github_burntsushi_toml//:toml", + "@com_github_docker_go_units//:go-units", "@com_github_gogo_protobuf//proto", "@com_github_ngaut_pools//:pools", "@com_github_opentracing_basictracer_go//:basictracer-go", diff --git a/executor/asyncloaddata/BUILD.bazel b/executor/asyncloaddata/BUILD.bazel index 43f34e52d59e9..31631885c97cb 100644 --- a/executor/asyncloaddata/BUILD.bazel +++ b/executor/asyncloaddata/BUILD.bazel @@ -11,7 +11,9 @@ go_library( deps = [ "//kv", "//parser/terror", + "//types", "//util/chunk", + "//util/dbterror/exeerrors", "//util/sqlexec", "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//util", diff --git a/executor/asyncloaddata/show_test.go b/executor/asyncloaddata/show_test.go index d77fd2566cc9a..09b73086809f3 100644 --- a/executor/asyncloaddata/show_test.go +++ b/executor/asyncloaddata/show_test.go @@ -17,6 +17,7 @@ package asyncloaddata_test import ( "context" "fmt" + "strconv" "sync" "testing" "time" @@ -79,6 +80,104 @@ func (s *mockGCSSuite) enableFailpoint(path, term string) { }) } +type expectedRecord struct { + jobID string + dataSource string + targetTable string + importMode string + createdBy string + jobState string + jobStatus string + sourceFileSize string + loadedFileSize string + resultCode string + resultMessage string +} + +func (r *expectedRecord) 
checkIgnoreTimes(t *testing.T, row []interface{}) { + require.Equal(t, r.jobID, row[0]) + require.Equal(t, r.dataSource, row[4]) + require.Equal(t, r.targetTable, row[5]) + require.Equal(t, r.importMode, row[6]) + require.Equal(t, r.createdBy, row[7]) + require.Equal(t, r.jobState, row[8]) + require.Equal(t, r.jobStatus, row[9]) + require.Equal(t, r.sourceFileSize, row[10]) + require.Equal(t, r.loadedFileSize, row[11]) + require.Equal(t, r.resultCode, row[12]) + require.Equal(t, r.resultMessage, row[13]) +} + +func (r *expectedRecord) check(t *testing.T, row []interface{}) { + r.checkIgnoreTimes(t, row) + require.NotEmpty(t, row[1]) + require.NotEmpty(t, row[2]) + require.NotEmpty(t, row[3]) +} + +func (s *mockGCSSuite) TestSimpleShowLoadDataJobs() { + s.tk.MustExec("DROP DATABASE IF EXISTS test_show;") + s.tk.MustExec("CREATE DATABASE test_show;") + s.tk.MustExec("CREATE TABLE test_show.t (i INT PRIMARY KEY);") + s.server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: "test-show", + Name: "t.tsv", + }, + Content: []byte(`1 +2`), + }) + + user := &auth.UserIdentity{ + AuthUsername: "test-load-2", + AuthHostname: "test-host", + } + s.tk.Session().GetSessionVars().User = user + + sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-show/t.tsv?endpoint=%s' + INTO TABLE test_show.t;`, gcsEndpoint) + s.tk.MustExec(sql) + + rows := s.tk.MustQuery("SHOW LOAD DATA JOBS;").Rows() + require.Len(s.T(), rows, 1) + row := rows[0] + id := row[0].(string) + r := expectedRecord{ + jobID: id, + dataSource: "gs://test-show/t.tsv", + targetTable: "`test_show`.`t`", + importMode: "logical", + createdBy: "test-load-2@test-host", + jobState: "loading", + jobStatus: "finished", + sourceFileSize: "3B", + loadedFileSize: "3B", + resultCode: "0", + resultMessage: "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0", + } + r.check(s.T(), row) + + err := s.tk.QueryToErr("SHOW LOAD DATA JOB 100") + require.ErrorContains(s.T(), err, "Job ID 100 doesn't exist") + + 
// repeat LOAD DATA, will get duplicate entry error + s.tk.MustContainErrMsg(sql, "Duplicate entry '1' for key 't.PRIMARY'") + idNum, err := strconv.Atoi(id) + require.NoError(s.T(), err) + nextID := strconv.Itoa(idNum + 1) + rows = s.tk.MustQuery("SHOW LOAD DATA JOB " + nextID + ";").Rows() + require.Len(s.T(), rows, 1) + row = rows[0] + + r.jobID = nextID + r.jobStatus = "failed" + r.sourceFileSize = "" + r.loadedFileSize = "" + r.resultCode = "1062" + r.resultMessage = "Duplicate entry '1' for key 't.PRIMARY'" + r.check(s.T(), row) +} + func (s *mockGCSSuite) TestInternalStatus() { s.tk.MustExec("DROP DATABASE IF EXISTS load_tsv;") s.tk.MustExec("CREATE DATABASE load_tsv;") @@ -113,11 +212,15 @@ func (s *mockGCSSuite) TestInternalStatus() { defer wg.Done() tk2 := testkit.NewTestKit(s.T(), s.store) tk2.Session().GetSessionVars().User = user + userStr := tk2.Session().GetSessionVars().User.String() + // tk @ 0:00 // create load data job record in the system table and sleep 3 seconds + time.Sleep(2 * time.Second) // tk2 @ 0:02 - jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), tk2.Session().GetSessionVars().User.String()) + + jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), userStr) require.NoError(s.T(), err) require.Len(s.T(), jobInfos, 1) info := jobInfos[0] @@ -125,49 +228,112 @@ func (s *mockGCSSuite) TestInternalStatus() { expected := &JobInfo{ JobID: id, User: "test-load@test-host", - DataSource: fmt.Sprintf("gs://test-tsv/t*.tsv?endpoint=%s", gcsEndpoint), + DataSource: "gs://test-tsv/t*.tsv", TableSchema: "load_tsv", TableName: "t", ImportMode: "logical", Progress: "", Status: JobPending, StatusMessage: "", + CreateTime: info.CreateTime, + StartTime: info.StartTime, + EndTime: info.EndTime, } require.Equal(s.T(), expected, info) + + rows := tk2.MustQuery("SHOW LOAD DATA JOBS;").Rows() + require.Len(s.T(), rows, 1) + row := rows[0] + r := expectedRecord{ + jobID: strconv.Itoa(int(id)), + dataSource: "gs://test-tsv/t*.tsv", + targetTable: 
"`load_tsv`.`t`", + importMode: "logical", + createdBy: "test-load@test-host", + jobState: "loading", + jobStatus: "pending", + sourceFileSize: "", + loadedFileSize: "", + resultCode: "0", + resultMessage: "", + } + r.checkIgnoreTimes(s.T(), row) + // tk @ 0:03 // start job and sleep 3 seconds + time.Sleep(3 * time.Second) // tk2 @ 0:05 - info, err = GetJobInfo(ctx, tk2.Session(), id) + + info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) require.NoError(s.T(), err) + expected.StartTime = info.StartTime expected.Status = JobRunning require.Equal(s.T(), expected, info) + + rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows() + require.Len(s.T(), rows, 1) + row = rows[0] + r.jobStatus = "running" + r.checkIgnoreTimes(s.T(), row) + // tk @ 0:06 // commit one task and sleep 3 seconds + time.Sleep(3 * time.Second) // tk2 @ 0:08 - info, err = GetJobInfo(ctx, tk2.Session(), id) + + info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) require.NoError(s.T(), err) expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":0,"LoadedRowCnt":1}` require.Equal(s.T(), expected, info) + + rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows() + require.Len(s.T(), rows, 1) + row = rows[0] + r.sourceFileSize = "2B" + r.loadedFileSize = "0B" + r.checkIgnoreTimes(s.T(), row) + // tk @ 0:09 // commit one task and sleep 3 seconds + time.Sleep(3 * time.Second) // tk2 @ 0:11 - info, err = GetJobInfo(ctx, tk2.Session(), id) + + info, err = GetJobInfo(ctx, tk2.Session(), id, userStr) require.NoError(s.T(), err) expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":2}` require.Equal(s.T(), expected, info) + + rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows() + require.Len(s.T(), rows, 1) + row = rows[0] + r.loadedFileSize = "1B" + r.checkIgnoreTimes(s.T(), row) + // tk @ 0:12 // finish job + time.Sleep(3 * time.Second) // tk2 @ 0:14 - info, err = GetJobInfo(ctx, tk2.Session(), id) + + info, err = 
GetJobInfo(ctx, tk2.Session(), id, userStr) require.NoError(s.T(), err) expected.Status = JobFinished + expected.EndTime = info.EndTime expected.StatusMessage = "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0" expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":2,"LoadedRowCnt":2}` require.Equal(s.T(), expected, info) + + rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows() + require.Len(s.T(), rows, 1) + row = rows[0] + r.loadedFileSize = "2B" + r.jobStatus = "finished" + r.resultCode = "0" + r.resultMessage = "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0" + r.checkIgnoreTimes(s.T(), row) }() backup := HeartBeatInSec diff --git a/executor/asyncloaddata/util.go b/executor/asyncloaddata/util.go index e2604d0ab308f..38c03024c554f 100644 --- a/executor/asyncloaddata/util.go +++ b/executor/asyncloaddata/util.go @@ -17,11 +17,14 @@ package asyncloaddata import ( "context" "fmt" + "net/url" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/dbterror/exeerrors" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/util" ) @@ -45,23 +48,32 @@ const ( result_message TEXT DEFAULT NULL, error_message TEXT DEFAULT NULL, PRIMARY KEY (job_id), + KEY (create_time), KEY (create_user));` ) -// CreateLoadDataJob creates a load data job. +// CreateLoadDataJob creates a load data job by insert a record to system table. +// The AUTO_INCREMENT value will be returned as jobID. 
func CreateLoadDataJob( ctx context.Context, conn sqlexec.SQLExecutor, - source, db, table string, + dataSource, db, table string, importMode string, user string, ) (int64, error) { + // remove the params in data source URI because it may contain AK/SK + u, err := url.Parse(dataSource) + if err == nil && u.Scheme != "" { + u.RawQuery = "" + u.Fragment = "" + dataSource = u.String() + } ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) - _, err := conn.ExecuteInternal(ctx, + _, err = conn.ExecuteInternal(ctx, `INSERT INTO mysql.load_data_jobs (data_source, table_schema, table_name, import_mode, create_user) VALUES (%?, %?, %?, %?, %?);`, - source, db, table, importMode, user) + dataSource, db, table, importMode, user) if err != nil { return 0, err } @@ -81,7 +93,8 @@ func CreateLoadDataJob( return rows[0].GetInt64(0), nil } -// StartJob starts a load data job. A job can only be started once. +// StartJob tries to start a not-yet-started job with jobID. It will not return +// error when there's no matched job. func StartJob( ctx context.Context, conn sqlexec.SQLExecutor, @@ -91,7 +104,7 @@ func StartJob( _, err := conn.ExecuteInternal(ctx, `UPDATE mysql.load_data_jobs SET start_time = CURRENT_TIMESTAMP(6), update_time = CURRENT_TIMESTAMP(6) - WHERE job_id = %? AND start_time IS NULL;`, + WHERE job_id = %? AND start_time IS NULL AND end_time IS NULL;`, jobID) return err } @@ -105,9 +118,12 @@ var ( ) // UpdateJobProgress updates the progress of a load data job. It should be called -// periodically as heartbeat. +// periodically as heartbeat after StartJob. // The returned bool indicates whether the keepalive is succeeded. If not, the // caller should call FailJob soon. +// TODO: Currently if the node crashes after CreateLoadDataJob and before StartJob, +// the job will stay in the pending status forever. Maybe we should unify CreateLoadDataJob +// and StartJob.
func UpdateJobProgress( ctx context.Context, conn sqlexec.SQLExecutor, @@ -222,6 +238,25 @@ const ( JobRunning ) +func (s JobStatus) String() string { + switch s { + case JobFailed: + return "failed" + case JobCanceled: + return "canceled" + case JobPaused: + return "paused" + case JobFinished: + return "finished" + case JobPending: + return "pending" + case JobRunning: + return "running" + default: + return "unknown JobStatus" + } +} + // GetJobStatus gets the status of a load data job. The returned error means // something wrong when querying the database. Other business logic errors are // returned as JobFailed with message. @@ -251,7 +286,7 @@ func GetJobStatus( return JobFailed, "", err } if len(rows) != 1 { - return JobFailed, fmt.Sprintf("job %d not found", jobID), nil + return JobFailed, exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID).Error(), nil } return getJobStatus(rows[0]) @@ -309,6 +344,9 @@ type JobInfo struct { Progress string Status JobStatus StatusMessage string + CreateTime types.Time + StartTime types.Time + EndTime types.Time } // GetJobInfo gets all needed information of a load data job. @@ -316,6 +354,7 @@ func GetJobInfo( ctx context.Context, conn sqlexec.SQLExecutor, jobID int64, + user string, ) (*JobInfo, error) { ctx = util.WithInternalSourceType(ctx, kv.InternalLoadData) rs, err := conn.ExecuteInternal(ctx, @@ -333,10 +372,11 @@ func GetJobInfo( table_name, import_mode, progress, - create_user + create_user, + create_time FROM mysql.load_data_jobs - WHERE job_id = %?;`, - OfflineThresholdInSec, jobID) + WHERE job_id = %? 
AND create_user = %?;`, + OfflineThresholdInSec, jobID, user) if err != nil { return nil, err } @@ -346,7 +386,7 @@ func GetJobInfo( return nil, err } if len(rows) != 1 { - return nil, fmt.Errorf("job %d not found", jobID) + return nil, exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID) } return getJobInfo(rows[0]) @@ -366,6 +406,9 @@ func getJobInfo(row chunk.Row) (*JobInfo, error) { ImportMode: row.GetString(10), Progress: row.GetString(11), User: row.GetString(12), + CreateTime: row.GetTime(13), + StartTime: row.GetTime(5), + EndTime: row.GetTime(2), } jobInfo.Status, jobInfo.StatusMessage, err = getJobStatus(row) if err != nil { @@ -396,7 +439,8 @@ func GetAllJobInfo( table_name, import_mode, progress, - create_user + create_user, + create_time FROM mysql.load_data_jobs WHERE create_user = %?;`, OfflineThresholdInSec, user) diff --git a/executor/asyncloaddata/util_test.go b/executor/asyncloaddata/util_test.go index c3ecca8003f77..4aa979019feef 100644 --- a/executor/asyncloaddata/util_test.go +++ b/executor/asyncloaddata/util_test.go @@ -25,10 +25,18 @@ import ( "github.com/stretchr/testify/require" ) +func checkEqualIgnoreTimes(t *testing.T, expected, got *JobInfo) { + cloned := *expected + cloned.CreateTime = got.CreateTime + cloned.StartTime = got.StartTime + cloned.EndTime = got.EndTime + require.Equal(t, &cloned, got) +} + func createJob(t *testing.T, conn sqlexec.SQLExecutor, user string) (int64, *JobInfo) { id, err := CreateLoadDataJob(context.Background(), conn, "/tmp/test.csv", "test", "t", "logical", user) require.NoError(t, err) - info, err := GetJobInfo(context.Background(), conn, id) + info, err := GetJobInfo(context.Background(), conn, id, user) require.NoError(t, err) expected := &JobInfo{ JobID: id, @@ -41,7 +49,7 @@ func createJob(t *testing.T, conn sqlexec.SQLExecutor, user string) (int64, *Job Status: JobPending, StatusMessage: "", } - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) return id, info } @@ 
-66,58 +74,58 @@ func TestHappyPath(t *testing.T) { }) err := StartJob(ctx, tk.Session(), id) require.NoError(t, err) - info, err := GetJobInfo(ctx, tk.Session(), id) + info, err := GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobRunning - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // job is periodically updated by worker ok, err := UpdateJobProgress(ctx, tk.Session(), id, "imported 10%") require.NoError(t, err) require.True(t, ok) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Progress = "imported 10%" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // job is paused err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobPaused - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // worker still can update progress, maybe response to pausing is delayed ok, err = UpdateJobProgress(ctx, tk.Session(), id, "imported 20%") require.NoError(t, err) require.True(t, ok) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Progress = "imported 20%" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // job is resumed err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobRunning - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // job is finished err = FinishJob(ctx, tk.Session(), id, "finished message") require.NoError(t, err) - info, err = 
GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobFinished expected.StatusMessage = "finished message" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) } func TestKeepAlive(t *testing.T) { @@ -138,69 +146,70 @@ func TestKeepAlive(t *testing.T) { }) // before job is started, worker don't need to keepalive + // TODO:👆not correct! time.Sleep(2 * time.Second) - info, err := GetJobInfo(ctx, tk.Session(), id) + info, err := GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) err = StartJob(ctx, tk.Session(), id) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobRunning - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // if worker failed to keepalive, job will fail time.Sleep(2 * time.Second) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "job expected running but the node is timeout" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // after the worker is failed to keepalive, further keepalive will fail ok, err := UpdateJobProgress(ctx, tk.Session(), id, "imported 20%") require.NoError(t, err) require.False(t, ok) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // when worker fails to keepalive, before it calls FailJob, it still can // change expected status to some extent. 
err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.StatusMessage = "job expected paused but the node is timeout" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.StatusMessage = "job expected running but the node is timeout" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobCanceled expected.StatusMessage = "" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // Now the worker calls FailJob err = FailJob(ctx, tk.Session(), id, "failed to keepalive") require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "failed to keepalive" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) } func TestJobIsFailedAndGetAllJobs(t *testing.T) { @@ -218,11 +227,11 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { err := FailJob(ctx, tk.Session(), id, "failed message") require.NoError(t, err) - info, err := GetJobInfo(ctx, tk.Session(), id) + info, err := GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "failed message" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // create 
another job and fail it @@ -230,36 +239,36 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { err = StartJob(ctx, tk.Session(), id) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobRunning - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) err = FailJob(ctx, tk.Session(), id, "failed message") require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) expected.Status = JobFailed expected.StatusMessage = "failed message" - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // test change expected status of a failed job. err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedPaused) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedRunning) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) err = UpdateJobExpectedStatus(ctx, tk.Session(), id, JobExpectedCanceled) require.NoError(t, err) - info, err = GetJobInfo(ctx, tk.Session(), id) + info, err = GetJobInfo(ctx, tk.Session(), id, "user") require.NoError(t, err) - require.Equal(t, expected, info) + checkEqualIgnoreTimes(t, expected, info) // add job of another user and test GetAllJobInfo @@ -275,6 +284,9 @@ func TestJobIsFailedAndGetAllJobs(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(jobs)) require.Equal(t, JobPending, jobs[0].Status) + + _, err = GetJobInfo(ctx, tk.Session(), jobs[0].JobID, "wrong_user") + 
require.ErrorContains(t, err, "doesn't exist") } func TestGetJobStatus(t *testing.T) { @@ -323,5 +335,21 @@ func TestGetJobStatus(t *testing.T) { status, msg, err = GetJobStatus(ctx, tk.Session(), id+1) require.NoError(t, err) require.Equal(t, JobFailed, status) - require.Contains(t, msg, "not found") + require.Contains(t, msg, "doesn't exist") +} + +func TestCreateLoadDataJobRedact(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec(CreateLoadDataJobs) + defer tk.MustExec("DROP TABLE IF EXISTS mysql.load_data_jobs") + ctx := context.Background() + + _, err := CreateLoadDataJob(ctx, tk.Session(), + "s3://bucket/a.csv?access-key=hideme&secret-access-key=hideme", + "db", "table", "mode", "user") + require.NoError(t, err) + result := tk.MustQuery("SELECT * FROM mysql.load_data_jobs;") + result.CheckContain("a.csv") + result.CheckNotContain("hideme") } diff --git a/executor/builder.go b/executor/builder.go index 21bcdd0043aba..a91d10344b2dd 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -795,6 +795,7 @@ func (b *executorBuilder) buildShow(v *plannercore.PhysicalShow) Executor { GlobalScope: v.GlobalScope, Extended: v.Extended, Extractor: v.Extractor, + LoadDataJobID: v.LoadDataJobID, } if e.Tp == ast.ShowMasterStatus { // show master status need start ts. diff --git a/executor/importer/import.go b/executor/importer/import.go index de379fd7380f9..75bb60bd03b0b 100644 --- a/executor/importer/import.go +++ b/executor/importer/import.go @@ -38,7 +38,8 @@ const ( // parquet LoadDataFormatParquet = "parquet" - logicalImportMode = "logical" // tidb backend + // LogicalImportMode represents the import mode is SQL-like. 
+ LogicalImportMode = "logical" // tidb backend physicalImportMode = "physical" // local backend unlimitedWriteSpeed = config.ByteSize(math.MaxInt64) minDiskQuota = config.ByteSize(10 << 30) // 10GiB @@ -235,7 +236,7 @@ func (e *LoadDataController) initDefaultOptions() { threadCnt = int(math.Max(1, float64(threadCnt)*0.75)) } - e.importMode = logicalImportMode + e.importMode = LogicalImportMode _ = e.diskQuota.UnmarshalText([]byte("50GiB")) // todo confirm with pm e.checksum = config.OpLevelRequired e.addIndex = true @@ -277,13 +278,13 @@ func (e *LoadDataController) initOptions(seCtx sessionctx.Context, options []*pl return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) } v = strings.ToLower(v) - if v != logicalImportMode && v != physicalImportMode { + if v != LogicalImportMode && v != physicalImportMode { return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name) } e.importMode = v } - if e.importMode == logicalImportMode { + if e.importMode == LogicalImportMode { // some options are only allowed in physical mode for _, opt := range specifiedOptions { if _, ok := optionsForPhysicalImport[opt.Name]; ok { diff --git a/executor/importer/import_test.go b/executor/importer/import_test.go index 5ad4fd8d30c67..fe5e4b84943d0 100644 --- a/executor/importer/import_test.go +++ b/executor/importer/import_test.go @@ -33,7 +33,7 @@ import ( func TestInitDefaultOptions(t *testing.T) { e := LoadDataController{} e.initDefaultOptions() - require.Equal(t, logicalImportMode, e.importMode) + require.Equal(t, LogicalImportMode, e.importMode) require.Equal(t, config.ByteSize(50<<30), e.diskQuota) require.Equal(t, config.OpLevelRequired, e.checksum) require.Equal(t, true, e.addIndex) diff --git a/executor/load_data.go b/executor/load_data.go index 4a481fba22ade..3a0edaddfc675 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -350,8 +350,14 @@ func (e *LoadDataWorker) Load( } jobID, err = asyncloaddata.CreateLoadDataJob( - ctx, sqlExec, e.GetInfilePath(), 
e.controller.SchemaName, e.table.Meta().Name.O, - "logical", e.Ctx.GetSessionVars().User.String()) + ctx, + sqlExec, + e.GetInfilePath(), + e.controller.SchemaName, + e.table.Meta().Name.O, + importer.LogicalImportMode, + e.Ctx.GetSessionVars().User.String(), + ) if err != nil { return err } @@ -367,8 +373,11 @@ func (e *LoadDataWorker) Load( return } errMsg := err.Error() - if errImpl, ok := err.(*errors.Error); ok { - errMsg = terror.ToSQLError(errImpl).Error() + if errImpl, ok := errors.Cause(err).(*errors.Error); ok { + b, marshalErr := errImpl.MarshalJSON() + if marshalErr == nil { + errMsg = string(b) + } } err2 := asyncloaddata.FailJob(ctx, sqlExec, jobID, errMsg) diff --git a/executor/show.go b/executor/show.go index aa4b20484f950..8db9ece118334 100644 --- a/executor/show.go +++ b/executor/show.go @@ -25,12 +25,15 @@ import ( "strings" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/tidb/bindinfo" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/executor/asyncloaddata" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/kv" @@ -66,12 +69,14 @@ import ( "github.com/pingcap/tidb/util/format" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/hint" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stringutil" + "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -104,6 +109,8 @@ type ShowExec struct { IfNotExists bool // Used for `show create database if not exists` GlobalScope bool // GlobalScope is used by show variables Extended bool // Used for `show extended columns from ...` + + LoadDataJobID *int64 } 
type showTableRegionRowItem struct { @@ -273,6 +280,8 @@ func (e *ShowExec) fetchAll(ctx context.Context) error { return e.fetchShowPlacementForPartition(ctx) case ast.ShowSessionStates: return e.fetchShowSessionStates(ctx) + case ast.ShowLoadDataJobs: + return e.fetchShowLoadDataJobs(ctx) } return nil } @@ -2134,6 +2143,67 @@ func (e *ShowExec) fetchShowSessionStates(ctx context.Context) error { return nil } +// fetchShowLoadDataJobs fills the result with the schema +// {"Job_ID", "Create_Time", "Start_Time", "End_Time", +// "Data_Source", "Target_Table", "Import_Mode", "Created_By", +// "Job_State", "Job_Status", "Source_File_Size", "Loaded_File_Size", +// "Result_Code", "Result_Message"}. +func (e *ShowExec) fetchShowLoadDataJobs(ctx context.Context) error { + exec := e.ctx.(sqlexec.SQLExecutor) + handleOneInfo := func(info *asyncloaddata.JobInfo) { + e.result.AppendInt64(0, info.JobID) + e.result.AppendTime(1, info.CreateTime) + e.result.AppendTime(2, info.StartTime) + e.result.AppendTime(3, info.EndTime) + e.result.AppendString(4, info.DataSource) + table := utils.EncloseDBAndTable(info.TableSchema, info.TableName) + e.result.AppendString(5, table) + e.result.AppendString(6, info.ImportMode) + e.result.AppendString(7, info.User) + e.result.AppendString(8, "loading") + e.result.AppendString(9, info.Status.String()) + progress, err2 := asyncloaddata.ProgressFromJSON([]byte(info.Progress)) + if err2 != nil { + // maybe empty progress + if info.Progress != "" { + logutil.Logger(ctx).Warn("invalid progress", zap.String("progress", info.Progress)) + } + e.result.AppendNull(10) + e.result.AppendNull(11) + } else { + e.result.AppendString(10, units.HumanSize(float64(progress.SourceFileSize))) + e.result.AppendString(11, units.HumanSize(float64(progress.LoadedFileSize))) + } + terr := new(terror.Error) + err2 = terr.UnmarshalJSON([]byte(info.StatusMessage)) + if err2 == nil { + e.result.AppendInt64(12, int64(terr.Code())) + e.result.AppendString(13, terr.GetMsg()) + 
return + } + e.result.AppendInt64(12, 0) + e.result.AppendString(13, info.StatusMessage) + } + + if e.LoadDataJobID != nil { + info, err := asyncloaddata.GetJobInfo(ctx, exec, *e.LoadDataJobID, e.ctx.GetSessionVars().User.String()) + if err != nil { + return err + } + handleOneInfo(info) + return nil + } + infos, err := asyncloaddata.GetAllJobInfo(ctx, exec, e.ctx.GetSessionVars().User.String()) + if err != nil { + return err + } + for _, info := range infos { + handleOneInfo(info) + } + // TODO: does not support filtering for now + return nil +} + // tryFillViewColumnType fill the columns type info of a view. // Because view's underlying table's column could change or recreate, so view's column type may change over time. // To avoid this situation we need to generate a logical plan and extract current column types from Schema. diff --git a/planner/core/logical_plans.go b/planner/core/logical_plans.go index 7b276e2adc1c8..3675a738d0622 100644 --- a/planner/core/logical_plans.go +++ b/planner/core/logical_plans.go @@ -1926,6 +1926,8 @@ type ShowContents struct { GlobalScope bool // Used by show variables. Extended bool // Used for `show extended columns from ...` Limit *ast.Limit // Used for limit Result Set row number. 
+ + LoadDataJobID *int64 // Used for SHOW LOAD DATA JOB } const emptyShowContentsSize = int64(unsafe.Sizeof(ShowContents{})) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index b4cda4ec7c762..636a6d3bfd4b6 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -3209,6 +3209,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan, GlobalScope: show.GlobalScope, Extended: show.Extended, Limit: show.Limit, + LoadDataJobID: show.LoadDataJobID, }, }.Init(b.ctx) isView := false @@ -5206,6 +5207,15 @@ func buildShowSchema(s *ast.ShowStmt, isView bool, isSequence bool) (schema *exp case ast.ShowSessionStates: names = []string{"Session_states", "Session_token"} ftypes = []byte{mysql.TypeJSON, mysql.TypeJSON} + case ast.ShowLoadDataJobs: + names = []string{"Job_ID", "Create_Time", "Start_Time", "End_Time", + "Data_Source", "Target_Table", "Import_Mode", "Created_By", + "Job_State", "Job_Status", "Source_File_Size", "Loaded_File_Size", + "Result_Code", "Result_Message"} + ftypes = []byte{mysql.TypeLonglong, mysql.TypeTimestamp, mysql.TypeTimestamp, mysql.TypeTimestamp, + mysql.TypeString, mysql.TypeString, mysql.TypeString, mysql.TypeString, + mysql.TypeString, mysql.TypeString, mysql.TypeString, mysql.TypeString, + mysql.TypeLonglong, mysql.TypeString} } schema = expression.NewSchema(make([]*expression.Column, 0, len(names))...) diff --git a/util/dbterror/exeerrors/errors.go b/util/dbterror/exeerrors/errors.go index cdd0a7c8e5e38..5dda589f487eb 100644 --- a/util/dbterror/exeerrors/errors.go +++ b/util/dbterror/exeerrors/errors.go @@ -91,4 +91,5 @@ var ( ErrInvalidOptionVal = dbterror.ClassExecutor.NewStd(mysql.ErrInvalidOptionVal) ErrDuplicateOption = dbterror.ClassExecutor.NewStd(mysql.ErrDuplicateOption) ErrLoadDataUnsupportedOption = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataUnsupportedOption) + ErrLoadDataJobNotFound = dbterror.ClassExecutor.NewStd(mysql.ErrLoadDataJobNotFound) )