Skip to content

Commit

Permalink
backfill: slightly improve deduplication for forward backfills
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Aug 15, 2024
1 parent 82df896 commit dea1a87
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions pkg/connector/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,18 +63,23 @@ func (gc *GMClient) FetchMessages(ctx context.Context, params bridgev2.FetchMess
if params.Cursor != "" {
cursor, _ = parsePaginationCursor(params.Cursor)
}
if !params.Forward && params.AnchorMessage != nil {
msgID, err := gc.ParseMessageID(params.AnchorMessage.ID)
var anchorTS time.Time
var anchorMsgID string
if params.AnchorMessage != nil {
anchorMsgID, err = gc.ParseMessageID(params.AnchorMessage.ID)
if err != nil {
return nil, fmt.Errorf("failed to parse anchor message ID: %w", err)
}
tsMilli := params.AnchorMessage.Timestamp.UnixMilli()
anchorMsgCursor = &gmproto.Cursor{
LastItemID: msgID,
LastItemTimestamp: tsMilli,
}
if cursor == nil || tsMilli < cursor.LastItemTimestamp {
cursor = anchorMsgCursor
anchorTS = params.AnchorMessage.Timestamp
if !params.Forward {
tsMilli := anchorTS.UnixMilli()
anchorMsgCursor = &gmproto.Cursor{
LastItemID: anchorMsgID,
LastItemTimestamp: tsMilli,
}
if cursor == nil || tsMilli < cursor.LastItemTimestamp {
cursor = anchorMsgCursor
}
}
}
resp, err := gc.Client.FetchMessages(convID, int64(params.Count), cursor)
Expand All @@ -99,10 +104,17 @@ func (gc *GMClient) FetchMessages(ctx context.Context, params bridgev2.FetchMess
}
for _, msg := range resp.Messages {
msgTS := time.UnixMicro(msg.Timestamp)
log := zerolog.Ctx(ctx).With().Str("message_id", msg.MessageID).Time("message_ts", msgTS).Logger()
if !params.Forward && cursor != nil && msgTS.UnixMilli() >= cursor.LastItemTimestamp {
log.Debug().Int64("cursor_ms", cursor.LastItemTimestamp).Msg("Ignoring message newer than cursor")
continue
} else if params.Forward && msgTS.Before(anchorTS) || anchorMsgID == msg.MessageID {
log.Debug().
Time("anchor_ts", anchorTS).
Str("anchor_message_id", anchorMsgID).
Msg("Ignoring message older than anchor message")
continue
}
log := zerolog.Ctx(ctx).With().Str("message_id", msg.MessageID).Time("message_ts", msgTS).Logger()
ctx := log.WithContext(ctx)
sender := gc.getEventSenderFromMessage(msg)
intent := params.Portal.GetIntentFor(ctx, sender, gc.UserLogin, bridgev2.RemoteEventBackfill)
Expand Down

0 comments on commit dea1a87

Please sign in to comment.