Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: avoid passing evalCtx, txn as parameters to ingestion & replication funcs #90964

Merged
merged 1 commit into from
Nov 8, 2022

Conversation

ZhouXing19
Copy link
Collaborator

@ZhouXing19 ZhouXing19 commented Oct 31, 2022

This PR is part of the effort to eliminate usages of eval.Context.Txn.
It moves

  1. the definition of StreamIngestionManager and ReplicationStreamManager under eval;
  2. the implementation of StreamIngestionManager and ReplicationStreamManager via sql.planner.

The core changes are

// GetReplicationStreamManager returns a ReplicationStreamManager.
func (p *planner) GetReplicationStreamManager(
	ctx context.Context,
) (eval.ReplicationStreamManager, error) {
	return streaming.GetReplicationStreamManager(ctx, p.EvalContext(), p.Txn())
}

// GetStreamIngestManager returns a StreamIngestManager.
func (p *planner) GetStreamIngestManager(ctx context.Context) (eval.StreamIngestManager, error) {
	return streaming.GetStreamIngestManager(ctx, p.EvalContext(), p.Txn())
}

so that the functions under these 2 interfaces run upon eval.Context and kv.Txn from the sql.planner.

Follow-up:

  • Pass internal executor from planner too, rather than using register.ex.

informs #90923

@cockroach-teamcity
Copy link
Member

This change is Reviewable

@ZhouXing19 ZhouXing19 changed the title streamingccl, sql: moving StreamIngestionManager and ReplicationStreamManager under sql.planner streamingccl, sql: moving StreamIngestionManager and ReplicationStreamManager to sql.planner Oct 31, 2022
@ajwerner
Copy link
Contributor

This still has a cycle. Define the interfaces in question in eval. Consider moving StreamID to streamingpb and then forwarding the definition in streaming like:

package streaming

type StreamID = streamingpb.StreamID

Also, rather than embedding them in EvalPlanner, I'd prefer if you made new fields in eval.Context.

@ZhouXing19
Copy link
Collaborator Author

Define the interfaces in question in eval.

Not sure if I understood it correctly, but isn't this making eval more cumbersome? I thought we'd like to keep it lightweight so we'd prefer having things in sql.
Also, some of these functions (such as StartReplicationStream()) needs sql.ExecutorConfig for JobRegistry, ProtectedTimestampProvider and etc. I don't think eval should import sql though, no?

@ajwerner
Copy link
Contributor

Moving the interface definitions to eval doesn't make eval any more expensive to build than having eval import packages which export those interfaces. When you make eval import those other packages, it makes it easy to not realize when you transitively add new dependencies to eval.

What does "weight" mean to you in this context?

Also, some of these functions (such as StartReplicationStream()) needs sql.ExecutorConfig for JobRegistry, ProtectedTimestampProvider and etc. I don't think eval should import sql though, no?

:) No, eval cannot import sql. The idea here is that we need to refactor StartReplicationStream such that the object which implements that method already has its dependencies. In this specific case, the ReplicationStreamManger implementation ought to already have access to whatever dependencies it might need that are not the logical arguments to the function.

One way I think about it is that the eval.Context is that it's interpreter state for execution of our sql interpreter. One thing interpreter's generally have is access to builtin functions. When you invoke those builtins, your goal should be to plumb through arguments as the builtin functionality should be free-standing. Any time you find you have to plumb through dependencies as opposed to arguments, it should feel wrong.

@ajwerner
Copy link
Contributor

I'm realizing it may not be clear exactly what I'm suggesting. The implementations of these interfaces ought to be constructed from sql. Also, the implementations can be on the *planner, that's fine. I just want to see us stop growing EvalPlanner and to see us lifting dependency injection more reasonably.

@ZhouXing19
Copy link
Collaborator Author

What does "weight" mean to you in this context?

I meant the number of imported pkgs. But it was because I thought you meant having the implementation in eval too, so nvm.

Consider moving StreamID to streamingpb and then forwarding the definition in streaming

Hmm I can't find anything like StreamID in streampb. Is it ok for us to just define it as int64 in streaming/api.go?


I now created a eval.StreamManager under eval.Context so these streaming related methods won't go through eval.Planner. Is this closer to what you expected?

Comment on lines 1 to 10
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm surprised to find all this new logic here. Is it just copied from streamingest? This seems weird to me. Can you keep it there and just call that code from the sql package?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to use those in streamingest and streamproducer, but directly calling them from sql caused this loop

