Skip to content

Commit

Permalink
changefeedccl: support tables with more than one column family
Browse files Browse the repository at this point in the history
If a table has multiple column families and only one of them is changed
in a given statement, only the kv for that column family will be changed
by the sql system. This means we wouldn't have enough data to
reconstruct the row.

We can fetch the row's entire kv data with a prefix scan that's forced
to be at the timestamp of the changed kv. This has two issues

- it introduces a blocking kv read (in the common case, this is not as
  bad as it sounds since this code is likely running on the leaseholder
  and the data can be served locally)
- if N column families were updated at once, then we'll run this process
  N times and get N exact duplicates in the output

Release note (enterprise change): CHANGEFEEDs now support tables with
multiple column families, though with degraded performance.

Closes cockroachdb#28667
  • Loading branch information
danhhz committed Sep 19, 2018
1 parent 280d8c7 commit cc96ff0
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 20 deletions.
45 changes: 42 additions & 3 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
gojson "encoding/json"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -69,6 +71,7 @@ type emitEntry struct {
// returns a closure that may be repeatedly called to advance the changefeed.
// The returned closure is not threadsafe.
func kvsToRows(
db *client.DB,
leaseMgr *sql.LeaseManager,
tableHist *tableHistory,
details jobspb.ChangefeedDetails,
Expand Down Expand Up @@ -99,12 +102,48 @@ func kvsToRows(
if err != nil {
return nil, err
}
// TODO(dan): Handle tables with multiple column families.
kvs.KVs = append(kvs.KVs, kv)

if len(desc.Families) > 1 {
// If there's more than one column family, we need a followup fetch.
// This is fairly late to be kicking off and blocking on this fetch,
// but, in the common case, this is not as bad as it sounds since
// this code is likely running on the leaseholder and the data can
// be served locally. If this is a problem, we could kick off the
// fetch earlier (when it enters the buffer) or cache some values or
// both.

// WIP: The real problem is that treating these a key at a time is a
// bit naive. If N column families from some row are updated at the
// same time, we'll refetch the row and emit an exact duplicate N
// times.
keyPrefix, err := keys.EnsureSafeSplitKey(kv.Key)
if err != nil {
return nil, err
}
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, kv.Value.Timestamp)
clientKVs, err := txn.Scan(ctx, keyPrefix, keyPrefix.PrefixEnd(), 0 /* maxRows */)
if err != nil {
return err
}
kvs.KVs = kvs.KVs[:0]
for _, clientKV := range clientKVs {
kvs.KVs = append(kvs.KVs, roachpb.KeyValue{
Key: clientKV.Key,
Value: *clientKV.Value,
})
}
return nil
}); err != nil {
return nil, err
}
} else {
kvs.KVs = append(kvs.KVs, kv)
}

if err := rf.StartScanFrom(ctx, &kvs); err != nil {
return nil, err
}

for {
var r emitEntry
r.row.datums, r.row.tableDesc, _, err = rf.NextRowDecoded(ctx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
targets: ca.spec.Feed.Targets,
m: tableHist,
}
rowsFn := kvsToRows(leaseMgr, tableHist, ca.spec.Feed, buf.Get)
rowsFn := kvsToRows(ca.flowCtx.ClientDB, leaseMgr, tableHist, ca.spec.Feed, buf.Get)

var knobs TestingKnobs
if cfKnobs, ok := ca.flowCtx.TestingKnobs().Changefeed.(*TestingKnobs); ok {
Expand Down
5 changes: 0 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,6 @@ func validateChangefeedTable(
if tableDesc.IsSequence() {
return errors.Errorf(`CHANGEFEED cannot target sequences: %s`, tableDesc.Name)
}
if len(tableDesc.Families) != 1 {
return errors.Errorf(
`CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`,
tableDesc.Name, len(tableDesc.Families))
}

if tableDesc.State == sqlbase.TableDescriptor_DROP {
return errors.Errorf(`"%s" was dropped or truncated`, t.StatementTimeName)
Expand Down
39 changes: 29 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,11 +468,28 @@ func TestChangefeedColumnFamily(t *testing.T) {

// Table with 2 column families.
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING, FAMILY (a), FAMILY (b))`)
if _, err := sqlDB.DB.Query(
`CREATE CHANGEFEED FOR foo`,
); !testutils.IsError(err, `exactly 1 column family`) {
t.Errorf(`expected "exactly 1 column family" error got: %+v`, err)
}
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'a')`)
foo := f.Feed(t, `CREATE CHANGEFEED FOR foo`)
defer foo.Close(t)
assertPayloads(t, foo, []string{
// WIP Changefeeds currently emit a duplicate for every column
// family modified at the same time.
`foo: [1]->{"a": 1, "b": "a"}`,
`foo: [1]->{"a": 1, "b": "a"}`,
})

sqlDB.Exec(t, `UPDATE foo SET a = 2 WHERE b = 'a'`)
assertPayloads(t, foo, []string{
// WIP Changefeeds currently emit a duplicate for every column
// family modified at the same time.
`foo: [2]->{"a": 2, "b": "a"}`,
`foo: [2]->{"a": 2, "b": "a"}`,
})

sqlDB.Exec(t, `UPDATE foo SET b = 'b' WHERE a = 2`)
assertPayloads(t, foo, []string{
`foo: [2]->{"a": 2, "b": "b"}`,
})

// Table with a second column family added after the changefeed starts.
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY, FAMILY f_a (a))`)
Expand All @@ -483,11 +500,13 @@ func TestChangefeedColumnFamily(t *testing.T) {
`bar: [0]->{"a": 0}`,
})
sqlDB.Exec(t, `ALTER TABLE bar ADD COLUMN b STRING CREATE FAMILY f_b`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`)
_, _, _, _, _, _ = bar.Next(t)
if err := bar.Err(); !testutils.IsError(err, `exactly 1 column family`) {
t.Errorf(`expected "exactly 1 column family" error got: %+v`, err)
}
sqlDB.Exec(t, `INSERT INTO bar VALUES (1, 'a')`)
assertPayloads(t, bar, []string{
// WIP Changefeeds currently emit a duplicate for every column
// family modified at the same time.
`bar: [1]->{"a": 1, "b": "a"}`,
`bar: [1]->{"a": 1, "b": "a"}`,
})
}

t.Run(`sinkless`, sinklessTest(testFn))
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func createBenchmarkChangefeed(
targets: details.Targets,
m: th,
}
rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get)
rowsFn := kvsToRows(s.DB(), s.LeaseManager().(*sql.LeaseManager), th, details, buf.Get)
tickFn := emitEntries(details, sink, rowsFn, TestingKnobs{})

ctx, cancel := context.WithCancel(ctx)
Expand Down

0 comments on commit cc96ff0

Please sign in to comment.