Skip to content

Commit

Permalink
sink(ticdc): use pd clock in storage sink (pingcap#10351)
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Feb 7, 2024
1 parent 3fcb14a commit 450c156
Show file tree
Hide file tree
Showing 19 changed files with 123 additions and 55 deletions.
5 changes: 3 additions & 2 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ func verifyCreateChangefeedConfig(
}
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
info.SinkURI, info.Config); err != nil {
info.SinkURI, info.Config, up.PDClock,
); err != nil {
return nil, err
}

Expand Down Expand Up @@ -233,7 +234,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,

if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: changefeedConfig.Namespace, ID: changefeedConfig.ID},
newInfo.SinkURI, newInfo.Config); err != nil {
newInfo.SinkURI, newInfo.Config, nil); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
5 changes: 3 additions & 2 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,8 @@ func (APIV2HelpersImpl) verifyCreateChangefeedConfig(
// verify sink
if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
cfg.SinkURI, replicaCfg); err != nil {
cfg.SinkURI, replicaCfg, nil,
); err != nil {
return nil, err
}

Expand Down Expand Up @@ -361,7 +362,7 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(

if err := validator.Validate(ctx,
model.ChangeFeedID{Namespace: cfg.Namespace, ID: cfg.ID},
newInfo.SinkURI, newInfo.Config); err != nil {
newInfo.SinkURI, newInfo.Config, nil); err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (m *SinkManager) initSinkFactory() (chan error, uint64) {
return m.sinkFactory.errors, m.sinkFactory.version
}

m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors)
m.sinkFactory.f, err = factory.New(m.managerCtx, m.changefeedID, uri, cfg, m.sinkFactory.errors, m.up.PDClock)
if err != nil {
emitError(err)
return m.sinkFactory.errors, m.sinkFactory.version
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/cdc/sink/util"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
Expand Down Expand Up @@ -94,6 +94,7 @@ type DMLSink struct {
// NewDMLSink creates a cloud storage sink.
func NewDMLSink(ctx context.Context,
changefeedID model.ChangeFeedID,
pdClock pdutil.Clock,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
Expand Down Expand Up @@ -155,11 +156,10 @@ func NewDMLSink(ctx context.Context,
// create defragmenter.
s.defragmenter = newDefragmenter(encodedCh, workerChannels)
// create a group of dml workers.
clock := clock.New()
for i := 0; i < cfg.WorkerCount; i++ {
inputCh := chann.NewAutoDrainChann[eventFragment]()
s.workers[i] = newDMLWorker(i, s.changefeedID, storage, cfg, ext,
inputCh, clock, s.statistics)
inputCh, pdClock, s.statistics)
workerChannels[i] = inputCh
}

Expand Down
21 changes: 13 additions & 8 deletions cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

func setClock(s *DMLSink, clock clock.Clock) {
for _, w := range s.workers {
w.filePathGenerator.SetClock(clock)
w.filePathGenerator.SetClock(pdutil.NewMonotonicClock(clock))
}
}

Expand Down Expand Up @@ -129,6 +130,7 @@ func TestCloudStorageWriteEventsWithoutDateSeparator(t *testing.T) {
errCh := make(chan error, 5)
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(clock.New()),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)
var cnt uint64 = 0
Expand Down Expand Up @@ -197,11 +199,12 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
replicaConfig.Sink.FileIndexWidth = util.AddressOf(6)

errCh := make(chan error, 5)
mockClock := clock.NewMock()
s, err := NewDMLSink(ctx,
model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, errCh)
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)
mockClock := clock.NewMock()
setClock(s, mockClock)

var cnt uint64 = 0
batch := 100
Expand Down Expand Up @@ -272,12 +275,14 @@ func TestCloudStorageWriteEventsWithDateSeparator(t *testing.T) {
// test table is scheduled from one node to another
cnt = 0
ctx, cancel = context.WithCancel(context.Background())
s, err = NewDMLSink(ctx,
model.DefaultChangeFeedID("test"), sinkURI, replicaConfig, errCh)
require.Nil(t, err)

mockClock = clock.NewMock()
mockClock.Set(time.Date(2023, 3, 9, 0, 1, 10, 0, time.UTC))
setClock(s, mockClock)
s, err = NewDMLSink(ctx,
model.DefaultChangeFeedID("test"),
pdutil.NewMonotonicClock(mockClock),
sinkURI, replicaConfig, errCh)
require.Nil(t, err)

err = s.WriteEvents(txns...)
require.Nil(t, err)
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sink/metrics/cloudstorage"
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -103,7 +103,7 @@ func newDMLWorker(
config *cloudstorage.Config,
extension string,
inputCh *chann.DrainableChann[eventFragment],
clock clock.Clock,
pdClock pdutil.Clock,
statistics *metrics.Statistics,
) *dmlWorker {
d := &dmlWorker{
Expand All @@ -114,7 +114,7 @@ func newDMLWorker(
inputCh: inputCh,
flushNotifyCh: make(chan dmlTask, 64),
statistics: statistics,
filePathGenerator: cloudstorage.NewFilePathGenerator(config, storage, extension, clock),
filePathGenerator: cloudstorage.NewFilePathGenerator(changefeedID, config, storage, extension, pdClock),
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
Expand All @@ -52,8 +53,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {

statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"),
sink.TxnSink)
pdlock := pdutil.NewMonotonicClock(clock.New())
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), clock.New(), statistics)
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics)
return d
}

Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/dmlsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
v2 "github.com/pingcap/tiflow/pkg/sink/kafka/v2"
Expand Down Expand Up @@ -70,6 +71,7 @@ func New(
sinkURIStr string,
cfg *config.ReplicaConfig,
errCh chan error,
pdClock pdutil.Clock,
) (*SinkFactory, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
Expand Down Expand Up @@ -100,7 +102,7 @@ func New(
s.txnSink = mqs
s.category = CategoryMQ
case sink.S3Scheme, sink.FileScheme, sink.GCSScheme, sink.GSScheme, sink.AzblobScheme, sink.AzureScheme, sink.CloudStorageNoopScheme:
storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, sinkURI, cfg, errCh)
storageSink, err := cloudstorage.NewDMLSink(ctx, changefeedID, pdClock, sinkURI, cfg, errCh)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/sink"
pmysql "github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/util"
Expand All @@ -32,6 +33,7 @@ import (
func Validate(ctx context.Context,
changefeedID model.ChangeFeedID,
sinkURI string, cfg *config.ReplicaConfig,
pdClock pdutil.Clock,
) error {
uri, err := preCheckSinkURI(sinkURI)
if err != nil {
Expand All @@ -50,7 +52,7 @@ func Validate(ctx context.Context,
}

ctx, cancel := context.WithCancel(ctx)
s, err := factory.New(ctx, changefeedID, sinkURI, cfg, make(chan error))
s, err := factory.New(ctx, changefeedID, sinkURI, cfg, make(chan error), pdClock)
if err != nil {
cancel()
return err
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,26 +101,26 @@ func TestValidateSink(t *testing.T) {

// test sink uri error
sinkURI := "mysql://root:111@127.0.0.1:3306/"
err := Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig)
err := Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "fail to open MySQL connection")

// test sink uri right
sinkURI = "blackhole://"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig)
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.Nil(t, err)

// test bdr mode error
replicateConfig.BDRMode = util.AddressOf(true)
sinkURI = "blackhole://"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig)
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(t, err.Error(), "sink uri scheme is not supported in BDR mode")

// test sink-scheme/syncpoint error
replicateConfig.EnableSyncPoint = util.AddressOf(true)
sinkURI = "kafka://"
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig)
err = Validate(ctx, model.DefaultChangeFeedID("test"), sinkURI, replicateConfig, nil)
require.NotNil(t, err)
require.Contains(
t, err.Error(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func NewConsumer(ctx context.Context, o *consumerOption) (*Consumer, error) {
}

changefeedID := model.DefaultChangeFeedID("kafka-consumer")
f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan)
f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/pulsar-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func NewConsumer(ctx context.Context, o *ConsumerOption) (*Consumer, error) {
}

changefeedID := model.DefaultChangeFeedID("pulsar-consumer")
f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan)
f, err := eventsinkfactory.New(ctx, changefeedID, o.downstreamURI, config.GetDefaultReplicaConfig(), errChan, nil)
if err != nil {
cancel()
return nil, errors.Trace(err)
Expand Down
1 change: 1 addition & 0 deletions cmd/storage-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func newConsumer(ctx context.Context) (*consumer, error) {
downstreamURIStr,
config.GetDefaultReplicaConfig(),
errCh,
nil,
)
if err != nil {
log.Error("failed to create event sink factory", zap.Error(err))
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,11 @@ error = '''
incompatible configuration in sink uri(%s) and config file(%s), please try to update the configuration only through sink uri
'''

["CDC:ErrInternalCheckFailed"]
error = '''
internal check failed, %s
'''

["CDC:ErrInternalServerError"]
error = '''
internal server error
Expand Down
2 changes: 1 addition & 1 deletion pkg/applier/redo.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (ra *RedoApplier) catchError(ctx context.Context) error {

func (ra *RedoApplier) initSink(ctx context.Context) (err error) {
replicaConfig := config.GetDefaultReplicaConfig()
ra.sinkFactory, err = dmlfactory.New(ctx, ra.changefeedID, ra.cfg.SinkURI, replicaConfig, ra.errCh)
ra.sinkFactory, err = dmlfactory.New(ctx, ra.changefeedID, ra.cfg.SinkURI, replicaConfig, ra.errCh, nil)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,10 @@ var (
"invalid replica config, %s",
errors.RFCCodeText("CDC:ErrInvalidReplicaConfig"),
)
ErrInternalCheckFailed = errors.Normalize(
"internal check failed, %s",
errors.RFCCodeText("CDC:ErrInternalCheckFailed"),
)

ErrHandleDDLFailed = errors.Normalize(
"handle ddl failed, job: %s, query: %s, startTs: %d. "+
Expand Down
22 changes: 22 additions & 0 deletions pkg/pdutil/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
pclock "github.com/pingcap/tiflow/engine/pkg/clock"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -136,3 +137,24 @@ func (c *clock4Test) Run(ctx context.Context) {

func (c *clock4Test) Stop() {
}

type monotonicClock struct {
clock pclock.Clock
}

// NewMonotonicClock return a new monotonic clock.
func NewMonotonicClock(pClock pclock.Clock) Clock {
return &monotonicClock{
clock: pClock,
}
}

func (c *monotonicClock) CurrentTime() time.Time {
return c.clock.Now()
}

func (c *monotonicClock) Run(ctx context.Context) {
}

func (c *monotonicClock) Stop() {
}
Loading

0 comments on commit 450c156

Please sign in to comment.