From dea1a87cd647d857ed59e01e87791fd7139cbc64 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 15 Aug 2024 14:07:31 +0300 Subject: [PATCH] backfill: slightly improve deduplication for forward backfills --- pkg/connector/backfill.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/connector/backfill.go b/pkg/connector/backfill.go index 6fd32f1..9113a8d 100644 --- a/pkg/connector/backfill.go +++ b/pkg/connector/backfill.go @@ -63,18 +63,23 @@ func (gc *GMClient) FetchMessages(ctx context.Context, params bridgev2.FetchMess if params.Cursor != "" { cursor, _ = parsePaginationCursor(params.Cursor) } - if !params.Forward && params.AnchorMessage != nil { - msgID, err := gc.ParseMessageID(params.AnchorMessage.ID) + var anchorTS time.Time + var anchorMsgID string + if params.AnchorMessage != nil { + anchorMsgID, err = gc.ParseMessageID(params.AnchorMessage.ID) if err != nil { return nil, fmt.Errorf("failed to parse anchor message ID: %w", err) } - tsMilli := params.AnchorMessage.Timestamp.UnixMilli() - anchorMsgCursor = &gmproto.Cursor{ - LastItemID: msgID, - LastItemTimestamp: tsMilli, - } - if cursor == nil || tsMilli < cursor.LastItemTimestamp { - cursor = anchorMsgCursor + anchorTS = params.AnchorMessage.Timestamp + if !params.Forward { + tsMilli := anchorTS.UnixMilli() + anchorMsgCursor = &gmproto.Cursor{ + LastItemID: anchorMsgID, + LastItemTimestamp: tsMilli, + } + if cursor == nil || tsMilli < cursor.LastItemTimestamp { + cursor = anchorMsgCursor + } } } resp, err := gc.Client.FetchMessages(convID, int64(params.Count), cursor) @@ -99,10 +104,17 @@ func (gc *GMClient) FetchMessages(ctx context.Context, params bridgev2.FetchMess } for _, msg := range resp.Messages { msgTS := time.UnixMicro(msg.Timestamp) + log := zerolog.Ctx(ctx).With().Str("message_id", msg.MessageID).Time("message_ts", msgTS).Logger() if !params.Forward && cursor != nil && msgTS.UnixMilli() >= cursor.LastItemTimestamp { + log.Debug().Int64("cursor_ms", cursor.LastItemTimestamp).Msg("Ignoring message newer than cursor") + continue + } else if params.Forward && msgTS.Before(anchorTS) || anchorMsgID == msg.MessageID { + log.Debug(). + Time("anchor_ts", anchorTS). + Str("anchor_message_id", anchorMsgID). + Msg("Ignoring message older than anchor message") continue } - log := zerolog.Ctx(ctx).With().Str("message_id", msg.MessageID).Time("message_ts", msgTS).Logger() ctx := log.WithContext(ctx) sender := gc.getEventSenderFromMessage(msg) intent := params.Portal.GetIntentFor(ctx, sender, gc.UserLogin, bridgev2.RemoteEventBackfill)