Skip to content

Commit

Permalink
txn: use handle to encode checksum instead of the key (#57139) (#57186)
Browse files Browse the repository at this point in the history
close #57174
  • Loading branch information
ti-chi-bot authored Nov 14, 2024
1 parent 7d9ee81 commit 70b8a30
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
ec := w.exprCtx.GetEvalCtx().ErrCtx()
var checksum rowcodec.Checksum
if w.checksumNeeded {
checksum = rowcodec.RawChecksum{Key: recordKey}
checksum = rowcodec.RawChecksum{Handle: handle}
}
newRowVal, err := tablecodec.EncodeRow(sysTZ, newRow, newColumnIDs, nil, nil, checksum, rd)
err = ec.HandleError(err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/column_type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestRowFormatWithChecksums(t *testing.T) {
data, err := h.GetMvccByEncodedKey(encodedKey)
require.NoError(t, err)
// row value with checksums
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xa9, 0x7a, 0xf4, 0xc8}
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x0, 0x0, 0x1, 0x2, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x2, 0x9e, 0x56, 0xf5, 0x45}
require.Equal(t, expected, data.Info.Writes[0].ShortValue)
tk.MustExec("drop table if exists t")
}
Expand All @@ -284,7 +284,7 @@ func TestRowLevelChecksumWithMultiSchemaChange(t *testing.T) {
data, err := h.GetMvccByEncodedKey(encodedKey)
require.NoError(t, err)
// checksum skipped and with a null col vv
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x1, 0xeb, 0x42, 0xda, 0x20}
expected := []byte{0x80, 0x2, 0x3, 0x0, 0x1, 0x0, 0x1, 0x2, 0x4, 0x3, 0x1, 0x0, 0x4, 0x0, 0x7, 0x0, 0x1, 0x31, 0x32, 0x33, 0x31, 0x32, 0x33, 0x2, 0x0, 0x4f, 0xd2, 0x26}
require.Equal(t, expected, data.Info.Writes[0].ShortValue)
tk.MustExec("drop table if exists t")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, txn kv.Transaction,

key := t.RecordKey(h)
tc, ec := evalCtx.TypeCtx(), evalCtx.ErrCtx()
err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key)
err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key, h)
if err != nil {
return err
}
Expand Down Expand Up @@ -886,7 +886,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, txn kv.Transaction, r
}
}

