Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec: encode checksum for timestamp consider tz (#50896) #50929

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ func (w *updateColumnWorker) calcChecksums() []uint32 {
if !sort.IsSorted(w.checksumBuffer) {
sort.Sort(w.checksumBuffer)
}
checksum, err := w.checksumBuffer.Checksum()
checksum, err := w.checksumBuffer.Checksum(w.sessCtx.GetSessionVars().StmtCtx.TimeZone())
if err != nil {
logutil.BgLogger().Warn("skip checksum in update-column backfill due to encode error", zap.Error(err))
return nil
Expand Down
7 changes: 5 additions & 2 deletions pkg/table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1795,11 +1795,14 @@ func (t *TableCommon) calcChecksums(sctx sessionctx.Context, h kv.Handle, data [
}
checksums := make([]uint32, len(data))
for i, cols := range data {
row := rowcodec.RowData{Cols: cols, Data: buf}
row := rowcodec.RowData{
Cols: cols,
Data: buf,
}
if !sort.IsSorted(row) {
sort.Sort(row)
}
checksum, err := row.Checksum()
checksum, err := row.Checksum(sctx.GetSessionVars().StmtCtx.TimeZone())
buf = row.Data
if err != nil {
logWithContext(sctx, logutil.BgLogger().Error,
Expand Down
2 changes: 1 addition & 1 deletion pkg/table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,7 @@ func TestWriteWithChecksums(t *testing.T) {
}
data := rowcodec.RowData{Cols: cols}
sort.Sort(data)
checksum, err := data.Checksum()
checksum, err := data.Checksum(time.Local)
assert.NoError(t, err)
expectChecksums = append(expectChecksums, checksum)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/rowcodec/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func BenchmarkChecksum(b *testing.B) {
}
row := rowcodec.RowData{Cols: cols}
for i := 0; i < b.N; i++ {
_, err := row.Checksum()
_, err := row.Checksum(time.Local)
if err != nil {
b.Fatal(err)
}
Expand Down
24 changes: 16 additions & 8 deletions pkg/util/rowcodec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"hash/crc32"
"math"
"reflect"
"time"
"unsafe"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -253,8 +254,8 @@ type ColData struct {
}

// Encode encodes the column datum into bytes for checksum calculation. If buf is provided, the encoded data is appended to it. Timestamp values are converted from loc to UTC before being encoded.
func (c ColData) Encode(buf []byte) ([]byte, error) {
return appendDatumForChecksum(buf, c.Datum, c.GetType())
func (c ColData) Encode(loc *time.Location, buf []byte) ([]byte, error) {
return appendDatumForChecksum(loc, buf, c.Datum, c.GetType())
}

// RowData is a list of ColData for row checksum calculation.
Expand All @@ -276,13 +277,13 @@ func (r RowData) Less(i int, j int) bool { return r.Cols[i].ID < r.Cols[j].ID }
func (r RowData) Swap(i int, j int) { r.Cols[i], r.Cols[j] = r.Cols[j], r.Cols[i] }

// Encode encodes all columns into bytes (for testing purposes).
func (r *RowData) Encode() ([]byte, error) {
func (r *RowData) Encode(loc *time.Location) ([]byte, error) {
var err error
if len(r.Data) > 0 {
r.Data = r.Data[:0]
}
for _, col := range r.Cols {
r.Data, err = col.Encode(r.Data)
r.Data, err = col.Encode(loc, r.Data)
if err != nil {
return nil, err
}
Expand All @@ -291,12 +292,12 @@ func (r *RowData) Encode() ([]byte, error) {
}

// Checksum calculates the checksum of columns. Callers should make sure columns are sorted by id.
func (r *RowData) Checksum() (checksum uint32, err error) {
func (r *RowData) Checksum(loc *time.Location) (checksum uint32, err error) {
for _, col := range r.Cols {
if len(r.Data) > 0 {
r.Data = r.Data[:0]
}
r.Data, err = col.Encode(r.Data)
r.Data, err = col.Encode(loc, r.Data)
if err != nil {
return 0, err
}
Expand All @@ -305,7 +306,7 @@ func (r *RowData) Checksum() (checksum uint32, err error) {
return checksum, nil
}

func appendDatumForChecksum(buf []byte, dat *data.Datum, typ byte) (out []byte, err error) {
func appendDatumForChecksum(loc *time.Location, buf []byte, dat *data.Datum, typ byte) (out []byte, err error) {
defer func() {
if x := recover(); x != nil {
// catch the panic raised when the datum and the type do not match
Expand All @@ -321,7 +322,14 @@ func appendDatumForChecksum(buf []byte, dat *data.Datum, typ byte) (out []byte,
case mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
out = appendLengthValue(buf, dat.GetBytes())
case mysql.TypeTimestamp, mysql.TypeDatetime, mysql.TypeDate, mysql.TypeNewDate:
out = appendLengthValue(buf, []byte(dat.GetMysqlTime().String()))
t := dat.GetMysqlTime()
if t.Type() == mysql.TypeTimestamp && loc != nil && loc != time.UTC {
err = t.ConvertTimeZone(loc, time.UTC)
if err != nil {
return
}
}
out = appendLengthValue(buf, []byte(t.String()))
case mysql.TypeDuration:
out = appendLengthValue(buf, []byte(dat.GetMysqlDuration().String()))
case mysql.TypeFloat, mysql.TypeDouble:
Expand Down
29 changes: 18 additions & 11 deletions pkg/util/rowcodec/rowcodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,10 @@ func TestColumnEncode(t *testing.T) {
encodeBytes := func(v []byte) []byte {
return append(binary.LittleEndian.AppendUint32(nil, uint32(len(v))), v...)
}
convertTZ := func(ts types.Time) types.Time {
require.NoError(t, ts.ConvertTimeZone(time.Local, time.UTC))
return ts
}
var (
buf = make([]byte, 0, 128)
intZero = 0
Expand Down Expand Up @@ -989,25 +993,25 @@ func TestColumnEncode(t *testing.T) {
{
"timestamp", types.NewFieldType(mysql.TypeTimestamp),
types.NewTimeDatum(types.NewTime(ct, mysql.TypeTimestamp, 3)),
encodeBytes([]byte(types.NewTime(ct, mysql.TypeTimestamp, 3).String())),
encodeBytes([]byte(convertTZ(types.NewTime(ct, mysql.TypeTimestamp, 3)).String())),
true,
},
{
"timestamp/zero", types.NewFieldType(mysql.TypeTimestamp),
types.NewTimeDatum(types.ZeroTimestamp),
encodeBytes([]byte(types.ZeroTimestamp.String())),
encodeBytes([]byte(convertTZ(types.ZeroTimestamp).String())),
true,
},
{
"timestamp/min", types.NewFieldType(mysql.TypeTimestamp),
types.NewTimeDatum(types.MinTimestamp),
encodeBytes([]byte(types.MinTimestamp.String())),
encodeBytes([]byte(convertTZ(types.MinTimestamp).String())),
true,
},
{
"timestamp/max", types.NewFieldType(mysql.TypeTimestamp),
types.NewTimeDatum(types.MaxTimestamp),
encodeBytes([]byte(types.MaxTimestamp.String())),
encodeBytes([]byte(convertTZ(types.MaxTimestamp).String())),
true,
},
{
Expand Down Expand Up @@ -1103,7 +1107,7 @@ func TestColumnEncode(t *testing.T) {
} {
t.Run(tt.name, func(t *testing.T) {
col := rowcodec.ColData{&model.ColumnInfo{FieldType: *tt.typ}, &tt.dat}
raw, err := col.Encode(buf[:0])
raw, err := col.Encode(time.Local, buf[:0])
if tt.ok {
require.NoError(t, err)
if len(tt.raw) == 0 {
Expand Down Expand Up @@ -1152,7 +1156,7 @@ func TestColumnEncode(t *testing.T) {
ft := types.NewFieldType(typ)
dat := types.NewDatum(nil)
col := rowcodec.ColData{&model.ColumnInfo{FieldType: *ft}, &dat}
raw, err := col.Encode(nil)
raw, err := col.Encode(time.Local, nil)
require.NoError(t, err)
require.Len(t, raw, 0)
}
Expand All @@ -1168,7 +1172,10 @@ func TestRowChecksum(t *testing.T) {
col2 := rowcodec.ColData{&model.ColumnInfo{ID: 2, FieldType: *typ2}, &dat2}
typ3 := types.NewFieldType(mysql.TypeVarchar)
dat3 := types.NewDatum("foobar")
col3 := rowcodec.ColData{&model.ColumnInfo{ID: 2, FieldType: *typ3}, &dat3}
col3 := rowcodec.ColData{&model.ColumnInfo{ID: 3, FieldType: *typ3}, &dat3}
typ4 := types.NewFieldType(mysql.TypeTimestamp)
dat4 := types.NewTimeDatum(types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 6))
col4 := rowcodec.ColData{&model.ColumnInfo{ID: 4, FieldType: *typ4}, &dat4}
buf := make([]byte, 0, 64)
for _, tt := range []struct {
name string
Expand All @@ -1177,17 +1184,17 @@ func TestRowChecksum(t *testing.T) {
{"nil", nil},
{"empty", []rowcodec.ColData{}},
{"nullonly", []rowcodec.ColData{col1}},
{"ordered", []rowcodec.ColData{col1, col2, col3}},
{"unordered", []rowcodec.ColData{col3, col1, col2}},
{"ordered", []rowcodec.ColData{col1, col2, col3, col4}},
{"unordered", []rowcodec.ColData{col3, col1, col4, col2}},
} {
t.Run(tt.name, func(t *testing.T) {
row := rowcodec.RowData{tt.cols, buf}
if !sort.IsSorted(row) {
sort.Sort(row)
}
checksum, err := row.Checksum()
checksum, err := row.Checksum(time.Local)
require.NoError(t, err)
raw, err := row.Encode()
raw, err := row.Encode(time.Local)
require.NoError(t, err)
require.Equal(t, crc32.ChecksumIEEE(raw), checksum)
})
Expand Down