qq_lib.submit
Utilities for submitting qq jobs.
This module integrates three main components - Parser, Submitter, and
SubmitterFactory - that collectively interpret submission settings, construct
job metadata, and hand off execution to the batch system.
Parser extracts qq directives declared inside the script (via # qq ...
lines) and normalizes them into structured submission parameters such as
resources, dependencies, file include/exclude rules and loop-job fields.
Submitter validates the script, prevents accidental duplicate submissions,
constructs the qq info file, sets up environment variables needed by qq run,
and finally invokes the batch system's submission mechanism.
SubmitterFactory coordinates command-line arguments with script-embedded
directives, merges and resolves resources, determines the batch system and
queue, constructs loop-job settings, and ultimately produces a fully configured
Submitter. It ensures a consistent and unified interpretation of submission
parameters from all available sources.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Utilities for submitting qq jobs. 6 7This module integrates three main components - `Parser`, `Submitter`, and 8`SubmitterFactory` - that collectively interpret submission settings, construct 9job metadata, and hand off execution to the batch system. 10 11`Parser` extracts qq directives declared inside the script (via `# qq ...` 12lines) and normalizes them into structured submission parameters such as 13resources, dependencies, file include/exclude rules and loop-job fields. 14 15`Submitter` validates the script, prevents accidental duplicate submissions, 16constructs the qq info file, sets up environment variables needed by `qq run`, 17and finally invokes the batch system's submission mechanism. 18 19`SubmitterFactory` coordinates command-line arguments with script-embedded 20directives, merges and resolves resources, determines the batch system and 21queue, constructs loop-job settings, and ultimately produces a fully configured 22`Submitter`. It ensures a consistent and unified interpretation of submission 23parameters from all available sources. 24""" 25 26from .factory import SubmitterFactory 27from .parser import Parser 28from .submitter import Submitter 29 30__all__ = ["SubmitterFactory", "Parser", "Submitter"]
class SubmitterFactory:
    """
    Factory class to construct a Submitter instance based on parameters from
    the command-line and from the script itself.

    Values given on the command line (passed in as keyword arguments) take
    precedence over qq directives parsed from the submitted script.
    """

    def __init__(self, script: Path, **kwargs):
        """
        Initialize the factory with the script, command-line parameters, and additional options.

        Args:
            script (Path): Path to the script to submit.
            **kwargs: Keyword arguments from the command line.
        """
        # NOTE(review): imported at call time rather than at module level,
        # presumably to avoid a circular import with the CLI module - confirm.
        from qq_lib.submit.cli import submit

        self._parser = Parser(script, submit.params)
        self._script = script
        self._input_dir = script.parent
        self._kwargs = kwargs

    def make_submitter(self) -> Submitter:
        """
        Construct and return a Submitter instance.

        Returns:
            Submitter: A fully initialized submitter object ready to submit a job.

        Raises:
            QQError: If required information, such as the submission queue, is missing.
        """
        self._parser.parse()

        BatchSystem = self._get_batch_system()
        queue = self._get_queue()
        job_type = self._get_job_type()

        # only loop and continuous jobs are ever resubmitted
        resubmittable = job_type in (JobType.LOOP, JobType.CONTINUOUS)

        if job_type == JobType.LOOP:
            loop_info = self._get_loop_info()
        else:
            # tell the user that any loop job-specific options will be ignored
            # because the job is not a loop job
            self._print_warning_if_loop_info_defined(job_type)
            loop_info = None

        if not resubmittable:
            # tell the user that 'resubmit_from' will be ignored if the
            # job type is not 'loop' or 'continuous'
            self._print_warning_if_resubmit_from_defined(job_type)

        server = self._get_server()

        return Submitter(
            batch_system=BatchSystem,
            queue=queue,
            account=self._get_account(),
            script=self._script,
            job_type=job_type,
            resources=self._get_resources(BatchSystem, queue, server),
            loop_info=loop_info,
            exclude=self._get_exclude(),
            include=self._get_include(),
            depend=self._get_depend(),
            transfer_mode=self._get_transfer_mode(),
            server=server,
            interpreter=self._get_interpreter(),
            resubmit_from=self._get_resubmit_from(BatchSystem)
            if resubmittable
            else None,
        )

    @staticmethod
    def _is_specified(value: object) -> bool:
        """
        Decide whether an option value counts as explicitly provided.

        Integers (including 0) always count as provided; every other value
        is judged by its truthiness, so `None`, empty strings and empty
        lists count as not provided.
        """
        # bool is a subclass of int; flags must keep their truthiness semantics
        if isinstance(value, int) and not isinstance(value, bool):
            return True
        return bool(value)

    @staticmethod
    def _first_specified(*candidates: object) -> object:
        """
        Return the first candidate that is explicitly provided
        (see `_is_specified`), or `None` if there is no such candidate.
        """
        for candidate in candidates:
            if SubmitterFactory._is_specified(candidate):
                return candidate
        return None

    def _get_batch_system(self) -> type[BatchInterface]:
        """
        Determine which batch system to use for the job submission.

        Priority:
            1. Command-line specification
            2. Batch system specified in the script
            3. Environment variable
            4. Guessed batch system

        Returns:
            type[BatchInterface]: The selected batch system class.
        """
        if batch_system := self._kwargs.get("batch_system"):
            return BatchInterface.from_str(batch_system)
        return self._parser.get_batch_system() or BatchInterface.from_env_var_or_guess()

    def _get_job_type(self) -> JobType:
        """
        Determine the type of job to submit.

        Priority:
            1. Command-line specification
            2. Job type specified in the script
            3. Default to `JobType.STANDARD`

        Returns:
            JobType: The determined job type.
        """
        if job_type := self._kwargs.get("job_type"):
            return JobType.from_str(job_type)
        return self._parser.get_job_type() or JobType.STANDARD

    def _get_queue(self) -> str:
        """
        Determine the submission queue to use.

        Priority:
            1. Command-line specification
            2. Queue specified in the script

        Returns:
            str: Name of the submission queue.

        Raises:
            QQError: If no queue is specified either in kwargs or in the script.
        """
        if not (queue := self._kwargs.get("queue") or self._parser.get_queue()):
            raise QQError("Submission queue not specified.")
        return queue

    def _get_resources(
        self, BatchSystem: type[BatchInterface], queue: str, server: str | None
    ) -> Resources:
        """
        Get the resource requirements for the job by merging the requirements specified on the command
        line with requirements specified inside the submitted script.

        The resources are then further modified to conform to the provided `BatchSystem` and submission `queue`.

        Args:
            BatchSystem (type[BatchInterface]): The batch system class to use.
            queue (str): The submission queue.
            server (str | None): The submission server. `None` = the current main server.

        Returns:
            Resources: A merged Resources object containing the final resource requirements.
        """
        # only keyword arguments that correspond to Resources fields are resource requests
        field_names = {f.name for f in fields(Resources)}
        command_line_resources = Resources(
            **{k: v for k, v in self._kwargs.items() if k in field_names}
        )

        return BatchSystem.transform_resources(
            queue,
            server,
            Resources.merge_resources(
                command_line_resources, self._parser.get_resources()
            ),
        )

    def _get_loop_info(self) -> LoopInfo:
        """
        Construct LoopInfo holding information about the loop job.

        For every field, the command-line value is preferred over the value
        from the script; `loop_start` falls back to 1 and the archive
        directory/format fall back to the configured defaults.

        Returns:
            LoopInfo: An object containing loop job parameters.

        Raises:
            QQError: If required loop job parameters are missing or invalid.
        """
        # cycle numbers are integers, so an explicit 0 must not be mistaken
        # for "not specified" (a plain `or` chain would discard it)
        loop_start = self._first_specified(
            self._kwargs.get("loop_start"), self._parser.get_loop_start(), 1
        )
        loop_end = self._first_specified(
            self._kwargs.get("loop_end"), self._parser.get_loop_end()
        )

        return LoopInfo(
            loop_start,
            loop_end,
            self._input_dir
            / (
                self._kwargs.get("archive")
                or self._parser.get_archive()
                or CFG.loop_jobs.archive_dir
            ),
            self._kwargs.get("archive_format")
            or self._parser.get_archive_format()
            or CFG.loop_jobs.archive_format,
            input_dir=self._input_dir,
            archive_mode=TransferMode.multi_from_str(
                self._kwargs.get("archive_mode") or ""
            )
            or self._parser.get_archive_mode(),
        )

    def _print_warning_if_loop_info_defined(self, job_type: JobType) -> None:
        """
        Print warning(s) if any of the loop-job specific options
        are defined either on the command line or in the script itself.
        This should only be used if the job is not a loop job.
        """
        for option, parser_func in zip(
            ["loop_start", "loop_end", "archive", "archive_format", "archive_mode"],
            [
                self._parser.get_loop_start,
                self._parser.get_loop_end,
                self._parser.get_archive,
                self._parser.get_archive_format,
                self._parser.get_archive_mode,
            ],
        ):
            # an integer option set to 0 is still an explicitly specified option
            if self._is_specified(self._kwargs.get(option)) or self._is_specified(
                parser_func()
            ):
                logger.warning(
                    f"Option '{option}' is specified but job type is '{str(job_type)}', not 'loop' - '{option}' will be ignored."
                )

    def _print_warning_if_resubmit_from_defined(self, job_type: JobType) -> None:
        """
        Print a warning if the 'resubmit_from' option is defined but the job type is not 'loop' or 'continuous'.
        """
        if self._kwargs.get("resubmit_from") or self._parser.get_resubmit_from():
            logger.warning(
                f"Option 'resubmit_from' is specified but job type is '{str(job_type)}', not 'loop' or 'continuous' - 'resubmit_from' will be ignored."
            )

    def _get_exclude(self) -> list[Path]:
        """
        Determine the files to exclude from being copied to the job's working directory.

        Priority:
            1. Excluded files specified on the command line.
            2. Excluded files specified inside the submitted script.

        The lists are NOT merged.

        Returns:
            list[Path]: List of relative file paths to exclude.
        """
        return (
            split_files_list(self._kwargs.get("exclude")) or self._parser.get_exclude()
        )

    def _get_include(self) -> list[Path]:
        """
        Determine the files to explicitly copy to the job's working directory.

        Priority:
            1. Included files specified on the command line.
            2. Included files specified inside the submitted script.

        The lists are NOT merged.

        Returns:
            list[Path]: List of file paths to include.
        """
        return (
            split_files_list(self._kwargs.get("include")) or self._parser.get_include()
        )

    def _get_depend(self) -> list[Depend]:
        """
        Determine the list of dependencies.

        Priority:
            1. Dependencies specified on the command line.
            2. Dependencies specified inside the submitted script.

        The lists are NOT merged.

        Returns:
            list[Depend]: List of job dependencies.
        """
        return (
            Depend.multi_from_str(self._kwargs.get("depend") or "")
            or self._parser.get_depend()
        )

    def _get_account(self) -> str | None:
        """
        Determine the account name to use for the job.

        Returns:
            str | None: The account name or None if not defined.
        """
        return self._kwargs.get("account") or self._parser.get_account()

    def _get_transfer_mode(self) -> list[TransferMode]:
        """
        Determine the mode specifying when files should be
        transferred from the working directory to the input directory.

        Priority:
            1. Transfer modes specified on the command line.
            2. Transfer modes specified inside the submitted script.

        The lists are NOT merged.

        Returns:
            list[TransferMode]: List of transfer modes.
        """
        return (
            TransferMode.multi_from_str(self._kwargs.get("transfer_mode") or "")
            or self._parser.get_transfer_mode()
        )

    def _get_server(self) -> str | None:
        """
        Determine the batch server to submit the job to.

        Priority:
            1. Command-line specification
            2. Batch server specified in the script
            3. None - the current batch server

        Returns:
            str | None: The full name of the batch server to use,
            or `None` for the current batch server.
        """
        if raw := (self._kwargs.get("server") or self._parser.get_server()):
            return translate_server(raw)

        return None

    def _get_interpreter(self) -> Interpreter | None:
        """
        Determine the interpreter to use for running the script.

        Priority:
            1. Command-line specification
            2. Interpreter specified in the script
            3. None - the default interpreter

        Returns:
            Interpreter | None: The interpreter to use for running the script
            or `None` to use the default interpreter.
        """
        if (raw := self._kwargs.get("interpreter")) is not None:
            return Interpreter.from_str(raw)

        return self._parser.get_interpreter()

    def _get_resubmit_from(self, BatchSystem: AnyBatchClass) -> list[ResubmitHost]:
        """
        Determine the list of resubmission hosts to be used to resubmit loop/continuous job.

        Priority:
            1. Resubmission hosts specified on the command line.
            2. Resubmission hosts specified inside the submitted script.
            3. Resubmission hosts specified in the configuration file.
            4. Default resubmission hosts provided by the batch system.

        The lists are NOT merged.

        Args:
            BatchSystem (AnyBatchClass): The batch system used for job submission.

        Returns:
            list[ResubmitHost]: List of resubmission hosts.
        """
        return (
            ResubmitHost.multi_from_str(self._kwargs.get("resubmit_from") or "")
            or self._parser.get_resubmit_from()
            or ResubmitHost.multi_from_str(CFG.resubmitter.default_resubmit_hosts or "")
            or BatchSystem.get_default_resubmit_hosts()
        )
