Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix federation stall on concurrent access errors #9639

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9639.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug where federation sending can stall due to `concurrent access` database exceptions when it falls behind.
45 changes: 9 additions & 36 deletions synapse/storage/databases/main/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util.caches.expiringcache import ExpiringCache

Expand Down Expand Up @@ -312,49 +311,23 @@ async def store_destination_rooms_entries(
stream_ordering: the stream_ordering of the event
"""

return await self.db_pool.runInteraction(
"store_destination_rooms_entries",
self._store_destination_rooms_entries_txn,
destinations,
room_id,
stream_ordering,
await self.db_pool.simple_upsert_many(
table="destinations",
key_names=("destination",),
key_values=[(d,) for d in destinations],
value_names=[],
value_values=[],
desc="store_destination_rooms_entries_dests",
)

def _store_destination_rooms_entries_txn(
self,
txn: LoggingTransaction,
destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:

# ensure we have a `destinations` row for this destination, as there is
# a foreign key constraint.
if isinstance(self.database_engine, PostgresEngine):
q = """
INSERT INTO destinations (destination)
VALUES (?)
ON CONFLICT DO NOTHING;
"""
elif isinstance(self.database_engine, Sqlite3Engine):
q = """
INSERT OR IGNORE INTO destinations (destination)
VALUES (?);
"""
else:
raise RuntimeError("Unknown database engine")

txn.execute_batch(q, ((destination,) for destination in destinations))

rows = [(destination, room_id) for destination in destinations]

self.db_pool.simple_upsert_many_txn(
txn,
await self.db_pool.simple_upsert_many(
table="destination_rooms",
key_names=("destination", "room_id"),
key_values=rows,
value_names=["stream_ordering"],
value_values=[(stream_ordering,)] * len(rows),
desc="store_destination_rooms_entries_rooms",
)

async def get_destination_last_successful_stream_ordering(
Expand Down