Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix panic when add index on prefixed pk tables #39740

Merged
merged 2 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
Expand All @@ -36,6 +38,8 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/generic"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/timeutil"
Expand Down Expand Up @@ -120,6 +124,10 @@ type copReqSender struct {
func (c *copReqSender) run() {
p := c.senderPool
defer p.wg.Done()
var curTaskID int
defer util.Recover(metrics.LabelDDL, "copReqSender.run", func() {
p.resultsCh <- idxRecResult{id: curTaskID, err: dbterror.ErrReorgPanic}
}, false)
for {
if util.HasCancelled(c.ctx) {
return
Expand All @@ -128,13 +136,19 @@ func (c *copReqSender) run() {
if !ok {
return
}
curTaskID = task.id
logutil.BgLogger().Info("[ddl-ingest] start a cop-request task",
zap.Int("id", task.id), zap.String("task", task.String()))
rs, err := p.copCtx.buildTableScan(p.ctx, p.startTS, task.startKey, task.excludedEndKey())
if err != nil {
p.resultsCh <- idxRecResult{id: task.id, err: err}
return
}
failpoint.Inject("MockCopSenderPanic", func(val failpoint.Value) {
if val.(bool) {
panic("mock panic")
}
})
var done bool
var total int
for !done {
Expand Down Expand Up @@ -437,12 +451,39 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se
if err != nil {
return nil, false, errors.Trace(err)
}
rsData := tables.TryGetHandleRestoredDataWrapper(c.tblInfo, hdDt, nil, c.idxInfo)
rsData := getRestoreData(c.tblInfo, c.idxInfo, c.pkInfo, hdDt)
buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false})
}
return buf, false, nil
}

func getRestoreData(tblInfo *model.TableInfo, targetIdx, pkIdx *model.IndexInfo, handleDts []types.Datum) []types.Datum {
if !collate.NewCollationEnabled() || !tblInfo.IsCommonHandle || tblInfo.CommonHandleVersion == 0 {
return nil
}
if pkIdx == nil {
return nil
}
for i, pkIdxCol := range pkIdx.Columns {
pkCol := tblInfo.Columns[pkIdxCol.Offset]
if !types.NeedRestoredData(&pkCol.FieldType) {
// Since the handle data cannot be null, we can use SetNull to
// indicate that this column does not need to be restored.
handleDts[i].SetNull()
continue
}
tables.TryTruncateRestoredData(&handleDts[i], pkCol, pkIdxCol, targetIdx)
tables.ConvertDatumToTailSpaceCount(&handleDts[i], pkCol)
}
dtToRestored := handleDts[:0]
for _, handleDt := range handleDts {
if !handleDt.IsNull() {
dtToRestored = append(dtToRestored, handleDt)
}
}
return dtToRestored
}

func buildDAGPB(sCtx sessionctx.Context, tblInfo *model.TableInfo, colInfos []*model.ColumnInfo) (*tipb.DAGRequest, error) {
dagReq := &tipb.DAGRequest{}
dagReq.TimeZoneName, dagReq.TimeZoneOffset = timeutil.Zone(sCtx.GetSessionVars().Location())
Expand Down
45 changes: 27 additions & 18 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1890,28 +1890,37 @@ func TryGetHandleRestoredDataWrapper(tblInfo *model.TableInfo, row []types.Datum
} else {
datum = row[pkCol.Offset]
}
// Try to truncate index values.
// Says that primary key(a (8)),
// For index t(a), don't truncate the value.
// For index t(a(9)), truncate to a(9).
// For index t(a(7)), truncate to a(8).
truncateTargetCol := pkIdxCol
for _, idxCol := range idx.Columns {
if idxCol.Offset == pkCol.Offset {
truncateTargetCol = maxIndexLen(pkIdxCol, idxCol)
break
}
}
tablecodec.TruncateIndexValue(&datum, truncateTargetCol, pkCol)
if collate.IsBinCollation(pkCol.GetCollate()) {
rsData = append(rsData, types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString())))
} else {
rsData = append(rsData, datum)
}
TryTruncateRestoredData(&datum, pkCol, pkIdxCol, idx)
ConvertDatumToTailSpaceCount(&datum, pkCol)
rsData = append(rsData, datum)
}
return rsData
}

// TryTruncateRestoredData tries to truncate index values.
// Says that primary key(a (8)),
// For index t(a), don't truncate the value.
// For index t(a(9)), truncate to a(9).
// For index t(a(7)), truncate to a(8).
func TryTruncateRestoredData(datum *types.Datum, pkCol *model.ColumnInfo,
pkIdxCol *model.IndexColumn, idx *model.IndexInfo) {
truncateTargetCol := pkIdxCol
for _, idxCol := range idx.Columns {
if idxCol.Offset == pkIdxCol.Offset {
truncateTargetCol = maxIndexLen(pkIdxCol, idxCol)
break
}
}
tablecodec.TruncateIndexValue(datum, truncateTargetCol, pkCol)
}

// ConvertDatumToTailSpaceCount converts a string datum to an int datum that represents the tail space count.
func ConvertDatumToTailSpaceCount(datum *types.Datum, col *model.ColumnInfo) {
if collate.IsBinCollation(col.GetCollate()) {
*datum = types.NewIntDatum(stringutil.GetTailSpaceCount(datum.GetString()))
}
}

func maxIndexLen(idxA, idxB *model.IndexColumn) *model.IndexColumn {
if idxA.Length == types.UnspecifiedLength {
return idxA
Expand Down
48 changes: 48 additions & 0 deletions tests/realtikvtest/addindextest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,51 @@ func TestAddIndexIngestEmptyTable(t *testing.T) {
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestAddIndexIngestRestoredData(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

tk.MustExec(`
CREATE TABLE tbl_5 (
col_21 time DEFAULT '04:48:17',
col_22 varchar(403) COLLATE utf8_unicode_ci DEFAULT NULL,
col_23 year(4) NOT NULL,
col_24 char(182) CHARACTER SET gbk COLLATE gbk_chinese_ci NOT NULL,
col_25 set('Alice','Bob','Charlie','David') COLLATE utf8_unicode_ci DEFAULT NULL,
PRIMARY KEY (col_24(3)) /*T![clustered_index] CLUSTERED */,
KEY idx_10 (col_22)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
`)
tk.MustExec("INSERT INTO tbl_5 VALUES ('15:33:15','&U+x1',2007,'','Bob');")
tk.MustExec("alter table tbl_5 add unique key idx_13 ( col_23 );")
tk.MustExec("admin check table tbl_5;")
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
jobTp := rows[0][3].(string)
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestAddIndexIngestPanicOnCopRead(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/MockCopSenderPanic", "return(true)"))
tk.MustExec("create table t (a int, b int, c int, d int, primary key (a) clustered);")
tk.MustExec("insert into t (a, b, c, d) values (1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3);")
tk.MustExec("alter table t add index idx(b);")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/MockCopSenderPanic"))
rows := tk.MustQuery("admin show ddl jobs 1;").Rows()
require.Len(t, rows, 1)
jobTp := rows[0][3].(string)
// Fallback to txn-merge process.
require.True(t, strings.Contains(jobTp, "txn-merge"), jobTp)
}