Skip to content

Commit

Permalink
streamingccl: improvements to the random stream test client
Browse files Browse the repository at this point in the history
This change improves on the random stream client to allow for better
testing of the various components of the stream ingestion job.
Specifically:

- Adds support for specifying number of partitions. For simplicity,
  a partition generates KVs for a particular table span.

- Generates system KVs (descriptor and namespace) KVs, as the first two
  KVs on the partition stream. I played around with the idea of having a
separate "system" and "table data" partition, but the code and tests
became more convoluted, compared to the current approach.

- Hookup the CDC orderValidator to the random stream client's output.
  This gives us some guarantees that the data being generated is
semantically correct.

- Maintain an in-memory copy of all the streamed events, that can be
  efficiently queried. This allows us to compare the ingested KVs to the
streamed KVs and gain more confidence in our pipeline.

Release note: None
  • Loading branch information
adityamaru committed Jan 27, 2021
1 parent 3296c52 commit a2e1ef8
Show file tree
Hide file tree
Showing 14 changed files with 516 additions and 158 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/parser",
"//pkg/sql/sem/tree",
Expand Down
112 changes: 93 additions & 19 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
Expand All @@ -32,15 +33,83 @@ type Validator interface {
Failures() []string
}

// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
// used by stream ingestion to check for correctness.
type StreamClientValidatorWrapper interface {
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
GetValidator() Validator
}

type streamValidator struct {
Validator
}

var _ StreamClientValidatorWrapper = &streamValidator{}

// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used
// to validate the events emitted by the cluster to cluster streaming client.
// The wrapper currently only "wraps" an orderValidator, but can be built out
// to utilize other Validator's.
// The wrapper also allows querying the orderValidator to retrieve streamed
// events from an in-memory store.
func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
ov := NewOrderValidator("unusedC2C")
return &streamValidator{
ov,
}
}

// GetValidator implements the StreamClientValidatorWrapper interface.
func (sv *streamValidator) GetValidator() Validator {
return sv.Validator
}

// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
// interface.
// It returns the streamed KV updates for `key` with a ts less than equal to
// `timestamp`.
func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
orderValidator, ok := sv.GetValidator().(*orderValidator)
if !ok {
return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
}
timestampValueTuples := orderValidator.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return timestamp.Less(timestampValueTuples[i].ts)
})
var kv []roachpb.KeyValue
for _, tsValue := range timestampValueTuples[:timestampsIdx] {
byteRep := []byte(key)
kv = append(kv, roachpb.KeyValue{
Key: byteRep,
Value: roachpb.Value{
RawBytes: []byte(tsValue.value),
Timestamp: tsValue.ts,
},
})
}

return kv, nil
}

type timestampValue struct {
ts hlc.Timestamp
value string
}

type orderValidator struct {
topic string
partitionForKey map[string]string
keyTimestamps map[string][]hlc.Timestamp
resolved map[string]hlc.Timestamp
topic string
partitionForKey map[string]string
keyTimestampAndValues map[string][]timestampValue
resolved map[string]hlc.Timestamp

failures []string
}

var _ Validator = &orderValidator{}

