Skip to content

Commit

Permalink
streamingest: add job level test for stream ingestion
Browse files Browse the repository at this point in the history
This change adds a test that exercises all the components of the stream
ingestion flow. It fixes some missing pieces that were discovered while
writing the test.

Release note: None
  • Loading branch information
adityamaru committed Feb 18, 2021
1 parent bededc2 commit aa6c8b5
Show file tree
Hide file tree
Showing 12 changed files with 472 additions and 107 deletions.
99 changes: 41 additions & 58 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,65 +33,13 @@ type Validator interface {
Failures() []string
}

// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
// used by stream ingestion to check for correctness.
type StreamClientValidatorWrapper interface {
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
GetValidator() Validator
}

type streamValidator struct {
// StreamValidator wraps a Validator and exposes additional methods for
// introspection.
type StreamValidator interface {
Validator
}

var _ StreamClientValidatorWrapper = &streamValidator{}

// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used
// to validate the events emitted by the cluster to cluster streaming client.
// The wrapper currently only "wraps" an orderValidator, but can be built out
// to utilize other Validator's.
// The wrapper also allows querying the orderValidator to retrieve streamed
// events from an in-memory store.
func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
ov := NewOrderValidator("unusedC2C")
return &streamValidator{
ov,
}
}

// GetValidator implements the StreamClientValidatorWrapper interface.
func (sv *streamValidator) GetValidator() Validator {
return sv.Validator
}

// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
// interface.
// It returns the streamed KV updates for `key` with a ts less than equal to
// `timestamp`.
func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
orderValidator, ok := sv.GetValidator().(*orderValidator)
if !ok {
return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
}
timestampValueTuples := orderValidator.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return timestamp.Less(timestampValueTuples[i].ts)
})
var kv []roachpb.KeyValue
for _, tsValue := range timestampValueTuples[:timestampsIdx] {
byteRep := []byte(key)
kv = append(kv, roachpb.KeyValue{
Key: byteRep,
Value: roachpb.Value{
RawBytes: []byte(tsValue.value),
Timestamp: tsValue.ts,
},
})
}

return kv, nil
// GetValuesForKeyBelowTimestamp returns the streamed KV updates for `key`
// with a ts less than equal to timestamp`.
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
}

type timestampValue struct {
Expand All @@ -109,6 +57,7 @@ type orderValidator struct {
}

var _ Validator = &orderValidator{}
var _ StreamValidator = &orderValidator{}

// NewOrderValidator returns a Validator that checks the row and resolved
// timestamp ordering guarantees. It also asserts that keys have an affinity to
Expand All @@ -128,6 +77,40 @@ func NewOrderValidator(topic string) Validator {
}
}

// NewStreamOrderValidator wraps and orderValidator as described above, and
// exposes additional methods for introspection.
func NewStreamOrderValidator() StreamValidator {
return &orderValidator{
topic: "unused",
partitionForKey: make(map[string]string),
keyTimestampAndValues: make(map[string][]timestampValue),
resolved: make(map[string]hlc.Timestamp),
}
}

// GetValuesForKeyBelowTimestamp implements the StreamValidator interface.
func (v *orderValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
timestampValueTuples := v.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return timestamp.Less(timestampValueTuples[i].ts)
})
var kv []roachpb.KeyValue
for _, tsValue := range timestampValueTuples[:timestampsIdx] {
byteRep := []byte(key)
kv = append(kv, roachpb.KeyValue{
Key: byteRep,
Value: roachpb.Value{
RawBytes: []byte(tsValue.value),
Timestamp: tsValue.ts,
},
})
}

return kv, nil
}

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
Expand Down
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
Expand Down
60 changes: 48 additions & 12 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -54,6 +53,10 @@ const (
// DupProbability controls the probability with which we emit duplicate KV
// events.
DupProbability = "DUP_PROBABILITY"
// TenantID specifies the ID of the tenant we are ingesting data into. This
// allows the client to prefix the generated KVs with the appropriate tenant
// prefix.
TenantID = "TENANT_ID"
// IngestionDatabaseID is the ID used in the generated table descriptor.
IngestionDatabaseID = 50 /* defaultDB */
// IngestionTablePrefix is the prefix of the table name used in the generated
Expand All @@ -79,6 +82,7 @@ type randomStreamConfig struct {
kvsPerCheckpoint int
numPartitions int
dupProbability float64
tenantID roachpb.TenantID
}

func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
Expand All @@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
kvsPerCheckpoint: 100,
numPartitions: 1,
dupProbability: 0.5,
tenantID: roachpb.SystemTenantID,
}

