Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamclient: add random stream client #59139

Merged
merged 5 commits into from
Jan 25, 2021

Conversation

pbardea
Copy link
Contributor

@pbardea pbardea commented Jan 19, 2021

See individual commits, but this PR does some cleanup while introducing a
stream client implementation that randomly generates rows.

I broke down each change into its own commit for reviews, but let me know
if splitting this into separate PRs would be helpful.

@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link
Contributor

@adityamaru adityamaru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 18 of 18 files at r1, 7 of 7 files at r2, 5 of 5 files at r3, 10 of 10 files at r4, 3 of 3 files at r5, 1 of 1 files at r6, 1 of 1 files at r7.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @pbardea)


pkg/ccl/streamingccl/streamclient/client.go, line 30 at r3 (raw file):

the partition and close the event channel

nit: last part of the sentence is repeated.


pkg/ccl/streamingccl/streamclient/client_test.go, line 89 at r6 (raw file):

				panic(fmt.Sprintf("unexpected event type %v", event.Type()))
			}
			numReceivedEvents++

can we delete?


pkg/ccl/streamingccl/streamclient/random_stream_client.go, line 186 at r4 (raw file):

			}

			// TODO: Consider keeping an in-memory copy so that tests can verify

+1 this was on my mind too.


pkg/ccl/streamingccl/streamclient/random_stream_client.go, line 199 at r4 (raw file):

}

func (m *randomStreamClient) makeRandomKey(r *rand.Rand, minTs time.Time) roachpb.KeyValue {

love this new version!


pkg/kv/bulk/sst_batcher.go, line 146 at r4 (raw file):

// Keys must be added in order.
func (b *SSTBatcher) AddMVCCKey(ctx context.Context, key storage.MVCCKey, value []byte) error {
	if len(b.batchEndKey) > 0 && bytes.Equal(b.batchEndKey, key.Key) && b.disallowShadowing {

nice catch

Copy link
Contributor Author

@pbardea pbardea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @pbardea)


pkg/ccl/streamingccl/streamclient/client.go, line 30 at r3 (raw file):

Previously, adityamaru (Aditya Maru) wrote…
the partition and close the event channel

nit: last part of the sentence is repeated.

Done.


pkg/ccl/streamingccl/streamclient/client_test.go, line 89 at r6 (raw file):

Previously, adityamaru (Aditya Maru) wrote…

can we delete?

Nice catch. Done.


pkg/ccl/streamingccl/streamclient/stream_client.go, line 36 at r7 (raw file):

Previously, pbardea (Paul Bardea) wrote…

This doesn't need to be a select.

Done.


pkg/kv/bulk/sst_batcher.go, line 146 at r4 (raw file):

Previously, adityamaru (Aditya Maru) wrote…

nice catch

I don't think this was a bug before since we always ingested a snapshot of the data rather than the entire revision history.

@pbardea pbardea force-pushed the random-stream-client branch 6 times, most recently from d7973a7 to 2871c62 Compare January 20, 2021 01:52
@pbardea
Copy link
Contributor Author

pbardea commented Jan 20, 2021

The tests against CI were flaking so I through together a change that allows the tests to inject "observers" into the client. Curious what you think about this approach. It allows the tests to run deterministically and we could leverage this by adding another "observer" in the test that keeps the in-memory state like we were talking about.

@adityamaru
Copy link
Contributor

The tests against CI were flaking so I through together a change that allows the tests to inject "observers" into the client. Curious what you think about this approach. It allows the tests to run deterministically and we could leverage this by adding another "observer" in the test that keeps the in-memory state like we were talking about.

Just looked at it, and it's a good idea. I guess we still rely on being able to get a hold of the client to be able to invoke client.RegisterInterceptor, which will be hard when we write job level tests. We'd probably still need to add a testing knob where we deposit our "interceptors" and then the ingestion processor can register those for us in the NewStreamClient call for test:// URIs?

@pbardea pbardea force-pushed the random-stream-client branch 4 times, most recently from a86a18b to 8ea8ee1 Compare January 21, 2021 15:58
@pbardea pbardea requested review from a team as code owners January 21, 2021 17:14
@pbardea pbardea removed request for a team January 21, 2021 19:12
This commit refactors the stream ingestion processor to do all of its
work during the Next() call rather than starting a parallel producer
goroutine. This was not needed since there is no pipeline of stages to
process in this processor.

Release note: None
Stream clients now take in a context when opening an event stream for a
given partition. To close the event stream returned by the client, the
given context should be cancelled.

Release note: None
This commit introduces a new stream client implementation that generates
events of a specific schema for a table ID that is specified by the
stream URI. Properties of the stream, such as the frequency of the
events and the range of the randomly generated KVs can be controlled
with the appropriate parameters specified in the stream address.

To use the new stream client the `NewStreamClient` constructor has been
modified to accept a stream address. The stream address allows the
client to determine which client implementation should be used.

Further, the addition of this client exposed a bug in the SST batcher
which rejects batches that modify the same key more than once, even if
disallowShadowing is set to false.

Release note: None
@adityamaru
Copy link
Contributor

LGTM!

@pbardea
Copy link
Contributor Author

pbardea commented Jan 25, 2021

TFTR!
bors r+

@craig
Copy link
Contributor

craig bot commented Jan 25, 2021

Build succeeded:

@craig craig bot merged commit b51d9f1 into cockroachdb:master Jan 25, 2021
@shermanCRL
Copy link
Contributor

cc @HonoreDB this might be informative, as its the client of the producer stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants