Skip to content

Commit

Permalink
mounter(ticdc): calculate raw bytes checksum by using handle (#11720) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 8, 2024
1 parent 5a10c36 commit d238ea7
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 83 deletions.
80 changes: 41 additions & 39 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,20 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra
return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID)
}
if bytes.HasPrefix(key, recordPrefix) {
rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Key, raw.Value, raw.OldValue, baseInfo)
recordID, err := tablecodec.DecodeRowKey(raw.Key)
if err != nil {
return nil, errors.Trace(err)
}
baseInfo.RecordID = recordID

rowKV, err := m.unmarshalRowKVEntry(tableInfo, raw.Value, raw.OldValue, baseInfo)
if err != nil {
return nil, errors.Trace(err)
}
if rowKV == nil {
return nil, nil
}
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, checksumKey, raw.ApproximateDataSize())
row, rawRow, err := m.mountRowKVEntry(tableInfo, rowKV, recordID, checksumKey, raw.ApproximateDataSize())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -231,28 +237,21 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra

func (m *mounter) unmarshalRowKVEntry(
tableInfo *model.TableInfo,
rawKey []byte,
rawValue []byte,
rawOldValue []byte,
base baseKVEntry,
) (*rowKVEntry, error) {
recordID, err := tablecodec.DecodeRowKey(rawKey)
if err != nil {
return nil, errors.Trace(err)
}
base.RecordID = recordID

var (
row, preRow map[int64]types.Datum
rowExist, preRowExist bool
)

row, rowExist, err = m.decodeRow(rawValue, recordID, tableInfo, false)
row, rowExist, err := m.decodeRow(rawValue, base.RecordID, tableInfo, false)
if err != nil {
return nil, errors.Trace(err)
}

preRow, preRowExist, err = m.decodeRow(rawOldValue, recordID, tableInfo, true)
preRow, preRowExist, err = m.decodeRow(rawOldValue, base.RecordID, tableInfo, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -490,33 +489,34 @@ func (m *mounter) verifyColumnChecksum(

checksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
log.Error("failed to calculate the checksum",
zap.Uint32("first", first), zap.Any("columnInfos", columnInfos),
zap.Any("rawColumns", rawColumns), zap.Error(err))
return 0, false, err
}

// the first checksum matched, it hits in the most case.
if checksum == first {
log.Debug("checksum matched", zap.Uint32("checksum", checksum), zap.Uint32("first", first))
return checksum, true, nil
}

extra, ok := decoder.GetExtraChecksum()
if ok && checksum == extra {
log.Debug("extra checksum matched, this may happen the upstream TiDB is during the DDL execution phase",
zap.Uint32("checksum", checksum), zap.Uint32("extra", extra))
return checksum, true, nil
}

if !skipFail {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
return checksum, false, nil
}

if time.Since(m.lastSkipOldValueTime) > time.Minute {
log.Warn("checksum mismatch on the old value, "+
"this may caused by Add Column / Drop Column executed, skip verification",
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra))
zap.Uint32("checksum", checksum), zap.Uint32("first", first), zap.Uint32("extra", extra),
zap.Any("columnInfos", columnInfos), zap.Any("rawColumns", rawColumns), zap.Any("tz", m.tz))
m.lastSkipOldValueTime = time.Now()
}
return checksum, true, nil
Expand Down Expand Up @@ -602,7 +602,7 @@ func newDatum(value interface{}, ft types.FieldType) (types.Datum, error) {

func verifyRawBytesChecksum(
tableInfo *model.TableInfo, columns []*model.ColumnData, decoder *rowcodec.DatumMapDecoder,
key kv.Key, tz *time.Location,
handle kv.Handle, key kv.Key, tz *time.Location,
) (uint32, bool, error) {
expected, ok := decoder.GetChecksum()
if !ok {
Expand All @@ -621,12 +621,14 @@ func verifyRawBytesChecksum(
columnInfo := tableInfo.ForceGetColumnInfo(columnID)
datum, err := newDatum(col.Value, columnInfo.FieldType)
if err != nil {
log.Error("build datum for raw checksum calculation failed",
zap.Any("col", col), zap.Any("columnInfo", columnInfo), zap.Error(err))
return 0, false, errors.Trace(err)
}
datums = append(datums, &datum)
columnIDs = append(columnIDs, columnID)
}
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, nil)
obtained, err := decoder.CalculateRawChecksum(tz, columnIDs, datums, key, handle, nil)
if err != nil {
return 0, false, errors.Trace(err)
}
Expand All @@ -635,7 +637,10 @@ func verifyRawBytesChecksum(
}

log.Error("raw bytes checksum mismatch",
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained))
zap.Int("version", decoder.ChecksumVersion()),
zap.Uint32("expected", expected), zap.Uint32("obtained", obtained),
zap.Any("tableInfo", tableInfo), zap.Any("columns", columns),
zap.Any("handle", handle.String()), zap.Any("tz", tz))

