qq_lib.resubmit
Execution utilities for resubmitting qq continuous and loop jobs inside the batch environment.
This module defines the Resubmitter class, which handles resubmission of loop
and continuous jobs by resolving candidate hosts and attempting submission
on each in order until one succeeds.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Execution utilities for resubmitting qq continuous and loop jobs inside the batch environment. 6 7This module defines the `Resubmitter` class, which handles resubmission of loop 8and continuous jobs by resolving candidate hosts and attempting submission 9on each in order until one succeeds. 10""" 11 12from .resubmitter import Resubmitter 13 14__all__ = [ 15 "Resubmitter", 16]
class Resubmitter(Operator):
    """
    Handles resubmission of loop and continuous jobs.

    A resubmission advances the loop cycle (for loop jobs), builds a
    `Submitter` from the stored job metadata and then tries the candidate
    resubmission hosts one after another until a submission succeeds.
    """

    def resubmit(self) -> str:
        """
        Resubmit the job to the next cycle.

        Returns:
            str: The job ID of the newly submitted job.

        Raises:
            QQError: If the main node is not defined, if no resubmission hosts
                are defined, or if resubmission fails on all candidate hosts.
        """
        informer = self.get_informer()
        job_dir = self._info_file.parent

        # the cycle is advanced manually rather than being derived from the archive:
        # the job is therefore always resubmitted to the expected cycle, but it may
        # fail after starting if the archive does not contain files for that cycle
        self._advance_loop_cycle(informer)

        submitter = self._build_submitter(informer, job_dir)

        # loop jobs submitted using previous versions of qq may carry no
        # resubmission hosts; only then fall back to the batch system default
        candidate_hosts = informer.info.resubmit_from
        if not candidate_hosts:
            candidate_hosts = informer.batch_system.get_default_resubmit_hosts()

        return self._try_resubmit(submitter, informer, candidate_hosts)

    @staticmethod
    def _advance_loop_cycle(informer: Informer) -> None:
        """
        Advance the loop cycle counter if the job is a loop job.

        The counter is incremented in place on `informer.info.loop_info`;
        jobs without loop information are left untouched.

        Args:
            informer (Informer): The informer instance holding job metadata.
        """
        loop_info = informer.info.loop_info
        if loop_info:
            loop_info.current += 1

    @staticmethod
    def _build_submitter(informer: Informer, input_dir: Path) -> Submitter:
        """
        Construct a Submitter configured for resubmission.

        All original job parameters are preserved, except dependencies are replaced
        with a single dependency on the current job. This is because the previous
        cycle has already run, so its original dependencies must have been satisfied.

        Args:
            informer (Informer): The informer instance holding job metadata.
            input_dir (Path): Path to the directory containing the job's input files.

        Returns:
            Submitter: A configured submitter ready to submit the job.
        """
        info = informer.info
        return Submitter(
            batch_system=informer.batch_system,
            queue=info.queue,
            account=info.account,
            script=input_dir / info.script_name,
            job_type=info.job_type,
            resources=info.resources,
            loop_info=info.loop_info,
            exclude=info.excluded_files,
            include=info.included_files,
            # the new job only has to wait for the current job to succeed
            depend=[Depend(type=DependType.AFTER_SUCCESS, jobs=[info.job_id])],
            transfer_mode=info.transfer_mode,
            server=info.server,
            interpreter=info.interpreter,
            resubmit_from=info.resubmit_from
            # fall back to batch system default
            # this is only needed to accommodate transition from loop jobs submitted using previous versions of qq
            or informer.batch_system.get_default_resubmit_hosts(),
        )

    @staticmethod
    def _try_resubmit(
        submitter: Submitter,
        informer: Informer,
        hosts: list[ResubmitHost],
    ) -> str:
        """
        Attempt resubmission on each candidate host in order.

        Every host is tried through a `Retryer` configured with
        `CFG.resubmitter.retry_tries` and `CFG.resubmitter.retry_wait`. The result
        of the first successful attempt is returned. If every host fails, the
        error of the last attempt is attached as the cause of the raised `QQError`.

        Args:
            submitter (Submitter): The configured submitter to use for job submission.
            informer (Informer): The informer instance holding job metadata.
            hosts (list[ResubmitHost]): Ordered list of candidate resubmission hosts to try.

        Returns:
            str: The job ID of the newly submitted job.

        Raises:
            QQError: If the main node is not defined in the job metadata.
            QQError: If the list of resubmission hosts is empty.
            QQError: If resubmission fails on all candidate hosts.
        """
        # get the main node for host resolution
        # since the job should be running, the main node should be defined
        main_node = informer.info.main_node
        if not main_node:
            raise QQError(
                "Job cannot be resubmitted. The 'main_node' of the job is not defined."
            )

        if not hosts:
            raise QQError(
                "Job cannot be resubmitted. No resubmission hosts defined. This is a bug."
            )

        # remember the most recent failure so that the final error keeps its root cause
        last_error = None

        for host in hosts:
            # NOTE(review): an exception raised while resolving a host is not caught
            # and aborts the whole resubmission - confirm that this is intended
            hostname = host.resolve(informer.info.input_machine, main_node)
            logger.info(f"Resubmitting from host '{hostname}'.")
            try:
                return Retryer(
                    submitter.submit,
                    remote=hostname,
                    max_tries=CFG.resubmitter.retry_tries,
                    wait_seconds=CFG.resubmitter.retry_wait,
                ).run()
            except Exception as e:
                # a failure on one host must not prevent trying the remaining hosts
                logger.warning(f"Failed resubmission from host '{hostname}': {e}")
                last_error = e

        raise QQError("Could not resubmit the job.") from last_error
Handles resubmission of loop and continuous jobs.
def
resubmit(self) -> str:
def resubmit(self) -> str:
    """
    Resubmit the job to the next cycle.

    Returns:
        str: The job ID of the newly submitted job.

    Raises:
        QQError: If the main node is not defined, if no resubmission hosts
            are defined, or if resubmission fails on all candidate hosts.
    """
    informer = self.get_informer()
    job_dir = self._info_file.parent

    # the cycle is advanced manually rather than being derived from the archive:
    # the job is therefore always resubmitted to the expected cycle, but it may
    # fail after starting if the archive does not contain files for that cycle
    self._advance_loop_cycle(informer)

    submitter = self._build_submitter(informer, job_dir)

    # loop jobs submitted using previous versions of qq may carry no
    # resubmission hosts; only then fall back to the batch system default
    candidate_hosts = informer.info.resubmit_from
    if not candidate_hosts:
        candidate_hosts = informer.batch_system.get_default_resubmit_hosts()

    return self._try_resubmit(submitter, informer, candidate_hosts)
Resubmit the job to the next cycle.
Returns:
str: The job ID of the newly submitted job.
Raises:
- QQError: If the main node is not defined, if no resubmission hosts are defined, or if resubmission fails on all candidate hosts.