diff --git a/internal/b/blueprints.go b/internal/b/blueprints.go index 70976776..6cebf7bb 100644 --- a/internal/b/blueprints.go +++ b/internal/b/blueprints.go @@ -227,14 +227,71 @@ func manyMessages(senders []string, count int) []Event { evs := make([]Event, count) for i := 0; i < len(evs); i++ { sender := senders[i%len(senders)] + + // Sprinkle in some state every so often to make it harder for the HS + if i%10 == 0 { + evs[i] = Event{ + Type: "m.room.topic", + StateKey: Ptr(""), + Content: map[string]interface{}{ + "topic": "Room topic " + strconv.Itoa(i), + }, + Sender: "@alice", + } + } else { + evs[i] = Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "body": "Hello world " + strconv.Itoa(i), + "msgtype": "m.text", + }, + Sender: sender, + } + } + } + return evs +} + +func manJoinEvents(homeserver string, users []User) []Event { + evs := make([]Event, len(users)) + for i := 0; i < len(users); i++ { + user := users[i] evs[i] = Event{ - Type: "m.room.message", + Type: "m.room.member", + StateKey: Ptr(fmt.Sprintf("%s:%s", user.Localpart, homeserver)), Content: map[string]interface{}{ - "body": "Hello world " + strconv.Itoa(i), - "msgtype": "m.text", + "membership": "join", }, - Sender: sender, + Sender: user.Localpart, } } return evs } + +func manyUsers(count int) []User { + users := make([]User, count) + + for i := 0; i < count; i++ { + localPart := fmt.Sprintf("@user_%d", i) + displayName := fmt.Sprintf("User %d", i) + deviceID := fmt.Sprintf("USERDEVICE%d", i) + + users[i] = User{ + Localpart: localPart, + DisplayName: displayName, + OneTimeKeys: 50, + DeviceID: Ptr(deviceID), + } + } + + return users +} + +func getSendersFromUsers(users []User) []string { + senders := make([]string, len(users)) + for i := 0; i < len(users); i++ { + senders[i] = users[i].Localpart + } + + return senders +} diff --git a/internal/b/perf_e2ee_room.go b/internal/b/perf_e2ee_room.go index ae171b22..8f2acf46 100644 --- a/internal/b/perf_e2ee_room.go +++ b/internal/b/perf_e2ee_room.go @@ -83,22 +83,3 @@ func memberships(count int) []Event { return events } - -func manyUsers(count int) []User { - users := make([]User, count) - - for i := 0; i < count; i++ { - localPart := fmt.Sprintf("@alice_%d", i) - displayName := fmt.Sprintf("Alice %d", i) - deviceID := fmt.Sprintf("ALICEDEVICE%d", i) - - users[i] = User{ - Localpart: localPart, - DisplayName: displayName, - OneTimeKeys: 50, - DeviceID: Ptr(deviceID), - } - } - - return users -} diff --git a/internal/b/perf_many_messages.go b/internal/b/perf_many_messages.go index 77d7360b..697c211b 100644 --- a/internal/b/perf_many_messages.go +++ b/internal/b/perf_many_messages.go @@ -14,13 +14,34 @@ package b +var manyUsersList = manyUsers(20) + +func makeEvents(homeserver string) []Event { + events := []Event{ + { + Type: "m.room.member", + StateKey: Ptr("@bob:hs1"), + Content: map[string]interface{}{ + "membership": "join", + }, + Sender: "@bob", + }, + } + events = append(events, manJoinEvents(homeserver, manyUsersList)...) + events = append(events, manyMessages(getSendersFromUsers(manyUsersList), 300)...) + + //fmt.Printf("events made %d events=%+v", len(events), events) + + return events +} + // BlueprintPerfManyMessages contains a homeserver with 2 users, who are joined to the same room with thousands of messages. var BlueprintPerfManyMessages = MustValidate(Blueprint{ Name: "perf_many_messages", Homeservers: []Homeserver{ { Name: "hs1", - Users: []User{ + Users: append([]User{ { Localpart: "@alice", DisplayName: "Alice", @@ -29,23 +50,23 @@ var BlueprintPerfManyMessages = MustValidate(Blueprint{ Localpart: "@bob", DisplayName: "Bob", }, - }, + }, manyUsersList...), Rooms: []Room{ { CreateRoom: map[string]interface{}{ "preset": "public_chat", }, Creator: "@alice", - Events: append([]Event{ - Event{ - Type: "m.room.member", - StateKey: Ptr("@bob:hs1"), - Content: map[string]interface{}{ - "membership": "join", - }, - Sender: "@bob", - }, - }, manyMessages([]string{"@alice", "@bob"}, 7000)...), + Events: makeEvents("hs1"), + }, + }, + }, + { + Name: "hs2", + Users: []User{ + { + Localpart: "@charlie", + DisplayName: "Charlie", }, }, }, diff --git a/internal/docker/builder.go b/internal/docker/builder.go index 3389862d..1ab6cdf5 100644 --- a/internal/docker/builder.go +++ b/internal/docker/builder.go @@ -123,16 +123,17 @@ func (d *Builder) removeImages() error { d.log("Not cleaning up image with tags: %v", img.RepoTags) continue } - bprintName := img.Labels["complement_blueprint"] + //bprintName := img.Labels["complement_blueprint"] + contextStr := img.Labels[complementLabel] keep := false for _, keepBprint := range d.Config.KeepBlueprints { - if bprintName == keepBprint { + if contextStr == keepBprint { keep = true break } } if keep { - d.log("Keeping image created from blueprint %s", bprintName) + d.log("Keeping image created from blueprint %s", contextStr) continue } _, err = d.Docker.ImageRemove(context.Background(), img.ID, types.ImageRemoveOptions{ @@ -179,8 +180,24 @@ func (d *Builder) ConstructBlueprintIfNotExist(bprint b.Blueprint) error { if err != nil { return fmt.Errorf("ConstructBlueprintIfNotExist(%s): failed to ImageList: %w", bprint.Name, err) } - if len(images) == 0 { - err = d.ConstructBlueprint(bprint) + + var missingHomeservers []b.Homeserver + for _, homeserver := range bprint.Homeservers { + found := false + for _, image := range images { + if image.Labels["complement_hs_name"] == homeserver.Name { + found = true + break + } + } + + if !found { + missingHomeservers = append(missingHomeservers, homeserver) + } + } + + if len(images) < len(bprint.Homeservers) { + err = d.ConstructBlueprint(bprint, missingHomeservers) if err != nil { return fmt.Errorf("ConstructBlueprintIfNotExist(%s): failed to ConstructBlueprint: %w", bprint.Name, err) } @@ -188,8 +205,8 @@ func (d *Builder) ConstructBlueprintIfNotExist(bprint b.Blueprint) error { return nil } -func (d *Builder) ConstructBlueprint(bprint b.Blueprint) error { - errs := d.construct(bprint) +func (d *Builder) ConstructBlueprint(bprint b.Blueprint, homeserversToConstruct []b.Homeserver) error { + errs := d.construct(bprint, homeserversToConstruct) if len(errs) > 0 { for _, err := range errs { d.log("could not construct blueprint: %s", err) @@ -236,7 +253,7 @@ func (d *Builder) ConstructBlueprint(bprint b.Blueprint) error { } // construct all Homeservers sequentially then commits them -func (d *Builder) construct(bprint b.Blueprint) (errs []error) { +func (d *Builder) construct(bprint b.Blueprint, homeserversToConstruct []b.Homeserver) (errs []error) { d.log("Constructing blueprint '%s'", bprint.Name) networkName, err := createNetworkIfNotExists(d.Docker, d.Config.PackageNamespace, bprint.Name) @@ -245,8 +262,8 @@ func (d *Builder) construct(bprint b.Blueprint) (errs []error) { } runner := instruction.NewRunner(bprint.Name, d.Config.BestEffort, d.Config.DebugLoggingEnabled) - results := make([]result, len(bprint.Homeservers)) - for i, hs := range bprint.Homeservers { + results := make([]result, len(homeserversToConstruct)) + for i, hs := range homeserversToConstruct { res := d.constructHomeserver(bprint.Name, runner, hs, networkName) if res.err != nil { errs = append(errs, res.err) diff --git a/tests/federation_room_messages_test.go b/tests/federation_room_messages_test.go new file mode 100644 index 00000000..036c1b4f --- /dev/null +++ b/tests/federation_room_messages_test.go @@ -0,0 +1,69 @@ +package tests + +import ( + "net/url" + "testing" + "time" + + "github.com/matrix-org/complement/internal/b" + "github.com/matrix-org/complement/internal/client" + "github.com/sirupsen/logrus" +) + +func TestMessagesOverFederation(t *testing.T) { + deployment := Deploy(t, b.BlueprintPerfManyMessages) + defer deployment.Destroy(t) + + alice := deployment.Client(t, "hs1", "@alice:hs1") + + remoteCharlie := deployment.Client(t, "hs2", "@charlie:hs2") + + t.Run("parallel", func(t *testing.T) { + t.Run("asdf", func(t *testing.T) { + t.Parallel() + + syncResult, _ := alice.MustSync(t, client.SyncReq{}) + joinedRooms := syncResult.Get("rooms.join|@keys") + roomWithManyMessages := joinedRooms.Get("0").String() + + // logrus.WithFields(logrus.Fields{ + // "joinedRooms": joinedRooms, + // "roomWithManyMessages": roomWithManyMessages, + // }).Error("asdf") + + remoteCharlie.JoinRoom(t, roomWithManyMessages, []string{"hs1"}) + + messagesRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomWithManyMessages, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + "dir": []string{"b"}, + "limit": []string{"100"}, + })) + messagesResBody := client.ParseJSON(t, messagesRes) + eventIDs := client.GetJSONFieldStringArray(t, messagesResBody, "chunk") + // end := client.GetJSONFieldStringArray(t, messagesResBody, "end") + + logrus.WithFields(logrus.Fields{ + "joinedRooms": joinedRooms, + "roomWithManyMessages": roomWithManyMessages, + "eventIDsLength": len(eventIDs), + "eventIDs": eventIDs, + }).Error("asdf") + + // messagesRes2 := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomWithManyMessages, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ + // "dir": []string{"b"}, + // "from": end, + // "limit": []string{"500"}, + // })) + // messagesResBody2 := client.ParseJSON(t, messagesRes2) + // eventIDs2 := client.GetJSONFieldStringArray(t, messagesResBody2, "chunk") + + // logrus.WithFields(logrus.Fields{ + // "joinedRooms": joinedRooms, + // "roomWithManyMessages": roomWithManyMessages, + // "eventIDsLength": len(eventIDs2), + // "eventIDs": eventIDs2, + // }).Error("asdf2") + + time.Sleep(5 * time.Second) + }) + }) +}