Skip to content

Commit

Permalink
ingestion: add planHook for ingestion job
Browse files Browse the repository at this point in the history
This change adds the boiler plate plan hook that configures and starts
up the ingestion job.

Release note: None
  • Loading branch information
adityamaru committed Jan 26, 2021
1 parent 3c22578 commit 70f88cc
Show file tree
Hide file tree
Showing 13 changed files with 420 additions and 443 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ go_library(
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/ingestion",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
],
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/ingestion"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
)
70 changes: 3 additions & 67 deletions pkg/ccl/streamingccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,79 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "streamingccl",
srcs = [
"stream_ingestion_frontier_processor.go",
"stream_ingestion_job.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
"addresses.go",
"event.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
"//pkg/sql/rowenc",
"//pkg/sql/rowexec",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log/logcrash",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

go_test(
name = "streamingccl_test",
srcs = [
"main_test.go",
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
],
embed = [":streamingccl"],
deps = [
"//pkg/base",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/sem/tree",
"//pkg/testutils/distsqlutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
157 changes: 0 additions & 157 deletions pkg/ccl/streamingccl/stream_ingestion_processor_test.go

This file was deleted.

13 changes: 13 additions & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "streamingest",
srcs = [
"stream_ingestion_frontier_processor.go",
"stream_ingestion_job.go",
"stream_ingestion_planning.go",
"stream_ingestion_processor.go",
"stream_ingestion_processor_planning.go",
],
Expand All @@ -13,13 +15,16 @@ go_library(
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/bulk",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
"//pkg/sql/physicalplan",
Expand All @@ -28,17 +33,24 @@ go_library(
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/log/logcrash",
"//pkg/util/protoutil",
"//pkg/util/span",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
],
)

go_test(
name = "streamingest_test",
srcs = [
"main_test.go",
"stream_ingestion_frontier_processor_test.go",
"stream_ingestion_job_test.go",
"stream_ingestion_processor_test.go",
],
Expand Down Expand Up @@ -72,6 +84,7 @@ go_test(
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl
package streamingest

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -62,11 +63,11 @@ func newStreamIngestionFrontierProcessor(
output execinfra.RowReceiver,
) (execinfra.Processor, error) {
sf := &streamIngestionFrontier{
flowCtx: flowCtx,
spec: spec,
input: input,
flowCtx: flowCtx,
spec: spec,
input: input,
highWaterAtStart: spec.HighWaterAtStart,
frontier: span.MakeFrontier(spec.TrackedSpans...),
frontier: span.MakeFrontier(spec.TrackedSpans...),
}
if err := sf.Init(
sf,
Expand Down
Loading

0 comments on commit 70f88cc

Please sign in to comment.