Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix bug with new task scheduler using lots of CPU. (#16278)
Browse files Browse the repository at this point in the history
Using the new `TaskScheduler` meant that we'ed create lots of new
metrics (due to adding task ID to the desc of background process),
resulting in requests for metrics taking an increasing amount of CPU.
  • Loading branch information
erikjohnston authored Sep 8, 2023
1 parent 9084429 commit f43d994
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 21 deletions.
1 change: 1 addition & 0 deletions changelog.d/16278.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix using the new task scheduler causing lots of CPU to be used.
43 changes: 22 additions & 21 deletions synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from twisted.python.failure import Failure

from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string
Expand Down Expand Up @@ -316,26 +317,27 @@ async def _launch_task(self, task: ScheduledTask) -> None:
function = self._actions[task.action]

async def wrapper() -> None:
try:
(status, result, error) = await function(task)
except Exception:
f = Failure()
logger.error(
f"scheduled task {task.id} failed",
exc_info=(f.type, f.value, f.getTracebackObject()),
with nested_logging_context(task.id):
try:
(status, result, error) = await function(task)
except Exception:
f = Failure()
logger.error(
f"scheduled task {task.id} failed",
exc_info=(f.type, f.value, f.getTracebackObject()),
)
status = TaskStatus.FAILED
result = None
error = f.getErrorMessage()

await self._store.update_scheduled_task(
task.id,
self._clock.time_msec(),
status=status,
result=result,
error=error,
)
status = TaskStatus.FAILED
result = None
error = f.getErrorMessage()

await self._store.update_scheduled_task(
task.id,
self._clock.time_msec(),
status=status,
result=result,
error=error,
)
self._running_tasks.remove(task.id)
self._running_tasks.remove(task.id)

if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
Expand All @@ -353,5 +355,4 @@ async def wrapper() -> None:

self._running_tasks.add(task.id)
await self.update_task(task.id, status=TaskStatus.ACTIVE)
description = f"{task.id}-{task.action}"
run_as_background_process(description, wrapper)
run_as_background_process(task.action, wrapper)

0 comments on commit f43d994

Please sign in to comment.