Factory class to construct a Submitter instance based on parameters from the command-line and from the script itself.
def __init__(self, script: Path, **kwargs):
    """
    Prepare the factory for submitting a single script.

    Stores the script path, its parent directory (used as the input
    directory) and the raw command-line keyword arguments, and creates
    the `Parser` that will later read qq directives from the script.

    Args:
        script (Path): Path to the script to submit.
        **kwargs: Keyword arguments from the command line.
    """
    from qq_lib.submit.cli import submit

    self._script = script
    self._input_dir = script.parent
    self._kwargs = kwargs

    # the parser only accepts directives matching the options of the submit command
    self._parser = Parser(script, submit.params)
Initialize the factory with the script, command-line parameters, and additional options.
Arguments:
- script (Path): Path to the script to submit.
- **kwargs: Keyword arguments from the command line.
def make_submitter(self) -> Submitter:
    """
    Build a `Submitter` from the command-line arguments and the script directives.

    The script is parsed first; the batch system, queue and job type are
    then resolved, followed by the remaining submission settings.

    Returns:
        Submitter: A fully initialized submitter object ready to submit a job.

    Raises:
        QQError: If required information, such as the submission queue, is missing.
    """
    self._parser.parse()

    BatchSystem = self._get_batch_system()
    queue = self._get_queue()
    job_type = self._get_job_type()

    loop_info = None
    if job_type == JobType.LOOP:
        loop_info = self._get_loop_info()
    else:
        # loop-specific options have no effect for other job types; warn about them
        self._print_warning_if_loop_info_defined(job_type)

    if job_type not in (JobType.LOOP, JobType.CONTINUOUS):
        # 'resubmit_from' only applies to 'loop' and 'continuous' jobs; warn if set
        self._print_warning_if_resubmit_from_defined(job_type)

    server = self._get_server()

    # resolve the remaining settings one by one before assembling the Submitter
    account = self._get_account()
    resources = self._get_resources(BatchSystem, queue, server)
    exclude = self._get_exclude()
    include = self._get_include()
    depend = self._get_depend()
    transfer_mode = self._get_transfer_mode()
    interpreter = self._get_interpreter()

    if job_type in {JobType.LOOP, JobType.CONTINUOUS}:
        resubmit_from = self._get_resubmit_from(BatchSystem)
    else:
        resubmit_from = None

    return Submitter(
        batch_system=BatchSystem,
        queue=queue,
        account=account,
        script=self._script,
        job_type=job_type,
        resources=resources,
        loop_info=loop_info,
        exclude=exclude,
        include=include,
        depend=depend,
        transfer_mode=transfer_mode,
        server=server,
        interpreter=interpreter,
        resubmit_from=resubmit_from,
    )