err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key, flags...)
err = encodeRowBuffer.WriteMemBufferEncoded(sctx.GetRowEncodingConfig(), tc.Location(), ec, memBuffer, key, recordID, flags...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/table/tblctx/buffers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func (b *EncodeRowBuffer) AddColVal(colID int64, val types.Datum) {
// WriteMemBufferEncoded writes the encoded row to the memBuffer.
func (b *EncodeRowBuffer) WriteMemBufferEncoded(
cfg RowEncodingConfig, loc *time.Location, ec errctx.Context,
memBuffer kv.MemBuffer, key kv.Key, flags ...kv.FlagsOp,
memBuffer kv.MemBuffer, key kv.Key, handle kv.Handle, flags ...kv.FlagsOp,
) error {
var checksum rowcodec.Checksum
if cfg.IsRowLevelChecksumEnabled {
checksum = rowcodec.RawChecksum{Key: key}
checksum = rowcodec.RawChecksum{Handle: handle}
}

stmtBufs := b.writeStmtBufs
Expand Down
6 changes: 3 additions & 3 deletions pkg/table/tblctx/buffers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func TestEncodeRow(t *testing.T) {

var checksum rowcodec.Checksum
if cfg.IsRowLevelChecksumEnabled {
checksum = rowcodec.RawChecksum{Key: kv.Key("key1")}
checksum = rowcodec.RawChecksum{Handle: kv.IntHandle(1)}
}

expectedVal, err := tablecodec.EncodeRow(
Expand All @@ -125,7 +125,7 @@ func TestEncodeRow(t *testing.T) {
}
err = buffer.WriteMemBufferEncoded(
cfg, c.loc, errctx.StrictNoWarningContext,
memBuffer, kv.Key("key1"), c.flags...,
memBuffer, kv.Key("key1"), kv.IntHandle(1), c.flags...,
)
require.NoError(t, err)
memBuffer.AssertExpectations(t)
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestEncodeBufferReserve(t *testing.T) {
require.Equal(t, 2, len(buffer.row))
require.NoError(t, buffer.WriteMemBufferEncoded(RowEncodingConfig{
RowEncoder: &rowcodec.Encoder{Enable: true},
}, time.UTC, errctx.StrictNoWarningContext, mb, kv.Key("key1")))
}, time.UTC, errctx.StrictNoWarningContext, mb, kv.Key("key1"), kv.IntHandle(1)))
encodedCap := cap(buffer.writeStmtBufs.RowValBuf)
require.Greater(t, encodedCap, 0)
require.Equal(t, 4, len(buffer.writeStmtBufs.AddRowValues))
Expand Down
20 changes: 14 additions & 6 deletions pkg/util/rowcodec/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -238,22 +239,29 @@ func (NoChecksum) encode(encoder *Encoder, buf []byte) ([]byte, error) {
return encoder.toBytes(buf), nil
}

const checksumVersionRaw byte = 1
// introduced since v7.1.0
const checksumVersionColumn byte = 0

// introduced since v8.3.0
const checksumVersionRawKey byte = 1

// introduced since v8.4.0
const checksumVersionRawHandle byte = 2

// RawChecksum indicates encode the raw bytes checksum and append it to the raw bytes.
type RawChecksum struct {
Key []byte
Handle kv.Handle
}

func (c RawChecksum) encode(encoder *Encoder, buf []byte) ([]byte, error) {
encoder.flags |= rowFlagChecksum
encoder.checksumHeader &^= checksumFlagExtra // revert extra checksum flag
encoder.checksumHeader &^= checksumMaskVersion // revert checksum version
encoder.checksumHeader |= checksumVersionRaw // set checksum version
encoder.checksumHeader &^= checksumFlagExtra // revert extra checksum flag
encoder.checksumHeader &^= checksumMaskVersion // revert checksum version
encoder.checksumHeader |= checksumVersionRawHandle // set checksum version
valueBytes := encoder.toBytes(buf)
valueBytes = append(valueBytes, encoder.checksumHeader)
encoder.checksum1 = crc32.Checksum(valueBytes, crc32.IEEETable)
encoder.checksum1 = crc32.Update(encoder.checksum1, crc32.IEEETable, c.Key)
encoder.checksum1 = crc32.Update(encoder.checksum1, crc32.IEEETable, c.Handle.Encoded())
valueBytes = binary.LittleEndian.AppendUint32(valueBytes, encoder.checksum1)
return valueBytes, nil
}
17 changes: 10 additions & 7 deletions pkg/util/rowcodec/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (r *row) fromBytes(rowData []byte) error {
r.checksumHeader = rowData[cursor]
checksumVersion := r.ChecksumVersion()
// make sure it can be read previous version checksum to support backward compatibility.
if checksumVersion != 0 && checksumVersion != 1 {
switch checksumVersion {
case 0, 1, 2:
default:
return errInvalidChecksumVer
}
cursor++
Expand Down Expand Up @@ -303,12 +305,8 @@ func (r *row) initOffsets32() {
// CalculateRawChecksum calculates the bytes-level checksum by using the given elements.
// this is mainly used by the TiCDC to implement E2E checksum functionality.
func (r *row) CalculateRawChecksum(
loc *time.Location, colIDs []int64, values []*types.Datum, key kv.Key, buf []byte,
loc *time.Location, colIDs []int64, values []*types.Datum, key kv.Key, handle kv.Handle, buf []byte,
) (uint32, error) {
r.flags |= rowFlagChecksum
r.checksumHeader &^= checksumFlagExtra // revert extra checksum flag
r.checksumHeader &^= checksumMaskVersion // revert checksum version
r.checksumHeader |= checksumVersionRaw // set checksum version
for idx, colID := range colIDs {
data, err := encodeValueDatum(loc, values[idx], nil)
if err != nil {
Expand All @@ -325,6 +323,11 @@ func (r *row) CalculateRawChecksum(
buf = r.toBytes(buf)
buf = append(buf, r.checksumHeader)
rawChecksum := crc32.Checksum(buf, crc32.IEEETable)
rawChecksum = crc32.Update(rawChecksum, crc32.IEEETable, key)
// keep backward compatibility to v8.3.0
if r.ChecksumVersion() == int(checksumVersionRawKey) {
rawChecksum = crc32.Update(rawChecksum, crc32.IEEETable, key)
} else {
rawChecksum = crc32.Update(rawChecksum, crc32.IEEETable, handle.Encoded())
}
return rawChecksum, nil
}
4 changes: 2 additions & 2 deletions pkg/util/rowcodec/rowcodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@ func TestEncodeDecodeRowWithChecksum(t *testing.T) {
require.Zero(t, checksum)

rawChecksum := rowcodec.RawChecksum{
Key: []byte("0x1"),
Handle: kv.IntHandle(1),
}
raw, err = enc.Encode(time.UTC, nil, nil, rawChecksum, nil)
require.NoError(t, err)
Expand All @@ -1226,7 +1226,7 @@ func TestEncodeDecodeRowWithChecksum(t *testing.T) {
require.Equal(t, expected, checksum)

version := dec.ChecksumVersion()
require.Equal(t, 1, version)
require.Equal(t, 2, version)
}

var (
Expand Down

0 comments on commit 70b8a30

Please sign in to comment.