Skip to content

Commit

Permalink
streamingccl: add ingestion job framework
Browse files Browse the repository at this point in the history
This change introduces a new StreamIngestionJob. It does not do much
more than laying out the general outline of the job, which is very
similar to other bulk jobs such as changefeed, backup etc.

More precisely:
- Introduces StreamIngestionDetails job details proto
- Hooks up the dependancy to a mock stream client
- Introduces a StreamIngestionProcessorSpec
- Sets up a simple DistSQL flow which round robin assigns the partitions
  to the processors.

Most notable TODOs in job land which will be addressed in follow up PRs:
- StreamIngestionPlanHook to create this job. Will involve figuring out
  SQL syntax.
- Introducing a ts watermark in both the job and processors. This
  watermark will represent the lowest resolved ts which all processors
have ingested till. Iron out semantics on job start and resumption.
- Introducing a StreamIngestionFrontier processor which will slurp the
  results from the StreamIngestionProcessors, and use them to keep track
of the minimum resolved ts across all processors.

Release note: None
  • Loading branch information
adityamaru committed Jan 7, 2021
1 parent 15765c0 commit 5797332
Show file tree
Hide file tree
Showing 16 changed files with 1,200 additions and 547 deletions.
19 changes: 18 additions & 1 deletion pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,24 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "streamingccl",
srcs = ["stream_ingestion_job.go"],
srcs = [
"stream_ingestion_job.go",
"stream_ingestion_processor_planning.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streamclient",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"@com_github_cockroachdb_logtags//:logtags",
],
)
85 changes: 84 additions & 1 deletion pkg/ccl/streamingccl/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,89 @@

package streamingccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

type streamIngestionResumer struct {
job *jobs.Job
}

func ingest(
ctx context.Context,
execCtx sql.JobExecContext,
streamAddress streamclient.PartitionAddress,
job *jobs.Job,
) error {
// Initialize a stream client and resolve topology.
client := streamclient.NewStreamClient()
sa := streamclient.StreamAddress(streamAddress)
topology, err := client.GetTopology(sa)
if err != nil {
return err
}

evalCtx := execCtx.ExtendedEvalContext()
dsp := execCtx.DistSQLPlanner()

planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg())
if err != nil {
return err
}

// Construct stream ingestion processor specs.
streamIngestionSpecs, err := distStreamIngestionPlanSpecs(topology, nodes)
if err != nil {
return err
}

// Plan and run the DistSQL flow.
err = distStreamIngest(ctx, execCtx, nodes, planCtx, dsp, streamIngestionSpecs)
if err != nil {
return err
}

return nil
}

// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(
ctx context.Context, execCtx interface{}, resultsCh chan<- tree.Datums,
) error {
details := s.job.Details().(jobspb.StreamIngestionDetails)
p := execCtx.(sql.JobExecContext)

err := ingest(ctx, p, streamclient.PartitionAddress(details.StreamAddress), s.job)
if err != nil {
return err
}

// TODO(adityamaru): We probably want to use the resultsCh to indicate that
// the processors have completed setup. We can then return the job ID in the
// plan hook similar to how changefeeds do it.

return nil
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}) error {
return nil
}

var _ jobs.Resumer = &streamIngestionResumer{}

func init() {
// TODO: Implement me.
jobs.RegisterConstructor(
jobspb.TypeStreamIngestion,
func(job *jobs.Job,
settings *cluster.Settings) jobs.Resumer {
return &streamIngestionResumer{job: job}
})
}
109 changes: 109 additions & 0 deletions pkg/ccl/streamingccl/stream_ingestion_processor_planning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl

import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/logtags"
)

// TODO(adityamaru): Figure out what the processors will return.
var streamIngestionResultTypes = []*types.T{}

func distStreamIngestionPlanSpecs(
topology streamclient.Topology, nodes []roachpb.NodeID,
) ([]*execinfrapb.StreamIngestionDataSpec, error) {

// For each stream partition in the topology, assign it to a node.
streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, len(nodes))

for i, partition := range topology.Partitions {
// Round robin assign the stream partitions to nodes. Partitions 0 through
// len(nodes) - 1 creates the spec. Future partitions just add themselves to
// the partition addresses.
if i < len(nodes) {
spec := &execinfrapb.StreamIngestionDataSpec{
PartitionAddress: make([]streamclient.PartitionAddress, 0),
}
streamIngestionSpecs = append(streamIngestionSpecs, spec)
}
n := i % len(nodes)
streamIngestionSpecs[n].PartitionAddress = append(streamIngestionSpecs[n].PartitionAddress, partition)
}