Construct and return a Submitter instance.
Returns:
Submitter: A fully initialized submitter object ready to submit a job.
Raises:
- QQError: If required information, such as the submission queue, is missing.
class Parser:
    """
    Parser for qq job submission options (qq directives) specified in a script.

    Directives are `# qq <option> <value>` (or `# qq <option>=<value>`) lines
    located at the top of the script. After `parse()` has been called, the
    individual `get_*` methods return the parsed values converted to the
    appropriate types.
    """

    def __init__(self, script: Path, params: list[Parameter]):
        """
        Initialize the parser.

        Args:
            script (Path): Path to the qq job script to parse.
            params (list[Parameter]): List of click Parameter objects defining
                valid options. Only `GroupedOption` names are considered.
        """
        self._script = script
        self._known_options = {
            p.name
            for p in params
            if isinstance(p, GroupedOption) and p.name is not None
        }
        logger.debug(
            f"Known options for Parser: {self._known_options} ({len(self._known_options)} options)."
        )

        # option name (snake_case) -> raw value (int if the value is an integer, otherwise str)
        self._options: dict[str, object] = {}

    def parse(self) -> None:
        """
        Extract and parse `qq` options from the script.

        The method processes the script line by line, skipping the first line (shebang).
        It continues reading until it encounters a line that is not a `qq` directive,
        is non-empty, and is not a comment. Empty or commented lines are ignored.

        Each valid `qq` line is parsed into key-value pairs, normalized to `snake_case`,
        and stored in `self._options`. Values that can be converted to an integer
        are stored as `int`, all other values are stored as `str`.

        Raises:
            QQError: If the script is not a file, or if an option line is malformed or
                     contains an unknown option.
        """
        if not self._script.is_file():
            raise QQError(f"Could not open '{self._script}' as a file.")

        with self._script.open() as f:
            # skip the first line (shebang)
            next(f, None)

            for line in f:
                stripped = line.strip()
                if stripped == "":
                    logger.debug("Parser: skipping empty line.")
                    continue  # skip empty lines

                # check whether this is a qq command
                if not re.match(r"#\s*qq", stripped, re.IGNORECASE):
                    if stripped.startswith("#"):
                        logger.debug(f"Parser: skipping commented line '{line}'.")
                        continue  # skip commented lines
                    logger.debug(f"Parser: ending parsing at line '{line}'.")
                    break  # stop parsing at other lines

                # remove the leading '# qq' and split by whitespace or '='
                parts = Parser._strip_and_split(line)
                if len(parts) < 2:
                    # report the stripped line: the raw line still carries its
                    # trailing newline, which would break the message in two
                    raise QQError(
                        f"Invalid qq submit option line in '{str(self._script)}': {stripped}."
                    )

                key, value = parts[-2], parts[-1]
                snake_case_key = to_snake_case(key)

                # handle workdir and worksize where two forms of the keyword are allowed
                snake_case_key = snake_case_key.replace("workdir", "work_dir").replace(
                    "worksize", "work_size"
                )

                # is this a known option?
                if snake_case_key in self._known_options:
                    try:
                        self._options[snake_case_key] = int(value)
                    except ValueError:
                        self._options[snake_case_key] = value
                else:
                    raise QQError(
                        f"Unknown qq submit option '{key}' in '{str(self._script)}': {stripped}.\nKnown options are '{' '.join(self._known_options)}'."
                    )

        logger.debug(f"Parsed options from '{self._script}': {self._options}.")

    def get_batch_system(self) -> type[BatchInterface] | None:
        """
        Return the batch system class specified in the script.

        Returns:
            type[BatchInterface] | None: The batch system class if specified, otherwise None.
        """
        if (batch_system := self._options.get("batch_system")) is not None:
            return BatchInterface.from_str(str(batch_system))

        return None

    def get_queue(self) -> str | None:
        """
        Return the queue specified for the job.

        Returns:
            str | None: Queue name, or None if not set.
        """
        if (queue := self._options.get("queue")) is not None:
            return str(queue)
        return None

    def get_job_type(self) -> JobType | None:
        """
        Return the job type specified in the script.

        Returns:
            JobType | None: Enum value representing the job type, or None if not set.
        """
        if (job_type := self._options.get("job_type")) is not None:
            return JobType.from_str(str(job_type))

        return None

    def get_resources(self) -> Resources:
        """
        Return the job resource specifications parsed from the script.

        Returns:
            Resources: Resource requirements for the job.
        """
        field_names = {f.name for f in fields(Resources)}
        # only select fields that are part of Resources
        return Resources(**{k: v for k, v in self._options.items() if k in field_names})  # ty: ignore[invalid-argument-type]

    def get_exclude(self) -> list[Path]:
        """
        Determine the files to exclude from being copied to the job's working directory.

        Returns:
            list[Path]: List of excluded file paths. Returns an empty list if none specified.
        """
        if (exclude := self._options.get("exclude")) is not None:
            return split_files_list(str(exclude))

        return []

    def get_include(self) -> list[Path]:
        """
        Determine the files to explicitly copy to the job's working directory.

        Returns:
            list[Path]: List of included file paths. Returns an empty list if none specified.
        """
        if (include := self._options.get("include")) is not None:
            return split_files_list(str(include))

        return []

    def get_loop_start(self) -> int | None:
        """
        Return the starting cycle number for loop jobs.

        Returns:
            int | None: Start cycle, or None if not specified (or not an integer).
        """
        if isinstance(loop_start := self._options.get("loop_start"), int):
            return loop_start
        return None

    def get_loop_end(self) -> int | None:
        """
        Return the ending cycle number for loop jobs.

        Returns:
            int | None: End cycle, or None if not specified (or not an integer).
        """
        if isinstance(loop_end := self._options.get("loop_end"), int):
            return loop_end
        return None

    def get_archive(self) -> Path | None:
        """
        Return the archive directory path specified in the script.

        Returns:
            Path | None: Archive directory path, or None if not set.
        """
        if (archive := self._options.get("archive")) is not None:
            return Path(str(archive))

        return None

    def get_archive_format(self) -> str | None:
        """
        Return the file naming format used for archived files.

        Returns:
            str | None: Archive filename format string, or None if not set.
        """
        if (archive_format := self._options.get("archive_format")) is not None:
            return str(archive_format)
        return None

    def get_archive_mode(self) -> list[TransferMode]:
        """
        Get the mode specifying when the files should be archived.

        Returns:
            list[TransferMode]: List of transfer modes. Empty if none specified.
        """
        if (raw := self._options.get("archive_mode")) is not None:
            return TransferMode.multi_from_str(str(raw))

        return []

    def get_depend(self) -> list[Depend]:
        """
        Return the list of job dependencies.

        Returns:
            list[Depend]: List of job dependencies. Empty if none specified.
        """
        if (raw := self._options.get("depend")) is not None:
            return Depend.multi_from_str(str(raw))

        return []

    def get_account(self) -> str | None:
        """
        Get the account name to use for the job.

        Returns:
            str | None: The account name or None if not defined.
        """
        if (account := self._options.get("account")) is not None:
            return str(account)

        return None

    def get_transfer_mode(self) -> list[TransferMode]:
        """
        Get the mode specifying when the files should be transferred
        from the working directory to the input directory.

        Returns:
            list[TransferMode]: List of transfer modes. Empty if none specified.
        """
        if (raw := self._options.get("transfer_mode")) is not None:
            return TransferMode.multi_from_str(str(raw))

        return []

    def get_server(self) -> str | None:
        """
        Get the batch server to which the job should be submitted.

        Note that this function returns the raw name of the server
        as provided by the user. It should be then translated using
        the `translate_server` function.

        Returns:
            str | None: The name or shortcut of the batch server or `None` if not specified.
        """
        if (server := self._options.get("server")) is not None:
            return str(server)

        return None

    def get_interpreter(self) -> Interpreter | None:
        """
        Get the interpreter that should be used to run the script.

        Returns:
            Interpreter | None: The interpreter or `None` if not specified.
        """
        if (raw := self._options.get("interpreter")) is not None:
            return Interpreter.from_str(str(raw))

        return None

    def get_resubmit_from(self) -> list[ResubmitHost]:
        """
        Return the list of resubmission hosts.

        Returns:
            list[ResubmitHost]: List of resubmission hosts. Empty if none specified.
        """
        if (raw := self._options.get("resubmit_from")) is not None:
            return ResubmitHost.multi_from_str(str(raw))

        return []

    @staticmethod
    def _strip_and_split(string: str) -> list[str]:
        """
        Remove the leading `# qq` directive from a line, extract content before the next `#`
        (if any), and split the remaining content.

        Args:
            string (str): Input line to process.

        Returns:
            list[str]: A list with one or two elements depending on whether a split occurred.
        """
        match = re.search(
            r"^#\s*qq\s*(.*?)\s*(?:#|$)", string.strip(), flags=re.IGNORECASE
        )
        # fall back to the whole (stripped) line if it does not look like a directive
        content = match.group(1).strip() if match else string.strip()

        # split by whitespace or '='; at most one split, i.e. option name and its value
        return re.split(r"[=\s]+", content, maxsplit=1)
