qq_lib.submit

Utilities for submitting qq jobs.

This module integrates three main components - Parser, Submitter, and SubmitterFactory - that collectively interpret submission settings, construct job metadata, and hand off execution to the batch system.

Parser extracts qq directives declared inside the script (via # qq ... lines) and normalizes them into structured submission parameters such as resources, dependencies, file include/exclude rules and loop-job fields.

Submitter validates the script, prevents accidental duplicate submissions, constructs the qq info file, sets up environment variables needed by qq run, and finally invokes the batch system's submission mechanism.

SubmitterFactory coordinates command-line arguments with script-embedded directives, merges and resolves resources, determines the batch system and queue, constructs loop-job settings, and ultimately produces a fully configured Submitter. It ensures a consistent and unified interpretation of submission parameters from all available sources.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Utilities for submitting qq jobs.
 6
 7This module integrates three main components - `Parser`, `Submitter`, and
 8`SubmitterFactory` - that collectively interpret submission settings, construct
 9job metadata, and hand off execution to the batch system.
10
11`Parser` extracts qq directives declared inside the script (via `# qq ...`
12lines) and normalizes them into structured submission parameters such as
13resources, dependencies, file include/exclude rules and loop-job fields.
14
15`Submitter` validates the script, prevents accidental duplicate submissions,
16constructs the qq info file, sets up environment variables needed by `qq run`,
17and finally invokes the batch system's submission mechanism.
18
19`SubmitterFactory` coordinates command-line arguments with script-embedded
20directives, merges and resolves resources, determines the batch system and
21queue, constructs loop-job settings, and ultimately produces a fully configured
22`Submitter`. It ensures a consistent and unified interpretation of submission
23parameters from all available sources.
24"""
25
26from .factory import SubmitterFactory
27from .parser import Parser
28from .submitter import Submitter
29
30__all__ = ["SubmitterFactory", "Parser", "Submitter"]
class SubmitterFactory:
 27class SubmitterFactory:
 28    """
 29    Factory class to construct a Submitter instance based on parameters from
 30    the command-line and from the script itself.
 31    """
 32
 33    def __init__(self, script: Path, **kwargs):
 34        """
 35        Initialize the factory with the script, command-line parameters, and additional options.
 36
 37        Args:
 38            script (Path): Path to the script to submit.
 39            **kwargs: Keyword arguments from the command line.
 40        """
 41        from qq_lib.submit.cli import submit
 42
 43        self._parser = Parser(script, submit.params)
 44        self._script = script
 45        self._input_dir = script.parent
 46        self._kwargs = kwargs
 47
 48    def make_submitter(self) -> Submitter:
 49        """
 50        Construct and return a Submitter instance.
 51
 52        Returns:
 53            Submitter: A fully initialized submitter object ready to submit a job.
 54
 55        Raises:
 56            QQError: If required information, such as the submission queue, is missing.
 57        """
 58        self._parser.parse()
 59
 60        BatchSystem = self._get_batch_system()
 61        queue = self._get_queue()
 62
 63        if (job_type := self._get_job_type()) == JobType.LOOP:
 64            loop_info = self._get_loop_info()
 65        else:
 66            # tell the user that any loop job-specific options will be ignored
 67            # because the job is not a loop job
 68            self._print_warning_if_loop_info_defined(job_type)
 69            loop_info = None
 70
 71        if job_type not in (JobType.LOOP, JobType.CONTINUOUS):
 72            # tell the user that 'resubmit_from' will be ignored if the
 73            # job type is not 'loop' or 'continuous'
 74            self._print_warning_if_resubmit_from_defined(job_type)
 75
 76        server = self._get_server()
 77
 78        return Submitter(
 79            batch_system=BatchSystem,
 80            queue=queue,
 81            account=self._get_account(),
 82            script=self._script,
 83            job_type=job_type,
 84            resources=self._get_resources(BatchSystem, queue, server),
 85            loop_info=loop_info,
 86            exclude=self._get_exclude(),
 87            include=self._get_include(),
 88            depend=self._get_depend(),
 89            transfer_mode=self._get_transfer_mode(),
 90            server=server,
 91            interpreter=self._get_interpreter(),
 92            resubmit_from=self._get_resubmit_from(BatchSystem)
 93            if job_type in {JobType.LOOP, JobType.CONTINUOUS}
 94            else None,
 95        )
 96
 97    def _get_batch_system(self) -> type[BatchInterface]:
 98        """
 99        Determine which batch system to use for the job submission.
100
101        Priority:
102            1. Command-line specification
103            2. Batch system specified in the script
104            3. Environment variable
105            4. Guessed batch system
106
107        Returns:
108            type[BatchInterface]: The selected batch system class.
109        """
110        if batch_system := self._kwargs.get("batch_system"):
111            return BatchInterface.from_str(batch_system)
112        return self._parser.get_batch_system() or BatchInterface.from_env_var_or_guess()
113
114    def _get_job_type(self) -> JobType:
115        """
116        Determine the type of job to submit.
117
118        Priority:
119            1. Command-line specification
120            2. Job type specified in the script
121            3. Default to `JobType.STANDARD`
122
123        Returns:
124            JobType: The determined job type.
125        """
126        if job_type := self._kwargs.get("job_type"):
127            return JobType.from_str(job_type)
128        return self._parser.get_job_type() or JobType.STANDARD
129
130    def _get_queue(self) -> str:
131        """
132        Determine the submission queue to use.
133
134        Priority:
135            1. Command-line specification
136            2. Queue specified in the script
137
138        Returns:
139            str: Name of the submission queue.
140
141        Raises:
142            QQError: If no queue is specified either in kwargs or in the script.
143        """
144        if not (queue := self._kwargs.get("queue") or self._parser.get_queue()):
145            raise QQError("Submission queue not specified.")
146        return queue
147
148    def _get_resources(
149        self, BatchSystem: type[BatchInterface], queue: str, server: str | None
150    ) -> Resources:
151        """
152        Get the resource requirements for the job by merging the requirements specified on the command
153        line with requirements specified inside the submitted script.
154
155        The resources are then further modified to conform to the provided `BatchSystem` and submission `queue`.
156
157        Args:
158            BatchSystem (type[BatchInterface]): The batch system class to use.
159            queue (str): The submission queue.
160            server (str | None): The submission server. `None` = the current main server.
161
162        Returns:
163            Resources: A merged Resources object containing the final resource requirements.
164        """
165        field_names = {f.name for f in fields(Resources)}
166        command_line_resources = Resources(
167            **{k: v for k, v in self._kwargs.items() if k in field_names}
168        )
169
170        return BatchSystem.transform_resources(
171            queue,
172            server,
173            Resources.merge_resources(
174                command_line_resources, self._parser.get_resources()
175            ),
176        )
177
178    def _get_loop_info(self) -> LoopInfo:
179        """
180        Construct LoopInfo holding information about the loop job.
181
182        Returns:
183            LoopInfo: An object containing loop job parameters.
184
185        Raises:
186            QQError: If required loop job parameters are missing or invalid.
187        """
188        return LoopInfo(
189            self._kwargs.get("loop_start") or self._parser.get_loop_start() or 1,
190            self._kwargs.get("loop_end") or self._parser.get_loop_end(),
191            self._input_dir
192            / (
193                self._kwargs.get("archive")
194                or self._parser.get_archive()
195                or CFG.loop_jobs.archive_dir
196            ),
197            self._kwargs.get("archive_format")
198            or self._parser.get_archive_format()
199            or CFG.loop_jobs.archive_format,
200            input_dir=self._input_dir,
201            archive_mode=TransferMode.multi_from_str(
202                self._kwargs.get("archive_mode") or ""
203            )
204            or self._parser.get_archive_mode(),
205        )
206
207    def _print_warning_if_loop_info_defined(self, job_type: JobType) -> None:
208        """
209        Print warning(s) if any of the loop-job specific options
210        are defined either on the command line or in the script itself.
211        This should only be used if the job is not a loop job.
212        """
213        for option, parser_func in zip(
214            ["loop_start", "loop_end", "archive", "archive_format", "archive_mode"],
215            [
216                self._parser.get_loop_start,
217                self._parser.get_loop_end,
218                self._parser.get_archive,
219                self._parser.get_archive_format,
220                self._parser.get_archive_mode,
221            ],
222        ):
223            if self._kwargs.get(option) or parser_func():
224                logger.warning(
225                    f"Option '{option}' is specified but job type is '{str(job_type)}', not 'loop' - '{option}' will be ignored."
226                )
227
228    def _print_warning_if_resubmit_from_defined(self, job_type: JobType) -> None:
229        """
230        Print a warning if the 'resubmit_from' option is defined but the job type is not 'loop' or 'continuous'.
231        """
232        if self._kwargs.get("resubmit_from") or self._parser.get_resubmit_from():
233            logger.warning(
234                f"Option 'resubmit_from' is specified but job type is '{str(job_type)}', not 'loop' or 'continuous' - 'resubmit_from' will be ignored."
235            )
236
237    def _get_exclude(self) -> list[Path]:
238        """
239        Determine the files to exclude from being copied to the job's working directory.
240
241        Priority:
242            1. Excluded files specified on the command line.
243            2. Excluded files specified inside the submitted script.
244
245        The lists are NOT merged.
246
247        Returns:
248            list[Path]: List of relative file paths to exclude.
249        """
250        return (
251            split_files_list(self._kwargs.get("exclude")) or self._parser.get_exclude()
252        )
253
254    def _get_include(self) -> list[Path]:
255        """
256        Determine the files to explicitly copy to the job's working directory.
257
258        Priority:
259            1. Included files specified on the command line.
260            2. Included files specified inside the submitted script.
261
262        The lists are NOT merged.
263
264        Returns:
265            list[Path]: List of file paths to include.
266        """
267        return (
268            split_files_list(self._kwargs.get("include")) or self._parser.get_include()
269        )
270
271    def _get_depend(self) -> list[Depend]:
272        """
273        Determine the list of dependencies.
274
275        Priority:
276            1. Dependencies specified on the command line.
277            2. Dependencies specified inside the submitted script.
278
279        The lists are NOT merged.
280
281        Returns:
282            list[Depend]: List of job dependencies.
283        """
284        return (
285            Depend.multi_from_str(self._kwargs.get("depend") or "")
286            or self._parser.get_depend()
287        )
288
289    def _get_account(self) -> str | None:
290        """
291        Determine the account name to use for the job.
292
293        Returns:
294            str | None: The account name or None if not defined.
295        """
296        return self._kwargs.get("account") or self._parser.get_account()
297
298    def _get_transfer_mode(self) -> list[TransferMode]:
299        """
300        Determine the mode specifying when files should be
301        transferred from the working directory to the input directory.
302
303        Priority:
304            1. Transfer modes specified on the command line.
305            2. Transfer modes specified inside the submitted script.
306
307        The lists are NOT merged.
308
309        Returns:
310            list[TransferMode]: List of transfer modes.
311        """
312        return (
313            TransferMode.multi_from_str(self._kwargs.get("transfer_mode") or "")
314            or self._parser.get_transfer_mode()
315        )
316
317    def _get_server(self) -> str | None:
318        """
319        Determine the batch server to submit the job to.
320
321        Priority:
322            1. Command-line specification
323            2. Batch server specified in the script
324            3. None - the current batch server
325
326        Returns:
327            str | None: The full name of the batch server to use,
328                or `None` for the current batch server.
329        """
330        if raw := (self._kwargs.get("server") or self._parser.get_server()):
331            return translate_server(raw)
332
333        return None
334
335    def _get_interpreter(self) -> Interpreter | None:
336        """
337        Determine the interpreter to use for running the script.
338
339        Priority:
340            1. Command-line specification
341            2. Interpreter specified in the script
342            3. None - the default intepreter
343
344        Returns:
345            Interpreter | None: The interpreter to use for running the script
346                or `None` to use the default intepreter.
347        """
348        if (raw := self._kwargs.get("interpreter")) is not None:
349            return Interpreter.from_str(raw)
350
351        return self._parser.get_interpreter()
352
353    def _get_resubmit_from(self, BatchSystem: AnyBatchClass) -> list[ResubmitHost]:
354        """
355        Determine the list of resubmission hosts to be used to resubmit loop/continuous job.
356
357        Priority:
358            1. Resubmission hosts specified on the command line.
359            2. Resubmission hosts specified inside the submitted script.
360            3. Resubmission hosts specified in the configuration file.
361            4. Default resubmission hosts provided by the batch system.
362
363        The lists are NOT merged.
364
365        Args:
366            BatchSystem (AnyBatchClass): The batch system used for job submission.
367
368        Returns:
369            list[ResubmitHost]: List of resubmission hosts.
370        """
371        return (
372            ResubmitHost.multi_from_str(self._kwargs.get("resubmit_from") or "")
373            or self._parser.get_resubmit_from()
374            or ResubmitHost.multi_from_str(CFG.resubmitter.default_resubmit_hosts or "")
375            or BatchSystem.get_default_resubmit_hosts()
376        )

Factory class to construct a Submitter instance based on parameters from the command-line and from the script itself.

SubmitterFactory(script: pathlib._local.Path, **kwargs)
33    def __init__(self, script: Path, **kwargs):
34        """
35        Initialize the factory with the script, command-line parameters, and additional options.
36
37        Args:
38            script (Path): Path to the script to submit.
39            **kwargs: Keyword arguments from the command line.
40        """
41        from qq_lib.submit.cli import submit
42
43        self._parser = Parser(script, submit.params)
44        self._script = script
45        self._input_dir = script.parent
46        self._kwargs = kwargs

Initialize the factory with the script, command-line parameters, and additional options.

Arguments:
  • script (Path): Path to the script to submit.
  • **kwargs: Keyword arguments from the command line.
def make_submitter(self) -> Submitter:
48    def make_submitter(self) -> Submitter:
49        """
50        Construct and return a Submitter instance.
51
52        Returns:
53            Submitter: A fully initialized submitter object ready to submit a job.
54
55        Raises:
56            QQError: If required information, such as the submission queue, is missing.
57        """
58        self._parser.parse()
59
60        BatchSystem = self._get_batch_system()
61        queue = self._get_queue()
62
63        if (job_type := self._get_job_type()) == JobType.LOOP:
64            loop_info = self._get_loop_info()
65        else:
66            # tell the user that any loop job-specific options will be ignored
67            # because the job is not a loop job
68            self._print_warning_if_loop_info_defined(job_type)
69            loop_info = None
70
71        if job_type not in (JobType.LOOP, JobType.CONTINUOUS):
72            # tell the user that 'resubmit_from' will be ignored if the
73            # job type is not 'loop' or 'continuous'
74            self._print_warning_if_resubmit_from_defined(job_type)
75
76        server = self._get_server()
77
78        return Submitter(
79            batch_system=BatchSystem,
80            queue=queue,
81            account=self._get_account(),
82            script=self._script,
83            job_type=job_type,
84            resources=self._get_resources(BatchSystem, queue, server),
85            loop_info=loop_info,
86            exclude=self._get_exclude(),
87            include=self._get_include(),
88            depend=self._get_depend(),
89            transfer_mode=self._get_transfer_mode(),
90            server=server,
91            interpreter=self._get_interpreter(),
92            resubmit_from=self._get_resubmit_from(BatchSystem)
93            if job_type in {JobType.LOOP, JobType.CONTINUOUS}
94            else None,
95        )

Construct and return a Submitter instance.

Returns:

Submitter: A fully initialized submitter object ready to submit a job.

Raises:
  • QQError: If required information, such as the submission queue, is missing.
class Parser:
 26class Parser:
 27    """
 28    Parser for qq job submission options (qq directives) specified in a script.
 29    """
 30
 31    def __init__(self, script: Path, params: list[Parameter]):
 32        """
 33        Initialize the parser.
 34
 35        Args:
 36            script (Path): Path to the qq job script to parse.
 37            params (list[Parameter]): List of click Parameter objects defining
 38                valid options. Only `GroupedOption` names are considered.
 39        """
 40        self._script = script
 41        self._known_options = {
 42            p.name
 43            for p in params
 44            if isinstance(p, GroupedOption) and p.name is not None
 45        }
 46        logger.debug(
 47            f"Known options for Parser: {self._known_options} ({len(self._known_options)} options)."
 48        )
 49
 50        self._options: dict[str, object] = {}
 51
 52    def parse(self) -> None:
 53        """
 54        Extract and parse `qq` options from the script.
 55
 56        The method processes the script line by line, skipping the first line (shebang).
 57        It continues reading until it encounters a line that is not a `qq` directive,
 58        is non-empty, and is not a comment. Empty or commented lines are ignored.
 59
 60        Each valid `qq` line is parsed into key-value pairs, normalized to `snake_case`,
 61        and stored in `self._options`.
 62
 63        Raises:
 64            QQError: If the script cannot be read, or if an option line is malformed or
 65                    contains an unknown option.
 66        """
 67        if not self._script.is_file():
 68            raise QQError(f"Could not open '{self._script}' as a file.")
 69
 70        with self._script.open() as f:
 71            # skip the first line (shebang)
 72            next(f, None)
 73
 74            for line in f:
 75                stripped = line.strip()
 76                if stripped == "":
 77                    logger.debug("Parser: skipping empty line.")
 78                    continue  # skip empty lines
 79
 80                # check whether this is a qq command
 81                if not re.match(r"#\s*qq", stripped, re.IGNORECASE):
 82                    if stripped.startswith("#"):
 83                        logger.debug(f"Parser: skipping commented line '{line}'.")
 84                        continue  # skip commented lines
 85                    logger.debug(f"Parser: ending parsing at line '{line}'.")
 86                    break  # stop parsing at other lines
 87
 88                # remove the leading '# qq' and split by whitespace or '='
 89                parts = Parser._strip_and_split(line)
 90                if len(parts) < 2:
 91                    raise QQError(
 92                        f"Invalid qq submit option line in '{str(self._script)}': {line}."
 93                    )
 94
 95                key, value = parts[-2], parts[-1]
 96                snake_case_key = to_snake_case(key)
 97
 98                # handle workdir and worksize where two forms of the keyword are allowed
 99                snake_case_key = snake_case_key.replace("workdir", "work_dir").replace(
100                    "worksize", "work_size"
101                )
102
103                # is this a known option?
104                if snake_case_key in self._known_options:
105                    try:
106                        self._options[snake_case_key] = int(value)
107                    except ValueError:
108                        self._options[snake_case_key] = value
109                else:
110                    raise QQError(
111                        f"Unknown qq submit option '{key}' in '{str(self._script)}': {line.strip()}.\nKnown options are '{' '.join(self._known_options)}'."
112                    )
113
114        logger.debug(f"Parsed options from '{self._script}': {self._options}.")
115
116    def get_batch_system(self) -> type[BatchInterface] | None:
117        """
118        Return the batch system class specified in the script.
119
120        Returns:
121            type[BatchInterface] | None: The batch system class if specified, otherwise None.
122        """
123        if (batch_system := self._options.get("batch_system")) is not None:
124            return BatchInterface.from_str(str(batch_system))
125
126        return None
127
128    def get_queue(self) -> str | None:
129        """
130        Return the queue specified for the job.
131
132        Returns:
133            str | None: Queue name, or None if not set.
134        """
135        if (queue := self._options.get("queue")) is not None:
136            return str(queue)
137        return None
138
139    def get_job_type(self) -> JobType | None:
140        """
141        Return the job type specified in the script.
142
143        Returns:
144            JobType | None: Enum value representing the job type, or None if not set.
145        """
146        if (job_type := self._options.get("job_type")) is not None:
147            return JobType.from_str(str(job_type))
148
149        return None
150
151    def get_resources(self) -> Resources:
152        """
153        Return the job resource specifications parsed from the script.
154
155        Returns:
156            Resources: Resource requirements for the job.
157        """
158        field_names = {f.name for f in fields(Resources)}
159        # only select fields that are part of Resources
160        return Resources(**{k: v for k, v in self._options.items() if k in field_names})  # ty: ignore[invalid-argument-type]
161
162    def get_exclude(self) -> list[Path]:
163        """
164        Determine the files to exclude from being copied to the job's working directory.
165
166        Returns:
167            list[Path]: List of excluded file paths. Returns an empty list if none specified.
168        """
169        if (exclude := self._options.get("exclude")) is not None:
170            return split_files_list(str(exclude))
171
172        return []
173
174    def get_include(self) -> list[Path]:
175        """
176        Determine the files to explicitly copy to the job's working directory.
177
178        Returns:
179            list[Path]: List of included file paths. Returns an empty list if none specified.
180        """
181        if (include := self._options.get("include")) is not None:
182            return split_files_list(str(include))
183
184        return []
185
186    def get_loop_start(self) -> int | None:
187        """
188        Return the starting cycle number for loop jobs.
189
190        Returns:
191            int | None: Start cycle, or None if not specified.
192        """
193        if isinstance(loop_start := self._options.get("loop_start"), int):
194            return loop_start
195        return None
196
197    def get_loop_end(self) -> int | None:
198        """
199        Return the ending cycle number for loop jobs.
200
201        Returns:
202            int | None: End cycle, or None if not specified.
203        """
204        if isinstance(loop_end := self._options.get("loop_end"), int):
205            return loop_end
206        return None
207
208    def get_archive(self) -> Path | None:
209        """
210        Return the archive directory path specified in the script.
211
212        Returns:
213            Path | None: Archive directory path, or None if not set.
214        """
215        if (archive := self._options.get("archive")) is not None:
216            return Path(str(archive))
217
218        return None
219
220    def get_archive_format(self) -> str | None:
221        """
222        Return the file naming format used for archived files.
223
224        Returns:
225            str | None: Archive filename format string, or None if not set.
226        """
227        if (archive_format := self._options.get("archive_format")) is not None:
228            return str(archive_format)
229        return None
230
231    def get_archive_mode(self) -> list[TransferMode]:
232        """
233        Get the mode specifying when the files should be archived.
234
235        Returns:
236            list[TransferMode]: List of transfer modes.
237        """
238        if (raw := self._options.get("archive_mode")) is not None:
239            return TransferMode.multi_from_str(str(raw))
240
241        return []
242
243    def get_depend(self) -> list[Depend]:
244        """
245        Return the list of job dependencies.
246
247        Returns:
248            list[Depend]: List of job dependencies.
249        """
250        if (raw := self._options.get("depend")) is not None:
251            return Depend.multi_from_str(str(raw))
252
253        return []
254
255    def get_account(self) -> str | None:
256        """
257        Get the account name to use for the job.
258
259        Returns:
260            str | None: The account name or None if not defined.
261        """
262        if (account := self._options.get("account")) is not None:
263            return str(account)
264
265        return None
266
267    def get_transfer_mode(self) -> list[TransferMode]:
268        """
269        Get the mode specifying when the files should be transferred
270        from the working directory to the input directory.
271
272        Returns:
273            list[TransferMode]: List of transfer modes.
274        """
275        if (raw := self._options.get("transfer_mode")) is not None:
276            return TransferMode.multi_from_str(str(raw))
277
278        return []
279
280    def get_server(self) -> str | None:
281        """
282        Get the batch server to which the job should be submitted.
283
284        Note that this function returns the raw name of the server
285        as provided by the user. It should be then translated using
286        the `translate_server` function.
287
288        Returns:
289            str | None: The name or shortcut of the batch server or `None` if not specified.
290        """
291        if (server := self._options.get("server")) is not None:
292            return str(server)
293
294        return None
295
296    def get_interpreter(self) -> Interpreter | None:
297        """
298        Get the interpreter that should be used to run the script.
299
300        Returns:
301            Interpreter | None: The interpreter or `None` if not specified.
302        """
303        if (raw := self._options.get("interpreter")) is not None:
304            return Interpreter.from_str(str(raw))
305
306        return None
307
308    def get_resubmit_from(self) -> list[ResubmitHost]:
309        """
310        Return the list of resubmission hosts.
311
312        Returns:
313            list[ResubmitHost]: List of job dependencies.
314        """
315        if (raw := self._options.get("resubmit_from")) is not None:
316            return ResubmitHost.multi_from_str(str(raw))
317
318        return []
319
320    @staticmethod
321    def _strip_and_split(string: str) -> list[str]:
322        """
323        Remove the leading `# qq` directive from a line, extract content before the next `#`
324        (if any), and split the remaining content.
325
326        Args:
327            string (str): Input line to process.
328
329        Returns:
330            list[str]: A list with one or two elements depending on whether a split occurred.
331        """
332        match = re.search(
333            r"^#\s*qq\s*(.*?)\s*(?:#|$)", string.strip(), flags=re.IGNORECASE
334        )
335        content = match.group(1).strip() if match else string.strip()
336
337        # split by whitespace or '='
338        return re.split(r"[=\s]+", content, maxsplit=1)

Parser for qq job submission options (qq directives) specified in a script.

Parser(script: pathlib._local.Path, params: list[click.core.Parameter])
31    def __init__(self, script: Path, params: list[Parameter]):
32        """
33        Initialize the parser.
34
35        Args:
36            script (Path): Path to the qq job script to parse.
37            params (list[Parameter]): List of click Parameter objects defining
38                valid options. Only `GroupedOption` names are considered.
39        """
40        self._script = script
41        self._known_options = {
42            p.name
43            for p in params
44            if isinstance(p, GroupedOption) and p.name is not None
45        }
46        logger.debug(
47            f"Known options for Parser: {self._known_options} ({len(self._known_options)} options)."
48        )
49
50        self._options: dict[str, object] = {}

Initialize the parser.

Arguments:
  • script (Path): Path to the qq job script to parse.
  • params (list[Parameter]): List of click Parameter objects defining valid options. Only GroupedOption names are considered.
def parse(self) -> None:
 52    def parse(self) -> None:
 53        """
 54        Extract and parse `qq` options from the script.
 55
 56        The method processes the script line by line, skipping the first line (shebang).
 57        It continues reading until it encounters a line that is not a `qq` directive,
 58        is non-empty, and is not a comment. Empty or commented lines are ignored.
 59
 60        Each valid `qq` line is parsed into key-value pairs, normalized to `snake_case`,
 61        and stored in `self._options`.
 62
 63        Raises:
 64            QQError: If the script cannot be read, or if an option line is malformed or
 65                    contains an unknown option.
 66        """
 67        if not self._script.is_file():
 68            raise QQError(f"Could not open '{self._script}' as a file.")
 69
 70        with self._script.open() as f:
 71            # skip the first line (shebang)
 72            next(f, None)
 73
 74            for line in f:
 75                stripped = line.strip()
 76                if stripped == "":
 77                    logger.debug("Parser: skipping empty line.")
 78                    continue  # skip empty lines
 79
 80                # check whether this is a qq command
 81                if not re.match(r"#\s*qq", stripped, re.IGNORECASE):
 82                    if stripped.startswith("#"):
 83                        logger.debug(f"Parser: skipping commented line '{line}'.")
 84                        continue  # skip commented lines
 85                    logger.debug(f"Parser: ending parsing at line '{line}'.")
 86                    break  # stop parsing at other lines
 87
 88                # remove the leading '# qq' and split by whitespace or '='
 89                parts = Parser._strip_and_split(line)
 90                if len(parts) < 2:
 91                    raise QQError(
 92                        f"Invalid qq submit option line in '{str(self._script)}': {line}."
 93                    )
 94
 95                key, value = parts[-2], parts[-1]
 96                snake_case_key = to_snake_case(key)
 97
 98                # handle workdir and worksize where two forms of the keyword are allowed
 99                snake_case_key = snake_case_key.replace("workdir", "work_dir").replace(
100                    "worksize", "work_size"
101                )
102
103                # is this a known option?
104                if snake_case_key in self._known_options:
105                    try:
106                        self._options[snake_case_key] = int(value)
107                    except ValueError:
108                        self._options[snake_case_key] = value
109                else:
110                    raise QQError(
111                        f"Unknown qq submit option '{key}' in '{str(self._script)}': {line.strip()}.\nKnown options are '{' '.join(self._known_options)}'."
112                    )
113
114        logger.debug(f"Parsed options from '{self._script}': {self._options}.")

Extract and parse qq options from the script.

The method processes the script line by line, skipping the first line (shebang). It continues reading until it encounters a line that is not a qq directive, is non-empty, and is not a comment. Empty or commented lines are ignored.

Each valid qq line is parsed into key-value pairs, normalized to snake_case, and stored in self._options.

Raises:
  • QQError: If the script cannot be read, or if an option line is malformed or contains an unknown option.
def get_batch_system(self) -> type[qq_lib.batch.interface.BatchInterface] | None:
116    def get_batch_system(self) -> type[BatchInterface] | None:
117        """
118        Return the batch system class specified in the script.
119
120        Returns:
121            type[BatchInterface] | None: The batch system class if specified, otherwise None.
122        """
123        if (batch_system := self._options.get("batch_system")) is not None:
124            return BatchInterface.from_str(str(batch_system))
125
126        return None

Return the batch system class specified in the script.

Returns:

type[BatchInterface] | None: The batch system class if specified, otherwise None.

def get_queue(self) -> str | None:
128    def get_queue(self) -> str | None:
129        """
130        Return the queue specified for the job.
131
132        Returns:
133            str | None: Queue name, or None if not set.
134        """
135        if (queue := self._options.get("queue")) is not None:
136            return str(queue)
137        return None

Return the queue specified for the job.

Returns:

str | None: Queue name, or None if not set.

def get_job_type(self) -> qq_lib.properties.job_type.JobType | None:
139    def get_job_type(self) -> JobType | None:
140        """
141        Return the job type specified in the script.
142
143        Returns:
144            JobType | None: Enum value representing the job type, or None if not set.
145        """
146        if (job_type := self._options.get("job_type")) is not None:
147            return JobType.from_str(str(job_type))
148
149        return None

Return the job type specified in the script.

Returns:

JobType | None: Enum value representing the job type, or None if not set.

def get_resources(self) -> qq_lib.properties.resources.Resources:
151    def get_resources(self) -> Resources:
152        """
153        Return the job resource specifications parsed from the script.
154
155        Returns:
156            Resources: Resource requirements for the job.
157        """
158        field_names = {f.name for f in fields(Resources)}
159        # only select fields that are part of Resources
160        return Resources(**{k: v for k, v in self._options.items() if k in field_names})  # ty: ignore[invalid-argument-type]

Return the job resource specifications parsed from the script.

Returns:

Resources: Resource requirements for the job.

def get_exclude(self) -> list[pathlib._local.Path]:
162    def get_exclude(self) -> list[Path]:
163        """
164        Determine the files to exclude from being copied to the job's working directory.
165
166        Returns:
167            list[Path]: List of excluded file paths. Returns an empty list if none specified.
168        """
169        if (exclude := self._options.get("exclude")) is not None:
170            return split_files_list(str(exclude))
171
172        return []

Determine the files to exclude from being copied to the job's working directory.

Returns:

list[Path]: List of excluded file paths. Returns an empty list if none specified.

def get_include(self) -> list[pathlib._local.Path]:
174    def get_include(self) -> list[Path]:
175        """
176        Determine the files to explicitly copy to the job's working directory.
177
178        Returns:
179            list[Path]: List of included file paths. Returns an empty list if none specified.
180        """
181        if (include := self._options.get("include")) is not None:
182            return split_files_list(str(include))
183
184        return []

Determine the files to explicitly copy to the job's working directory.

Returns:

list[Path]: List of included file paths. Returns an empty list if none specified.

def get_loop_start(self) -> int | None:
186    def get_loop_start(self) -> int | None:
187        """
188        Return the starting cycle number for loop jobs.
189
190        Returns:
191            int | None: Start cycle, or None if not specified.
192        """
193        if isinstance(loop_start := self._options.get("loop_start"), int):
194            return loop_start
195        return None

Return the starting cycle number for loop jobs.

Returns:

int | None: Start cycle, or None if not specified.

def get_loop_end(self) -> int | None:
197    def get_loop_end(self) -> int | None:
198        """
199        Return the ending cycle number for loop jobs.
200
201        Returns:
202            int | None: End cycle, or None if not specified.
203        """
204        if isinstance(loop_end := self._options.get("loop_end"), int):
205            return loop_end
206        return None

Return the ending cycle number for loop jobs.

Returns:

int | None: End cycle, or None if not specified.

def get_archive(self) -> pathlib._local.Path | None:
208    def get_archive(self) -> Path | None:
209        """
210        Return the archive directory path specified in the script.
211
212        Returns:
213            Path | None: Archive directory path, or None if not set.
214        """
215        if (archive := self._options.get("archive")) is not None:
216            return Path(str(archive))
217
218        return None

Return the archive directory path specified in the script.

Returns:

Path | None: Archive directory path, or None if not set.

def get_archive_format(self) -> str | None:
220    def get_archive_format(self) -> str | None:
221        """
222        Return the file naming format used for archived files.
223
224        Returns:
225            str | None: Archive filename format string, or None if not set.
226        """
227        if (archive_format := self._options.get("archive_format")) is not None:
228            return str(archive_format)
229        return None

Return the file naming format used for archived files.

Returns:

str | None: Archive filename format string, or None if not set.

def get_archive_mode(self) -> list[qq_lib.properties.transfer_mode.TransferMode]:
231    def get_archive_mode(self) -> list[TransferMode]:
232        """
233        Get the mode specifying when the files should be archived.
234
235        Returns:
236            list[TransferMode]: List of transfer modes.
237        """
238        if (raw := self._options.get("archive_mode")) is not None:
239            return TransferMode.multi_from_str(str(raw))
240
241        return []

Get the mode specifying when the files should be archived.

Returns:

list[TransferMode]: List of transfer modes.

def get_depend(self) -> list[qq_lib.properties.depend.Depend]:
243    def get_depend(self) -> list[Depend]:
244        """
245        Return the list of job dependencies.
246
247        Returns:
248            list[Depend]: List of job dependencies.
249        """
250        if (raw := self._options.get("depend")) is not None:
251            return Depend.multi_from_str(str(raw))
252
253        return []

Return the list of job dependencies.

Returns:

list[Depend]: List of job dependencies.

def get_account(self) -> str | None:
255    def get_account(self) -> str | None:
256        """
257        Get the account name to use for the job.
258
259        Returns:
260            str | None: The account name or None if not defined.
261        """
262        if (account := self._options.get("account")) is not None:
263            return str(account)
264
265        return None

Get the account name to use for the job.

Returns:

str | None: The account name or None if not defined.

def get_transfer_mode(self) -> list[qq_lib.properties.transfer_mode.TransferMode]:
267    def get_transfer_mode(self) -> list[TransferMode]:
268        """
269        Get the mode specifying when the files should be transferred
270        from the working directory to the input directory.
271
272        Returns:
273            list[TransferMode]: List of transfer modes.
274        """
275        if (raw := self._options.get("transfer_mode")) is not None:
276            return TransferMode.multi_from_str(str(raw))
277
278        return []

Get the mode specifying when the files should be transferred from the working directory to the input directory.

Returns:

list[TransferMode]: List of transfer modes.

def get_server(self) -> str | None:
280    def get_server(self) -> str | None:
281        """
282        Get the batch server to which the job should be submitted.
283
284        Note that this function returns the raw name of the server
285        as provided by the user. It should be then translated using
286        the `translate_server` function.
287
288        Returns:
289            str | None: The name or shortcut of the batch server or `None` if not specified.
290        """
291        if (server := self._options.get("server")) is not None:
292            return str(server)
293
294        return None

Get the batch server to which the job should be submitted.

Note that this function returns the raw name of the server as provided by the user. It should then be translated using the translate_server function.

Returns:

str | None: The name or shortcut of the batch server or None if not specified.

def get_interpreter(self) -> qq_lib.properties.interpreter.Interpreter | None:
296    def get_interpreter(self) -> Interpreter | None:
297        """
298        Get the interpreter that should be used to run the script.
299
300        Returns:
301            Interpreter | None: The interpreter or `None` if not specified.
302        """
303        if (raw := self._options.get("interpreter")) is not None:
304            return Interpreter.from_str(str(raw))
305
306        return None

Get the interpreter that should be used to run the script.

Returns:

Interpreter | None: The interpreter or None if not specified.

def get_resubmit_from(self) -> list[qq_lib.properties.resubmit_host.ResubmitHost]:
308    def get_resubmit_from(self) -> list[ResubmitHost]:
309        """
310        Return the list of resubmission hosts.
311
312        Returns:
313            list[ResubmitHost]: List of job dependencies.
314        """
315        if (raw := self._options.get("resubmit_from")) is not None:
316            return ResubmitHost.multi_from_str(str(raw))
317
318        return []

Return the list of resubmission hosts.

Returns:

list[ResubmitHost]: List of resubmission hosts.

class Submitter:
 37class Submitter:
 38    """
 39    Class to submit jobs to a batch system.
 40
 41    Responsibilities:
 42        - Validate that the script exists and has a proper shebang.
 43        - Guard against multiple submissions from the same directory.
 44        - Set environment variables required for `qq run`.
 45        - Create a qq info file for tracking job state and metadata.
 46
 47    Note that Submitter ignores qq directives in the submitted script.
 48    To handle them, you have to build a Submitter using the SubmitterFactory.
 49    """
 50
 51    def __init__(
 52        self,
 53        batch_system: AnyBatchClass,
 54        queue: str,
 55        account: str | None,
 56        script: Path,
 57        job_type: JobType,
 58        resources: Resources,
 59        loop_info: LoopInfo | None = None,
 60        exclude: list[Path] | None = None,
 61        include: list[Path] | None = None,
 62        depend: list[Depend] | None = None,
 63        transfer_mode: list[TransferMode] | None = None,
 64        server: str | None = None,
 65        interpreter: Interpreter | None = None,
 66        resubmit_from: list[ResubmitHost] | None = None,
 67    ):
 68        """
 69        Initialize a Submitter instance.
 70
 71        Args:
 72            batch_system (AnyBatchClass): The batch system class implementing
 73                the BatchInterface used for job submission.
 74            queue (str): The name of the batch system queue to which the job will be submitted.
 75            account (str | None): The name of the account to use for the job.
 76            script (Path): Path to the job script to submit.
 77            job_type (JobType): Type of the job to submit (e.g. standard, loop).
 78            resources (Resources): Job resource requirements (e.g., CPUs, memory, walltime).
 79            loop_info (LoopInfo | None): Optional information for loop jobs. Pass None if not applicable.
 80            exclude (list[Path] | None): Optional list of files which should not be copied to the working directory.
 81                Paths are provided relative to the input directory.
 82            include (list[Path] | None): Optional list of files which should be copied to the working directory
 83                even though they are not part of the job's input directory.
 84                Paths are provided either absolute or relative to the input directory.
 85            depend (list[Depend] | None): Optional list of job dependencies.
 86            transfer_mode (list[TransferMode] | None): Mode specifying when files whould be transferred from the
 87                working directory to the input directory. Defaults to [`Success()`].
 88            server (str | None): Optional name of the server to which the job should be submitted.
 89                If `None`, the default batch server, as configured by the batch system is used.
 90            intepreter (Interpreter | None): Optional interpreter specification to use to execute the script.
 91                If not specified, the config default is used.
 92            resubmit_from (list[ResubmitHost] | None): List of hosts from which a loop/continuous job should be resubmitted.
 93                Must only be specified for loop/continuous jobs!
 94
 95        Raises:
 96            QQError: If the script does not exist or has an invalid shebang line.
 97        """
 98
 99        self._batch_system = batch_system
100        self._job_type = job_type
101        self._queue = queue
102        self._server = server
103        self._account = account
104        self._loop_info = loop_info
105        self._script = script
106        self._input_dir = logical_resolve(script).parent
107        self._script_name = script.name
108        self._job_name = self._construct_job_name()
109        self._info_file = construct_info_file_path(self._input_dir, self._job_name)
110        self._resources = resources
111        # convert relative paths to absolute paths by prepending the input dir path
112        self._exclude = [self._input_dir / e for e in (exclude or [])]
113        self._include = [
114            i if i.is_absolute() else self._input_dir / i for i in (include or [])
115        ]
116        self._depend = depend or []
117        self._transfer_mode = transfer_mode or TransferMode.multi_from_str(
118            CFG.transfer_files_options.default_transfer_mode
119        )
120        self._interpreter = interpreter
121        self._resubmit_from = resubmit_from or []
122
123        # script must exist
124        if not self._script.is_file():
125            raise QQError(f"Script '{script}' does not exist or is not a file.")
126
127        # script must have a valid qq shebang
128        if not self._has_valid_shebang(self._script):
129            raise QQError(
130                f"Script '{self._script}' has an invalid shebang. The first line of the script should be '#!/usr/bin/env -S {CFG.binary_name} run'."
131            )
132
133    def submit(self, remote: str | None = None) -> str:
134        """
135        Submit the script to the batch system.
136
137        Sets required environment variables, calls the batch system's
138        job submission mechanism, and creates an info file with job metadata.
139
140        This method is thread-safe, if the submission is done from the current machine.
141
142        Args:
143            remote (str | None): Name of the machine from which the job should be submitted.
144                If `None`, the current machine is used.
145
146        Returns:
147            str: The job ID of the submitted job.
148
149        Raises:
150            QQError: If job submission fails.
151        """
152        job_id = self._batch_system.job_submit(
153            self._resources,
154            self._queue,
155            self._script,
156            self._job_name,
157            self._depend,
158            self._create_env_vars_dict(),
159            self._account,
160            self._server,
161            remote_host=remote,
162        )
163
164        # create job qq info file
165        # we create the info file from the current machine no matter
166        # whether we are submiting from the current machine or from the remote machine
167        # the input directory should be available on both concerned machines,
168        # so this should be okay
169        Info(
170            batch_system=self._batch_system,
171            qq_version=qq_lib.__version__,
172            username=getpass.getuser(),
173            job_id=job_id,
174            job_name=self._job_name,
175            script_name=self._script_name,
176            queue=self._queue,
177            job_type=self._job_type,
178            input_machine=socket.getfqdn(remote or ""),
179            input_dir=self._input_dir,
180            job_state=NaiveState.QUEUED,
181            submission_time=datetime.now(),
182            stdout_file=str(Path(self._job_name).with_suffix(CFG.suffixes.stdout)),
183            stderr_file=str(Path(self._job_name).with_suffix(CFG.suffixes.stderr)),
184            resources=self._resources,
185            loop_info=self._loop_info,
186            excluded_files=self._exclude,
187            included_files=self._include,
188            depend=self._depend,
189            account=self._account,
190            transfer_mode=self._transfer_mode,
191            server=self._server,
192            interpreter=self._interpreter,
193            resubmit_from=self._resubmit_from,
194        ).to_file(self._info_file)
195
196        return job_id
197
198    def continues_loop(self) -> bool:
199        """
200        Determine whether the submitted job is a continuation of a loop/continuous job.
201
202        Returns:
203            bool: True if the job is a valid continuation of a previous loop/continuous job,
204                  False otherwise.
205        """
206        try:
207            # there should only be one info file for both loop jobs (runtime files are archived)
208            # and continuous jobs (runtime files overwrite each other)
209            info_file = get_info_file(self._input_dir)
210            informer = Informer.from_file(info_file)
211
212            if self._loop_job_continues_loop(
213                informer
214            ) or self._continuous_job_continues_loop(informer):
215                logger.debug("Valid loop job with a correct cycle or a continuous job.")
216                return True
217            logger.debug(
218                "Detected info file does not correspond to a resubmittable job."
219            )
220            return False
221        except QQError as e:
222            logger.debug(f"Could not read an info file: {e}.")
223            return False
224
225    def _loop_job_continues_loop(self, previous: Informer) -> bool:
226        """
227        Determine whether the submitted job is a continuation of a loop job.
228
229        Args:
230            previous (Informer): Informer associated with the previous job.
231
232        Returns:
233            bool: True if the job is a valid continuation of a previous loop job, False otherwise.
234        """
235        return (
236            # both the previous job and the current job must be loop jobs
237            previous.info.loop_info is not None
238            and self._loop_info is not None
239            # previous job must be successfully finished
240            and previous.info.job_state == NaiveState.FINISHED
241            # the cycle of the current job is one more than the cycle of the previous job
242            and previous.info.loop_info.current == self._loop_info.current - 1
243        )
244
245    def _continuous_job_continues_loop(self, previous: Informer) -> bool:
246        """
247        Determine whether the submitted job is a continuation of a continuous job.
248
249        Args:
250            previous (Informer): Informer associated with the previous job.
251
252        Returns:
253            bool: True if the job is a valid continuation of a previous continuous job, False otherwise.
254        """
255        return (
256            # both the previous and the current job must be continuous jobs
257            previous.info.job_type == JobType.CONTINUOUS
258            and self._job_type == JobType.CONTINUOUS
259            # previous job must be successfully finished
260            and previous.info.job_state == NaiveState.FINISHED
261        )
262
263    def get_input_dir(self) -> Path:
264        """
265        Get path to the job's input directory.
266
267        Returns:
268            Path: Path to the job's input directory.
269        """
270        return self._input_dir
271
272    def get_batch_system(self) -> AnyBatchClass:
273        """Get the batch system used for submiting."""
274        return self._batch_system
275
276    def get_job_name(self) -> str:
277        """Get the name of the job."""
278        return self._job_name
279
280    def get_queue(self) -> str:
281        """Get the submission queue."""
282        return self._queue
283
284    def get_account(self) -> str | None:
285        """Get the user's account."""
286        return self._account
287
288    def get_script(self) -> Path:
289        """Get absolute (logical) path to the submitted script."""
290        return self._script
291
292    def get_job_type(self) -> JobType:
293        """Get type of the job."""
294        return self._job_type
295
296    def get_resources(self) -> Resources:
297        """Get resources requested for the job."""
298        return self._resources
299
300    def get_loop_info(self) -> LoopInfo | None:
301        """Get loop job information."""
302        return self._loop_info
303
304    def get_exclude(self) -> list[Path]:
305        """Get a list of excluded files."""
306        return self._exclude
307
308    def get_include(self) -> list[Path]:
309        """Get a list of included files."""
310        return self._include
311
312    def get_depend(self) -> list[Depend]:
313        """Get the list of dependencies."""
314        return self._depend
315
316    def get_transfer_mode(self) -> list[TransferMode]:
317        """Get the list of transfer modes."""
318        return self._transfer_mode
319
320    def get_server(self) -> str | None:
321        """Get the submission server."""
322        return self._server
323
324    def get_interpreter(self) -> Interpreter | None:
325        """Get the interpreter to use for running the script."""
326        return self._interpreter
327
328    def get_resubmit_from(self) -> list[ResubmitHost] | None:
329        """Get the list of hosts to resubmit the job from."""
330        return self._resubmit_from
331
332    def _create_env_vars_dict(self) -> dict[str, str]:
333        """
334        Create a dictionary of environment variables provided to qq runtime.
335
336        Returns
337            dict[str, str]: Dictionary of environment variables and their values.
338        """
339        env_vars = {}
340
341        # propagate qq debug environment
342        if os.environ.get(CFG.env_vars.debug_mode):
343            env_vars[CFG.env_vars.debug_mode] = "true"
344
345        # indicates that the job is running in a qq environment
346        env_vars[CFG.env_vars.guard] = "true"
347
348        # contains a path to the qq info file
349        env_vars[CFG.env_vars.info_file] = str(self._info_file)
350
351        # contains the name of the input host
352        env_vars[CFG.env_vars.input_machine] = socket.getfqdn()
353
354        # contains the name of the used batch system
355        env_vars[CFG.env_vars.batch_system] = str(self._batch_system)
356
357        # contains the path to the input directory
358        env_vars[CFG.env_vars.input_dir] = str(self._input_dir)
359
360        # environment variables for resources
361        nnodes = self._resources.nnodes or 1
362        if ncpus := self._resources.ncpus:
363            env_vars[CFG.env_vars.ncpus] = str(ncpus)
364        elif ncpus_per_node := self._resources.ncpus_per_node:
365            env_vars[CFG.env_vars.ncpus] = str(ncpus_per_node * nnodes)
366        else:
367            env_vars[CFG.env_vars.ncpus] = "1"
368
369        if ngpus := self._resources.ngpus:
370            env_vars[CFG.env_vars.ngpus] = str(ngpus)
371        elif ngpus_per_node := self._resources.ngpus_per_node:
372            env_vars[CFG.env_vars.ngpus] = str(ngpus_per_node * nnodes)
373        else:
374            env_vars[CFG.env_vars.ngpus] = "0"
375
376        env_vars[CFG.env_vars.nnodes] = str(nnodes)
377        env_vars[CFG.env_vars.walltime] = str(
378            hhmmss_to_duration(self._resources.walltime or "00:00:00").total_seconds()
379            / 3600
380        )
381
382        # loop job-specific environment variables
383        if self._loop_info:
384            env_vars[CFG.env_vars.loop_current] = str(self._loop_info.current)
385            env_vars[CFG.env_vars.loop_start] = str(self._loop_info.start)
386            env_vars[CFG.env_vars.loop_end] = str(self._loop_info.end)
387            env_vars[CFG.env_vars.archive_format] = self._loop_info.archive_format
388
389        # loop job- or continuous job-specific environment variables
390        if self._job_type in [JobType.LOOP, JobType.CONTINUOUS]:
391            env_vars[CFG.env_vars.no_resubmit] = str(CFG.exit_codes.qq_run_no_resubmit)
392
393        return env_vars
394
395    def _has_valid_shebang(self, script: Path) -> bool:
396        """
397        Verify that the script has a valid shebang for qq run.
398
399        Args:
400            script (Path): Path to the script file.
401
402        Returns:
403            bool: True if the first line starts with '#!' and ends with 'qq run'.
404        """
405        with Path.open(script) as file:
406            first_line = file.readline()
407            return first_line.startswith("#!") and first_line.strip().endswith(
408                f"{CFG.binary_name} run"
409            )
410
411    def _construct_job_name(self) -> str:
412        """
413        Construct the job name for submission.
414
415        Returns:
416            str: The constructed job name.
417        """
418        # for standard jobs, use script name
419        if not self._loop_info:
420            return self._script_name
421
422        # for loop jobs, use script_name with cycle number
423        return construct_loop_job_name(self._script_name, self._loop_info.current)

Class to submit jobs to a batch system.

Responsibilities:
  • Validate that the script exists and has a proper shebang.
  • Guard against multiple submissions from the same directory.
  • Set environment variables required for qq run.
  • Create a qq info file for tracking job state and metadata.

Note that Submitter ignores qq directives in the submitted script. To handle them, you have to build a Submitter using the SubmitterFactory.

Submitter( batch_system: AnyBatchClass, queue: str, account: str | None, script: pathlib._local.Path, job_type: qq_lib.properties.job_type.JobType, resources: qq_lib.properties.resources.Resources, loop_info: qq_lib.properties.loop.LoopInfo | None = None, exclude: list[pathlib._local.Path] | None = None, include: list[pathlib._local.Path] | None = None, depend: list[qq_lib.properties.depend.Depend] | None = None, transfer_mode: list[qq_lib.properties.transfer_mode.TransferMode] | None = None, server: str | None = None, interpreter: qq_lib.properties.interpreter.Interpreter | None = None, resubmit_from: list[qq_lib.properties.resubmit_host.ResubmitHost] | None = None)
 51    def __init__(
 52        self,
 53        batch_system: AnyBatchClass,
 54        queue: str,
 55        account: str | None,
 56        script: Path,
 57        job_type: JobType,
 58        resources: Resources,
 59        loop_info: LoopInfo | None = None,
 60        exclude: list[Path] | None = None,
 61        include: list[Path] | None = None,
 62        depend: list[Depend] | None = None,
 63        transfer_mode: list[TransferMode] | None = None,
 64        server: str | None = None,
 65        interpreter: Interpreter | None = None,
 66        resubmit_from: list[ResubmitHost] | None = None,
 67    ):
 68        """
 69        Initialize a Submitter instance.
 70
 71        Args:
 72            batch_system (AnyBatchClass): The batch system class implementing
 73                the BatchInterface used for job submission.
 74            queue (str): The name of the batch system queue to which the job will be submitted.
 75            account (str | None): The name of the account to use for the job.
 76            script (Path): Path to the job script to submit.
 77            job_type (JobType): Type of the job to submit (e.g. standard, loop).
 78            resources (Resources): Job resource requirements (e.g., CPUs, memory, walltime).
 79            loop_info (LoopInfo | None): Optional information for loop jobs. Pass None if not applicable.
 80            exclude (list[Path] | None): Optional list of files which should not be copied to the working directory.
 81                Paths are provided relative to the input directory.
 82            include (list[Path] | None): Optional list of files which should be copied to the working directory
 83                even though they are not part of the job's input directory.
 84                Paths are provided either absolute or relative to the input directory.
 85            depend (list[Depend] | None): Optional list of job dependencies.
 86            transfer_mode (list[TransferMode] | None): Mode specifying when files whould be transferred from the
 87                working directory to the input directory. Defaults to [`Success()`].
 88            server (str | None): Optional name of the server to which the job should be submitted.
 89                If `None`, the default batch server, as configured by the batch system is used.
 90            intepreter (Interpreter | None): Optional interpreter specification to use to execute the script.
 91                If not specified, the config default is used.
 92            resubmit_from (list[ResubmitHost] | None): List of hosts from which a loop/continuous job should be resubmitted.
 93                Must only be specified for loop/continuous jobs!
 94
 95        Raises:
 96            QQError: If the script does not exist or has an invalid shebang line.
 97        """
 98
 99        self._batch_system = batch_system
100        self._job_type = job_type
101        self._queue = queue
102        self._server = server
103        self._account = account
104        self._loop_info = loop_info
105        self._script = script
106        self._input_dir = logical_resolve(script).parent
107        self._script_name = script.name
108        self._job_name = self._construct_job_name()
109        self._info_file = construct_info_file_path(self._input_dir, self._job_name)
110        self._resources = resources
111        # convert relative paths to absolute paths by prepending the input dir path
112        self._exclude = [self._input_dir / e for e in (exclude or [])]
113        self._include = [
114            i if i.is_absolute() else self._input_dir / i for i in (include or [])
115        ]
116        self._depend = depend or []
117        self._transfer_mode = transfer_mode or TransferMode.multi_from_str(
118            CFG.transfer_files_options.default_transfer_mode
119        )
120        self._interpreter = interpreter
121        self._resubmit_from = resubmit_from or []
122
123        # script must exist
124        if not self._script.is_file():
125            raise QQError(f"Script '{script}' does not exist or is not a file.")
126
127        # script must have a valid qq shebang
128        if not self._has_valid_shebang(self._script):
129            raise QQError(
130                f"Script '{self._script}' has an invalid shebang. The first line of the script should be '#!/usr/bin/env -S {CFG.binary_name} run'."
131            )

Initialize a Submitter instance.

Arguments:
  • batch_system (AnyBatchClass): The batch system class implementing the BatchInterface used for job submission.
  • queue (str): The name of the batch system queue to which the job will be submitted.
  • account (str | None): The name of the account to use for the job.
  • script (Path): Path to the job script to submit.
  • job_type (JobType): Type of the job to submit (e.g. standard, loop).
  • resources (Resources): Job resource requirements (e.g., CPUs, memory, walltime).
  • loop_info (LoopInfo | None): Optional information for loop jobs. Pass None if not applicable.
  • exclude (list[Path] | None): Optional list of files which should not be copied to the working directory. Paths are provided relative to the input directory.
  • include (list[Path] | None): Optional list of files which should be copied to the working directory even though they are not part of the job's input directory. Paths are provided either absolute or relative to the input directory.
  • depend (list[Depend] | None): Optional list of job dependencies.
  • transfer_mode (list[TransferMode] | None): Mode specifying when files should be transferred from the working directory to the input directory. Defaults to [Success()].
  • server (str | None): Optional name of the server to which the job should be submitted. If None, the default batch server, as configured by the batch system is used.
  • interpreter (Interpreter | None): Optional interpreter specification to use to execute the script. If not specified, the config default is used.
  • resubmit_from (list[ResubmitHost] | None): List of hosts from which a loop/continuous job should be resubmitted. Must only be specified for loop/continuous jobs!
Raises:
  • QQError: If the script does not exist or has an invalid shebang line.
def submit(self, remote: str | None = None) -> str:
133    def submit(self, remote: str | None = None) -> str:
134        """
135        Submit the script to the batch system.
136
137        Sets required environment variables, calls the batch system's
138        job submission mechanism, and creates an info file with job metadata.
139
140        This method is thread-safe, if the submission is done from the current machine.
141
142        Args:
143            remote (str | None): Name of the machine from which the job should be submitted.
144                If `None`, the current machine is used.
145
146        Returns:
147            str: The job ID of the submitted job.
148
149        Raises:
150            QQError: If job submission fails.
151        """
152        job_id = self._batch_system.job_submit(
153            self._resources,
154            self._queue,
155            self._script,
156            self._job_name,
157            self._depend,
158            self._create_env_vars_dict(),
159            self._account,
160            self._server,
161            remote_host=remote,
162        )
163
164        # create job qq info file
165        # we create the info file from the current machine no matter
166        # whether we are submiting from the current machine or from the remote machine
167        # the input directory should be available on both concerned machines,
168        # so this should be okay
169        Info(
170            batch_system=self._batch_system,
171            qq_version=qq_lib.__version__,
172            username=getpass.getuser(),
173            job_id=job_id,
174            job_name=self._job_name,
175            script_name=self._script_name,
176            queue=self._queue,
177            job_type=self._job_type,
178            input_machine=socket.getfqdn(remote or ""),
179            input_dir=self._input_dir,
180            job_state=NaiveState.QUEUED,
181            submission_time=datetime.now(),
182            stdout_file=str(Path(self._job_name).with_suffix(CFG.suffixes.stdout)),
183            stderr_file=str(Path(self._job_name).with_suffix(CFG.suffixes.stderr)),
184            resources=self._resources,
185            loop_info=self._loop_info,
186            excluded_files=self._exclude,
187            included_files=self._include,
188            depend=self._depend,
189            account=self._account,
190            transfer_mode=self._transfer_mode,
191            server=self._server,
192            interpreter=self._interpreter,
193            resubmit_from=self._resubmit_from,
194        ).to_file(self._info_file)
195
196        return job_id

Submit the script to the batch system.

Sets required environment variables, calls the batch system's job submission mechanism, and creates an info file with job metadata.

This method is thread-safe, if the submission is done from the current machine.

Arguments:
  • remote (str | None): Name of the machine from which the job should be submitted. If None, the current machine is used.
Returns:

str: The job ID of the submitted job.

Raises:
  • QQError: If job submission fails.
def continues_loop(self) -> bool:
198    def continues_loop(self) -> bool:
199        """
200        Determine whether the submitted job is a continuation of a loop/continuous job.
201
202        Returns:
203            bool: True if the job is a valid continuation of a previous loop/continuous job,
204                  False otherwise.
205        """
206        try:
207            # there should only be one info file for both loop jobs (runtime files are archived)
208            # and continuous jobs (runtime files overwrite each other)
209            info_file = get_info_file(self._input_dir)
210            informer = Informer.from_file(info_file)
211
212            if self._loop_job_continues_loop(
213                informer
214            ) or self._continuous_job_continues_loop(informer):
215                logger.debug("Valid loop job with a correct cycle or a continuous job.")
216                return True
217            logger.debug(
218                "Detected info file does not correspond to a resubmittable job."
219            )
220            return False
221        except QQError as e:
222            logger.debug(f"Could not read an info file: {e}.")
223            return False

Determine whether the submitted job is a continuation of a loop/continuous job.

Returns:

bool: True if the job is a valid continuation of a previous loop/continuous job, False otherwise.

def get_input_dir(self) -> pathlib._local.Path:
263    def get_input_dir(self) -> Path:
264        """
265        Get path to the job's input directory.
266
267        Returns:
268            Path: Path to the job's input directory.
269        """
270        return self._input_dir

Get path to the job's input directory.

Returns:

Path: Path to the job's input directory.

def get_batch_system(self) -> AnyBatchClass:
272    def get_batch_system(self) -> AnyBatchClass:
273        """Get the batch system used for submiting."""
274        return self._batch_system

Get the batch system used for submitting.

def get_job_name(self) -> str:
276    def get_job_name(self) -> str:
277        """Get the name of the job."""
278        return self._job_name

Get the name of the job.

def get_queue(self) -> str:
280    def get_queue(self) -> str:
281        """Get the submission queue."""
282        return self._queue

Get the submission queue.

def get_account(self) -> str | None:
284    def get_account(self) -> str | None:
285        """Get the user's account."""
286        return self._account

Get the user's account.

def get_script(self) -> pathlib._local.Path:
288    def get_script(self) -> Path:
289        """Get absolute (logical) path to the submitted script."""
290        return self._script

Get absolute (logical) path to the submitted script.

def get_job_type(self) -> qq_lib.properties.job_type.JobType:
292    def get_job_type(self) -> JobType:
293        """Get type of the job."""
294        return self._job_type

Get type of the job.

def get_resources(self) -> qq_lib.properties.resources.Resources:
296    def get_resources(self) -> Resources:
297        """Get resources requested for the job."""
298        return self._resources

Get resources requested for the job.

def get_loop_info(self) -> qq_lib.properties.loop.LoopInfo | None:
300    def get_loop_info(self) -> LoopInfo | None:
301        """Get loop job information."""
302        return self._loop_info

Get loop job information.

def get_exclude(self) -> list[pathlib._local.Path]:
304    def get_exclude(self) -> list[Path]:
305        """Get a list of excluded files."""
306        return self._exclude

Get a list of excluded files.

def get_include(self) -> list[pathlib._local.Path]:
308    def get_include(self) -> list[Path]:
309        """Get a list of included files."""
310        return self._include

Get a list of included files.

def get_depend(self) -> list[qq_lib.properties.depend.Depend]:
312    def get_depend(self) -> list[Depend]:
313        """Get the list of dependencies."""
314        return self._depend

Get the list of dependencies.

def get_transfer_mode(self) -> list[qq_lib.properties.transfer_mode.TransferMode]:
316    def get_transfer_mode(self) -> list[TransferMode]:
317        """Get the list of transfer modes."""
318        return self._transfer_mode

Get the list of transfer modes.

def get_server(self) -> str | None:
320    def get_server(self) -> str | None:
321        """Get the submission server."""
322        return self._server

Get the submission server.

def get_interpreter(self) -> qq_lib.properties.interpreter.Interpreter | None:
324    def get_interpreter(self) -> Interpreter | None:
325        """Get the interpreter to use for running the script."""
326        return self._interpreter

Get the interpreter to use for running the script.

def get_resubmit_from(self) -> list[qq_lib.properties.resubmit_host.ResubmitHost] | None:
328    def get_resubmit_from(self) -> list[ResubmitHost] | None:
329        """Get the list of hosts to resubmit the job from."""
330        return self._resubmit_from

Get the list of hosts to resubmit the job from.