From 2339a2f3dfe649bef8589c2fe1cd86ee953406cc Mon Sep 17 00:00:00 2001 From: David Robertson Date: Tue, 28 Feb 2023 16:19:33 +0000 Subject: [PATCH] More tests for leaves during a partial state resync (#617) * Unskip leave-during-resync test * Group leave-during-resync tests under supertest * Test that the original joiner can leave if someone else arrives. * Test you can rejoin even if resync didn't complete * Test someone else can join after you leave * Test that remote kicks work during resync * Test that remote bans work during resync, and you can't rejoin during resync --- ...federation_room_join_partial_state_test.go | 473 ++++++++++++++---- 1 file changed, 364 insertions(+), 109 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index ff930e2e..9ed3d78b 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -3359,7 +3359,7 @@ func TestPartialStateJoin(t *testing.T) { // Test that display name changes do not block during a resync. // Display name changes are represented by `m.room_membership` events with a membership of // "join", and can be confused with join events. 
- t.Run("Can change display name during partial state join", func(t * testing.T) { + t.Run("Can change display name during partial state join", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t45alice", "secret", false) pdusChannel := make(chan *gomatrixserverlib.Event) @@ -3398,9 +3398,9 @@ func TestPartialStateJoin(t *testing.T) { case pdu := <-pdusChannel: content := gjson.ParseBytes(pdu.Content()) if pdu.Type() != "m.room.member" || - *pdu.StateKey() != alice.UserID || - content.Get("membership").Str != "join" || - content.Get("displayname").Str != "alice 2" { + *pdu.StateKey() != alice.UserID || + content.Get("membership").Str != "join" || + content.Get("displayname").Str != "alice 2" { t.Errorf("Did not receive expected display name change event: %s", pdu.JSON()) } case <-time.After(1 * time.Second): @@ -3408,118 +3408,380 @@ func TestPartialStateJoin(t *testing.T) { } }) - t.Run("Leaving during resync is seen after the resync", func(t *testing.T) { - // Before testing that leaves during resyncs are seen during resyncs, sanity - // check that leaves during resyncs appear after the resync. - t.Log("Alice begins a partial join to a room") - alice := deployment.RegisterUser(t, "hs1", "t42alice", "secret", false) - handleTransactions := federation.HandleTransactionRequests( - // Accept all PDUs and EDUs - func(e *gomatrixserverlib.Event) {}, - func(e gomatrixserverlib.EDU) {}, - ) - server := createTestServer(t, deployment, handleTransactions) - cancel := server.Listen() - defer cancel() + t.Run("Leave during resync", func(t *testing.T) { - serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) - t.Log("Alice partial-joins her room") - psjResult := beginPartialStateJoin(t, server, serverRoom, alice) - defer psjResult.Destroy(t) + t.Run("is seen after the resync", func(t *testing.T) { + // Before testing that leaves during resyncs are seen during resyncs, sanity + // check that leaves during resyncs appear after the resync. 
+ alice := deployment.RegisterUser(t, "hs1", "t42alice", "secret", false) + handleTransactions := federation.HandleTransactionRequests( + // Accept all PDUs and EDUs + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ) + server := createTestServer(t, deployment, handleTransactions) + cancel := server.Listen() + defer cancel() - t.Log("Alice waits to see her join") - aliceNextBatch := alice.MustSyncUntil( - t, - client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, - client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), - ) + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice partial-joins her room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy(t) - leaveCompleted := NewWaiter() - t.Log("Alice starts a leave request") - go func() { - alice.LeaveRoom(t, serverRoom.RoomID) - t.Log("Alice's leave request completed") - leaveCompleted.Finish() - }() + t.Log("Alice waits to see her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) - // We want Synapse to receive the leave before its resync completes. - // HACK: Use a sleep to try and ensure this. - time.Sleep(250 * time.Millisecond) - t.Log("The resync finishes") - psjResult.FinishStateRequest() + leaveCompleted := NewWaiter() + t.Log("Alice starts a leave request") + go func() { + alice.LeaveRoom(t, serverRoom.RoomID) + t.Log("Alice's leave request completed") + leaveCompleted.Finish() + }() - // Now that we've resynced, the leave call should be unblocked. 
- leaveCompleted.Wait(t, 1*time.Second) + // We want Synapse to receive the leave before its resync completes. + // HACK: Use a sleep to try and ensure this. + time.Sleep(250 * time.Millisecond) + t.Log("The resync finishes") + psjResult.FinishStateRequest() - t.Log("Alice waits to see her leave appear down /sync") - aliceNextBatch = alice.MustSyncUntil( - t, - client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, - client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), - ) - }) + // Now that we've resynced, the leave call should be unblocked. + leaveCompleted.Wait(t, 1*time.Second) - t.Run("Leaving a room immediately after joining does not wait for resync", func(t *testing.T) { - t.Skip("Not yet implemented (synapse#12802)") - // Prepare to listen for leave events from the HS under test. - // We're only expecting one leave event, but give the channel extra capacity - // to avoid deadlock if the HS does something silly. - leavesChannel := make(chan *gomatrixserverlib.Event, 10) - handleTransactions := federation.HandleTransactionRequests( - func(e *gomatrixserverlib.Event) { - if e.Type() == "m.room.member" { - if ok := gjson.ValidBytes(e.Content()); !ok { - t.Fatalf("Received event %s with invalid content: %v", e.EventID(), e.Content()) - } - content := gjson.ParseBytes(e.Content()) - membership := content.Get("membership") - if membership.Exists() && membership.Str == "leave" { - leavesChannel <- e + t.Log("Alice waits to see her leave appear down /sync") + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + }) + + t.Run("does not wait for resync", func(t *testing.T) { + // Prepare to listen for leave events from the HS under test. + // We're only expecting one leave event, but give the channel extra capacity + // to avoid deadlock if the HS does something silly. 
+ leavesChannel := make(chan *gomatrixserverlib.Event, 10) + handleTransactions := federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) { + if e.Type() == "m.room.member" { + if ok := gjson.ValidBytes(e.Content()); !ok { + t.Errorf("Received event %s with invalid content: %v", e.EventID(), e.Content()) + } + content := gjson.ParseBytes(e.Content()) + membership := content.Get("membership") + if membership.Exists() && membership.Str == "leave" { + leavesChannel <- e + } } - } - }, - // we don't care about EDUs - func(e gomatrixserverlib.EDU) {}, - ) + }, + // we don't care about EDUs + func(e gomatrixserverlib.EDU) {}, + ) - t.Log("Alice begins a partial join to a room") - alice := deployment.RegisterUser(t, "hs1", "t43alice", "secret", false) - server := createTestServer( - t, - deployment, - handleTransactions, - ) - cancel := server.Listen() - defer cancel() + alice := deployment.RegisterUser(t, "hs1", "t43alice", "secret", false) + server := createTestServer( + t, + deployment, + handleTransactions, + ) + cancel := server.Listen() + defer cancel() - serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) - psjResult := beginPartialStateJoin(t, server, serverRoom, alice) - defer psjResult.Destroy(t) + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice begins a partial join to a room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy(t) - t.Log("Alice waits to see her join") - aliceNextBatch := alice.MustSyncUntil( - t, - client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, - client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), - ) + t.Log("Alice waits to see her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) - t.Log("Alice leaves and waits for confirmation") - alice.LeaveRoom(t, serverRoom.RoomID) - aliceNextBatch = 
alice.MustSyncUntil( - t, - client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, - client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), - ) + t.Log("Alice leaves and waits for confirmation") + alice.LeaveRoom(t, serverRoom.RoomID) + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) - t.Logf("Alice's leave is recieved by the resident server") - select { - case <-time.After(1 * time.Second): - t.Fatal("Resident server did not receive Alice's leave") - case e := <-leavesChannel: - if e.Sender() != alice.UserID { - t.Errorf("Unexpected leave event %s for %s", e.EventID(), e.Sender()) + t.Logf("Alice's leave is received by the resident server") + select { + case <-time.After(1 * time.Second): + t.Fatal("Resident server did not receive Alice's leave") + case e := <-leavesChannel: + if e.Sender() != alice.UserID { + t.Errorf("Unexpected leave event %s for %s", e.EventID(), e.Sender()) + } } - } + }) + + // Test that the original joiner can leave during the resync, even after someone else has joined + t.Run("works after a second partial join", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t47alice", "secret", false) + bob := deployment.RegisterUser(t, "hs1", "t47bob", "secret", false) + handleTransactions := federation.HandleTransactionRequests( + // Accept all PDUs and EDUs + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ) + server := createTestServer(t, deployment, handleTransactions) + cancel := server.Listen() + defer cancel() + + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice partial-joins her room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + // At the end of the test, keep Bob in the room. Have him make a /members + // call to ensure the resync has completed. 
+ psjResult.User = bob + defer psjResult.Destroy(t) + + t.Log("Alice sees her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + t.Log("Bob joins too") + bob.JoinRoom(t, serverRoom.RoomID, []string{server.ServerName()}) + + t.Log("Bob waits to see his join") + bobNextBatch := bob.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(bob.UserID, serverRoom.RoomID), + ) + + t.Log("Alice leaves the room") + alice.LeaveRoom(t, serverRoom.RoomID) + + t.Log("Alice sees Alice's leave") + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + + t.Log("Bob sees Alice's leave") + bobNextBatch = bob.MustSyncUntil( + t, + client.SyncReq{Since: bobNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + }) + + t.Run("succeeds, then rejoin succeeds without resync completing", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t48alice", "secret", false) + handleTransactions := federation.HandleTransactionRequests( + // Accept all PDUs and EDUs + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ) + server := createTestServer(t, deployment, handleTransactions) + cancel := server.Listen() + defer cancel() + + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice partial-joins her room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy(t) + + t.Log("Alice waits to see her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + t.Log("Alice leaves the room") + alice.LeaveRoom(t, 
serverRoom.RoomID) + + t.Log("Alice sees Alice's leave") + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + + // The resync has not completed because we have not called psjResult.FinishStateRequest() + t.Log("Alice rejoins her room") + alice.JoinRoom(t, serverRoom.RoomID, []string{server.ServerName()}) + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + }) + + t.Run("succeeds, then another user can join without resync completing", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t49alice", "secret", false) + bob := deployment.RegisterUser(t, "hs1", "t49bob", "secret", false) + handleTransactions := federation.HandleTransactionRequests( + // Accept all PDUs and EDUs + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ) + server := createTestServer(t, deployment, handleTransactions) + cancel := server.Listen() + defer cancel() + + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice partial-joins her room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + // At the end of the test, keep Bob in the room. Have him make a /members + // call to ensure the resync has completed. 
+ psjResult.User = bob + defer psjResult.Destroy(t) + + t.Log("Alice waits to see her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + t.Log("Alice leaves the room") + alice.LeaveRoom(t, serverRoom.RoomID) + + t.Log("Alice sees Alice's leave") + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + + // The resync has not completed because we have not called psjResult.FinishStateRequest() + t.Log("Now Bob joins the room") + bob.JoinRoom(t, serverRoom.RoomID, []string{server.ServerName()}) + bob.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(bob.UserID, serverRoom.RoomID), + ) + + }) + + t.Run("can be triggered by remote kick", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t50alice", "secret", false) + handleTransactions := federation.HandleTransactionRequests( + // Accept all PDUs and EDUs + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ) + server := createTestServer(t, deployment, handleTransactions) + cancel := server.Listen() + defer cancel() + + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice partial-joins her room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + // Alice is not joined to the room at the end of the test, so we do not + // `defer psjResult.Destroy(t)`. 
+ + t.Log("Alice waits to see her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + t.Log("A resident server user kicks Alice from the room.") + kickEvent := server.MustCreateEvent(t, serverRoom, b.Event{ + Type: "m.room.member", + StateKey: b.Ptr(alice.UserID), + Sender: server.UserID("charlie"), + Content: map[string]interface{}{"membership": "leave"}, + AuthEvents: serverRoom.EventIDsOrReferences([]*gomatrixserverlib.Event{ + serverRoom.CurrentState("m.room.create", ""), + serverRoom.CurrentState("m.room.power_levels", ""), + serverRoom.CurrentState("m.room.member", alice.UserID), + serverRoom.CurrentState("m.room.member", server.UserID("charlie")), + }), + }) + serverRoom.AddEvent(kickEvent) + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{kickEvent.JSON()}, nil) + + // The kick occurs mid-resync, because we have not yet called + // psjResult.FinishStateRequest(). + t.Log("Alice sees that she's been kicked") + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + + // Cleanup. 
+ psjResult.FinishStateRequest() + }) + + t.Run("can be triggered by remote ban", func(t *testing.T) { + alice := deployment.RegisterUser(t, "hs1", "t51alice", "secret", false) + handleTransactions := federation.HandleTransactionRequests( + // Accept all PDUs and EDUs + func(e *gomatrixserverlib.Event) {}, + func(e gomatrixserverlib.EDU) {}, + ) + server := createTestServer(t, deployment, handleTransactions) + cancel := server.Listen() + defer cancel() + + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + t.Log("Alice partial-joins her room") + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + // Alice is not joined to the room at the end of the test, so we do not + // `defer psjResult.Destroy(t)`. + + t.Log("Alice waits to see her join") + aliceNextBatch := alice.MustSyncUntil( + t, + client.SyncReq{Filter: buildLazyLoadingSyncFilter(nil)}, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + t.Log("A resident server user bans Alice from the room.") + banEvent := server.MustCreateEvent(t, serverRoom, b.Event{ + Type: "m.room.member", + StateKey: b.Ptr(alice.UserID), + Sender: server.UserID("charlie"), + Content: map[string]interface{}{"membership": "ban"}, + AuthEvents: serverRoom.EventIDsOrReferences([]*gomatrixserverlib.Event{ + serverRoom.CurrentState("m.room.create", ""), + serverRoom.CurrentState("m.room.power_levels", ""), + serverRoom.CurrentState("m.room.member", alice.UserID), + serverRoom.CurrentState("m.room.member", server.UserID("charlie")), + }), + }) + serverRoom.AddEvent(banEvent) + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{banEvent.JSON()}, nil) + + // The ban occurs mid-resync, because we have not yet called + // psjResult.FinishStateRequest(). 
+ t.Log("Alice sees that she's been banned") + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{Since: aliceNextBatch, Filter: buildLazyLoadingSyncFilter(nil)}, + // TODO: introduce a SyncBannedFrom which checks the membership of the + // leave event + client.SyncLeftFrom(alice.UserID, serverRoom.RoomID), + ) + + t.Log("Alice tries to rejoin...") + queryParams := url.Values{} + queryParams.Add("server_name", server.ServerName()) + response := alice.DoFunc(t, "POST", []string{"_matrix", "client", "v3", "join", serverRoom.RoomID}, client.WithQueries(queryParams)) + + t.Log("... but Alice was forbidden from rejoining") + must.MatchResponse(t, response, match.HTTPResponse{StatusCode: http.StatusForbidden}) + + // Cleanup. + psjResult.FinishStateRequest() + }) }) t.Run("Room stats are correctly updated once state re-sync completes", func(t *testing.T) { @@ -3631,13 +3893,6 @@ func TestPartialStateJoin(t *testing.T) { assertUserInDirectory(t, "todd", server.UserID("todd")) }) - // TODO: tests which assert that: - // - Join+Join+Leave+Leave works - // - Join+Leave+Join works - // - Join+Leave+Rejoin works - // - Join + remote kick works - // - Join + remote ban works, then cannot rejoin - t.Run("Purge during resync", func(t *testing.T) { if runtime.Homeserver != runtime.Synapse { // TOOD: Pull this into a Synapse-specific suite when someone figures out how