Skip to content

Commit

Permalink
potentially fixed long stream updates
Browse files Browse the repository at this point in the history
  • Loading branch information
OS-M committed Dec 29, 2022
1 parent 2b6b367 commit 02a681d
Showing 1 changed file with 15 additions and 8 deletions.
23 changes: 15 additions & 8 deletions pkg/utils/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,7 @@ func (logger *Logger) StartLogging(ctx context.Context, logInterval time.Duratio
return

case req := <-logger.streamUpdates: // prioritize stream updates over other channels
data := streamDataById[req.StreamId]
if data == nil {
data = new(streamData)
streamDataById[req.StreamId] = data
data.startTime = time.Now()
}
data.firstOffset = &req.FirstOffset
data.lastOffset = &req.LastOffset
streamDataById[req.StreamId] = updateStreamData(streamDataById[req.StreamId], &req)

default:
break
Expand All @@ -106,6 +99,9 @@ func (logger *Logger) StartLogging(ctx context.Context, logInterval time.Duratio
lastLoggedTime = time.Now()
t.Render()

case req := <-logger.streamUpdates:
streamDataById[req.StreamId] = updateStreamData(streamDataById[req.StreamId], &req)

case event := <-logger.streamEventsToProcess:
data := streamDataById[event.streamId]
if data != nil {
Expand Down Expand Up @@ -148,6 +144,17 @@ func (logger *Logger) StartLiveStreamUpdate(
}
}

func updateStreamData(data *streamData, req *updateStreamRequest) *streamData {
res := data
if res == nil {
res = new(streamData)
res.startTime = time.Now()
}
res.firstOffset = &req.FirstOffset
res.lastOffset = &req.LastOffset
return res
}

func divideAsFloats[T constraints.Integer](a, b T) float32 {
return float32(a) / float32(b)
}
Expand Down

0 comments on commit 02a681d

Please sign in to comment.