return streamIngestionSpecs, nil
}

func distStreamIngest(
ctx context.Context,
execCtx sql.JobExecContext,
nodes []roachpb.NodeID,
planCtx *sql.PlanningCtx,
dsp *sql.DistSQLPlanner,
streamIngestionSpecs []*execinfrapb.StreamIngestionDataSpec,
) error {
ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil)
evalCtx := execCtx.ExtendedEvalContext()
var noTxn *kv.Txn

if len(streamIngestionSpecs) == 0 {
return nil
}

// Setup a one-stage plan with one proc per input spec.
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs))
for i := range streamIngestionSpecs {
corePlacement[i].NodeID = nodes[i]
corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i]
}

p := planCtx.NewPhysicalPlan()
p.AddNoInputStage(
corePlacement,
execinfrapb.PostProcessSpec{},
streamIngestionResultTypes,
execinfrapb.Ordering{},
)

// TODO(adityamaru): It is likely that we will add a StreamIngestFrontier
// processor on the coordinator node. All the StreamIngestionProcessors will
// feed their results into this frontier. This is similar to the relationship
// between the ChangeAggregator and ChangeFrontier processors. The
// StreamIngestFrontier will be responsible for updating the job watermark
// with the min of the resolved ts outputted by all the processors.

// TODO(adityamaru): Once result types are updated, add PlanToStreamColMap.
dsp.FinalizePlan(planCtx, p)

recv := sql.MakeDistSQLReceiver(
ctx,
// TODO(adityamaru): Are there any results we want to surface to the user?
nil, /* resultWriter */
tree.Rows,
nil, /* rangeCache */
noTxn,
nil, /* clockUpdater */
evalCtx.Tracing,
)
defer recv.Release()

// Copy the evalCtx, as dsp.Run() might change it.
evalCtxCopy := *evalCtx
dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */)
return nil
}
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"client.go",
"event.go",
"stream_client.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient",
visibility = ["//visibility:public"],
Expand Down
10 changes: 5 additions & 5 deletions pkg/ccl/streamingccl/streamclient/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ import (
"github.com/stretchr/testify/require"
)

type mockStreamClient struct{}
type testStreamClient struct{}

var _ Client = mockStreamClient{}
var _ Client = testStreamClient{}

// GetTopology implements the Client interface.
func (sc mockStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
func (sc testStreamClient) GetTopology(_ StreamAddress) (Topology, error) {
return Topology{Partitions: []PartitionAddress{
"s3://my_bucket/my_stream/partition_1",
"s3://my_bucket/my_stream/partition_2",
}}, nil
}

// ConsumePartition implements the Client interface.
func (sc mockStreamClient) ConsumePartition(_ PartitionAddress, _ time.Time) (chan Event, error) {
func (sc testStreamClient) ConsumePartition(_ PartitionAddress, _ time.Time) (chan Event, error) {
sampleKV := roachpb.KeyValue{
Key: []byte("key_1"),
Value: roachpb.Value{
Expand All @@ -51,7 +51,7 @@ func (sc mockStreamClient) ConsumePartition(_ PartitionAddress, _ time.Time) (ch
// TestExampleClientUsage serves as documentation to indicate how a stream
// client could be used.
func TestExampleClientUsage(t *testing.T) {
client := mockStreamClient{}
client := testStreamClient{}
sa := StreamAddress("s3://my_bucket/my_stream")
topology, err := client.GetTopology(sa)
require.NoError(t, err)
Expand Down
33 changes: 33 additions & 0 deletions pkg/ccl/streamingccl/streamclient/stream_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamclient

import "time"

// client is a mock stream client.
type client struct{}

var _ Client = &client{}

// NewStreamClient returns a new mock stream client.
func NewStreamClient() Client {
return &client{}
}

// GetTopology implements the Client interface.
func (m *client) GetTopology(address StreamAddress) (Topology, error) {
panic("unimplemented mock method")
}

// ConsumePartition implements the Client interface.
func (m *client) ConsumePartition(
address PartitionAddress, startTime time.Time,
) (chan Event, error) {
panic("unimplemented mock method")
}
1 change: 1 addition & 0 deletions pkg/jobs/jobspb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/jobs/jobspb",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streamclient",
"//pkg/roachpb",
"//pkg/security",
"//pkg/sql/catalog/descpb",
Expand Down
Loading

0 comments on commit 5797332

Please sign in to comment.