Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect against dropped databases #449

Merged
merged 4 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions sync3/extensions/todevice.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,30 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
r.Limit = 100 // default to 100
}
l := logger.With().Str("user", extCtx.UserID).Str("device", extCtx.DeviceID).Logger()

mapMu.Lock()
lastSentPos, exists := deviceIDToSinceDebugOnly[extCtx.DeviceID]
internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos)
isFirstRequest := !exists
mapMu.Unlock()

// If this is the first time we've seen this device ID since starting up, ignore the client-provided 'since'
// value. This is done to protect against dropped postgres sequences. Consider:
// - 5 to-device messages arrive for Alice
// - Alice requests all messages, gets them and acks them so since=5, and the nextval() sequence is 6.
// - the server admin drops the DB and starts over again. The DB sequence starts back at 1.
// - 2 to-device messages arrive for Alice
// - Alice requests messages from since=5. No messages are returned as the 2 new messages have a lower sequence number.
// - Even worse, those 2 messages are deleted because sending since=5 ACKNOWLEDGES all messages <=5.
// By ignoring the first `since` on startup, we effectively force the client into sending since=0. In this scenario,
// it will then A) not delete anything as since=0 acknowledges nothing, B) return the 2 to-device events.
//
// The cost to this is that it is possible to send duplicate to-device events if the server restarts before the client
// has time to send the ACK to the server. This isn't fatal as clients do suppress duplicate to-device events.
if isFirstRequest {
r.Since = ""
}

var from int64
var err error
if r.Since != "" {
Expand All @@ -82,10 +106,7 @@ func (r *ToDeviceRequest) ProcessInitial(ctx context.Context, res *Response, ext
internal.GetSentryHubFromContextOrDefault(ctx).CaptureException(err)
}
}
mapMu.Lock()
lastSentPos := deviceIDToSinceDebugOnly[extCtx.DeviceID]
internal.Logf(ctx, "to_device", "since=%v limit=%v last_sent=%v", r.Since, r.Limit, lastSentPos)
mapMu.Unlock()

if from < lastSentPos {
// we told the client about a newer position, but yet they are using an older position, yell loudly
l.Warn().Int64("last_sent", lastSentPos).Int64("recv", from).Bool("initial", extCtx.IsInitial).Msg(
Expand Down
53 changes: 53 additions & 0 deletions tests-integration/extensions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package syncv3

import (
"encoding/json"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -443,6 +445,57 @@ func TestExtensionToDevice(t *testing.T) {
m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(0)), m.MatchToDeviceMessages(newToDeviceMsgs))
}

// Test that if you sync with a very very high numbered since value, we return lower numbered entries.
// This guards against dropped databases.
func TestExtensionToDeviceSequence(t *testing.T) {
pqString := testutils.PrepareDBConnectionString()
// setup code
v2 := runTestV2Server(t)
v3 := runTestServer(t, v2, pqString)
defer v2.close()
defer v3.close()
alice := "@TestExtensionToDeviceSequence_alice:localhost"
aliceToken := "ALICE_BEARER_TOKEN_TestExtensionToDeviceSequence"
v2.addAccount(t, alice, aliceToken)
toDeviceMsgs := []json.RawMessage{
json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"1"}}`),
json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"2"}}`),
json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"3"}}`),
json.RawMessage(`{"sender":"alice","type":"something","content":{"foo":"4"}}`),
}
v2.queueResponse(alice, sync2.SyncResponse{
ToDevice: sync2.EventsResponse{
Events: toDeviceMsgs,
},
})

hiSince := 999999
res := v3.mustDoV3Request(t, aliceToken, sync3.Request{
Lists: map[string]sync3.RequestList{"a": {
Ranges: sync3.SliceRanges{
[2]int64{0, 10}, // doesn't matter
},
}},
Extensions: extensions.Request{
ToDevice: &extensions.ToDeviceRequest{
Core: extensions.Core{Enabled: &boolTrue},
Since: fmt.Sprintf("%d", hiSince),
},
},
})
m.MatchResponse(t, res, m.MatchList("a", m.MatchV3Count(0)), m.MatchToDeviceMessages(toDeviceMsgs), func(res *sync3.Response) error {
// ensure that we return a lower numbered since token
got, err := strconv.Atoi(res.Extensions.ToDevice.NextBatch)
if err != nil {
return err
}
if got >= hiSince {
return fmt.Errorf("next_batch got %v wanted lower than %v", got, hiSince)
}
return nil
})
}

// tests that the account data extension works:
// 1- check global account data is sent on first connection
// 2- check global account data updates are proxied through
Expand Down
Loading