qq_lib.respawn

Respawn utilities for qq jobs.

This module defines the Respawner class, which extends Operator to validate whether a failed or killed job can be respawned and to resubmit it with its original parameters.

Respawning involves cleaning up the working directory, clearing runtime files from the input directory, and submitting a fresh copy of the job. For loop jobs, the archive directory is checked for consistency before resubmission.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Respawn utilities for qq jobs.
 6
 7This module defines the `Respawner` class, which extends `Operator` to validate
 8whether a failed or killed job can be respawned and to resubmit it with its
 9original parameters.
10
11Respawning involves cleaning up the working directory, clearing runtime files
12from the input directory, and submitting a fresh copy of the job. For loop jobs,
13the archive directory is checked for consistency before resubmission.
14"""
15
16from .respawner import Respawner
17
18__all__ = [
19    "Respawner",
20]
class Respawner(qq_lib.core.operator.Operator):
 19class Respawner(Operator):
 20    """
 21    Respawns a failed or killed job by cleaning up and resubmitting it with the same parameters.
 22
 23    For loop jobs, the archive directory is checked for consistency before respawning.
 24    """
 25
 26    def ensure_suitable(self) -> None:
 27        """
 28        Verify that the job is in a state where it can be respawned.
 29
 30        Raises:
 31            QQNotSuitableError: If the job is in any other state than failed or killed.
 32        """
 33        if self._state not in {RealState.FAILED, RealState.KILLED}:
 34            raise QQNotSuitableError(
 35                f"Job cannot be respawned. Job is {str(self._state)}."
 36            )
 37
 38    def respawn(self) -> str:
 39        """
 40        Respawn the job by cleaning up and submitting a fresh copy.
 41
 42        Returns:
 43            str: The job ID of the newly submitted job.
 44
 45        Raises:
 46            QQError: If the submitter cannot be built or the job cannot be submitted.
 47        """
 48        informer = self.get_informer()
 49        submitter = self._build_submitter(informer)
 50
 51        input_dir = self._info_file.parent
 52
 53        # attempt to remove the working directory
 54        try:
 55            wiper = Wiper.from_informer(informer)
 56            wiper.ensure_suitable()
 57            wiper.wipe()
 58        except QQNotSuitableError:
 59            pass
 60        except QQError as e:
 61            logger.warning(f"Failed to remove working directory: {e}")
 62
 63        # clear files from the input directory
 64        clearer = Clearer([input_dir])
 65        clearer.clear()
 66
 67        # submit a new job
 68        return submitter.submit()
 69
 70    def _build_submitter(self, informer: Informer) -> Submitter:
 71        """
 72        Construct a Submitter configured for respawning.
 73
 74        All original job parameters are preserved. Dependencies are filtered
 75        to only include jobs still present in the batch system. For loop jobs,
 76        the archive directory is checked for consistency before proceeding.
 77
 78        Args:
 79            informer (Informer): The informer instance holding job metadata.
 80
 81        Returns:
 82            Submitter: A configured Submitter ready to submit the job.
 83
 84        Raises:
 85            QQError: If the loop job archive is inconsistent with the current cycle.
 86        """
 87        if (loop_info := informer.info.loop_info) is not None:
 88            self._ensure_archive_consistent(loop_info)
 89
 90        return Submitter(
 91            batch_system=informer.batch_system,
 92            queue=informer.info.queue,
 93            account=informer.info.account,
 94            script=self._info_file.parent / informer.info.script_name,
 95            job_type=informer.info.job_type,
 96            resources=informer.info.resources,
 97            loop_info=loop_info,
 98            exclude=informer.info.excluded_files,
 99            include=informer.info.included_files,
100            # we need to remove dependencies that are no longer present in the batch system
101            depend=filter_dependencies(informer.batch_system, informer.info.depend),
102            transfer_mode=informer.info.transfer_mode,
103            server=informer.info.server,
104            interpreter=informer.info.interpreter,
105            resubmit_from=informer.info.resubmit_from,
106        )
107
108    @staticmethod
109    def _ensure_archive_consistent(loop_info: LoopInfo) -> None:
110        """
111        Verify that the current loop cycle matches the archive contents.
112
113        Args:
114            loop_info (LoopInfo): Loop job metadata.
115
116        Raises:
117            QQError: If the cycle determined from the archive does not match
118                the current cycle in the loop metadata.
119        """
120        if (
121            archive_cycle := loop_info.determine_cycle_from_archive()
122        ) != loop_info.current:
123            raise QQError(
124                f"Respawning loop job in cycle '{loop_info.current}' but the loop job should continue from cycle '{archive_cycle}' "
125                "based on the contents of the archive directory. Canceling job respawn."
126            )

Respawns a failed or killed job by cleaning up and resubmitting it with the same parameters.

For loop jobs, the archive directory is checked for consistency before respawning.

def ensure_suitable(self) -> None:
26    def ensure_suitable(self) -> None:
27        """
28        Verify that the job is in a state where it can be respawned.
29
30        Raises:
31            QQNotSuitableError: If the job is in any other state than failed or killed.
32        """
33        if self._state not in {RealState.FAILED, RealState.KILLED}:
34            raise QQNotSuitableError(
35                f"Job cannot be respawned. Job is {str(self._state)}."
36            )

Verify that the job is in a state where it can be respawned.

Raises:
  • QQNotSuitableError: If the job is in any state other than failed or killed.
def respawn(self) -> str:
38    def respawn(self) -> str:
39        """
40        Respawn the job by cleaning up and submitting a fresh copy.
41
42        Returns:
43            str: The job ID of the newly submitted job.
44
45        Raises:
46            QQError: If the submitter cannot be built or the job cannot be submitted.
47        """
48        informer = self.get_informer()
49        submitter = self._build_submitter(informer)
50
51        input_dir = self._info_file.parent
52
53        # attempt to remove the working directory
54        try:
55            wiper = Wiper.from_informer(informer)
56            wiper.ensure_suitable()
57            wiper.wipe()
58        except QQNotSuitableError:
59            pass
60        except QQError as e:
61            logger.warning(f"Failed to remove working directory: {e}")
62
63        # clear files from the input directory
64        clearer = Clearer([input_dir])
65        clearer.clear()
66
67        # submit a new job
68        return submitter.submit()

Respawn the job by cleaning up and submitting a fresh copy.

Returns:

str: The job ID of the newly submitted job.

Raises:
  • QQError: If the submitter cannot be built or the job cannot be submitted.