qq_lib.run

Execution utilities for running qq jobs inside the batch environment.

This module defines the Runner class, which prepares the execution environment, launches the user's job script, updates qq's state tracking, and performs cleanup on success, failure, or interruption. It handles both shared and scratch working directories, loop-job archiving, resubmission, communication with the batch system, and SIGTERM-safe shutdown.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Execution utilities for running qq jobs inside the batch environment.
 6
 7This module defines the `Runner` class, which prepares the execution
 8environment, launches the user's job script, updates qq's state tracking,
 9and performs cleanup on success, failure, or interruption. It handles both
10shared and scratch working directories, loop-job archiving, resubmission,
11communication with the batch system, and SIGTERM-safe shutdown.
12"""
13
14from .runner import Runner
15
16__all__ = [
17    "Runner",
18]
class Runner:
 40class Runner:
 41    """
 42    Manages the setup, execution, and cleanup of scripts within the qq batch environment.
 43
 44    The Runner class is responsible for:
 45      - Preparing a working directory (shared or scratch space)
 46      - Executing a provided job script
 47      - Updating the job info file with run state, success, or failure
 48      - Cleaning up resources when execution is finished
 49    """
 50
 51    def __init__(self, info_file: Path, host: str):
 52        """
 53        Initialize a new Runner instance.
 54
 55        Args:
 56            info_file (Path): Path to the qq info file that contains job metadata.
 57            host (str): The hostname of the input machine from which the job was submitted.
 58
 59        Raises:
 60            QQRunFatalError: If loading the QQ info file fails fatally during initialization.
 61        """
 62        # install a signal handler
 63        signal.signal(signal.SIGTERM, self._handle_sigterm)
 64
 65        # process running the wrapped script
 66        self._process: subprocess.Popen[str] | None = None
 67
 68        self._info_file = Path(info_file)
 69        logger.debug(f"Info file: '{self._info_file}'.")
 70
 71        self._input_machine = host
 72        logger.debug(f"Input machine: '{self._input_machine}'.")
 73
 74        # load the info file or raise a fatal qq error if this fails
 75        try:
 76            # get the batch system from the environment variable (or guess it)
 77            self._batch_system = BatchInterface.from_env_var_or_guess()
 78            logger.debug(f"Batch system: {str(self._batch_system)}.")
 79
 80            # get the id of the job from the batch system
 81            if not (job_id := self._batch_system.get_job_id()):
 82                raise QQError("Job has no associated job id")
 83
 84            # load the info file
 85            self._informer: Informer = Retryer(
 86                Informer.from_file,
 87                self._info_file,
 88                host=self._input_machine,
 89                max_tries=CFG.runner.retry_tries,
 90                wait_seconds=CFG.runner.retry_wait,
 91            ).run()
 92
 93            # check that the id of this job matches the job id in the info file
 94            if not self._informer.matches_job(job_id):
 95                raise QQJobMismatchError(
 96                    "Info file does not correspond to the current job"
 97                )
 98
 99            # check that the batch system in info file matches the one loaded from the environment variable
100            if self._batch_system != self._informer.batch_system:
101                raise QQError(
102                    f"Batch system mismatch - env var: '{str(self._batch_system)}', info file: '{self._informer.batch_system}'"
103                )
104
105        except Exception as e:
106            raise QQRunFatalError(
107                f"Unable to load valid qq info file '{self._info_file}' on '{self._input_machine}': {e}"
108            ) from e
109
110        logger.info(
111            f"[qq-{str(self._batch_system)} v{qq_lib.__version__}] Initializing "
112            f"job '{self._informer.info.job_id}' on host '{socket.getfqdn()}'."
113        )
114
115        # get input directory
116        self._input_dir = Path(self._informer.info.input_dir)
117        logger.debug(f"Input directory: {self._input_dir}.")
118
119        # should the scratch directory be used?
120        self._use_scratch = self._informer.uses_scratch()
121        logger.debug(f"Use scratch: {self._use_scratch}.")
122
123        # initialize archiver, if this is a loop job
124        if loop_info := self._informer.info.loop_info:
125            self._archiver = Archiver(
126                loop_info.archive,
127                loop_info.archive_format,
128                self._informer.info.input_machine,
129                self._informer.info.input_dir,
130                self._batch_system,
131            )
132            self._should_resubmit = True
133        else:
134            self._archiver = None
135
136        if self._informer.info.job_type == JobType.CONTINUOUS:
137            self._should_resubmit = True
138
139    def prepare(self) -> None:
140        """
141        Prepare the script for execution, setting up the archive
142        and archiving run time files (if this is a loop job) and
143        preparing working directory.
144
145        Raises:
146            QQError: If working directory setup fails.
147        """
148        if self._archiver:
149            assert self._informer.info.loop_info is not None
150            # prepare the directory for archiving
151            self._archiver.make_archive_dir()
152
153            # archive runtime files from the previous cycle
154            # this has to be done before the working directory is prepared,
155            # otherwise the runtime files would get copied to the working directory
156            logger.debug(
157                f"Archiving run time files from cycle {self._informer.info.loop_info.current - 1}."
158            )
159            self._archiver.archive_runtime_files(
160                # we need to escape the '+' character
161                construct_loop_job_name(
162                    self._informer.info.script_name,
163                    self._informer.info.loop_info.current - 1,
164                ).replace("+", "\\+"),
165                self._informer.info.loop_info.current - 1,
166            )
167
168        if self._use_scratch:
169            self._set_up_scratch_dir()
170        else:
171            self._set_up_shared_dir()
172
173        if self._archiver:
174            assert self._informer.info.loop_info is not None
175            # fetch files for the current cycle of the loop job from the archive
176            self._archiver.from_archive(
177                self._work_dir, self._informer.info.loop_info.current
178            )
179
180    def execute(self) -> int:
181        """
182        Execute the job script in the working directory.
183
184        Returns:
185            int: The exit code from the executed script.
186
187        Raises:
188            QQError: If execution fails or info file cannot be updated.
189        """
190        # update the qqinfo file
191        self._update_info_running()
192
193        # get the actual name of the script to execute
194        script = logical_resolve(Path(self._informer.info.script_name))
195
196        # get paths to output files
197        stdout_log = self._informer.info.stdout_file
198        stderr_log = self._informer.info.stderr_file
199
200        logger.info(f"Executing script '{script}'.")
201
202        # get intepreter if configured, otherwise use the default
203        interpreter = self._informer.info.interpreter or Interpreter()
204
205        # get the command to execute
206        command_list = [*interpreter.to_command_list(), str(script)]
207        logger.debug(f"Command executed using subprocess.Popen: {command_list}")
208
209        try:
210            with Path(stdout_log).open("w") as out, Path(stderr_log).open("w") as err:
211                self._process = subprocess.Popen(
212                    command_list,
213                    stdout=out,
214                    stderr=err,
215                    text=True,
216                )
217
218                # wait for the process to finish in a non-blocking manner
219                while self._process.poll() is None:
220                    sleep(CFG.runner.subprocess_checks_wait_time)
221
222        except Exception as e:
223            raise QQError(f"Failed to execute script '{script}': {e}") from e
224
225        # if the script returns an exit code corresponding to CFG.exit_codes.qq_run_no_resubmit,
226        # do not submit the next cycle of the job but return 0
227        if (
228            self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]
229            and self._process.returncode == CFG.exit_codes.qq_run_no_resubmit
230        ):
231            logger.debug(
232                f"Detected an exit code of '{self._process.returncode}'. Replacing with '0' and will not submit the next cycle of the job."
233            )
234            self._process.returncode = 0
235            self._should_resubmit = False
236
237        return self._process.returncode
238
239    def finalize(self) -> None:
240        """
241        Finalize the execution of the job script.
242
243        Handles post-processing of the job based on the script's exit code and the
244        configured transfer and archive modes. The specific actions taken depend on
245        the job's transfer mode, archive mode, and whether scratch directory is being used.
246
247        Specifically, this method:
248
249        1. Archives files from the working directory if archiving is enabled and the
250            archive mode allows it for the given exit code (loop jobs only).
251        2. Transfers or handles files based on whether scratch directory is used:
252            - If using scratch and transfer mode allows: Syncs the entire working
253                directory back to the input directory (excluding explicitly included files)
254                and removes the working directory from scratch.
255            - If using scratch and transfer mode disallows: Copies only runtime files
256                to the input directory and preserves the working directory.
257            - If not using scratch: No file operations are performed.
258        3. Updates the qq info file to "finished" (exit code 0) or "failed" (non-zero
259            exit code).
260        4. Resubmits the job if it is a loop or continuous job and was completed successfully.
261
262        Raises:
263            QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
264        """
265        logger.info("Finalizing the execution.")
266        assert self._process is not None
267
268        # archive files
269        if self._archiver and self._informer.should_archive_files(
270            self._process.returncode
271        ):
272            logger.debug(
273                f"Script exit code is '{self._process.returncode}'. Archiving files."
274            )
275            self._archive_files_from_work_dir()
276
277        # transfer files back to the input (submission) directory
278        if self._use_scratch:
279            if self._informer.should_transfer_files(self._process.returncode):
280                logger.debug(
281                    f"Script exit code is '{self._process.returncode}'. Transferring files from working directory."
282                )
283
284                Retryer(
285                    self._batch_system.sync_with_exclusions,
286                    self._work_dir,
287                    self._input_dir,
288                    socket.getfqdn(),
289                    self._informer.info.input_machine,
290                    # exclude files that were copied to workdir from the outside of input dir (--include option)
291                    # these files should not be copied to the input directory, since they were never inside it
292                    self._get_explicitly_included_files_in_work_dir(),
293                    max_tries=CFG.runner.retry_tries,
294                    wait_seconds=CFG.runner.retry_wait,
295                ).run()
296
297                # remove the working directory from scratch
298                self._delete_work_dir()
299            else:
300                # copy only the runtime files to input directory
301                # and keep the working directory
302                self._copy_runtime_files_to_input_dir(retry=True)
303
304        if self._process.returncode == 0:
305            # update the qqinfo file
306            self._update_info_finished()
307
308            # if this is a loop/continuous job
309            if self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]:
310                self._resubmit()
311        else:
312            # update the qqinfo file
313            self._update_info_failed(self._process.returncode)
314
315        logger.info(f"Job completed with an exit code of {self._process.returncode}.")
316
317    def log_failure_and_exit(self, exception: BaseException) -> NoReturn:
318        """
319        Record a failure state into the qq info file and exit the program.
320
321        Args:
322            exception (BaseException): The exception to log.
323
324        Raises:
325            SystemExit: Always exits with the exit code associated with the given exception.
326        """
327        exit_code = getattr(exception, "exit_code", CFG.exit_codes.unexpected_error)
328        try:
329            self._update_info_failed(exit_code)
330            logger.error(exception)
331            sys.exit(exit_code)
332        except Exception as e:
333            # unable to log the current state into the info file
334            log_fatal_error_and_exit(e)  # exits here
335
336    def _set_up_shared_dir(self) -> None:
337        """
338        Configure the input directory as the working directory.
339        """
340        # set qq working directory to the input dir
341        self._work_dir = self._input_dir
342
343        # move to the working directory
344        Retryer(
345            os.chdir,
346            self._work_dir,
347            max_tries=CFG.runner.retry_tries,
348            wait_seconds=CFG.runner.retry_wait,
349        ).run()
350
351    def _set_up_scratch_dir(self) -> None:
352        """
353        Configure a scratch directory as the working directory.
354
355        Copies all files from the job directory to the working directory
356        (excluding the qq info file).
357
358        Raises:
359            QQError: If scratch directory cannot be determined.
360        """
361        # get path to the working directory (created by the batch system)
362        self._work_dir: Path = Retryer(
363            self._batch_system.create_work_dir_on_scratch,
364            self._informer.info.job_id,
365            max_tries=CFG.runner.retry_tries,
366            wait_seconds=CFG.runner.retry_wait,
367        ).run()
368
369        logger.info(f"Setting up working directory in '{self._work_dir}'.")
370
371        # move to the working directory
372        Retryer(
373            os.chdir,
374            self._work_dir,
375            max_tries=CFG.runner.retry_tries,
376            wait_seconds=CFG.runner.retry_wait,
377        ).run()
378
379        # files excluded from copying to the working directory
380        qq_out = (
381            self._informer.info.input_dir / self._informer.info.job_name
382        ).with_suffix(CFG.suffixes.qq_out)
383        excluded = self._informer.info.excluded_files + [self._info_file, qq_out]
384        if self._archiver:
385            excluded.append(self._archiver._archive)
386
387        # copy files from the input directory to the working directory
388        logger.debug(
389            f"Files excluded from being copied to the working directory: {excluded}."
390        )
391        Retryer(
392            self._batch_system.sync_with_exclusions,
393            self._input_dir,
394            self._work_dir,
395            self._informer.info.input_machine,
396            socket.getfqdn(),
397            excluded,
398            max_tries=CFG.runner.retry_tries,
399            wait_seconds=CFG.runner.retry_wait,
400        ).run()
401
402        # copy explicitly included files to the working directory
403        # this will copy files that were specified with the --include option, even if they are also in the list of excluded files
404        logger.debug(
405            f"Files explicitly requested to be copied to the working directory: {self._informer.info.included_files}."
406        )
407        Retryer(
408            self._copy_files,
409            self._informer.info.included_files,
410            max_tries=CFG.runner.retry_tries,
411            wait_seconds=CFG.runner.retry_wait,
412        ).run()
413
414    def _delete_work_dir(self) -> None:
415        """
416        Delete the entire working directory.
417
418        Used only after successful execution in scratch space.
419        """
420        logger.debug(f"Removing working directory '{self._work_dir}'.")
421        Retryer(
422            shutil.rmtree,
423            self._work_dir,
424            max_tries=CFG.runner.retry_tries,
425            wait_seconds=CFG.runner.retry_wait,
426        ).run()
427
428    def _update_info_running(self) -> None:
429        """
430        Update the qq info file to mark the job as running.
431
432        Raises:
433            QQRunCommunicationError: If the job was killed without informing Runner.
434            QQError: If the info file cannot be updated.
435        """
436        logger.debug(f"Updating '{self._info_file}' at job start.")
437        self._reload_info_and_ensure_valid()
438
439        try:
440            nodes = Retryer(
441                self._get_nodes,
442                max_tries=CFG.runner.retry_tries,
443                wait_seconds=CFG.runner.retry_wait,
444            ).run()
445
446            self._informer.set_running(
447                datetime.now(),
448                socket.getfqdn(),
449                nodes,
450                self._work_dir,
451            )
452
453            Retryer(
454                self._informer.to_file,
455                self._info_file,
456                host=self._input_machine,
457                max_tries=CFG.runner.retry_tries,
458                wait_seconds=CFG.runner.retry_wait,
459            ).run()
460        except Exception as e:
461            raise QQError(
462                f"Could not update qqinfo file '{self._info_file}' at JOB START: {e}."
463            ) from e
464
465    def _get_nodes(self) -> list[str]:
466        """
467        Get a list of nodes used to execute this job. The nodes are obtained by
468        querying the batch system.
469
470        If the batch server is not available and only one node was requested, uses
471        `socket.getfqdn()` instead and prints warning.
472
473        Returns:
474            list[str]: Names of nodes used to execute the job.
475
476        Raises:
477            QQError: If the batch system is unable to provide information about the nodes after retries
478                and more than one node is used.
479        """
480        nodes = self._informer.get_nodes()
481        if not nodes:
482            # if the batch server is not reachable but the requested number of nodes is one,
483            # we assume that only one node is actually being used and Runner thus runs on this node
484            # we can then get the node name from socket
485            # this avoids issues with occasional inaccessibility of the batch server in
486            # the unstable Metacentrum environment
487            if self._informer.info.resources.nnodes == 1:
488                node = socket.getfqdn()
489                logger.warning(
490                    f"Could not get the list of used nodes from the batch server. Assuming the only node is the current node '{node}'."
491                )
492                return [node]
493
494            raise QQError("Could not get the list of used nodes from the batch server")
495
496        return nodes
497
498    def _update_info_finished(self) -> None:
499        """
500        Update the qq info file to mark the job as successfully finished.
501
502        Logs errors as warnings if updating fails.
503
504        Raises:
505            QQRunCommunicationError: If the job was killed without informing Runner.
506        """
507        logger.debug(f"Updating '{self._info_file}' at job completion.")
508        self._reload_info_and_ensure_valid()
509
510        try:
511            self._informer.set_finished(datetime.now())
512            Retryer(
513                self._informer.to_file,
514                self._info_file,
515                host=self._input_machine,
516                max_tries=CFG.runner.retry_tries,
517                wait_seconds=CFG.runner.retry_wait,
518            ).run()
519        except Exception as e:
520            logger.warning(
521                f"Could not update qqinfo file '{self._info_file}' at JOB COMPLETION: {e}."
522            )
523
524    def _update_info_failed(self, return_code: int) -> None:
525        """
526        Update the qq info file to mark the job as failed.
527
528        Args:
529            return_code (int): Exit code from the failed job.
530
531        Logs errors as warnings if updating fails.
532
533        Raises:
534            QQRunCommunicationError: If the job was killed without informing Runner.
535        """
536        logger.debug(f"Updating '{self._info_file}' at job failure.")
537        self._reload_info_and_ensure_valid()
538
539        try:
540            self._informer.set_failed(datetime.now(), return_code)
541            Retryer(
542                self._informer.to_file,
543                self._info_file,
544                host=self._input_machine,
545                max_tries=CFG.runner.retry_tries,
546                wait_seconds=CFG.runner.retry_wait,
547            ).run()
548        except Exception as e:
549            logger.warning(
550                f"Could not update qqinfo file '{self._info_file}' at JOB FAILURE: {e}."
551            )
552
553    def _update_info_killed(self) -> None:
554        """
555        Update the qq info file to mark the job as killed.
556
557        Used during SIGTERM cleanup.
558
559        Logs errors as warnings if updating fails.
560
561        No retrying since there is no time for that.
562        """
563        logger.debug(f"Updating '{self._info_file}' at job kill.")
564        self._reload_info_and_ensure_valid(retry=False)
565
566        try:
567            self._informer.set_killed(datetime.now())
568            # no retrying here since we cannot afford multiple attempts here
569            self._informer.to_file(self._info_file, host=self._input_machine)
570        except Exception as e:
571            logger.warning(
572                f"Could not update qqinfo file '{self._info_file}' at JOB KILL: {e}."
573            )
574
575    def _copy_runtime_files_to_input_dir(self, retry: bool = True) -> None:
576        """
577        Copy .out and .err runtime files from the working directory to the input directory.
578
579        Args:
580            retry (bool): Retry the copying if it fails.
581
582        Raises:
583            QQError: If the files could not be copied after retrying.
584        """
585        files_to_copy = [
586            logical_resolve(Path(self._informer.info.stdout_file)),
587            logical_resolve(Path(self._informer.info.stderr_file)),
588        ]
589
590        logger.debug(f"Copying runtime files '{files_to_copy}' to input directory.")
591
592        if retry:
593            Retryer(
594                self._batch_system.sync_selected,
595                self._work_dir,
596                self._input_dir,
597                socket.getfqdn(),
598                self._informer.info.input_machine,
599                include_files=files_to_copy,
600                max_tries=CFG.runner.retry_tries,
601                wait_seconds=CFG.runner.retry_wait,
602            ).run()
603        else:
604            self._batch_system.sync_selected(
605                self._work_dir,
606                self._input_dir,
607                socket.getfqdn(),
608                self._informer.info.input_machine,
609                files_to_copy,
610            )
611
612    def _reload_info(self, retry: bool = True) -> None:
613        """
614        Reload the qq job info file for this job.
615
616        Args:
617            retry (bool): Retry the loading operation if it fails.
618
619        Raises:
620            QQError: If the qq info file cannot be reach or read after retrying.
621        """
622        if retry:
623            self._informer = Retryer(
624                Informer.from_file,
625                self._info_file,
626                host=self._input_machine,
627                max_tries=CFG.runner.retry_tries,
628                wait_seconds=CFG.runner.retry_wait,
629            ).run()
630        else:
631            self._informer = Informer.from_file(self._info_file, self._input_machine)
632
633    def _ensure_matches_job(self, job_id: str) -> None:
634        """
635        Ensure that the provided job_id matches the job id in the wrapped informer.
636
637        Raises:
638            QQJobMismatchError: If the info file corresponds to a different job.
639        """
640        if not self._informer.matches_job(job_id):
641            raise QQJobMismatchError(
642                f"Info file '{self._info_file}' does not correspond to job '{job_id}'."
643            )
644
645    def _ensure_not_killed(self) -> None:
646        """
647        Ensure that the job has not been killed.
648
649        Raises:
650            QQRunCommunicationError: If the job state is `KILLED`.
651        """
652        if self._informer.info.job_state == NaiveState.KILLED:
653            raise QQRunCommunicationError(
654                "Job has been killed without informing qq run. Aborting the job!"
655            )
656
657    def _reload_info_and_ensure_valid(self, retry: bool = False) -> None:
658        """
659        Reload the qq job info file and check that it corresponds to the current job
660        by comparing job ids.
661
662        Then check the job's state and ensure it is not killed.
663
664        Args:
665            retry (bool): Retry the loading operation if it fails.
666
667        Raises:
668            QQJobMismatchError: If the info file corresponds to a different job.
669            QQRunCommunicationError: If the job state is `KILLED`.
670            QQError: If the qq info file cannot be reached or read.
671        """
672        job_id = self._informer.info.job_id
673        self._reload_info(retry)
674        self._ensure_matches_job(job_id)
675        self._ensure_not_killed()
676
677    def _resubmit(self) -> None:
678        """
679        Resubmit the current job if either of the following is true:
680            a) it is a loop job and additional cycles remain,
681            b) it is a continuous job that should be resubmitted.
682
683        Raises:
684            QQError: If the job cannot be resubmitted.
685        """
686        if not self._should_resubmit:
687            logger.info(
688                f"The script finished with an exit code of '{CFG.exit_codes.qq_run_no_resubmit}' indicating that the next cycle of the job should not be submitted. Not resubmitting."
689            )
690            return
691
692        if self._informer.info.job_type == JobType.LOOP:
693            if not (loop_info := self._informer.info.loop_info):
694                raise QQError(
695                    "Loop info is undefined while resubmiting a loop job. This is a bug!"
696                )
697                return
698
699            if loop_info.current >= loop_info.end:
700                logger.info(
701                    "This was the final cycle of the loop job. Not resubmitting."
702                )
703                return
704
705        logger.info("Resubmitting the job.")
706        resubmitter = Resubmitter.from_informer(self._informer)
707        job_id = resubmitter.resubmit()
708
709        logger.info(f"Job resubmitted successfully as '{job_id}'.")
710
711    def _archive_files_from_work_dir(self) -> None:
712        """
713        Archive files from the working directory.
714
715        If no file exists for the next loop cycle, creates an empty init file to ensure the loop job continues normally.
716        """
717        if not self._archiver:
718            raise QQError("Archiver is undefined while archiving files. This is a bug!")
719
720        if not (loop_info := self._informer.info.loop_info):
721            raise QQError(
722                "Loop info is undefined while archiving files. This is a bug!"
723            )
724
725        # get the files to archive corresponding to the next loop job cycle
726        if not self._archiver.get_files_matching_pattern(
727            self._work_dir,
728            None,
729            loop_info.archive_format,
730            loop_info.current + 1,
731            False,
732        ):
733            # if there are no files matching the next loop job cycle, create an empty .init file
734            # so that the loop job continues normally
735            logger.debug(
736                f"Creating .init file for loop job cycle {loop_info.current + 1}."
737            )
738            self._archiver.create_init_file(loop_info.current + 1)
739
740        # archive all files matching the archive format
741        self._archiver.to_archive(self._work_dir)
742
743    def _get_explicitly_included_files_in_work_dir(self) -> list[Path]:
744        """
745        Return absolute paths to files and directories in the working directory
746        that were explicitly copied via the `--include` submission option.
747        """
748        files = [
749            logical_resolve(self._work_dir / f.name)
750            for f in self._informer.info.included_files
751        ]
752
753        logger.debug(
754            f"Files that were copied to work dir using the `--include` option: {files}."
755        )
756
757        return files
758
759    def _copy_files(self, files: list[Path]):
760        """
761        Copy files and directories using the provided absolute paths to the working directory.
762        """
763        for file in files:
764            # we rsync each file or directory individually because each file can be provided in a different directory
765            # this may be very slow if there is a large amount of files/directories to include
766            self._batch_system.sync_selected(
767                file.parent,
768                self._work_dir,
769                self._informer.info.input_machine,
770                socket.getfqdn(),
771                [file],
772            )
773
774    def _cleanup(self) -> None:
775        """
776        Clean up after execution is interrupted or killed.
777
778        - Marks the job as killed in the info file.
779        - Terminates the subprocess if it is still running (SIGTERM, then SIGKILL).
780        - Copies runtime files to the input directory (only if scratch is used; no retries).
781        """
782        # update the qq info file
783        self._update_info_killed()
784
785        # send SIGTERM to the running process, if there is any
786        # this may potentially not even be called -- the subprocess might be already terminated
787        if self._process and self._process.poll() is None:
788            logger.info("Cleaning up: terminating subprocess.")
789            self._process.terminate()
790            # NOTE(review): the sleep below always runs in full, even if the subprocess exits earlier
791            # sleep for `CFG.runner.sigterm_to_sigkill`, then SIGKILL the subprocess if it is still running
792            sleep(CFG.runner.sigterm_to_sigkill)
793            if self._process and self._process.poll() is None:
794                self._process.kill()
795
796        # copy runtime files to input dir without retrying
797        if self._use_scratch:
798            self._copy_runtime_files_to_input_dir(retry=False)
799
800    def _handle_sigterm(self, _signum: int, _frame: FrameType | None) -> NoReturn:
801        """
802        Signal handler for SIGTERM (installed in `__init__`).
803
804        Runs `_cleanup()`, logs the termination, and exits with code 143 (128 + SIGTERM).
805        """
806        logger.info("Received SIGTERM, initiating shutdown.")
807        self._cleanup()
808        logger.error("Execution was terminated by SIGTERM.")
809        # this may get ignored by the batch system
810        # so you should not rely on this specific exit code
811        sys.exit(143)

Manages the setup, execution, and cleanup of scripts within the qq batch environment.

The Runner class is responsible for:
  • Preparing a working directory (shared or scratch space)
  • Executing a provided job script
  • Updating the job info file with run state, success, or failure
  • Cleaning up resources when execution is finished
Runner(info_file: pathlib.Path, host: str)
 51    def __init__(self, info_file: Path, host: str):
 52        """
 53        Initialize a new Runner instance and install its SIGTERM handler.
 54
 55        Args:
 56            info_file (Path): Path to the qq info file that contains job metadata.
 57            host (str): The hostname of the input machine from which the job was submitted.
 58
 59        Raises:
 60            QQRunFatalError: If the batch system, job id, or info file cannot be loaded, or if the info file does not match the current job or batch system.
 61        """
 62        # install a signal handler
 63        signal.signal(signal.SIGTERM, self._handle_sigterm)
 64
 65        # process running the wrapped script
 66        self._process: subprocess.Popen[str] | None = None
 67
 68        self._info_file = Path(info_file)
 69        logger.debug(f"Info file: '{self._info_file}'.")
 70
 71        self._input_machine = host
 72        logger.debug(f"Input machine: '{self._input_machine}'.")
 73
 74        # load the info file or raise a fatal qq error if this fails
 75        try:
 76            # get the batch system from the environment variable (or guess it)
 77            self._batch_system = BatchInterface.from_env_var_or_guess()
 78            logger.debug(f"Batch system: {str(self._batch_system)}.")
 79
 80            # get the id of the job from the batch system
 81            if not (job_id := self._batch_system.get_job_id()):
 82                raise QQError("Job has no associated job id")
 83
 84            # load the info file
 85            self._informer: Informer = Retryer(
 86                Informer.from_file,
 87                self._info_file,
 88                host=self._input_machine,
 89                max_tries=CFG.runner.retry_tries,
 90                wait_seconds=CFG.runner.retry_wait,
 91            ).run()
 92
 93            # check that the id of this job matches the job id in the info file
 94            if not self._informer.matches_job(job_id):
 95                raise QQJobMismatchError(
 96                    "Info file does not correspond to the current job"
 97                )
 98
 99            # check that the batch system in info file matches the one loaded from the environment variable
100            if self._batch_system != self._informer.batch_system:
101                raise QQError(
102                    f"Batch system mismatch - env var: '{str(self._batch_system)}', info file: '{self._informer.batch_system}'"
103                )
104
105        except Exception as e:
106            raise QQRunFatalError(
107                f"Unable to load valid qq info file '{self._info_file}' on '{self._input_machine}': {e}"
108            ) from e
109
110        logger.info(
111            f"[qq-{str(self._batch_system)} v{qq_lib.__version__}] Initializing "
112            f"job '{self._informer.info.job_id}' on host '{socket.getfqdn()}'."
113        )
114
115        # get input directory
116        self._input_dir = Path(self._informer.info.input_dir)
117        logger.debug(f"Input directory: {self._input_dir}.")
118
119        # should the scratch directory be used?
120        self._use_scratch = self._informer.uses_scratch()
121        logger.debug(f"Use scratch: {self._use_scratch}.")
122
123        # initialize archiver, if this is a loop job (loop jobs are also marked for resubmission)
124        if loop_info := self._informer.info.loop_info:
125            self._archiver = Archiver(
126                loop_info.archive,
127                loop_info.archive_format,
128                self._informer.info.input_machine,
129                self._informer.info.input_dir,
130                self._batch_system,
131            )
132            self._should_resubmit = True
133        else:
134            self._archiver = None
135        # continuous jobs are also marked for resubmission; NOTE(review): `_should_resubmit` is not assigned here for other job types
136        if self._informer.info.job_type == JobType.CONTINUOUS:
137            self._should_resubmit = True

Initialize a new Runner instance.

Arguments:
  • info_file (Path): Path to the qq info file that contains job metadata.
  • host (str): The hostname of the input machine from which the job was submitted.
Raises:
  • QQRunFatalError: If loading the QQ info file fails fatally during initialization.
def prepare(self) -> None:
139    def prepare(self) -> None:
140        """
141        Prepare the job for execution: set up the archive and archive the
142        runtime files of the previous cycle (loop jobs only), then prepare
143        the working directory and fetch the current cycle's files from the archive.
144
145        Raises:
146            QQError: If working directory setup fails.
147        """
148        if self._archiver:
149            assert self._informer.info.loop_info is not None
150            # prepare the directory for archiving
151            self._archiver.make_archive_dir()
152
153            # archive runtime files from the previous cycle
154            # this has to be done before the working directory is prepared,
155            # otherwise the runtime files would get copied to the working directory
156            logger.debug(
157                f"Archiving run time files from cycle {self._informer.info.loop_info.current - 1}."
158            )
159            self._archiver.archive_runtime_files(
160                # we need to escape the '+' character (presumably the name is used as a regex pattern; TODO confirm)
161                construct_loop_job_name(
162                    self._informer.info.script_name,
163                    self._informer.info.loop_info.current - 1,
164                ).replace("+", "\\+"),
165                self._informer.info.loop_info.current - 1,
166            )
167
168        if self._use_scratch:
169            self._set_up_scratch_dir()
170        else:
171            self._set_up_shared_dir()
172
173        if self._archiver:
174            assert self._informer.info.loop_info is not None
175            # fetch files for the current cycle of the loop job from the archive
176            self._archiver.from_archive(
177                self._work_dir, self._informer.info.loop_info.current
178            )

Prepare the script for execution: set up the archive and archive the runtime files of the previous cycle (if this is a loop job), then prepare the working directory.

Raises:
  • QQError: If working directory setup fails.
def execute(self) -> int:
180    def execute(self) -> int:
181        """
182        Mark the job as running and execute the job script, redirecting its stdout/stderr to the job's log files.
183
184        Returns:
185            int: The exit code from the executed script (0 if a loop/continuous job exited with `CFG.exit_codes.qq_run_no_resubmit`).
186
187        Raises:
188            QQError: If execution fails or info file cannot be updated.
189        """
190        # update the qqinfo file
191        self._update_info_running()
192
193        # get the actual name of the script to execute
194        script = logical_resolve(Path(self._informer.info.script_name))
195
196        # get paths to output files
197        stdout_log = self._informer.info.stdout_file
198        stderr_log = self._informer.info.stderr_file
199
200        logger.info(f"Executing script '{script}'.")
201
202        # get interpreter if configured, otherwise use the default
203        interpreter = self._informer.info.interpreter or Interpreter()
204
205        # get the command to execute
206        command_list = [*interpreter.to_command_list(), str(script)]
207        logger.debug(f"Command executed using subprocess.Popen: {command_list}")
208
209        try:
210            with Path(stdout_log).open("w") as out, Path(stderr_log).open("w") as err:
211                self._process = subprocess.Popen(
212                    command_list,
213                    stdout=out,
214                    stderr=err,
215                    text=True,
216                )
217
218                # poll until the process finishes, sleeping between the checks
219                while self._process.poll() is None:
220                    sleep(CFG.runner.subprocess_checks_wait_time)
221
222        except Exception as e:
223            raise QQError(f"Failed to execute script '{script}': {e}") from e
224
225        # if the script returns an exit code corresponding to CFG.exit_codes.qq_run_no_resubmit,
226        # do not submit the next cycle of the job and report an exit code of 0 instead (the stored returncode is overwritten)
227        if (
228            self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]
229            and self._process.returncode == CFG.exit_codes.qq_run_no_resubmit
230        ):
231            logger.debug(
232                f"Detected an exit code of '{self._process.returncode}'. Replacing with '0' and will not submit the next cycle of the job."
233            )
234            self._process.returncode = 0
235            self._should_resubmit = False
236
237        return self._process.returncode

Execute the job script in the working directory.

Returns:
  • int: The exit code from the executed script.

Raises:
  • QQError: If execution fails or info file cannot be updated.
def finalize(self) -> None:
239    def finalize(self) -> None:
240        """
241        Finalize the execution of the job script.
242
243        Handles post-processing of the job based on the script's exit code and the
244        configured transfer and archive modes. The specific actions taken depend on
245        the job's transfer mode, archive mode, and whether a scratch directory is being used.
246
247        Specifically, this method:
248
249        1. Archives files from the working directory if archiving is enabled and the
250            archive mode allows it for the given exit code (loop jobs only).
251        2. Transfers or handles files based on whether scratch directory is used:
252            - If using scratch and transfer mode allows: Syncs the entire working
253                directory back to the input directory (excluding explicitly included files)
254                and removes the working directory from scratch.
255            - If using scratch and transfer mode disallows: Copies only runtime files
256                to the input directory and preserves the working directory.
257            - If not using scratch: No file operations are performed.
258        3. Updates the qq info file to "finished" (exit code 0) or "failed" (non-zero
259            exit code).
260        4. Calls `_resubmit()` if this is a loop or continuous job and the script exited with code 0.
261
262        Raises:
263            QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
264        """
265        logger.info("Finalizing the execution.")
266        assert self._process is not None
267
268        # archive files
269        if self._archiver and self._informer.should_archive_files(
270            self._process.returncode
271        ):
272            logger.debug(
273                f"Script exit code is '{self._process.returncode}'. Archiving files."
274            )
275            self._archive_files_from_work_dir()
276
277        # transfer files back to the input (submission) directory
278        if self._use_scratch:
279            if self._informer.should_transfer_files(self._process.returncode):
280                logger.debug(
281                    f"Script exit code is '{self._process.returncode}'. Transferring files from working directory."
282                )
283
284                Retryer(
285                    self._batch_system.sync_with_exclusions,
286                    self._work_dir,
287                    self._input_dir,
288                    socket.getfqdn(),
289                    self._informer.info.input_machine,
290                    # exclude files that were copied to workdir from the outside of input dir (--include option)
291                    # these files should not be copied to the input directory, since they were never inside it
292                    self._get_explicitly_included_files_in_work_dir(),
293                    max_tries=CFG.runner.retry_tries,
294                    wait_seconds=CFG.runner.retry_wait,
295                ).run()
296
297                # remove the working directory from scratch
298                self._delete_work_dir()
299            else:
300                # copy only the runtime files to input directory
301                # and keep the working directory
302                self._copy_runtime_files_to_input_dir(retry=True)
303
304        if self._process.returncode == 0:
305            # update the qqinfo file
306            self._update_info_finished()
307
308            # if this is a loop/continuous job, hand over to the resubmission logic
309            if self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]:
310                self._resubmit()
311        else:
312            # update the qqinfo file
313            self._update_info_failed(self._process.returncode)
314
315        logger.info(f"Job completed with an exit code of {self._process.returncode}.")

Finalize the execution of the job script.

Handles post-processing of the job based on the script's exit code and the configured transfer and archive modes. The specific actions taken depend on the job's transfer mode, archive mode, and whether a scratch directory is being used.

Specifically, this method:

  1. Archives files from the working directory if archiving is enabled and the archive mode allows it for the given exit code (loop jobs only).
  2. Transfers or handles files based on whether scratch directory is used:
    • If using scratch and transfer mode allows: Syncs the entire working directory back to the input directory (excluding explicitly included files) and removes the working directory from scratch.
    • If using scratch and transfer mode disallows: Copies only runtime files to the input directory and preserves the working directory.
    • If not using scratch: No file operations are performed.
  3. Updates the qq info file to "finished" (exit code 0) or "failed" (non-zero exit code).
  4. Resubmits the job if it is a loop or continuous job and was completed successfully.
Raises:
  • QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
def log_failure_and_exit(self, exception: BaseException) -> NoReturn:
317    def log_failure_and_exit(self, exception: BaseException) -> NoReturn:
318        """
319        Record a failure state into the qq info file, log the exception, and exit the program.
320
321        Args:
322            exception (BaseException): The exception to log.
323
324        Raises:
325            SystemExit: With `exception.exit_code` if the exception has one, otherwise `CFG.exit_codes.unexpected_error`.
326        """
327        exit_code = getattr(exception, "exit_code", CFG.exit_codes.unexpected_error)
328        try:
329            self._update_info_failed(exit_code)
330            logger.error(exception)
331            sys.exit(exit_code)
332        except Exception as e:
333            # updating the info file or logging failed (SystemExit is not an `Exception`, so it is not caught here)
334            log_fatal_error_and_exit(e)  # exits here

Record a failure state into the qq info file and exit the program.

Arguments:
  • exception (BaseException): The exception to log.
Raises:
  • SystemExit: Always exits with the exit code associated with the given exception.