Skip to content

Commit

Permalink
use CheckCompatibilityWithSinkURI
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Dec 30, 2022
1 parent 8e6c348 commit 7c044af
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 121 deletions.
20 changes: 15 additions & 5 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,22 +205,32 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
}
}

var sinkConfigUpdated, sinkURIUpdated bool
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}

if changefeedConfig.MounterWorkerNum != 0 {
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}

if changefeedConfig.SinkConfig != nil {
sinkConfigUpdated = true
newInfo.Config.Sink = changefeedConfig.SinkConfig
}

// verify sink_uri
if changefeedConfig.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = changefeedConfig.SinkURI
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config); err != nil {
}

if sinkConfigUpdated || sinkURIUpdated {
// check sink config is compatible with sinkURI
newCfg := newInfo.Config.Sink
oldCfg := oldInfo.Config.Sink
err := newCfg.CheckCompatibilityWithSinkURI(oldCfg, newInfo.SinkURI)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
42 changes: 26 additions & 16 deletions cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,13 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
kvStorage tidbkv.Storage,
checkpointTs uint64,
) (*model.ChangeFeedInfo, *model.UpstreamInfo, error) {
// update changefeed info
newInfo, err := oldInfo.Clone()
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
}

// verify TargetTs
var configUpdated, sinkURIUpdated bool
if cfg.TargetTs != 0 {
if cfg.TargetTs <= newInfo.StartTs {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStack(
Expand All @@ -308,43 +309,52 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
}
newInfo.TargetTs = cfg.TargetTs
}

// verify replica config
if cfg.Engine != "" {
newInfo.Engine = cfg.Engine
}
if cfg.ReplicaConfig != nil {
configUpdated = true
newInfo.Config = cfg.ReplicaConfig.ToInternalReplicaConfig()
err = newInfo.Config.ValidateAndAdjust(nil)
if err != nil {
return nil, nil, err
}
}
if cfg.SinkURI != "" {
sinkURIUpdated = true
newInfo.SinkURI = cfg.SinkURI
}

// verify changefeed info
f, err := filter.NewFilter(newInfo.Config, "")
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error())
}

tableInfos, _, _, err := entry.VerifyTables(f, kvStorage, checkpointTs)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

err = f.Verify(tableInfos)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs(errors.Cause(err).Error())
}

// verify SinkURI
if cfg.SinkURI != "" {
newInfo.SinkURI = cfg.SinkURI
if configUpdated || sinkURIUpdated {
log.Info("config or sink uri updated, check the compatibility",
zap.Bool("configUpdated", configUpdated),
zap.Bool("sinkURIUpdated", sinkURIUpdated))
// check sink config is compatible with sinkURI
newCfg := newInfo.Config.Sink
oldCfg := oldInfo.Config.Sink
err := newCfg.CheckCompatibilityWithSinkURI(oldCfg, newInfo.SinkURI)
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}

if err := sink.Validate(ctx, newInfo.SinkURI, newInfo.Config); err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
if cfg.Engine != "" {
newInfo.Engine = cfg.Engine
}

// update and verify up info
newUpInfo, err := oldUpInfo.Clone()
if err != nil {
return nil, nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
Expand All @@ -364,9 +374,9 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
if cfg.CertAllowedCN != nil {
newUpInfo.CertAllowedCN = cfg.CertAllowedCN
}

changefeedInfoChanged := diff.Changed(oldInfo, newInfo)
upstreamInfoChanged := diff.Changed(oldUpInfo, newUpInfo)

if !changefeedInfoChanged && !upstreamInfoChanged {
return nil, nil, cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs("changefeed config is the same with the old one, do nothing")
Expand Down
1 change: 0 additions & 1 deletion cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Nil(t, err)
// startTs can not be updated
require.Equal(t, "none", string(newCfInfo.Config.Sink.TxnAtomicity))
newCfInfo.Config.Sink.TxnAtomicity = ""
require.Equal(t, uint64(0), newCfInfo.StartTs)
require.Equal(t, uint64(10), newCfInfo.TargetTs)
Expand Down
35 changes: 17 additions & 18 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
return
}

cfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
oldCfInfo, err := h.capture.StatusProvider().GetChangeFeedInfo(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
if cfInfo.State != model.StateStopped {
if oldCfInfo.State != model.StateStopped {
_ = c.Error(cerror.ErrChangefeedUpdateRefused.
GenWithStackByArgs("can only update changefeed config when it is stopped"))
return
Expand All @@ -232,40 +232,39 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}
cfInfo.Namespace = changefeedID.Namespace
cfInfo.ID = changefeedID.ID
upInfo, err := etcdClient.GetUpstreamInfo(ctx, cfInfo.UpstreamID,
cfInfo.Namespace)
oldCfInfo.Namespace = changefeedID.Namespace
oldCfInfo.ID = changefeedID.ID
OldUpInfo, err := etcdClient.GetUpstreamInfo(ctx, oldCfInfo.UpstreamID,
oldCfInfo.Namespace)
if err != nil {
_ = c.Error(err)
return
}

updateCfConfig := &ChangefeedConfig{}
updateCfConfig.ReplicaConfig = ToAPIReplicaConfig(cfInfo.Config)
if err = c.BindJSON(updateCfConfig); err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

if err = h.helpers.verifyUpstream(ctx, updateCfConfig, cfInfo); err != nil {
if err = h.helpers.verifyUpstream(ctx, updateCfConfig, oldCfInfo); err != nil {
_ = c.Error(errors.Trace(err))
return
}

log.Info("Old ChangeFeed and Upstream Info",
zap.String("changefeedInfo", cfInfo.String()),
zap.Any("upstreamInfo", upInfo))
zap.String("changefeedInfo", oldCfInfo.String()),
zap.Any("upstreamInfo", OldUpInfo))

var pdAddrs []string
var credentials *security.Credential
if upInfo != nil {
pdAddrs = strings.Split(upInfo.PDEndpoints, ",")
if OldUpInfo != nil {
pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",")
credentials = &security.Credential{
CAPath: upInfo.CAPath,
CertPath: upInfo.CertPath,
KeyPath: upInfo.KeyPath,
CertAllowedCN: upInfo.CertAllowedCN,
CAPath: OldUpInfo.CAPath,
CertPath: OldUpInfo.CertPath,
KeyPath: OldUpInfo.KeyPath,
CertAllowedCN: OldUpInfo.CertAllowedCN,
}
}
if len(updateCfConfig.PDAddrs) != 0 {
Expand All @@ -277,8 +276,8 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
if err != nil {
_ = c.Error(errors.Trace(err))
}
newCfInfo, newUpInfo, err := h.helpers.
verifyUpdateChangefeedConfig(ctx, updateCfConfig, cfInfo, upInfo, storage, cfStatus.CheckpointTs)
newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx,
updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs)
if err != nil {
_ = c.Error(errors.Trace(err))
return
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ error = '''
illegal parameter for sorter: %s
'''

["CDC:ErrIncompatibleSinkConfig"]
error = '''
incompatible configuration in sink uri(%s) and config file(%s)
'''

["CDC:ErrIndexKeyTableNotFound"]
error = '''
table not found with index ID %d in index kv
Expand Down
Loading

0 comments on commit 7c044af

Please sign in to comment.