Skip to content

Commit

Permalink
sink(ticdc): use pd clock in storage sink (#10351) (#10520)
Browse files Browse the repository at this point in the history
close #10352, ref #10374
  • Loading branch information
ti-chi-bot authored Feb 26, 2024
1 parent 55cc017 commit 223283c
Show file tree
Hide file tree
Showing 18 changed files with 120 additions and 53 deletions.
4 changes: 2 additions & 2 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func verifyCreateChangefeedConfig(
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
if err := validator.Validate(ctx, info.SinkURI, info.Config); err != nil {
if err := validator.Validate(ctx, info.SinkURI, info.Config, up.PDClock); err != nil {
return nil, err
}

Expand Down Expand Up @@ -238,7 +238,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config, nil); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
}

// verify sink
if err := validator.Validate(ctx, cfg.SinkURI, replicaCfg); err != nil {
if err := validator.Validate(ctx, cfg.SinkURI, replicaCfg, nil); err != nil {
return nil, err
}

Expand Down Expand Up @@ -352,7 +352,7 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
if err := validator.Validate(ctx, newInfo.SinkURI, newInfo.Config, nil); err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
return m.sinkFactory.errors, m.sinkFactory.version
}

m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors)
m.sinkFactory.f, err = factory.New(m.managerCtx, uri, cfg, m.sinkFactory.errors, m.up.PDClock)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
Expand Down Expand Up @@ -94,6 +94,7 @@ type DMLSink struct {

// NewDMLSink creates a cloud storage sink.
func NewDMLSink(ctx context.Context,
pdClock pdutil.Clock,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
Expand Down Expand Up @@ -153,11 +154,10 @@ func NewDMLSink(ctx context.Context,
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
inputCh, pdClock, s.statistics)
workerChannels[i] = inputCh
}

Expand Down
19 changes: 11 additions & 8 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/stretchr/testify/require"
)

func setClock(s *DMLSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock))
}
}

Expand Down Expand Up @@ -126,7 +127,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
replicaConfig.Sink.Protocol = config.ProtocolCsv.String()
replicaConfig.Sink.FileIndexWidth = 6
errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
s, err := NewDMLSink(ctx, pdutil.NewMonotonicClock(clock.New()), sinkURI, replicaConfig, errCh)
require.Nil(t, err)
var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -194,10 +195,11 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig.Sink.FileIndexWidth = 6

errCh := make(chan error, 5)
s, err := NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock := clock.NewMock()
setClock(s, mockClock)
s, err := NewDMLSink(ctx,
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -268,11 +270,12 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
s, err = NewDMLSink(ctx, sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
setClock(s, mockClock)
s, err = NewDMLSink(ctx,
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

err = s.WriteEvents(txns...)
require.Nil(t, err)
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -103,7 +103,7 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
inputCh *chann.DrainableChann[eventFragment],
clock clock.Clock,
pdClock pdutil.Clock,
statistics *metrics.Statistics,
) *dmlWorker {
d := &dmlWorker{
Expand All @@ -114,7 +114,7 @@ func newDMLWorker(
inputCh: inputCh,
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
Expand All @@ -51,8 +52,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
pdlock := pdutil.NewMonotonicClock(clock.New())
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics)
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics)
return d
}

Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
Expand Down Expand Up @@ -67,6 +68,7 @@ func New(
sinkURIStr string,
cfg *config.ReplicaConfig,
errCh chan error,
pdClock pdutil.Clock,
) (*SinkFactory, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
Expand Down Expand Up @@ -96,7 +98,7 @@ func New(
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, sinkURI, cfg, errCh)
storageSink, err := cloudstorage.NewDMLSink(ctx, pdClock, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions cdc/sink/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -30,7 +31,10 @@ import (
// Validate sink if given valid parameters.
// TODO: For now, we create a real sink instance and validate it.
// Maybe we should support the dry-run mode to validate sink.
func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) error {
func Validate(ctx context.Context,
sinkURI string, cfg *config.ReplicaConfig,
pdClock pdutil.Clock,
) error {
uri, err := preCheckSinkURI(sinkURI)
if err != nil {
return err
Expand All @@ -48,7 +52,7 @@ func Validate(ctx context.Context, sinkURI string, cfg *config.ReplicaConfig) er
}

ctx, cancel := context.WithCancel(contextutil.PutRoleInCtx(ctx, util.RoleClient))
s, err := factory.New(ctx, sinkURI, cfg, make(chan error))
s, err := factory.New(ctx, sinkURI, cfg, make(chan error), pdClock)
if err != nil {
cancel()
return err
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,26 @@ func TestValidateSink(t *testing.T) {

// test sink uri error
sinkURI := "mysql://root:111@127.0.0.1:3306/"
err := Validate(ctx, sinkURI, replicateConfig)
err := Validate(ctx, sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "fail to open MySQL connection")

// test sink uri right
sinkURI = "blackhole://"
err = Validate(ctx, sinkURI, replicateConfig)
err = Validate(ctx, sinkURI, replicateConfig, nil)
require.Nil(t, err)

// test bdr mode error
replicateConfig.BDRMode = true
sinkURI = "blackhole://"
err = Validate(ctx, sinkURI, replicateConfig)
err = Validate(ctx, sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode")

// test sink-scheme/syncpoint error
replicateConfig.EnableSyncPoint = true
sinkURI = "kafka://"
err = Validate(ctx, sinkURI, replicateConfig)
err = Validate(ctx, sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(
t, err.Error(),
Expand Down
1 change: 1 addition & 0 deletions cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
downstreamURIStr,
config.GetDefaultReplicaConfig(),
errChan,
nil,
)
if err != nil {
cancel()
Expand Down
1 change: 1 addition & 0 deletions cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func newConsumer(ctx context.Context) (*consumer, error) {
downstreamURIStr,
config.GetDefaultReplicaConfig(),
errCh,
nil,
)
if err != nil {
log.Error("failed to create event sink factory", zap.Error(err))
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,11 @@ error = '''
incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri
'''

["CDC:ErrInternalCheckFailed"]
error = '''
internal check failed, %s
'''

["CDC:ErrInternalServerError"]
error = '''
internal server error
Expand Down
2 changes: 1 addition & 1 deletion pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (ra *RedoApplier) catchError(ctx context.Context) error {

func (ra *RedoApplier) initSink(ctx context.Context) (err error) {
replicaConfig := config.GetDefaultReplicaConfig()
ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh)
ra.sinkFactory, err = dmlfactory.New(ctx, ra.cfg.SinkURI, replicaConfig, ra.errCh, nil)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,10 @@ var (
"invalid replica config, %s",
errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"),
)
ErrInternalCheckFailed = errors.Normalize(
"internal check failed, %s",
errors.RFCCodeText("CDC:ErrInternalCheckFailed"),
)

ErrHandleDDLFailed = errors.Normalize(
"handle ddl failed, job: %s, query: %s, startTs: %d. "+
Expand Down
22 changes: 22 additions & 0 deletions pkg/pdutil/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
pclock "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -141,3 +142,24 @@ func (c *clock4Test) Run(ctx context.Context) {

func (c *clock4Test) Stop() {
}

type monotonicClock struct {
clock pclock.Clock
}

// NewMonotonicClock return a new monotonic clock.
func NewMonotonicClock(pClock pclock.Clock) Clock {
return &monotonicClock{
clock: pClock,
}
}

func (c *monotonicClock) CurrentTime() (time.Time, error) {
return c.clock.Now(), nil
}

func (c *monotonicClock) Run(ctx context.Context) {
}

func (c *monotonicClock) Stop() {
}
Loading

0 comments on commit 223283c

Please sign in to comment.