*: implement SHOW LOAD DATA JOB (#42118)
ref #40499
lance6716 authored Mar 13, 2023
1 parent ed1b4cf commit 7c3a22e
Showing 16 changed files with 414 additions and 71 deletions.
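Before the file-by-file diff, here is a minimal testkit-style sketch of the statements the new tests below exercise. The helper name, the placeholder job ID, and the bare store parameter are illustrative only and are not part of this commit:

package asyncloaddata_test

import (
	"testing"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/testkit"
)

// sketchShowLoadDataJob drives the two statements through testkit the same
// way show_test.go does below. The job ID 1 is a placeholder; real IDs come
// from the asynchronous LOAD DATA job records.
func sketchShowLoadDataJob(t *testing.T, store kv.Storage) {
	tk := testkit.NewTestKit(t, store)

	// List every LOAD DATA job visible to the current user; each row carries
	// the job ID, timestamps, data source, target table, import mode, status,
	// sizes, and result columns checked by expectedRecord further down.
	rows := tk.MustQuery("SHOW LOAD DATA JOBS;").Rows()
	_ = rows

	// Inspect a single job by its ID (here 1, assumed to exist). An unknown
	// ID is rejected with error 8170, "Job ID %d doesn't exist", as
	// TestSimpleShowLoadDataJobs asserts.
	tk.MustQuery("SHOW LOAD DATA JOB 1;")
}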
1 change: 1 addition & 0 deletions errno/errcode.go
@@ -1058,6 +1058,7 @@ const (
ErrInvalidOptionVal = 8164
ErrDuplicateOption = 8165
ErrLoadDataUnsupportedOption = 8166
ErrLoadDataJobNotFound = 8170

// Error codes used by TiDB ddl package
ErrUnsupportedDDLOperation = 8200
1 change: 1 addition & 0 deletions errno/errname.go
@@ -1052,6 +1052,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrInvalidOptionVal: mysql.Message("Invalid option value for %s", nil),
ErrDuplicateOption: mysql.Message("Option %s specified more than once", nil),
ErrLoadDataUnsupportedOption: mysql.Message("Unsupported option %s for %s import mode", nil),
ErrLoadDataJobNotFound: mysql.Message("Job ID %d doesn't exist", nil),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil),
5 changes: 5 additions & 0 deletions errors.toml
@@ -1731,6 +1731,11 @@ error = '''
Unsupported option %s for %s import mode
'''

["executor:8170"]
error = '''
Job ID %d doesn't exist
'''

["executor:8212"]
error = '''
Failed to split region ranges: %s
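The three hunks above add the error code, its message template, and the errors.toml entry under the executor class. Below is a hedged sketch of how such an error is typically registered and raised in TiDB, assuming it lives in util/dbterror/exeerrors (that package is added as a dependency of executor/asyncloaddata below); the exact file, variable placement, and raising site are not visible in this excerpt:

package exeerrors

import (
	"github.com/pingcap/tidb/errno"
	"github.com/pingcap/tidb/util/dbterror"
)

// Registered under the executor error class, matching the "executor:8170"
// entry in errors.toml above. The placement in this package is an assumption.
var ErrLoadDataJobNotFound = dbterror.ClassExecutor.NewStd(errno.ErrLoadDataJobNotFound)

// A lookup for a missing job would then be rejected roughly like this:
//
//	return exeerrors.ErrLoadDataJobNotFound.GenWithStackByArgs(jobID)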
2 changes: 2 additions & 0 deletions executor/BUILD.bazel
@@ -103,6 +103,7 @@ go_library(
"//br/pkg/lightning/mydump",
"//br/pkg/storage",
"//br/pkg/task",
"//br/pkg/utils",
"//config",
"//ddl",
"//ddl/label",
@@ -212,6 +213,7 @@ go_library(
"//util/topsql/state",
"//util/tracing",
"@com_github_burntsushi_toml//:toml",
"@com_github_docker_go_units//:go-units",
"@com_github_gogo_protobuf//proto",
"@com_github_ngaut_pools//:pools",
"@com_github_opentracing_basictracer_go//:basictracer-go",
2 changes: 2 additions & 0 deletions executor/asyncloaddata/BUILD.bazel
@@ -11,7 +11,9 @@ go_library(
deps = [
"//kv",
"//parser/terror",
"//types",
"//util/chunk",
"//util/dbterror/exeerrors",
"//util/sqlexec",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//util",
178 changes: 172 additions & 6 deletions executor/asyncloaddata/show_test.go
@@ -17,6 +17,7 @@ package asyncloaddata_test
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
@@ -79,6 +80,104 @@ func (s *mockGCSSuite) enableFailpoint(path, term string) {
})
}

type expectedRecord struct {
jobID string
dataSource string
targetTable string
importMode string
createdBy string
jobState string
jobStatus string
sourceFileSize string
loadedFileSize string
resultCode string
resultMessage string
}

func (r *expectedRecord) checkIgnoreTimes(t *testing.T, row []interface{}) {
require.Equal(t, r.jobID, row[0])
require.Equal(t, r.dataSource, row[4])
require.Equal(t, r.targetTable, row[5])
require.Equal(t, r.importMode, row[6])
require.Equal(t, r.createdBy, row[7])
require.Equal(t, r.jobState, row[8])
require.Equal(t, r.jobStatus, row[9])
require.Equal(t, r.sourceFileSize, row[10])
require.Equal(t, r.loadedFileSize, row[11])
require.Equal(t, r.resultCode, row[12])
require.Equal(t, r.resultMessage, row[13])
}

func (r *expectedRecord) check(t *testing.T, row []interface{}) {
r.checkIgnoreTimes(t, row)
require.NotEmpty(t, row[1])
require.NotEmpty(t, row[2])
require.NotEmpty(t, row[3])
}

func (s *mockGCSSuite) TestSimpleShowLoadDataJobs() {
s.tk.MustExec("DROP DATABASE IF EXISTS test_show;")
s.tk.MustExec("CREATE DATABASE test_show;")
s.tk.MustExec("CREATE TABLE test_show.t (i INT PRIMARY KEY);")
s.server.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: "test-show",
Name: "t.tsv",
},
Content: []byte(`1
2`),
})

user := &auth.UserIdentity{
AuthUsername: "test-load-2",
AuthHostname: "test-host",
}
s.tk.Session().GetSessionVars().User = user

sql := fmt.Sprintf(`LOAD DATA INFILE 'gs://test-show/t.tsv?endpoint=%s'
INTO TABLE test_show.t;`, gcsEndpoint)
s.tk.MustExec(sql)

rows := s.tk.MustQuery("SHOW LOAD DATA JOBS;").Rows()
require.Len(s.T(), rows, 1)
row := rows[0]
id := row[0].(string)
r := expectedRecord{
jobID: id,
dataSource: "gs://test-show/t.tsv",
targetTable: "`test_show`.`t`",
importMode: "logical",
createdBy: "test-load-2@test-host",
jobState: "loading",
jobStatus: "finished",
sourceFileSize: "3B",
loadedFileSize: "3B",
resultCode: "0",
resultMessage: "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0",
}
r.check(s.T(), row)

err := s.tk.QueryToErr("SHOW LOAD DATA JOB 100")
require.ErrorContains(s.T(), err, "Job ID 100 doesn't exist")

// repeat the LOAD DATA; it will fail with a duplicate entry error
s.tk.MustContainErrMsg(sql, "Duplicate entry '1' for key 't.PRIMARY'")
idNum, err := strconv.Atoi(id)
require.NoError(s.T(), err)
nextID := strconv.Itoa(idNum + 1)
rows = s.tk.MustQuery("SHOW LOAD DATA JOB " + nextID + ";").Rows()
require.Len(s.T(), rows, 1)
row = rows[0]

r.jobID = nextID
r.jobStatus = "failed"
r.sourceFileSize = "<nil>"
r.loadedFileSize = "<nil>"
r.resultCode = "1062"
r.resultMessage = "Duplicate entry '1' for key 't.PRIMARY'"
r.check(s.T(), row)
}
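For reference, the column offsets asserted by checkIgnoreTimes map to the row layout sketched below; the names are inferred from the struct fields rather than the literal output headers, and columns 1-3 are the create/start/end timestamps that check only requires to be non-empty:

// Inferred SHOW LOAD DATA JOB(S) row layout (a reading of the test above,
// not an authoritative column list).
const (
	colJobID          = 0
	// columns 1-3: create/start/end time
	colDataSource     = 4
	colTargetTable    = 5
	colImportMode     = 6
	colCreatedBy      = 7
	colJobState       = 8
	colJobStatus      = 9
	colSourceFileSize = 10
	colLoadedFileSize = 11
	colResultCode     = 12
	colResultMessage  = 13
)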

func (s *mockGCSSuite) TestInternalStatus() {
s.tk.MustExec("DROP DATABASE IF EXISTS load_tsv;")
s.tk.MustExec("CREATE DATABASE load_tsv;")
@@ -113,61 +212,128 @@ func (s *mockGCSSuite) TestInternalStatus() {
defer wg.Done()
tk2 := testkit.NewTestKit(s.T(), s.store)
tk2.Session().GetSessionVars().User = user
userStr := tk2.Session().GetSessionVars().User.String()

// tk @ 0:00
// create load data job record in the system table and sleep 3 seconds

time.Sleep(2 * time.Second)
// tk2 @ 0:02
jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), tk2.Session().GetSessionVars().User.String())

jobInfos, err := GetAllJobInfo(ctx, tk2.Session(), userStr)
require.NoError(s.T(), err)
require.Len(s.T(), jobInfos, 1)
info := jobInfos[0]
id := info.JobID
expected := &JobInfo{
JobID: id,
User: "test-load@test-host",
DataSource: fmt.Sprintf("gs://test-tsv/t*.tsv?endpoint=%s", gcsEndpoint),
DataSource: "gs://test-tsv/t*.tsv",
TableSchema: "load_tsv",
TableName: "t",
ImportMode: "logical",
Progress: "",
Status: JobPending,
StatusMessage: "",
CreateTime: info.CreateTime,
StartTime: info.StartTime,
EndTime: info.EndTime,
}
require.Equal(s.T(), expected, info)

rows := tk2.MustQuery("SHOW LOAD DATA JOBS;").Rows()
require.Len(s.T(), rows, 1)
row := rows[0]
r := expectedRecord{
jobID: strconv.Itoa(int(id)),
dataSource: "gs://test-tsv/t*.tsv",
targetTable: "`load_tsv`.`t`",
importMode: "logical",
createdBy: "test-load@test-host",
jobState: "loading",
jobStatus: "pending",
sourceFileSize: "<nil>",
loadedFileSize: "<nil>",
resultCode: "0",
resultMessage: "",
}
r.checkIgnoreTimes(s.T(), row)

// tk @ 0:03
// start job and sleep 3 seconds

time.Sleep(3 * time.Second)
// tk2 @ 0:05
info, err = GetJobInfo(ctx, tk2.Session(), id)

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
expected.StartTime = info.StartTime
expected.Status = JobRunning
require.Equal(s.T(), expected, info)

rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows()
require.Len(s.T(), rows, 1)
row = rows[0]
r.jobStatus = "running"
r.checkIgnoreTimes(s.T(), row)

// tk @ 0:06
// commit one task and sleep 3 seconds

time.Sleep(3 * time.Second)
// tk2 @ 0:08
info, err = GetJobInfo(ctx, tk2.Session(), id)

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":0,"LoadedRowCnt":1}`
require.Equal(s.T(), expected, info)

rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows()
require.Len(s.T(), rows, 1)
row = rows[0]
r.sourceFileSize = "2B"
r.loadedFileSize = "0B"
r.checkIgnoreTimes(s.T(), row)

// tk @ 0:09
// commit one task and sleep 3 seconds

time.Sleep(3 * time.Second)
// tk2 @ 0:11
info, err = GetJobInfo(ctx, tk2.Session(), id)

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":1,"LoadedRowCnt":2}`
require.Equal(s.T(), expected, info)

rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows()
require.Len(s.T(), rows, 1)
row = rows[0]
r.loadedFileSize = "1B"
r.checkIgnoreTimes(s.T(), row)

// tk @ 0:12
// finish job

time.Sleep(3 * time.Second)
// tk2 @ 0:14
info, err = GetJobInfo(ctx, tk2.Session(), id)

info, err = GetJobInfo(ctx, tk2.Session(), id, userStr)
require.NoError(s.T(), err)
expected.Status = JobFinished
expected.EndTime = info.EndTime
expected.StatusMessage = "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"
expected.Progress = `{"SourceFileSize":2,"LoadedFileSize":2,"LoadedRowCnt":2}`
require.Equal(s.T(), expected, info)

rows = tk2.MustQuery(fmt.Sprintf("SHOW LOAD DATA JOB %d;", id)).Rows()
require.Len(s.T(), rows, 1)
row = rows[0]
r.loadedFileSize = "2B"
r.jobStatus = "finished"
r.resultCode = "0"
r.resultMessage = "Records: 2 Deleted: 0 Skipped: 0 Warnings: 0"
r.checkIgnoreTimes(s.T(), row)
}()

backup := HeartBeatInSec
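The call-site changes in this test also show that GetAllJobInfo and GetJobInfo now take the requesting user's string, so job visibility is scoped to the creator. Below is a caller-side sketch under that assumption; the wrapper function, parameter types, and placeholder job ID are illustrative, and the exact signatures are not shown in this diff:

package asyncloaddata_test

import (
	"context"
	"testing"

	"github.com/pingcap/tidb/executor/asyncloaddata"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/testkit"
)

// sketchJobInfoLookup mirrors the updated calls in TestInternalStatus. It
// assumes the session's User has been set, as the tests above do.
func sketchJobInfoLookup(t *testing.T, store kv.Storage, jobID int64) {
	ctx := context.Background()
	tk := testkit.NewTestKit(t, store)
	userStr := tk.Session().GetSessionVars().User.String()

	// All jobs created by the current user.
	if _, err := asyncloaddata.GetAllJobInfo(ctx, tk.Session(), userStr); err != nil {
		t.Fatal(err)
	}

	// A single job, still scoped to its creator; an unknown ID is presumably
	// surfaced as "Job ID %d doesn't exist" (error 8170).
	if _, err := asyncloaddata.GetJobInfo(ctx, tk.Session(), jobID, userStr); err != nil {
		t.Fatal(err)
	}
}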