var err error
Expand Down Expand Up @@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
return c, err
}
}

if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" {
id, err := strconv.Atoi(tenantIDStr)
if err != nil {
return c, err
}
c.tenantID = roachpb.MakeTenantID(uint64(id))
}
return c, nil
}

Expand All @@ -147,6 +160,7 @@ type randomStreamClient struct {
// interceptors can be registered to peek at every event generated by this
// client.
interceptors []func(streamingccl.Event, streamingccl.PartitionAddress)
tableID int
}
}

Expand All @@ -162,16 +176,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) {
return nil, err
}

return &randomStreamClient{
config: streamConfig,
}, nil
client := randomStreamClient{config: streamConfig}
client.mu.Lock()
defer client.mu.Unlock()
client.mu.tableID = 52
return &client, nil
}

var testTableID = 52

func getNextTableID() int {
ret := testTableID
testTableID++
func (m *randomStreamClient) getNextTableID() int {
m.mu.Lock()
defer m.mu.Unlock()
ret := m.mu.tableID
m.mu.tableID++
return ret
}

Expand All @@ -184,7 +200,7 @@ func (m *randomStreamClient) GetTopology(

// Allocate table IDs and return one per partition address in the topology.
for i := 0; i < m.config.numPartitions; i++ {
tableID := descpb.ID(getNextTableID())
tableID := descpb.ID(m.getNextTableID())
partitionURI := url.URL{
Scheme: TestScheme,
Host: strconv.Itoa(int(tableID)),
Expand All @@ -207,28 +223,32 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
IngestionDatabaseID,
tableID,
fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName),
systemschema.JobsTable.GetPrivileges(),
&descpb.PrivilegeDescriptor{},
)
if err != nil {
return nil, nil, err
}

// Generate namespace entry.
key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name)
k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec))
var value roachpb.Value
value.SetInt(int64(testTable.GetID()))
value.InitChecksum(k)
namespaceKV := roachpb.KeyValue{
Key: key.Key(keys.TODOSQLCodec),
Key: k,
Value: value,
}

// Generate descriptor entry.
descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID())
descKey = rekey(m.config.tenantID, descKey)
descDesc := testTable.DescriptorProto()
var descValue roachpb.Value
if err := descValue.SetProto(descDesc); err != nil {
panic(err)
}
descValue.InitChecksum(descKey)
descKV := roachpb.KeyValue{
Key: descKey,
Value: descValue,
Expand Down Expand Up @@ -280,6 +300,7 @@ func (m *randomStreamClient) ConsumePartition(
resolvedTime := timeutil.Now()
hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
event = streamingccl.MakeCheckpointEvent(hlcResolvedTime)
dupKVEvent = nil

numKVEventsSinceLastResolved = 0
} else {
Expand Down Expand Up @@ -328,6 +349,19 @@ func (m *randomStreamClient) ConsumePartition(
return eventCh, nil
}

func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
// Strip old prefix.
tenantPrefix := keys.MakeTenantPrefix(tenantID)
noTenantPrefix, _, err := keys.DecodeTenantPrefix(k)
if err != nil {
panic(err)
}

// Prepend tenant prefix.
rekeyedKey := append(tenantPrefix, noTenantPrefix...)
return rekeyedKey
}

func (m *randomStreamClient) makeRandomKey(
r *rand.Rand, tableDesc *tabledesc.Mutable,
) roachpb.KeyValue {
Expand All @@ -338,6 +372,8 @@ func (m *randomStreamClient) makeRandomKey(
}
k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID))

k = rekey(m.config.tenantID, k)

// Create a value holding a random integer.
valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange)))
valueBuf, err := rowenc.EncodeTableValue(
Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ go_library(
"stream_ingestion_planning.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
"stream_ingestion_test_utils.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
Expand All @@ -38,6 +40,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
Expand All @@ -55,11 +58,12 @@ go_test(
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
"stream_ingestion_test.go",
],
embed = [":streamingest"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
Expand All @@ -68,6 +72,7 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
Expand All @@ -80,14 +85,17 @@ go_test(
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/jobutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
Loading

0 comments on commit aa6c8b5

Please sign in to comment.