Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: improvements to the random stream test client #59441

Merged
merged 1 commit into from
Feb 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/parser",
"//pkg/sql/sem/tree",
Expand Down
112 changes: 93 additions & 19 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
Expand All @@ -32,15 +33,83 @@ type Validator interface {
Failures() []string
}

// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
// used by stream ingestion to check for correctness.
type StreamClientValidatorWrapper interface {
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
GetValidator() Validator
}

type streamValidator struct {
Validator
}

var _ StreamClientValidatorWrapper = &streamValidator{}

// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used
// to validate the events emitted by the cluster to cluster streaming client.
// The wrapper currently only "wraps" an orderValidator, but can be built out
// to utilize other Validator's.
// The wrapper also allows querying the orderValidator to retrieve streamed
// events from an in-memory store.
func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
ov := NewOrderValidator("unusedC2C")
return &streamValidator{
ov,
}
}

// GetValidator implements the StreamClientValidatorWrapper interface.
func (sv *streamValidator) GetValidator() Validator {
return sv.Validator
}

// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
// interface.
// It returns the streamed KV updates for `key` with a ts less than equal to
// `timestamp`.
func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
orderValidator, ok := sv.GetValidator().(*orderValidator)
if !ok {
return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
}
timestampValueTuples := orderValidator.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return timestamp.Less(timestampValueTuples[i].ts)
})
var kv []roachpb.KeyValue
for _, tsValue := range timestampValueTuples[:timestampsIdx] {
byteRep := []byte(key)
kv = append(kv, roachpb.KeyValue{
Key: byteRep,
Value: roachpb.Value{
RawBytes: []byte(tsValue.value),
Timestamp: tsValue.ts,
},
})
}

return kv, nil
}

type timestampValue struct {
ts hlc.Timestamp
value string
}

type orderValidator struct {
topic string
partitionForKey map[string]string
keyTimestamps map[string][]hlc.Timestamp
resolved map[string]hlc.Timestamp
topic string
partitionForKey map[string]string
keyTimestampAndValues map[string][]timestampValue
resolved map[string]hlc.Timestamp

failures []string
}

var _ Validator = &orderValidator{}

// NewOrderValidator returns a Validator that checks the row and resolved
// timestamp ordering guarantees. It also asserts that keys have an affinity to
// a single partition.
Expand All @@ -52,17 +121,15 @@ type orderValidator struct {
// lower update timestamp will be emitted on that partition.
func NewOrderValidator(topic string) Validator {
return &orderValidator{
topic: topic,
partitionForKey: make(map[string]string),
keyTimestamps: make(map[string][]hlc.Timestamp),
resolved: make(map[string]hlc.Timestamp),
topic: topic,
partitionForKey: make(map[string]string),
keyTimestampAndValues: make(map[string][]timestampValue),
resolved: make(map[string]hlc.Timestamp),
}
}

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(
partition string, key, ignoredValue string, updated hlc.Timestamp,
) error {
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
v.failures = append(v.failures, fmt.Sprintf(
`key [%s] received on two partitions: %s and %s`, key, prev, partition,
Expand All @@ -71,17 +138,20 @@ func (v *orderValidator) NoteRow(
}
v.partitionForKey[key] = partition

timestamps := v.keyTimestamps[key]
timestampsIdx := sort.Search(len(timestamps), func(i int) bool {
return updated.LessEq(timestamps[i])
timestampValueTuples := v.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return updated.LessEq(timestampValueTuples[i].ts)
})
seen := timestampsIdx < len(timestamps) && timestamps[timestampsIdx] == updated
seen := timestampsIdx < len(timestampValueTuples) &&
timestampValueTuples[timestampsIdx].ts == updated

if !seen && len(timestamps) > 0 && updated.Less(timestamps[len(timestamps)-1]) {
if !seen && len(timestampValueTuples) > 0 &&
updated.Less(timestampValueTuples[len(timestampValueTuples)-1].ts) {
v.failures = append(v.failures, fmt.Sprintf(
`topic %s partition %s: saw new row timestamp %s after %s was seen`,
v.topic, partition,
updated.AsOfSystemTime(), timestamps[len(timestamps)-1].AsOfSystemTime(),
updated.AsOfSystemTime(),
timestampValueTuples[len(timestampValueTuples)-1].ts.AsOfSystemTime(),
))
}
if !seen && updated.Less(v.resolved[partition]) {
Expand All @@ -92,8 +162,12 @@ func (v *orderValidator) NoteRow(
}

if !seen {
v.keyTimestamps[key] = append(
append(timestamps[:timestampsIdx], updated), timestamps[timestampsIdx:]...)
v.keyTimestampAndValues[key] = append(
append(timestampValueTuples[:timestampsIdx], timestampValue{
ts: updated,
value: value,
}),
timestampValueTuples[timestampsIdx:]...)
}
return nil
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/streamingccl/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ func (sa StreamAddress) URL() (*url.URL, error) {
// Each partition will emit events for a fixed span of keys.
type PartitionAddress string

// URL parses the partition address as a URL.
func (pa PartitionAddress) URL() (*url.URL, error) {
return url.Parse(string(pa))
}

// Topology is a configuration of stream partitions. These are particular to a
// stream. It specifies the number and addresses of partitions of the stream.
//
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ go_library(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (sc testStreamClient) GetTopology(

// ConsumePartition implements the Client interface.
func (sc testStreamClient) ConsumePartition(
_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
_ context.Context, pa streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Expand Down
Loading