diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 6c655618148a..3cd066110a2a 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -838,5 +838,74 @@ def _insert_graph_receipt_txn( ) -class ReceiptsStore(ReceiptsWorkerStore): +class ReceiptsBackgroundUpdateStore(SQLBaseStore): + POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering" + + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_update_handler( + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING, + self._populate_receipt_event_stream_ordering, + ) + + async def _populate_receipt_event_stream_ordering( + self, progress: JsonDict, batch_size: int + ) -> int: + def _populate_receipt_event_stream_ordering_txn(txn: LoggingTransaction) -> bool: + + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM receipts_linearized") + res = txn.fetchone() + if res is None or res[0] is None: + return True + else: + max_stream_id = res[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size + + sql = """ + UPDATE receipts_linearized + SET event_stream_ordering = ( + SELECT stream_ordering + FROM events + WHERE event_id = receipts_linearized.event_id + ) + WHERE stream_id >= ? AND stream_id < ? + """ + txn.execute(sql, (start, stop)) + + self.db_pool.updates._background_update_progress_txn( + txn, + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, + ) + + return stop > max_stream_id + + finished = await self.db_pool.runInteraction( + "_remove_devices_from_device_inbox_txn", + _populate_receipt_event_stream_ordering_txn, + ) + + if finished: + await self.db_pool.updates._end_background_update( + self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING + ) + + return batch_size + + +class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore): pass diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql index 7911250c2c26..e40a9fa4a4c9 100644 --- a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql +++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql @@ -14,3 +14,6 @@ */ ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering integer; + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('populate_event_stream_ordering', '{}');