Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
125902: streamingccl: rename to crosscluster r=dt a=dt

Streamclient is the only one I didn't shorten here since it won't be 100% mechanical -- we have some "client" var names so we'd need to change those or import it with an alias, so I've left it off for now to keep this one 100% find-and-replace.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed Jun 19, 2024
2 parents fa6f659 + 5b92aa2 commit 13c4659
Show file tree
Hide file tree
Showing 105 changed files with 388 additions and 388 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@
/pkg/ccl/jobsccl/ @cockroachdb/jobs-prs @cockroachdb/disaster-recovery
/pkg/ccl/changefeedccl/ @cockroachdb/cdc-prs

/pkg/ccl/streamingccl/ @cockroachdb/disaster-recovery
/pkg/ccl/crosscluster/ @cockroachdb/disaster-recovery
/pkg/ccl/backupccl/ @cockroachdb/disaster-recovery
/pkg/ccl/backupccl/*_job.go @cockroachdb/disaster-recovery @cockroachdb/jobs-prs
/pkg/ccl/revertccl/ @cockroachdb/disaster-recovery
Expand Down
2 changes: 1 addition & 1 deletion build/bazelutil/nogo_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go": "flagged by linter, should be evaluated",
"pkg/ccl/sqlproxyccl/acl/watcher.go": "flagged by linter, should be evaluated",
"pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go": "flagged by linter, should be evaluated",
"pkg/ccl/streamingccl/streamclient/random_stream_client.go": "flagged by linter, should be evaluated",
"pkg/ccl/crosscluster/streamclient/random_stream_client.go": "flagged by linter, should be evaluated",
"pkg/cli/clisqlexec/format_csv_tsv.go": "flagged by linter, should be evaluated",
"pkg/cli/clisqlshell/sql.go": "flagged by linter, should be evaluated",
"pkg/cli/syncbench/syncbench.go": "flagged by linter, should be evaluated",
Expand Down
34 changes: 17 additions & 17 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ ALL_TESTS = [
"//pkg/ccl/cloudccl/cloudprivilege:cloudprivilege_test",
"//pkg/ccl/cloudccl/externalconn:externalconn_test",
"//pkg/ccl/cloudccl/gcp:gcp_test",
"//pkg/ccl/crosscluster/logical:logical_test",
"//pkg/ccl/crosscluster/physical:physical_test",
"//pkg/ccl/crosscluster/producer:producer_test",
"//pkg/ccl/crosscluster/replicationutils:replicationutils_test",
"//pkg/ccl/crosscluster/streamclient:streamclient_test",
"//pkg/ccl/importerccl:importerccl_test",
"//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test",
"//pkg/ccl/jwtauthccl:jwtauthccl_test",
Expand Down Expand Up @@ -95,11 +100,6 @@ ALL_TESTS = [
"//pkg/ccl/sqlproxyccl:sqlproxyccl_test",
"//pkg/ccl/storageccl/engineccl:engineccl_test",
"//pkg/ccl/storageccl:storageccl_test",
"//pkg/ccl/streamingccl/logical:logical_test",
"//pkg/ccl/streamingccl/replicationutils:replicationutils_test",
"//pkg/ccl/streamingccl/streamclient:streamclient_test",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/telemetryccl:telemetryccl_test",
"//pkg/ccl/testccl/authccl:authccl_test",
"//pkg/ccl/testccl/sqlccl:sqlccl_test",
Expand Down Expand Up @@ -870,6 +870,18 @@ GO_TARGETS = [
"//pkg/ccl/cmdccl/clusterrepl:clusterrepl_lib",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry",
"//pkg/ccl/cmdccl/stub-schema-registry:stub-schema-registry_lib",
"//pkg/ccl/crosscluster/logical:logical",
"//pkg/ccl/crosscluster/logical:logical_test",
"//pkg/ccl/crosscluster/physical:physical",
"//pkg/ccl/crosscluster/physical:physical_test",
"//pkg/ccl/crosscluster/producer:producer",
"//pkg/ccl/crosscluster/producer:producer_test",
"//pkg/ccl/crosscluster/replicationtestutils:replicationtestutils",
"//pkg/ccl/crosscluster/replicationutils:replicationutils",
"//pkg/ccl/crosscluster/replicationutils:replicationutils_test",
"//pkg/ccl/crosscluster/streamclient:streamclient",
"//pkg/ccl/crosscluster/streamclient:streamclient_test",
"//pkg/ccl/crosscluster:crosscluster",
"//pkg/ccl/gssapiccl:gssapiccl",
"//pkg/ccl/importerccl:importerccl_test",
"//pkg/ccl/jobsccl/jobsprotectedtsccl:jobsprotectedtsccl_test",
Expand Down Expand Up @@ -960,18 +972,6 @@ GO_TARGETS = [
"//pkg/ccl/storageccl/engineccl:engineccl_test",
"//pkg/ccl/storageccl:storageccl",
"//pkg/ccl/storageccl:storageccl_test",
"//pkg/ccl/streamingccl/logical:logical",
"//pkg/ccl/streamingccl/logical:logical_test",
"//pkg/ccl/streamingccl/replicationtestutils:replicationtestutils",
"//pkg/ccl/streamingccl/replicationutils:replicationutils",
"//pkg/ccl/streamingccl/replicationutils:replicationutils_test",
"//pkg/ccl/streamingccl/streamclient:streamclient",
"//pkg/ccl/streamingccl/streamclient:streamclient_test",
"//pkg/ccl/streamingccl/streamingest:streamingest",
"//pkg/ccl/streamingccl/streamingest:streamingest_test",
"//pkg/ccl/streamingccl/streamproducer:streamproducer",
"//pkg/ccl/streamingccl/streamproducer:streamproducer_test",
"//pkg/ccl/streamingccl:streamingccl",
"//pkg/ccl/telemetryccl:telemetryccl_test",
"//pkg/ccl/testccl/authccl:authccl_test",
"//pkg/ccl/testccl/sqlccl:sqlccl_test",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ go_library(
"//pkg/ccl/buildccl",
"//pkg/ccl/changefeedccl",
"//pkg/ccl/cliccl",
"//pkg/ccl/crosscluster/logical",
"//pkg/ccl/crosscluster/physical",
"//pkg/ccl/crosscluster/producer",
"//pkg/ccl/gssapiccl",
"//pkg/ccl/jwtauthccl",
"//pkg/ccl/kvccl",
Expand All @@ -25,9 +28,6 @@ go_library(
"//pkg/ccl/securityccl/fipsccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/storageccl/engineccl",
"//pkg/ccl/streamingccl/logical",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/streamingccl/streamproducer",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl",
"//pkg/server",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/ccl_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/buildccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/cliccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/logical"
_ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical"
_ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/producer"
_ "github.com/cockroachdb/cockroach/pkg/ccl/gssapiccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/jwtauthccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
Expand All @@ -31,9 +34,6 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/securityccl/fipsccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/logical"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
_ "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
"github.com/cockroachdb/cockroach/pkg/server"
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/cmdccl/clusterrepl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/cmdccl/clusterrepl",
visibility = ["//visibility:private"],
deps = [
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/crosscluster",
"//pkg/ccl/crosscluster/streamclient",
"//pkg/cli/exit",
"//pkg/keys",
"//pkg/repstream/streampb",
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/cmdccl/clusterrepl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
"os/signal"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
Expand Down Expand Up @@ -243,16 +243,16 @@ func subscriptionConsumer(
return sub.Err()
}
switch event.Type() {
case streamingccl.KVEvent:
case crosscluster.KVEvent:
sz = 0
for _, kv := range event.GetKVs() {
sz += kv.Size()
}
case streamingccl.SSTableEvent:
case crosscluster.SSTableEvent:
ssTab := event.GetSSTable()
sz = ssTab.Size()
case streamingccl.DeleteRangeEvent:
case streamingccl.CheckpointEvent:
case crosscluster.DeleteRangeEvent:
case crosscluster.CheckpointEvent:
fmt.Printf("%s checkpoint\n", timeutil.Now().Format(time.RFC3339))
resolved := event.GetResolvedSpans()
for _, r := range resolved {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "streamingccl",
name = "crosscluster",
srcs = [
"addresses.go",
"errors.go",
"event.go",
"settings.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl",
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs/jobspb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl
package crosscluster

import "net/url"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl
package crosscluster

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package streamingccl
package crosscluster

import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ go_library(
"lww_row_processor.go",
"metrics.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/logical",
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/logical",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/ccl/changefeedccl/cdcevent",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/streamingccl",
"//pkg/ccl/streamingccl/streamclient",
"//pkg/ccl/streamingccl/streamingest",
"//pkg/ccl/crosscluster",
"//pkg/ccl/crosscluster/physical",
"//pkg/ccl/crosscluster/streamclient",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
Expand Down Expand Up @@ -71,8 +71,8 @@ go_test(
deps = [
"//pkg/base",
"//pkg/ccl",
"//pkg/ccl/crosscluster/replicationtestutils",
"//pkg/ccl/storageccl",
"//pkg/ccl/streamingccl/replicationtestutils",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -27,7 +27,7 @@ import (

func constructLogicalReplicationWriterSpecs(
ctx context.Context,
streamAddress streamingccl.StreamAddress,
streamAddress crosscluster.StreamAddress,
topology streamclient.Topology,
destSQLInstances []sql.InstanceLocality,
initialScanTimestamp hlc.Timestamp,
Expand All @@ -51,7 +51,7 @@ func constructLogicalReplicationWriterSpecs(
writerSpecs := make(map[base.SQLInstanceID][]execinfrapb.LogicalReplicationWriterSpec, len(destSQLInstances))

// Update stream ingestion specs with their matched source node.
matcher := streamingest.MakeNodeMatcher(destSQLInstances)
matcher := physical.MakeNodeMatcher(destSQLInstances)
for _, candidate := range matcher.FindSourceNodePriority(topology) {
destID := matcher.FindMatch(candidate.ClosestDestIDs)
partition := candidate.Partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/physical"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler"
Expand Down Expand Up @@ -195,13 +195,13 @@ func (r *logicalReplicationResumer) ingest(
if err != nil {
return err
}
destNodeLocalities, err := streamingest.GetDestNodeLocalities(ctx, distSQLPlanner, nodes)
destNodeLocalities, err := physical.GetDestNodeLocalities(ctx, distSQLPlanner, nodes)
if err != nil {
return err
}

specs, err := constructLogicalReplicationWriterSpecs(ctx,
streamingccl.StreamAddress(payload.TargetClusterConnStr),
crosscluster.StreamAddress(payload.TargetClusterConnStr),
plan.Topology,
destNodeLocalities,
payload.ReplicationStartTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
Expand Down Expand Up @@ -190,7 +190,7 @@ func (lrw *logicalReplicationWriterProcessor) Start(ctx context.Context) {
if redactedErr != nil {
log.Warning(lrw.Ctx(), "could not redact stream address")
}
streamClient, err := streamclient.NewStreamClient(ctx, streamingccl.StreamAddress(addr), db,
streamClient, err := streamclient.NewStreamClient(ctx, crosscluster.StreamAddress(addr), db,
streamclient.WithStreamID(streampb.StreamID(lrw.spec.StreamID)),
streamclient.WithCompression(true),
)
Expand Down Expand Up @@ -348,10 +348,10 @@ func (lrw *logicalReplicationWriterProcessor) consumeEvents(ctx context.Context)
}

func (lrw *logicalReplicationWriterProcessor) handleEvent(
ctx context.Context, event streamingccl.Event,
ctx context.Context, event crosscluster.Event,
) error {
switch event.Type() {
case streamingccl.KVEvent:
case crosscluster.KVEvent:
ts := event.GetKVs()[0].KeyValue.Value.Timestamp.GoTime()
lrw.metrics.AdmitLatency.RecordValue(
timeutil.Since(ts).Nanoseconds())
Expand All @@ -366,21 +366,21 @@ func (lrw *logicalReplicationWriterProcessor) handleEvent(
}

switch event.Type() {
case streamingccl.KVEvent:
case crosscluster.KVEvent:
if err := lrw.flushBuffer(ctx, event.GetKVs()); err != nil {
return err
}
case streamingccl.CheckpointEvent:
case crosscluster.CheckpointEvent:
if err := lrw.checkpoint(ctx, event); err != nil {
return err
}
case streamingccl.SSTableEvent, streamingccl.DeleteRangeEvent:
case crosscluster.SSTableEvent, crosscluster.DeleteRangeEvent:
// TODO(ssd): Handle SSTableEvent here eventually. I'm not sure
// we'll ever want to truly handle DeleteRangeEvent since
// currently those are only used by DROP which should be handled
// via whatever mechanism handles schema changes.
return errors.Newf("unexpected event for online stream: %v", event)
case streamingccl.SplitEvent:
case crosscluster.SplitEvent:
log.Infof(lrw.Ctx(), "SplitEvent received on logical replication stream")
default:
return errors.Newf("unknown streaming event type %v", event.Type())
Expand All @@ -389,7 +389,7 @@ func (lrw *logicalReplicationWriterProcessor) handleEvent(
}

func (lrw *logicalReplicationWriterProcessor) checkpoint(
ctx context.Context, event streamingccl.Event,
ctx context.Context, event crosscluster.Event,
) error {
if streamingKnobs, ok := lrw.FlowCtx.TestingKnobs().StreamingTestingKnobs.(*sql.StreamingTestingKnobs); ok {
if streamingKnobs != nil && streamingKnobs.ElideCheckpointEvent != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 13c4659

Please sign in to comment.