return expected, false, nil
}
Expand All @@ -645,7 +650,7 @@ func verifyRawBytesChecksum(
func (m *mounter) verifyChecksum(
tableInfo *model.TableInfo, columnInfos []*timodel.ColumnInfo,
columns []*model.ColumnData, rawColumns []types.Datum,
key kv.Key, isPreRow bool,
handle kv.Handle, key kv.Key, isPreRow bool,
) (uint32, bool, error) {
if !m.integrity.Enabled() {
return 0, true, nil
Expand All @@ -665,17 +670,22 @@ func (m *mounter) verifyChecksum(
// Update / Delete event correctly, after Add Column / Drop column DDL,
// since the table schema does not contain complete column information.
return m.verifyColumnChecksum(columnInfos, rawColumns, decoder, isPreRow)
case 1:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, key, m.tz)
case 1, 2:
expected, matched, err := verifyRawBytesChecksum(tableInfo, columns, decoder, handle, key, m.tz)
if err != nil {
log.Error("calculate raw checksum failed",
zap.Int("version", version), zap.Any("tz", m.tz), zap.Any("handle", handle.String()),
zap.Any("key", key), zap.Any("columns", columns), zap.Error(err))
return 0, false, errors.Trace(err)
}
if !matched {
return expected, matched, err
}
columnChecksum, err := calculateColumnChecksum(columnInfos, rawColumns, m.tz)
if err != nil {
log.Error("failed to calculate column-level checksum, after raw checksum verification passed", zap.Error(err))
log.Error("failed to calculate column-level checksum, after raw checksum verification passed",
zap.Any("columnsInfo", columnInfos), zap.Any("rawColumns", rawColumns),
zap.Any("tz", m.tz), zap.Error(err))
return 0, false, errors.Trace(err)
}
return columnChecksum, true, nil
Expand All @@ -685,7 +695,7 @@ func (m *mounter) verifyChecksum(
}

func (m *mounter) mountRowKVEntry(
tableInfo *model.TableInfo, row *rowKVEntry, key kv.Key, dataSize int64,
tableInfo *model.TableInfo, row *rowKVEntry, handle kv.Handle, key kv.Key, dataSize int64,
) (*model.RowChangedEvent, model.RowChangedDatums, error) {
var (
rawRow model.RowChangedDatums
Expand Down Expand Up @@ -719,19 +729,15 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, key, true)
preChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, preCols, preRawCols, handle, key, true)
if err != nil {
log.Error("calculate the previous columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
return nil, rawRow, errors.Trace(err)
}

if !matched {
log.Error("previous columns checksum mismatch",
zap.Uint32("checksum", preChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", preRawCols))
zap.Uint32("checksum", preChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("preCols", preCols), zap.Any("rawCols", preRawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand All @@ -751,18 +757,14 @@ func (m *mounter) mountRowKVEntry(
return nil, rawRow, errors.Trace(err)
}

currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, key, false)
currentChecksum, matched, err = m.verifyChecksum(tableInfo, columnInfos, cols, rawCols, handle, key, false)
if err != nil {
log.Error("calculate the current columns checksum failed",
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
return nil, rawRow, errors.Trace(err)
}
if !matched {
log.Error("current columns checksum mismatch",
zap.Uint32("checksum", currentChecksum),
zap.Any("tableInfo", tableInfo),
zap.Any("rawCols", rawCols))
zap.Uint32("checksum", currentChecksum), zap.Any("tableInfo", tableInfo),
zap.Any("cols", cols), zap.Any("rawCols", rawCols))
if m.integrity.ErrorHandle() {
return nil, rawRow, cerror.ErrCorruptedDataMutation.
GenWithStackByArgs(m.changefeedID.Namespace, m.changefeedID.ID)
Expand Down
11 changes: 0 additions & 11 deletions cdc/puller/ddl_puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
timodel "github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -531,16 +530,6 @@ func TestHandleJob(t *testing.T) {
job := &timodel.Job{
Type: timodel.ActionFlashbackCluster,
BinlogInfo: &timodel.HistoryInfo{},
Args: []interface{}{
998,
map[string]interface{}{},
true, /* tidb_gc_enable */
variable.On, /* tidb_enable_auto_analyze */
variable.Off, /* tidb_super_read_only */
0, /* totalRegions */
0, /* startTS */
0, /* commitTS */
},
}
skip, err := ddlJobPullerImpl.handleJob(job)
require.NoError(t, err)
Expand Down
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ require (
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20240924080114-4a3e17f5e62d
github.com/pingcap/log v1.1.1-0.20240314023424-862ccc32f18d
github.com/pingcap/tidb v1.1.0-beta.0.20241014034929-94b2ac04a0c4
github.com/pingcap/tidb v1.1.0-beta.0.20241107131230-e2505e95a03c
github.com/pingcap/tidb-dashboard v0.0.0-20240326110213-9768844ff5d7
github.com/pingcap/tidb/pkg/parser v0.0.0-20241014034929-94b2ac04a0c4
github.com/prometheus/client_golang v1.20.4
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/r3labs/diff v1.1.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
Expand All @@ -89,9 +89,9 @@ require (
github.com/swaggo/swag v1.16.3
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/thanhpk/randstr v1.0.6
github.com/tikv/client-go/v2 v2.0.8-0.20241008085809-c3e10ae7c8fc
github.com/tikv/client-go/v2 v2.0.8-0.20241023023120-691e80ae0ea9
github.com/tikv/pd v1.1.0-beta.0.20240407022249-7179657d129b
github.com/tikv/pd/client v0.0.0-20240926021936-642f0e919b0d
github.com/tikv/pd/client v0.0.0-20241016064947-b70107ec31e6
github.com/tinylib/msgp v1.1.6
github.com/uber-go/atomic v1.4.0
github.com/vmihailenco/msgpack/v5 v5.3.5
Expand All @@ -116,7 +116,7 @@ require (
golang.org/x/sync v0.8.0
golang.org/x/sys v0.26.0
golang.org/x/text v0.19.0
golang.org/x/time v0.5.0
golang.org/x/time v0.7.0
google.golang.org/genproto/googleapis/api v0.0.0-20240401170217-c3f982113cda
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291
google.golang.org/grpc v1.64.0
Expand Down Expand Up @@ -159,7 +159,7 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-resty/resty/v2 v2.11.0 // indirect
github.com/goccy/go-reflect v1.2.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/google/flatbuffers v2.0.8+incompatible // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand All @@ -173,6 +173,7 @@ require (
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/ks3sdklib/aws-sdk-go v1.2.9 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/lance6716/pebble v0.0.0-20241104073946-6f55c09bd183 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down Expand Up @@ -251,7 +252,7 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
Expand Down Expand Up @@ -325,7 +326,7 @@ require (
github.com/petermattis/goid v0.0.0-20240813172612-4fcff4a6cae7 // indirect
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pingcap/badger v1.5.1-0.20230103063557-828f39b09b6d // indirect
github.com/pingcap/badger v1.5.1-0.20241015064302-38533b6cbf8d // indirect
github.com/pingcap/fn v1.0.0 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/sysutil v1.0.1-0.20240311050922-ae81ee01f3a5
Expand Down
Loading

0 comments on commit d238ea7

Please sign in to comment.