diff --git a/broker_memory.go b/broker_memory.go index 80e30a26..d2f01625 100644 --- a/broker_memory.go +++ b/broker_memory.go @@ -340,8 +340,13 @@ func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (Stre h.nextExpireCheck = expireAt } - if h.historyMetaTTL > 0 { - removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds()) + historyMetaTTL := opts.HistoryMetaTTL + if historyMetaTTL == 0 { + historyMetaTTL = h.historyMetaTTL + } + + if historyMetaTTL > 0 { + removeAt := time.Now().Unix() + int64(historyMetaTTL.Seconds()) if _, ok := h.removes[ch]; !ok { heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt}) } @@ -388,8 +393,13 @@ func (h *historyHub) get(ch string, opts HistoryOptions) ([]*Publication, Stream filter := opts.Filter - if h.historyMetaTTL > 0 { - removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds()) + historyMetaTTL := opts.MetaTTL + if historyMetaTTL == 0 { + historyMetaTTL = h.historyMetaTTL + } + + if historyMetaTTL > 0 { + removeAt := time.Now().Unix() + int64(historyMetaTTL.Seconds()) if _, ok := h.removes[ch]; !ok { heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt}) } diff --git a/broker_memory_test.go b/broker_memory_test.go index ed25bbb0..aff3a46f 100644 --- a/broker_memory_test.go +++ b/broker_memory_test.go @@ -340,6 +340,46 @@ func TestMemoryHistoryHubMetaTTL(t *testing.T) { h.RUnlock() } +func TestMemoryHistoryHubMetaTTLPerChannel(t *testing.T) { + h := newHistoryHub(300*time.Second, make(chan struct{})) + h.runCleanups() + + ch1 := "channel1" + ch2 := "channel2" + pub := newTestPublication() + h.RLock() + require.Equal(t, int64(0), h.nextRemoveCheck) + h.RUnlock() + _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second}) + _, _ = h.add(ch1, pub, PublishOptions{HistorySize: 1, HistoryTTL: time.Second, HistoryMetaTTL: time.Second}) + _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second}) + _, _ = h.add(ch2, pub, PublishOptions{HistorySize: 2, HistoryTTL: time.Second, HistoryMetaTTL: time.Second}) + h.RLock() + require.True(t, h.nextRemoveCheck > 0) + require.Equal(t, 2, len(h.streams)) + h.RUnlock() + pubs, _, err := h.get(ch1, HistoryOptions{ + Filter: HistoryFilter{Limit: -1}, + MetaTTL: time.Second, + }) + require.NoError(t, err) + require.Len(t, pubs, 1) + pubs, _, err = h.get(ch2, HistoryOptions{ + Filter: HistoryFilter{Limit: -1}, + MetaTTL: time.Second, + }) + require.NoError(t, err) + require.Len(t, pubs, 2) + + time.Sleep(2 * time.Second) + + // test that stream cleaned up by periodic task + h.RLock() + require.Equal(t, 0, len(h.streams)) + require.Equal(t, int64(0), h.nextRemoveCheck) + h.RUnlock() +} + func TestMemoryBrokerRecover(t *testing.T) { e := testMemoryBroker() defer func() { _ = e.node.Shutdown(context.Background()) }()