Kafka Connect Avro Support Part 3: Basic Avro support (#753)
* avro sink support

* fix empty output

* avro sink support

* ready for pull request

* linting

* apply suggestions from code review

* apply suggestions from code review

* fix build

* fix unit test

* fix bug

* fix bug

* remove irrelevant logging
liuzix committed Jul 15, 2020
1 parent 3483a02 commit 583ced8
Showing 12 changed files with 200 additions and 77 deletions.
11 changes: 0 additions & 11 deletions cdc/changefeed.go
@@ -632,17 +632,6 @@ func (c *changeFeed) handleDDL(ctx context.Context, captures map[string]*model.C

ddlEvent := new(model.DDLEvent)
ddlEvent.FromJob(todoDDLJob)
tableInfo, ok := c.schema.GetTableByName(ddlEvent.Schema, ddlEvent.Table)
if ok {
ddlEvent.ColumnInfo = make([]*model.ColumnInfo, len(tableInfo.Columns))

for i, colInfo := range tableInfo.Columns {
ddlEvent.ColumnInfo[i] = new(model.ColumnInfo)
ddlEvent.ColumnInfo[i].FromTiColumnInfo(colInfo)
}
} else {
ddlEvent.ColumnInfo = nil
}

// Execute DDL Job asynchronously
c.ddlState = model.ChangeFeedExecDDL
9 changes: 5 additions & 4 deletions cdc/entry/mounter.go
@@ -388,10 +388,11 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *TableInfo, row *rowKVEntry) (*m
}

event := &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: row.RecordID,
SchemaID: tableInfo.SchemaID,
StartTs: row.StartTs,
CommitTs: row.CRTs,
RowID: row.RecordID,
SchemaID: tableInfo.SchemaID,
TableUpdateTs: tableInfo.UpdateTS,
Table: &model.TableName{
Schema: tableInfo.TableName.Schema,
Table: tableInfo.TableName.Table,
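The new TableUpdateTs field carries tableInfo.UpdateTS, which TiDB bumps on every DDL that touches the table, so the pair (table, UpdateTS) identifies exactly one schema version. A minimal sketch of how a schema-aware consumer can key on it — stand-in types only, not ticdc's real ones:

```go
package main

import "fmt"

// Stand-ins for ticdc's model types, trimmed to what this sketch needs.
type rowChangedEvent struct {
	Schema, Table string
	TableUpdateTs uint64 // tableInfo.UpdateTS, stamped by the mounter above
}

type schemaKey struct {
	schema, table string
	updateTs      uint64
}

func main() {
	// One cached Avro schema per (table, schema version).
	cache := map[schemaKey]string{
		{"test", "person", 435}: `{"type": "record", ...}`,
	}
	ev := rowChangedEvent{Schema: "test", Table: "person", TableUpdateTs: 435}
	fmt.Println(cache[schemaKey{ev.Schema, ev.Table, ev.TableUpdateTs}])
}
```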
46 changes: 34 additions & 12 deletions cdc/model/sink.go
@@ -74,6 +74,8 @@ type RowChangedEvent struct {

SchemaID int64 `json:"schema-id,omitempty"`

TableUpdateTs uint64 `json:"table-update-ts,omitempty"`

// if the table of this row only has one unique index (including the primary key),
// IndieMarkCol will be set to the name of the unique index
IndieMarkCol string `json:"indie-mark-col"`
@@ -100,30 +102,50 @@ func (c *ColumnInfo) FromTiColumnInfo(tiColumnInfo *model.ColumnInfo) {
c.Name = tiColumnInfo.Name.String()
}

// TableInfo is the simplified table info passed to the sink
type TableInfo struct {
// db name
Schema string
// table name
Table string
// unique identifier for the current table schema.
UpdateTs uint64
ColumnInfo []*ColumnInfo
}

// DDLEvent represents a DDL event
type DDLEvent struct {
StartTs uint64
CommitTs uint64
Schema string
SchemaID int64
Table string
ColumnInfo []*ColumnInfo
Query string
Type model.ActionType
StartTs uint64
CommitTs uint64
Schema string
Table string
TableInfo *TableInfo
Query string
Type model.ActionType
}

// FromJob fills the values of DDLEvent from DDL job
func (e *DDLEvent) FromJob(job *model.Job) {
var tableName string
if job.BinlogInfo.TableInfo != nil {
tableName = job.BinlogInfo.TableInfo.Name.O
tableName := job.BinlogInfo.TableInfo.Name.String()
tableInfo := job.BinlogInfo.TableInfo
e.TableInfo = new(TableInfo)
e.TableInfo.ColumnInfo = make([]*ColumnInfo, len(tableInfo.Columns))

for i, colInfo := range tableInfo.Columns {
e.TableInfo.ColumnInfo[i] = new(ColumnInfo)
e.TableInfo.ColumnInfo[i].FromTiColumnInfo(colInfo)
}

e.TableInfo.Schema = job.SchemaName
e.TableInfo.Table = tableName
e.TableInfo.UpdateTs = tableInfo.UpdateTS
e.Table = tableName
}
e.StartTs = job.StartTS
e.CommitTs = job.BinlogInfo.FinishedTS
e.Query = job.Query
e.Schema = job.SchemaName
e.SchemaID = job.SchemaID
e.Table = tableName
e.Type = job.Type
}

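FromJob now builds the simplified TableInfo itself (the equivalent loop was removed from cdc/changefeed.go above) and leaves TableInfo nil when the job carries no table schema, e.g. a DROP TABLE. A hedged sketch of the consumer-side contract, mirroring the branch AppendDDLEvent takes in cdc/sink/codec/avro.go below; types are trimmed stand-ins:

```go
package main

import "fmt"

// Trimmed stand-ins for the types above, for illustration only.
type tableInfo struct {
	Schema, Table string
	UpdateTs      uint64
}

type ddlEvent struct {
	Query     string
	TableInfo *tableInfo // nil when the job carries no table schema
}

// handleDDL mirrors the nil check AppendDDLEvent performs in avro.go.
func handleDDL(e ddlEvent) {
	if e.TableInfo == nil {
		fmt.Println("no schema to register for:", e.Query)
		return
	}
	fmt.Printf("register schema for %s.%s at version %d\n",
		e.TableInfo.Schema, e.TableInfo.Table, e.TableInfo.UpdateTs)
}

func main() {
	handleDDL(ddlEvent{Query: "DROP TABLE t"})
	handleDDL(ddlEvent{
		Query:     "ALTER TABLE person ADD c INT",
		TableInfo: &tableInfo{"test", "person", 436},
	})
}
```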
17 changes: 17 additions & 0 deletions cdc/owner.go
@@ -218,7 +218,10 @@ func (o *Owner) newChangeFeed(
tables := make(map[model.TableID]model.TableName)
partitions := make(map[model.TableID][]int64)
orphanTables := make(map[model.TableID]model.Ts)
sinkTableInfo := make([]*model.TableInfo, len(schemaSnap.CloneTables()))
j := 0
for tid, table := range schemaSnap.CloneTables() {
j++
if filter.ShouldIgnoreTable(table.Schema, table.Table) {
continue
}
@@ -270,6 +273,14 @@ func (o *Owner) newChangeFeed(
orphanTables[tid] = checkpointTs
}

sinkTableInfo[j-1] = new(model.TableInfo)
sinkTableInfo[j-1].ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols()))

for i, colInfo := range tblInfo.Cols() {
sinkTableInfo[j-1].ColumnInfo[i] = new(model.ColumnInfo)
sinkTableInfo[j-1].ColumnInfo[i].FromTiColumnInfo(colInfo)
}

}
errCh := make(chan error, 1)

@@ -287,6 +298,12 @@
}
cancel()
}()

err = sink.Initialize(ctx, sinkTableInfo)
if err != nil {
log.Error("error on running owner", zap.Error(err))
}

cf := &changeFeed{
info: info,
id: id,
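Initialize is a new hook on the sink: the owner now hands the column info of every table in the changefeed to the sink at creation time, so schema-aware sinks (Avro) can register schemas before the first row arrives. The full Sink interface is not part of this diff; the sketch below shows the shape the change implies, with stubbed types and a no-op implementation like the blackHoleSink one that follows:

```go
package main

import "context"

// Stubbed stand-ins for ticdc's model types (illustrative only).
type columnInfo struct{ Name string }

type tableInfo struct {
	Schema, Table string
	ColumnInfo    []*columnInfo
}

// Assumed interface fragment: method set trimmed to what this commit shows.
type sink interface {
	Initialize(ctx context.Context, tables []*tableInfo) error
	Close() error
}

// A no-op sink satisfies the new method trivially, as blackHoleSink does.
type noopSink struct{}

func (noopSink) Initialize(ctx context.Context, tables []*tableInfo) error { return nil }
func (noopSink) Close() error                                              { return nil }

func main() {
	var s sink = noopSink{}
	_ = s.Initialize(context.Background(), nil)
	_ = s.Close()
}
```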
5 changes: 5 additions & 0 deletions cdc/sink/black_hole.go
@@ -73,6 +73,11 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is a no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.TableInfo) error {
return nil
}

func (b *blackHoleSink) Close() error {
return nil
}
56 changes: 34 additions & 22 deletions cdc/sink/codec/avro.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/binary"
"encoding/json"
"math/rand"
"strconv"
"time"

@@ -45,23 +46,33 @@ type avroEncodeResult struct {
registryID int
}

// NewAvroEventBatchEncoder creates an AvroEventBatchEncoder from an AvroSchemaManager
func NewAvroEventBatchEncoder(manager *AvroSchemaManager) *AvroEventBatchEncoder {
// NewAvroEventBatchEncoder creates an AvroEventBatchEncoder
func NewAvroEventBatchEncoder() EventBatchEncoder {
return &AvroEventBatchEncoder{
valueSchemaManager: manager,
valueSchemaManager: nil,
keyBuf: nil,
valueBuf: nil,
}
}

// SetValueSchemaManager sets the value schema manager for an Avro encoder
func (a *AvroEventBatchEncoder) SetValueSchemaManager(manager *AvroSchemaManager) {
a.valueSchemaManager = manager
}

// GetValueSchemaManager gets the value schema manager for an Avro encoder
func (a *AvroEventBatchEncoder) GetValueSchemaManager() *AvroSchemaManager {
return a.valueSchemaManager
}

// AppendRowChangedEvent appends a row change event to the encoder
// NOTE: the encoder can only store one RowChangedEvent!
func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) error {
if a.keyBuf != nil || a.valueBuf != nil {
return errors.New("Fatal sink bug. Batch size must be 1")
}

res, err := a.avroEncode(e.Table, e.SchemaID, e.Columns)
res, err := a.avroEncode(e.Table, e.TableUpdateTs, e.Columns)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
@@ -88,12 +99,12 @@ func (a *AvroEventBatchEncoder) AppendResolvedEvent(ts uint64) error {

// AppendDDLEvent generates new schema and registers it to the Registry
func (a *AvroEventBatchEncoder) AppendDDLEvent(e *model.DDLEvent) error {
if e.ColumnInfo == nil {
if e.TableInfo == nil {
log.Info("AppendDDLEvent: no schema generation needed, skip")
return nil
}

schemaStr, err := columnInfoToAvroSchema(e.Table, e.ColumnInfo)
schemaStr, err := ColumnInfoToAvroSchema(e.TableInfo.Table, e.TableInfo.ColumnInfo)
if err != nil {
return errors.Annotate(err, "AppendDDLEvent failed")
}
@@ -105,8 +116,8 @@ func (a *AvroEventBatchEncoder) AppendDDLEvent(e *model.DDLEvent) error {
}

err = a.valueSchemaManager.Register(context.Background(), model.TableName{
Schema: e.Schema,
Table: e.Table,
Schema: e.TableInfo.Schema,
Table: e.TableInfo.Table,
}, avroCodec)

if err != nil {
@@ -133,8 +144,8 @@ func (a *AvroEventBatchEncoder) Size() int {
return 1
}

func (a *AvroEventBatchEncoder) avroEncode(table *model.TableName, tiSchemaID int64, cols map[string]*model.Column) (*avroEncodeResult, error) {
avroCodec, registryID, err := a.valueSchemaManager.Lookup(context.Background(), *table, tiSchemaID)
func (a *AvroEventBatchEncoder) avroEncode(table *model.TableName, updateTs uint64, cols map[string]*model.Column) (*avroEncodeResult, error) {
avroCodec, registryID, err := a.valueSchemaManager.Lookup(context.Background(), *table, updateTs)
if err != nil {
return nil, errors.Annotate(err, "AvroEventBatchEncoder: lookup failed")
}
@@ -167,10 +178,11 @@ type avroSchemaField struct {
DefaultValue interface{} `json:"default"`
}

func columnInfoToAvroSchema(name string, columnInfo []*model.ColumnInfo) (string, error) {
// ColumnInfoToAvroSchema generates the Avro schema JSON for the corresponding columns
func ColumnInfoToAvroSchema(name string, columnInfo []*model.ColumnInfo) (string, error) {
top := avroSchemaTop{
Tp: "record",
Name: name,
Name: name + "_" + strconv.FormatInt(rand.Int63(), 10),
Fields: nil,
}

@@ -190,7 +202,7 @@

str, err := json.Marshal(&top)
if err != nil {
return "", errors.Annotate(err, "columnInfoToAvroSchema: failed to generate json")
return "", errors.Annotate(err, "ColumnInfoToAvroSchema: failed to generate json")
}
return string(str), nil
}
@@ -229,9 +241,9 @@ func getAvroDataTypeName(v interface{}) (string, error) {
case string:
return "string", nil
case time.Duration:
return "long.time-millis", nil
return "long", nil
case time.Time:
return "long.timestamp-millis", nil
return "long", nil
default:
log.Warn("getAvroDataTypeName: unknown type")
return "", errors.New("unknown type for Avro")
@@ -247,9 +259,9 @@ func getAvroDataTypeNameMysql(tp byte) (string, error) {
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString:
return "string", nil
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp:
return "long.timestamp-millis", nil
return "long", nil
case mysql.TypeDuration: // duration should read fsp from column metadata
return "long.time-millis", nil
return "long", nil
case mysql.TypeEnum:
return "long", nil
case mysql.TypeSet:
@@ -283,26 +295,26 @@ func columnToAvroNativeData(col *model.Column) (interface{}, string, error) {
str := col.Value.(string)
t, err := time.Parse(types.DateFormat, str)
if err == nil {
return t, "long.timestamp-millis", nil
return t, "long", nil
}

t, err = time.Parse(types.TimeFormat, str)
if err == nil {
return t, "long.timestamp-millis", nil
return t, "long", nil
}

t, err = time.Parse(types.TimeFSPFormat, str)
if err != nil {
return nil, "error", err
}
return t, "long.timestamp-millis", nil
return t, "long", nil
case mysql.TypeDuration:
str := col.Value.(string)
d, err := time.ParseDuration(str)
if err != nil {
return nil, "error", err
}
return d, "long.timestamp-millis", nil
return d, "long", nil
case mysql.TypeJSON:
return col.Value.(tijson.BinaryJSON).String(), "string", nil
case mysql.TypeNewDecimal, mysql.TypeDecimal:
@@ -334,7 +346,7 @@ func (r *avroEncodeResult) toEnvelope() ([]byte, error) {
buf := new(bytes.Buffer)
data := []interface{}{magicByte, int32(r.registryID), r.data}
for _, v := range data {
err := binary.Write(buf, binary.LittleEndian, v)
err := binary.Write(buf, binary.BigEndian, v)
if err != nil {
return nil, errors.Annotate(err, "converting Avro data to envelope failed")
}
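The switch from binary.LittleEndian to binary.BigEndian matters because this envelope is Confluent's wire format — one zero magic byte, a 4-byte big-endian schema-registry ID, then the raw Avro payload — and Kafka Connect's Avro converter reads the ID big-endian. A standalone round-trip sketch using only the standard library:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

const magicByte = byte(0) // assumption: matches Confluent's wire format

// encodeEnvelope mirrors toEnvelope above: magic byte, big-endian
// 4-byte registry ID, then the raw Avro-encoded payload.
func encodeEnvelope(registryID int32, payload []byte) ([]byte, error) {
	buf := new(bytes.Buffer)
	for _, v := range []interface{}{magicByte, registryID, payload} {
		if err := binary.Write(buf, binary.BigEndian, v); err != nil {
			return nil, err
		}
	}
	return buf.Bytes(), nil
}

func main() {
	env, _ := encodeEnvelope(7, []byte{0xde, 0xad})
	// ID 7 serializes as {0, 0, 0, 7}, exactly what TestAvroEnvelope
	// below asserts after the little-endian fix.
	fmt.Printf("%% x -> % x\n", env) // 00 00 00 00 07 de ad
	id := int32(binary.BigEndian.Uint32(env[1:5]))
	fmt.Println("registry ID:", id)
}
```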
18 changes: 9 additions & 9 deletions cdc/sink/codec/avro_test.go
@@ -15,8 +15,6 @@ package codec

import (
"context"
"time"

"github.com/linkedin/goavro/v2"
"github.com/pingcap/check"
"github.com/pingcap/errors"
Expand All @@ -26,7 +24,6 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -85,7 +82,7 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
"mybool": {Value: uint8(1), Type: mysql.TypeTiny},
"myfloat": {Value: float32(3.14), Type: mysql.TypeFloat},
"mybytes": {Value: []byte("Hello World"), Type: mysql.TypeBlob},
"ts": {Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
//"ts": {Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
})
c.Assert(err, check.IsNil)

@@ -125,7 +122,7 @@ func (s *avroBatchEncoderSuite) TestAvroEnvelope(c *check.C) {
c.Check(err, check.IsNil)

c.Assert(evlp[0], check.Equals, magicByte)
c.Assert(evlp[1:5], check.BytesEquals, []byte{7, 0, 0, 0})
c.Assert(evlp[1:5], check.BytesEquals, []byte{0, 0, 0, 7})

parsed, _, err := avroCodec.NativeFromBinary(evlp[5:])
c.Assert(err, check.IsNil)
@@ -174,12 +171,15 @@ func (s *avroBatchEncoderSuite) TestAvroEncode(c *check.C) {
}()

info := pm.GetTableInfo("test", "person")
testCaseDdl.ColumnInfo = make([]*model.ColumnInfo, len(info.Columns))
testCaseDdl.TableInfo = new(model.TableInfo)
testCaseDdl.TableInfo.Schema = "test"
testCaseDdl.TableInfo.Table = "person"
testCaseDdl.TableInfo.ColumnInfo = make([]*model.ColumnInfo, len(info.Columns))
for i, v := range info.Columns {
testCaseDdl.ColumnInfo[i] = new(model.ColumnInfo)
testCaseDdl.ColumnInfo[i].FromTiColumnInfo(v)
testCaseDdl.TableInfo.ColumnInfo[i] = new(model.ColumnInfo)
testCaseDdl.TableInfo.ColumnInfo[i].FromTiColumnInfo(v)
}
testCaseDdl.SchemaID = info.SchemaID
testCaseDdl.TableInfo.UpdateTs = 0xbeefbeef

err := s.encoder.AppendDDLEvent(testCaseDdl)
c.Check(err, check.IsNil)