Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 79 additions & 22 deletions tests/msc3902/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,15 @@ func (s *server) AddEDUHandler(eduHandler func(gomatrixserverlib.EDU) bool) func
}
}

// WithWaitForLeave runs the given action and waits for the user to leave the room.
// WithWaitForLeave runs the given action and, when the resulting leave is
// expected to reach this server, waits for it. `leaveAction` is always run; the
// wait is skipped when `user` had already left the room (per their own
// homeserver, so the action produces no new leave) or when this server isn't in
// the room (so the leave won't be federated to us).
func (s *server) WithWaitForLeave(
Comment thread
MadLittleMods marked this conversation as resolved.
t *testing.T, room *federation.ServerRoom, userID string, leaveAction func(),
t *testing.T, room *federation.ServerRoom, user *client.CSAPI, leaveAction func(),
) {
userID := user.UserID
leaveChannel := make(chan gomatrixserverlib.PDU, 10)
removePDUHandler := s.AddPDUHandler(
func(e gomatrixserverlib.PDU) bool {
Expand All @@ -141,24 +146,76 @@ func (s *server) WithWaitForLeave(
)
defer removePDUHandler()

// We need to check if the user (on their homeserver) thinks they're in the
// room, before performing the `leaveAction` (to avoid races).
//
// If they are not in the room, then the `leaveAction` will not produce a
// new leave event and we should not wait for one.
//
// If they are in the room then the `leaveAction` will produce a new leave
// event. We then need to check if we expect this server receive the leave
// event by checking if this server is in the room. If they are, we wait, if
// not we can return immediately after the `leaveAction`.
userInRoom := userIsJoinedTo(t, user, room.RoomID)

@MadLittleMods MadLittleMods Jun 10, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Complement, the way we typically do this is using user.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(user.UserID, room.RoomID))

And since we seem to only use this for the negative case and return, I'm guessing that using the typical flow and failing the test is also fine (related to the points below about programming errors)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Complement, the way we typically do this is using user.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(user.UserID, room.RoomID))

That only tests whether a new join has happened, not whether the user is joined at all

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That only tests whether a new join has happened, not whether the user is joined at all

I accidentally copied an incremental sync here but it will work just fine to tell you whether you're joined to a room if you use an initial sync.

But overall, I think this review point was more in the context when I didn't understand what we're trying to accomplish with WithWaitForLeave. It's a general clean-up function (that could be run at any stage of the test in the case of failures), not something used to actually wait for someone to leave as part of the test. Probably mostly a case of this being a bad name and awkward usage.


leaveAction()

memberEvent := room.CurrentState("m.room.member", userID)
membership := ""
if memberEvent != nil {
membership, _ = memberEvent.Membership()
if !userInRoom {
// The user had already left, so the action produced no new leave and
// none is coming: don't wait.
t.Logf("%s is not joined to test room %s; not waiting for them to leave.", userID, room.RoomID)
return
}
Comment thread
MadLittleMods marked this conversation as resolved.
if membership == "leave" {
t.Logf("%s has already seen %s leave test room %s.", s.ServerName(), userID, room.RoomID)
} else {
select {
case <-leaveChannel:
t.Logf("%s saw %s leave test room %s.", s.ServerName(), userID, room.RoomID)
break
case <-time.After(1 * time.Second):
t.Errorf("%s timed out waiting for %s to leave test room %s.", s.ServerName(), userID, room.RoomID)

if !s.isInRoom(room) {
// The homeserver only federates the leave to servers that are in the
// room. If we aren't, no leave PDU is coming to us, so don't block until
// the timeout.
t.Logf("%s is not in test room %s; not waiting for %s to leave.", s.ServerName(), room.RoomID, userID)
return
}
Comment thread
MadLittleMods marked this conversation as resolved.
Comment thread
erikjohnston marked this conversation as resolved.

@MadLittleMods MadLittleMods Jun 11, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Many of the faster joins test flakes are due to the homeserver under test failing to contact Complement homeservers after they have been torn down. When this happens, subsequent tests can fail if they use a Complement homeserver that happens to have the same hostname:port as one which the homeserver under test has previously marked as offline.

-- #626

Is there anything about a federation request where our engineered homeserver can reject requests meant for some old homeserver. I'd assume keys used in the federation requests would be unique to each engineered Complement homeserver that we create and should fail authentication. Perhaps this is missing? For example, why isn't this already rejecting the stray requests:

// Check federation signature
fedReq, errResp := fclient.VerifyHTTPRequest(
req, time.Now(), srv.serverName, nil, srv.keyRing,
)
if fedReq == nil {
log.Printf(
"complement: Transaction '%s': HTTP Code %d. Invalid http request: %s",
transactionID, errResp.Code, errResp.JSON,
)
w.WriteHeader(errResp.Code)
b, _ := json.Marshal(errResp.JSON)
w.Write(b)
return
}

If this kind of thing is possible, it would eliminate the need for all of this cleanup logic while still being able to share the real homeserver deployment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly, I haven't really looked whether there are architectural changes we could make to obviate the need for this cleanup code.

I think landing this as is makes sense to reduce the number of flakey tests we see though

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

urghh, it can make sense to merge as this pattern is already here and we can follow-up with a better strategy ⏩ . I find this existing approach so bad that I'm trying to avoid us taking this any further and trying to reason about it. Crap on top of crap.

@MadLittleMods MadLittleMods Jun 12, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've got a working solution to this in #880

As explained in the PR, I couldn't quite do what I was thinking here but there is still a good general solution to this problem by avoiding port re-use which lead to the same server names being used.

We can still merge this PR if you'd like but #880 would also solve the problem. I plan to strip all of this logic out. Just waiting on this to merge or not.


// Otherwise the action triggered the leave, which arrives as a PDU our
// handler matches. Wait on its channel rather than polling
// `room.CurrentState`: the room's current state is updated (by
// `room.AddEvent`) *before* the PDU callback runs, so returning on a
// `CurrentState` check could deregister our handler in the window before the
// callback fires, making the (expected) leave look unexpected to
// `HandleTransactionRequests`. This returns as soon as the leave arrives; the
// timeout is only a ceiling for declaring failure.
select {
case <-leaveChannel:
t.Logf("%s saw %s leave test room %s.", s.ServerName(), userID, room.RoomID)
case <-time.After(1 * time.Second):
t.Errorf("%s timed out waiting for %s to leave test room %s.", s.ServerName(), userID, room.RoomID)
}
}

Comment thread
erikjohnston marked this conversation as resolved.
// isInRoom reports whether this Complement server has a joined user in the room,
// according to its own `ServerRoom` view. The server reliably tracks its own
// users' membership (it created their join/leave events), so this answers "will
// the homeserver federate events in this room to us?".
func (s *server) isInRoom(room *federation.ServerRoom) bool {
for _, serverInRoom := range room.ServersInRoom() {
if serverInRoom == s.ServerName() {
return true
}
}
return false
}

// userIsJoinedTo reports whether the user is currently joined to the room,
// according to the user's own homeserver.
func userIsJoinedTo(t *testing.T, user *client.CSAPI, roomID string) bool {
t.Helper()
res := user.MustDo(t, "GET", []string{"_matrix", "client", "v3", "joined_rooms"})
joinedRooms := gjson.ParseBytes(client.ParseJSON(t, res)).Get("joined_rooms")
for _, joinedRoom := range joinedRooms.Array() {
if joinedRoom.Str == roomID {
return true
}
}
return false
}

// Wait for the server to receive the event with given event ID.
Expand Down Expand Up @@ -2249,7 +2306,7 @@ func TestPartialStateJoin(t *testing.T) {

// @t23alice:hs1 joins the room.
psjResult := beginPartialStateJoin(t, server1, room, alice)
defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) })
defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) })

