From cc96ff0c8c63f3e5804df7eefe21084f0b3f2ac8 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Wed, 19 Sep 2018 08:12:17 -0700 Subject: [PATCH] changefeedccl: support tables with more than one column family If a table has multiple column families and only one of them is changed in a given statement, only the kv for that column family will be changed by the sql system. This means we wouldn't have enough data to reconstruct the row. We can fetch the row's entire kv data with a prefix scan that's forced to be at the timestamp of the changed kv. This has two issues - it introduces a blocking kv read (in the common case, this is not as bad as it sounds since this code is likely running on the leaseholder and the data can be served locally) - if N column families were updated at once, then we'll run this process N times and get N exact duplicates in the output Release note (enterprise change): CHANGEFEEDs now support tables with multiple column families, though with degraded performance. Closes #28667 --- pkg/ccl/changefeedccl/changefeed.go | 45 +++++++++++++++++-- .../changefeedccl/changefeed_processors.go | 2 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 5 --- pkg/ccl/changefeedccl/changefeed_test.go | 39 +++++++++++----- pkg/ccl/changefeedccl/helpers_test.go | 2 +- 5 files changed, 73 insertions(+), 20 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed.go b/pkg/ccl/changefeedccl/changefeed.go index 0a2883f2210a..3e2ccbccb90a 100644 --- a/pkg/ccl/changefeedccl/changefeed.go +++ b/pkg/ccl/changefeedccl/changefeed.go @@ -14,8 +14,10 @@ import ( gojson "encoding/json" "time" + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql" @@ -69,6 +71,7 @@ type emitEntry struct { // returns a closure that may be repeatedly called to advance the changefeed. // The returned closure is not threadsafe. func kvsToRows( + db *client.DB, leaseMgr *sql.LeaseManager, tableHist *tableHistory, details jobspb.ChangefeedDetails, @@ -99,12 +102,48 @@ func kvsToRows( if err != nil { return nil, err } - // TODO(dan): Handle tables with multiple column families. - kvs.KVs = append(kvs.KVs, kv) + + if len(desc.Families) > 1 { + // If there's more than one column family, we need a followup fetch. + // This is fairly late to be kicking off and blocking on this fetch, + // but, in the common case, this is not as bad as it sounds since + // this code is likely running on the leaseholder and the data can + // be served locally. If this is a problem, we could kick off the + // fetch earlier (when it enters the buffer) or cache some values or + // both. + + // WIP: The real problem is that treating these a key at a time is a + // bit naive. If N column families from some row are updated at the + // same time, we'll refetch the row and emit an exact duplicate N + // times. + keyPrefix, err := keys.EnsureSafeSplitKey(kv.Key) + if err != nil { + return nil, err + } + if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error { + txn.SetFixedTimestamp(ctx, kv.Value.Timestamp) + clientKVs, err := txn.Scan(ctx, keyPrefix, keyPrefix.PrefixEnd(), 0 /* maxRows */) + if err != nil { + return err + } + kvs.KVs = kvs.KVs[:0] + for _, clientKV := range clientKVs { + kvs.KVs = append(kvs.KVs, roachpb.KeyValue{ + Key: clientKV.Key, + Value: *clientKV.Value, + }) + } + return nil + }); err != nil { + return nil, err + } + } else { + kvs.KVs = append(kvs.KVs, kv) + } + if err := rf.StartScanFrom(ctx, &kvs); err != nil { return nil, err } - for { var r emitEntry r.row.datums, r.row.tableDesc, _, err = rf.NextRowDecoded(ctx) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 5d443a1c0024..1092026c8021 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -179,7 +179,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context { targets: ca.spec.Feed.Targets, m: tableHist, } - rowsFn := kvsToRows(leaseMgr, tableHist, ca.spec.Feed, buf.Get) + rowsFn := kvsToRows(ca.flowCtx.ClientDB, leaseMgr, tableHist, ca.spec.Feed, buf.Get) var knobs TestingKnobs if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok { diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 8e2c6b335a9e..921feb43f8fd 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -326,11 +326,6 @@ func validateChangefeedTable( if tableDesc.IsSequence() { return errors.Errorf(`CHANGEFEED cannot target sequences: %s`, tableDesc.Name) } - if len(tableDesc.Families) != 1 { - return errors.Errorf( - `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, - tableDesc.Name, len(tableDesc.Families)) - } if tableDesc.State == sqlbase.TableDescriptor_DROP { return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName) diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 955ff9049ae2..9aa010cce947 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -468,11 +468,28 @@ func TestChangefeedColumnFamily(t *testing.T) { // Table with 2 column families. sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, FAMILY (a), FAMILY (b))`) - if _, err := sqlDB.DB.Query( - `CREATE CHANGEFEED FOR foo`, - ); !testutils.IsError(err, `exactly 1 column family`) { - t.Errorf(`expected "exactly 1 column family" error got: %+v`, err) - } + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`) + foo := f.Feed(t, `CREATE CHANGEFEED FOR foo`) + defer foo.Close(t) + assertPayloads(t, foo, []string{ + // WIP Changefeeds currently emit a duplicate for every column + // family modified at the same time. + `foo: [1]->{"a": 1, "b": "a"}`, + `foo: [1]->{"a": 1, "b": "a"}`, + }) + + sqlDB.Exec(t, `UPDATE foo SET a = 2 WHERE b = 'a'`) + assertPayloads(t, foo, []string{ + // WIP Changefeeds currently emit a duplicate for every column + // family modified at the same time. + `foo: [2]->{"a": 2, "b": "a"}`, + `foo: [2]->{"a": 2, "b": "a"}`, + }) + + sqlDB.Exec(t, `UPDATE foo SET b = 'b' WHERE a = 2`) + assertPayloads(t, foo, []string{ + `foo: [2]->{"a": 2, "b": "b"}`, + }) // Table with a second column family added after the changefeed starts. sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, FAMILY f_a (a))`) @@ -483,11 +500,13 @@ func TestChangefeedColumnFamily(t *testing.T) { `bar: [0]->{"a": 0}`, }) sqlDB.Exec(t, `ALTER TABLE bar ADD COLUMN b STRING CREATE FAMILY f_b`) - sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`) - _, _, _, _, _, _ = bar.Next(t) - if err := bar.Err(); !testutils.IsError(err, `exactly 1 column family`) { - t.Errorf(`expected "exactly 1 column family" error got: %+v`, err) - } + sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'a')`) + assertPayloads(t, bar, []string{ + // WIP Changefeeds currently emit a duplicate for every column + // family modified at the same time. + `bar: [1]->{"a": 1, "b": "a"}`, + `bar: [1]->{"a": 1, "b": "a"}`, + }) } t.Run(`sinkless`, sinklessTest(testFn)) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 2c72f3c127ce..807001193022 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -131,7 +131,7 @@ func createBenchmarkChangefeed( targets: details.Targets, m: th, } - rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get) + rowsFn := kvsToRows(s.DB(), s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get) tickFn := emitEntries(details, sink, rowsFn, TestingKnobs{}) ctx, cancel := context.WithCancel(ctx)