diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 784beabb2a876..d783e9d1919e3 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -284,6 +284,27 @@ func (d *Drain) PatternString(c *LogCluster) string { return s } +func (d *Drain) Prune() { + d.pruneTree(d.rootNode) +} + +func (d *Drain) pruneTree(node *Node) int { + for key, child := range node.keyToChildNode { + if d.pruneTree(child) == 0 { + delete(node.keyToChildNode, key) + } + } + + validClusterIds := 0 + for _, clusterID := range node.clusterIDs { + cluster := d.idToCluster.Get(clusterID) + if cluster != nil { + validClusterIds++ + } + } + return len(node.keyToChildNode) + validClusterIds +} + func (d *Drain) Delete(cluster *LogCluster) { d.idToCluster.cache.Remove(cluster.id) } diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 34bcf8b4c12a5..690db7da29ee8 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -6,6 +6,7 @@ import ( "os" "strings" "testing" + "time" "github.com/stretchr/testify/require" "golang.org/x/exp/slices" @@ -625,3 +626,68 @@ func TestDeduplicatePlaceholders(b *testing.T) { }) } } + +func TestDrain_PruneTreeClearsOldBranches(t *testing.T) { + t.Parallel() + tests := []struct { + name string + drain *Drain + inputLines []string + }{ + { + name: "should prune old branches", + drain: New(DefaultConfig(), nil), + inputLines: []string{ + "test test test A", + "test test test B", + "test test test C", + "test test test D", + "test test test E", + "test test test F", + "test test test G", + "my name is W", + "my name is X", + "my name is Y", + "my name is Z", + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + now := time.Now() + for i, line := range tt.inputLines { + ts := now.Add(time.Millisecond * time.Duration(i)) + if i < 7 { + ts = ts.Add(-time.Duration(7-i) * time.Minute) + } + tt.drain.Train(line, ts.UnixNano()) + } + + require.Len(t, tt.drain.Clusters(), 2) + require.Equal(t, 8, countNodes(tt.drain.rootNode)) + + clusters := tt.drain.Clusters() + for _, cluster := range clusters { + cluster.Prune(time.Second * 10) + if cluster.Size == 0 { + tt.drain.Delete(cluster) + } + } + require.Len(t, tt.drain.Clusters(), 1) + require.Equal(t, 8, countNodes(tt.drain.rootNode), "expected same number of nodes before pruning") + + tt.drain.Prune() + require.Len(t, tt.drain.Clusters(), 1) + require.Equal(t, 5, countNodes(tt.drain.rootNode), "expected fewer nodes after pruning") + }) + } +} + +func countNodes(node *Node) int { + total := 1 + for _, child := range node.keyToChildNode { + total += countNodes(child) + } + return total +} diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index ec075fece7396..fef357eb1b40f 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -275,6 +275,8 @@ func (s *stream) prune(olderThan time.Duration) bool { s.patterns.Delete(cluster) } } + // Clear empty branches after deleting chunks & clusters + s.patterns.Prune() chunksPruned := true if s.chunks != nil {