Skip to content

Commit

Permalink
perf: Reduce map alloc during recording offsets
Browse files Browse the repository at this point in the history
  • Loading branch information
joeirimpan committed Jul 2, 2024
1 parent 528b6fe commit d5fbef7
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,19 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
// RecordOffsets records the offsets of the latest fetched records per topic.
// This is used to resume consumption on new connections/reconnections from the source during runtime.
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
oMap := make(map[int32]kgo.Offset)
oMap[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
if sp.offsets != nil {
if o, ok := sp.offsets[rec.Topic]; ok {
o[rec.Partition] = oMap[rec.Partition]
sp.offsets[rec.Topic] = o
} else {
sp.offsets[rec.Topic] = oMap
}
} else {
if sp.offsets == nil {
sp.offsets = make(map[string]map[int32]kgo.Offset)
sp.offsets[rec.Topic] = oMap
}

if o, ok := sp.offsets[rec.Topic]; ok {
// If the topic already exists, update the offset for the partition.
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
sp.offsets[rec.Topic] = o
} else {
// If the topic does not exist, create a new map for the topic.
o := make(map[int32]kgo.Offset)
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
sp.offsets[rec.Topic] = o
}
}

Expand Down

0 comments on commit d5fbef7

Please sign in to comment.