diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 01bd71ba..a08830f0 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -220,6 +220,7 @@ func TestPartialStateJoin(t *testing.T) { // we should be able to send events in the room, during the resync t.Run("CanSendEventsDuringPartialStateJoin", func(t *testing.T) { + // See https://github.com/matrix-org/synapse/issues/12997 t.Skip("Cannot yet send events during resync") alice := deployment.RegisterUser(t, "hs1", "t3alice", "secret", false) @@ -241,6 +242,317 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Alice sent event event ID %s", eventID) }) + // we should be able to receive typing EDU over federation during the resync + t.Run("CanReceiveTypingDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + // Derek starts typing in the room. + derekUserId := psjResult.Server.UserID("derek") + content, _ := json.Marshal(map[string]interface{}{ + "room_id": serverRoom.RoomID, + "user_id": derekUserId, + "typing": true, + }) + edu := gomatrixserverlib.EDU{ + Type: "m.typing", + Content: content, + } + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + // Alice should be able to see that Derek is typing (even though HS1 is resyncing). + aliceNextBatch := alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncEphemeralHas(serverRoom.RoomID, func(result gjson.Result) bool { + if result.Get("type").Str != "m.typing" { + return false + } + user_ids := result.Get("content.user_ids").Array() + if len(user_ids) != 1 { + return false + } + return user_ids[0].Str == derekUserId + }), + ) + + // Alice should still be able to see incoming PDUs in the room during + // the resync; the earlier EDU shouldn't interfere with this. + // (See https://github.com/matrix-org/synapse/issues/13684) + event := psjResult.CreateMessageEvent(t, "charlie", nil) + serverRoom.AddEvent(event) + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + aliceNextBatch = awaitEventViaSync(t, alice, serverRoom.RoomID, event.EventID(), aliceNextBatch) + + // The resync completes. + psjResult.FinishStateRequest() + + // Derek stops typing. + content, _ = json.Marshal(map[string]interface{}{ + "room_id": serverRoom.RoomID, + "user_id": derekUserId, + "typing": false, + }) + edu = gomatrixserverlib.EDU{ + Type: "m.typing", + Content: content, + } + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + // Alice should be able to see that no-one is typing. + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + Since: aliceNextBatch, + }, + client.SyncEphemeralHas(serverRoom.RoomID, func(result gjson.Result) bool { + return (result.Get("type").Str == "m.typing" && + result.Get("content.user_ids.#").Int() == 0) + }), + ) + + }) + + // we should be able to receive presence EDU over federation during the resync + t.Run("CanReceivePresenceDuringPartialStateJoin", func(t *testing.T) { + // See https://github.com/matrix-org/synapse/issues/13008") + t.Skip("Presence EDUs are currently dropped during a resync") + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + derekUserId := psjResult.Server.UserID("derek") + + content, _ := json.Marshal(map[string]interface{}{ + "push": []map[string]interface{}{ + map[string]interface{}{ + "user_id": derekUserId, + "presence": "online", + "last_active_ago": 100, + }, + }, + }) + edu := gomatrixserverlib.EDU{ + Type: "m.presence", + Content: content, + } + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + }, + func(userID string, sync gjson.Result) error { + for _, e := range sync.Get("presence").Get("events").Array() { + if e.Get("sender").Str == derekUserId { + return nil + } + } + return fmt.Errorf("No presence update from %s", derekUserId) + }, + ) + + psjResult.FinishStateRequest() + }) + + // we should be able to receive to_device EDU over federation during the resync + t.Run("CanReceiveToDeviceDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + // Send a to-device message from Derek to Alice. + derekUserId := psjResult.Server.UserID("derek") + messageId := "hiezohf6Hoo7kaev" + content, _ := json.Marshal(map[string]interface{}{ + "message_id": messageId, + "sender": derekUserId, + "type": "m.test", + "messages": map[string]interface{}{ + alice.UserID: map[string]interface{}{ + "*": map[string]interface{}{}, + }, + }, + }) + edu := gomatrixserverlib.EDU{ + Type: "m.direct_to_device", + Content: content, + } + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + // Alice should see Derek's to-device message when she syncs. + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + }, + func(userID string, sync gjson.Result) error { + for _, e := range sync.Get("to_device.events").Array() { + if e.Get("sender").Str == derekUserId && + e.Get("type").Str == "m.test" { + return nil + } + } + return fmt.Errorf("No to_device update from %s", derekUserId) + }, + ) + psjResult.FinishStateRequest() + }) + + // we should be able to receive receipt EDU over federation during the resync + t.Run("CanReceiveReceiptDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + derekUserId := psjResult.Server.UserID("derek") + + // Derek sends a read receipt into the room. + content, _ := json.Marshal(map[string]interface{}{ + serverRoom.RoomID: map[string]interface{}{ + "m.read": map[string]interface{}{ + derekUserId: map[string]interface{}{ + "data": map[string]interface{}{ + "ts": 1436451550453, + }, + "event_ids": []string{"mytesteventid"}, + }, + }, + }, + }) + edu := gomatrixserverlib.EDU{ + Type: "m.receipt", + Content: content, + } + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + // Alice should be able to see Derek's read receipt during the resync + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncEphemeralHas(serverRoom.RoomID, func(result gjson.Result) bool { + if result.Get("type").Str != "m.receipt" { + return false + } + + if result.Get("content").Get("mytesteventid").Get("m\\.read").Get(strings.Replace(derekUserId, ".", "\\.", -1)).Get("ts").Int() == 1436451550453 { + return true + } + return false + }), + ) + psjResult.FinishStateRequest() + }) + + // we should be able to receive device list update EDU over federation during the resync + t.Run("CanReceiveDeviceListUpdateDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + derekUserId := psjResult.Server.UserID("derek") + + content, _ := json.Marshal(map[string]interface{}{ + "device_id": "QBUAZIFURK", + "stream_id": 1, + "user_id": derekUserId, + }) + edu := gomatrixserverlib.EDU{ + Type: "m.device_list_update", + Content: content, + } + aliceNextBatch := getSyncToken(t, alice) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + // The resync completes. + psjResult.FinishStateRequest() + + // Check that Alice is told that Derek's devices have changed. + // (Alice does not get told this during the resync, since we can't know + // for certain who is in that room until the resync completes.) + aliceNextBatch = alice.MustSyncUntil( + t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + Since: aliceNextBatch, + }, + func(clientUserID string, res gjson.Result) error { + matcher := match.JSONCheckOff( + "device_lists.changed", + []interface{}{derekUserId}, + func(r gjson.Result) interface{} { return r.Str }, + nil, + ) + return matcher([]byte(res.Raw)) + }, + ) + }) + + // we should be able to receive signing key update EDU over federation during the resync + t.Run("CanReceiveSigningKeyUpdateDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + derekUserId := psjResult.Server.UserID("derek") + + content, _ := json.Marshal(map[string]interface{}{ + "user_id": derekUserId, + }) + edu := gomatrixserverlib.EDU{ + Type: "m.signing_key_update", + Content: content, + } + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{}, []gomatrixserverlib.EDU{edu}) + + // If we want to check the sync we need to have an encrypted room, + // for now just check that the fed transaction is accepted. + }) + // we should be able to receive events over federation during the resync t.Run("CanReceiveEventsDuringPartialStateJoin", func(t *testing.T) { alice := deployment.RegisterUser(t, "hs1", "t4alice", "secret", false) @@ -2482,7 +2794,7 @@ func testReceiveEventDuringPartialStateJoin( "GET", []string{"_matrix", "client", "v3", "rooms", psjResult.ServerRoom.RoomID, "state", "m.room.member", "@non-existent:remote"}, ) - + // check the server's idea of the state at the event. We do this by making a `state_ids` request over federation stateReq = gomatrixserverlib.NewFederationRequest("GET", "hs1", fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s",