Skip to content

Commit

Permalink
mounter(ticdc): calculate row level checksum for timestmap by using U…
Browse files Browse the repository at this point in the history
…TC time zone (#10564)

close #10573
  • Loading branch information
3AceShowHand authored Feb 22, 2024
1 parent dcdaead commit 71b5a0a
Show file tree
Hide file tree
Showing 33 changed files with 745 additions and 553 deletions.
55 changes: 32 additions & 23 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,35 @@ func datum2Column(
return cols, rawCols, columnInfos, nil
}

// return error if cannot get the expected checksum from the decoder
func (m *mounter) calculateChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum,
) (uint32, error) {
columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
column := rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
}
columns = append(columns, column)
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
})

calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
}

checksum, err := calculator.Checksum(m.tz)
if err != nil {
return 0, errors.Trace(err)
}
return checksum, nil
}

// return error when calculate the checksum.
// return false if the checksum is not matched
// return true if the checksum is matched and the checksum is the matched one.
func (m *mounter) verifyChecksum(
columnInfos []*timodel.ColumnInfo, rawColumns []types.Datum, isPreRow bool,
) (uint32, int, bool, error) {
Expand All @@ -419,24 +445,9 @@ func (m *mounter) verifyChecksum(
return 0, version, true, nil
}

columns := make([]rowcodec.ColData, 0, len(rawColumns))
for idx, col := range columnInfos {
columns = append(columns, rowcodec.ColData{
ColumnInfo: col,
Datum: &rawColumns[idx],
})
}
sort.Slice(columns, func(i, j int) bool {
return columns[i].ID < columns[j].ID
})
calculator := rowcodec.RowData{
Cols: columns,
Data: make([]byte, 0),
}

checksum, err := calculator.Checksum()
checksum, err := m.calculateChecksum(columnInfos, rawColumns)
if err != nil {
log.Error("failed to calculate the checksum", zap.Error(err))
log.Error("failed to calculate the checksum", zap.Uint32("first", first), zap.Error(err))
return 0, version, false, errors.Trace(err)
}

Expand All @@ -451,10 +462,8 @@ func (m *mounter) verifyChecksum(
if !ok {
log.Error("cannot found the extra checksum, the first checksum mismatched",
zap.Uint32("checksum", checksum),
zap.Uint32("first", first),
zap.Uint32("extra", extra))
return checksum, version,
false, errors.New("cannot found the extra checksum from the event")
zap.Uint32("first", first))
return checksum, version, false, nil
}

if checksum == extra {
Expand Down
21 changes: 21 additions & 0 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,27 @@ func TestE2ERowLevelChecksum(t *testing.T) {
require.NoError(t, err)
}

func TestVerifyChecksumTime(t *testing.T) {
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.Integrity.IntegrityCheckLevel = integrity.CheckLevelCorrectness
replicaConfig.Integrity.CorruptionHandleLevel = integrity.CorruptionHandleLevelError

helper := NewSchemaTestHelperWithReplicaConfig(t, replicaConfig)
defer helper.Close()

helper.Tk().MustExec("set global tidb_enable_row_level_checksum = 1")
helper.Tk().MustExec("use test")

helper.Tk().MustExec("set global time_zone = '-5:00'")
_ = helper.DDL2Event(`CREATE table TBL2 (a int primary key, b TIMESTAMP)`)
event := helper.DML2Event(`INSERT INTO TBL2 VALUES (1, '2023-02-09 13:00:00')`, "test", "TBL2")
require.NotNil(t, event)

_ = helper.DDL2Event("create table t (a timestamp primary key, b int)")
event = helper.DML2Event("insert into t values ('2023-02-09 13:00:00', 1)", "test", "t")
require.NotNil(t, event)
}

func TestDecodeRowEnableChecksum(t *testing.T) {
helper := NewSchemaTestHelper(t)
defer helper.Close()
Expand Down
30 changes: 19 additions & 11 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

_ "github.com/go-sql-driver/mysql" // for mysql
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/importer"
"github.com/pingcap/tidb/br/pkg/lightning/importer/opts"
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
Expand All @@ -50,7 +51,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/terror"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
"github.com/pingcap/tiflow/dm/unit"
pd "github.com/tikv/pd/client"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -451,18 +452,25 @@ func (c *Checker) Init(ctx context.Context) (err error) {
return err
}

pdClient, err := pd.NewClientWithContext(
ctx, []string{lCfg.TiDB.PdAddr}, pd.SecurityOption{
CAPath: lCfg.Security.CAPath,
CertPath: lCfg.Security.CertPath,
KeyPath: lCfg.Security.KeyPath,
SSLCABytes: lCfg.Security.CABytes,
SSLCertBytes: lCfg.Security.CertBytes,
SSLKEYBytes: lCfg.Security.KeyBytes,
})
var opts []pdhttp.ClientOption
tls, err := common.NewTLS(
lCfg.Security.CAPath,
lCfg.Security.CertPath,
lCfg.Security.KeyPath,
"",
lCfg.Security.CABytes,
lCfg.Security.CertBytes,
lCfg.Security.KeyBytes,
)
if err != nil {
return err
log.L().Fatal("failed to load TLS certificates", zap.Error(err))
}
if o := tls.TLSConfig(); o != nil {
opts = append(opts, pdhttp.WithTLSConfig(o))
}
pdClient := pdhttp.NewClient(
"dm-check", []string{lCfg.TiDB.PdAddr}, opts...)

targetInfoGetter, err := importer.NewTargetInfoGetterImpl(lCfg, targetDB, pdClient)
if err != nil {
return err
Expand Down
2 changes: 2 additions & 0 deletions dm/dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package dumpling

import (
"context"
"crypto/tls"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -417,6 +418,7 @@ func (m *Dumpling) constructArgs(ctx context.Context) (*export.Config, error) {
// update sql_mode if needed
m.detectSQLMode(ctx, dumpConfig)
dumpConfig.ExtStorage = cfg.ExtStorage
dumpConfig.MinTLSVersion = tls.VersionTLS10

return dumpConfig, nil
}
Expand Down
2 changes: 1 addition & 1 deletion dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func GetLightningConfig(globalCfg *lcfg.GlobalConfig, subtaskCfg *config.SubTask
}
switch subtaskCfg.OnDuplicatePhysical {
case config.OnDuplicateManual:
cfg.TikvImporter.DuplicateResolution = lcfg.DupeResAlgRemove
cfg.TikvImporter.DuplicateResolution = lcfg.DupeResAlgReplace
cfg.App.TaskInfoSchemaName = GetTaskInfoSchemaName(subtaskCfg.MetaSchema, subtaskCfg.Name)
case config.OnDuplicateNone:
cfg.TikvImporter.DuplicateResolution = lcfg.DupeResAlgNone
Expand Down
1 change: 1 addition & 0 deletions dm/pkg/checker/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func (c *MetaPositionChecker) Check(ctx context.Context) *Result {
util.WithCAContent(c.sourceCfg.Security.SSLCABytes),
util.WithCertAndKeyContent(c.sourceCfg.Security.SSLCertBytes, c.sourceCfg.Security.SSLKeyBytes),
util.WithVerifyCommonName(c.sourceCfg.Security.CertAllowedCN),
util.WithMinTLSVersion(tls.VersionTLS10),
)
if err != nil {
markCheckError(result, err)
Expand Down
16 changes: 12 additions & 4 deletions dm/pkg/checker/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,17 @@ func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result {
markCheckError(result, err)
return result
}
clusterAvail := uint64(0)
var (
clusterAvail uint64
avail int64
)
for _, store := range storeInfo.Stores {
clusterAvail += uint64(store.Status.Available)
avail, err = units.RAMInBytes(store.Status.Available)
if err != nil {
markCheckError(result, err)
return result
}
clusterAvail += uint64(avail)
}
if clusterAvail < uint64(c.sourceDataSize) {
result.State = StateFailure
Expand All @@ -186,12 +194,12 @@ func (c *LightningFreeSpaceChecker) Check(ctx context.Context) *Result {
return result
}

replConfig, err := c.infoGetter.GetReplicationConfig(ctx)
maxReplicas, err := c.infoGetter.GetMaxReplica(ctx)
if err != nil {
markCheckError(result, err)
return result
}
safeSize := uint64(c.sourceDataSize) * replConfig.MaxReplicas * 2
safeSize := uint64(c.sourceDataSize) * maxReplicas * 2
if clusterAvail < safeSize {
result.State = StateWarning
result.Errors = append(result.Errors, &Error{
Expand Down
2 changes: 2 additions & 0 deletions dm/pkg/conn/basedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package conn

import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"net"
Expand Down Expand Up @@ -104,6 +105,7 @@ func (d *DefaultDBProviderImpl) Apply(config ScopedDBConfig) (*BaseDB, error) {
util.WithCAContent(config.Security.SSLCABytes),
util.WithCertAndKeyContent(config.Security.SSLCertBytes, config.Security.SSLKeyBytes),
util.WithVerifyCommonName(config.Security.CertAllowedCN),
util.WithMinTLSVersion(tls.VersionTLS10),
)
if err != nil {
return nil, terror.ErrConnInvalidTLSConfig.Delegate(err)
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/conn/mockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (mock *Cluster) Start() error {
mock.Server = svr
mock.Server.SetDomain(mock.Domain)
go func() {
if err1 := svr.Run(); err1 != nil {
if err1 := svr.Run(nil); err1 != nil {
panic(err1)
}
}()
Expand Down
25 changes: 20 additions & 5 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tidb/pkg/util/mock"
Expand Down Expand Up @@ -134,14 +135,24 @@ func (tr *Tracker) Init(

upTracker := schematracker.NewSchemaTracker(lowerCaseTableNames)
dsSession := mock.NewContext()
dsSession.GetSessionVars().StrictSQLMode = false
dsSession.SetValue(ddl.SuppressErrorTooLongKeyKey, true)
downTracker := &downstreamTracker{
downstreamConn: downstreamConn,
se: dsSession,
tableInfos: make(map[string]*DownstreamTableInfo),
}
// TODO: need to use upstream timezone to correctly check literal is in [1970, 2038]
se := executorContext{Context: mock.NewContext()}
sctx := mock.NewContext()
store, err := mockstore.NewMockStore()
if err != nil {
return err
}
sctx.Store = store
err = sctx.NewTxn(ctx)
if err != nil {
return err
}
se := executorContext{Context: sctx}
tr.Lock()
defer tr.Unlock()
tr.lowerCaseTableNames = lowerCaseTableNames
Expand Down Expand Up @@ -294,6 +305,11 @@ func (tr *Tracker) Close() {
// other components are getting/setting table info
tr.Lock()
defer tr.Unlock()
if tr.se != nil {
if store := tr.se.GetStore(); store != nil {
store.Close()
}
}
tr.closed.Store(true)
}

Expand Down Expand Up @@ -467,13 +483,12 @@ func (dt *downstreamTracker) getTableInfoByCreateStmt(tctx *tcontext.Context, ta
}

// suppress ErrTooLongKey
strictSQLModeBackup := dt.se.GetSessionVars().StrictSQLMode
dt.se.GetSessionVars().StrictSQLMode = false
dt.se.SetValue(ddl.SuppressErrorTooLongKeyKey, true)
// support drop PK
enableClusteredIndexBackup := dt.se.GetSessionVars().EnableClusteredIndex
dt.se.GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeOff
defer func() {
dt.se.GetSessionVars().StrictSQLMode = strictSQLModeBackup
dt.se.ClearValue(ddl.SuppressErrorTooLongKeyKey)
dt.se.GetSessionVars().EnableClusteredIndex = enableClusteredIndexBackup
}()

Expand Down
1 change: 1 addition & 0 deletions dm/pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func NewSessionCtx(vars map[string]string) sessionctx.Context {
if strings.EqualFold(k, "time_zone") {
loc, _ := ParseTimeZone(v)
variables.StmtCtx.SetTimeZone(loc)
variables.TimeZone = loc
}
}

Expand Down
1 change: 1 addition & 0 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ func (r *Relay) setSyncConfig() error {
util.WithCAContent(r.cfg.From.Security.SSLCABytes),
util.WithCertAndKeyContent(r.cfg.From.Security.SSLCertBytes, r.cfg.From.Security.SSLKeyBytes),
util.WithVerifyCommonName(r.cfg.From.Security.CertAllowedCN),
util.WithMinTLSVersion(tls.VersionTLS10),
)
if err != nil {
return terror.ErrConnInvalidTLSConfig.Delegate(err)
Expand Down
4 changes: 2 additions & 2 deletions dm/syncer/expr_filter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package syncer
import (
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
"github.com/pingcap/tidb/pkg/util/dbutil"
"github.com/pingcap/tidb/pkg/util/filter"
"github.com/pingcap/tiflow/dm/config"
Expand Down Expand Up @@ -201,7 +201,7 @@ func getSimpleExprOfTable(ctx sessionctx.Context, expr string, ti *model.TableIn
e, err := expression.ParseSimpleExprWithTableInfo(ctx, expr, ti)
if err != nil {
// if expression contains an unknown column, we return an expression that skips nothing
if core.ErrUnknownColumn.Equal(err) {
if plannererrors.ErrUnknownColumn.Equal(err) {
logger.Warn("meet unknown column when generating expression, return a FALSE expression instead",
zap.String("expression", expr),
zap.Error(err))
Expand Down
1 change: 1 addition & 0 deletions dm/syncer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
util.WithCAContent(cfg.From.Security.SSLCABytes),
util.WithCertAndKeyContent(cfg.From.Security.SSLCertBytes, cfg.From.Security.SSLKeyBytes),
util.WithVerifyCommonName(cfg.From.Security.CertAllowedCN),
util.WithMinTLSVersion(tls.VersionTLS10),
)
if err != nil {
return replication.BinlogSyncerConfig{}, terror.ErrConnInvalidTLSConfig.Delegate(err)
Expand Down
3 changes: 3 additions & 0 deletions engine/jobmaster/dm/checkpoint/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func TestFetchTableStmt(t *testing.T) {
p := parser.New()
tracker, err := schema.NewTestTracker(context.Background(), "test-tracker", nil, dlog.L())
require.NoError(t, err)
defer func() {
tracker.Close()
}()
stmt := "CREATE DATABASE `db`"
ret, err := p.ParseOneStmt(stmt, "", "")
require.NoError(t, err)
Expand Down
4 changes: 3 additions & 1 deletion engine/pkg/meta/model/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package model

import (
"crypto/tls"
"strings"

validation "github.com/go-ozzo/ozzo-validation/v4"
Expand Down Expand Up @@ -127,7 +128,8 @@ func GenerateDSNByParams(storeConf *StoreConfig, pairs map[string]string) (strin
cfg, err := util.NewTLSConfig(
util.WithCAPath(storeConf.Security.CAPath),
util.WithCertAndKeyPath(storeConf.Security.CertPath, storeConf.Security.KeyPath),
util.WithVerifyCommonName(storeConf.Security.CertAllowedCN))
util.WithVerifyCommonName(storeConf.Security.CertAllowedCN),
util.WithMinTLSVersion(tls.VersionTLS10))
if err != nil {
return "", errors.ErrMetaParamsInvalid.Wrap(err)
}
Expand Down
Loading

0 comments on commit 71b5a0a

Please sign in to comment.