Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Improve performance of remove_{hidden,deleted}_devices_from_device_inbox #11421

Merged
merged 19 commits into from
Nov 25, 2021
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11421.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of various background database schema updates.
199 changes: 83 additions & 116 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,69 +650,16 @@ async def _remove_deleted_devices_from_device_inbox(
Returns:
The number of deleted rows
"""

def _remove_deleted_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> int:
"""stream_id is not unique
we need to use an inclusive `stream_id >= ?` clause,
since we might not have deleted all dead device messages for the stream_id
returned from the previous query

Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
to avoid problems of deleting a large number of rows all at once
due to a single device having lots of device messages.
"""

last_stream_id = progress.get("stream_id", 0)

sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
WHERE
stream_id >= ?
AND (device_id, user_id) NOT IN (
SELECT device_id, user_id FROM devices
)
ORDER BY stream_id
LIMIT ?
"""

txn.execute(sql, (last_stream_id, batch_size))
rows = txn.fetchall()

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)

if rows:
# send more than stream_id to progress
# otherwise it can happen in large deployments that
# no change of status is visible in the log file
# it may be that the stream_id does not change in several runs
self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_DELETED_DEVICES,
{
"device_id": rows[-1][0],
"user_id": rows[-1][1],
"stream_id": rows[-1][2],
},
)

return num_deleted

number_deleted = await self.db_pool.runInteraction(
number_deleted, finished = await self.db_pool.runInteraction(
"_remove_deleted_devices_from_device_inbox",
_remove_deleted_devices_from_device_inbox_txn,
self._remove_devices_from_device_inbox_txn,
self.REMOVE_DELETED_DEVICES,
progress,
batch_size,
)

# The task is finished when no more lines are deleted.
if not number_deleted:
if finished:
await self.db_pool.updates._end_background_update(
self.REMOVE_DELETED_DEVICES
)
Expand All @@ -733,74 +680,94 @@ async def _remove_hidden_devices_from_device_inbox(
Returns:
The number of deleted rows
"""
number_deleted, finished = await self.db_pool.runInteraction(
"_remove_hidden_devices_from_device_inbox_txn",
self._remove_devices_from_device_inbox_txn,
self.REMOVE_HIDDEN_DEVICES,
progress,
batch_size,
)

def _remove_hidden_devices_from_device_inbox_txn(
txn: LoggingTransaction,
) -> int:
"""stream_id is not unique
we need to use an inclusive `stream_id >= ?` clause,
since we might not have deleted all hidden device messages for the stream_id
returned from the previous query

Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
to avoid problems of deleting a large number of rows all at once
due to a single device having lots of device messages.
"""
# The task is finished when no more lines are deleted.
if finished:
await self.db_pool.updates._end_background_update(
self.REMOVE_HIDDEN_DEVICES
)

last_stream_id = progress.get("stream_id", 0)
return number_deleted

sql = """
SELECT device_id, user_id, stream_id
FROM device_inbox
WHERE
stream_id >= ?
AND (device_id, user_id) IN (
SELECT device_id, user_id FROM devices WHERE hidden = ?
)
ORDER BY stream_id
LIMIT ?
"""
def _remove_devices_from_device_inbox_txn(
babolivier marked this conversation as resolved.
Show resolved Hide resolved
self,
txn: LoggingTransaction,
update_name: str,
progress: JsonDict,
batch_size: int,
) -> Tuple[int, bool]:
"""Remove devices that were either deleted or hidden from the device_inbox table.

txn.execute(sql, (last_stream_id, True, batch_size))
rows = txn.fetchall()
Args:
update_name: The name of the update to run, used to determine how to read the
devices table.
progress: The update's progress dict.
batch_size: The batch size for this update.

num_deleted = 0
for row in rows:
num_deleted += self.db_pool.simple_delete_txn(
txn,
"device_inbox",
{"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
)
Returns:
The number of rows deleted, and whether the update should be ended.
"""

if rows:
# We don't just save the `stream_id` in progress as
# otherwise it can happen in large deployments that
# no change of status is visible in the log file, as
# it may be that the stream_id does not change in several runs
self.db_pool.updates._background_update_progress_txn(
txn,
self.REMOVE_HIDDEN_DEVICES,
{
"device_id": rows[-1][0],
"user_id": rows[-1][1],
"stream_id": rows[-1][2],
},
if "max_stream_id" in progress:
max_stream_id = progress["max_stream_id"]
else:
txn.execute("SELECT max(stream_id) FROM device_inbox")
# There's a type mismatch here between how we want to type the row and
# what fetchone says it returns, but we silence it because we know that
# res can't be None.
res: Tuple[Optional[int]] = txn.fetchone() # type: ignore[assignment]
if res[0] is None:
return 0, True
else:
max_stream_id = res[0]

start = progress.get("stream_id", 0)
stop = start + batch_size

nested_select = "SELECT device_id, user_id FROM devices"
args = (start, stop)
if update_name == self.REMOVE_HIDDEN_DEVICES:
# If we want to remove hidden devices, select only rows from the devices table
# that have `hidden = TRUE`.
nested_select += " WHERE hidden = ?"
# We need to ignore mypy's error here, otherwise we can't use different
# arguments depending on whether we want to filter for hidden devices.
args += (True,) # type: ignore[assignment]
babolivier marked this conversation as resolved.
Show resolved Hide resolved

sql = (
"""
DELETE FROM device_inbox
WHERE
stream_id >= ? AND stream_id < ?
AND (device_id, user_id) NOT IN (
%s
)
babolivier marked this conversation as resolved.
Show resolved Hide resolved

return num_deleted

number_deleted = await self.db_pool.runInteraction(
"_remove_hidden_devices_from_device_inbox",
_remove_hidden_devices_from_device_inbox_txn,
"""
% nested_select
)

# The task is finished when no more lines are deleted.
if not number_deleted:
await self.db_pool.updates._end_background_update(
self.REMOVE_HIDDEN_DEVICES
txn.execute(sql, args)
num_deleted = txn.rowcount
rows = txn.fetchall()

if rows:
self.db_pool.updates._background_update_progress_txn(
txn,
update_name,
{
"stream_id": stop,
"max_stream_id": max_stream_id,
},
)

return number_deleted
return num_deleted, stop >= max_stream_id


class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
Expand Down