Parser for qq job submission options (qq directives) specified in a script.
def __init__(self, script: Path, params: list[Parameter]):
    """
    Create a parser for the given script.

    Args:
        script (Path): Path to the qq job script to parse.
        params (list[Parameter]): List of click Parameter objects defining
            valid options. Only `GroupedOption` names are considered.
    """
    self._script = script

    # names of all grouped options; these are the only directives the parser accepts
    grouped_options = (p for p in params if isinstance(p, GroupedOption))
    self._known_options = {
        option.name for option in grouped_options if option.name is not None
    }
    logger.debug(
        f"Known options for Parser: {self._known_options} ({len(self._known_options)} options)."
    )

    # filled by `parse()`
    self._options: dict[str, object] = {}
Initialize the parser.
Arguments:
- script (Path): Path to the qq job script to parse.
- params (list[Parameter]): List of click Parameter objects defining
valid options. Only GroupedOption names are considered.
def parse(self) -> None:
    """
    Extract and parse `qq` options from the script.

    The method processes the script line by line, skipping the first line (shebang).
    It continues reading until it encounters a line that is not a `qq` directive,
    is non-empty, and is not a comment. Empty or commented lines are ignored.

    Each valid `qq` line is parsed into key-value pairs, normalized to `snake_case`,
    and stored in `self._options`. Values that can be converted to an integer
    are stored as `int`, all other values are stored as `str`.

    Raises:
        QQError: If the script is not a file, or if an option line is malformed or
                 contains an unknown option.
    """
    if not self._script.is_file():
        raise QQError(f"Could not open '{self._script}' as a file.")

    with self._script.open() as f:
        # skip the first line (shebang)
        next(f, None)

        for line in f:
            stripped = line.strip()
            if stripped == "":
                logger.debug("Parser: skipping empty line.")
                continue  # skip empty lines

            # check whether this is a qq command
            if not re.match(r"#\s*qq", stripped, re.IGNORECASE):
                if stripped.startswith("#"):
                    logger.debug(f"Parser: skipping commented line '{line}'.")
                    continue  # skip commented lines
                logger.debug(f"Parser: ending parsing at line '{line}'.")
                break  # stop parsing at other lines

            # remove the leading '# qq' and split by whitespace or '='
            parts = Parser._strip_and_split(line)
            if len(parts) < 2:
                # report the stripped line: the raw line still carries its
                # trailing newline, which would break the message in two
                raise QQError(
                    f"Invalid qq submit option line in '{str(self._script)}': {stripped}."
                )

            key, value = parts[-2], parts[-1]
            snake_case_key = to_snake_case(key)

            # handle workdir and worksize where two forms of the keyword are allowed
            snake_case_key = snake_case_key.replace("workdir", "work_dir").replace(
                "worksize", "work_size"
            )

            # is this a known option?
            if snake_case_key in self._known_options:
                try:
                    self._options[snake_case_key] = int(value)
                except ValueError:
                    self._options[snake_case_key] = value
            else:
                raise QQError(
                    f"Unknown qq submit option '{key}' in '{str(self._script)}': {stripped}.\nKnown options are '{' '.join(self._known_options)}'."
                )

    logger.debug(f"Parsed options from '{self._script}': {self._options}.")
Extract and parse qq options from the script.
The method processes the script line by line, skipping the first line (shebang).
It continues reading until it encounters a line that is not a qq directive,
is non-empty, and is not a comment. Empty or commented lines are ignored.
Each valid qq line is parsed into key-value pairs, normalized to snake_case,
and stored in self._options.
Raises:
- QQError: If the script cannot be read, or if an option line is malformed or contains an unknown option.
def get_batch_system(self) -> type[BatchInterface] | None:
    """
    Look up the batch system requested by the script.

    Returns:
        type[BatchInterface] | None: The batch system class if specified, otherwise None.
    """
    requested = self._options.get("batch_system")
    if requested is None:
        return None

    # parsed values may be stored as int; the lookup expects a string
    return BatchInterface.from_str(str(requested))
Return the batch system class specified in the script.
Returns:
type[BatchInterface] | None: The batch system class if specified, otherwise None.
128 def get_queue(self) -> str | None: 129 """ 130 Return the queue specified for the job. 131 132 Returns: 133 str | None: Queue name, or None if not set. 134 """ 135 if (queue := self._options.get("queue")) is not None: 136 return str(queue) 137 return None
Return the queue specified for the job.
Returns:
str | None: Queue name, or None if not set.
def get_job_type(self) -> JobType | None:
    """
    Look up the job type requested by the script.

    Returns:
        JobType | None: Enum value representing the job type, or None if not set.
    """
    requested = self._options.get("job_type")
    if requested is None:
        return None

    return JobType.from_str(str(requested))
Return the job type specified in the script.
Returns:
JobType | None: Enum value representing the job type, or None if not set.
def get_resources(self) -> Resources:
    """
    Build the resource requirements from the options parsed from the script.

    Only parsed options whose names match a field of `Resources` are used.

    Returns:
        Resources: Resource requirements for the job.
    """
    resource_fields = {field.name for field in fields(Resources)}
    selected = {
        name: value
        for name, value in self._options.items()
        if name in resource_fields
    }
    return Resources(**selected)  # ty: ignore[invalid-argument-type]
