Skip to content

Commit

Permalink
cdc: Add Debezium protocol (#10197)
Browse files Browse the repository at this point in the history
ref #1799
  • Loading branch information
breezewish authored Dec 13, 2023
1 parent a3f2fe2 commit 7d2acf2
Show file tree
Hide file tree
Showing 15 changed files with 1,796 additions and 4 deletions.
11 changes: 10 additions & 1 deletion cdc/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
)

// GetTopic returns the topic name from the sink URI.
Expand Down Expand Up @@ -79,7 +80,15 @@ func GetEncoderConfig(
// Always set the encoder's `MaxMessageBytes` equal to the producer's `MaxMessageBytes`
// to prevent the encoder from generating a batched message that is too large,
// which would cause the producer to fail with `message too large`.
encoderConfig = encoderConfig.WithMaxMessageBytes(maxMsgBytes).WithChangefeedID(changefeedID)
encoderConfig = encoderConfig.
WithMaxMessageBytes(maxMsgBytes).
WithChangefeedID(changefeedID)

tz, err := util.GetTimezone(config.GetGlobalServerConfig().TZ)
if err != nil {
return nil, cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
}
encoderConfig.TimeZone = tz

if err := encoderConfig.Validate(); err != nil {
return nil, cerror.WrapError(cerror.ErrSinkInvalidConfig, err)
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ error = '''
unflatten datume data
'''

["CDC:ErrDebeziumEncodeFailed"]
error = '''
debezium encode failed
'''

["CDC:ErrDecodeFailed"]
error = '''
decode failed: %s
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.3
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.5.9
github.com/google/go-cmp v0.6.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/google/uuid v1.3.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,9 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/sink_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
ProtocolCraft
ProtocolOpen
ProtocolCsv
ProtocolDebezium
ProtocolSimple
)

Expand Down Expand Up @@ -67,6 +68,8 @@ func ParseSinkProtocolFromString(protocol string) (Protocol, error) {
return ProtocolOpen, nil
case "csv":
return ProtocolCsv, nil
case "debezium":
return ProtocolDebezium, nil
case "simple":
return ProtocolSimple, nil
default:
Expand All @@ -93,6 +96,8 @@ func (p Protocol) String() string {
return "open-protocol"
case ProtocolCsv:
return "csv"
case ProtocolDebezium:
return "debezium"
case ProtocolSimple:
return "simple"
default:
Expand Down
4 changes: 4 additions & 0 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,10 @@ var (
"csv decode failed",
errors.RFCCodeText("CDC:ErrCSVDecodeFailed"),
)
ErrDebeziumEncodeFailed = errors.Normalize(
"debezium encode failed",
errors.RFCCodeText("CDC:ErrDebeziumEncodeFailed"),
)
ErrStorageSinkInvalidConfig = errors.Normalize(
"storage sink config invalid",
errors.RFCCodeText("CDC:ErrStorageSinkInvalidConfig"),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sink/codec/builder/encoder_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@ import (
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/pingcap/tiflow/pkg/sink/codec/craft"
"github.com/pingcap/tiflow/pkg/sink/codec/csv"
"github.com/pingcap/tiflow/pkg/sink/codec/debezium"
"github.com/pingcap/tiflow/pkg/sink/codec/maxwell"
"github.com/pingcap/tiflow/pkg/sink/codec/open"
"github.com/pingcap/tiflow/pkg/sink/codec/simple"
)

// NewRowEventEncoderBuilder returns a RowEventEncoderBuilder.
func NewRowEventEncoderBuilder(
ctx context.Context, cfg *common.Config,
ctx context.Context,
cfg *common.Config,
) (codec.RowEventEncoderBuilder, error) {
switch cfg.Protocol {
case config.ProtocolDefault, config.ProtocolOpen:
Expand All @@ -46,6 +48,8 @@ func NewRowEventEncoderBuilder(
return canal.NewJSONRowEventEncoderBuilder(ctx, cfg)
case config.ProtocolCraft:
return craft.NewBatchEncoderBuilder(cfg), nil
case config.ProtocolDebezium:
return debezium.NewBatchEncoderBuilder(cfg), nil
case config.ProtocolSimple:
return simple.NewBuilder(cfg), nil
default:
Expand Down
6 changes: 6 additions & 0 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package common
import (
"net/http"
"net/url"
"time"

"github.com/gin-gonic/gin/binding"
"github.com/imdario/mergo"
Expand Down Expand Up @@ -74,6 +75,9 @@ type Config struct {

// for open protocol
OnlyOutputUpdatedColumns bool

// Currently, only the Debezium protocol is aware of the time zone.
TimeZone *time.Location
}

// NewConfig returns a Config for codec.
Expand All @@ -95,6 +99,8 @@ func NewConfig(protocol config.Protocol) *Config {
OnlyOutputUpdatedColumns: false,
DeleteOnlyHandleKeyColumns: false,
LargeMessageHandle: config.NewDefaultLargeMessageHandleConfig(),

TimeZone: time.Local,
}
}

Expand Down
Loading

0 comments on commit 7d2acf2

Please sign in to comment.