Update add queue feature with more changes and concurrency safety
Adds a feature that makes it possible to add a new queue using an invocation
like `client.Queues().Add(name, config)`, similar to the API for adding a new
periodic job. If the client is already started, the new producer is started
too, reusing the same fetch and work contexts created for the other producers
during `Start`.
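
For reference, a minimal usage sketch of the new API (the queue name, worker
count, and pgx driver type here are illustrative assumptions, not part of this
commit):

```go
package example

import (
	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

// addEmailQueue adds an "email" queue to an existing client. If the client is
// already started, a producer for the queue starts immediately, reusing the
// fetch and work contexts created during Start.
func addEmailQueue(client *river.Client[pgx.Tx]) error {
	return client.Queues().Add("email", river.QueueConfig{MaxWorkers: 10})
}
```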

For now we punt entirely on the problem of removing queues, which is more
complicated because it's a genuinely hard problem to decide how producer stop
context cancellation should work.

A problem I ran into while writing a stress test for the feature was that test
signal channels were filling up. This was difficult to solve because test
signals were always initialized by `newTestClient`, and because that init also
initializes each maintenance service's test signals in turn, there's no way to
deinitialize them again. I had to refactor the tests so that test signals are
only initialized when they're actually used, which is probably better anyway.
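
A hypothetical sketch of the resulting test pattern (the helper and signal
names below are illustrative, not the exact ones in the test suite):

```go
// Before: newTestClient initialized every test signal unconditionally, so
// tests that never drained them would fill the signal channels.
client := newTestClient(t, config)

// After: only tests that actually wait on a signal opt in to initialization
// before starting the client.
client.testSignals.Init()
startClient(ctx, t, client)
client.testSignals.electedLeader.WaitOrTimeout()
```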
brandur committed Jun 30, 2024
1 parent 9332965 commit d45f42f
Showing 7 changed files with 187 additions and 81 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Queues can be added after a client is initialized using `client.Queues().Add(queueName string, queueConfig QueueConfig)`. [PR #410](https://github.com/riverqueue/river/pull/410).

### Fixed

- Pausing or resuming a queue that was already paused or not paused respectively no longer returns `rivertype.ErrNotFound`. The same goes for pausing or resuming using the all queues string (`*`) when no queues are in the database (previously that also returned `rivertype.ErrNotFound`). [PR #408](https://github.com/riverqueue/river/pull/408).
145 changes: 90 additions & 55 deletions client.go
@@ -313,27 +313,26 @@ type Client[TTx any] struct {
baseService baseservice.BaseService
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
monitor *clientMonitor
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
producersByQueueNameMu sync.Mutex
queueMaintainer *maintenance.QueueMaintainer
services []startstop.Service
subscriptionManager *subscriptionManager
stopped chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
monitor *clientMonitor
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
queues *queueBundle
services []startstop.Service
stopped chan struct{}
subscriptionManager *subscriptionManager
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
workCancel context.CancelCauseFunc
workContext context.Context
workCancel context.CancelCauseFunc
}

// Test-only signals.
@@ -487,6 +486,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
}),
}
client.queues = &queueBundle{addProducer: client.addProducer}

baseservice.Init(archetype, &client.baseService)
client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
@@ -610,26 +610,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return client, nil
}

func (c *Client[TTx]) AddQueue(ctx context.Context, queueName string, queueConfig QueueConfig) error {
if err := queueConfig.validate(queueName); err != nil {
return err
}

producerInstance := c.addProducer(queueName, queueConfig)
c.producersByQueueNameMu.Lock()
c.producersByQueueName[queueName] = producerInstance
c.producersByQueueNameMu.Unlock()

fetchCtx, started := c.baseStartStop.IsStarted()
if started {
if err := producerInstance.StartWorkContext(fetchCtx, c.workContext); err != nil {
return err
}
}

return nil
}

// Start starts the client's job fetching and working loops. Once this is called,
// the client will run in a background goroutine until stopped. All jobs are
// run with a context inheriting from the provided context, but with a timeout
@@ -648,6 +628,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return nil
}

c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

c.stopped = stopped

stopProducers := func() {
@@ -718,7 +701,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. Both inherit from the provided context, so if it's cancelled, a
// more aggressive stop will be initiated.
c.workContext, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))

for _, service := range c.services {
if err := service.Start(fetchCtx); err != nil {
@@ -730,13 +713,17 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
for _, producer := range c.producersByQueueName {
producer := producer

if err := producer.StartWorkContext(fetchCtx, c.workContext); err != nil {
if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
stopProducers()
stopServicesOnError()
return err
}
}

c.queues.fetchCtx = fetchCtx
c.queues.workCtx = workCtx
c.workCancel = workCancel

return nil
}(); err != nil {
defer close(stopped)
@@ -755,6 +742,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// The call to Stop cancels this context. Block here until shutdown.
<-fetchCtx.Done()

c.queues.startStopMu.Lock()
defer c.queues.startStopMu.Unlock()

// On stop, have the producers stop fetching first of all.
c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Stopping producers")
stopProducers()
@@ -1520,23 +1510,25 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
}

func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer {
producerInstance := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
producer := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
QueueEventCallback: c.subscriptionManager.distributeQueueEvent,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
})
c.monitor.InitializeProducerStatus(queueName)
return producerInstance
c.producersByQueueName[queueName] = producer
return producer
}

var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
@@ -1631,6 +1623,10 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
// client, and can be used to add new ones or remove existing ones.
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

// Queues returns the currently configured set of queues for the client, and can
// be used to add new ones.
func (c *Client[TTx]) Queues() *queueBundle { return c.queues }

// QueueGet returns the queue with the given name. If the queue has not recently
// been active or does not exist, returns ErrNotFound.
//
@@ -1814,6 +1810,45 @@ func (c *Client[TTx]) QueueResumeTx(ctx context.Context, tx TTx, name string, op
return nil
}

// queueBundle is a bundle for adding additional queues. It's made accessible
// through Client.Queues.
type queueBundle struct {
// Function that adds a producer to the associated client.
addProducer func(queueName string, queueConfig QueueConfig) *producer

fetchCtx context.Context //nolint:containedctx

// Mutex that's acquired when client is starting and stopping and when a
// queue is being added so that we can be sure that a client is fully
// stopped or fully started when adding a new queue.
startStopMu sync.Mutex

workCtx context.Context //nolint:containedctx
}

// Add adds a new queue to the client. If the client is already started, a
// producer for the queue is started. Context is inherited from the one given to
// Client.Start.
func (b *queueBundle) Add(queueName string, queueConfig QueueConfig) error {
if err := queueConfig.validate(queueName); err != nil {
return err
}

b.startStopMu.Lock()
defer b.startStopMu.Unlock()

producer := b.addProducer(queueName, queueConfig)

// Start the queue if the client is already started.
if b.fetchCtx != nil && b.fetchCtx.Err() == nil {
if err := producer.StartWorkContext(b.fetchCtx, b.workCtx); err != nil {
return err
}
}

return nil
}

// Generates a default client ID using the current hostname and time.
func defaultClientID(startedAt time.Time) string {
host, _ := os.Hostname()
2 changes: 2 additions & 0 deletions client_monitor.go
@@ -57,6 +57,8 @@ func (m *clientMonitor) Start(ctx context.Context) error {
// uninitialized. Unlike SetProducerStatus, it does not broadcast the change
// and is only meant to be used during initial client startup.
func (m *clientMonitor) InitializeProducerStatus(queueName string) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()
m.currentSnapshot.Producers[queueName] = componentstatus.Uninitialized
}