Return the job resource specifications parsed from the script.
Returns:
Resources: Resource requirements for the job.
def get_exclude(self) -> list[Path]:
    """
    Return the files that should not be copied to the job's working directory.

    Returns:
        list[Path]: List of excluded file paths. Returns an empty list if none specified.
    """
    raw = self._options.get("exclude")
    if raw is None:
        return []

    return split_files_list(str(raw))
Determine the files to exclude from being copied to the job's working directory.
Returns:
list[Path]: List of excluded file paths. Returns an empty list if none specified.
def get_include(self) -> list[Path]:
    """
    Return the files that should be explicitly copied to the job's working directory.

    Returns:
        list[Path]: List of included file paths. Returns an empty list if none specified.
    """
    raw = self._options.get("include")
    if raw is None:
        return []

    return split_files_list(str(raw))
Determine the files to explicitly copy to the job's working directory.
Returns:
list[Path]: List of included file paths. Returns an empty list if none specified.
186 def get_loop_start(self) -> int | None: 187 """ 188 Return the starting cycle number for loop jobs. 189 190 Returns: 191 int | None: Start cycle, or None if not specified. 192 """ 193 if isinstance(loop_start := self._options.get("loop_start"), int): 194 return loop_start 195 return None
Return the starting cycle number for loop jobs.
Returns:
int | None: Start cycle, or None if not specified.
197 def get_loop_end(self) -> int | None: 198 """ 199 Return the ending cycle number for loop jobs. 200 201 Returns: 202 int | None: End cycle, or None if not specified. 203 """ 204 if isinstance(loop_end := self._options.get("loop_end"), int): 205 return loop_end 206 return None
Return the ending cycle number for loop jobs.
Returns:
int | None: End cycle, or None if not specified.
208 def get_archive(self) -> Path | None: 209 """ 210 Return the archive directory path specified in the script. 211 212 Returns: 213 Path | None: Archive directory path, or None if not set. 214 """ 215 if (archive := self._options.get("archive")) is not None: 216 return Path(str(archive)) 217 218 return None
Return the archive directory path specified in the script.
Returns:
Path | None: Archive directory path, or None if not set.
220 def get_archive_format(self) -> str | None: 221 """ 222 Return the file naming format used for archived files. 223 224 Returns: 225 str | None: Archive filename format string, or None if not set. 226 """ 227 if (archive_format := self._options.get("archive_format")) is not None: 228 return str(archive_format) 229 return None
Return the file naming format used for archived files.
Returns:
str | None: Archive filename format string, or None if not set.
def get_archive_mode(self) -> list[TransferMode]:
    """
    Return the modes specifying when files should be archived.

    The raw "archive_mode" option is converted to a string and parsed
    with `TransferMode.multi_from_str`.

    Returns:
        list[TransferMode]: Parsed transfer modes, or an empty list if the option is not set.
    """
    raw = self._options.get("archive_mode")
    if raw is None:
        return []

    return TransferMode.multi_from_str(str(raw))
Get the mode specifying when the files should be archived.
Returns:
list[TransferMode]: List of transfer modes.
def get_depend(self) -> list[Depend]:
    """
    Return the job dependencies declared in the script.

    The raw "depend" option is converted to a string and parsed
    with `Depend.multi_from_str`.

    Returns:
        list[Depend]: Parsed dependencies, or an empty list if the option is not set.
    """
    raw = self._options.get("depend")
    if raw is None:
        return []

    return Depend.multi_from_str(str(raw))
Return the list of job dependencies.
Returns:
list[Depend]: List of job dependencies.
255 def get_account(self) -> str | None: 256 """ 257 Get the account name to use for the job. 258 259 Returns: 260 str | None: The account name or None if not defined. 261 """ 262 if (account := self._options.get("account")) is not None: 263 return str(account) 264 265 return None
Get the account name to use for the job.
Returns:
str | None: The account name or None if not defined.
def get_transfer_mode(self) -> list[TransferMode]:
    """
    Return the modes specifying when files should be transferred
    from the working directory to the input directory.

    The raw "transfer_mode" option is converted to a string and parsed
    with `TransferMode.multi_from_str`.

    Returns:
        list[TransferMode]: Parsed transfer modes, or an empty list if the option is not set.
    """
    raw = self._options.get("transfer_mode")
    if raw is None:
        return []

    return TransferMode.multi_from_str(str(raw))
Get the mode specifying when the files should be transferred from the working directory to the input directory.
Returns:
list[TransferMode]: List of transfer modes.
280 def get_server(self) -> str | None: 281 """ 282 Get the batch server to which the job should be submitted. 283 284 Note that this function returns the raw name of the server 285 as provided by the user. It should be then translated using 286 the `translate_server` function. 287 288 Returns: 289 str | None: The name or shortcut of the batch server or `None` if not specified. 290 """ 291 if (server := self._options.get("server")) is not None: 292 return str(server) 293 294 return None
Get the batch server to which the job should be submitted.
Note that this function returns the raw name of the server
as provided by the user. It should be then translated using
the translate_server function.
Returns:
str | None: The name or shortcut of the batch server, or `None` if not specified.
def get_interpreter(self) -> Interpreter | None:
    """
    Return the interpreter that should be used to run the script.

    The raw "interpreter" option is converted to a string and parsed
    with `Interpreter.from_str`.

    Returns:
        Interpreter | None: The parsed interpreter, or `None` if the option is not set.
    """
    raw = self._options.get("interpreter")
    if raw is None:
        return None

    return Interpreter.from_str(str(raw))
Get the interpreter that should be used to run the script.
Returns:
Interpreter | None: The interpreter, or `None` if not specified.
def get_resubmit_from(self) -> list[ResubmitHost]:
    """
    Return the hosts from which the job should be resubmitted.

    The raw "resubmit_from" option is converted to a string and parsed
    with `ResubmitHost.multi_from_str`.

    Returns:
        list[ResubmitHost]: Parsed resubmission hosts, or an empty list if the option is not set.
    """
    raw = self._options.get("resubmit_from")
    if raw is None:
        return []

    return ResubmitHost.multi_from_str(str(raw))
