streamingest: add job level test for stream ingestion
This change adds a test that exercises all the components of the stream
ingestion flow. It fixes some missing pieces that were discovered while
writing the test.

Release note: None
adityamaru committed Feb 11, 2021
1 parent 81e617e commit a06e6cd
Showing 10 changed files with 441 additions and 105 deletions.
95 changes: 37 additions & 58 deletions pkg/ccl/changefeedccl/cdctest/validator.go
@@ -33,65 +33,11 @@ type Validator interface {
Failures() []string
}

// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
// used by stream ingestion to check for correctness.
type StreamClientValidatorWrapper interface {
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
GetValidator() Validator
}

type streamValidator struct {
type StreamValidator interface {
Validator
}

var _ StreamClientValidatorWrapper = &streamValidator{}

// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used
// to validate the events emitted by the cluster to cluster streaming client.
// The wrapper currently only "wraps" an orderValidator, but can be built out
// to utilize other Validator's.
// The wrapper also allows querying the orderValidator to retrieve streamed
// events from an in-memory store.
func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
ov := NewOrderValidator("unusedC2C")
return &streamValidator{
ov,
}
}

// GetValidator implements the StreamClientValidatorWrapper interface.
func (sv *streamValidator) GetValidator() Validator {
return sv.Validator
}

// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
// interface.
// It returns the streamed KV updates for `key` with a ts less than equal to
// `timestamp`.
func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
orderValidator, ok := sv.GetValidator().(*orderValidator)
if !ok {
return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
}
timestampValueTuples := orderValidator.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return timestamp.Less(timestampValueTuples[i].ts)
})
var kv []roachpb.KeyValue
for _, tsValue := range timestampValueTuples[:timestampsIdx] {
byteRep := []byte(key)
kv = append(kv, roachpb.KeyValue{
Key: byteRep,
Value: roachpb.Value{
RawBytes: []byte(tsValue.value),
Timestamp: tsValue.ts,
},
})
}

return kv, nil
// GetValuesForKeyBelowTimestamp returns the streamed KV updates for `key`
// with a timestamp less than or equal to `timestamp`.
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
}

type timestampValue struct {
@@ -109,6 +55,7 @@ type orderValidator struct {
}

var _ Validator = &orderValidator{}
var _ StreamValidator = &orderValidator{}

// NewOrderValidator returns a Validator that checks the row and resolved
// timestamp ordering guarantees. It also asserts that keys have an affinity to
@@ -128,6 +75,38 @@ func NewOrderValidator(topic string) Validator {
}
}

func NewStreamOrderValidator() StreamValidator {
return &orderValidator{
topic: "unused",
partitionForKey: make(map[string]string),
keyTimestampAndValues: make(map[string][]timestampValue),
resolved: make(map[string]hlc.Timestamp),
}
}

// GetValuesForKeyBelowTimestamp implements the StreamValidator interface.
func (v *orderValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
timestampValueTuples := v.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
return timestamp.Less(timestampValueTuples[i].ts)
})
var kv []roachpb.KeyValue
for _, tsValue := range timestampValueTuples[:timestampsIdx] {
byteRep := []byte(key)
kv = append(kv, roachpb.KeyValue{
Key: byteRep,
Value: roachpb.Value{
RawBytes: []byte(tsValue.value),
Timestamp: tsValue.ts,
},
})
}

return kv, nil
}
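
As an aside (not part of this diff), here is a minimal sketch of how a test might drive the new StreamValidator; the partition, keys, values, and timestamps are made up for illustration:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	// Record a few streamed updates for key "k1" on partition "p1".
	sv := cdctest.NewStreamOrderValidator()
	if err := sv.NoteRow("p1", "k1", "v1", hlc.Timestamp{WallTime: 1}); err != nil {
		panic(err)
	}
	if err := sv.NoteRow("p1", "k1", "v2", hlc.Timestamp{WallTime: 3}); err != nil {
		panic(err)
	}

	// Only the update at wall time 1 is at or below wall time 2.
	kvs, err := sv.GetValuesForKeyBelowTimestamp("k1", hlc.Timestamp{WallTime: 2})
	if err != nil {
		panic(err)
	}
	fmt.Println(len(kvs)) // expected: 1
}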

// NoteRow implements the Validator interface.
func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
if prev, ok := v.partitionForKey[key]; ok && prev != partition {
59 changes: 47 additions & 12 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -54,6 +53,10 @@ const (
// DupProbability controls the probability with which we emit duplicate KV
// events.
DupProbability = "DUP_PROBABILITY"
// TenantID specifies the ID of the tenant we are ingesting data into. This
// allows the client to prefix the generated KVs with the appropriate tenant
// prefix.
TenantID = "TENANT_ID"
// IngestionDatabaseID is the ID used in the generated table descriptor.
IngestionDatabaseID = 50 /* defaultDB */
// IngestionTablePrefix is the prefix of the table name used in the generated
@@ -79,6 +82,7 @@ type randomStreamConfig struct {
kvsPerCheckpoint int
numPartitions int
dupProbability float64
tenantID roachpb.TenantID
}

func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
@@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
kvsPerCheckpoint: 100,
numPartitions: 1,
dupProbability: 0.5,
tenantID: roachpb.SystemTenantID,
}

var err error
@@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
return c, err
}
}

if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" {
id, err := strconv.Atoi(tenantIDStr)
if err != nil {
return c, err
}
c.tenantID = roachpb.MakeTenantID(uint64(id))
}
return c, nil
}
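
Purely as an illustration (not in this diff), a stream address carrying the new TENANT_ID option could be built inside the streamclient package like so; the other option values are arbitrary:

// Sketch only: a random stream address that asks the client to generate
// KVs under tenant 10's prefix.
q := url.Values{}
q.Set(TenantID, "10")
q.Set(DupProbability, "0")
streamAddr := &url.URL{Scheme: TestScheme, RawQuery: q.Encode()}
cfg, err := parseRandomStreamConfig(streamAddr)
if err != nil {
	panic(err)
}
// cfg.tenantID is now roachpb.MakeTenantID(10).
_ = cfg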

@@ -146,6 +159,7 @@ type randomStreamClient struct {
syncutil.Mutex

interceptors []func(streamingccl.Event, streamingccl.PartitionAddress)
tableID int
}
}

@@ -161,16 +175,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) {
return nil, err
}

return &randomStreamClient{
config: streamConfig,
}, nil
client := randomStreamClient{config: streamConfig}
client.mu.Lock()
defer client.mu.Unlock()
client.mu.tableID = 52
return &client, nil
}

var testTableID = 52

func getNextTableID() int {
ret := testTableID
testTableID++
func (m *randomStreamClient) getNextTableID() int {
m.mu.Lock()
defer m.mu.Unlock()
ret := m.mu.tableID
m.mu.tableID++
return ret
}

@@ -183,7 +199,7 @@ func (m *randomStreamClient) GetTopology(

// Allocate table IDs and return one per partition address in the topology.
for i := 0; i < m.config.numPartitions; i++ {
tableID := descpb.ID(getNextTableID())
tableID := descpb.ID(m.getNextTableID())
partitionURI := url.URL{
Scheme: TestScheme,
Host: strconv.Itoa(int(tableID)),
@@ -206,28 +222,32 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
IngestionDatabaseID,
tableID,
fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName),
systemschema.JobsTable.GetPrivileges(),
&descpb.PrivilegeDescriptor{},
)
if err != nil {
return nil, nil, err
}

// Generate namespace entry.
key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name)
k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec))
var value roachpb.Value
value.SetInt(int64(testTable.GetID()))
value.InitChecksum(k)
namespaceKV := roachpb.KeyValue{
Key: key.Key(keys.TODOSQLCodec),
Key: k,
Value: value,
}

// Generate descriptor entry.
descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID())
descKey = rekey(m.config.tenantID, descKey)
descDesc := testTable.DescriptorProto()
var descValue roachpb.Value
if err := descValue.SetProto(descDesc); err != nil {
panic(err)
}
descValue.InitChecksum(descKey)
descKV := roachpb.KeyValue{
Key: descKey,
Value: descValue,
@@ -331,6 +351,19 @@ func (m *randomStreamClient) ConsumePartition(
return eventCh, nil
}

func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
// Strip old prefix.
tenantPrefix := keys.MakeTenantPrefix(tenantID)
noTenantPrefix, _, err := keys.DecodeTenantPrefix(k)
if err != nil {
panic(err)
}

// Prepend tenant prefix.
rekeyedKey := append(tenantPrefix, noTenantPrefix...)
return rekeyedKey
}
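
For intuition (illustrative only, not part of the change): rekey strips whatever tenant prefix the incoming key already carries and prepends the target tenant's prefix, so generated keys land in the ingesting tenant's keyspace:

// Sketch only: a key for table 52 in the system tenant's keyspace is
// rewritten under tenant 10's prefix. Pretty-printed forms are approximate.
k := keys.TODOSQLCodec.TablePrefix(52) // roughly /Table/52
k = rekey(roachpb.MakeTenantID(10), k) // roughly /Tenant/10/Table/52
_ = k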

func (m *randomStreamClient) makeRandomKey(
r *rand.Rand, tableDesc *tabledesc.Mutable,
) roachpb.KeyValue {
@@ -341,6 +374,8 @@ func (m *randomStreamClient) makeRandomKey(
}
k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID))

k = rekey(m.config.tenantID, k)

// Create a value holding a random integer.
valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange)))
valueBuf, err := rowenc.EncodeTableValue(
9 changes: 6 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

@@ -48,6 +49,10 @@ func ingest(
// KVs. We can skip to ingesting after this resolved ts. Plumb the
// initialHighwatermark to the ingestion processor spec based on what we read
// from the job progress.
var initialHighWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
}

evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()
@@ -59,14 +64,12 @@

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, nodes, jobID)
streamAddress, topology, nodes, initialHighWater)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
// TODO: Updates from this flow need to feed back into the job to update the
// progress.
err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
if err != nil {
(diff for another changed file; file name not shown in this view)
@@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/require"
)

// TestStreamIngestionJobRollBack tests that the job rollsback the data to the
// TestStreamIngestionJobRollBack tests that the job rolls back the data to the
// start time if there are no progress updates. This test should be expanded
// after the job's progress field is updated as the job runs.
func TestStreamIngestionJobRollBack(t *testing.T) {
(diff for another changed file; file name not shown in this view)
@@ -80,7 +80,7 @@ func ingestionPlanHook(
prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant)
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamingccl.StreamAddress(from[0]),
Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()},
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
// TODO: Figure out what the initial ts should be.
StartTime: hlc.Timestamp{},
}
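
To make the Key.Next versus Key.PrefixEnd distinction concrete (sketch, not part of the change): Next only steps past the prefix key itself, while PrefixEnd bounds every key that starts with the prefix, which is what the ingestion span needs:

// Sketch only: span end keys for a tenant prefix.
prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(10))
// [prefix, prefix.Next()) contains only the key exactly equal to prefix.
narrow := roachpb.Span{Key: prefix, EndKey: prefix.Next()}
// [prefix, prefix.PrefixEnd()) contains every key under the tenant's prefix.
full := roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}
_, _ = narrow, full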
Expand Down Expand Up @@ -110,7 +110,10 @@ func ingestionPlanHook(
return err
}

return sj.Start(ctx)
if err := sj.Start(ctx); err != nil {
return err
}
return sj.AwaitCompletion(ctx)
}

return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil
(diff for another changed file; file name not shown in this view)
@@ -30,7 +30,7 @@ func distStreamIngestionPlanSpecs(
streamAddress streamingccl.StreamAddress,
topology streamingccl.Topology,
nodes []roachpb.NodeID,
jobID int64,
initialHighWater hlc.Timestamp,
) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) {

// For each stream partition in the topology, assign it to a node.
@@ -63,8 +63,8 @@

// Create a spec for the StreamIngestionFrontier processor on the coordinator
// node.
// TODO: set HighWaterAtStart once the job progress logic has been hooked up.
streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{TrackedSpans: trackedSpans}
streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{
HighWaterAtStart: initialHighWater, TrackedSpans: trackedSpans}

return streamIngestionSpecs, streamIngestionFrontierSpec, nil
}
