From d5fbef7c9d4ef71a8115251205732295055a3c0e Mon Sep 17 00:00:00 2001 From: Joe Paul Date: Tue, 2 Jul 2024 11:01:59 +0530 Subject: [PATCH] perf: Reduce map alloc during recording offsets --- internal/relay/source_pool.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go index 9442aac..8f12ca8 100644 --- a/internal/relay/source_pool.go +++ b/internal/relay/source_pool.go @@ -201,18 +201,19 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) { // RecordOffsets records the offsets of the latest fetched records per topic. // This is used to resume consumption on new connections/reconnections from the source during runtime. func (sp *SourcePool) RecordOffsets(rec *kgo.Record) { - oMap := make(map[int32]kgo.Offset) - oMap[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1) - if sp.offsets != nil { - if o, ok := sp.offsets[rec.Topic]; ok { - o[rec.Partition] = oMap[rec.Partition] - sp.offsets[rec.Topic] = o - } else { - sp.offsets[rec.Topic] = oMap - } - } else { + if sp.offsets == nil { sp.offsets = make(map[string]map[int32]kgo.Offset) - sp.offsets[rec.Topic] = oMap + } + + if o, ok := sp.offsets[rec.Topic]; ok { + // If the topic already exists, update the offset for the partition. + o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1) + sp.offsets[rec.Topic] = o + } else { + // If the topic does not exist, create a new map for the topic. + o := make(map[int32]kgo.Offset) + o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1) + sp.offsets[rec.Topic] = o } }