59588: streamingest: add job level test for stream ingestion r=pbardea a=adityamaru

This change adds a test that exercises all the components of the stream
ingestion flow. It fixes some missing pieces that were discovered while
writing the test.

Informs: #59175

Release note: None
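
For a sense of the shape of such a test, here is a hypothetical sketch in Go.
The helpers startStreamIngestionJob, waitForCheckpoints, and
assertIngestedMatchesValidator are placeholders, not the actual helpers in
stream_ingestion_test.go, and the test:// address is an assumption based on
the random stream client's TestScheme:

func TestStreamIngestionJobSketch(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	// Ingest from the random stream client, targeting tenant 10.
	streamAddr := "test://?TENANT_ID=10&DUP_PROBABILITY=0.5"
	jobID := startStreamIngestionJob(t, tc, streamAddr) // hypothetical helper

	// Let the job progress past a few checkpoints before cutting over.
	waitForCheckpoints(t, tc, jobID, 3) // hypothetical helper

	// Compare the ingested keyspace against the KVs the stream validator
	// recorded below the cutover timestamp.
	assertIngestedMatchesValidator(t, tc, jobID) // hypothetical helper
}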

60424: sql: include sampled stats in TestSampledStatsCollection r=yuzefovich a=asubiotto

Depends on #59992, which is required for this new regression test to pass.

TestSampledStatsCollection previously only checked stats that are collected
regardless of the sample rate. These stats (rows/bytes read) are propagated
through metadata rather than the trace.

As a result, once the tracing mode was reverted back to legacy, we were
silently failing to collect any stats when sampling was enabled. To avoid
this kind of regression, this commit adds a check that the reported max
memory usage is non-zero.

Release note: None (this is a new feature that has no user impact yet)
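
The added assertion looks roughly like this sketch; getStmtStats is a
placeholder for the test's stats-fetching helper, and the stats field names
are illustrative:

func checkSampledStats(t *testing.T, server serverutils.TestServerInterface) {
	// After running a sampled statement, require that execution stats (not
	// just the metadata-propagated row/byte counts) made it back.
	stats := getStmtStats(t, server, "SELECT count(*) FROM t") // hypothetical helper

	require.Greater(t, stats.RowsRead, int64(0))
	// Max memory usage only flows back through the trace, so a zero value
	// means sampling silently failed to collect any execution stats.
	require.NotZero(t, stats.MaxMemUsage)
}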

60626: kvserver: initialize propBuf LAI tracking r=andreimatei a=andreimatei

The initialization of the LAI tracking in the proposal buffer seems
pretty lacking (see #60625). This patch adds initialization of
propBuf.liBase at propBuf.Init() time, which is irrelevant for
production, but will help future tests, which will surely want a
propBuf's first assigned LAIs to have some relationship to the replica
state.

Release note: None
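
A sketch of the change's shape (abridged; the field and helper names follow
the surrounding propBuf code, and the exact body of Init() may differ):

func (b *propBuf) Init(p proposer) {
	b.p = p
	b.full.L = p.rlocker()
	// New: seed the LAI base from the replica's current lease applied index
	// so that the first LAIs the buffer assigns relate to the replica state.
	b.liBase = p.leaseAppliedIndex()
}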

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
Co-authored-by: Alfonso Subiotto Marques <alfonso@cockroachlabs.com>
Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
4 people committed Feb 18, 2021
4 parents c142ac5 + aa6c8b5 + 610125f + 8fbe6fc commit 80dd7d0
Showing 15 changed files with 478 additions and 108 deletions.
99 changes: 41 additions & 58 deletions pkg/ccl/changefeedccl/cdctest/validator.go
@@ -33,65 +33,13 @@ type Validator interface {
 	Failures() []string
 }
 
-// StreamClientValidatorWrapper wraps a Validator and exposes additional methods
-// used by stream ingestion to check for correctness.
-type StreamClientValidatorWrapper interface {
-	GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
-	GetValidator() Validator
-}
-
-type streamValidator struct {
+// StreamValidator wraps a Validator and exposes additional methods for
+// introspection.
+type StreamValidator interface {
 	Validator
-}
-
-var _ StreamClientValidatorWrapper = &streamValidator{}
-
-// NewStreamClientValidatorWrapper returns a wrapped Validator, that can be used
-// to validate the events emitted by the cluster to cluster streaming client.
-// The wrapper currently only "wraps" an orderValidator, but can be built out
-// to utilize other Validator's.
-// The wrapper also allows querying the orderValidator to retrieve streamed
-// events from an in-memory store.
-func NewStreamClientValidatorWrapper() StreamClientValidatorWrapper {
-	ov := NewOrderValidator("unusedC2C")
-	return &streamValidator{
-		ov,
-	}
-}
-
-// GetValidator implements the StreamClientValidatorWrapper interface.
-func (sv *streamValidator) GetValidator() Validator {
-	return sv.Validator
-}
-
-// GetValuesForKeyBelowTimestamp implements the StreamClientValidatorWrapper
-// interface.
-// It returns the streamed KV updates for `key` with a ts less than equal to
-// `timestamp`.
-func (sv *streamValidator) GetValuesForKeyBelowTimestamp(
-	key string, timestamp hlc.Timestamp,
-) ([]roachpb.KeyValue, error) {
-	orderValidator, ok := sv.GetValidator().(*orderValidator)
-	if !ok {
-		return nil, errors.Newf("unknown validator %T: ", sv.GetValidator())
-	}
-	timestampValueTuples := orderValidator.keyTimestampAndValues[key]
-	timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
-		return timestamp.Less(timestampValueTuples[i].ts)
-	})
-	var kv []roachpb.KeyValue
-	for _, tsValue := range timestampValueTuples[:timestampsIdx] {
-		byteRep := []byte(key)
-		kv = append(kv, roachpb.KeyValue{
-			Key: byteRep,
-			Value: roachpb.Value{
-				RawBytes:  []byte(tsValue.value),
-				Timestamp: tsValue.ts,
-			},
-		})
-	}
-
-	return kv, nil
+	// GetValuesForKeyBelowTimestamp returns the streamed KV updates for `key`
+	// with a ts less than or equal to `timestamp`.
+	GetValuesForKeyBelowTimestamp(key string, timestamp hlc.Timestamp) ([]roachpb.KeyValue, error)
 }
 
 type timestampValue struct {
@@ -109,6 +57,7 @@ type orderValidator struct {
 }
 
 var _ Validator = &orderValidator{}
+var _ StreamValidator = &orderValidator{}
 
 // NewOrderValidator returns a Validator that checks the row and resolved
 // timestamp ordering guarantees. It also asserts that keys have an affinity to
@@ -128,6 +77,40 @@ func NewOrderValidator(topic string) Validator {
 	}
 }
 
+// NewStreamOrderValidator wraps an orderValidator as described above, and
+// exposes additional methods for introspection.
+func NewStreamOrderValidator() StreamValidator {
+	return &orderValidator{
+		topic:                 "unused",
+		partitionForKey:       make(map[string]string),
+		keyTimestampAndValues: make(map[string][]timestampValue),
+		resolved:              make(map[string]hlc.Timestamp),
+	}
+}
+
+// GetValuesForKeyBelowTimestamp implements the StreamValidator interface.
+func (v *orderValidator) GetValuesForKeyBelowTimestamp(
+	key string, timestamp hlc.Timestamp,
+) ([]roachpb.KeyValue, error) {
+	timestampValueTuples := v.keyTimestampAndValues[key]
+	timestampsIdx := sort.Search(len(timestampValueTuples), func(i int) bool {
+		return timestamp.Less(timestampValueTuples[i].ts)
+	})
+	var kv []roachpb.KeyValue
+	for _, tsValue := range timestampValueTuples[:timestampsIdx] {
+		byteRep := []byte(key)
+		kv = append(kv, roachpb.KeyValue{
+			Key: byteRep,
+			Value: roachpb.Value{
+				RawBytes:  []byte(tsValue.value),
+				Timestamp: tsValue.ts,
+			},
+		})
+	}
+
+	return kv, nil
+}
+
 // NoteRow implements the Validator interface.
 func (v *orderValidator) NoteRow(partition string, key, value string, updated hlc.Timestamp) error {
 	if prev, ok := v.partitionForKey[key]; ok && prev != partition {
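
For orientation (this usage is not part of the diff), an ingestion test can
feed events into the validator and then read back everything streamed for a
key below a timestamp; the keys, values, and timestamps here are illustrative:

func checkStreamedValues(t *testing.T) {
	validator := cdctest.NewStreamOrderValidator()

	ts1 := hlc.Timestamp{WallTime: 1}
	ts2 := hlc.Timestamp{WallTime: 2}
	if err := validator.NoteRow("partition-0", "k1", "v1", ts1); err != nil {
		t.Fatal(err)
	}

	// Read back every streamed update for "k1" with a timestamp <= ts2.
	kvs, err := validator.GetValuesForKeyBelowTimestamp("k1", ts2)
	if err != nil {
		t.Fatal(err)
	}
	if len(kvs) != 1 {
		t.Fatalf("expected 1 KV, got %d", len(kvs))
	}
}
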
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/streamclient/BUILD.bazel
@@ -16,7 +16,6 @@ go_library(
         "//pkg/sql",
         "//pkg/sql/catalog/catalogkeys",
         "//pkg/sql/catalog/descpb",
-        "//pkg/sql/catalog/systemschema",
         "//pkg/sql/catalog/tabledesc",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/tree",
60 changes: 48 additions & 12 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
@@ -22,7 +22,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
-	"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -54,6 +53,10 @@ const (
 	// DupProbability controls the probability with which we emit duplicate KV
 	// events.
 	DupProbability = "DUP_PROBABILITY"
+	// TenantID specifies the ID of the tenant we are ingesting data into. This
+	// allows the client to prefix the generated KVs with the appropriate tenant
+	// prefix.
+	TenantID = "TENANT_ID"
 	// IngestionDatabaseID is the ID used in the generated table descriptor.
 	IngestionDatabaseID = 50 /* defaultDB */
 	// IngestionTablePrefix is the prefix of the table name used in the generated
@@ -79,6 +82,7 @@ type randomStreamConfig struct {
 	kvsPerCheckpoint int
 	numPartitions    int
 	dupProbability   float64
+	tenantID         roachpb.TenantID
 }
 
 func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
@@ -88,6 +92,7 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
 		kvsPerCheckpoint: 100,
 		numPartitions:    1,
 		dupProbability:   0.5,
+		tenantID:         roachpb.SystemTenantID,
 	}
 
 	var err error
@@ -126,6 +131,14 @@ func parseRandomStreamConfig(streamURL *url.URL) (randomStreamConfig, error) {
 			return c, err
 		}
 	}
+
+	if tenantIDStr := streamURL.Query().Get(TenantID); tenantIDStr != "" {
+		id, err := strconv.Atoi(tenantIDStr)
+		if err != nil {
+			return c, err
+		}
+		c.tenantID = roachpb.MakeTenantID(uint64(id))
+	}
 	return c, nil
 }
 
@@ -147,6 +160,7 @@ type randomStreamClient struct {
 		// interceptors can be registered to peek at every event generated by this
 		// client.
 		interceptors []func(streamingccl.Event, streamingccl.PartitionAddress)
+		tableID      int
 	}
 }
 
@@ -162,16 +176,18 @@ func newRandomStreamClient(streamURL *url.URL) (Client, error) {
 		return nil, err
 	}
 
-	return &randomStreamClient{
-		config: streamConfig,
-	}, nil
+	client := randomStreamClient{config: streamConfig}
+	client.mu.Lock()
+	defer client.mu.Unlock()
+	client.mu.tableID = 52
+	return &client, nil
 }
 
-var testTableID = 52
-
-func getNextTableID() int {
-	ret := testTableID
-	testTableID++
+func (m *randomStreamClient) getNextTableID() int {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	ret := m.mu.tableID
+	m.mu.tableID++
 	return ret
 }
 
@@ -184,7 +200,7 @@ func (m *randomStreamClient) GetTopology(
 
 	// Allocate table IDs and return one per partition address in the topology.
 	for i := 0; i < m.config.numPartitions; i++ {
-		tableID := descpb.ID(getNextTableID())
+		tableID := descpb.ID(m.getNextTableID())
 		partitionURI := url.URL{
 			Scheme: TestScheme,
 			Host:   strconv.Itoa(int(tableID)),
@@ -207,28 +223,32 @@ func (m *randomStreamClient) getDescriptorAndNamespaceKVForTableID(
 		IngestionDatabaseID,
 		tableID,
 		fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName),
-		systemschema.JobsTable.GetPrivileges(),
+		&descpb.PrivilegeDescriptor{},
 	)
 	if err != nil {
 		return nil, nil, err
 	}
 
 	// Generate namespace entry.
 	key := catalogkeys.NewTableKey(50, keys.PublicSchemaID, testTable.Name)
+	k := rekey(m.config.tenantID, key.Key(keys.TODOSQLCodec))
 	var value roachpb.Value
 	value.SetInt(int64(testTable.GetID()))
+	value.InitChecksum(k)
 	namespaceKV := roachpb.KeyValue{
-		Key:   key.Key(keys.TODOSQLCodec),
+		Key:   k,
 		Value: value,
 	}
 
 	// Generate descriptor entry.
 	descKey := catalogkeys.MakeDescMetadataKey(keys.TODOSQLCodec, testTable.GetID())
+	descKey = rekey(m.config.tenantID, descKey)
 	descDesc := testTable.DescriptorProto()
 	var descValue roachpb.Value
 	if err := descValue.SetProto(descDesc); err != nil {
 		panic(err)
 	}
+	descValue.InitChecksum(descKey)
 	descKV := roachpb.KeyValue{
 		Key:   descKey,
 		Value: descValue,
@@ -280,6 +300,7 @@ func (m *randomStreamClient) ConsumePartition(
 				resolvedTime := timeutil.Now()
 				hlcResolvedTime := hlc.Timestamp{WallTime: resolvedTime.UnixNano()}
 				event = streamingccl.MakeCheckpointEvent(hlcResolvedTime)
+				dupKVEvent = nil
 
 				numKVEventsSinceLastResolved = 0
 			} else {
@@ -328,6 +349,19 @@ func (m *randomStreamClient) ConsumePartition(
 	return eventCh, nil
 }
 
+func rekey(tenantID roachpb.TenantID, k roachpb.Key) roachpb.Key {
+	// Strip old prefix.
+	tenantPrefix := keys.MakeTenantPrefix(tenantID)
+	noTenantPrefix, _, err := keys.DecodeTenantPrefix(k)
+	if err != nil {
+		panic(err)
+	}
+
+	// Prepend tenant prefix.
+	rekeyedKey := append(tenantPrefix, noTenantPrefix...)
+	return rekeyedKey
+}
+
 func (m *randomStreamClient) makeRandomKey(
 	r *rand.Rand, tableDesc *tabledesc.Mutable,
 ) roachpb.KeyValue {
@@ -338,6 +372,8 @@ func (m *randomStreamClient) makeRandomKey(
 	}
 	k = keys.MakeFamilyKey(k, uint32(tableDesc.Families[0].ID))
 
+	k = rekey(m.config.tenantID, k)
+
 	// Create a value holding a random integer.
 	valueDatum := tree.NewDInt(tree.DInt(r.Intn(m.config.valueRange)))
 	valueBuf, err := rowenc.EncodeTableValue(
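
As a quick illustration of the rekeying above (not part of the diff): keys
generated for the system tenant carry no tenant prefix, so rekey simply
prepends the target tenant's prefix; makeTableKey is a hypothetical stand-in
for the client's key generation:

k := makeTableKey() // hypothetical: a system-tenant key, no tenant prefix
rekeyed := rekey(roachpb.MakeTenantID(10), k)
// rekeyed now equals keys.MakeTenantPrefix(roachpb.MakeTenantID(10))
// followed by the original key bytes, so it lands in tenant 10's keyspace.
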
10 changes: 9 additions & 1 deletion pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -8,10 +8,12 @@ go_library(
         "stream_ingestion_planning.go",
         "stream_ingestion_processor.go",
         "stream_ingestion_processor_planning.go",
+        "stream_ingestion_test_utils.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/ccl/changefeedccl/cdctest",
         "//pkg/ccl/storageccl",
         "//pkg/ccl/streamingccl",
         "//pkg/ccl/streamingccl/streamclient",
@@ -38,6 +40,7 @@ go_library(
         "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/span",
+        "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "//pkg/util/tracing",
         "@com_github_cockroachdb_errors//:errors",
@@ -55,11 +58,12 @@ go_test(
         "stream_ingestion_frontier_processor_test.go",
         "stream_ingestion_job_test.go",
         "stream_ingestion_processor_test.go",
+        "stream_ingestion_test.go",
     ],
     embed = [":streamingest"],
     deps = [
         "//pkg/base",
-        "//pkg/ccl/changefeedccl/cdctest",
+        "//pkg/ccl/kvccl/kvtenantccl",
         "//pkg/ccl/storageccl",
         "//pkg/ccl/streamingccl",
         "//pkg/ccl/streamingccl/streamclient",
@@ -68,6 +72,7 @@ go_test(
         "//pkg/jobs/jobspb",
         "//pkg/keys",
         "//pkg/kv",
+        "//pkg/kv/kvserver",
         "//pkg/roachpb",
         "//pkg/security",
         "//pkg/security/securitytest",
@@ -80,14 +85,17 @@ go_test(
         "//pkg/storage",
         "//pkg/testutils",
         "//pkg/testutils/distsqlutils",
+        "//pkg/testutils/jobutils",
         "//pkg/testutils/serverutils",
+        "//pkg/testutils/skip",
         "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
         "//pkg/util/log",
         "//pkg/util/protoutil",
         "//pkg/util/randutil",
+        "//pkg/util/syncutil",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//require",