-
Notifications
You must be signed in to change notification settings - Fork 95
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update add queue feature with a more changes and concurrent safety #410
Conversation
1fdf021
to
d45f42f
Compare
// through Client.Queues. | ||
type queueBundle struct { | ||
// Function that adds a producer to the associated client. | ||
addProducer func(queueName string, queueConfig QueueConfig) *producer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Taking a function pointer here might seem a little weird, but it lets us not internalize a client
instance, which lets us avoid putting a TTx
generic parameter on queueBundle
.
@@ -155,8 +155,6 @@ func newTestClient(t *testing.T, dbPool *pgxpool.Pool, config *Config) *Client[p | |||
client, err := NewClient(riverpgxv5.New(dbPool), config) | |||
require.NoError(t, err) | |||
|
|||
client.testSignals.Init() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was having trouble with my stress test where the test signal channels were filling up, and unfortunately because init'ing test signals also inits test signals for all maintenance services, it means there's no way to deinit them again. The only decent answer is to not init them on every single client initialization, and move inits into test cases that need them.
@@ -57,6 +57,8 @@ func (m *clientMonitor) Start(ctx context.Context) error { | |||
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change | |||
// and is only meant to be used during initial client startup. | |||
func (m *clientMonitor) InitializeProducerStatus(queueName string) { | |||
m.statusSnapshotMu.Lock() | |||
defer m.statusSnapshotMu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found I needed to add this mutex when trying to run my new stress test. Most other client monitor functions already lock it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. As this wasn't dynamic before it wasn't really needed.
@bgentry Mind taking a look at this and letting me know what you think? I definitely found it a little on the tricky side to guarantee we make the right decision to start or not start the new producer based on whether the client is started because it's hard to know for sure whether a client is started or stopped because it may be in the process of stopping or stopping right at this instant (therefore the addition of the new mutex). I almost think a better design would be to never start a newly added producer/queue, and then require that the caller stop and then start the client again to have it start. However, that would cause some user confusion/surprise I'm sure. |
d45f42f
to
1f11bea
Compare
@PumpkinSeed BTW, I changed a fair bit of code, but I included all your original commits in this variant. We're trying a new CLA process (similar to the CLA Assistant you may have seen on any projects) just to make sure we don't have any copyright issues on any of any contributed code. Would you mind helping us test it out by going to https://github.com/riverqueue/cla and following the steps there? It basically involves opening a PR through GitHub's UI and adding your name to a CSV file. Thanks! |
1f11bea
to
e937cb4
Compare
@bgentry Just since I'm sending 0.9.0 out for a bug fix, I thought I'd ping this one once more. If you have concerns, no worries, but if it's just that you haven't reviewed yet, may be worth including it in the release. |
e937cb4
to
378324a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM aside from these comments, particularly the Client.Queues()
return type which I think is a blocker.
client.go
Outdated
@@ -1647,6 +1644,10 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara | |||
// client, and can be used to add new ones or remove existing ones. | |||
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs } | |||
|
|||
// Queues returns the currently configured set of queues for the client, and can | |||
// be used to add new ones. | |||
func (c *Client[TTx]) Queues() *queueBundle { return c.queues } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we want to have an exported method that returns an unexported type like this. Whether or not it works, it looks bad/undiscoverable in docs.
I think the type either needs to be exported with minimal methods exposed on it, or else we need to return an exported interface type here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, makes sense. I'd make it unexported initially because I had to add a TTx
generic parameter so it could internalize a client, and it was just a really ugly type. Now that that's gone, totally okay to export this.
@@ -57,6 +57,8 @@ func (m *clientMonitor) Start(ctx context.Context) error { | |||
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change | |||
// and is only meant to be used during initial client startup. | |||
func (m *clientMonitor) InitializeProducerStatus(queueName string) { | |||
m.statusSnapshotMu.Lock() | |||
defer m.statusSnapshotMu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. As this wasn't dynamic before it wasn't really needed.
A feature to make it possible to add a new queue using an invocation like `client.Queues().Add(name, config)`, similar to the API for adding a new periodic job. If the client is started already, the new producer is also started. Fetch and work context are the same ones created for other producers during `Start`. For now we totally punt on the problem of removing queues, which is more complicated because it's a fairly hard problem on how producer stop context cancellation should work. A problem I was having while writing a stress test for the feature is that test signal channels were being filled which was difficult to solve because test signals were always initialized by `newTestClient` and because the init also initializes each maintenance service's test signals in turn, it's not possible to deinit them again. I had to refactor tests such that test signals are only initialized when they're used, which is probably better anyway.
378324a
to
89da605
Compare
@bgentry Updated. Mind taking another look? |
thx. |
A feature to make it possible to add a new queue using an invocation like
client.Queues().Add(name, config)
, similar to the API for adding a newperiodic job. If the client is started already, the new producer is also
started. Fetch and work context are the same ones created for other
producers during
Start
.For now we totally punt on the problem of removing queues, which is more
complicated because it's a fairly hard problem on how producer stop context
cancellation should work.
A problem I was having while writing a stress test for the feature is that
test signal channels were being filled which was difficult to solve because
test signals were always initialized by
newTestClient
and because theinit also initializes each maintenance service's test signals in turn, it's
not possible to deinit them again. I had to refactor tests such that test
signals are only initialized when they're used, which is probably better
anyway.