diff --git a/changelog.d/17732.bugfix b/changelog.d/17732.bugfix new file mode 100644 index 0000000000..572c13fc57 --- /dev/null +++ b/changelog.d/17732.bugfix @@ -0,0 +1 @@ +Fix membership caches not updating in state reset scenarios. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e14d711c76..7251e72e3a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -86,7 +86,9 @@ def process_replication_position( # noqa: B027 (no-op by design) """ def _invalidate_state_caches( - self, room_id: str, members_changed: Collection[str] + self, + room_id: str, + members_changed: Collection[str], ) -> None: """Invalidates caches that are based on the current state, but does not stream invalidations down replication. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 32c3472e58..0ec33f7818 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -219,6 +219,11 @@ def process_replication_rows( room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] + room_id, token + ) + for user_id in members_changed: + self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: if row.keys is None: raise Exception( @@ -236,6 +241,10 @@ def process_replication_rows( room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] + room_id, token + ) + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -275,6 +284,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: self._attempt_to_invalidate_cache( "get_sliding_sync_rooms_for_user", None ) + self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined] elif data.type == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache( "get_room_encryption", (data.room_id,) @@ -291,6 +301,7 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index c0b7d8107d..1733ef98de 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1605,7 +1605,13 @@ def _update_current_state_txn( room_id delta_state: Deltas that are going to be used to update the `current_state_events` table. Changes to the current state of the room. - stream_id: TODO + stream_id: This is expected to be the minimum `stream_ordering` for the + batch of events that we are persisting; which means we do not end up in a + situation where workers see events before the `current_state_delta` updates. + FIXME: However, this function also gets called with next upcoming + `stream_ordering` when we re-sync the state of a partial stated room (see + `update_current_state(...)`) which may be "correct" but it would be good to + nail down what exactly is the expected value here. sliding_sync_table_changes: Changes to the `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables derived from the given `delta_state` (see @@ -1904,6 +1910,13 @@ def _update_current_state_txn( stream_id, ) + for user_id in members_to_cache_bust: + txn.call_after( + self.store._membership_stream_cache.entity_has_changed, + user_id, + stream_id, + ) + # Invalidate the various caches self.store._invalidate_state_caches_and_stream( txn, room_id, members_to_cache_bust diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 03503abe0f..522cba3825 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -314,6 +314,17 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: self._entity_to_key[entity] = stream_pos self._evict() + def all_entities_changed(self, stream_pos: int) -> None: + """ + Mark all entities as changed. This is useful when the cache is invalidated and + there may be some potential change for all of the entities. + """ + # All entities are at the same stream position now. + self._cache = SortedDict({stream_pos: set(self._entity_to_key.keys())}) + self._entity_to_key = { + entity: stream_pos for entity in self._entity_to_key.keys() + } + def _evict(self) -> None: """ Ensure the cache has not exceeded the maximum size. diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py index ea3ca57957..53f88dbabc 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py @@ -1058,12 +1058,6 @@ def test_state_reset_room_comes_down_incremental_sync(self) -> None: self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(room_id1)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) @@ -1211,12 +1205,6 @@ def test_state_reset_previously_room_comes_down_incremental_sync_with_filters( self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) @@ -1395,12 +1383,6 @@ def test_state_reset_never_room_incremental_sync_with_filters( self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - # Ensure that the state reset worked and only user2 is in the room now users_in_room = self.get_success(self.store.get_users_in_room(space_room_id)) self.assertIncludes(set(users_in_room), {user2_id}, exact=True) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index ed5f286243..c69dbd575e 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -1209,12 +1209,6 @@ def test_state_reset2(self) -> None: self.persistence.persist_event(join_rule_event, join_rule_context) ) - # FIXME: We're manually busting the cache since - # https://github.com/element-hq/synapse/issues/17368 is not solved yet - self.store._membership_stream_cache.entity_has_changed( - user1_id, join_rule_event_pos.stream - ) - after_reset_token = self.event_sources.get_current_token() membership_changes = self.get_success( diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py index c41f5706af..6d5405cd70 100644 --- a/tests/util/test_stream_change_cache.py +++ b/tests/util/test_stream_change_cache.py @@ -255,3 +255,19 @@ def test_max_pos(self) -> None: # Unknown entities will return None self.assertEqual(cache.get_max_pos_of_last_change("not@here.website"), None) + + def test_all_entities_changed(self) -> None: + """ + `StreamChangeCache.all_entities_changed(...)` will mark all entites as changed. + """ + cache = StreamChangeCache("#test", 1, max_size=10) + + cache.entity_has_changed("user@foo.com", 2) + cache.entity_has_changed("bar@baz.net", 3) + cache.entity_has_changed("user@elsewhere.org", 4) + + cache.all_entities_changed(5) + + self.assertEqual(cache.get_max_pos_of_last_change("user@foo.com"), 5) + self.assertEqual(cache.get_max_pos_of_last_change("bar@baz.net"), 5) + self.assertEqual(cache.get_max_pos_of_last_change("user@elsewhere.org"), 5)