diff --git a/pkg/ccl/streamingccl/streamclient/BUILD.bazel b/pkg/ccl/streamingccl/streamclient/BUILD.bazel index 0c8d2e326939..c1df0e140789 100644 --- a/pkg/ccl/streamingccl/streamclient/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamclient/BUILD.bazel @@ -17,7 +17,6 @@ go_library( "//pkg/keys", "//pkg/roachpb", "//pkg/security/username", - "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catpb", diff --git a/pkg/ccl/streamingccl/streamclient/random_stream_client.go b/pkg/ccl/streamingccl/streamclient/random_stream_client.go index a16b9daf32f3..484f89607966 100644 --- a/pkg/ccl/streamingccl/streamclient/random_stream_client.go +++ b/pkg/ccl/streamingccl/streamclient/random_stream_client.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" @@ -402,20 +401,26 @@ func (m *RandomStreamClient) Heartbeat( return streampb.StreamReplicationStatus{}, nil } +// TestTableCreator converts a SQL string to a table descriptor for test purposes. +var TestTableCreator func( + ctx context.Context, + parentID, id descpb.ID, + schema string, + privileges *catpb.PrivilegeDescriptor, +) (*tabledesc.Mutable, error) + // getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor // KVs for the table with tableID. func getDescriptorAndNamespaceKVForTableID( config randomStreamConfig, tableID descpb.ID, ) (*tabledesc.Mutable, []roachpb.KeyValue, error) { tableName := fmt.Sprintf("%s%d", IngestionTablePrefix, tableID) - testTable, err := sql.CreateTestTableDescriptor( + testTable, err := TestTableCreator( context.Background(), IngestionDatabaseID, tableID, fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName), catpb.NewBasePrivilegeDescriptor(username.RootUserName()), - nil, /* txn */ - nil, /* collection */ ) if err != nil { return nil, nil, err diff --git a/pkg/sql/streaming.go b/pkg/sql/streaming.go index 841f6f655079..66317e482e7a 100644 --- a/pkg/sql/streaming.go +++ b/pkg/sql/streaming.go @@ -13,10 +13,14 @@ package sql import ( "context" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/streaming" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -70,3 +74,14 @@ func (p *planner) CompleteReplicationStream( ) error { return streamproducer.CompleteReplicationStream(ctx, p, streamID, successfulIngestion) } + +func init() { + streamclient.TestTableCreator = func( + ctx context.Context, + parentID, id descpb.ID, + schema string, + privileges *catpb.PrivilegeDescriptor, + ) (*tabledesc.Mutable, error) { + return CreateTestTableDescriptor(ctx, parentID, id, schema, privileges, nil /* txn */, nil /* collection */) + } +}