Skip to content

Commit

Permalink
In disk_channel queues synchronously push to disk on shutdown (#18415) (
Browse files Browse the repository at this point in the history
#18788)

Partial Backport of #18415

Instead of using an asynchronous goroutine to push to disk on shutdown
just close the datachan and immediately push to the disk.

Prevents messages of incompletely flushed queues.

Signed-off-by: Andrew Thornton <art27@cantab.net>

Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
  • Loading branch information
zeripath and lunny authored Feb 22, 2022
1 parent 86c3481 commit 382101e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
8 changes: 5 additions & 3 deletions modules/queue/queue_bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,11 @@ loop:
}
}

var errQueueEmpty = fmt.Errorf("empty queue")
var errEmptyBytes = fmt.Errorf("empty bytes")
var errUnmarshal = fmt.Errorf("failed to unmarshal")
var (
errQueueEmpty = fmt.Errorf("empty queue")
errEmptyBytes = fmt.Errorf("empty bytes")
errUnmarshal = fmt.Errorf("failed to unmarshal")
)

func (q *ByteFIFOQueue) doPop() error {
q.lock.Lock()
Expand Down
2 changes: 1 addition & 1 deletion modules/queue/queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
close(q.channelQueue.dataChan)
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
close(q.channelQueue.dataChan)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
Expand Down
1 change: 0 additions & 1 deletion modules/queue/queue_disk_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) {
for _, callback := range callbacks {
callback()
}

}
13 changes: 6 additions & 7 deletions modules/queue/unique_queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelUniqueQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
}
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
close(q.channelQueue.dataChan)
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
}
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
}
Expand Down

0 comments on commit 382101e

Please sign in to comment.