Skip to content

Commit

Permalink
streamclient: break dependency on sql
Browse files Browse the repository at this point in the history
Currently, `streamclient` pkg import `sql` pkg just to create a test table
descriptor. This is causing dependency loop. We now inject the
`streamclient.TestTableGenerator` via `sql/streaming.go`.

Release note: None
  • Loading branch information
ZhouXing19 committed Oct 31, 2022
1 parent ee61814 commit 9ce2d85
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 5 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/streamingccl/streamclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ go_library(
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catpb",
Expand Down
13 changes: 9 additions & 4 deletions pkg/ccl/streamingccl/streamclient/random_stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
Expand Down Expand Up @@ -402,20 +401,26 @@ func (m *RandomStreamClient) Heartbeat(
return streampb.StreamReplicationStatus{}, nil
}

// TestTableCreator converts a SQL string to a table descriptor for test purposes.
var TestTableCreator func(
ctx context.Context,
parentID, id descpb.ID,
schema string,
privileges *catpb.PrivilegeDescriptor,
) (*tabledesc.Mutable, error)

// getDescriptorAndNamespaceKVForTableID returns the namespace and descriptor
// KVs for the table with tableID.
func getDescriptorAndNamespaceKVForTableID(
config randomStreamConfig, tableID descpb.ID,
) (*tabledesc.Mutable, []roachpb.KeyValue, error) {
tableName := fmt.Sprintf("%s%d", IngestionTablePrefix, tableID)
testTable, err := sql.CreateTestTableDescriptor(
testTable, err := TestTableCreator(
context.Background(),
IngestionDatabaseID,
tableID,
fmt.Sprintf(RandomStreamSchemaPlaceholder, tableName),
catpb.NewBasePrivilegeDescriptor(username.RootUserName()),
nil, /* txn */
nil, /* collection */
)
if err != nil {
return nil, nil, err
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ package sql
import (
"context"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamproducer"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/streaming"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -70,3 +74,14 @@ func (p *planner) CompleteReplicationStream(
) error {
return streamproducer.CompleteReplicationStream(ctx, p, streamID, successfulIngestion)
}

func init() {
streamclient.TestTableCreator = func(
ctx context.Context,
parentID, id descpb.ID,
schema string,
privileges *catpb.PrivilegeDescriptor,
) (*tabledesc.Mutable, error) {
return CreateTestTableDescriptor(ctx, parentID, id, schema, privileges, nil /* txn */, nil /* collection */)
}
}

0 comments on commit 9ce2d85

Please sign in to comment.