diff --git a/internal/client/client.go b/internal/client/client.go
index 7ba68423..8df67547 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -578,7 +578,7 @@ func NewLoggedClient(t *testing.T, hsName string, cli *http.Client) *http.Client
 	t.Helper()
 	if cli == nil {
 		cli = &http.Client{
-			Timeout: 30 * time.Second,
+			Timeout: 200 * time.Second,
 		}
 	}
 	transport := cli.Transport
diff --git a/tests/msc2716_test.go b/tests/msc2716_test.go
index ae3a29d4..1e87c212 100644
--- a/tests/msc2716_test.go
+++ b/tests/msc2716_test.go
@@ -1026,6 +1026,76 @@ func TestImportHistoricalMessages(t *testing.T) {
 				2,
 			)
 		})
+
+		t.Run("Backfill still works after many contiguous batches are imported", func(t *testing.T) {
+			t.Parallel()
+
+			roomID := as.CreateRoom(t, createPublicRoomOpts)
+			alice.JoinRoom(t, roomID, nil)
+
+			// Create some normal messages in the timeline
+			eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore")
+			eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
+			timeAfterEventBefore := time.Now()
+
+			// We chose the magic number 11 because Synapse currently limits the
+			// backfill extremities to 5. 10 also seemed like a round limit another
+			// homeserver implementation might pick, so we use 10+1 to make sure we
+			// cover that case as well.
+			numBatches := 11
+			numHistoricalMessagesPerBatch := 100
+			// Wait enough ms that each of the historical messages we try to
+			// import later can get its own distinct timestamp within this gap
+			time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages)
+
+			// Create some "eventIDsAfter" messages; we don't need their event IDs for this test
+			createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter")
+
+			// Import a long chain of batches connected to each other.
+			// We want to make sure Synapse doesn't blow up after we import
+			// many messages.
+			var expectedEventIDs []string
+			var denyListEventIDs []string
+			var baseInsertionEventID string
+			nextBatchID := ""
+			for i := 0; i < numBatches; i++ {
+				insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i))
+				batchSendRes := batchSendHistoricalMessages(
+					t,
+					as,
+					roomID,
+					eventIdBefore,
+					nextBatchID,
+					createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, insertTime),
+					createMessageEventsForBatchSendRequest([]string{virtualUserID}, insertTime, numHistoricalMessagesPerBatch),
+					// Status
+					200,
+				)
+				batchSendResBody := client.ParseJSON(t, batchSendRes)
+				// Make sure we see all of the historical messages
+				expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...)
+				// We should not find any historical state between the batches of messages
+				denyListEventIDs = append(denyListEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "state_event_ids")...)
+				nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id")
+
+				// Grab the base insertion event ID to reference later in the marker event
+				if i == 0 {
+					baseInsertionEventID = client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id")
+				}
+			}
+
+			// Make sure we see the events at the very start of the message history
+			expectedEventIDs = append(expectedEventIDs, eventIDsBefore...)
+
+			// Join the room from a remote homeserver after the historical messages were sent
+			remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})
+
+			// Send the marker event
+			sendMarkerAndEnsureBackfilled(t, as, remoteCharlie, roomID, baseInsertionEventID)
+
+			// Make sure events can be backfilled from the remote homeserver
+			paginateUntilMessageCheckOff(t, remoteCharlie, roomID, "", expectedEventIDs, denyListEventIDs)
+		})
 	})
 
 	t.Run("Existing room versions", func(t *testing.T) {
@@ -1167,10 +1237,10 @@ func includes(needle string, haystack []string) bool {
 func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) {
 	t.Helper()
 	start := time.Now()
-	checkCounter := 0
+	callCounter := 0
 	for {
 		if time.Since(start) > c.SyncUntilTimeout {
-			t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", checkCounter)
+			t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", callCounter)
 		}
 
 		messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "v3", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
@@ -1194,7 +1264,7 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
 			}
 		}
-		checkCounter++
+		callCounter++
 		// Add a slight delay so we don't hammmer the messages endpoint
 		time.Sleep(500 * time.Millisecond)
 	}
 }
@@ -1239,8 +1309,12 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string,
 		)
 	}
 
+	// We grab 100 events per `/messages` request so it should only take us
+	// (total / 100) requests to see everything. Add 1 to allow some slack
+	// for things like state events.
+	expectedNumberOfMessagesRequests := (len(expectedEventIDs) / 100) + 1
 	for {
-		if time.Since(start) > c.SyncUntilTimeout {
+		if time.Since(start) > time.Duration(expectedNumberOfMessagesRequests)*c.SyncUntilTimeout {
 			t.Fatalf(
 				"paginateUntilMessageCheckOff timed out. %s",
 				generateErrorMesssageInfo(),
@@ -1254,7 +1328,6 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string,
 		}))
 		callCounter++
 		messsageResBody := client.ParseJSON(t, messagesRes)
-		messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end")
 		// Since the original body can only be read once, create a new one from the body bytes we just read
 		messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody))
 
@@ -1294,6 +1367,12 @@ func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string,
 		if len(workingExpectedEventIDMap) == 0 {
 			return
 		}
+
+		// Since this will fail the test if the key does not exist, do it at the
+		// end of the loop. It's a valid scenario to be at the end of the room
+		// with nothing more to paginate, so we want the `return` above (once
+		// we've found all of the expected events) to have a chance to run first.
+		messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end")
 	}
 }
 
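Note on the new timeout budget in paginateUntilMessageCheckOff: the loop's deadline is now expectedNumberOfMessagesRequests * SyncUntilTimeout rather than a single SyncUntilTimeout window. As a rough worked example for the new many-batches test (assuming each /messages request really returns the 100 events the comment describes), 11 batches * 100 messages plus the 2 "eventIDsBefore" events gives roughly 1102 expected event IDs, so (1102 / 100) + 1 = 12 requests and an overall budget of about 12 * SyncUntilTimeout.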