From d7a650fcb68f5e9d6beed68f577fdfa3e9db75af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milo=C5=A1=20Prchl=C3=ADk?= Date: Fri, 6 Oct 2023 09:32:39 +0200 Subject: [PATCH] Implement the new test check `watchdog` --- docs/releases.rst | 6 + tests/execute/reboot/basic.sh | 6 +- tests/execute/reboot/reuse.sh | 6 +- tests/test/check/data/main.fmf | 43 +++ tests/test/check/main.fmf | 9 + tests/test/check/test-watchdog.sh | 79 +++++ tmt/checks/avc.py | 3 + tmt/checks/dmesg.py | 3 + tmt/checks/watchdog.py | 485 ++++++++++++++++++++++++++++++ tmt/steps/execute/__init__.py | 60 ++-- tmt/steps/execute/internal.py | 20 +- tmt/steps/provision/__init__.py | 171 +++++++---- tmt/steps/provision/connect.py | 3 +- tmt/steps/provision/mrack.py | 3 +- tmt/utils.py | 18 +- 15 files changed, 823 insertions(+), 92 deletions(-) create mode 100755 tests/test/check/test-watchdog.sh create mode 100644 tmt/checks/watchdog.py diff --git a/docs/releases.rst b/docs/releases.rst index 83cf3aa3ba..4b992d0c67 100644 --- a/docs/releases.rst +++ b/docs/releases.rst @@ -18,6 +18,12 @@ to TPM 2.0 for now, the future release of `testcloud`__, the library behing ``virtual`` plugin, will extend the support to more versions. +A new :ref:`watchdog test check` has been +added. It monitors a guest running the test with either ping or SSH +connections, and may force reboot of the guest when it becomes +unresponsive. This is the first step towards helping tests handle kernel +panics and similar situations. + __ https://pagure.io/testcloud/ diff --git a/tests/execute/reboot/basic.sh b/tests/execute/reboot/basic.sh index 2c9a8b54b5..6d2358d655 100755 --- a/tests/execute/reboot/basic.sh +++ b/tests/execute/reboot/basic.sh @@ -11,11 +11,11 @@ rlJournalStart for interactive in "" "--interactive"; do rlPhaseStartTest "Simple reboot test (interactivity: $interactive)" rlRun -s "tmt run --scratch -i $run -dddvvva execute -h tmt $interactive" - rlAssertGrep "Reboot during test '/test' with reboot count 1" $rlRun_LOG + rlAssertGrep "Soft reboot during test '/test' with reboot count 1" $rlRun_LOG rlAssertGrep "After first reboot" $rlRun_LOG - rlAssertGrep "Reboot during test '/test' with reboot count 2" $rlRun_LOG + rlAssertGrep "Soft reboot during test '/test' with reboot count 2" $rlRun_LOG rlAssertGrep "After second reboot" $rlRun_LOG - rlAssertGrep "Reboot during test '/test' with reboot count 3" $rlRun_LOG + rlAssertGrep "Soft reboot during test '/test' with reboot count 3" $rlRun_LOG rlAssertGrep "After third reboot" $rlRun_LOG rlRun "rm $rlRun_LOG" diff --git a/tests/execute/reboot/reuse.sh b/tests/execute/reboot/reuse.sh index 110ed320c2..2e610bb919 100755 --- a/tests/execute/reboot/reuse.sh +++ b/tests/execute/reboot/reuse.sh @@ -27,11 +27,11 @@ rlJournalStart provision="provision -h connect -g $guest -P $port -u $user -k $key" for _ in $(seq 0 1); do rlRun -s "tmt run --scratch -ai $run -dddvvv $provision" - rlAssertGrep "Reboot during test '/test' with reboot count 1" $rlRun_LOG + rlAssertGrep "Soft reboot during test '/test' with reboot count 1" $rlRun_LOG rlAssertGrep "After first reboot" $rlRun_LOG - rlAssertGrep "Reboot during test '/test' with reboot count 2" $rlRun_LOG + rlAssertGrep "Soft reboot during test '/test' with reboot count 2" $rlRun_LOG rlAssertGrep "After second reboot" $rlRun_LOG - rlAssertGrep "Reboot during test '/test' with reboot count 3" $rlRun_LOG + rlAssertGrep "Soft reboot during test '/test' with reboot count 3" $rlRun_LOG rlAssertGrep "After third reboot" $rlRun_LOG rlRun "rm $rlRun_LOG" diff --git 
a/tests/test/check/data/main.fmf b/tests/test/check/data/main.fmf index 1062da3513..0777dacc6e 100644 --- a/tests/test/check/data/main.fmf +++ b/tests/test/check/data/main.fmf @@ -22,3 +22,46 @@ sudo bash -c "passwd --help &> /root/passwd.log; \ ls -alZ /root/passwd.log; \ rm -f /root/passwd.log" || /bin/true + +/watchdog/ping: + require: + - /usr/bin/uptime + + test: | + set -x + + export + uptime + + if [ "$TMT_REBOOT_COUNT" = "1" ]; then exit 0; fi + + # Collect a couple of successful responses + sysctl net.ipv4.icmp_echo_ignore_all + sysctl net.ipv6.icmp.echo_ignore_all + + echo "test starts, will sleep for a while" + sleep 120 + uptime + + # Trigger kernel panic. There should be no development after this line, + # but to be sure, sleep more. + echo c > /proc/sysrq-trigger + + # Now wait to be noticed by the watchdog + sleep 300 + + duration: 30m + + check: + - how: watchdog + + interval: 5 + reboot: true + + # The only viable way to test this with easy-to-setup guests is `virtual` + SSH. + # Cannot use ping, we'd be pinging our own localhost, and Beaker requires nontrivial + # setup. The watchdog should detect this & disable the ping probe. + ping: true + + ssh-ping: true + ssh-ping-threshold: 3 diff --git a/tests/test/check/main.fmf b/tests/test/check/main.fmf index 9b4cbf0bd7..d324679604 100644 --- a/tests/test/check/main.fmf +++ b/tests/test/check/main.fmf @@ -12,5 +12,14 @@ tier: 2 /avc: test: ./test-avc.sh tag+: + - provision-only - provision-local - provision-virtual + +/watchdog: + test: ./test-watchdog.sh + duration: 15m + tag+: + - provision-only + - provision-container + - provision-virtual diff --git a/tests/test/check/test-watchdog.sh b/tests/test/check/test-watchdog.sh new file mode 100755 index 0000000000..7a61b7acad --- /dev/null +++ b/tests/test/check/test-watchdog.sh @@ -0,0 +1,79 @@ +#!/bin/bash + +. /usr/share/beakerlib/beakerlib.sh || exit 1 + +function assert_check_result () { + rlAssertEquals "$1" "watchdog:$2" "$(yq -r ".[] | .check | .[] | select(.event == \"$3\") | \"\\(.name):\\(.result)\"" $results)" +} + + +rlJournalStart + rlPhaseStartSetup + rlRun "PROVISION_HOW=${PROVISION_HOW:-virtual}" + + # Using /var/tmp instead of /tmp - we need the directory to survive + # reboot, under /tmp it would be removed :/ + rlRun "run=\$(mktemp -d -p /var/tmp)" 0 "Create run directory" + + rlRun "results=$run/plan/execute/results.yaml" + + rlRun "pushd data" + rlRun "set -o pipefail" + rlPhaseEnd + + rlPhaseStartTest "Test guest watchdog ping with $PROVISION_HOW provisioning" + rlRun "test_dir=$run/plan/execute/data/guest/default-0/watchdog/ping-1" + rlRun "log=$run/log.txt" + rlRun "test_log=$test_dir/output.txt" + rlRun "watchdog_log=$test_dir/tmt-watchdog.txt" + + if [ "$PROVISION_HOW" = "container" ]; then + rlRun "tmt -c provision_method=$PROVISION_HOW run --id $run --scratch -a -vv provision -h $PROVISION_HOW test -n /watchdog" 1 + + elif [ "$PROVISION_HOW" = "virtual" ]; then + rlRun "tmt -c provision_method=$PROVISION_HOW run --id $run --scratch -a -vv provision -h $PROVISION_HOW --connection system test -n /watchdog" + + else + rlDie "Provision method $PROVISION_HOW is not supported by the test." + fi + + rlRun "cat $results" + rlRun "cat $test_log" + + if [ "$PROVISION_HOW" = "container" ]; then + rlRun "grep -E '\\[watchdog\\][[:space:]]+warn: Ping against this guest is not supported, disabling.' $log" + rlRun "grep -E '\\[watchdog\\][[:space:]]+warn: SSH ping against this guest is not supported, disabling.' 
$log" + + assert_check_result "watchdog as an after-test should pass" "pass" "after-test" + + rlAssertGrep "TMT_REBOOT_COUNT=\"0\"" $test_log + rlAssertNotGrep "TMT_REBOOT_COUNT=\"1\"" $test_log + rlAssertGrep "/proc/sysrq-trigger: Read-only file system" $test_log + + elif [ "$PROVISION_HOW" = "virtual" ]; then + rlRun "cat $watchdog_log" + + rlRun "grep -E '\\[watchdog\\][[:space:]]+warn: Ping against this guest is not supported, disabling.' $log" + + assert_check_result "watchdog as an after-test should pass" "pass" "after-test" + + rlAssertGrep "TMT_REBOOT_COUNT=\"0\"" $test_log + rlAssertGrep "TMT_REBOOT_COUNT=\"1\"" $test_log + rlAssertGrep "++ exit 0" $test_log + + rlAssertGrep "# ssh-ping reported" $watchdog_log + rlAssertGrep "# failed 1 of 3 allowed" $watchdog_log + rlAssertGrep "# failed 2 of 3 allowed" $watchdog_log + rlAssertGrep "# failed 3 of 3 allowed" $watchdog_log + + rlRun "grep -E '\\[watchdog\\][[:space:]]+fail: exhausted 3 SSH ping attempts' $log" + rlAssertGrep "Hard reboot during test '/watchdog/ping' with reboot count 1." $log + fi + rlPhaseEnd + + rlPhaseStartCleanup + rlRun "popd" + + rlRun "rm -rf $run" + rlPhaseEnd +rlJournalEnd diff --git a/tmt/checks/avc.py b/tmt/checks/avc.py index be235ff252..d77d54c0de 100644 --- a/tmt/checks/avc.py +++ b/tmt/checks/avc.py @@ -330,6 +330,9 @@ def after_test( name='avc', result=ResultOutcome.SKIP)] + if invocation.hard_reboot_requested: + return [CheckResult(name='avc', result=ResultOutcome.SKIP)] + assert invocation.phase.step.workdir is not None # narrow type outcome, path = create_final_report(invocation, logger) diff --git a/tmt/checks/dmesg.py b/tmt/checks/dmesg.py index f58719cb0c..9bdc536b05 100644 --- a/tmt/checks/dmesg.py +++ b/tmt/checks/dmesg.py @@ -150,6 +150,9 @@ def after_test( if not invocation.guest.facts.has_capability(GuestCapability.SYSLOG_ACTION_READ_ALL): return [CheckResult(name='dmesg', result=ResultOutcome.SKIP)] + if invocation.hard_reboot_requested: + return [CheckResult(name='dmesg', result=ResultOutcome.SKIP)] + outcome, path = cls._save_dmesg(invocation, CheckEvent.AFTER_TEST, logger) return [CheckResult(name='dmesg', result=outcome, log=[path])] diff --git a/tmt/checks/watchdog.py b/tmt/checks/watchdog.py new file mode 100644 index 0000000000..6e60f215a9 --- /dev/null +++ b/tmt/checks/watchdog.py @@ -0,0 +1,485 @@ +import dataclasses +import datetime +import re +import threading +import time +from collections.abc import Iterable +from typing import TYPE_CHECKING, Optional + +import tmt.log +import tmt.steps.execute +import tmt.steps.provision +import tmt.steps.provision.artemis +import tmt.steps.provision.connect +import tmt.steps.provision.local +import tmt.steps.provision.mrack +import tmt.steps.provision.podman +import tmt.steps.provision.testcloud +import tmt.utils +from tmt.checks import Check, CheckPlugin, provides_check +from tmt.result import CheckResult, ResultOutcome +from tmt.utils import Path, field, render_run_exception_streams + +if TYPE_CHECKING: + from tmt.steps.execute import TestInvocation + +PING_OUTPUT_PATTERN = re.compile( + r'(?m)(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) received') +SSH_PING_OUTPUT_PATTERN = re.compile(r'Ncat: Connected') + +# TODO: do not use the list of classes, it's hard to maintain. +# Tracked in https://github.com/teemtee/tmt/issues/2739 +PINGABLE_GUEST_CLASSES: tuple[type[tmt.steps.provision.Guest], ...] 
= ( + tmt.steps.provision.artemis.GuestArtemis, + tmt.steps.provision.connect.GuestConnect, + tmt.steps.provision.mrack.GuestBeaker, + # TODO: is there a way to ping the VM instead of localhost? + # Tracked in https://github.com/teemtee/tmt/issues/2738 + # tmt.steps.provision.testcloud.GuestTestcloud + ) + +SSH_PINGABLE_GUEST_CLASSES: tuple[type[tmt.steps.provision.Guest], ...] = ( + tmt.steps.provision.GuestSsh, + tmt.steps.provision.local.GuestLocal + ) + + +REPORT_FILENAME = 'tmt-watchdog.txt' + + +def render_report_path(invocation: 'TestInvocation') -> Path: + """ Render path to a watchdog report file from necessary components """ + + return invocation.path / REPORT_FILENAME + + +def report_progress( + log: Path, + check_name: str, + report: Iterable[str], + command_output: Optional[str] = None) -> None: + """ + Add new report into a report file. + + :param log: path to the report file. + :param report: iterable of report lines to add. Each line is emitted on its + own line in the file. + :param command_output: if set, the string is added to the report file once + ``report`` lines are written into it. + """ + + timestamp = tmt.steps.execute.ExecutePlugin.format_timestamp( + datetime.datetime.now(datetime.timezone.utc)) + + with open(log, mode='a') as f: + f.write(f'# {check_name} reported at {timestamp}\n') + + for line in report: + f.write(line) + f.write('\n') + + if command_output: + f.write('\n') + f.write(command_output) + + f.write('\n') + + +@dataclasses.dataclass +class GuestContext: + """ Per-guest watchdog context """ + + #: Current number of failed watchdog checks. + ping_failures: int = 0 + ssh_ping_failures: int = 0 + + #: If set, contains a daemonized thread running the watchdog checks. + thread: Optional[threading.Thread] = None + + #: As long as this field is set to ``True``, the watchdog will run its + #: internal loop and run relevant checks. It is unset when terminating + #: the watchdog check to notify the thread it's time to quit. + keep_running: bool = True + + +@dataclasses.dataclass +class WatchdogCheck(Check): + interval: int = field( + default=60, + help='How often should the watchdog run, in seconds.') + + reboot: bool = field( + default=False, + help='If enabled, watchdog would reboot the guest after enough failed probes.') + + ping: bool = field( + default=False, + help="If enabled, watchdog would probe guest's responsiveness with ICMP packets.") + ping_packets: int = field( + default=1, + help='How many ICMP packates to send as one probe.') + ping_threshold: int = field( + default=10, + help='How many failed ping probes before taking any further action.') + + ssh_ping: bool = field( + default=False, + help=""" + If enabled, watchdog would probe guest's responsiveness by connecting + to its SSH port. 
+ """) + ssh_ping_threshold: int = field( + default=10, + help='How many failed SSH connections before taking any further action.') + + def notify(self, invocation: 'TestInvocation', logger: tmt.log.Logger) -> None: + """ Notify invocation that hard reboot is required """ + + if not self.reboot: + return + + invocation.hard_reboot_requested = True + invocation.terminate_process(logger=logger) + + def do_ping( + self, + invocation: 'TestInvocation', + guest_context: GuestContext, + logger: tmt.log.Logger) -> None: + """ Perform a ping check """ + + logger.debug('pinging', level=4) + + log = render_report_path(invocation) + + def _fail_parse_error(ping_output: str) -> None: + """ Handle unparseable ``ping`` output """ + + logger.fail('failed to parse ping output') + + guest_context.ping_failures += 1 + + report_progress( + log, + 'ping', + [ + '# failed to parse ping output', + f'# failed {guest_context.ping_failures} of {self.ping_threshold} allowed', + ], + command_output=ping_output + ) + + def _fail_lost_packets(ping_output: str, transmitted: int, received: int) -> None: + """ Handle missing response packets """ + + logger.fail(f'not all packets returned: {transmitted=} {received=}') + + guest_context.ping_failures += 1 + + report_progress( + log, + 'ping', + [ + '# not all packets returned', + f'# failed {guest_context.ping_failures} of {self.ping_threshold} allowed', + ], + command_output=ping_output + ) + + def _success(ping_output: str) -> None: + """ Handle successfull response """ + + logger.verbose('Received successful response to ping.', level=2) + + report = [ + '# successfull response' + ] + + if guest_context.ping_failures != 0: + report.append(f'# replenished failure budget back to {self.ping_threshold}') + + guest_context.ping_failures = 0 + + report_progress( + log, + 'ping', + report, + command_output=ping_output + ) + + def _handle_output(ping_output: str) -> None: + """ Process ``ping`` output and decide on its outcome """ + + match = PING_OUTPUT_PATTERN.search(ping_output) + + if match is None: + _fail_parse_error(ping_output) + + else: + groups = match.groupdict() + + transmitted = int(groups['transmitted']) + received = int(groups['received']) + + if transmitted != received: + _fail_lost_packets(ping_output, transmitted, received) + + else: + _success(ping_output) + + logger.debug( + f'failed {guest_context.ping_failures}' + f' of {self.ping_threshold} allowed') + + if guest_context.ping_failures >= self.ping_threshold: + logger.fail(f'exhausted {self.ping_threshold} ping attempts') + + self.notify(invocation, logger) + + try: + assert invocation.guest.primary_address is not None # narrow type + + output = tmt.utils.Command('ping', + '-c', + str(self.ping_packets), + invocation.guest.primary_address) .run(cwd=Path.cwd(), + logger=logger) + + _handle_output(output.stdout or '') + + except tmt.utils.RunError as exc: + if exc.returncode == 1: + _handle_output(exc.stdout or '') + + else: + _handle_output('\n'.join(render_run_exception_streams(exc.stdout, exc.stderr))) + + def do_ssh_ping( + self, + invocation: 'TestInvocation', + guest_context: GuestContext, + logger: tmt.log.Logger) -> None: + """ Perform a "SSH ping" check """ + + assert isinstance(invocation.guest, tmt.steps.provision.GuestSsh) + + logger.debug('checking SSH port', level=4) + + log = render_report_path(invocation) + + def _fail_unknown(ncat_output: str) -> None: + """ Handle unknown failures """ + + logger.fail('unknown error') + + guest_context.ssh_ping_failures += 1 + + report_progress(log, + 
'ssh-ping', + [ + '# unknown error', + f'# failed {guest_context.ssh_ping_failures}' + f' of {self.ssh_ping_threshold} allowed', + ], + command_output=ncat_output) + + def _fail_connection_refused(ncat_output: str) -> None: + """ Handle failed connection """ + + logger.fail('connection refused') + + guest_context.ssh_ping_failures += 1 + + report_progress(log, + 'ssh-ping', + [ + '# connection refused', + f'# failed {guest_context.ssh_ping_failures}' + f' of {self.ssh_ping_threshold} allowed', + ], + command_output=ncat_output) + + def _success(ncat_output: str) -> None: + """ Handle successfull response """ + + logger.verbose('Received successful response to SSH ping.', level=2) + + report = [ + '# successfull response' + ] + + if guest_context.ssh_ping_failures != 0: + report.append(f'# replenished failure budget back to {self.ssh_ping_threshold}') + + guest_context.ssh_ping_failures = 0 + + report_progress( + log, + 'ssh-ping', + report, + command_output=ncat_output + ) + + try: + assert invocation.guest.primary_address is not None # narrow type + + output = tmt.utils.Command('nc', + '-zv', + invocation.guest.primary_address, + str(invocation.guest.port or 22)) .run(cwd=Path.cwd(), + logger=logger) + + _success(output.stderr or '') + + except tmt.utils.RunError as exc: + if exc.returncode == 1: + _fail_connection_refused(exc.stderr or '') + + else: + _fail_unknown('\n'.join(render_run_exception_streams(exc.stdout, exc.stderr))) + + logger.debug( + f'failed {guest_context.ssh_ping_failures}' + f' of {self.ssh_ping_threshold} allowed') + + if guest_context.ssh_ping_failures >= self.ssh_ping_threshold: + logger.fail(f'exhausted {self.ssh_ping_threshold} SSH ping attempts') + + self.notify(invocation, logger) + + +@provides_check('watchdog') +class Watchdog(CheckPlugin[WatchdogCheck]): + """ + Take various actions when guest becomes unresponsive. + + Watchdog runs selected probes every now and then, and when a given + number of `probes` fail, watchdog would run one or more of the + predefined `actions`. + + Check comes with two probes, "ping" and "SSH ping", and single + action, "reboot". + + * "ping" uses the classic ICMP echo to check whether the guest is + still up and running, + * "SSH ping" tries to establish SSH connection, + * "reboot" action issues a hard reboot of the guest. + + Each probe has a "budget" of allowed failures, and when it runs out, + the action is taken. A successfull probe replenishes its budget to + the original level. + + Multiple probes can be enabled at the same time, for the action to + happen it's enough if just one of them runs out of its budget. + + .. code-block:: yaml + + check: + - how: watchdog + ping: true + reboot: true + + .. code-block:: yaml + + check: + - how: watchdog + + # Use only SSH ping. + ping: false + ssh-ping: true + + # Try every 5 minutes, allow 7 failed attempts, and reboot + # the guest when we run out of attempts. + interval: 300 + reboot: true + ssh-ping-threshold: 7 + + .. 
versionadded:: 1.32 + """ + + _check_class = WatchdogCheck + + @classmethod + def before_test( + cls, + *, + check: WatchdogCheck, + invocation: 'TestInvocation', + environment: Optional[tmt.utils.Environment] = None, + logger: tmt.log.Logger) -> list[CheckResult]: + + # Setup a logger + watchdog_logger = logger.clone() + watchdog_logger.labels.append('watchdog') + + # Create a guest context for the guest we've been given + invocation.check_data[check.how] = GuestContext() + + guest_context: GuestContext = invocation.check_data[check.how] + + if check.ping and not isinstance(invocation.guest, PINGABLE_GUEST_CLASSES): + watchdog_logger.warn('Ping against this guest is not supported, disabling.') + + check.ping = False + + if check.ssh_ping and not isinstance(invocation.guest, SSH_PINGABLE_GUEST_CLASSES): + watchdog_logger.warn('SSH ping against this guest is not supported, disabling.') + + check.ssh_ping = False + + def watchdog(guest_context: GuestContext) -> None: + """ Watchdog thread code """ + + tid = threading.get_ident() + + watchdog_logger.debug(f'Watchdog starts in thread {tid}') + + while guest_context.keep_running: + if check.ping: + check.do_ping(invocation, guest_context, watchdog_logger) + + if check.ssh_ping: + check.do_ssh_ping(invocation, guest_context, watchdog_logger) + + time.sleep(check.interval) + + watchdog_logger.debug(f'Watchdog finished in thread {tid}') + + guest_context.thread = threading.Thread( + target=watchdog, + args=(guest_context,), + name=f'watchdog-{invocation.guest.name}', + daemon=True) + + guest_context.thread.start() + + return [] + + @classmethod + def after_test( + cls, + *, + check: WatchdogCheck, + invocation: 'TestInvocation', + environment: Optional[tmt.utils.Environment] = None, + logger: tmt.log.Logger) -> list[CheckResult]: + + watchdog_logger = logger.clone() + watchdog_logger.labels.append('watchdog') + + guest_context: GuestContext = invocation.check_data[check.how] + + if guest_context.thread: + watchdog_logger.debug(f'Terminating watchdog in thread {guest_context.thread.ident}') + + guest_context.keep_running = False + guest_context.thread.join() + + guest_context.thread = None + + return [ + CheckResult( + name='watchdog', + result=ResultOutcome.PASS, + log=[render_report_path(invocation)] + )] diff --git a/tmt/steps/execute/__init__.py b/tmt/steps/execute/__init__.py index 6482ec6264..edb18eb15b 100644 --- a/tmt/steps/execute/__init__.py +++ b/tmt/steps/execute/__init__.py @@ -165,6 +165,8 @@ class TestInvocation: results: list[Result] = dataclasses.field(default_factory=list) check_results: list[CheckResult] = dataclasses.field(default_factory=list) + check_data: dict[str, Any] = field(default_factory=dict) + return_code: Optional[int] = None start_time: Optional[str] = None end_time: Optional[str] = None @@ -225,10 +227,19 @@ def reboot_request_path(self) -> Path: return self.test_data_path / TMT_REBOOT_SCRIPT.created_file @property - def reboot_requested(self) -> bool: - """ Whether a guest reboot has been requested by the test """ + def soft_reboot_requested(self) -> bool: + """ If set, test requested a reboot """ return self.reboot_request_path.exists() + #: If set, an asynchronous observer requested a reboot while the test was + #: running. 
+ hard_reboot_requested: bool = False + + @property + def reboot_requested(self) -> bool: + """ Whether a guest reboot has been requested while the test was running """ + return self.soft_reboot_requested or self.hard_reboot_requested + def handle_reboot(self) -> bool: """ Reboot the guest if the test requested it. @@ -247,29 +258,39 @@ def handle_reboot(self) -> bool: self._reboot_count += 1 self.logger.debug( - f"Reboot during test '{self.test}' with reboot count {self._reboot_count}.") + f"{'Hard' if self.hard_reboot_requested else 'Soft'} reboot during test '{self.test}'" + f" with reboot count {self._reboot_count}.") - with open(self.reboot_request_path) as reboot_file: - reboot_data = json.loads(reboot_file.read()) + reboot_command: Optional[ShellScript] = None + timeout: Optional[int] = None - reboot_command = None - if reboot_data.get('command'): - with suppress(TypeError): - reboot_command = ShellScript(reboot_data.get('command')) + if self.hard_reboot_requested: + pass - try: - timeout = int(reboot_data.get('timeout')) - except ValueError: - timeout = None + elif self.soft_reboot_requested: + # Extract custom hints from the file, and reset it. + with open(self.reboot_request_path) as reboot_file: + reboot_data = json.loads(reboot_file.read()) + + if reboot_data.get('command'): + with suppress(TypeError): + reboot_command = ShellScript(reboot_data.get('command')) - # Reset the file - os.remove(self.reboot_request_path) - self.guest.push(self.test_data_path) + try: + timeout = int(reboot_data.get('timeout')) + except ValueError: + timeout = None + + os.remove(self.reboot_request_path) + self.guest.push(self.test_data_path) rebooted = False try: - rebooted = self.guest.reboot(command=reboot_command, timeout=timeout) + rebooted = self.guest.reboot( + hard=self.hard_reboot_requested, + command=reboot_command, + timeout=timeout) except tmt.utils.RunError: self.logger.fail( @@ -286,6 +307,8 @@ def handle_reboot(self) -> bool: if not rebooted: raise tmt.utils.RebootTimeoutError("Reboot timed out.") + self.hard_reboot_requested = False + return True def terminate_process( @@ -320,6 +343,9 @@ def terminate_process( self.process.send_signal(signal) + if isinstance(self.guest, tmt.steps.provision.GuestSsh): + self.guest._cleanup_ssh_master_process(signal, logger) + class ExecutePlugin(tmt.steps.Plugin[ExecuteStepDataT]): """ Common parent of execute plugins """ diff --git a/tmt/steps/execute/internal.py b/tmt/steps/execute/internal.py index 83c9d42b86..d9140a7278 100644 --- a/tmt/steps/execute/internal.py +++ b/tmt/steps/execute/internal.py @@ -365,7 +365,7 @@ def _save_process( # TODO: do we want timestamps? Yes, we do, leaving that for refactoring later, # to use some reusable decorator. - invocation.check_results += self.run_checks_before_test( + invocation.check_results = self.run_checks_before_test( invocation=invocation, environment=environment, logger=logger @@ -423,9 +423,10 @@ def _save_process( # Fetch #1: we need logs and everything the test produced so we could # collect its results. - guest.pull( - source=invocation.path, - extend_options=test.test_framework.get_pull_options(invocation, logger)) + if not invocation.hard_reboot_requested: + guest.pull( + source=invocation.path, + extend_options=test.test_framework.get_pull_options(invocation, logger)) # Extract test results and store them in the invocation. 
Note # that these results will be overwritten with a fresh set of @@ -438,11 +439,12 @@ def _save_process( logger=logger ) - # Fetch #2: after-test checks might have produced remote files as well, - # we need to fetch them too. - guest.pull( - source=invocation.path, - extend_options=test.test_framework.get_pull_options(invocation, logger)) + if not invocation.hard_reboot_requested: + # Fetch #2: after-test checks might have produced remote files as well, + # we need to fetch them too. + guest.pull( + source=invocation.path, + extend_options=test.test_framework.get_pull_options(invocation, logger)) # Attach check results to every test result. There might be more than one, # and it's hard to pick the main one, who knows what custom results might diff --git a/tmt/steps/provision/__init__.py b/tmt/steps/provision/__init__.py index 9dec163563..097bf6f864 100644 --- a/tmt/steps/provision/__init__.py +++ b/tmt/steps/provision/__init__.py @@ -7,9 +7,11 @@ import random import re import shlex +import signal as _signal import string import subprocess import tempfile +import threading from collections.abc import Iterator from concurrent.futures import Future, ThreadPoolExecutor, as_completed from shlex import quote @@ -1248,25 +1250,38 @@ class GuestSsh(Guest): ssh_option: list[str] # Master ssh connection process and socket path + _ssh_master_process_lock: threading.Lock _ssh_master_process: Optional['subprocess.Popen[bytes]'] = None - _ssh_socket_path: Optional[Path] = None + def __init__(self, + *, + data: GuestData, + name: Optional[str] = None, + parent: Optional[tmt.utils.Common] = None, + logger: tmt.log.Logger) -> None: + self._ssh_master_process_lock = threading.Lock() + + super().__init__(data=data, logger=logger, parent=parent, name=name) + + @tmt.utils.cached_property def _ssh_guest(self) -> str: """ Return user@guest """ return f'{self.user}@{self.primary_address}' - def _ssh_socket(self) -> Path: - """ Prepare path to the master connection socket """ - if not self._ssh_socket_path: - # Use '/run/user/uid' if it exists, '/tmp' otherwise - run_dir = Path(f"/run/user/{os.getuid()}") - socket_dir = run_dir / "tmt" if run_dir.is_dir() else Path("/tmp") - socket_dir.mkdir(exist_ok=True) - self._ssh_socket_path = Path(tempfile.mktemp(dir=socket_dir)) - return self._ssh_socket_path + @tmt.utils.cached_property + def _ssh_master_socket_path(self) -> Path: + """ Return path to the SSH master socket """ + + # Use '/run/user/uid' if it exists, '/tmp' otherwise. + run_dir = Path(f"/run/user/{os.getuid()}") + socket_dir = run_dir / "tmt" if run_dir.is_dir() else Path("/tmp") + socket_dir.mkdir(exist_ok=True) + return Path(tempfile.mktemp(dir=socket_dir)) + @property def _ssh_options(self) -> Command: - """ Return common ssh options (list or joined) """ + """ Return common SSH options """ + options: tmt.utils.RawCommand = [ '-oForwardX11=no', '-oStrictHostKeyChecking=no', @@ -1294,38 +1309,96 @@ def _ssh_options(self) -> Command: # by allowing proper re-try mechanisms to kick-in. 
options.extend(['-oPasswordAuthentication=no']) - # Use the shared master connection - options.append(f'-S{self._ssh_socket()}') + # Include the SSH master process + options.append(f'-S{self._ssh_master_socket_path}') options.extend([f'-o{option}' for option in self.ssh_option]) return Command(*options) - def _ssh_master_connection(self, command: Command) -> None: - """ Check/create the master ssh connection """ - if self._ssh_master_process: - return + @property + def _base_ssh_command(self) -> Command: + """ A base SSH command shared by all SSH processes """ - # Do not modify the original command... - ssh_master_command = command + self._ssh_options() + Command("-MNnT", self._ssh_guest()) - self.debug(f"Create the master ssh connection: {ssh_master_command}") - self._ssh_master_process = subprocess.Popen( + command = Command( + *(["sshpass", "-p", self.password] if self.password else []), + "ssh" + ) + + return command + self._ssh_options + + def _spawn_ssh_master_process(self) -> subprocess.Popen[bytes]: + """ Spawn the SSH master process """ + + # NOTE: do not modify `command`, it might be re-used by the caller. To + # be safe, include it in our own command. + ssh_master_command = self._base_ssh_command \ + + self._ssh_options \ + + Command("-MNnT", self._ssh_guest) + + self.debug(f"Spawning the SSH master process: {ssh_master_command}") + + return subprocess.Popen( ssh_master_command.to_popen(), stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + def _cleanup_ssh_master_process( + self, + signal: _signal.Signals = _signal.SIGTERM, + logger: Optional[tmt.log.Logger] = None) -> None: + logger = logger or self._logger + + with self._ssh_master_process_lock: + if self._ssh_master_process is None: + logger.debug('The SSH master process cannot be terminated because it is unset.', + level=3) + + return + + logger.debug( + f'Terminating the SSH master process {self._ssh_master_process.pid}' + f' with {signal.name}.', + level=3) + + self._ssh_master_process.send_signal(signal) + + try: + # TODO: make the deadline configurable + self._ssh_master_process.wait(timeout=3) + + except subprocess.TimeoutExpired: + logger.warn( + f'Terminating the SSH master process {self._ssh_master_process.pid}' + ' timed out.') + + self._ssh_master_process = None + + @property def _ssh_command(self) -> Command: - """ Prepare an ssh command line for execution """ - command = Command( - *(["sshpass", "-p", self.password] if self.password else []), - "ssh" - ) + """ A base SSH command shared by all SSH processes """ - # Check the master connection - self._ssh_master_connection(command) + with self._ssh_master_process_lock: + if self._ssh_master_process is None: + self._ssh_master_process = self._spawn_ssh_master_process() - return command + self._ssh_options() + return self._base_ssh_command + + def _unlink_ssh_master_socket_path(self) -> None: + with self._ssh_master_process_lock: + if not self._ssh_master_socket_path: + return + + self.debug(f"Remove SSH master socket '{self._ssh_master_socket_path}'.", level=3) + + try: + self._ssh_master_socket_path.unlink(missing_ok=True) + + except OSError as error: + self.debug(f"Failed to remove the SSH master socket: {error}", level=3) + + del self._ssh_master_socket_path def _run_ansible( self, @@ -1358,8 +1431,8 @@ def _run_ansible( ansible_command += self._ansible_extra_args(extra_args) ansible_command += Command( - '--ssh-common-args', self._ssh_options().to_element(), - '-i', f'{self._ssh_guest()},', + '--ssh-common-args', 
self._ssh_options.to_element(), + '-i', f'{self._ssh_guest},', playbook) # FIXME: cast() - https://github.com/teemtee/tmt/issues/1372 @@ -1425,7 +1498,7 @@ def execute(self, if self.primary_address is None and not self.is_dry_run: raise tmt.utils.GeneralError('The guest is not available.') - ssh_command: tmt.utils.Command = self._ssh_command() + ssh_command: tmt.utils.Command = self._ssh_command # Run in interactive mode if requested if interactive: @@ -1459,7 +1532,7 @@ def execute(self, remote_command = remote_commands.to_element() ssh_command += [ - self._ssh_guest(), + self._ssh_guest, remote_command ] @@ -1536,9 +1609,9 @@ def rsync() -> None: self._run_guest_command(Command( *cmd, *options, - "-e", self._ssh_command().to_element(), + "-e", self._ssh_command.to_element(), source, - f"{self._ssh_guest()}:{destination}" + f"{self._ssh_guest}:{destination}" ), silent=True) # Try to push twice, check for rsync after the first failure @@ -1601,8 +1674,8 @@ def rsync() -> None: self._run_guest_command(Command( "rsync", *options, - "-e", self._ssh_command().to_element(), - f"{self._ssh_guest()}:{source}", + "-e", self._ssh_command.to_element(), + f"{self._ssh_guest}:{source}", destination ), silent=True) @@ -1632,28 +1705,17 @@ def stop(self) -> None: """ # Close the master ssh connection - if self._ssh_master_process: - self.debug("Close the master ssh connection.", level=3) - try: - self._ssh_master_process.terminate() - self._ssh_master_process.wait(timeout=3) - except subprocess.TimeoutExpired: - pass + self._cleanup_ssh_master_process() # Remove the ssh socket - if self._ssh_socket_path and self._ssh_socket_path.exists(): - self.debug( - f"Remove ssh socket '{self._ssh_socket_path}'.", level=3) - try: - self._ssh_socket_path.unlink() - except OSError as error: - self.debug(f"Failed to remove the socket: {error}", level=3) + self._unlink_ssh_master_socket_path() def perform_reboot(self, command: Callable[[], tmt.utils.CommandOutput], timeout: Optional[int] = None, tick: float = tmt.utils.DEFAULT_WAIT_TICK, - tick_increase: float = tmt.utils.DEFAULT_WAIT_TICK_INCREASE) -> bool: + tick_increase: float = tmt.utils.DEFAULT_WAIT_TICK_INCREASE, + hard: bool = False) -> bool: """ Perform the actual reboot and wait for the guest to recover. 
@@ -1680,7 +1742,7 @@ def get_boot_time() -> int: return int(match.group(1)) - current_boot_time = get_boot_time() + current_boot_time = 0 if hard else get_boot_time() try: command() @@ -1762,7 +1824,8 @@ def reboot( lambda: self.execute(actual_command), timeout=timeout, tick=tick, - tick_increase=tick_increase) + tick_increase=tick_increase, + hard=hard) def remove(self) -> None: """ diff --git a/tmt/steps/provision/connect.py b/tmt/steps/provision/connect.py index 5efbec3d76..9cf3135313 100644 --- a/tmt/steps/provision/connect.py +++ b/tmt/steps/provision/connect.py @@ -123,7 +123,8 @@ def reboot( self.hard_reboot.to_shell_command()), # type: ignore[union-attr] timeout=timeout, tick=tick, - tick_increase=tick_increase) + tick_increase=tick_increase, + hard=True) if not hard and self.soft_reboot is not None: self.debug(f"Reboot using the soft reboot command '{self.soft_reboot}'.") diff --git a/tmt/steps/provision/mrack.py b/tmt/steps/provision/mrack.py index b383aa8f9d..a69625c4c0 100644 --- a/tmt/steps/provision/mrack.py +++ b/tmt/steps/provision/mrack.py @@ -814,7 +814,8 @@ def reboot( lambda: self._run_guest_command(reboot_script.to_shell_command()), timeout=timeout, tick=tick, - tick_increase=tick_increase) + tick_increase=tick_increase, + hard=True) return super().reboot( hard=hard, diff --git a/tmt/utils.py b/tmt/utils.py index 17fd40653d..38015b112a 100644 --- a/tmt/utils.py +++ b/tmt/utils.py @@ -1305,6 +1305,8 @@ def log_event(msg: str) -> None: level=4, topic=tmt.log.Topic.COMMAND_EVENTS) + log_event('waiting for process to finish') + try: process.wait(timeout=timeout) @@ -1319,10 +1321,15 @@ def log_event(msg: str) -> None: process.returncode = ProcessExitCodes.TIMEOUT + else: + log_event('waiting for process completed') + stdout: Optional[str] stderr: Optional[str] if interactive: + log_event('stream readers not active') + stdout, stderr = None, None else: @@ -6485,11 +6492,14 @@ def field( Consumed by :py:class:`tmt.export.Exportable`. """ - if is_flag is False and isinstance(default, bool): - raise GeneralError("Container field must be a flag to have boolean default value.") + if option: + if is_flag is False and isinstance(default, bool): + raise GeneralError( + "Container field must be a flag to have boolean default value.") - if is_flag is True and not isinstance(default, bool): - raise GeneralError("Container field must have a boolean default value when it is a flag.") + if is_flag is True and not isinstance(default, bool): + raise GeneralError( + "Container field must have a boolean default value when it is a flag.") metadata: FieldMetadata[T] = FieldMetadata( internal=internal,
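For reference, a minimal standalone sketch of how the ping summary is parsed by ``WatchdogCheck.do_ping()`` above, assuming the same ``PING_OUTPUT_PATTERN`` and a typical ``ping -c`` summary line; the sample output string below is illustrative only and not part of the patch.

.. code-block:: python

    import re

    # Same pattern as in tmt/checks/watchdog.py: named groups capture the counters
    # from the ping summary line.
    PING_OUTPUT_PATTERN = re.compile(
        r'(?m)(?P<transmitted>\d+) packets transmitted, (?P<received>\d+) received')

    # Hypothetical ping output, used only for illustration.
    sample = "3 packets transmitted, 2 received, 33% packet loss, time 2003ms"

    match = PING_OUTPUT_PATTERN.search(sample)

    if match is None:
        # Corresponds to _fail_parse_error() in the check.
        print('failed to parse ping output')
    else:
        groups = match.groupdict()
        transmitted = int(groups['transmitted'])
        received = int(groups['received'])

        # Corresponds to _fail_lost_packets() vs. _success() in the check.
        if transmitted != received:
            print(f'not all packets returned: {transmitted=} {received=}')
        else:
            print('successful response')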
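The check runs its probes from a daemonized thread which ``before_test`` starts and ``after_test`` stops by clearing ``keep_running`` and joining. A minimal sketch of that lifecycle, independent of tmt; the class and method names here are illustrative, not taken from the patch.

.. code-block:: python

    import threading
    import time
    from typing import Optional


    class ProbeLoop:
        """ Stand-in for the per-guest watchdog context: a daemonized
        thread keeps probing until it is told to stop. """

        def __init__(self, interval: float) -> None:
            self.interval = interval
            self.keep_running = True
            self.thread: Optional[threading.Thread] = None

        def _loop(self) -> None:
            while self.keep_running:
                print('probing...')  # stands in for do_ping() / do_ssh_ping()
                time.sleep(self.interval)

        def start(self) -> None:  # mirrors before_test()
            self.thread = threading.Thread(target=self._loop, daemon=True)
            self.thread.start()

        def stop(self) -> None:  # mirrors after_test()
            self.keep_running = False

            if self.thread is not None:
                self.thread.join()
                self.thread = None


    loop = ProbeLoop(interval=0.2)
    loop.start()
    time.sleep(1)
    loop.stop()

Because the loop only re-checks ``keep_running`` after sleeping between probes, stopping the watchdog in ``after_test`` can block for up to one ``interval``; with the default of 60 seconds that is a noticeable, if harmless, delay at the end of a test.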
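The ``tmt/utils.py`` hunk above relaxes the ``field()`` validation so that the flag-related checks apply only when a CLI ``option`` is attached, presumably so that check fields such as ``WatchdogCheck.reboot`` can carry a plain boolean default without being command-line flags. A minimal sketch of the shape this allows, with an illustrative class name that is not part of the patch:

.. code-block:: python

    import dataclasses

    from tmt.checks import Check
    from tmt.utils import field


    @dataclasses.dataclass
    class ExampleCheck(Check):
        # No CLI option is attached, so a boolean default does not have to be
        # declared as a flag -- this is the case the relaxed validation is
        # presumably meant to accept.
        reboot: bool = field(
            default=False,
            help='If enabled, reboot the guest after enough failed probes.')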