Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Add background job to populate receipts event_stream_ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
Fizzadar committed Sep 9, 2022
1 parent 1b3a2cb commit c0da6fa
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 1 deletion.
71 changes: 70 additions & 1 deletion synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -838,5 +838,74 @@ def _insert_graph_receipt_txn(
)


class ReceiptsStore(ReceiptsWorkerStore):
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"

def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)

self.db_pool.updates.register_background_update_handler(
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
self._populate_receipt_event_stream_ordering,
)

async def _populate_receipt_event_stream_ordering(
self, progress: JsonDict, batch_size: int
) -> int:
def _populate_receipt_event_stream_ordering_txn(txn: LoggingTransaction) -> bool:

if "max_stream_id" in progress:
max_stream_id = progress["max_stream_id"]
else:
txn.execute("SELECT max(stream_id) FROM receipts_linearized")
res = txn.fetchone()
if res is None or res[0] is None:
return True
else:
max_stream_id = res[0]

start = progress.get("stream_id", 0)
stop = start + batch_size

sql = """
UPDATE receipts_linearized
SET event_stream_ordering = (
SELECT stream_ordering
FROM events
WHERE event_id = receipts_linearized.event_id
)
WHERE stream_id >= ? AND stream_id < ?
"""
txn.execute(sql, (start, stop))

self.db_pool.updates._background_update_progress_txn(
txn,
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
{
"stream_id": stop,
"max_stream_id": max_stream_id,
},
)

return stop > max_stream_id

finished = await self.db_pool.runInteraction(
"_remove_devices_from_device_inbox_txn",
_populate_receipt_event_stream_ordering_txn,
)

if finished:
await self.db_pool.updates._end_background_update(
self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
)

return batch_size


class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
pass
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@
*/

ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering integer;

INSERT INTO background_updates (update_name, progress_json) VALUES
('populate_event_stream_ordering', '{}');

0 comments on commit c0da6fa

Please sign in to comment.