// Both homeservers should receive device list updates.
renameDevice(t, alice, "A new device name 1")
Expand Down Expand Up @@ -2296,7 +2353,7 @@ func TestPartialStateJoin(t *testing.T) {
)
// NB: We register the `psjResult.Destroy()` cleanup twice. This is alright because it
// is idempotent. Here we wait for server 2 to observe the leave too.
defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) })
defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) })
joinEvent := room.CurrentState("m.room.member", server2.UserID("elsie"))
server1.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, "hs1"), []json.RawMessage{joinEvent.JSON()}, nil)
awaitEventViaSync(t, alice, room.RoomID, joinEvent.EventID(), "")
Expand Down Expand Up @@ -2473,7 +2530,7 @@ func TestPartialStateJoin(t *testing.T) {
syncToken = awaitEventViaSync(t, alice, partialStateRoom.RoomID, leaveEvent.EventID(), syncToken)

leaveSharedRoom = func() {
server2.WithWaitForLeave(t, server2Room, alice.UserID, func() {
server2.WithWaitForLeave(t, server2Room, alice, func() {
alice.MustLeaveRoom(t, roomID)
})
}
Expand Down Expand Up @@ -2533,7 +2590,7 @@ func TestPartialStateJoin(t *testing.T) {
// @t26alice:hs1 joins the room, followed by @elsie:server2.
// @elsie:server2 is kicked with an invalid event.
syncToken, server2Room, psjResult := setupIncorrectlyAcceptedKick(t, deployment, alice, server1, server2, deviceListUpdateChannel1, deviceListUpdateChannel2, room)
defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) })
defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) })

// @t26alice:hs1 sends out a device list update which is missed by @elsie:server2.
// @elsie:server2 must receive missed device list updates once the partial state join finishes.
Expand Down Expand Up @@ -2593,7 +2650,7 @@ func TestPartialStateJoin(t *testing.T) {
federation.WithPartialState(),
)
psjResult := beginPartialStateJoin(t, server1, room, alice)
defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) })
defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) })

// @t28alice:hs1 sends out a device list update which is missed by @elsie:server2.
// @elsie:server2 must receive missed device list updates once the partial state join finishes.
Expand Down Expand Up @@ -2996,7 +3053,7 @@ func TestPartialStateJoin(t *testing.T) {
},
client.SyncJoinedTo(server.UserID("charlie"), otherRoomID),
)
defer server.WithWaitForLeave(t, otherRoom, alice.UserID, func() { alice.MustLeaveRoom(t, otherRoomID) })
defer server.WithWaitForLeave(t, otherRoom, alice, func() { alice.MustLeaveRoom(t, otherRoomID) })

// Depending on the homeserver implementation, @t31alice:hs1 must have been told that either:
// * charlie updated their device list, or
Expand Down Expand Up @@ -4422,7 +4479,7 @@ func (psj *partialStateJoinResult) Destroy(t *testing.T) {
psj.Server.WithWaitForLeave(
t,
psj.ServerRoom,
psj.User.UserID,
psj.User,
func() { psj.User.MustLeaveRoom(t, psj.ServerRoom.RoomID) },
)
}
Expand Down
Loading