.-> //pkg/sql:sql (d5981f1a3d4a46828dfe84289c55e9bfde67ac814b3f9fd2987adc5d9089163d)
|   //pkg/ccl/streamingccl/streamproducer:streamproducer (d5981f1a3d4a46828dfe84289c55e9bfde67ac814b3f9fd2987adc5d9089163d)
`-- //pkg/sql:sql (d5981f1a3d4a46828dfe84289c55e9bfde67ac814b3f9fd2987adc5d9089163d)

I made the 3rd commit to break sql's dependency on streamproducer with injection. Not very sure if it's correct though...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And here's an ugly part -- i passed different interfaces for planner as a parameter for these functions -- e.g. for CompleteStreamIngestion() it's sql.JobExecContext, for StartReplicationStream() it's sql.PlanHookState. It's because I had to adapt to the different methods of *sql.planner called within them.

@ajwerner
Copy link
Contributor

I feel like this got complicated. Consider the following:

	StreamManagerFactory StreamManagerFactory
}

type StreamManagerFactory interface {
	GetReplicationStreamManager() ReplicationStreamManager
	GetStreamIngestManager() StreamIngestManager
}

// ReplicationStreamManager represents a collection of APIs that streaming replication supports
// on the production side.
type ReplicationStreamManager interface {
	// StartReplicationStream starts a stream replication job for the specified tenant on the producer side.
	StartReplicationStream(
		ctx context.Context,
		tenantID uint64,
	) (streampb.StreamID, error)

	// HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating
	// consumer has consumed until the given 'frontier' timestamp. This updates the producer job
	// progress and extends its life, and the new producer progress will be returned.
	// If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it.
	HeartbeatReplicationStream(
		ctx context.Context,
		streamID streampb.StreamID,
		frontier hlc.Timestamp,
	) (streampb.StreamReplicationStatus, error)

	// StreamPartition starts streaming replication on the producer side for the partition specified
	// by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and
	// returns a value generator which yields events for the specified partition.
	StreamPartition(
		streamID streampb.StreamID,
		opaqueSpec []byte,
	) (ValueGenerator, error)

	// GetReplicationStreamSpec gets a stream replication spec on the producer side.
	GetReplicationStreamSpec(
		ctx context.Context,
		streamID streampb.StreamID,
	) (*streampb.ReplicationStreamSpec, error)

	// CompleteReplicationStream completes a replication stream job on the producer side.
	// 'successfulIngestion' indicates whether the stream ingestion finished successfully and
	// determines the fate of the producer job, succeeded or canceled.
	CompleteReplicationStream(
		ctx context.Context,
		streamID streampb.StreamID,
		successfulIngestion bool,
	) error
}

// StreamIngestManager represents a collection of APIs that streaming replication supports
// on the ingestion side.
type StreamIngestManager interface {
	// CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side.
	CompleteStreamIngestion(
		ctx context.Context,
		ingestionJobID jobspb.JobID,
		cutoverTimestamp hlc.Timestamp,
	) error

	// GetStreamIngestionStats gets a statistics summary for a stream ingestion job.
	GetStreamIngestionStats(
		ctx context.Context,
		ingestionJobID jobspb.JobID,
	) (*streampb.StreamIngestionStats, error)
}
package streaming
// StreamID forwards the definition os streampb.StreamID.
type StreamID = streampb.StreamID

// InvalidStreamID is the zero value for StreamID corresponding to no stream.
const InvalidStreamID StreamID = 0

// GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, tx *kv.Txn) (eval.ReplicationStreamManager, error)

// GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error)

// GetReplicationStreamManager returns a ReplicationStreamManager if a CCL binary is loaded.
func GetReplicationStreamManager(
	ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.ReplicationStreamManager, error) {
	if GetReplicationStreamManagerHook == nil {
		return nil, errors.New("replication streaming requires a CCL binary")
	}
	return GetReplicationStreamManagerHook(ctx, evalCtx, txn)
}

// GetStreamIngestManager returns a StreamIngestManager if a CCL binary is loaded.
func GetStreamIngestManager(
	ctx context.Context, evalCtx *eval.Context, txn *kv.Txn,
) (eval.StreamIngestManager, error) {
	if GetReplicationStreamManagerHook == nil {
		return nil, errors.New("replication streaming requires a CCL binary")
	}
	return GetStreamIngestManagerHook(ctx, evalCtx, txn)
}

Then implement just StreamManagerFactory on the planner and have that call the hooks already installed in streaming?

@ZhouXing19
Copy link
Collaborator Author

ZhouXing19 commented Oct 31, 2022

Then implement just StreamManagerFactory on the planner

I'm confused -- do we still want to define those methods in ReplicationStreamManager and StreamIngestManager in *sql.planner in this case? if yes, why do we need this factory? If no, how do we pass sql.ExecCfg and etc. to those functions?

// GetReplicationStreamManagerHook is the hook to get access to the producer side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetReplicationStreamManagerHook func(ctx context.Context, evalCtx *eval.Context, tx *kv.Txn) (eval.ReplicationStreamManager, error)

// GetStreamIngestManagerHook is the hook to get access to the ingestion side replication APIs.
// Used by builtin functions to trigger streaming replication.
var GetStreamIngestManagerHook func(ctx context.Context, evalCtx *eval.Context, txn *kv.Txn) (eval.StreamIngestManager, error)

I thought we'd like to deprecate sending an eval context to functions in ReplicationStreamManager and StreamIngestManager, so why evalCtx and txn are in the parameter list here?

@ajwerner
Copy link
Contributor

I don't care about the eval.Context or sql.ExecConfig flowing from sql->streamingccl via the hook that exists. I care about it showing up in builtins or eval.

@ZhouXing19 ZhouXing19 force-pushed the fix-stream-managers branch 4 times, most recently from bc5f111 to 0eb68d2 Compare November 1, 2022 02:45
@ZhouXing19
Copy link
Collaborator Author

ZhouXing19 commented Nov 1, 2022

It failed in TestPartialZip for having created more log files (5) than expected (1). Still looking into why
This was fixed by #91043 so I just rebased and it passed :)

@ZhouXing19 ZhouXing19 marked this pull request as ready for review November 1, 2022 14:52
@ZhouXing19 ZhouXing19 requested a review from a team as a code owner November 1, 2022 14:52
@ZhouXing19 ZhouXing19 requested review from a team November 1, 2022 14:52
@ZhouXing19 ZhouXing19 requested review from a team as code owners November 1, 2022 14:52
@ZhouXing19 ZhouXing19 requested review from rhu713 and removed request for a team November 1, 2022 14:52
…ager to eval

This commit:

1. moves the definition of StreamIngestionManager and ReplicationStreamManager
to eval;
2. has planner implements functions in StreamIngestionManager and
ReplicationStreamManager, so that they won't take eval.Context and evalCtx.Txn
as parameters.

Release note: None
@ZhouXing19
Copy link
Collaborator Author

This PR is ready for another look.

Note that I haven't covered changes with the internal executor yet, as I'm leaning towards having another PR to deprecate registry.ex and storage.ex altogether.

Copy link
Contributor

@ajwerner ajwerner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm: @stevendanna can you sign off on this/signal your awareness?

@ZhouXing19, please update the PR title before merging.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @rhu713)

@ZhouXing19 ZhouXing19 changed the title streamingccl, sql: moving StreamIngestionManager and ReplicationStreamManager to sql.planner streamingccl: avoid passing evalCtx, txn as parameter to ingestion & replication funcs Nov 7, 2022
@ZhouXing19 ZhouXing19 changed the title streamingccl: avoid passing evalCtx, txn as parameter to ingestion & replication funcs streamingccl: avoid passing evalCtx, txn as parameters to ingestion & replication funcs Nov 7, 2022
Copy link
Collaborator Author

@ZhouXing19 ZhouXing19 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ZhouXing19, please update the PR title before merging.

Done

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner and @rhu713)

Copy link
Collaborator

@stevendanna stevendanna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the ping. Overall this seems like a good thing.

One question I have is whether moving StreamID to streampb was necessary to resolve a dependency conflict. If not, it is the only bit of this I might reconsider just so we don't have CCL packages being imported in non-test code pkg/sql.

@ZhouXing19
Copy link
Collaborator Author

One question I have is whether moving StreamID to streampb was necessary to resolve a dependency conflict.

We definitely don't want to use the one in the streaming pkg for eval. ReplicationStreamManager and eval. StreamIngestManager. Do you think it makes sense to create a new pkg under streaming for it? Such as streaming/streamid/stream_id.go.

@ajwerner
Copy link
Contributor

ajwerner commented Nov 8, 2022

If not, it is the only bit of this I might reconsider just so we don't have CCL packages being imported in non-test code pkg/sql.

I don't really get this in that the streampb code was being imported transitively by sql via many other things, no?

 bazel query 'somepath(//pkg/sql,  //pkg/ccl/streamingccl/streampb)'
//pkg/sql:sql
//pkg/sql/sem/builtins:builtins
//pkg/streaming:streaming
//pkg/ccl/streamingccl/streampb:streampb

I couldn't care less about the direct import vs indirect.

#91005

@ajwerner
Copy link
Contributor

ajwerner commented Nov 8, 2022

If you did want to move it, I think sql/sem/catid would be fine too. That thing is just ID constant types.

Copy link
Collaborator

@stevendanna stevendanna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having #91005 open works for me.

Thanks for this cleanup!

@ZhouXing19
Copy link
Collaborator Author

Thanks all for reviewing!
bors r+

@craig craig bot merged commit 2e90394 into cockroachdb:master Nov 8, 2022
@craig
Copy link
Contributor

craig bot commented Nov 8, 2022

Build succeeded:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants