qq_lib.resubmit

Execution utilities for resubmitting qq continuous and loop jobs inside the batch environment.

This module defines the Resubmitter class, which handles resubmission of loop and continuous jobs by resolving candidate hosts and attempting submission on each in order until one succeeds.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Execution utilities for resubmitting qq continuous and loop jobs inside the batch environment.
 6
 7This module defines the `Resubmitter` class, which handles resubmission of loop
 8and continuous jobs by resolving candidate hosts and attempting submission
 9on each in order until one succeeds.
10"""
11
12from .resubmitter import Resubmitter
13
14__all__ = [
15    "Resubmitter",
16]
class Resubmitter(qq_lib.core.operator.Operator):
 20class Resubmitter(Operator):
 21    """
 22    Handles resubmission of loop and continuous jobs.
 23    """
 24
 25    def resubmit(self) -> str:
 26        """
 27        Resubmit the job to the next cycle.
 28
 29        Returns:
 30            str: The job ID of the newly submitted job.
 31
 32        Raises:
 33            QQError: If the main node is not defined or if resubmission fails
 34                on all candidate hosts.
 35        """
 36        informer = self.get_informer()
 37        input_dir = self._info_file.parent
 38
 39        # we set the current cycle manually instead of determining it from the archive
 40        # this means that the job will always be resubmitted to the "correct" (expected) cycle,
 41        # but may fail after starting if the archive does not contain files for this cycle
 42        self._advance_loop_cycle(informer)
 43
 44        submitter = self._build_submitter(informer, input_dir)
 45        hosts = (
 46            informer.info.resubmit_from
 47            # fall back to batch system default
 48            # this is only needed to accomodate transition from loop jobs submitted using previous versions of qq
 49            or informer.batch_system.get_default_resubmit_hosts()
 50        )
 51
 52        return self._try_resubmit(submitter, informer, hosts)
 53
 54    @staticmethod
 55    def _advance_loop_cycle(informer: Informer) -> None:
 56        """
 57        Advance the loop cycle counter if the job is a loop job.
 58
 59        Args:
 60            informer: The informer instance holding job metadata.
 61        """
 62        if informer.info.loop_info:
 63            informer.info.loop_info.current += 1
 64
 65    @staticmethod
 66    def _build_submitter(informer: Informer, input_dir: Path) -> Submitter:
 67        """
 68        Construct a Submitter configured for resubmission.
 69
 70        All original job parameters are preserved, except dependencies are replaced
 71        with a single dependency on the current job. This is because the previous
 72        cycle has already run, so its original dependencies must have been satisfied.
 73
 74        Args:
 75            informer (Informer): The informer instance holding job metadata.
 76            input_dir (Path): Path to the directory containing the job's input files.
 77
 78        Returns:
 79            Submitter: A configured submitter ready to submit the job.
 80        """
 81        return Submitter(
 82            batch_system=informer.batch_system,
 83            queue=informer.info.queue,
 84            account=informer.info.account,
 85            script=input_dir / informer.info.script_name,
 86            job_type=informer.info.job_type,
 87            resources=informer.info.resources,
 88            loop_info=informer.info.loop_info,
 89            exclude=informer.info.excluded_files,
 90            include=informer.info.included_files,
 91            depend=[Depend(type=DependType.AFTER_SUCCESS, jobs=[informer.info.job_id])],
 92            transfer_mode=informer.info.transfer_mode,
 93            server=informer.info.server,
 94            interpreter=informer.info.interpreter,
 95            resubmit_from=informer.info.resubmit_from
 96            # fall back to batch system default
 97            # this is only needed to accommodate transition from loop jobs submitted using previous versions of qq
 98            or informer.batch_system.get_default_resubmit_hosts(),
 99        )
100
101    @staticmethod
102    def _try_resubmit(
103        submitter: Submitter,
104        informer: Informer,
105        hosts: list[ResubmitHost],
106    ) -> str:
107        """
108        Attempt resubmission on each candidate host in order.
109
110        Args:
111            submitter (Submitter): The configured submitter to use for job submission.
112            informer (Informer): The informer instance holding job metadata.
113            hosts (list[ResubmitHost]): Ordered list of candidate resubmission hosts to try.
114
115        Returns:
116            str: The job ID of the newly submitted job.
117
118        Raises:
119            QQError: If the main node is not defined in the job metadata.
120            QQError: If the list of resubmission hosts is empty.
121            QQError: If resubmission fails on all candidate hosts.
122        """
123        # get the main node for host resolution
124        # since the job should be running, the main node should be defined
125        main_node = informer.info.main_node
126        if not main_node:
127            raise QQError(
128                "Job cannot be resubmitted. The 'main_node' of the job is not defined."
129            )
130
131        if not hosts:
132            raise QQError(
133                "Job cannot be resubmitted. No resubmission hosts defined. This is a bug."
134            )
135
136        for host in hosts:
137            hostname = host.resolve(informer.info.input_machine, main_node)
138            logger.info(f"Resubmitting from host '{hostname}'.")
139            try:
140                return Retryer(
141                    submitter.submit,
142                    remote=hostname,
143                    max_tries=CFG.resubmitter.retry_tries,
144                    wait_seconds=CFG.resubmitter.retry_wait,
145                ).run()
146            except Exception as e:
147                logger.warning(f"Failed resubmission from host '{hostname}': {e}")
148
149        raise QQError("Could not resubmit the job.")

Handles resubmission of loop and continuous jobs.

def resubmit(self) -> str:
25    def resubmit(self) -> str:
26        """
27        Resubmit the job to the next cycle.
28
29        Returns:
30            str: The job ID of the newly submitted job.
31
32        Raises:
33            QQError: If the main node is not defined or if resubmission fails
34                on all candidate hosts.
35        """
36        informer = self.get_informer()
37        input_dir = self._info_file.parent
38
39        # we set the current cycle manually instead of determining it from the archive
40        # this means that the job will always be resubmitted to the "correct" (expected) cycle,
41        # but may fail after starting if the archive does not contain files for this cycle
42        self._advance_loop_cycle(informer)
43
44        submitter = self._build_submitter(informer, input_dir)
45        hosts = (
46            informer.info.resubmit_from
47            # fall back to batch system default
48            # this is only needed to accomodate transition from loop jobs submitted using previous versions of qq
49            or informer.batch_system.get_default_resubmit_hosts()
50        )
51
52        return self._try_resubmit(submitter, informer, hosts)

Resubmit the job to the next cycle.

Returns:

str: The job ID of the newly submitted job.

Raises:
  • QQError: If the main node is not defined, if no resubmission hosts are defined, or if resubmission fails on all candidate hosts.