Sliding Sync: Remember memberships already sent down connection when lazy-loading room members on incremental sync #17808

1 change: 1 addition & 0 deletions changelog.d/17805.bugfix
@@ -0,0 +1 @@
Fix a bug in sliding sync where the server would not return state that was added to the `required_state` config.
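
For context, the `required_state` config referenced here is the per-list/per-room list of (event type, state key) pairs a sliding sync client asks for. A rough, illustrative sketch in Python (the values are examples, not taken from this PR):

example_list_config = {
    "ranges": [[0, 10]],
    "timeline_limit": 10,
    # Each entry is [event_type, state_key]; the state key may be a literal
    # key, "*" (wildcard), "$LAZY" (lazy-load room members), or "$ME".
    "required_state": [
        ["m.room.name", ""],
        ["m.room.member", "$LAZY"],
        ["m.room.member", "$ME"],
    ],
}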
166 changes: 124 additions & 42 deletions synapse/handlers/sliding_sync/__init__.py
@@ -513,6 +513,8 @@ async def get_room_sync_data(

state_reset_out_of_room = True

prev_room_sync_config = previous_connection_state.room_configs.get(room_id)

# Determine whether we should limit the timeline to the token range.
#
# We should return historical messages (before token range) in the
@@ -562,7 +564,6 @@ async def get_room_sync_data(

log_kv({"sliding_sync.room_status": room_status})

prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
if prev_room_sync_config is not None:
# Check if the timeline limit has increased; if so, ignore the
# timeline bound and record the change (see "XXX: Odd behavior"
@@ -868,6 +869,14 @@ async def get_room_sync_data(
#
# Calculate the `StateFilter` based on the `required_state` for the room
required_state_filter = StateFilter.none()
# The requested `required_state_map` with any lazy membership expanded and
# `$ME` replaced with the user's ID. This allows us to see, on the next
# request, which memberships we've already sent down to the client.
#
# Make a copy so we can modify it. We still need to be careful to copy the
# state key sets before adding to or removing from them. We could make a
# deep copy, but this saves us some work.
expanded_required_state_map = dict(room_sync_config.required_state_map)
if room_membership_for_user_at_to_token.membership not in (
Membership.INVITE,
Membership.KNOCK,
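
The copy caveat in the comment above is worth making concrete: `dict(...)` copies only the outer mapping, so the inner state-key sets are still shared with the original config. A minimal self-contained sketch (user ID made up):

original = {"m.room.member": {"$LAZY"}}
expanded = dict(original)  # shallow copy: the inner set is still shared

# Mutating the shared set in place would also change `original`, so the
# diff rebuilds the set before adding entries, e.g.:
expanded["m.room.member"] = set(expanded.get("m.room.member", set())).union(
    {"@alice:test"}
)

assert original == {"m.room.member": {"$LAZY"}}  # untouched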
@@ -932,22 +941,49 @@ async def get_room_sync_data(
and state_key == StateValues.LAZY
):
lazy_load_room_members = True

# Everyone in the timeline is relevant
#
# FIXME: We probably also care about invite, ban, kick, targets, etc
# but the spec only mentions "senders".
timeline_membership: Set[str] = set()
if timeline_events is not None:
for timeline_event in timeline_events:
timeline_membership.add(timeline_event.sender)

# Add an explicit entry for each user in the timeline
expanded_required_state_map[EventTypes.Member] = (
# Make a copy of the state key set so we can modify it
# without affecting the original `required_state_map`
set(
expanded_required_state_map.get(
EventTypes.Member, set()
)
).union(timeline_membership)
)

# Update the required state filter so we pick up the new
# membership
for user_id in timeline_membership:
required_state_types.append(
(EventTypes.Member, user_id)
)

# FIXME: We probably also care about invite, ban, kick, targets, etc
# but the spec only mentions "senders".
elif state_key == StateValues.ME:
num_others += 1
required_state_types.append((state_type, user.to_string()))
# Replace `$ME` with the user's ID so we can deduplicate
# when someone requests the same state with `$ME` or with
# their user ID.
expanded_required_state_map[EventTypes.Member] = (
# Make a copy of the state key set so we can modify it
# without affecting the original `required_state_map`
set(
expanded_required_state_map.get(
EventTypes.Member, set()
)
).union({user.to_string()})
)
else:
num_others += 1
required_state_types.append((state_type, state_key))
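
Putting the lazy-membership and `$ME` branches above together, here is a simplified, hypothetical helper (not a function in this file) sketching the effect on the membership entry of the expanded map:

from typing import AbstractSet, Dict, Iterable, Mapping, Set

def expand_membership_entry(
    required_state_map: Mapping[str, AbstractSet[str]],
    timeline_senders: Iterable[str],
    user_id: str,
) -> Dict[str, AbstractSet[str]]:
    """Hypothetical sketch of how the membership entry gets expanded."""
    expanded: Dict[str, AbstractSet[str]] = dict(required_state_map)
    member_keys: Set[str] = set(expanded.get("m.room.member", set()))
    if "$LAZY" in member_keys:
        # Remember the concrete members we're sending down so the next
        # incremental sync knows they've already been sent.
        member_keys |= set(timeline_senders)
    if "$ME" in member_keys:
        # Record the resolved user ID so a later request for the same state
        # via the literal user ID deduplicates against `$ME`.
        member_keys.add(user_id)
    expanded["m.room.member"] = member_keys
    return expanded

# e.g. {"m.room.member": {"$LAZY"}} with senders {"@alice:test", "@bob:test"}
# becomes {"m.room.member": {"$LAZY", "@alice:test", "@bob:test"}}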
@@ -1011,9 +1047,9 @@ async def get_room_sync_data(
changed_required_state_map, added_state_filter = (
_required_state_changes(
user.to_string(),
prev_room_sync_config,
room_sync_config,
room_state_delta_id_map,
prev_required_state_map=prev_room_sync_config.required_state_map,
request_required_state_map=expanded_required_state_map,
state_deltas=room_state_delta_id_map,
)
)

@@ -1106,14 +1142,15 @@ async def get_room_sync_data(
# sensible order again.
bump_stamp = 0

unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
# Record the `room_sync_config` if `ignore_timeline_bound` is set (which means
# that the `timeline_limit` has increased)
room_sync_required_state_map_to_persist = room_sync_config.required_state_map
room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = (
expanded_required_state_map
)
if changed_required_state_map:
room_sync_required_state_map_to_persist = changed_required_state_map

# Record the `room_sync_config` if `ignore_timeline_bound` is set (which means
# that the `timeline_limit` has increased)
unstable_expanded_timeline = False
if ignore_timeline_bound:
# FIXME: We signal the fact that we're sending down more events to
# the client by setting `unstable_expanded_timeline` to true (see
@@ -1161,7 +1198,10 @@ async def get_room_sync_data(
)

else:
new_connection_state.room_configs[room_id] = room_sync_config
new_connection_state.room_configs[room_id] = RoomSyncConfig(
timeline_limit=room_sync_config.timeline_limit,
required_state_map=room_sync_required_state_map_to_persist,
)

set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)

@@ -1281,8 +1321,9 @@ async def _get_bump_stamp(

def _required_state_changes(
user_id: str,
previous_room_config: "RoomSyncConfig",
room_sync_config: RoomSyncConfig,
*,
prev_required_state_map: Mapping[str, AbstractSet[str]],
request_required_state_map: Mapping[str, AbstractSet[str]],
state_deltas: StateMap[str],
) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]:
"""Calculates the changes between the required state room config from the
@@ -1299,20 +1340,25 @@ def _required_state_changes(
only want to re-send that entry down sync if it has changed.

Returns:
A 2-tuple of updated required state config and the state filter to use
to fetch extra current state that we need to return.
A 2-tuple of updated required state config (or None if there is no update)
and the state filter to use to fetch extra current state that we need to
return.
"""

prev_required_state_map = previous_room_config.required_state_map
request_required_state_map = room_sync_config.required_state_map

if prev_required_state_map == request_required_state_map:
# There has been no change. Return immediately.
return None, StateFilter.none()

prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set())
request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set())

# If we were previously fetching everything ("*", "*"), always update the effective
# room required state config to match the request. And since we were previously
# already fetching everything, we don't have to fetch anything now that they've
# narrowed.
if StateValues.WILDCARD in prev_wildcard:
return request_required_state_map, StateFilter.none()

# If an event type wildcard has been added or removed, we don't try to do
# anything fancy, and instead always update the effective room required
# state config to match the request.
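
A small worked illustration of the wildcard early-return above (values made up): if the previous config fetched everything, the narrowed request is persisted but nothing extra needs to be fetched.

prev_required_state_map = {"*": {"*"}}  # previously: all state
request_required_state_map = {"m.room.name": {""}, "m.room.member": {"$LAZY"}}

# Since "*" was in the previous wildcard entry, `_required_state_changes`
# would return (request_required_state_map, StateFilter.none()): persist the
# narrowed config, fetch nothing new.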
@@ -1331,12 +1377,19 @@ def _required_state_changes(
# client. Passed to `StateFilter.from_types(...)`
added: List[Tuple[str, Optional[str]]] = []

# Convert the list of state deltas to a map from type to the state_keys that
# have changed.
changed_types_to_state_keys: Dict[str, Set[str]] = {}
for event_type, state_key in state_deltas:
changed_types_to_state_keys.setdefault(event_type, set()).add(state_key)

# First we calculate what, if anything, has been *added*.
for event_type in (
prev_required_state_map.keys() | request_required_state_map.keys()
):
old_state_keys = prev_required_state_map.get(event_type, set())
request_state_keys = request_required_state_map.get(event_type, set())
changed_state_keys = changed_types_to_state_keys.get(event_type, set())

if old_state_keys == request_state_keys:
# No change to this type
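
A note on the `state_deltas` argument used above: it is a `StateMap`, i.e. a mapping keyed by `(event_type, state_key)` pairs, so iterating it yields those pairs directly. A tiny self-contained illustration (event IDs made up):

state_deltas = {
    ("m.room.member", "@alice:test"): "$event_id_1",
    ("m.room.name", ""): "$event_id_2",
}

changed_types_to_state_keys: dict = {}
for event_type, state_key in state_deltas:  # iterates the (type, key) tuples
    changed_types_to_state_keys.setdefault(event_type, set()).add(state_key)

assert changed_types_to_state_keys == {
    "m.room.member": {"@alice:test"},
    "m.room.name": {""},
}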
@@ -1346,8 +1399,26 @@ def _required_state_changes(
# Nothing *added*, so we skip. Removals happen below.
continue

# We only remove state keys from the effective state if they've been
# removed from the request *and* the state has changed. This ensures
# that if a client removes and then re-adds a state key, we only send
# down the associated current state event if it's changed (rather than
# sending down the same event twice).
invalidated_state_keys = (
old_state_keys - request_state_keys
) & changed_state_keys

# Always update changes to include the newly added keys
changes[event_type] = request_state_keys
changes[event_type] = request_state_keys | (
# Wildcard and lazy state keys are not sticky from previous requests
(old_state_keys - {StateValues.WILDCARD, StateValues.LAZY})
- invalidated_state_keys
)

if StateValues.WILDCARD in old_state_keys:
# We were previously fetching everything for this type, so we don't need to
# fetch anything new.
continue

# Record the new state keys to fetch for this type.
if StateValues.WILDCARD in request_state_keys:
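
A worked example of the `invalidated_state_keys` and `changes[event_type]` computation above, with made-up user IDs ("*" and "$LAZY" stand in for `StateValues.WILDCARD` and `StateValues.LAZY`):

old_state_keys = {"$LAZY", "@alice:test", "@bob:test"}
request_state_keys = {"$LAZY", "@carol:test"}
changed_state_keys = {"@bob:test"}  # Bob's membership changed in this range

# Only keys dropped from the request *and* whose state changed are invalidated:
invalidated_state_keys = (old_state_keys - request_state_keys) & changed_state_keys
assert invalidated_state_keys == {"@bob:test"}

# The new effective entry is the request plus still-valid concrete keys;
# wildcard/lazy markers from the previous request are not carried over:
effective = request_state_keys | (
    (old_state_keys - {"*", "$LAZY"}) - invalidated_state_keys
)
assert effective == {"$LAZY", "@alice:test", "@carol:test"}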
@@ -1358,20 +1429,17 @@ def _required_state_changes(
if state_key == StateValues.ME:
added.append((event_type, user_id))
elif state_key == StateValues.LAZY:
# We handle lazy loading separately, so don't need to
# explicitly add anything here.
# We handle lazy loading separately (outside this function),
# so don't need to explicitly add anything here.
#
# LAZY values should also be ignored for event types that are
# not membership.
pass
else:
added.append((event_type, state_key))

added_state_filter = StateFilter.from_types(added)

# Convert the list of state deltas to a map from type to the state_keys that
# have changed.
changed_types_to_state_keys: Dict[str, Set[str]] = {}
for event_type, state_key in state_deltas:
changed_types_to_state_keys.setdefault(event_type, set()).add(state_key)

# Figure out what changes we need to apply to the effective required state
# config.
for event_type, changed_state_keys in changed_types_to_state_keys.items():
@@ -1382,6 +1450,15 @@ def _required_state_changes(
# No change.
continue

# We only remove state keys from the effective state if they've been
# removed from the request *and* the state has changed. This ensures
# that if a client removes and then re-adds a state key, we only send
# down the associated current state event if it's changed (rather than
# sending down the same event twice).
invalidated_state_keys = (
old_state_keys - request_state_keys
) & changed_state_keys

if request_state_keys - old_state_keys:
# We've expanded the set of state keys, so we just clobber the
# current set with the new set.
@@ -1390,38 +1467,43 @@ def _required_state_changes(
# changed, but are no longer in the requested required state, but
# that's a sufficient edge case that we can ignore (as it's only a
# performance optimization).
changes[event_type] = request_state_keys
changes[event_type] = request_state_keys | (
# Wildcard and lazy state keys are not sticky from previous requests
(old_state_keys - {StateValues.WILDCARD, StateValues.LAZY})
- invalidated_state_keys
)
continue

old_state_key_wildcard = StateValues.WILDCARD in old_state_keys
request_state_key_wildcard = StateValues.WILDCARD in request_state_keys

if old_state_key_wildcard != request_state_key_wildcard:
# If a wildcard has been added or removed we always update the
# required state when any state with the same type has changed.
# If a state_key wildcard has been added or removed, we always update the
# effective room required state config to match the request.
changes[event_type] = request_state_keys
continue

old_state_key_lazy = StateValues.LAZY in old_state_keys
request_state_key_lazy = StateValues.LAZY in request_state_keys
if event_type == EventTypes.Member:
old_state_key_lazy = StateValues.LAZY in old_state_keys
request_state_key_lazy = StateValues.LAZY in request_state_keys

if old_state_key_lazy != request_state_key_lazy:
# If a "$LAZY" has been added or removed we always update the
# required state for simplicity.
changes[event_type] = request_state_keys
continue
if old_state_key_lazy != request_state_key_lazy:
# If a "$LAZY" has been added or removed we always update the effective room
# required state config to match the request.
changes[event_type] = request_state_keys
continue

# Handle "$ME" values by replacing state keys that match the user ID.
# Handle "$ME" values by adding "$ME" if the state key matches the user
# ID.
if user_id in changed_state_keys:
changed_state_keys.discard(user_id)
changed_state_keys.add(StateValues.ME)

# At this point there are no wildcards and no additions to the set of
# state keys requested, only deletions.
#
# We only remove state keys from the effective state if they've been
# removed from the request *and* the state has changed. This ensures
# that if a client removes and then readds a state key, we only send
# that if a client removes and then re-adds a state key, we only send
# down the associated current state event if it's changed (rather than
# sending down the same event twice).
invalidated = (old_state_keys - request_state_keys) & changed_state_keys
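
To make the remove/re-add behaviour described above concrete, a self-contained sketch with made-up user IDs:

old_state_keys = {"@alice:test", "@bob:test"}
request_state_keys = {"@alice:test"}  # Bob has been dropped from the request

# Case 1: Bob's membership did not change while dropped -> nothing is
# invalidated, so if the client re-adds him later we don't resend an
# unchanged membership event.
changed_state_keys = set()
invalidated = (old_state_keys - request_state_keys) & changed_state_keys
assert invalidated == set()

# Case 2: Bob's membership *did* change while dropped -> his entry is
# invalidated, so a later re-add sends the new event down.
changed_state_keys = {"@bob:test"}
invalidated = (old_state_keys - request_state_keys) & changed_state_keys
assert invalidated == {"@bob:test"}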