From ff32a57726c78fdac9eb037ca1e3683a8b2b0f7b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 22 Sep 2021 00:03:51 -0500 Subject: [PATCH 1/7] Ensure (room_id, next_batch_id) is unique to avoid cross-talk/conflicts between chunks Part of [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) Part of https://github.com/matrix-org/synapse/issues/10737 --- synapse/storage/databases/main/events.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 584f818ff361..b30ec9156f97 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1795,6 +1795,25 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): # Invalid insertion event without next batch ID return + conflicting_insertion_event_id = self.db_pool.simple_select_one_onecol_txn( + txn, + table="insertion_events", + keyvalues={ + "room_id": event.room_id, + "next_batch_id": next_batch_id, + }, + retcol="event_id", + desc="get_device_list_last_stream_id_for_remote", + allow_none=True, + ) + if conflicting_insertion_event_id is not None: + # The new insertion event is invalid because there already exists + # and insertion event in the room with the same next_batch_id. We + # can't allow multiple because the batch pointing will get weird, + # e.g. we can't determine which insertion event the batch event is + # pointing to. + return + logger.debug( "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event ) From 1f3946e858c3c5c75be098a2056215f9292dc367 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 22 Sep 2021 00:32:39 -0500 Subject: [PATCH 2/7] Add changelog --- changelog.d/10877.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10877.feature diff --git a/changelog.d/10877.feature b/changelog.d/10877.feature new file mode 100644 index 000000000000..75c3e60b7997 --- /dev/null +++ b/changelog.d/10877.feature @@ -0,0 +1 @@ +Ensure `(room_id, next_batch_id)` is unique across [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) insertion events in rooms to avoid cross-talk/conflicts between batches From fd32072e34d8a66cc0b3510c71e8c20dff53524d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 22 Sep 2021 00:45:00 -0500 Subject: [PATCH 3/7] Fix lint --- synapse/storage/databases/main/events.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b30ec9156f97..f6ed62e32fca 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1803,7 +1803,6 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): "next_batch_id": next_batch_id, }, retcol="event_id", - desc="get_device_list_last_stream_id_for_remote", allow_none=True, ) if conflicting_insertion_event_id is not None: From 8daa576d4752ba522f21ae27a4e53e5a6a725747 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 22 Sep 2021 00:45:36 -0500 Subject: [PATCH 4/7] Add punctuation to changelog --- changelog.d/10877.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/10877.feature b/changelog.d/10877.feature index 75c3e60b7997..06a246c108a7 100644 --- a/changelog.d/10877.feature +++ b/changelog.d/10877.feature @@ -1 +1 @@ -Ensure `(room_id, next_batch_id)` is unique across [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) insertion events in rooms to avoid cross-talk/conflicts between batches +Ensure `(room_id, next_batch_id)` is unique across [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) insertion events in rooms to avoid cross-talk/conflicts between batches. From 21200f8f8a117131cf96998eb57832377f9792be Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 23 Sep 2021 23:34:20 -0500 Subject: [PATCH 5/7] Stop people from sending duplicate insertion events --- synapse/handlers/message.py | 22 +++++++++++++++++--- synapse/rest/client/room_batch.py | 4 ++-- synapse/storage/databases/main/events.py | 18 ---------------- synapse/storage/databases/main/room_batch.py | 7 +++++-- 4 files changed, 26 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6cd694b2da35..f14ad7b86363 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,6 +16,7 @@ # limitations under the License. import logging import random +from http import HTTPStatus from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json @@ -1389,6 +1390,9 @@ async def persist_and_notify_client_event( self.room_prejoin_state_types, ) + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + if event.type == EventTypes.Redaction: original_event = await self.store.get_event( event.redacts, @@ -1398,9 +1402,6 @@ async def persist_and_notify_client_event( allow_none=True, ) - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - # we can make some additional checks now if we have the original event. if original_event: if original_event.type == EventTypes.Create: @@ -1461,6 +1462,21 @@ async def persist_and_notify_client_event( if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") + if room_version_obj.msc2716_historical and event.type == EventTypes.MSC2716_INSERTION: + next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) + conflicting_insertion_event_id = await self.store.get_insertion_event_by_batch_id(event.room_id, next_batch_id) + if conflicting_insertion_event_id is not None: + # The current insertion event that we're processing is invalid + # because an insertion event already exists in the room with the + # same next_batch_id. We can't allow multiple because the batch + # pointing will get weird, e.g. we can't determine which insertion + # event the batch event is pointing to. + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Another insertion event already exists with the same next_batch_id", + errcode=Codes.INVALID_PARAM, + ) + # Mark any `m.historical` messages as backfilled so they don't appear # in `/sync` and have the proper decrementing `stream_ordering` as we import backfilled = False diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index bf14ec384ea3..787fb2b0a998 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -306,11 +306,11 @@ async def on_POST( # Verify the batch_id_from_query corresponds to an actual insertion event # and have the batch connected. corresponding_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id(batch_id_from_query) + await self.store.get_insertion_event_by_batch_id(room_id, batch_id_from_query) ) if corresponding_insertion_event_id is None: raise SynapseError( - 400, + HTTPStatus.BAD_REQUEST, "No insertion event corresponds to the given ?batch_id", errcode=Codes.INVALID_PARAM, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f6ed62e32fca..584f818ff361 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1795,24 +1795,6 @@ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): # Invalid insertion event without next batch ID return - conflicting_insertion_event_id = self.db_pool.simple_select_one_onecol_txn( - txn, - table="insertion_events", - keyvalues={ - "room_id": event.room_id, - "next_batch_id": next_batch_id, - }, - retcol="event_id", - allow_none=True, - ) - if conflicting_insertion_event_id is not None: - # The new insertion event is invalid because there already exists - # and insertion event in the room with the same next_batch_id. We - # can't allow multiple because the batch pointing will get weird, - # e.g. we can't determine which insertion event the batch event is - # pointing to. - return - logger.debug( "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event ) diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index a383388757aa..c92a467dd0a6 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -18,7 +18,7 @@ class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]: + async def get_insertion_event_by_batch_id(self, room_id: str, batch_id: str) -> Optional[str]: """Retrieve a insertion event ID. Args: @@ -30,7 +30,10 @@ async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]: """ return await self.db_pool.simple_select_one_onecol( table="insertion_events", - keyvalues={"next_batch_id": batch_id}, + keyvalues={ + "room_id": room_id, + "next_batch_id": batch_id + }, retcol="event_id", allow_none=True, ) From b22c9fafc42ae065808f0fd592630cd27c41904e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 23 Sep 2021 23:54:22 -0500 Subject: [PATCH 6/7] Also check insertion events from room creators in existing room versions --- synapse/handlers/message.py | 52 +++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index f14ad7b86363..746723809ee7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1390,9 +1390,6 @@ async def persist_and_notify_client_event( self.room_prejoin_state_types, ) - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - if event.type == EventTypes.Redaction: original_event = await self.store.get_event( event.redacts, @@ -1402,6 +1399,9 @@ async def persist_and_notify_client_event( allow_none=True, ) + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + # we can make some additional checks now if we have the original event. if original_event: if original_event.type == EventTypes.Create: @@ -1462,20 +1462,40 @@ async def persist_and_notify_client_event( if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") - if room_version_obj.msc2716_historical and event.type == EventTypes.MSC2716_INSERTION: - next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID) - conflicting_insertion_event_id = await self.store.get_insertion_event_by_batch_id(event.room_id, next_batch_id) - if conflicting_insertion_event_id is not None: - # The current insertion event that we're processing is invalid - # because an insertion event already exists in the room with the - # same next_batch_id. We can't allow multiple because the batch - # pointing will get weird, e.g. we can't determine which insertion - # event the batch event is pointing to. - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "Another insertion event already exists with the same next_batch_id", - errcode=Codes.INVALID_PARAM, + if event.type == EventTypes.MSC2716_INSERTION: + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + create_event = await self.store.get_create_event_for_room( + event.room_id + ) + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + + # Only check an insertion event if the room version + # supports it or the event is from the room creator. + if room_version_obj.msc2716_historical or ( + self._config.experimental.msc2716_enabled + and event.sender == room_creator + ): + next_batch_id = event.content.get( + EventContentFields.MSC2716_NEXT_BATCH_ID ) + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + event.room_id, next_batch_id + ) + ) + if conflicting_insertion_event_id is not None: + # The current insertion event that we're processing is invalid + # because an insertion event already exists in the room with the + # same next_batch_id. We can't allow multiple because the batch + # pointing will get weird, e.g. we can't determine which insertion + # event the batch event is pointing to. + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Another insertion event already exists with the same next_batch_id", + errcode=Codes.INVALID_PARAM, + ) # Mark any `m.historical` messages as backfilled so they don't appear # in `/sync` and have the proper decrementing `stream_ordering` as we import From ffa31e0e847c5dcd3925b6700c5eee8766b0a807 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 24 Sep 2021 00:00:37 -0500 Subject: [PATCH 7/7] Fix lint --- synapse/handlers/message.py | 6 ++---- synapse/rest/client/room_batch.py | 4 +++- synapse/storage/databases/main/room_batch.py | 9 ++++----- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 746723809ee7..1f507684de73 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1466,15 +1466,13 @@ async def persist_and_notify_client_event( room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - create_event = await self.store.get_create_event_for_room( - event.room_id - ) + create_event = await self.store.get_create_event_for_room(event.room_id) room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) # Only check an insertion event if the room version # supports it or the event is from the room creator. if room_version_obj.msc2716_historical or ( - self._config.experimental.msc2716_enabled + self.config.experimental.msc2716_enabled and event.sender == room_creator ): next_batch_id = event.content.get( diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 787fb2b0a998..1dffcc314793 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -306,7 +306,9 @@ async def on_POST( # Verify the batch_id_from_query corresponds to an actual insertion event # and have the batch connected. corresponding_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id(room_id, batch_id_from_query) + await self.store.get_insertion_event_by_batch_id( + room_id, batch_id_from_query + ) ) if corresponding_insertion_event_id is None: raise SynapseError( diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index c92a467dd0a6..300a563c9e09 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -18,7 +18,9 @@ class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_by_batch_id(self, room_id: str, batch_id: str) -> Optional[str]: + async def get_insertion_event_by_batch_id( + self, room_id: str, batch_id: str + ) -> Optional[str]: """Retrieve a insertion event ID. Args: @@ -30,10 +32,7 @@ async def get_insertion_event_by_batch_id(self, room_id: str, batch_id: str) -> """ return await self.db_pool.simple_select_one_onecol( table="insertion_events", - keyvalues={ - "room_id": room_id, - "next_batch_id": batch_id - }, + keyvalues={"room_id": room_id, "next_batch_id": batch_id}, retcol="event_id", allow_none=True, )