ddl: wrap the sessionctx to public delete range logic to BR (#48050) #49247

Closed
96 changes: 29 additions & 67 deletions br/pkg/restore/client.go
@@ -169,8 +169,8 @@ type Client struct {
// this feature is controlled by flag with-sys-table
fullClusterRestore bool
// the queries to insert rows into table `gc_delete_range`; the ts is filled in later.
deleteRangeQuery []string
deleteRangeQueryCh chan string
deleteRangeQuery []*stream.PreDelRangeQuery
deleteRangeQueryCh chan *stream.PreDelRangeQuery
deleteRangeQueryWaitGroup sync.WaitGroup

// see RestoreCommonConfig.WithSysTable
@@ -203,8 +203,8 @@ func NewRestoreClient(
tlsConf: tlsConf,
keepaliveConf: keepaliveConf,
switchCh: make(chan struct{}),
deleteRangeQuery: make([]string, 0),
deleteRangeQueryCh: make(chan string, 10),
deleteRangeQuery: make([]*stream.PreDelRangeQuery, 0),
deleteRangeQueryCh: make(chan *stream.PreDelRangeQuery, 10),
}
}

@@ -2816,7 +2816,7 @@ func (rc *Client) InitSchemasReplaceForDDL(

rp := stream.NewSchemasReplace(
dbReplaces, needConstructIdMap, cfg.TiFlashRecorder, rc.currentTS, cfg.TableFilter, rc.GenGlobalID, rc.GenGlobalIDs,
rc.InsertDeleteRangeForTable, rc.InsertDeleteRangeForIndex)
rc.RecordDeleteRange)
return rp, nil
}

@@ -3434,66 +3434,8 @@ NEXTSQL:
return nil
}

const (
insertDeleteRangeSQLPrefix = `INSERT IGNORE INTO mysql.gc_delete_range VALUES `
insertDeleteRangeSQLValue = "(%d, %d, '%s', '%s', %%[1]d)"

batchInsertDeleteRangeSize = 256
)

// InsertDeleteRangeForTable generates query to insert table delete job into table `gc_delete_range`.
func (rc *Client) InsertDeleteRangeForTable(jobID int64, tableIDs []int64) {
var elementID int64 = 1
var tableID int64
for i := 0; i < len(tableIDs); i += batchInsertDeleteRangeSize {
batchEnd := len(tableIDs)
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}

var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
for j := i; j < batchEnd; j++ {
tableID = tableIDs[j]
startKey := tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeTablePrefix(tableID + 1)
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
buf.WriteString(fmt.Sprintf(insertDeleteRangeSQLValue, jobID, elementID, startKeyEncoded, endKeyEncoded))
if j != batchEnd-1 {
buf.WriteString(",")
}
elementID += 1
}
rc.deleteRangeQueryCh <- buf.String()
}
}

// InsertDeleteRangeForIndex generates query to insert index delete job into table `gc_delete_range`.
func (rc *Client) InsertDeleteRangeForIndex(jobID int64, elementID *int64, tableID int64, indexIDs []int64) {
var indexID int64
for i := 0; i < len(indexIDs); i += batchInsertDeleteRangeSize {
batchEnd := len(indexIDs)
if batchEnd > i+batchInsertDeleteRangeSize {
batchEnd = i + batchInsertDeleteRangeSize
}

var buf strings.Builder
buf.WriteString(insertDeleteRangeSQLPrefix)
for j := i; j < batchEnd; j++ {
indexID = indexIDs[j]
startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID)
endKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID+1)
startKeyEncoded := hex.EncodeToString(startKey)
endKeyEncoded := hex.EncodeToString(endKey)
buf.WriteString(fmt.Sprintf(insertDeleteRangeSQLValue, jobID, *elementID, startKeyEncoded, endKeyEncoded))
if j != batchEnd-1 {
buf.WriteString(",")
}
*elementID += 1
}
rc.deleteRangeQueryCh <- buf.String()
}
func (rc *Client) RecordDeleteRange(sql *stream.PreDelRangeQuery) {
rc.deleteRangeQueryCh <- sql
}

// use a channel to save the delete-range queries so that recording them is thread-safe.
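Note for readers: the new `stream.PreDelRangeQuery` and `stream.DelRangeParams` types are defined in `br/pkg/stream`, outside this diff. Judging from how they are used above and in the test below, they presumably look roughly like the following sketch (field names come from this diff; everything else, including the comments, is an assumption):

```go
// Sketch only — the real definitions live in br/pkg/stream and may differ.
package stream

// DelRangeParams carries one (job_id, element_id, start_key, end_key) tuple
// destined for mysql.gc_delete_range; the ts column is appended at execution time.
type DelRangeParams struct {
	JobID    int64
	ElemID   int64
	StartKey string // hex-encoded range start key
	EndKey   string // hex-encoded range end key
}

// PreDelRangeQuery is a parameterized INSERT into mysql.gc_delete_range:
// Sql holds `%?` placeholders and ParamsList holds one entry per inserted row.
type PreDelRangeQuery struct {
	Sql        string
	ParamsList []DelRangeParams
}
```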
@@ -3524,16 +3466,36 @@ func (rc *Client) InsertGCRows(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
jobIDMap := make(map[int64]int64)
for _, query := range rc.deleteRangeQuery {
if err := rc.db.se.ExecuteInternal(ctx, fmt.Sprintf(query, ts)); err != nil {
paramsList := make([]interface{}, 0, len(query.ParamsList)*5)
for _, params := range query.ParamsList {
newJobID, exists := jobIDMap[params.JobID]
if !exists {
newJobID, err = rc.GenGlobalID(ctx)
if err != nil {
return errors.Trace(err)
}
jobIDMap[params.JobID] = newJobID
}
log.Info("insert into the delete range",
zap.Int64("jobID", newJobID),
zap.Int64("elemID", params.ElemID),
zap.String("startKey", params.StartKey),
zap.String("endKey", params.EndKey),
zap.Uint64("ts", ts))
// (job_id, elem_id, start_key, end_key, ts)
paramsList = append(paramsList, newJobID, params.ElemID, params.StartKey, params.EndKey, ts)
}
if err := rc.db.se.ExecuteInternal(ctx, query.Sql, paramsList...); err != nil {
return errors.Trace(err)
}
}
return nil
}

// only for unit test
func (rc *Client) GetGCRows() []string {
func (rc *Client) GetGCRows() []*stream.PreDelRangeQuery {
close(rc.deleteRangeQueryCh)
rc.deleteRangeQueryWaitGroup.Wait()
return rc.deleteRangeQuery
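To make the new `InsertGCRows` flow concrete, here is a minimal, self-contained illustration — not code from this PR — of how one two-range `PreDelRangeQuery` is flattened into the variadic arguments passed to `ExecuteInternal`. The `%?` placeholder style and the `(job_id, elem_id, start_key, end_key, ts)` column order are taken from the diff; the concrete values, `ts`, and the `newJobID` stand-in for `rc.GenGlobalID` are made up:

```go
// Minimal illustration of the parameter flattening done in InsertGCRows.
package main

import "fmt"

type delRangeParams struct {
	JobID    int64
	ElemID   int64
	StartKey string
	EndKey   string
}

func main() {
	sql := "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)"
	paramsList := []delRangeParams{
		{JobID: 1, ElemID: 1, StartKey: "a", EndKey: "b"},
		{JobID: 1, ElemID: 2, StartKey: "b", EndKey: "c"},
	}
	ts := uint64(42)       // in the real code: the restore timestamp
	newJobID := int64(100) // in the real code: a fresh ID from rc.GenGlobalID

	// Flatten into (job_id, elem_id, start_key, end_key, ts) per row,
	// mirroring the args built for ExecuteInternal.
	args := make([]interface{}, 0, len(paramsList)*5)
	for _, p := range paramsList {
		args = append(args, newJobID, p.ElemID, p.StartKey, p.EndKey, ts)
	}
	fmt.Println(sql)
	fmt.Println(args) // [100 1 a b 42 100 2 b c 42]
}
```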
43 changes: 32 additions & 11 deletions br/pkg/restore/client_test.go
@@ -619,18 +619,40 @@ func TestDeleteRangeQuery(t *testing.T) {

client.RunGCRowsLoader(ctx)

client.InsertDeleteRangeForTable(2, []int64{3})
client.InsertDeleteRangeForTable(4, []int64{5, 6})

elementID := int64(1)
client.InsertDeleteRangeForIndex(7, &elementID, 8, []int64{1})
client.InsertDeleteRangeForIndex(9, &elementID, 10, []int64{1, 2})
client.RecordDeleteRange(&stream.PreDelRangeQuery{
Sql: "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)",
ParamsList: []stream.DelRangeParams{
{
JobID: 1,
ElemID: 1,
StartKey: "a",
EndKey: "b",
},
{
JobID: 1,
ElemID: 2,
StartKey: "b",
EndKey: "c",
},
},
})

querys := client.GetGCRows()
require.Equal(t, querys[0], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (2, 1, '748000000000000003', '748000000000000004', %[1]d)")
require.Equal(t, querys[1], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (4, 1, '748000000000000005', '748000000000000006', %[1]d),(4, 2, '748000000000000006', '748000000000000007', %[1]d)")
require.Equal(t, querys[2], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (7, 1, '7480000000000000085f698000000000000001', '7480000000000000085f698000000000000002', %[1]d)")
require.Equal(t, querys[3], "INSERT IGNORE INTO mysql.gc_delete_range VALUES (9, 2, '74800000000000000a5f698000000000000001', '74800000000000000a5f698000000000000002', %[1]d),(9, 3, '74800000000000000a5f698000000000000002', '74800000000000000a5f698000000000000003', %[1]d)")
require.Equal(t, len(querys), 1)
require.Equal(t, querys[0].Sql, "INSERT IGNORE INTO mysql.gc_delete_range VALUES (%?, %?, %?, %?, %?), (%?, %?, %?, %?, %?)")
require.Equal(t, len(querys[0].ParamsList), 2)
require.Equal(t, querys[0].ParamsList[0], stream.DelRangeParams{
JobID: 1,
ElemID: 1,
StartKey: "a",
EndKey: "b",
})
require.Equal(t, querys[0].ParamsList[1], stream.DelRangeParams{
JobID: 1,
ElemID: 2,
StartKey: "b",
EndKey: "c",
})
}

func MockEmptySchemasReplace() *stream.SchemasReplace {
@@ -644,7 +666,6 @@ func MockEmptySchemasReplace() *stream.SchemasReplace {
nil,
nil,
nil,
nil,
)
}

4 changes: 2 additions & 2 deletions br/pkg/restore/ingestrec/ingest_recorder.go
@@ -83,8 +83,8 @@ func notSynced(job *model.Job, isSubJob bool) bool {
return (job.State != model.JobStateSynced) && !(isSubJob && job.State == model.JobStateDone)
}

// AddJob firstly filters the ingest index add operation job, and records it into IngestRecorder.
func (i *IngestRecorder) AddJob(job *model.Job, isSubJob bool) error {
// TryAddJob filters out jobs that are not synced ingest add-index jobs, and records the rest into the IngestRecorder.
func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error {
if job == nil || notIngestJob(job) || notAddIndexJob(job) || notSynced(job, isSubJob) {
return nil
}
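The rename from `AddJob` to `TryAddJob` signals that the call may be a no-op: anything that is not a synced ingest add-index job is silently skipped. A hypothetical caller — not taken from this PR, and with import paths assumed for this branch — would therefore just feed every restored DDL job to it:

```go
// Hypothetical helper, not part of this PR.
package restoreutil

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/restore/ingestrec"
	"github.com/pingcap/tidb/parser/model"
)

// collectIngestJobs records every ingest add-index job found in jobs,
// relying on TryAddJob to skip everything else.
func collectIngestJobs(recorder *ingestrec.IngestRecorder, jobs []*model.Job) error {
	for _, job := range jobs {
		if err := recorder.TryAddJob(job, false /* isSubJob */); err != nil {
			return errors.Trace(err)
		}
	}
	return nil
}
```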
16 changes: 8 additions & 8 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
@@ -138,7 +138,7 @@ func TestAddIngestRecorder(t *testing.T) {
}
recorder := ingestrec.New()
// no ingest job, should ignore it
err := recorder.AddJob(fakeJob(
err := recorder.TryAddJob(fakeJob(
model.ReorgTypeTxn,
model.ActionAddIndex,
model.JobStateSynced,
@@ -154,7 +154,7 @@ func TestAddIngestRecorder(t *testing.T) {
require.NoError(t, err)

// no add-index job, should ignore it
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionDropIndex,
model.JobStateSynced,
@@ -170,7 +170,7 @@ func TestAddIngestRecorder(t *testing.T) {
require.NoError(t, err)

// no synced job, should ignore it
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateRollbackDone,
@@ -188,7 +188,7 @@ func TestAddIngestRecorder(t *testing.T) {
{
recorder := ingestrec.New()
// a normal ingest add index job
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateSynced,
@@ -209,7 +209,7 @@ func TestAddIngestRecorder(t *testing.T) {
{
recorder := ingestrec.New()
// a normal ingest add primary index job
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddPrimaryKey,
model.JobStateSynced,
@@ -229,7 +229,7 @@ func TestAddIngestRecorder(t *testing.T) {

{
// a sub job as add primary index job
err = recorder.AddJob(fakeJob(
err = recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddPrimaryKey,
model.JobStateDone,
@@ -304,7 +304,7 @@ func TestIndexesKind(t *testing.T) {
}

recorder := ingestrec.New()
err := recorder.AddJob(fakeJob(
err := recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateSynced,
@@ -382,7 +382,7 @@ func TestRewriteTableID(t *testing.T) {
},
}
recorder := ingestrec.New()
err := recorder.AddJob(fakeJob(
err := recorder.TryAddJob(fakeJob(
model.ReorgTypeLitMerge,
model.ActionAddIndex,
model.JobStateSynced,
25 changes: 24 additions & 1 deletion br/pkg/stream/BUILD.bazel
@@ -22,13 +22,24 @@ go_library(
"//br/pkg/storage",
"//br/pkg/streamhelper",
"//br/pkg/utils",
<<<<<<< HEAD
"//kv",
"//meta",
"//parser/model",
"//tablecodec",
"//util",
"//util/codec",
"//util/table-filter",
=======
"//pkg/ddl",
"//pkg/kv",
"//pkg/meta",
"//pkg/parser/model",
"//pkg/tablecodec",
"//pkg/util",
"//pkg/util/codec",
"//pkg/util/table-filter",
>>>>>>> be62f754fb4 (ddl: wrap the sessionctx to public delete range logic to BR (#48050))
"@com_github_fatih_color//:color",
"@com_github_klauspost_compress//zstd",
"@com_github_pingcap_errors//:errors",
@@ -54,10 +65,11 @@ go_test(
],
embed = [":stream"],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
"//br/pkg/storage",
"//br/pkg/streamhelper",
<<<<<<< HEAD
"//meta",
"//parser/ast",
"//parser/model",
@@ -66,6 +78,17 @@
"//types",
"//util/codec",
"//util/table-filter",
=======
"//pkg/ddl",
"//pkg/meta",
"//pkg/parser/ast",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/tablecodec",
"//pkg/types",
"//pkg/util/codec",
"//pkg/util/table-filter",
>>>>>>> be62f754fb4 (ddl: wrap the sessionctx to public delete range logic to BR (#48050))
"@com_github_pingcap_kvproto//pkg/brpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",