Skip to content

Commit

Permalink
s3Sink: cancel create existing table directory on s3 when initialize …
Browse files Browse the repository at this point in the history
…s3Sink (#1477) (#1672)
  • Loading branch information
ti-srebot authored Apr 16, 2021
1 parent cb9bd17 commit 4fb1b6b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
2 changes: 2 additions & 0 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ func (o *Owner) newChangeFeed(
sinkTableInfo[j-1] = new(model.SimpleTableInfo)
sinkTableInfo[j-1].TableID = tid
sinkTableInfo[j-1].ColumnInfo = make([]*model.ColumnInfo, len(tblInfo.Cols()))
sinkTableInfo[j-1].Schema = table.Schema
sinkTableInfo[j-1].Table = table.Table

for i, colInfo := range tblInfo.Cols() {
sinkTableInfo[j-1].ColumnInfo[i] = new(model.ColumnInfo)
Expand Down
10 changes: 0 additions & 10 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,16 +330,6 @@ func (s *s3Sink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {

func (s *s3Sink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
if tableInfo != nil {
for _, table := range tableInfo {
if table != nil {
err := s.storage.WriteFile(ctx, makeTableDirectoryName(table.TableID), nil)
if err != nil {
return errors.Annotate(
cerror.WrapError(cerror.ErrS3SinkStorageAPI, err),
"create table directory on s3 failed")
}
}
}
// update log meta to record the relationship about tableName and tableID
s.logMeta = makeLogMetaContent(tableInfo)

Expand Down
8 changes: 5 additions & 3 deletions tests/cdclog_s3/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,6 @@ function prepare() {

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

SINK_URI="s3://logbucket/test?endpoint=http://$S3_ENDPOINT/"

run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
}

success=0
Expand Down Expand Up @@ -87,6 +84,11 @@ function cdclog_test() {
run_sql "drop database if exists $TEST_NAME"
run_sql "create database $TEST_NAME"
run_sql "create table $TEST_NAME.t1 (c0 int primary key, payload varchar(1024));"

SINK_URI="s3://logbucket/test?endpoint=http://$S3_ENDPOINT/"

run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI"

run_sql "create table $TEST_NAME.t2 (c0 int primary key, payload varchar(1024));"

run_sql "insert into $TEST_NAME.t1 values (1, 'a')"
Expand Down

0 comments on commit 4fb1b6b

Please sign in to comment.