qq_lib.kill

Termination utilities for qq jobs.

This module defines the Killer class, which extends Operator to validate whether a job can be terminated and to invoke the batch system's kill command.

It also updates and locks the qq info file when appropriate, ensuring that killed jobs are consistently recorded.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Termination utilities for qq jobs.
 6
 7This module defines the `Killer` class, which extends `Operator` to validate
 8whether a job can be terminated and to invoke the batch system's kill command.
 9
10It also updates and locks the qq info file when appropriate, ensuring that killed jobs
11are consistently recorded.
12"""
13
14from .killer import Killer
15
16__all__ = [
17    "Killer",
18]
class Killer(qq_lib.core.operator.Operator):
 17class Killer(Operator):
 18    """
 19    Class managing the termination of qq jobs.
 20    """
 21
 22    def ensure_suitable(self) -> None:
 23        """
 24        Verify that the job is in a state where it can be terminated.
 25
 26        Raises:
 27            QQNotSuitableError: If the job has already been completed, killed,
 28                                or is currently exiting.
 29        """
 30        if self._is_completed():
 31            raise QQNotSuitableError(
 32                "Job cannot be terminated. Job is already completed."
 33            )
 34
 35        if self._is_killed():
 36            raise QQNotSuitableError(
 37                "Job cannot be terminated. Job has already been killed."
 38            )
 39
 40        if self._is_exiting():
 41            raise QQNotSuitableError(
 42                "Job cannot be terminated. Job is in an exiting state."
 43            )
 44
 45    def kill(self, force: bool = False) -> str:
 46        """
 47        Execute the kill command for the job using the batch system.
 48
 49        Returns:
 50            str: The identifier of the terminated job.
 51
 52        Raises:
 53            QQError: If the kill command fails.
 54        """
 55        # has to be performed before actually killing the job
 56        should_update = self._should_update_info_file(force)
 57
 58        if force:
 59            self._batch_system.job_kill_force(self._informer.info.job_id)
 60        else:
 61            self._batch_system.job_kill(self._informer.info.job_id)
 62
 63        if should_update:
 64            self._update_info_file()
 65
 66        return self._informer.info.job_id
 67
 68    def _should_update_info_file(self, force: bool) -> bool:
 69        """
 70        Determine whether the killer itself should log that
 71        the job has been killed into the info file.
 72
 73        Args:
 74            force (bool): The job is being killed forcibly.
 75
 76        Returns:
 77            bool:
 78                True if the info file should be updated by the qq kill process,
 79                False otherwise.
 80        """
 81
 82        return (
 83            (force or self._is_queued() or self._is_suspended())
 84            and not self._is_completed()
 85            and not self._is_killed()
 86            and not self._is_unknown_inconsistent()
 87        )
 88
 89    def _update_info_file(self) -> None:
 90        """
 91        Mark the job as killed in the info file and lock it to prevent overwriting.
 92        """
 93        self._informer.set_killed(datetime.now())
 94        self._informer.to_file(self._info_file)
 95        # strictly speaking, we only need to lock the info file
 96        # when dealing with a booting job but doing it for the other jobs
 97        # which state is managed by `qq kill` does not hurt anything
 98        self._lock_file(self._info_file)
 99
100    def _is_suspended(self) -> bool:
101        """Check if the job is currently suspended."""
102        return self._state == RealState.SUSPENDED
103
104    def _is_queued(self) -> bool:
105        """Check if the job is queued, held, waiting, or booting."""
106        return self._state in {
107            RealState.QUEUED,
108            RealState.HELD,
109            RealState.WAITING,
110            RealState.BOOTING,
111        }
112
113    def _is_killed(self) -> bool:
114        """Check if the job has already been killed."""
115        return self._state == RealState.KILLED or (
116            self._state == RealState.EXITING
117            and self._informer.info.job_exit_code is None
118        )
119
120    def _is_completed(self) -> bool:
121        """Check if the job has finished or failed."""
122        return self._state in {RealState.FINISHED, RealState.FAILED}
123
124    def _is_exiting(self) -> bool:
125        """Check if the job is currently exiting."""
126        return self._state == RealState.EXITING
127
128    def _is_unknown_inconsistent(self) -> bool:
129        """Check if the job is in an unknown or inconsistent state."""
130        return self._state in {RealState.UNKNOWN, RealState.IN_AN_INCONSISTENT_STATE}
131
132    def _lock_file(self, file_path: Path) -> None:
133        """
134        Remove write permissions for an info file to prevent overwriting
135        information about the killed job.
136        """
137        current_mode = Path.stat(file_path).st_mode
138        new_mode = current_mode & ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
139
140        Path.chmod(file_path, new_mode)

Class managing the termination of qq jobs.

def ensure_suitable(self) -> None:
22    def ensure_suitable(self) -> None:
23        """
24        Verify that the job is in a state where it can be terminated.
25
26        Raises:
27            QQNotSuitableError: If the job has already been completed, killed,
28                                or is currently exiting.
29        """
30        if self._is_completed():
31            raise QQNotSuitableError(
32                "Job cannot be terminated. Job is already completed."
33            )
34
35        if self._is_killed():
36            raise QQNotSuitableError(
37                "Job cannot be terminated. Job has already been killed."
38            )
39
40        if self._is_exiting():
41            raise QQNotSuitableError(
42                "Job cannot be terminated. Job is in an exiting state."
43            )

Verify that the job is in a state where it can be terminated.

Raises:
  • QQNotSuitableError: If the job has already been completed, killed, or is currently exiting.
def kill(self, force: bool = False) -> str:
45    def kill(self, force: bool = False) -> str:
46        """
47        Execute the kill command for the job using the batch system.
48
49        Returns:
50            str: The identifier of the terminated job.
51
52        Raises:
53            QQError: If the kill command fails.
54        """
55        # has to be performed before actually killing the job
56        should_update = self._should_update_info_file(force)
57
58        if force:
59            self._batch_system.job_kill_force(self._informer.info.job_id)
60        else:
61            self._batch_system.job_kill(self._informer.info.job_id)
62
63        if should_update:
64            self._update_info_file()
65
66        return self._informer.info.job_id

Execute the kill command for the job using the batch system.

Returns:
  • str: The identifier of the terminated job.

Raises:
  • QQError: If the kill command fails.