Return the list of resubmission hosts.
Returns:
list[ResubmitHost]: List of resubmission hosts.
class Submitter:
    """
    Submit a single qq job script to a batch system.

    What the class does:
    - Checks on construction that the script is an existing file whose
      first line is a valid `qq run` shebang.
    - Assembles the environment variables handed to the job for `qq run`.
    - Submits the job through the batch system and writes the qq info file
      recording the job's state and metadata.
    - Reports whether the job continues a previous loop/continuous job
      found in the same input directory (see `continues_loop`).

    Note that Submitter ignores qq directives in the submitted script.
    To handle them, you have to build a Submitter using the SubmitterFactory.
    """

    def __init__(
        self,
        batch_system: AnyBatchClass,
        queue: str,
        account: str | None,
        script: Path,
        job_type: JobType,
        resources: Resources,
        loop_info: LoopInfo | None = None,
        exclude: list[Path] | None = None,
        include: list[Path] | None = None,
        depend: list[Depend] | None = None,
        transfer_mode: list[TransferMode] | None = None,
        server: str | None = None,
        interpreter: Interpreter | None = None,
        resubmit_from: list[ResubmitHost] | None = None,
    ):
        """
        Create a Submitter and validate the script.

        Args:
            batch_system (AnyBatchClass): The batch system class implementing
                the BatchInterface used for job submission.
            queue (str): The name of the batch system queue to which the job will be submitted.
            account (str | None): The name of the account to use for the job.
            script (Path): Path to the job script to submit.
            job_type (JobType): Type of the job to submit (e.g. standard, loop).
            resources (Resources): Job resource requirements (e.g., CPUs, memory, walltime).
            loop_info (LoopInfo | None): Optional information for loop jobs. Pass None if not applicable.
            exclude (list[Path] | None): Optional list of files which should not be copied to the
                working directory. Each path is joined onto the input directory.
            include (list[Path] | None): Optional list of files which should be copied to the working
                directory even though they are not part of the job's input directory.
                Absolute paths are kept as they are; relative paths are joined onto the input directory.
            depend (list[Depend] | None): Optional list of job dependencies.
            transfer_mode (list[TransferMode] | None): Modes specifying when files should be transferred
                from the working directory to the input directory. If empty or `None`, the default
                configured in `CFG.transfer_files_options.default_transfer_mode` is parsed and used.
            server (str | None): Optional name of the server to which the job should be submitted.
            interpreter (Interpreter | None): Optional interpreter specification to use to execute the script.
            resubmit_from (list[ResubmitHost] | None): List of hosts from which a loop/continuous job
                should be resubmitted. Intended only for loop/continuous jobs; this constructor
                does not check that.

        Raises:
            QQError: If the script does not exist or has an invalid shebang line.
        """
        # settings stored exactly as provided (missing lists become empty lists)
        self._batch_system = batch_system
        self._queue = queue
        self._account = account
        self._server = server
        self._job_type = job_type
        self._resources = resources
        self._loop_info = loop_info
        self._interpreter = interpreter
        self._depend = depend if depend else []
        self._resubmit_from = resubmit_from if resubmit_from else []

        # script location and everything derived from it;
        # the job name needs `_loop_info` and `_script_name` to be set first
        self._script = script
        self._script_name = script.name
        self._input_dir = logical_resolve(script).parent
        self._job_name = self._construct_job_name()
        self._info_file = construct_info_file_path(self._input_dir, self._job_name)

        # anchor excluded/included files at the input directory
        self._exclude = [self._input_dir / path for path in exclude or ()]
        anchored_include = []
        for path in include or ():
            if path.is_absolute():
                anchored_include.append(path)
            else:
                anchored_include.append(self._input_dir / path)
        self._include = anchored_include

        # fall back to the configured default only if no mode was requested
        if transfer_mode:
            self._transfer_mode = transfer_mode
        else:
            self._transfer_mode = TransferMode.multi_from_str(
                CFG.transfer_files_options.default_transfer_mode
            )

        # the script has to be an existing regular file...
        if not self._script.is_file():
            raise QQError(f"Script '{script}' does not exist or is not a file.")

        # ...that starts with a valid qq shebang
        if not self._has_valid_shebang(self._script):
            message = f"Script '{self._script}' has an invalid shebang. The first line of the script should be '#!/usr/bin/env -S {CFG.binary_name} run'."
            raise QQError(message)

    def submit(self, remote: str | None = None) -> str:
        """
        Submit the script to the batch system and write the qq info file.

        The environment variables for the job are assembled first, then the
        batch system's `job_submit` is called, and finally an `Info` record
        describing the queued job is written to the info file.

        NOTE(review): the original documentation states that this method is
        thread-safe when submitting from the current machine; that cannot be
        confirmed from this class alone.

        Args:
            remote (str | None): Name of the machine from which the job should be submitted.
                If `None`, the current machine is used.

        Returns:
            str: The job ID of the submitted job.

        Raises:
            QQError: If job submission fails.
        """
        env_vars = self._create_env_vars_dict()
        job_id = self._batch_system.job_submit(
            self._resources,
            self._queue,
            self._script,
            self._job_name,
            self._depend,
            env_vars,
            self._account,
            self._server,
            remote_host=remote,
        )

        # The info file is always written from the current machine, even when
        # the job itself is submitted from a remote one: the input directory
        # is expected to be available on both machines.
        output_stem = Path(self._job_name)
        info = Info(
            batch_system=self._batch_system,
            qq_version=qq_lib.__version__,
            username=getpass.getuser(),
            job_id=job_id,
            job_name=self._job_name,
            script_name=self._script_name,
            queue=self._queue,
            job_type=self._job_type,
            input_machine=socket.getfqdn(remote or ""),
            input_dir=self._input_dir,
            job_state=NaiveState.QUEUED,
            submission_time=datetime.now(),
            stdout_file=str(output_stem.with_suffix(CFG.suffixes.stdout)),
            stderr_file=str(output_stem.with_suffix(CFG.suffixes.stderr)),
            resources=self._resources,
            loop_info=self._loop_info,
            excluded_files=self._exclude,
            included_files=self._include,
            depend=self._depend,
            account=self._account,
            transfer_mode=self._transfer_mode,
            server=self._server,
            interpreter=self._interpreter,
            resubmit_from=self._resubmit_from,
        )
        info.to_file(self._info_file)

        return job_id

    def continues_loop(self) -> bool:
        """
        Tell whether this job continues a previous loop/continuous job.

        The info file found in the input directory is loaded and compared
        with this job. Any `QQError` raised while doing so is logged at
        debug level and treated as "does not continue".

        Returns:
            bool: True if the job is a valid continuation of a previous loop/continuous job,
            False otherwise.
        """
        try:
            # a single info file is expected for both loop jobs (runtime files
            # are archived) and continuous jobs (runtime files overwrite each other)
            previous = Informer.from_file(get_info_file(self._input_dir))
            resubmittable = self._loop_job_continues_loop(
                previous
            ) or self._continuous_job_continues_loop(previous)
        except QQError as e:
            logger.debug(f"Could not read an info file: {e}.")
            return False

        if resubmittable:
            logger.debug("Valid loop job with a correct cycle or a continuous job.")
            return True

        logger.debug("Detected info file does not correspond to a resubmittable job.")
        return False

    def _loop_job_continues_loop(self, previous: Informer) -> bool:
        """
        Tell whether this job is the next cycle of a previous loop job.

        Args:
            previous (Informer): Informer associated with the previous job.

        Returns:
            bool: True if the job is a valid continuation of a previous loop job, False otherwise.
        """
        # both the previous and the current job have to be loop jobs
        if previous.info.loop_info is None or self._loop_info is None:
            return False

        # the previous job has to be successfully finished
        if previous.info.job_state != NaiveState.FINISHED:
            return False

        # the current cycle has to directly follow the previous one
        return previous.info.loop_info.current == self._loop_info.current - 1

    def _continuous_job_continues_loop(self, previous: Informer) -> bool:
        """
        Tell whether this job continues a previous continuous job.

        Args:
            previous (Informer): Informer associated with the previous job.

        Returns:
            bool: True if the job is a valid continuation of a previous continuous job, False otherwise.
        """
        # both the previous and the current job have to be continuous jobs
        if previous.info.job_type != JobType.CONTINUOUS:
            return False
        if self._job_type != JobType.CONTINUOUS:
            return False

        # the previous job has to be successfully finished
        return previous.info.job_state == NaiveState.FINISHED

    def get_input_dir(self) -> Path:
        """
        Return the job's input directory.

        Returns:
            Path: Parent directory of the logically resolved script path.
        """
        return self._input_dir

    def get_batch_system(self) -> AnyBatchClass:
        """Return the batch system used for submitting."""
        return self._batch_system

    def get_job_name(self) -> str:
        """Return the name of the job."""
        return self._job_name

    def get_queue(self) -> str:
        """Return the submission queue."""
        return self._queue

    def get_account(self) -> str | None:
        """Return the account used for the job, if any."""
        return self._account

    def get_script(self) -> Path:
        """Return the path to the submitted script, as it was passed to the constructor."""
        return self._script

    def get_job_type(self) -> JobType:
        """Return the type of the job."""
        return self._job_type

    def get_resources(self) -> Resources:
        """Return the resources requested for the job."""
        return self._resources

    def get_loop_info(self) -> LoopInfo | None:
        """Return the loop job information, or None if none was provided."""
        return self._loop_info

    def get_exclude(self) -> list[Path]:
        """Return the excluded files, already joined onto the input directory."""
        return self._exclude

    def get_include(self) -> list[Path]:
        """Return the included files, with relative paths joined onto the input directory."""
        return self._include

    def get_depend(self) -> list[Depend]:
        """Return the list of job dependencies."""
        return self._depend

    def get_transfer_mode(self) -> list[TransferMode]:
        """Return the list of transfer modes in effect for the job."""
        return self._transfer_mode

    def get_server(self) -> str | None:
        """Return the submission server, if one was specified."""
        return self._server

    def get_interpreter(self) -> Interpreter | None:
        """Return the interpreter to use for running the script, if one was specified."""
        return self._interpreter

    def get_resubmit_from(self) -> list[ResubmitHost] | None:
        """Return the list of hosts to resubmit the job from."""
        return self._resubmit_from

    def _create_env_vars_dict(self) -> dict[str, str]:
        """
        Assemble the environment variables provided to the qq runtime.

        Returns:
            dict[str, str]: Dictionary of environment variables and their values.
        """
        names = CFG.env_vars
        resources = self._resources
        env: dict[str, str] = {}

        # pass the qq debug mode on to the job if it is enabled here
        if os.environ.get(names.debug_mode):
            env[names.debug_mode] = "true"

        # marks the job as running inside a qq environment
        env[names.guard] = "true"

        # where the job finds its info file, input host, batch system and input directory
        env[names.info_file] = str(self._info_file)
        env[names.input_machine] = socket.getfqdn()
        env[names.batch_system] = str(self._batch_system)
        env[names.input_dir] = str(self._input_dir)

        # resources: a total count wins over a per-node count multiplied by
        # the number of nodes; otherwise fall back to 1 CPU / 0 GPUs
        nnodes = resources.nnodes or 1

        if resources.ncpus:
            cpu_count = resources.ncpus
        elif resources.ncpus_per_node:
            cpu_count = resources.ncpus_per_node * nnodes
        else:
            cpu_count = 1
        env[names.ncpus] = str(cpu_count)

        if resources.ngpus:
            gpu_count = resources.ngpus
        elif resources.ngpus_per_node:
            gpu_count = resources.ngpus_per_node * nnodes
        else:
            gpu_count = 0
        env[names.ngpus] = str(gpu_count)

        env[names.nnodes] = str(nnodes)

        # walltime is exported in hours
        walltime = hhmmss_to_duration(resources.walltime or "00:00:00")
        env[names.walltime] = str(walltime.total_seconds() / 3600)

        # variables specific to loop jobs
        if self._loop_info:
            env[names.loop_current] = str(self._loop_info.current)
            env[names.loop_start] = str(self._loop_info.start)
            env[names.loop_end] = str(self._loop_info.end)
            env[names.archive_format] = self._loop_info.archive_format

        # variables shared by loop jobs and continuous jobs
        if self._job_type in (JobType.LOOP, JobType.CONTINUOUS):
            env[names.no_resubmit] = str(CFG.exit_codes.qq_run_no_resubmit)

        return env

    def _has_valid_shebang(self, script: Path) -> bool:
        """
        Check that the script starts with a valid shebang for qq run.

        Args:
            script (Path): Path to the script file.

        Returns:
            bool: True if the first line starts with '#!' and, once stripped,
            ends with '<binary name> run'.
        """
        with script.open() as handle:
            shebang = handle.readline()

        if not shebang.startswith("#!"):
            return False

        return shebang.strip().endswith(f"{CFG.binary_name} run")

    def _construct_job_name(self) -> str:
        """
        Build the name under which the job is submitted.

        Returns:
            str: The script name for jobs without loop information, otherwise
            the name produced by `construct_loop_job_name` for the current cycle.
        """
        if self._loop_info:
            return construct_loop_job_name(self._script_name, self._loop_info.current)

        return self._script_name
