-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
streamingccl: add ingestion job framework
This change introduces a new StreamIngestionJob. It does little more than lay out the general outline of the job, which is very similar to other bulk jobs such as changefeed, backup, etc. More precisely: - Introduces the StreamIngestionDetails job details proto - Hooks up the dependency on a mock stream client - Introduces a StreamIngestionProcessorSpec - Sets up a simple DistSQL flow which assigns the partitions to the processors in round-robin order. Most notable TODOs in job land which will be addressed in follow-up PRs: - StreamIngestionPlanHook to create this job. Will involve figuring out SQL syntax. - Introducing a ts watermark in both the job and processors. This watermark will represent the lowest resolved ts up to which all processors have ingested. Iron out semantics on job start and resumption. - Introducing a StreamIngestionFrontier processor which will slurp the results from the StreamIngestionProcessors, and use them to keep track of the minimum resolved ts across all processors. Release note: None
- Loading branch information
1 parent
15765c0
commit 5797332
Showing
16 changed files
with
1,200 additions
and
547 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
109 changes: 109 additions & 0 deletions
109
pkg/ccl/streamingccl/stream_ingestion_processor_planning.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// Copyright 2020 The Cockroach Authors. | ||
// | ||
// Licensed as a CockroachDB Enterprise file under the Cockroach Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt | ||
|
||
package streamingccl | ||
|
||
import (
	"context"
	"errors"

	"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/logtags"
)
|
||
// TODO(adityamaru): Figure out what the processors will return. | ||
var streamIngestionResultTypes = []*types.T{} | ||
|
||
func distStreamIngestionPlanSpecs( | ||
topology streamclient.Topology, nodes []roachpb.NodeID, | ||
) ([]*execinfrapb.StreamIngestionDataSpec, error) { | ||
|
||
// For each stream partition in the topology, assign it to a node. | ||
streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, len(nodes)) | ||
|
||
for i, partition := range topology.Partitions { | ||
// Round robin assign the stream partitions to nodes. Partitions 0 through | ||
// len(nodes) - 1 creates the spec. Future partitions just add themselves to | ||
// the partition addresses. | ||
if i < len(nodes) { | ||
spec := &execinfrapb.StreamIngestionDataSpec{ | ||
PartitionAddress: make([]streamclient.PartitionAddress, 0), | ||
} | ||
streamIngestionSpecs = append(streamIngestionSpecs, spec) | ||
} | ||
n := i % len(nodes) | ||
streamIngestionSpecs[n].PartitionAddress = append(streamIngestionSpecs[n].PartitionAddress, partition) | ||
} | ||
|
||
return streamIngestionSpecs, nil | ||
} | ||
|
||
func distStreamIngest( | ||
ctx context.Context, | ||
execCtx sql.JobExecContext, | ||
nodes []roachpb.NodeID, | ||
planCtx *sql.PlanningCtx, | ||
dsp *sql.DistSQLPlanner, | ||
streamIngestionSpecs []*execinfrapb.StreamIngestionDataSpec, | ||
) error { | ||
ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil) | ||
evalCtx := execCtx.ExtendedEvalContext() | ||
var noTxn *kv.Txn | ||
|
||
if len(streamIngestionSpecs) == 0 { | ||
return nil | ||
} | ||
|
||
// Setup a one-stage plan with one proc per input spec. | ||
corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs)) | ||
for i := range streamIngestionSpecs { | ||
corePlacement[i].NodeID = nodes[i] | ||
corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i] | ||
} | ||
|
||
p := planCtx.NewPhysicalPlan() | ||
p.AddNoInputStage( | ||
corePlacement, | ||
execinfrapb.PostProcessSpec{}, | ||
streamIngestionResultTypes, | ||
execinfrapb.Ordering{}, | ||
) | ||
|
||
// TODO(adityamaru): It is likely that we will add a StreamIngestFrontier | ||
// processor on the coordinator node. All the StreamIngestionProcessors will | ||
// feed their results into this frontier. This is similar to the relationship | ||
// between the ChangeAggregator and ChangeFrontier processors. The | ||
// StreamIngestFrontier will be responsible for updating the job watermark | ||
// with the min of the resolved ts outputted by all the processors. | ||
|
||
// TODO(adityamaru): Once result types are updated, add PlanToStreamColMap. | ||
dsp.FinalizePlan(planCtx, p) | ||
|
||
recv := sql.MakeDistSQLReceiver( | ||
ctx, | ||
// TODO(adityamaru): Are there any results we want to surface to the user? | ||
nil, /* resultWriter */ | ||
tree.Rows, | ||
nil, /* rangeCache */ | ||
noTxn, | ||
nil, /* clockUpdater */ | ||
evalCtx.Tracing, | ||
) | ||
defer recv.Release() | ||
|
||
// Copy the evalCtx, as dsp.Run() might change it. | ||
evalCtxCopy := *evalCtx | ||
dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
// Copyright 2020 The Cockroach Authors. | ||
// | ||
// Licensed as a CockroachDB Enterprise file under the Cockroach Community | ||
// License (the "License"); you may not use this file except in compliance with | ||
// the License. You may obtain a copy of the License at | ||
// | ||
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt | ||
|
||
package streamclient | ||
|
||
import "time" | ||
|
||
// client is a mock stream client. | ||
type client struct{} | ||
|
||
var _ Client = &client{} | ||
|
||
// NewStreamClient returns a new mock stream client. | ||
func NewStreamClient() Client { | ||
return &client{} | ||
} | ||
|
||
// GetTopology implements the Client interface. | ||
func (m *client) GetTopology(address StreamAddress) (Topology, error) { | ||
panic("unimplemented mock method") | ||
} | ||
|
||
// ConsumePartition implements the Client interface. | ||
func (m *client) ConsumePartition( | ||
address PartitionAddress, startTime time.Time, | ||
) (chan Event, error) { | ||
panic("unimplemented mock method") | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.