diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 0a2883f2210a..3e2ccbccb90a 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -14,8 +14,10 @@ import ( gojson "encoding/json" "time" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" @@ -69,6 +71,7 @@ type emitEntry struct { // returns a closure that may be repeatedly called to advance the changefeed. // The returned closure is not threadsafe. func kvsToRows( + db *client.DB, leaseMgr *sql.LeaseManager, tableHist *tableHistory, details jobspb.ChangefeedDetails, @@ -99,12 +102,48 @@ func kvsToRows( if err != nil { return nil, err } - // TODO(dan): Handle tables with multiple column families. - kvs.KVs = append(kvs.KVs, kv) + + if len(desc.Families) > 1 { + // If there's more than one column family, we need a followup fetch. + // This is fairly late to be kicking off and blocking on this fetch, + // but, in the common case, this is not as bad as it sounds since + // this code is likely running on the leaseholder and the data can + // be served locally. If this is a problem, we could kick off the + // fetch earlier (when it enters the buffer) or cache some values or + // both. + + // WIP: The real problem is that treating these a key at a time is a + // bit naive. If N column families from some row are updated at the + // same time, we'll refetch the row and emit an exact duplicate N + // times. + keyPrefix, err := keys.EnsureSafeSplitKey(kv.Key) + if err != nil { + return nil, err + } + if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + txn.SetFixedTimestamp(ctx, kv.Value.Timestamp) + clientKVs, err := txn.Scan(ctx, keyPrefix, keyPrefix.PrefixEnd(), 0 /* maxRows */) + if err != nil { + return err + } + kvs.KVs = kvs.KVs[:0] + for _, clientKV := range clientKVs { + kvs.KVs = append(kvs.KVs, roachpb.KeyValue{ + Key: clientKV.Key, + Value: *clientKV.Value, + }) + } + return nil + }); err != nil { + return nil, err + } + } else { + kvs.KVs = append(kvs.KVs, kv) + } + if err := rf.StartScanFrom(ctx, &kvs); err != nil { return nil, err } - for { var r emitEntry r.row.datums, r.row.tableDesc, _, err = rf.NextRowDecoded(ctx) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 5d443a1c0024..1092026c8021 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -179,7 +179,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { targets: ca.spec.Feed.Targets, m: tableHist, } - rowsFn := kvsToRows(leaseMgr, tableHist, ca.spec.Feed, buf.Get) + rowsFn := kvsToRows(ca.flowCtx.ClientDB, leaseMgr, tableHist, ca.spec.Feed, buf.Get) var knobs TestingKnobs if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok { diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 8e2c6b335a9e..921feb43f8fd 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -326,11 +326,6 @@ func validateChangefeedTable( if tableDesc.IsSequence() { return errors.Errorf(`CHANGEFEED cannot target sequences: %s`, tableDesc.Name) } - if len(tableDesc.Families) != 1 { - return errors.Errorf( - `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, - tableDesc.Name, len(tableDesc.Families)) - } if tableDesc.State == sqlbase.TableDescriptor_DROP { return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 955ff9049ae2..9aa010cce947 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -468,11 +468,28 @@ func TestChangefeedColumnFamily(t *testing.T) { // Table with 2 column families. sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, FAMILY (a), FAMILY (b))`) - if _, err := sqlDB.DB.Query( - `CREATE CHANGEFEED FOR foo`, - ); !testutils.IsError(err, `exactly 1 column family`) { - t.Errorf(`expected "exactly 1 column family" error got: %+v`, err) - } + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) + foo := f.Feed(t, `CREATE CHANGEFEED FOR foo`) + defer foo.Close(t) + assertPayloads(t, foo, []string{ + // WIP Changefeeds currently emit a duplicate for every column + // family modified at the same time. + `foo: [1]->{"a": 1, "b": "a"}`, + `foo: [1]->{"a": 1, "b": "a"}`, + }) + + sqlDB.Exec(t, `UPDATE foo SET a = 2 WHERE b = 'a'`) + assertPayloads(t, foo, []string{ + // WIP Changefeeds currently emit a duplicate for every column + // family modified at the same time. + `foo: [2]->{"a": 2, "b": "a"}`, + `foo: [2]->{"a": 2, "b": "a"}`, + }) + + sqlDB.Exec(t, `UPDATE foo SET b = 'b' WHERE a = 2`) + assertPayloads(t, foo, []string{ + `foo: [2]->{"a": 2, "b": "b"}`, + }) // Table with a second column family added after the changefeed starts. sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, FAMILY f_a (a))`) @@ -483,11 +500,13 @@ func TestChangefeedColumnFamily(t *testing.T) { `bar: [0]->{"a": 0}`, }) sqlDB.Exec(t, `ALTER TABLE bar ADD COLUMN b STRING CREATE FAMILY f_b`) - sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`) - _, _, _, _, _, _ = bar.Next(t) - if err := bar.Err(); !testutils.IsError(err, `exactly 1 column family`) { - t.Errorf(`expected "exactly 1 column family" error got: %+v`, err) - } + sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'a')`) + assertPayloads(t, bar, []string{ + // WIP Changefeeds currently emit a duplicate for every column + // family modified at the same time. + `bar: [1]->{"a": 1, "b": "a"}`, + `bar: [1]->{"a": 1, "b": "a"}`, + }) } t.Run(`sinkless`, sinklessTest(testFn)) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 2c72f3c127ce..807001193022 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -131,7 +131,7 @@ func createBenchmarkChangefeed( targets: details.Targets, m: th, } - rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get) + rowsFn := kvsToRows(s.DB(), s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get) tickFn := emitEntries(details, sink, rowsFn, TestingKnobs{}) ctx, cancel := context.WithCancel(ctx)