Class to submit jobs to a batch system.
Responsibilities:
- Validate that the script exists and has a proper shebang.
- Guard against multiple submissions from the same directory.
- Set environment variables required for `qq run`.
- Create a qq info file for tracking job state and metadata.
Note that Submitter ignores qq directives in the submitted script. To handle them, you have to build a Submitter using the SubmitterFactory.
def __init__(
    self,
    batch_system: AnyBatchClass,
    queue: str,
    account: str | None,
    script: Path,
    job_type: JobType,
    resources: Resources,
    loop_info: LoopInfo | None = None,
    exclude: list[Path] | None = None,
    include: list[Path] | None = None,
    depend: list[Depend] | None = None,
    transfer_mode: list[TransferMode] | None = None,
    server: str | None = None,
    interpreter: Interpreter | None = None,
    resubmit_from: list[ResubmitHost] | None = None,
):
    """
    Create a Submitter and validate the script.

    Args:
        batch_system (AnyBatchClass): The batch system class implementing
            the BatchInterface used for job submission.
        queue (str): The name of the batch system queue to which the job will be submitted.
        account (str | None): The name of the account to use for the job.
        script (Path): Path to the job script to submit.
        job_type (JobType): Type of the job to submit (e.g. standard, loop).
        resources (Resources): Job resource requirements (e.g., CPUs, memory, walltime).
        loop_info (LoopInfo | None): Optional information for loop jobs. Pass None if not applicable.
        exclude (list[Path] | None): Optional list of files which should not be copied to the
            working directory. Each path is joined onto the input directory.
        include (list[Path] | None): Optional list of files which should be copied to the working
            directory even though they are not part of the job's input directory.
            Absolute paths are kept as they are; relative paths are joined onto the input directory.
        depend (list[Depend] | None): Optional list of job dependencies.
        transfer_mode (list[TransferMode] | None): Modes specifying when files should be transferred
            from the working directory to the input directory. If empty or `None`, the default
            configured in `CFG.transfer_files_options.default_transfer_mode` is parsed and used.
        server (str | None): Optional name of the server to which the job should be submitted.
        interpreter (Interpreter | None): Optional interpreter specification to use to execute the script.
        resubmit_from (list[ResubmitHost] | None): List of hosts from which a loop/continuous job
            should be resubmitted. Intended only for loop/continuous jobs; this constructor
            does not check that.

    Raises:
        QQError: If the script does not exist or has an invalid shebang line.
    """
    # settings stored exactly as provided (missing lists become empty lists)
    self._batch_system = batch_system
    self._queue = queue
    self._account = account
    self._server = server
    self._job_type = job_type
    self._resources = resources
    self._loop_info = loop_info
    self._interpreter = interpreter
    self._depend = depend if depend else []
    self._resubmit_from = resubmit_from if resubmit_from else []

    # script location and everything derived from it;
    # the job name needs `_loop_info` and `_script_name` to be set first
    self._script = script
    self._script_name = script.name
    self._input_dir = logical_resolve(script).parent
    self._job_name = self._construct_job_name()
    self._info_file = construct_info_file_path(self._input_dir, self._job_name)

    # anchor excluded/included files at the input directory
    self._exclude = [self._input_dir / path for path in exclude or ()]
    anchored_include = []
    for path in include or ():
        if path.is_absolute():
            anchored_include.append(path)
        else:
            anchored_include.append(self._input_dir / path)
    self._include = anchored_include

    # fall back to the configured default only if no mode was requested
    if transfer_mode:
        self._transfer_mode = transfer_mode
    else:
        self._transfer_mode = TransferMode.multi_from_str(
            CFG.transfer_files_options.default_transfer_mode
        )

    # the script has to be an existing regular file...
    if not self._script.is_file():
        raise QQError(f"Script '{script}' does not exist or is not a file.")

    # ...that starts with a valid qq shebang
    if not self._has_valid_shebang(self._script):
        message = f"Script '{self._script}' has an invalid shebang. The first line of the script should be '#!/usr/bin/env -S {CFG.binary_name} run'."
        raise QQError(message)
