From 382101ecc734ed2f2b3f1ecf19ec20b003d057a9 Mon Sep 17 00:00:00 2001 From: zeripath Date: Tue, 22 Feb 2022 12:08:35 +0000 Subject: [PATCH] In disk_channel queues synchronously push to disk on shutdown (#18415) (#18788) Partial Backport of #18415 Instead of using an asynchronous goroutine to push to disk on shutdown just close the datachan and immediately push to the disk. Prevents messages of incompletely flushed queues. Signed-off-by: Andrew Thornton Co-authored-by: Lunny Xiao --- modules/queue/queue_bytefifo.go | 8 +++++--- modules/queue/queue_disk_channel.go | 2 +- modules/queue/queue_disk_channel_test.go | 1 - modules/queue/unique_queue_disk_channel.go | 13 ++++++------- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index edde47a62d..c4d5d20a89 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -195,9 +195,11 @@ loop: } } -var errQueueEmpty = fmt.Errorf("empty queue") -var errEmptyBytes = fmt.Errorf("empty bytes") -var errUnmarshal = fmt.Errorf("failed to unmarshal") +var ( + errQueueEmpty = fmt.Errorf("empty queue") + errEmptyBytes = fmt.Errorf("empty bytes") + errUnmarshal = fmt.Errorf("failed to unmarshal") +) func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 72f330670a..199f958bc3 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -251,8 +251,8 @@ func (q *PersistableChannelQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelQueue).Wait() // Redirect all remaining data in the chan to the internal channel - close(q.channelQueue.dataChan) log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + close(q.channelQueue.dataChan) for data := range q.channelQueue.dataChan { _ = q.internal.Push(data) atomic.AddInt64(&q.channelQueue.numInQueue, -1) diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index c90d715a73..db12d9575c 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -188,5 +188,4 @@ func TestPersistableChannelQueue(t *testing.T) { for _, callback := range callbacks { callback() } - } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index af42c0913d..975421a339 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -238,13 +238,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() { q.channelQueue.Wait() q.internal.(*LevelUniqueQueue).Wait() // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - } - log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() + close(q.channelQueue.dataChan) + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) }