Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Test that /messages works on remote homeserver and can be backfilled properly after many batches (MSC2716) #214

Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8099f47
Test that /messages can be backfilled properly after many batches
MadLittleMods Oct 19, 2021
094c5f7
Add message suffix to better distinguish messages
MadLittleMods Oct 21, 2021
e30bcd4
v4 room version
MadLittleMods Oct 30, 2021
1e333d6
Make sure marker event is sent
MadLittleMods Nov 3, 2021
2022b31
Cleanup tests
MadLittleMods Nov 3, 2021
d325349
Some test cleanup and comments
MadLittleMods Nov 3, 2021
2fe5180
Delays not needed for test servers (get result ASAP)
MadLittleMods Nov 11, 2021
0604564
Fix typo
MadLittleMods Nov 11, 2021
83adbe2
Merge branch 'master' into madlittlemods/test-backfill-and-messages-s…
MadLittleMods Nov 11, 2021
4aba836
Fix missing params after merge
MadLittleMods Nov 11, 2021
ffbca43
Make sure historical state doesn't appear between batches
MadLittleMods Dec 2, 2021
37109fa
Re-use JSONArrayEach
MadLittleMods Dec 2, 2021
cc7236b
Merge branch 'master' into madlittlemods/test-backfill-and-messages-s…
MadLittleMods Dec 17, 2021
4c8284a
v4 was never merged
MadLittleMods Dec 17, 2021
9b90429
Merge branch 'master' into madlittlemods/test-backfill-and-messages-s…
MadLittleMods Dec 17, 2021
677836b
Merge branch 'master' into madlittlemods/test-backfill-and-messages-s…
MadLittleMods Jan 13, 2022
85eb7bd
Merge branch 'main' into madlittlemods/test-backfill-and-messages-sti…
MadLittleMods Mar 30, 2022
3532821
Merge branch 'main' into madlittlemods/test-backfill-and-messages-sti…
MadLittleMods May 13, 2022
1667e15
Merge branch 'main' into madlittlemods/test-backfill-and-messages-sti…
MadLittleMods Aug 10, 2022
0589546
Merge branch 'main' into madlittlemods/test-backfill-and-messages-sti…
MadLittleMods Sep 20, 2022
606197a
Update test name
MadLittleMods Sep 21, 2022
d679384
Changes and debugging
MadLittleMods Sep 29, 2022
230c46e
Multiply timeout by the number of requests we expect
MadLittleMods Sep 29, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (c *CSAPI) DoFunc(t *testing.T, method string, paths []string, opts ...Requ
}
// debug log the request
if c.Debug {
t.Logf("Making %s request to %s", method, reqURL)
t.Logf("Making %s request to %s", method, req.URL)
kegsay marked this conversation as resolved.
Show resolved Hide resolved
contentType := req.Header.Get("Content-Type")
if contentType == "application/json" || strings.HasPrefix(contentType, "text/") {
if req.Body != nil {
Expand Down
190 changes: 168 additions & 22 deletions tests/msc2716_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var createPrivateRoomOpts = map[string]interface{}{
func TestImportHistoricalMessages(t *testing.T) {
deployment := Deploy(t, b.BlueprintHSWithApplicationService)
defer deployment.Destroy(t)
//defer time.Sleep(2 * time.Hour)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

// Create the application service bridge user that is able to import historical messages
asUserID := "@the-bridge-user:hs1"
Expand Down Expand Up @@ -98,7 +99,7 @@ func TestImportHistoricalMessages(t *testing.T) {
//
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
// Create the first batch including the "live" event we are going to
// import our historical events next to.
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore")
eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
timeAfterEventBefore := time.Now()

Expand All @@ -110,7 +111,7 @@ func TestImportHistoricalMessages(t *testing.T) {
// Create the second batch of events.
// This will also fill up the buffer so we have to scrollback to the
// inserted history later.
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2)
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter")

// Insert the most recent batch of historical messages
insertTime0 := timeAfterEventBefore.Add(timeBetweenMessages * 3)
Expand Down Expand Up @@ -214,7 +215,7 @@ func TestImportHistoricalMessages(t *testing.T) {
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to insert our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand Down Expand Up @@ -255,19 +256,74 @@ func TestImportHistoricalMessages(t *testing.T) {
})
})

// Import many MSC2716 batches into a room, then make sure a remote
// homeserver that joins afterwards can still backfill all of the
// historical messages via /messages without blowing up.
t.Run("Backfill still works after many batches are imported", func(t *testing.T) {
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

// Create some normal messages in the timeline
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 2, "eventIDsBefore")
// The historical batches are inserted next to the most recent of these events.
eventIdBefore := eventIDsBefore[len(eventIDsBefore)-1]
timeAfterEventBefore := time.Now()

// wait X number of ms to ensure that the timestamp changes enough for
// each of the historical messages we try to import later
//numBatches := 11
// NOTE(review): temporarily lowered from 11 to 2 for faster local
// iteration (see the thread below) — revert to 11 before merge.
numBatches := 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lovely comment but then you seem to use 2. Why?

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See:

I used numBatches=2 during testing because it's much faster to see results while developing.

#214 (comment)

Running this test against Synapse is extremely slow atm, #214 (comment)


TODO: Reminder to revert this to 11 before merge

numHistoricalMessagesPerBatch := 100
// Sleep long enough that each of the numBatches*numHistoricalMessagesPerBatch
// historical messages can get a distinct timestamp between eventIdBefore and now.
time.Sleep(time.Duration(numBatches*numHistoricalMessagesPerBatch) * timeBetweenMessages)

// eventIDsAfter
createMessagesInRoom(t, alice, roomID, 2, "eventIDsAfter")

// Import a long chain of batches connected to each other.
// We want to make sure Synapse doesn't blow up after we import
// many messages.
var expectedEventIDs []string
nextBatchID := ""
for i := 0; i < numBatches; i++ {
// NOTE(review): this offset goes negative for i >= 1 (e.g. 2 - 100*1 = -98),
// placing later batches well before timeAfterEventBefore — presumably
// (numBatches-i)*numHistoricalMessagesPerBatch was intended so each
// successive batch lands one batch-width earlier. TODO: confirm.
insertTime := timeAfterEventBefore.Add(timeBetweenMessages * time.Duration(numBatches-numHistoricalMessagesPerBatch*i))
batchSendRes := batchSendHistoricalMessages(
t,
as,
roomID,
eventIdBefore,
nextBatchID,
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, insertTime),
createMessageEventsForBatchSendRequest([]string{virtualUserID}, insertTime, numHistoricalMessagesPerBatch),
// Status
200,
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
// Make sure we see all of the historical messages
expectedEventIDs = append(expectedEventIDs, client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")...)
// Chain the next batch onto this one so all of the batches connect together.
nextBatchID = client.GetJSONFieldStr(t, batchSendResBody, "next_batch_id")
}
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved

// Make sure we see the event at the very start of the message history
expectedEventIDs = append(expectedEventIDs, eventIdBefore)

// Join the room from a remote homeserver after the historical messages were sent
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})

// Make sure events can be backfilled from the remote homeserver
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, expectedEventIDs)
})

t.Run("Historical events from /batch_send do not come down in an incremental sync", func(t *testing.T) {
t.Parallel()

roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to insert our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// Create some "live" events to saturate and fill up the /sync response
createMessagesInRoom(t, alice, roomID, 5)
createMessagesInRoom(t, alice, roomID, 5, "live")

// Import a historical event
batchSendRes := batchSendHistoricalMessages(
Expand All @@ -286,7 +342,7 @@ func TestImportHistoricalMessages(t *testing.T) {
historicalEventId := historicalEventIDs[0]

// This is just a dummy event we search for after the historicalEventId
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1)
eventIDsAfterHistoricalImport := createMessagesInRoom(t, alice, roomID, 1, "eventIDsAfterHistoricalImport")
eventIDAfterHistoricalImport := eventIDsAfterHistoricalImport[0]

// Sync until we find the eventIDAfterHistoricalImport.
Expand All @@ -309,7 +365,7 @@ func TestImportHistoricalMessages(t *testing.T) {
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to import our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand Down Expand Up @@ -363,7 +419,7 @@ func TestImportHistoricalMessages(t *testing.T) {
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand All @@ -386,7 +442,7 @@ func TestImportHistoricalMessages(t *testing.T) {
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand Down Expand Up @@ -417,7 +473,7 @@ func TestImportHistoricalMessages(t *testing.T) {
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to import our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand Down Expand Up @@ -469,12 +525,12 @@ func TestImportHistoricalMessages(t *testing.T) {
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// eventIDsAfter
createMessagesInRoom(t, alice, roomID, 3)
createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")

batchSendRes := batchSendHistoricalMessages(
t,
Expand Down Expand Up @@ -522,7 +578,7 @@ func TestImportHistoricalMessages(t *testing.T) {
roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand All @@ -546,7 +602,7 @@ func TestImportHistoricalMessages(t *testing.T) {
})

// eventIDsAfter
createMessagesInRoom(t, alice, roomID, 3)
createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")

batchSendRes := batchSendHistoricalMessages(
t,
Expand Down Expand Up @@ -597,12 +653,12 @@ func TestImportHistoricalMessages(t *testing.T) {
// Join the room from a remote homeserver before any historical messages are sent
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})

eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// eventIDsAfter
createMessagesInRoom(t, alice, roomID, 10)
createMessagesInRoom(t, alice, roomID, 10, "eventIDsAfter")

// Mimic scrollback just through the latest messages
remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
Expand Down Expand Up @@ -685,12 +741,12 @@ func TestImportHistoricalMessages(t *testing.T) {
// Join the room from a remote homeserver before any historical messages are sent
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})

eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

// eventIDsAfter
createMessagesInRoom(t, alice, roomID, 3)
createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")

// Mimic scrollback to all of the messages
// scrollbackMessagesRes
Expand Down Expand Up @@ -778,7 +834,7 @@ func TestImportHistoricalMessages(t *testing.T) {
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to import our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand Down Expand Up @@ -818,7 +874,7 @@ func TestImportHistoricalMessages(t *testing.T) {
alice.JoinRoom(t, roomID, nil)

// Create the "live" event we are going to import our historical events next to
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1)
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore")
eventIdBefore := eventIDsBefore[0]
timeAfterEventBefore := time.Now()

Expand Down Expand Up @@ -913,6 +969,96 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
}
}

// paginateUntilMessageCheckOff repeatedly calls /messages on the given
// client, paginating backwards through roomID, until every event ID in
// expectedEventIDs has been seen at least once. It fails the test if the
// pagination reaches the end of the timeline or the (generous) timeout
// elapses first. Note: this only checks the expected events exist somewhere
// in the paginated results — it does not assert anything about their order.
func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, expectedEventIDs []string) {
t.Helper()
start := time.Now()

// Set of event IDs we still expect to see; entries are deleted as we
// encounter them while paginating.
workingExpectedEventIDMap := make(map[string]string)
for _, expectedEventID := range expectedEventIDs {
workingExpectedEventIDMap[expectedEventID] = expectedEventID
}

// Every event ID observed across all /messages calls, kept for debug output.
var actualEventIDList []string
checkCounter := 0
// Pagination token ("end" from the previous response); the empty string
// means start from the most recent events.
messageResEnd := ""
// generateErrorMesssageInfo summarizes progress for failure messages:
// how many expected events were found, which are still missing, and
// everything we saw along the way.
generateErrorMesssageInfo := func() string {
i := 0
leftoverEventIDs := make([]string, len(workingExpectedEventIDMap))
for eventID := range workingExpectedEventIDMap {
leftoverEventIDs[i] = eventID
i++
}

return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s",
checkCounter,
len(expectedEventIDs)-len(leftoverEventIDs),
len(expectedEventIDs),
len(leftoverEventIDs),
leftoverEventIDs,
len(actualEventIDList),
actualEventIDList,
)
}

for {
// NOTE(review): 200x the default sync timeout is a very large bound,
// chosen to accommodate the slow /messages responses discussed in the
// review thread below; ideally this would not need inflating at all.
if time.Since(start) > 200*c.SyncUntilTimeout {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout is currently very large to accommodate the long ~20s /messages durations. We also have to make that request 11 times during the test which adds up very fast.

Synapse really has to chug for those requests 👹 and ideally wouldn't have to modify this at all.

I would need to look into optimizing Synapse to make this fast which we should probably do anyway as this is painfully slow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is currently set to a whopping 1000s by default

SyncUntilTimeout: 5 * time.Second,
which seems excessive even given this.

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kegsay Sorry this wasn't clear as undrafting indicates "marked this pull request as ready for review" but I didn't assign you this one yet specifically because of this problem.

The test itself is good to go (timeout can be switched to normal and numBatches could be switched back to 11) and shipped but want to make it actually acceptable time-wise to run against Synapse before merging.

I used numBatches=2 during testing because it's much faster to see results while developing.

Thanks for the review pass though and I'll fix up these other spots ⏩

Copy link
Contributor Author

@MadLittleMods MadLittleMods May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MadLittleMods what is the status of this PR? It's still marked as a Draft.

#214 (comment)

Status is still the same as the last update in this thread. It's too slow on Synapse for me to be comfortable merging it yet.

Copy link
Contributor Author

@MadLittleMods MadLittleMods May 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of optimizing Synapse to make this test viable to run time-wise, I'm a bit blocked on a race condition in some recent code, matrix-org/synapse#12394 (comment) -> matrix-org/synapse#12646

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of optimizing Synapse to make this test viable to run time-wise, I'm a bit blocked on a race condition in some recent code, matrix-org/synapse#12394 (comment) -> matrix-org/synapse#12646

Now matrix-org/synapse#12988 I think

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

t.Fatalf(
"paginateUntilMessageCheckOff timed out. %s",
generateErrorMesssageInfo(),
)
}

// Paginate backwards from wherever the previous response left off.
messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
// TODO: Can we do it with 100?
"limit": []string{"100"},
"from": []string{messageResEnd},
}))
messsageResBody := client.ParseJSON(t, messagesRes)

// Remember where to continue paginating from on the next iteration.
messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end")

wantKey := "chunk"
keyRes := gjson.GetBytes(messsageResBody, wantKey)
if !keyRes.Exists() {
t.Fatalf("missing key '%s'", wantKey)
}
if !keyRes.IsArray() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use JSONArrayEach?

func JSONArrayEach(wantKey string, fn func(gjson.Result) error) JSON {
It does these checks for you.

Copy link
Contributor Author

@MadLittleMods MadLittleMods Nov 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, I think it was because we can't break early with JSONArrayEach (can only break early if you throw an error which fails the test). JSONArrayEach could be refactored to have the item function return keepGoing, err but rather do that in another PR since it touches so many other tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the early return really a deal breaker? I don't think it'll affect runtime performance that much?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to re-use JSONArrayEach. It's not so bad ⏩

t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type)
}

events := keyRes.Array()

// An empty chunk means we have paginated to the start of the room
// without checking off every expected event.
if len(events) == 0 {
t.Fatalf(
"paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s",
generateErrorMesssageInfo(),
)
}

// logrus.WithFields(logrus.Fields{
// "events": events,
// "messageResEnd": messageResEnd,
// }).Error("paginateUntilMessageCheckOff chunk")
for _, ev := range events {
eventID := ev.Get("event_id").Str
actualEventIDList = append(actualEventIDList, eventID)

// Check the event off the expected list if we were waiting for it.
if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists {
delete(workingExpectedEventIDMap, eventID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this function doesn't actually check the events are in the correct order, just that they exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, just a checkoff function

}

// All expected events were seen — success.
if len(workingExpectedEventIDMap) == 0 {
return
}
}

// NOTE(review): incremented after the request, so a failure reported
// earlier in this same iteration undercounts the in-flight call by one.
checkCounter++
// Add a slight delay so we don't hammer the messages endpoint
time.Sleep(500 * time.Millisecond)
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
}
}

func isRelevantEvent(r gjson.Result) bool {
return len(r.Get("content").Get("body").Str) > 0 ||
r.Get("type").Str == insertionEventType ||
Expand Down Expand Up @@ -1003,14 +1149,14 @@ func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSA
return markerEventID
}

func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int) (eventIDs []string) {
func createMessagesInRoom(t *testing.T, c *client.CSAPI, roomID string, count int, messageSuffix string) (eventIDs []string) {
kegsay marked this conversation as resolved.
Show resolved Hide resolved
eventIDs = make([]string, count)
for i := 0; i < len(eventIDs); i++ {
newEvent := b.Event{
Type: "m.room.message",
Content: map[string]interface{}{
"msgtype": "m.text",
"body": fmt.Sprintf("Message %d", i),
"body": fmt.Sprintf("Message %d (%s)", i, messageSuffix),
},
}
newEventId := c.SendEventSynced(t, roomID, newEvent)
Expand Down