Initialize a Submitter instance.
Arguments:
- batch_system (AnyBatchClass): The batch system class implementing the BatchInterface used for job submission.
- queue (str): The name of the batch system queue to which the job will be submitted.
- account (str | None): The name of the account to use for the job.
- script (Path): Path to the job script to submit.
- job_type (JobType): Type of the job to submit (e.g. standard, loop).
- resources (Resources): Job resource requirements (e.g., CPUs, memory, walltime).
- loop_info (LoopInfo | None): Optional information for loop jobs. Pass None if not applicable.
- exclude (list[Path] | None): Optional list of files which should not be copied to the working directory. Paths are provided relative to the input directory.
- include (list[Path] | None): Optional list of files which should be copied to the working directory even though they are not part of the job's input directory. Paths are provided either absolute or relative to the input directory.
- depend (list[Depend] | None): Optional list of job dependencies.
- transfer_mode (list[TransferMode] | None): Mode specifying when files should be transferred from the working directory to the input directory. Defaults to [`Success()`].
- server (str | None): Optional name of the server to which the job should be submitted. If `None`, the default batch server, as configured by the batch system, is used.
- interpreter (Interpreter | None): Optional interpreter specification to use to execute the script. If not specified, the config default is used.
- resubmit_from (list[ResubmitHost] | None): List of hosts from which a loop/continuous job should be resubmitted. Must only be specified for loop/continuous jobs!
Raises:
- QQError: If the script does not exist or has an invalid shebang line.
def submit(self, remote: str | None = None) -> str:
    """
    Submit the script to the batch system and write the qq info file.

    The environment variables for the job are assembled first, then the
    batch system's `job_submit` is called, and finally an `Info` record
    describing the queued job is written to the info file.

    NOTE(review): the original documentation states that this method is
    thread-safe when submitting from the current machine; that cannot be
    confirmed from this method alone.

    Args:
        remote (str | None): Name of the machine from which the job should be submitted.
            If `None`, the current machine is used.

    Returns:
        str: The job ID of the submitted job.

    Raises:
        QQError: If job submission fails.
    """
    env_vars = self._create_env_vars_dict()
    job_id = self._batch_system.job_submit(
        self._resources,
        self._queue,
        self._script,
        self._job_name,
        self._depend,
        env_vars,
        self._account,
        self._server,
        remote_host=remote,
    )

    # The info file is always written from the current machine, even when
    # the job itself is submitted from a remote one: the input directory
    # is expected to be available on both machines.
    output_stem = Path(self._job_name)
    info = Info(
        batch_system=self._batch_system,
        qq_version=qq_lib.__version__,
        username=getpass.getuser(),
        job_id=job_id,
        job_name=self._job_name,
        script_name=self._script_name,
        queue=self._queue,
        job_type=self._job_type,
        input_machine=socket.getfqdn(remote or ""),
        input_dir=self._input_dir,
        job_state=NaiveState.QUEUED,
        submission_time=datetime.now(),
        stdout_file=str(output_stem.with_suffix(CFG.suffixes.stdout)),
        stderr_file=str(output_stem.with_suffix(CFG.suffixes.stderr)),
        resources=self._resources,
        loop_info=self._loop_info,
        excluded_files=self._exclude,
        included_files=self._include,
        depend=self._depend,
        account=self._account,
        transfer_mode=self._transfer_mode,
        server=self._server,
        interpreter=self._interpreter,
        resubmit_from=self._resubmit_from,
    )
    info.to_file(self._info_file)

    return job_id
Submit the script to the batch system.
Sets required environment variables, calls the batch system's job submission mechanism, and creates an info file with job metadata.
This method is thread-safe, if the submission is done from the current machine.
Arguments:
- remote (str | None): Name of the machine from which the job should be submitted. If `None`, the current machine is used.
Returns:
str: The job ID of the submitted job.
Raises:
- QQError: If job submission fails.
def continues_loop(self) -> bool:
    """
    Tell whether this job continues a previous loop/continuous job.

    The info file found in the input directory is loaded and compared
    with this job. Any `QQError` raised while doing so is logged at
    debug level and treated as "does not continue".

    Returns:
        bool: True if the job is a valid continuation of a previous loop/continuous job,
        False otherwise.
    """
    try:
        # a single info file is expected for both loop jobs (runtime files
        # are archived) and continuous jobs (runtime files overwrite each other)
        previous = Informer.from_file(get_info_file(self._input_dir))
        resubmittable = self._loop_job_continues_loop(
            previous
        ) or self._continuous_job_continues_loop(previous)
    except QQError as e:
        logger.debug(f"Could not read an info file: {e}.")
        return False

    if resubmittable:
        logger.debug("Valid loop job with a correct cycle or a continuous job.")
        return True

    logger.debug("Detected info file does not correspond to a resubmittable job.")
    return False
Determine whether the submitted job is a continuation of a loop/continuous job.
Returns:
bool: True if the job is a valid continuation of a previous loop/continuous job, False otherwise.
def get_input_dir(self) -> Path:
    """
    Return the job's input directory.

    Returns:
        Path: The input directory stored on this submitter.
    """
    return self._input_dir
Get path to the job's input directory.
Returns:
Path: Path to the job's input directory.
def get_batch_system(self) -> AnyBatchClass:
    """Return the batch system used for submitting."""
    return self._batch_system
Get the batch system used for submitting.
def get_script(self) -> Path:
    """Return the path to the submitted script, as stored by the constructor."""
    return self._script
Get absolute (logical) path to the submitted script.
def get_resources(self) -> Resources:
    """Return the resources requested for the job."""
    return self._resources
Get resources requested for the job.
def get_loop_info(self) -> LoopInfo | None:
    """Return the loop job information, or None if none is stored."""
    return self._loop_info
Get loop job information.
def get_exclude(self) -> list[Path]:
    """Return the list of files excluded from copying to the working directory."""
    return self._exclude
Get a list of excluded files.
def get_include(self) -> list[Path]:
    """Return the list of files explicitly included for copying to the working directory."""
    return self._include
Get a list of included files.
def get_depend(self) -> list[Depend]:
    """Return the list of job dependencies."""
    return self._depend
Get the list of dependencies.
def get_transfer_mode(self) -> list[TransferMode]:
    """Return the list of transfer modes in effect for the job."""
    return self._transfer_mode
Get the list of transfer modes.
320 def get_server(self) -> str | None: 321 """Get the submission server.""" 322 return self._server
Get the submission server.
def get_interpreter(self) -> Interpreter | None:
    """Return the interpreter to use for running the script, or None if none is stored."""
    return self._interpreter
Get the interpreter to use for running the script.