Add logging to tasks managed by the task scheduler, showing CPU and database usage. (#17219)
The log format is the same as the request log format, except: - fields that are specific to HTTP requests have been removed - the task's params are included at the end of the log line. These log lines are emitted: - when the task function finishes — both completion and failure (and I suppose it is possible for a task to become schedulable again?) - every 5 minutes whilst it is running Closes #17217. --------- Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org>
This commit is contained in:
parent
b71d277438
commit
7ef00b7628
|
@ -0,0 +1 @@
|
||||||
|
Add logging to tasks managed by the task scheduler, showing CPU and database usage.
|
|
@ -24,7 +24,12 @@ from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
|
||||||
|
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
from synapse.logging.context import nested_logging_context
|
from synapse.logging.context import (
|
||||||
|
ContextResourceUsage,
|
||||||
|
LoggingContext,
|
||||||
|
nested_logging_context,
|
||||||
|
set_current_context,
|
||||||
|
)
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import (
|
from synapse.metrics.background_process_metrics import (
|
||||||
run_as_background_process,
|
run_as_background_process,
|
||||||
|
@ -81,6 +86,8 @@ class TaskScheduler:
|
||||||
MAX_CONCURRENT_RUNNING_TASKS = 5
|
MAX_CONCURRENT_RUNNING_TASKS = 5
|
||||||
# Time from the last task update after which we will log a warning
|
# Time from the last task update after which we will log a warning
|
||||||
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
|
LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
|
||||||
|
# Report a running task's status and usage every so often.
|
||||||
|
OCCASIONAL_REPORT_INTERVAL_MS = 5 * 60 * 1000 # 5 minutes
|
||||||
|
|
||||||
def __init__(self, hs: "HomeServer"):
|
def __init__(self, hs: "HomeServer"):
|
||||||
self._hs = hs
|
self._hs = hs
|
||||||
|
@ -346,6 +353,33 @@ class TaskScheduler:
|
||||||
assert task.id not in self._running_tasks
|
assert task.id not in self._running_tasks
|
||||||
await self._store.delete_scheduled_task(task.id)
|
await self._store.delete_scheduled_task(task.id)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _log_task_usage(
|
||||||
|
state: str, task: ScheduledTask, usage: ContextResourceUsage, active_time: float
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Log a line describing the state and usage of a task.
|
||||||
|
The log line is inspired by / a copy of the request log line format,
|
||||||
|
but with irrelevant fields removed.
|
||||||
|
|
||||||
|
active_time: Time that the task has been running for, in seconds.
|
||||||
|
"""
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"Task %s: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
|
||||||
|
" [%d dbevts] %r, %r",
|
||||||
|
state,
|
||||||
|
active_time,
|
||||||
|
usage.ru_utime,
|
||||||
|
usage.ru_stime,
|
||||||
|
usage.db_sched_duration_sec,
|
||||||
|
usage.db_txn_duration_sec,
|
||||||
|
int(usage.db_txn_count),
|
||||||
|
usage.evt_db_fetch_count,
|
||||||
|
task.resource_id,
|
||||||
|
task.params,
|
||||||
|
)
|
||||||
|
|
||||||
async def _launch_task(self, task: ScheduledTask) -> None:
|
async def _launch_task(self, task: ScheduledTask) -> None:
|
||||||
"""Launch a scheduled task now.
|
"""Launch a scheduled task now.
|
||||||
|
|
||||||
|
@ -360,8 +394,32 @@ class TaskScheduler:
|
||||||
)
|
)
|
||||||
function = self._actions[task.action]
|
function = self._actions[task.action]
|
||||||
|
|
||||||
|
def _occasional_report(
|
||||||
|
task_log_context: LoggingContext, start_time: float
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Helper to log a 'Task continuing' line every so often.
|
||||||
|
"""
|
||||||
|
|
||||||
|
current_time = self._clock.time()
|
||||||
|
calling_context = set_current_context(task_log_context)
|
||||||
|
try:
|
||||||
|
usage = task_log_context.get_resource_usage()
|
||||||
|
TaskScheduler._log_task_usage(
|
||||||
|
"continuing", task, usage, current_time - start_time
|
||||||
|
)
|
||||||
|
finally:
|
||||||
|
set_current_context(calling_context)
|
||||||
|
|
||||||
async def wrapper() -> None:
|
async def wrapper() -> None:
|
||||||
with nested_logging_context(task.id):
|
with nested_logging_context(task.id) as log_context:
|
||||||
|
start_time = self._clock.time()
|
||||||
|
occasional_status_call = self._clock.looping_call(
|
||||||
|
_occasional_report,
|
||||||
|
TaskScheduler.OCCASIONAL_REPORT_INTERVAL_MS,
|
||||||
|
log_context,
|
||||||
|
start_time,
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
(status, result, error) = await function(task)
|
(status, result, error) = await function(task)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -383,6 +441,13 @@ class TaskScheduler:
|
||||||
)
|
)
|
||||||
self._running_tasks.remove(task.id)
|
self._running_tasks.remove(task.id)
|
||||||
|
|
||||||
|
current_time = self._clock.time()
|
||||||
|
usage = log_context.get_resource_usage()
|
||||||
|
TaskScheduler._log_task_usage(
|
||||||
|
status.value, task, usage, current_time - start_time
|
||||||
|
)
|
||||||
|
occasional_status_call.stop()
|
||||||
|
|
||||||
# Try launch a new task since we've finished with this one.
|
# Try launch a new task since we've finished with this one.
|
||||||
self._clock.call_later(0.1, self._launch_scheduled_tasks)
|
self._clock.call_later(0.1, self._launch_scheduled_tasks)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue