
streamingest: add job level test for stream ingestion
This change adds a test that exercises all the components of the stream
ingestion flow. It fixes some missing pieces that were discovered while
writing the test.

Release note: None
adityamaru committed Jan 29, 2021
1 parent a2e1ef8 commit 665b56d
Showing 8 changed files with 383 additions and 59 deletions.
37 changes: 28 additions & 9 deletions pkg/ccl/changefeedccl/cdctest/validator.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
)

@@ -36,15 +37,17 @@ type Validator interface {
// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
// used by stream ingestion to check for correctness.
type StreamClientValidatorWrapper interface {
Validator
GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
GetValidator() Validator
}

type streamValidator struct {
Validator
mu syncutil.Mutex
}

var _ StreamClientValidatorWrapper = &streamValidator{}
var _ Validator = &streamValidator{}

// NewStreamClientValidatorWrapper returns a wrapped Validator that can be used
// to validate the events emitted by the cluster to cluster streaming client.
@@ -55,25 +58,20 @@ var _ StreamClientValidatorWrapper = &streamValidator{}
func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
ov := NewOrderValidator("unusedC2C")
return &streamValidator{
ov,
Validator: ov,
}
}

// GetValidator implements the StreamClientValidatorWrapper interface.
func (sv *streamValidator) GetValidator() Validator {
return sv.Validator
}

// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
// interface.
// It returns the streamed KV updates for `key` with a timestamp less than or
// equal to `timestamp`.
func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
key string, timestamp hlc.Timestamp,
) ([]roachpb.KeyValue, error) {
orderValidator, ok := sv.GetValidator().(*orderValidator)
orderValidator, ok := sv.Validator.(*orderValidator)
if !ok {
return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
return nil, errors.Newf("unknown validator %T: ", sv.Validator)
}
timestampValueTuples := orderValidator.keyTimestampAndValues[key]
timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
@@ -94,6 +92,27 @@ func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
return kv, nil
}

// NoteRow implements the Validator interface.
func (sv *streamValidator) NoteRow(
partition string, key, value string, updated hlc.Timestamp,
) error {
sv.mu.Lock()
defer sv.mu.Unlock()
return sv.Validator.NoteRow(partition, key, value, updated)
}

// NoteResolved implements the Validator interface.
func (sv *streamValidator) NoteResolved(partition string, resolved hlc.Timestamp) error {
sv.mu.Lock()
defer sv.mu.Unlock()
return sv.Validator.NoteResolved(partition, resolved)
}

// Failures implements the Validator interface.
func (sv *streamValidator) Failures() []string {
return sv.Validator.Failures()
}

type timestampValue struct {
ts hlc.Timestamp
value string
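The validator change above makes streamValidator safe to share across the goroutines that consume different stream partitions: NoteRow and NoteResolved now take a mutex before delegating to the embedded Validator. A minimal, self-contained sketch of that embed-and-lock pattern, with hypothetical names (Recorder, mapRecorder, lockedRecorder) rather than the commit's own types:

    package main

    import (
        "fmt"
        "sync"
    )

    type Recorder interface {
        Note(key string) error
    }

    type mapRecorder struct {
        seen map[string]int
    }

    func (m *mapRecorder) Note(key string) error {
        m.seen[key]++
        return nil
    }

    // lockedRecorder serializes calls to the wrapped Recorder, the same way
    // streamValidator guards NoteRow/NoteResolved with a syncutil.Mutex.
    type lockedRecorder struct {
        Recorder
        mu sync.Mutex
    }

    func (l *lockedRecorder) Note(key string) error {
        l.mu.Lock()
        defer l.mu.Unlock()
        return l.Recorder.Note(key)
    }

    func main() {
        r := &lockedRecorder{Recorder: &mapRecorder{seen: map[string]int{}}}
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                _ = r.Note("k") // safe to call from multiple goroutines
            }()
        }
        wg.Wait()
        fmt.Println(r.Recorder.(*mapRecorder).seen["k"]) // 4
    }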
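GetValuesForKeyBelowTimestamp relies on sort.Search to find the first entry whose timestamp exceeds the bound, then returns everything before that index. A simplified sketch with integer timestamps; the real code compares hlc.Timestamps:

    package main

    import (
        "fmt"
        "sort"
    )

    type tsValue struct {
        ts    int
        value string
    }

    // valuesAtOrBelow returns the values whose timestamp is <= bound.
    // tuples must be sorted by ts ascending.
    func valuesAtOrBelow(tuples []tsValue, bound int) []string {
        // sort.Search finds the first index whose predicate holds, i.e.
        // the first entry strictly above the bound.
        idx := sort.Search(len(tuples), func(i int) bool {
            return tuples[i].ts > bound
        })
        out := make([]string, 0, idx)
        for _, tv := range tuples[:idx] {
            out = append(out, tv.value)
        }
        return out
    }

    func main() {
        tuples := []tsValue{{1, "a"}, {3, "b"}, {5, "c"}}
        fmt.Println(valuesAtOrBelow(tuples, 3)) // [a b]
    }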
61 changes: 48 additions & 13 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -54,6 +53,10 @@ const (
// DupProbability controls the probability with which we emit duplicate KV
// events.
DupProbability = "DUP_PROBABILITY"
// TenantID specifies the ID of the tenant we are ingesting data into. This
// allows the client to prefix the generated KVs with the appropriate tenant
// prefix.
TenantID = "TENANT_ID"
// IngestionDatabaseID is the ID used in the generated table descriptor.
IngestionDatabaseID = 50 /* defaultDB */
// IngestionTablePrefix is the prefix of the table name used in the generated
@@ -79,6 +82,7 @@ type randomStreamConfig struct {
kvsPerCheckpoint int
numPartitions int
dupProbability float64
tenantID roachpb.TenantID
}

func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
@@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
kvsPerCheckpoint: 100,
numPartitions: 1,
dupProbability: 0.5,
tenantID: roachpb.SystemTenantID,
}

var err error
@@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
return c, err
}
}

if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" {
id, err := strconv.Atoi(tenantIDStr)
if err != nil {
return c, err
}
c.tenantID = roachpb.MakeTenantID(uint64(id))
}
return c, nil
}

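The new TENANT_ID handling follows the same defaults-then-override pattern as the other parameters: start from roachpb.SystemTenantID, then overwrite if the query string carries a value. A stripped-down sketch of that pattern; the config struct here is a stand-in, not the real randomStreamConfig:

    package main

    import (
        "fmt"
        "net/url"
        "strconv"
    )

    type config struct {
        tenantID uint64
    }

    func parseConfig(streamURL *url.URL) (config, error) {
        c := config{tenantID: 1} // default: the system tenant
        if s := streamURL.Query().Get("TENANT_ID"); s != "" {
            id, err := strconv.Atoi(s)
            if err != nil {
                return c, err
            }
            c.tenantID = uint64(id)
        }
        return c, nil
    }

    func main() {
        u, _ := url.Parse("test://host?TENANT_ID=10")
        c, err := parseConfig(u)
        if err != nil {
            panic(err)
        }
        fmt.Println(c.tenantID) // 10
    }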
@@ -147,6 +160,7 @@ type randomStreamClient struct {
syncutil.Mutex

interceptors []func(streamingccl.Event)
tableID int
}
}

@@ -162,16 +176,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) {
return nil, err
}

return &randomStreamClient{
config: streamConfig,
}, nil
client := randomStreamClient{config: streamConfig}
client.mu.Lock()
defer client.mu.Unlock()
client.mu.tableID = 52
return &client, nil
}

var testTableID = 52

func getNextTableID() int {
ret := testTableID
testTableID++
func (m *randomStreamClient) getNextTableID() int {
m.mu.Lock()
defer m.mu.Unlock()
ret := m.mu.tableID
m.mu.tableID++
return ret
}

@@ -184,7 +200,7 @@ func (m *randomStreamClient) GetTopology(

// Allocate table IDs and return one per partition address in the topology.
for i := 0; i < m.config.numPartitions; i++ {
tableID := descpb.ID(getNextTableID())
tableID := descpb.ID(m.getNextTableID())
partitionURI := url.URL{
Scheme: TestScheme,
Host: strconv.Itoa(int(tableID)),
@@ -207,28 +223,32 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
IngestionDatabaseID,
tableID,
fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName),
systemschema.JobsTable.Privileges,
&descpb.PrivilegeDescriptor{},
)
if err != nil {
return nil, nil, err
}

//// Generate namespace entry.
// Generate namespace entry.
key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name)
k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec))
var value roachpb.Value
value.SetInt(int64(testTable.GetID()))
value.InitChecksum(k)
namespaceKV := roachpb.KeyValue{
Key: key.Key(keys.TODOSQLCodec),
Key: k,
Value: value,
}

// Generate descriptor entry.
descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID())
descKey = rekey(m.config.tenantID, descKey)
descDesc := testTable.DescriptorProto()
var descValue roachpb.Value
if err := descValue.SetProto(descDesc); err != nil {
panic(err)
}
descValue.InitChecksum(descKey)
descKV := roachpb.KeyValue{
Key: descKey,
Value: descValue,
@@ -332,6 +352,19 @@ func (m *randomStreamClient) ConsumePartition(
return eventCh, nil
}

func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
// Strip old prefix.
tenantPrefix := keys.MakeTenantPrefix(tenantID)
noTenantPrefix, _, err := keys.DecodeTenantPrefix(k)
if err != nil {
panic(err)
}

// Prepend tenant prefix.
rekeyedKey := append(tenantPrefix, noTenantPrefix...)
return rekeyedKey
}

func (m *randomStreamClient) makeRandomKey(
r *rand.Rand, tableDesc *tabledesc.Mutable,
) roachpb.KeyValue {
@@ -342,6 +375,8 @@ }
}
k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID))

k = rekey(m.config.tenantID, k)

// Create a value holding a random integer.
valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange)))
valueBuf, err := rowenc.EncodeTableValue(
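The new rekey helper is what makes the TENANT_ID option useful: every generated key has its original tenant prefix stripped and the target tenant's prefix prepended, so the ingested KVs land in the right tenant keyspace. A byte-level sketch of the idea with hypothetical helpers; the real code uses keys.MakeTenantPrefix and keys.DecodeTenantPrefix on roachpb.Keys:

    package main

    import (
        "bytes"
        "fmt"
    )

    // stripPrefix removes an expected prefix, reporting whether it matched.
    func stripPrefix(k, prefix []byte) ([]byte, bool) {
        if !bytes.HasPrefix(k, prefix) {
            return k, false
        }
        return k[len(prefix):], true
    }

    func rekey(k, oldPrefix, newPrefix []byte) ([]byte, error) {
        suffix, ok := stripPrefix(k, oldPrefix)
        if !ok {
            return nil, fmt.Errorf("key %q lacks prefix %q", k, oldPrefix)
        }
        // Copy into a fresh slice so the result does not alias newPrefix.
        out := make([]byte, 0, len(newPrefix)+len(suffix))
        out = append(out, newPrefix...)
        return append(out, suffix...), nil
    }

    func main() {
        k := []byte("/sys/table/52/row1")
        out, err := rekey(k, []byte("/sys"), []byte("/tenant/10"))
        if err != nil {
            panic(err)
        }
        fmt.Println(string(out)) // /tenant/10/table/52/row1
    }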
9 changes: 6 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

@@ -48,6 +49,10 @@ func ingest(
// KVs. We can skip to ingesting after this resolved ts. Plumb the
// initialHighwatermark to the ingestion processor spec based on what we read
// from the job progress.
var initialHighWater hlc.Timestamp
if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
initialHighWater = *h
}

evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()
@@ -59,14 +64,12 @@

// Construct stream ingestion processor specs.
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, nodes, jobID)
streamAddress, topology, nodes, initialHighWater)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
// TODO: Updates from this flow need to feed back into the job to update the
// progress.
err = distStreamIngest(ctx, execCtx, nodes, jobID, planCtx, dsp, streamIngestionSpecs,
streamIngestionFrontierSpec)
if err != nil {
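The ingest change reads any persisted high-water mark out of the job progress and seeds the DistSQL flow with it, so a resumed job skips data below the last checkpoint instead of re-ingesting from the start time. A minimal sketch of that guard, with stand-in types for hlc.Timestamp and the job progress proto:

    package main

    import "fmt"

    // Timestamp stands in for hlc.Timestamp; the zero value means "unset".
    type Timestamp struct{ WallTime int64 }

    func (t Timestamp) IsEmpty() bool { return t == Timestamp{} }

    // Progress stands in for the job progress proto; GetHighWater returns
    // nil until the frontier has checkpointed at least once.
    type Progress struct{ highWater *Timestamp }

    func (p Progress) GetHighWater() *Timestamp { return p.highWater }

    func startTS(p Progress) Timestamp {
        var initialHighWater Timestamp
        if h := p.GetHighWater(); h != nil && !h.IsEmpty() {
            initialHighWater = *h // resume from the last checkpoint
        }
        return initialHighWater
    }

    func main() {
        fmt.Println(startTS(Progress{}))                                    // {0}: fresh job
        fmt.Println(startTS(Progress{highWater: &Timestamp{WallTime: 42}})) // {42}: resumed job
    }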
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/require"
)

// TestStreamIngestionJobRollBack tests that the job rollsback the data to the
// TestStreamIngestionJobRollBack tests that the job rolls back the data to the
// start time if there are no progress updates. This test should be expanded
// after the job's progress field is updated as the job runs.
func TestStreamIngestionJobRollBack(t *testing.T) {
@@ -80,7 +80,7 @@ func ingestionPlanHook(
prefix := keys.MakeTenantPrefix(ingestionStmt.Targets.Tenant)
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: streamingccl.StreamAddress(from[0]),
Span: roachpb.Span{Key: prefix, EndKey: prefix.Next()},
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
// TODO: Figure out what the initial ts should be.
StartTime: hlc.Timestamp{},
}
@@ -110,7 +110,10 @@ func ingestionPlanHook(
return err
}

return sj.Start(ctx)
if err := sj.Start(ctx); err != nil {
return err
}
return sj.AwaitCompletion(ctx)
}

return fn, utilccl.BulkJobExecutionResultHeader, nil, false, nil
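Two fixes in this hunk are easy to miss. Swapping prefix.Next() for prefix.PrefixEnd() matters because Next returns the immediate successor key (the prefix plus a zero byte), so the old span covered almost none of the tenant's data, while PrefixEnd increments the last byte so the span covers every key carrying the tenant prefix. And the Start/AwaitCompletion change makes the statement block until the ingestion job finishes rather than returning as soon as it starts. A byte-level sketch of the Next/PrefixEnd distinction; the real methods live on roachpb.Key:

    package main

    import "fmt"

    // next mimics Key.Next: the immediate successor is k + \x00.
    func next(k []byte) []byte {
        return append(append([]byte{}, k...), 0)
    }

    // prefixEnd mimics Key.PrefixEnd: increment the last non-0xff byte
    // and truncate, giving the first key that no longer has the prefix.
    func prefixEnd(k []byte) []byte {
        end := append([]byte{}, k...)
        for i := len(end) - 1; i >= 0; i-- {
            if end[i] != 0xff {
                end[i]++
                return end[:i+1]
            }
        }
        return k // all 0xff: no larger key; the real code returns KeyMax
    }

    func main() {
        prefix := []byte{0xfe, 0x0a} // e.g. a tenant prefix
        fmt.Printf("% x\n", next(prefix))      // fe 0a 00
        fmt.Printf("% x\n", prefixEnd(prefix)) // fe 0b
    }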
@@ -30,7 +30,7 @@ func distStreamIngestionPlanSpecs(
streamAddress streamingccl.StreamAddress,
topology streamingccl.Topology,
nodes []roachpb.NodeID,
jobID int64,
initialHighWater hlc.Timestamp,
) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) {

// For each stream partition in the topology, assign it to a node.
@@ -63,8 +63,8 @@

// Create a spec for the StreamIngestionFrontier processor on the coordinator
// node.
// TODO: set HighWaterAtStart once the job progress logic has been hooked up.
streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{TrackedSpans: trackedSpans}
streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{
HighWaterAtStart: initialHighWater, TrackedSpans: trackedSpans}

return streamIngestionSpecs, streamIngestionFrontierSpec, nil
}
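Plumbing initialHighWater into HighWaterAtStart seeds the frontier processor, which tracks the lowest resolved timestamp across all tracked spans and should never report progress below its starting point. A simplified sketch of that bookkeeping with hypothetical types, using one timestamp per partition rather than per span:

    package main

    import "fmt"

    type frontier struct {
        resolved map[string]int64 // partition -> highest resolved wall time
        start    int64            // HighWaterAtStart
    }

    func (f *frontier) note(partition string, ts int64) {
        if ts > f.resolved[partition] {
            f.resolved[partition] = ts
        }
    }

    // highWater returns the timestamp below which every partition has
    // resolved; it never drops below the initial high water.
    func (f *frontier) highWater() int64 {
        hw := int64(-1)
        for _, ts := range f.resolved {
            if hw == -1 || ts < hw {
                hw = ts
            }
        }
        if hw < f.start {
            return f.start
        }
        return hw
    }

    func main() {
        f := &frontier{resolved: map[string]int64{"p1": 0, "p2": 0}, start: 10}
        f.note("p1", 25)
        f.note("p2", 15)
        fmt.Println(f.highWater()) // 15: p2 is the laggard
    }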
