Skip to content

Commit

Permalink
config(ticdc): resolve conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Dec 24, 2021
1 parent 15e2f7d commit 63fe120
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 5 deletions.
18 changes: 18 additions & 0 deletions cdc/sink/codec/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ func (p *Protocol) FromString(protocol string) {
}
}

// String converts the Protocol enum type string to string.
func (p Protocol) String() string {
switch p {
case ProtocolDefault:
return "default"
case ProtocolCanal:
return "canal"
case ProtocolAvro:
return "avro"
case ProtocolMaxwell:
return "maxwell"
case ProtocolCanalJSON:
return "canal-json"
default:
panic("unreachable")
}
}

// NewEventBatchEncoder returns a function of creating an EventBatchEncoder by protocol.
func NewEventBatchEncoder(p Protocol) func() EventBatchEncoder {
switch p {
Expand Down
8 changes: 5 additions & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package sink

import (
"context"
"fmt"
"net/url"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -107,9 +108,10 @@ func newMqSink(
avroEncoder.SetTimeZone(util.TimezoneFromCtx(ctx))
return avroEncoder
}
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON) && !config.EnableOldValue {
log.Error("Old value is not enabled when using Canal protocol. Please update changefeed config")
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New("Canal requires old value to be enabled"))
} else if (protocol == codec.ProtocolCanal || protocol == codec.ProtocolCanalJSON || protocol == codec.ProtocolMaxwell) && !config.EnableOldValue {
log.Error(fmt.Sprintf("Old value is not enabled when using `%s` protocol. "+
"Please update changefeed config", protocol.String()))
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, errors.New(fmt.Sprintf("%s protocol requires old value to be enabled", protocol.String())))
}

// pre-flight verification of encoder parameters
Expand Down
8 changes: 6 additions & 2 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

var forceEnableOldValueProtocols = []string{
"canal",
"canal-json",
"maxwell",
}

Expand Down Expand Up @@ -305,9 +306,12 @@ func verifyChangefeedParameters(ctx context.Context, cmd *cobra.Command, isCreat
}

protocol := sinkURIParsed.Query().Get("protocol")
if protocol != "" {
cfg.Sink.Protocol = protocol
}
for _, fp := range forceEnableOldValueProtocols {
if protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", protocol))
if cfg.Sink.Protocol == fp {
log.Warn("Attempting to replicate without old value enabled. CDC will enable old value and continue.", zap.String("protocol", cfg.Sink.Protocol))
cfg.EnableOldValue = true
break
}
Expand Down

0 comments on commit 63fe120

Please sign in to comment.