Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamclient: add random stream client #59139

Merged
merged 5 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/ccl/streamingccl/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@

package streamingccl

import "net/url"

// StreamAddress is the location of the stream. The topology of a stream should
// be resolvable given a stream address.
type StreamAddress string

// URL parses the stream address as a URL.
func (sa StreamAddress) URL() (*url.URL, error) {
return url.Parse(string(sa))
}

// PartitionAddress is the address where the stream client should be able to
// read the events produced by a partition of a stream.
//
Expand Down
16 changes: 15 additions & 1 deletion pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@ go_library(
name = "streamclient",
srcs = [
"client.go",
"random_stream_client.go",
"stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
deps = ["//pkg/ccl/streamingccl"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/util/hlc",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
],
)

go_test(
Expand Down
30 changes: 29 additions & 1 deletion pkg/ccl/streamingccl/streamclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package streamclient

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
Expand All @@ -24,5 +25,32 @@ type Client interface {

// ConsumePartition returns a channel on which we can start listening for
// events from a given partition that occur after a startTime.
ConsumePartition(address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
//
// Canceling the context will stop reading the partition and close the event
// channel.
// TODO: Add an error channel so that the client can report any errors
// encountered while reading the stream.
ConsumePartition(ctx context.Context, address streamingccl.PartitionAddress, startTime time.Time) (chan streamingccl.Event, error)
}

// NewStreamClient creates a new stream client based on the stream
// address.
func NewStreamClient(streamAddress streamingccl.StreamAddress) (Client, error) {
var streamClient Client
streamURL, err := streamAddress.URL()
if err != nil {
return streamClient, err
}

switch streamURL.Scheme {
case TestScheme:
streamClient, err = newRandomStreamClient(streamURL)
if err != nil {
return streamClient, err
}
default:
streamClient = &mockClient{}
}

return streamClient, nil
}
76 changes: 57 additions & 19 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package streamclient

import (
"context"
"fmt"
"net/url"
"testing"
"time"

Expand All @@ -35,51 +38,86 @@ func (sc testStreamClient) GetTopology(

// ConsumePartition implements the Client interface.
func (sc testStreamClient) ConsumePartition(
_ streamingccl.PartitionAddress, _ time.Time,
_ context.Context, _ streamingccl.PartitionAddress, _ time.Time,
) (chan streamingccl.Event, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Value: roachpb.Value{
RawBytes: []byte("value 1"),
RawBytes: []byte("value_1"),
Timestamp: hlc.Timestamp{WallTime: 1},
},
}

events := make(chan streamingccl.Event, 100)
events := make(chan streamingccl.Event, 2)
events <- streamingccl.MakeKVEvent(sampleKV)
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: timeutil.Now().UnixNano()})
events <- streamingccl.MakeCheckpointEvent(hlc.Timestamp{WallTime: 100})
close(events)

return events, nil
}

// TestExampleClientUsage serves as documentation to indicate how a stream
// ExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func TestExampleClientUsage(t *testing.T) {
func ExampleClient() {
client := testStreamClient{}
sa := streamingccl.StreamAddress("s3://my_bucket/my_stream")
topology, err := client.GetTopology(sa)
require.NoError(t, err)
topology, err := client.GetTopology("s3://my_bucket/my_stream")
if err != nil {
panic(err)
}

startTimestamp := timeutil.Now()
numReceivedEvents := 0

for _, partition := range topology.Partitions {
eventCh, err := client.ConsumePartition(partition, startTimestamp)
require.NoError(t, err)
eventCh, err := client.ConsumePartition(context.Background(), partition, startTimestamp)
if err != nil {
panic(err)
}

// This example looks for the closing of the channel to terminate the test,
// but an ingestion job should look for another event such as the user
// cutting over to the new cluster to move to the next stage.
for {
_, ok := <-eventCh
if !ok {
break
for event := range eventCh {
switch event.Type() {
case streamingccl.KVEvent:
kv := event.GetKV()
fmt.Printf("%s->%s@%d\n", kv.Key.String(), string(kv.Value.RawBytes), kv.Value.Timestamp.WallTime)
case streamingccl.CheckpointEvent:
fmt.Printf("resolved %d\n", event.GetResolved().WallTime)
default:
panic(fmt.Sprintf("unexpected event type %v", event.Type()))
}
numReceivedEvents++
}
}

// We expect 4 events, 2 from each partition.
require.Equal(t, 4, numReceivedEvents)
// Output:
// "key_1"->value_1@1
// resolved 100
// "key_1"->value_1@1
// resolved 100
}

// Ensure that all implementations specified in this test properly close the
// eventChannel when the given context is canceled.
func TestImplementationsCloseChannel(t *testing.T) {
streamURL, err := url.Parse("test://52")
require.NoError(t, err)
randomClient, err := newRandomStreamClient(streamURL)
require.NoError(t, err)

// TODO: Add SQL client and file client here when implemented.
impls := []Client{
&mockClient{},
randomClient,
}

for _, impl := range impls {
ctx, cancel := context.WithCancel(context.Background())
eventCh, err := impl.ConsumePartition(ctx, "test://53/", timeutil.Now())
require.NoError(t, err)

// Ensure that the eventCh closes when the context is canceled.
cancel()
for range eventCh {
}
}
}
Loading