// NewOrderValidator returns a Validator that checks the row and resolved
// timestamp ordering guarantees. It also asserts that keys have an affinity to
// a single partition.
Expand All @@ -52,17 +121,15 @@ type orderValidator struct {
// lower update timestamp will be emitted on that partition.
func NewOrderValidator(topic string) Validator {
return &orderValidator{
topic: topic,
partitionForKey: make(map[string]string),
keyTimestamps: make(map[string][]hlc.Timestamp),
resolved: make(map[string]hlc.Timestamp),
topic: topic,
partitionForKey: make(map[string]string),
keyTimestampAndValues: make(map[string][]timestampValue),
resolved: make(map[string]hlc.Timestamp),
}
}

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(
partition string, key, ignoredValue string, updated hlc.Timestamp,
) error {
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
v.failures = append(v.failures, fmt.Sprintf(
`key [%s] received on two partitions: %s and %s`, key, prev, partition,
Expand All @@ -71,17 +138,20 @@ func (v *orderValidator) NoteRow(
}
v.partitionForKey[key] = partition

timestamps := v.keyTimestamps[key]
timestampsIdx := sort.Search(len(timestamps), func(i int) bool {
return updated.LessEq(timestamps[i])
timestampValueTuples := v.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return updated.LessEq(timestampValueTuples[i].ts)
})
seen := timestampsIdx < len(timestamps) && timestamps[timestampsIdx] == updated
seen := timestampsIdx < len(timestampValueTuples) &&
timestampValueTuples[timestampsIdx].ts == updated

if !seen && len(timestamps) > 0 && updated.Less(timestamps[len(timestamps)-1]) {
if !seen && len(timestampValueTuples) > 0 &&
updated.Less(timestampValueTuples[len(timestampValueTuples)-1].ts) {
v.failures = append(v.failures, fmt.Sprintf(
`topic %s partition %s: saw new row timestamp %s after %s was seen`,
v.topic, partition,
updated.AsOfSystemTime(), timestamps[len(timestamps)-1].AsOfSystemTime(),
updated.AsOfSystemTime(),
timestampValueTuples[len(timestampValueTuples)-1].ts.AsOfSystemTime(),
))
}
if !seen && updated.Less(v.resolved[partition]) {
Expand All @@ -92,8 +162,12 @@ func (v *orderValidator) NoteRow(
}

if !seen {
v.keyTimestamps[key] = append(
append(timestamps[:timestampsIdx], updated), timestamps[timestampsIdx:]...)
v.keyTimestampAndValues[key] = append(
append(timestampValueTuples[:timestampsIdx], timestampValue{
ts: updated,
value: value,
}),
timestampValueTuples[timestampsIdx:]...)
}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ func (sa StreamAddress) URL() (*url.URL, error) {
// Each partition will emit events for a fixed span of keys.
type PartitionAddress string

// URL parses the partition address as a URL.
func (pa PartitionAddress) URL() (*url.URL, error) {
return url.Parse(string(pa))
}

// Topology is a configuration of stream partitions. These are particular to a
// stream. It specifies the number and addresses of partitions of the stream.
//
Expand Down
25 changes: 20 additions & 5 deletions pkg/ccl/streamingccl/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,15 @@ type Event interface {
// CheckpointEvent. The resolved timestamp indicates that all KV events until
// this time have been emitted.
GetResolved() *hlc.Timestamp
// GetPartitionAddress returns the PartitionAddress of the partition from which the
// event was emitted from.
GetPartitionAddress() *PartitionAddress
}

// kvEvent is a key value pair that needs to be ingested.
type kvEvent struct {
kv roachpb.KeyValue
kv roachpb.KeyValue
partitionAddress PartitionAddress
}

var _ Event = kvEvent{}
Expand All @@ -61,10 +65,16 @@ func (kve kvEvent) GetResolved() *hlc.Timestamp {
return nil
}

// GetPartitionAddress implements the Event interface.
func (kve kvEvent) GetPartitionAddress() *PartitionAddress {
return &kve.partitionAddress
}

// checkpointEvent indicates that the stream has emitted every change for all
// keys in the span it is responsible for up until this timestamp.
type checkpointEvent struct {
resolvedTimestamp hlc.Timestamp
partitionAddress PartitionAddress
}

var _ Event = checkpointEvent{}
Expand All @@ -84,12 +94,17 @@ func (ce checkpointEvent) GetResolved() *hlc.Timestamp {
return &ce.resolvedTimestamp
}

// GetPartitionAddress implements the Event interface.
func (ce checkpointEvent) GetPartitionAddress() *PartitionAddress {
return &ce.partitionAddress
}

// MakeKVEvent creates an Event from a KV.
func MakeKVEvent(kv roachpb.KeyValue) Event {
return kvEvent{kv: kv}
func MakeKVEvent(kv roachpb.KeyValue, address PartitionAddress) Event {
return kvEvent{kv: kv, partitionAddress: address}
}

// MakeCheckpointEvent creates an Event from a resolved timestamp.
func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp) Event {
return checkpointEvent{resolvedTimestamp: resolvedTimestamp}
func MakeCheckpointEvent(resolvedTimestamp hlc.Timestamp, address PartitionAddress) Event {
return checkpointEvent{resolvedTimestamp: resolvedTimestamp, partitionAddress: address}
}
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ go_library(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (sc testStreamClient) GetTopology(

// ConsumePartition implements the Client interface.
func (sc testStreamClient) ConsumePartition(
_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
_ context.Context, pa streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand All @@ -49,8 +49,8 @@ func (sc testStreamClient) ConsumePartition(
}

events := make(chan streamingccl.Event, 2)
events <- streamingccl.MakeKVEvent(sampleKV)
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100})
events <- streamingccl.MakeKVEvent(sampleKV, pa)
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100}, pa)
close(events)

return events, nil
Expand Down
Loading

0 comments on commit a2e1ef8

Please sign in to comment.