qq_lib.run
Execution utilities for running qq jobs inside the batch environment.
This module defines the Runner class, which prepares the execution
environment, launches the user's job script, updates qq's state tracking,
and performs cleanup on success, failure, or interruption. It handles both
shared and scratch working directories, loop-job archiving, resubmission,
communication with the batch system, and SIGTERM-safe shutdown.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Execution utilities for running qq jobs inside the batch environment. 6 7This module defines the `Runner` class, which prepares the execution 8environment, launches the user's job script, updates qq's state tracking, 9and performs cleanup on success, failure, or interruption. It handles both 10shared and scratch working directories, loop-job archiving, resubmission, 11communication with the batch system, and SIGTERM-safe shutdown. 12""" 13 14from .runner import Runner 15 16__all__ = [ 17 "Runner", 18]
class Runner:
    """
    Manages the setup, execution, and cleanup of scripts within the qq batch environment.

    The Runner class is responsible for:
    - Preparing a working directory (shared or scratch space)
    - Executing a provided job script
    - Updating the job info file with run state, success, or failure
    - Cleaning up resources when execution is finished
    """

    def __init__(self, info_file: Path, host: str):
        """
        Initialize a new Runner instance.

        Args:
            info_file (Path): Path to the qq info file that contains job metadata.
            host (str): The hostname of the input machine from which the job was submitted.

        Raises:
            QQRunFatalError: If loading the QQ info file fails fatally during initialization.
        """
        # process running the wrapped script
        # (set before the signal handler is installed so that the handler
        # never runs against a Runner that lacks this attribute)
        self._process: subprocess.Popen[str] | None = None

        # resubmission is enabled below only for loop and continuous jobs;
        # giving it a default guarantees the attribute always exists
        self._should_resubmit = False

        # install a signal handler
        signal.signal(signal.SIGTERM, self._handle_sigterm)

        self._info_file = Path(info_file)
        logger.debug(f"Info file: '{self._info_file}'.")

        self._input_machine = host
        logger.debug(f"Input machine: '{self._input_machine}'.")

        # load the info file or raise a fatal qq error if this fails
        try:
            # get the batch system from the environment variable (or guess it)
            self._batch_system = BatchInterface.from_env_var_or_guess()
            logger.debug(f"Batch system: {str(self._batch_system)}.")

            # get the id of the job from the batch system
            if not (job_id := self._batch_system.get_job_id()):
                raise QQError("Job has no associated job id")

            # load the info file
            self._informer: Informer = Retryer(
                Informer.from_file,
                self._info_file,
                host=self._input_machine,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()

            # check that the id of this job matches the job id in the info file
            if not self._informer.matches_job(job_id):
                raise QQJobMismatchError(
                    "Info file does not correspond to the current job"
                )

            # check that the batch system in info file matches the one loaded from the environment variable
            if self._batch_system != self._informer.batch_system:
                raise QQError(
                    f"Batch system mismatch - env var: '{str(self._batch_system)}', info file: '{self._informer.batch_system}'"
                )

        except Exception as e:
            raise QQRunFatalError(
                f"Unable to load valid qq info file '{self._info_file}' on '{self._input_machine}': {e}"
            ) from e

        logger.info(
            f"[qq-{str(self._batch_system)} v{qq_lib.__version__}] Initializing "
            f"job '{self._informer.info.job_id}' on host '{socket.getfqdn()}'."
        )

        # get input directory
        self._input_dir = Path(self._informer.info.input_dir)
        logger.debug(f"Input directory: {self._input_dir}.")

        # should the scratch directory be used?
        self._use_scratch = self._informer.uses_scratch()
        logger.debug(f"Use scratch: {self._use_scratch}.")

        # initialize archiver, if this is a loop job
        if loop_info := self._informer.info.loop_info:
            self._archiver = Archiver(
                loop_info.archive,
                loop_info.archive_format,
                self._informer.info.input_machine,
                self._informer.info.input_dir,
                self._batch_system,
            )
            self._should_resubmit = True
        else:
            self._archiver = None

        if self._informer.info.job_type == JobType.CONTINUOUS:
            self._should_resubmit = True

    def prepare(self) -> None:
        """
        Prepare the script for execution, setting up the archive
        and archiving run time files (if this is a loop job) and
        preparing working directory.

        Raises:
            QQError: If working directory setup fails.
        """
        if self._archiver:
            assert self._informer.info.loop_info is not None
            # prepare the directory for archiving
            self._archiver.make_archive_dir()

            # archive runtime files from the previous cycle
            # this has to be done before the working directory is prepared,
            # otherwise the runtime files would get copied to the working directory
            logger.debug(
                f"Archiving run time files from cycle {self._informer.info.loop_info.current - 1}."
            )
            self._archiver.archive_runtime_files(
                # we need to escape the '+' character
                construct_loop_job_name(
                    self._informer.info.script_name,
                    self._informer.info.loop_info.current - 1,
                ).replace("+", "\\+"),
                self._informer.info.loop_info.current - 1,
            )

        if self._use_scratch:
            self._set_up_scratch_dir()
        else:
            self._set_up_shared_dir()

        if self._archiver:
            assert self._informer.info.loop_info is not None
            # fetch files for the current cycle of the loop job from the archive
            self._archiver.from_archive(
                self._work_dir, self._informer.info.loop_info.current
            )

    def execute(self) -> int:
        """
        Execute the job script in the working directory.

        Returns:
            int: The exit code from the executed script.

        Raises:
            QQError: If execution fails or info file cannot be updated.
        """
        # update the qqinfo file
        self._update_info_running()

        # get the actual name of the script to execute
        script = logical_resolve(Path(self._informer.info.script_name))

        # get paths to output files
        stdout_log = self._informer.info.stdout_file
        stderr_log = self._informer.info.stderr_file

        logger.info(f"Executing script '{script}'.")

        # get interpreter if configured, otherwise use the default
        interpreter = self._informer.info.interpreter or Interpreter()

        # get the command to execute
        command_list = [*interpreter.to_command_list(), str(script)]
        logger.debug(f"Command executed using subprocess.Popen: {command_list}")

        try:
            with Path(stdout_log).open("w") as out, Path(stderr_log).open("w") as err:
                self._process = subprocess.Popen(
                    command_list,
                    stdout=out,
                    stderr=err,
                    text=True,
                )

                # wait for the process to finish in a non-blocking manner
                while self._process.poll() is None:
                    sleep(CFG.runner.subprocess_checks_wait_time)

        except Exception as e:
            raise QQError(f"Failed to execute script '{script}': {e}") from e

        # if the script returns an exit code corresponding to CFG.exit_codes.qq_run_no_resubmit,
        # do not submit the next cycle of the job but return 0
        if (
            self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]
            and self._process.returncode == CFG.exit_codes.qq_run_no_resubmit
        ):
            logger.debug(
                f"Detected an exit code of '{self._process.returncode}'. Replacing with '0' and will not submit the next cycle of the job."
            )
            self._process.returncode = 0
            self._should_resubmit = False

        return self._process.returncode

    def finalize(self) -> None:
        """
        Finalize the execution of the job script.

        Handles post-processing of the job based on the script's exit code and the
        configured transfer and archive modes. The specific actions taken depend on
        the job's transfer mode, archive mode, and whether scratch directory is being used.

        Specifically, this method:

        1. Archives files from the working directory if archiving is enabled and the
           archive mode allows it for the given exit code (loop jobs only).
        2. Transfers or handles files based on whether scratch directory is used:
           - If using scratch and transfer mode allows: Syncs the entire working
             directory back to the input directory (excluding explicitly included files)
             and removes the working directory from scratch.
           - If using scratch and transfer mode disallows: Copies only runtime files
             to the input directory and preserves the working directory.
           - If not using scratch: No file operations are performed.
        3. Updates the qq info file to "finished" (exit code 0) or "failed" (non-zero
           exit code).
        4. Resubmits the job if it is a loop or continuous job and was completed successfully.

        Raises:
            QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
        """
        logger.info("Finalizing the execution.")
        assert self._process is not None

        # archive files
        if self._archiver and self._informer.should_archive_files(
            self._process.returncode
        ):
            logger.debug(
                f"Script exit code is '{self._process.returncode}'. Archiving files."
            )
            self._archive_files_from_work_dir()

        # transfer files back to the input (submission) directory
        if self._use_scratch:
            if self._informer.should_transfer_files(self._process.returncode):
                logger.debug(
                    f"Script exit code is '{self._process.returncode}'. Transferring files from working directory."
                )

                Retryer(
                    self._batch_system.sync_with_exclusions,
                    self._work_dir,
                    self._input_dir,
                    socket.getfqdn(),
                    self._informer.info.input_machine,
                    # exclude files that were copied to workdir from the outside of input dir (--include option)
                    # these files should not be copied to the input directory, since they were never inside it
                    self._get_explicitly_included_files_in_work_dir(),
                    max_tries=CFG.runner.retry_tries,
                    wait_seconds=CFG.runner.retry_wait,
                ).run()

                # remove the working directory from scratch
                self._delete_work_dir()
            else:
                # copy only the runtime files to input directory
                # and keep the working directory
                self._copy_runtime_files_to_input_dir(retry=True)

        if self._process.returncode == 0:
            # update the qqinfo file
            self._update_info_finished()

            # if this is a loop/continuous job
            if self._informer.info.job_type in [JobType.LOOP, JobType.CONTINUOUS]:
                self._resubmit()
        else:
            # update the qqinfo file
            self._update_info_failed(self._process.returncode)

        logger.info(f"Job completed with an exit code of {self._process.returncode}.")

    def log_failure_and_exit(self, exception: BaseException) -> NoReturn:
        """
        Record a failure state into the qq info file and exit the program.

        Args:
            exception (BaseException): The exception to log.

        Raises:
            SystemExit: Always exits with the exit code associated with the given exception.
        """
        # exceptions without an `exit_code` attribute map to the generic "unexpected error" code
        exit_code = getattr(exception, "exit_code", CFG.exit_codes.unexpected_error)
        try:
            self._update_info_failed(exit_code)
            logger.error(exception)
            # SystemExit is not an `Exception`, so it passes through the handler below
            sys.exit(exit_code)
        except Exception as e:
            # unable to log the current state into the info file
            log_fatal_error_and_exit(e)  # exits here

    def _set_up_shared_dir(self) -> None:
        """
        Configure the input directory as the working directory.
        """
        # set qq working directory to the input dir
        self._work_dir = self._input_dir

        # move to the working directory
        Retryer(
            os.chdir,
            self._work_dir,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

    def _set_up_scratch_dir(self) -> None:
        """
        Configure a scratch directory as the working directory.

        Copies all files from the job directory to the working directory
        (excluding the qq info file).

        Raises:
            QQError: If scratch directory cannot be determined.
        """
        # get path to the working directory (created by the batch system)
        self._work_dir: Path = Retryer(
            self._batch_system.create_work_dir_on_scratch,
            self._informer.info.job_id,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

        logger.info(f"Setting up working directory in '{self._work_dir}'.")

        # move to the working directory
        Retryer(
            os.chdir,
            self._work_dir,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

        # files excluded from copying to the working directory
        qq_out = (
            self._informer.info.input_dir / self._informer.info.job_name
        ).with_suffix(CFG.suffixes.qq_out)
        # `+` builds a new list, so the list stored in the info object is not modified
        excluded = self._informer.info.excluded_files + [self._info_file, qq_out]
        if self._archiver:
            excluded.append(self._archiver._archive)

        # copy files from the input directory to the working directory
        logger.debug(
            f"Files excluded from being copied to the working directory: {excluded}."
        )
        Retryer(
            self._batch_system.sync_with_exclusions,
            self._input_dir,
            self._work_dir,
            self._informer.info.input_machine,
            socket.getfqdn(),
            excluded,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

        # copy explicitly included files to the working directory
        # this will copy files that were specified with the --include option, even if they are also in the list of excluded files
        logger.debug(
            f"Files explicitly requested to be copied to the working directory: {self._informer.info.included_files}."
        )
        Retryer(
            self._copy_files,
            self._informer.info.included_files,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

    def _delete_work_dir(self) -> None:
        """
        Delete the entire working directory.

        Used only after successful execution in scratch space.
        """
        logger.debug(f"Removing working directory '{self._work_dir}'.")
        Retryer(
            shutil.rmtree,
            self._work_dir,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

    def _update_info_running(self) -> None:
        """
        Update the qq info file to mark the job as running.

        Raises:
            QQRunCommunicationError: If the job was killed without informing Runner.
            QQError: If the info file cannot be updated.
        """
        logger.debug(f"Updating '{self._info_file}' at job start.")
        self._reload_info_and_ensure_valid()

        try:
            nodes = Retryer(
                self._get_nodes,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()

            self._informer.set_running(
                datetime.now(),
                socket.getfqdn(),
                nodes,
                self._work_dir,
            )

            Retryer(
                self._informer.to_file,
                self._info_file,
                host=self._input_machine,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()
        except Exception as e:
            raise QQError(
                f"Could not update qqinfo file '{self._info_file}' at JOB START: {e}."
            ) from e

    def _get_nodes(self) -> list[str]:
        """
        Get a list of nodes used to execute this job. The nodes are obtained by
        querying the batch system.

        If the batch server is not available and only one node was requested, uses
        `socket.getfqdn()` instead and prints warning.

        Returns:
            list[str]: Names of nodes used to execute the job.

        Raises:
            QQError: If the batch system is unable to provide information about the nodes after retries
            and more than one node is used.
        """
        nodes = self._informer.get_nodes()
        if not nodes:
            # if the batch server is not reachable but the requested number of nodes is one,
            # we assume that only one node is actually being used and Runner thus runs on this node
            # we can then get the node name from socket
            # this avoids issues with occasional inaccessibility of the batch server in
            # the unstable Metacentrum environment
            if self._informer.info.resources.nnodes == 1:
                node = socket.getfqdn()
                logger.warning(
                    f"Could not get the list of used nodes from the batch server. Assuming the only node is the current node '{node}'."
                )
                return [node]

            raise QQError("Could not get the list of used nodes from the batch server")

        return nodes

    def _update_info_finished(self) -> None:
        """
        Update the qq info file to mark the job as successfully finished.

        Logs errors as warnings if updating fails.

        Raises:
            QQRunCommunicationError: If the job was killed without informing Runner.
        """
        logger.debug(f"Updating '{self._info_file}' at job completion.")
        self._reload_info_and_ensure_valid()

        try:
            self._informer.set_finished(datetime.now())
            Retryer(
                self._informer.to_file,
                self._info_file,
                host=self._input_machine,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()
        except Exception as e:
            logger.warning(
                f"Could not update qqinfo file '{self._info_file}' at JOB COMPLETION: {e}."
            )

    def _update_info_failed(self, return_code: int) -> None:
        """
        Update the qq info file to mark the job as failed.

        Args:
            return_code (int): Exit code from the failed job.

        Logs errors as warnings if updating fails.

        Raises:
            QQRunCommunicationError: If the job was killed without informing Runner.
        """
        logger.debug(f"Updating '{self._info_file}' at job failure.")
        self._reload_info_and_ensure_valid()

        try:
            self._informer.set_failed(datetime.now(), return_code)
            Retryer(
                self._informer.to_file,
                self._info_file,
                host=self._input_machine,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()
        except Exception as e:
            logger.warning(
                f"Could not update qqinfo file '{self._info_file}' at JOB FAILURE: {e}."
            )

    def _update_info_killed(self) -> None:
        """
        Update the qq info file to mark the job as killed.

        Used during SIGTERM cleanup.

        Logs errors as warnings if updating fails.

        No retrying since there is no time for that.
        """
        logger.debug(f"Updating '{self._info_file}' at job kill.")
        self._reload_info_and_ensure_valid(retry=False)

        try:
            self._informer.set_killed(datetime.now())
            # no retrying here since we cannot afford multiple attempts here
            self._informer.to_file(self._info_file, host=self._input_machine)
        except Exception as e:
            logger.warning(
                f"Could not update qqinfo file '{self._info_file}' at JOB KILL: {e}."
            )

    def _copy_runtime_files_to_input_dir(self, retry: bool = True) -> None:
        """
        Copy .out and .err runtime files from the working directory to the input directory.

        Args:
            retry (bool): Retry the copying if it fails.

        Raises:
            QQError: If the files could not be copied after retrying.
        """
        files_to_copy = [
            logical_resolve(Path(self._informer.info.stdout_file)),
            logical_resolve(Path(self._informer.info.stderr_file)),
        ]

        logger.debug(f"Copying runtime files '{files_to_copy}' to input directory.")

        if retry:
            Retryer(
                self._batch_system.sync_selected,
                self._work_dir,
                self._input_dir,
                socket.getfqdn(),
                self._informer.info.input_machine,
                include_files=files_to_copy,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()
        else:
            self._batch_system.sync_selected(
                self._work_dir,
                self._input_dir,
                socket.getfqdn(),
                self._informer.info.input_machine,
                files_to_copy,
            )

    def _reload_info(self, retry: bool = True) -> None:
        """
        Reload the qq job info file for this job.

        Args:
            retry (bool): Retry the loading operation if it fails.

        Raises:
            QQError: If the qq info file cannot be reached or read after retrying.
        """
        if retry:
            self._informer = Retryer(
                Informer.from_file,
                self._info_file,
                host=self._input_machine,
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            ).run()
        else:
            self._informer = Informer.from_file(self._info_file, self._input_machine)

    def _ensure_matches_job(self, job_id: str) -> None:
        """
        Ensure that the provided job_id matches the job id in the wrapped informer.

        Args:
            job_id (str): Identifier the loaded info file is expected to belong to.

        Raises:
            QQJobMismatchError: If the info file corresponds to a different job.
        """
        if not self._informer.matches_job(job_id):
            raise QQJobMismatchError(
                f"Info file '{self._info_file}' does not correspond to job '{job_id}'."
            )

    def _ensure_not_killed(self) -> None:
        """
        Ensure that the job has not been killed.

        Raises:
            QQRunCommunicationError: If the job state is `KILLED`.
        """
        if self._informer.info.job_state == NaiveState.KILLED:
            raise QQRunCommunicationError(
                "Job has been killed without informing qq run. Aborting the job!"
            )

    def _reload_info_and_ensure_valid(self, retry: bool = False) -> None:
        """
        Reload the qq job info file and check that it corresponds to the current job
        by comparing job ids.

        Then check the job's state and ensure it is not killed.

        Args:
            retry (bool): Retry the loading operation if it fails.

        Raises:
            QQJobMismatchError: If the info file corresponds to a different job.
            QQRunCommunicationError: If the job state is `KILLED`.
            QQError: If the qq info file cannot be reached or read.
        """
        # NOTE(review): the default here is `False` while `_reload_info` defaults to `True`,
        # so callers that pass no argument reload without retrying - confirm this is intended
        # remember the job id before the informer is replaced by the reloaded one
        job_id = self._informer.info.job_id
        self._reload_info(retry)
        self._ensure_matches_job(job_id)
        self._ensure_not_killed()

    def _resubmit(self) -> None:
        """
        Resubmit the current job if either of the following is true:
        a) it is a loop job and additional cycles remain,
        b) it is a continuous job that should be resubmitted.

        Raises:
            QQError: If the job cannot be resubmitted.
        """
        if not self._should_resubmit:
            logger.info(
                f"The script finished with an exit code of '{CFG.exit_codes.qq_run_no_resubmit}' indicating that the next cycle of the job should not be submitted. Not resubmitting."
            )
            return

        if self._informer.info.job_type == JobType.LOOP:
            if not (loop_info := self._informer.info.loop_info):
                raise QQError(
                    "Loop info is undefined while resubmiting a loop job. This is a bug!"
                )

            if loop_info.current >= loop_info.end:
                logger.info(
                    "This was the final cycle of the loop job. Not resubmitting."
                )
                return

        logger.info("Resubmitting the job.")
        resubmitter = Resubmitter.from_informer(self._informer)
        job_id = resubmitter.resubmit()

        logger.info(f"Job resubmitted successfully as '{job_id}'.")

    def _archive_files_from_work_dir(self) -> None:
        """
        Archive files from the working directory.

        If no file exists for the next loop cycle, creates an empty init file to ensure the loop job continues normally.

        Raises:
            QQError: If the archiver or the loop info is undefined.
        """
        if not self._archiver:
            raise QQError("Archiver is undefined while archiving files. This is a bug!")

        if not (loop_info := self._informer.info.loop_info):
            raise QQError(
                "Loop info is undefined while archiving files. This is a bug!"
            )

        # get the files to archive corresponding to the next loop job cycle
        if not self._archiver.get_files_matching_pattern(
            self._work_dir,
            None,
            loop_info.archive_format,
            loop_info.current + 1,
            False,
        ):
            # if there are no files matching the next loop job cycle, create an empty .init file
            # so that the loop job continues normally
            logger.debug(
                f"Creating .init file for loop job cycle {loop_info.current + 1}."
            )
            self._archiver.create_init_file(loop_info.current + 1)

        # archive all files matching the archive format
        self._archiver.to_archive(self._work_dir)

    def _get_explicitly_included_files_in_work_dir(self) -> list[Path]:
        """
        Return absolute paths to files and directories in the working directory
        that were explicitly copied via the `--include` submission option.
        """
        files = [
            logical_resolve(self._work_dir / f.name)
            for f in self._informer.info.included_files
        ]

        logger.debug(
            f"Files that were copied to work dir using the `--include` option: {files}."
        )

        return files

    def _copy_files(self, files: list[Path]) -> None:
        """
        Copy files and directories using the provided absolute paths to the working directory.

        Args:
            files (list[Path]): Absolute paths of the files and directories to copy.
        """
        for file in files:
            # we rsync each file or directory individually because each file can be provided in a different directory
            # this may be very slow if there is a large amount of files/directories to include
            self._batch_system.sync_selected(
                file.parent,
                self._work_dir,
                self._informer.info.input_machine,
                socket.getfqdn(),
                [file],
            )

    def _cleanup(self) -> None:
        """
        Clean up after execution is interrupted or killed.

        - Marks job as killed in the info file.
        - Terminates the subprocess (SIGTERM, then SIGKILL if it does not exit in time).
        - Copies .out and .err file to the input directory (scratch jobs only).
        """
        # update the qq info file
        self._update_info_killed()

        # send SIGTERM to the running process, if there is any
        # this may potentially not even be called -- the subprocess might be already terminated
        if self._process and self._process.poll() is None:
            logger.info("Cleaning up: terminating subprocess.")
            self._process.terminate()

            # give the subprocess up to `sigterm_to_sigkill` seconds to exit, then SIGKILL it;
            # waiting on the process (instead of sleeping unconditionally) returns as soon
            # as it exits, so no shutdown time is wasted on an already finished subprocess
            try:
                self._process.wait(timeout=CFG.runner.sigterm_to_sigkill)
            except subprocess.TimeoutExpired:
                self._process.kill()

        # copy runtime files to input dir without retrying
        if self._use_scratch:
            self._copy_runtime_files_to_input_dir(retry=False)

    def _handle_sigterm(self, _signum: int, _frame: FrameType | None) -> NoReturn:
        """
        Signal handler for SIGTERM.

        Performs cleanup, logs termination, and exits.
        """
        logger.info("Received SIGTERM, initiating shutdown.")
        self._cleanup()
        logger.error("Execution was terminated by SIGTERM.")
        # this may get ignored by the batch system
        # so you should not rely on this specific exit code
        sys.exit(143)
Manages the setup, execution, and cleanup of scripts within the qq batch environment.
The Runner class is responsible for:
- Preparing a working directory (shared or scratch space)
- Executing a provided job script
- Updating the job info file with run state, success, or failure
- Cleaning up resources when execution is finished
def __init__(self, info_file: Path, host: str):
    """
    Initialize a new Runner instance.

    Args:
        info_file (Path): Path to the qq info file that contains job metadata.
        host (str): The hostname of the input machine from which the job was submitted.

    Raises:
        QQRunFatalError: If loading the QQ info file fails fatally during initialization.
    """
    # process running the wrapped script
    # (set before the signal handler is installed so that the handler
    # never runs against a Runner that lacks this attribute)
    self._process: subprocess.Popen[str] | None = None

    # resubmission is enabled below only for loop and continuous jobs;
    # giving it a default guarantees the attribute always exists
    self._should_resubmit = False

    # install a signal handler
    signal.signal(signal.SIGTERM, self._handle_sigterm)

    self._info_file = Path(info_file)
    logger.debug(f"Info file: '{self._info_file}'.")

    self._input_machine = host
    logger.debug(f"Input machine: '{self._input_machine}'.")

    # load the info file or raise a fatal qq error if this fails
    try:
        # get the batch system from the environment variable (or guess it)
        self._batch_system = BatchInterface.from_env_var_or_guess()
        logger.debug(f"Batch system: {str(self._batch_system)}.")

        # get the id of the job from the batch system
        if not (job_id := self._batch_system.get_job_id()):
            raise QQError("Job has no associated job id")

        # load the info file
        self._informer: Informer = Retryer(
            Informer.from_file,
            self._info_file,
            host=self._input_machine,
            max_tries=CFG.runner.retry_tries,
            wait_seconds=CFG.runner.retry_wait,
        ).run()

        # check that the id of this job matches the job id in the info file
        if not self._informer.matches_job(job_id):
            raise QQJobMismatchError(
                "Info file does not correspond to the current job"
            )

        # check that the batch system in info file matches the one loaded from the environment variable
        if self._batch_system != self._informer.batch_system:
            raise QQError(
                f"Batch system mismatch - env var: '{str(self._batch_system)}', info file: '{self._informer.batch_system}'"
            )

    except Exception as e:
        raise QQRunFatalError(
            f"Unable to load valid qq info file '{self._info_file}' on '{self._input_machine}': {e}"
        ) from e

    logger.info(
        f"[qq-{str(self._batch_system)} v{qq_lib.__version__}] Initializing "
        f"job '{self._informer.info.job_id}' on host '{socket.getfqdn()}'."
    )

    # get input directory
    self._input_dir = Path(self._informer.info.input_dir)
    logger.debug(f"Input directory: {self._input_dir}.")

    # should the scratch directory be used?
    self._use_scratch = self._informer.uses_scratch()
    logger.debug(f"Use scratch: {self._use_scratch}.")

    # initialize archiver, if this is a loop job
    if loop_info := self._informer.info.loop_info:
        self._archiver = Archiver(
            loop_info.archive,
            loop_info.archive_format,
            self._informer.info.input_machine,
            self._informer.info.input_dir,
            self._batch_system,
        )
        self._should_resubmit = True
    else:
        self._archiver = None

    if self._informer.info.job_type == JobType.CONTINUOUS:
        self._should_resubmit = True
Initialize a new Runner instance.
Arguments:
- info_file (Path): Path to the qq info file that contains job metadata.
- host (str): The hostname of the input machine from which the job was submitted.
Raises:
- QQRunFatalError: If loading the QQ info file fails fatally during initialization.
def prepare(self) -> None:
    """
    Get the job ready to run.

    For loop jobs, the archive directory is created and the runtime files of
    the previous cycle are archived first. The working directory (scratch or
    shared) is then set up, and finally, again only for loop jobs, the files
    of the current cycle are fetched from the archive into it.

    Raises:
        QQError: If working directory setup fails.
    """
    archiver = self._archiver

    if archiver:
        loop_info = self._informer.info.loop_info
        assert loop_info is not None

        # make sure the archive directory exists
        archiver.make_archive_dir()

        # runtime files of the previous cycle must be archived before the
        # working directory is prepared, otherwise they would get copied into it
        previous_cycle = loop_info.current - 1
        logger.debug(f"Archiving run time files from cycle {previous_cycle}.")

        # the '+' character has to be escaped in the name passed to the archiver
        escaped_job_name = construct_loop_job_name(
            self._informer.info.script_name,
            previous_cycle,
        ).replace("+", "\\+")
        archiver.archive_runtime_files(escaped_job_name, previous_cycle)

    # choose the working directory flavour and set it up
    set_up_work_dir = (
        self._set_up_scratch_dir if self._use_scratch else self._set_up_shared_dir
    )
    set_up_work_dir()

    if archiver:
        loop_info = self._informer.info.loop_info
        assert loop_info is not None

        # bring the files belonging to the current cycle into the working directory
        archiver.from_archive(self._work_dir, loop_info.current)
Prepare the script for execution, setting up the archive and archiving run time files (if this is a loop job) and preparing working directory.
Raises:
- QQError: If working directory setup fails.
def execute(self) -> int:
    """
    Run the job script in the working directory and wait for it to finish.

    Returns:
        int: The exit code from the executed script. For loop and continuous
        jobs, the special "do not resubmit" exit code is reported as 0.

    Raises:
        QQError: If execution fails or info file cannot be updated.
    """
    # mark the job as running in the qq info file (this reloads the informer)
    self._update_info_running()

    info = self._informer.info

    # resolve the script that should be launched
    script_path = logical_resolve(Path(info.script_name))

    # files receiving the standard output and standard error of the script
    stdout_target = info.stdout_file
    stderr_target = info.stderr_file

    logger.info(f"Executing script '{script_path}'.")

    # use the configured interpreter, falling back to the default one
    interpreter = info.interpreter or Interpreter()

    # assemble the full command line
    argv = [*interpreter.to_command_list(), str(script_path)]
    logger.debug(f"Command executed using subprocess.Popen: {argv}")

    try:
        with (
            Path(stdout_target).open("w") as stdout_stream,
            Path(stderr_target).open("w") as stderr_stream,
        ):
            self._process = subprocess.Popen(
                argv,
                stdout=stdout_stream,
                stderr=stderr_stream,
                text=True,
            )

            # poll instead of blocking in a single wait call
            while self._process.poll() is None:
                sleep(CFG.runner.subprocess_checks_wait_time)
    except Exception as e:
        raise QQError(f"Failed to execute script '{script_path}': {e}") from e

    process = self._process

    # a loop/continuous job signals "stop resubmitting" through a dedicated exit code;
    # such a run counts as successful, so the code is replaced with 0
    is_resubmittable = info.job_type in (JobType.LOOP, JobType.CONTINUOUS)
    if is_resubmittable and process.returncode == CFG.exit_codes.qq_run_no_resubmit:
        logger.debug(
            f"Detected an exit code of '{process.returncode}'. Replacing with '0' and will not submit the next cycle of the job."
        )
        process.returncode = 0
        self._should_resubmit = False

    return process.returncode
Execute the job script in the working directory.
Returns:
int: The exit code from the executed script.
Raises:
- QQError: If execution fails or info file cannot be updated.
def finalize(self) -> None:
    """
    Finalize the execution of the job script.

    Post-processes the job according to the script's exit code, the
    configured archive and transfer modes, and whether a scratch working
    directory is in use. The steps are performed in this order:

    1. If an archiver is set and the archive mode allows it for the given
       exit code, files are archived from the working directory.
    2. If a scratch directory is used:
       - when the transfer mode allows it for the given exit code, the
         working directory is synced back to the input directory (files
         brought in from outside the input directory via `--include` are
         excluded) and the working directory is then deleted;
       - otherwise only the runtime files are copied to the input directory
         and the working directory is kept.
       If no scratch directory is used, no files are transferred.
    3. The qq info file is updated to "finished" (exit code 0) or "failed"
       (non-zero exit code).
    4. After a successful run of a loop or continuous job, resubmission is
       triggered.

    Raises:
        QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
    """
    logger.info("Finalizing the execution.")
    assert self._process is not None

    # the exit code of the script drives every decision below
    exit_code = self._process.returncode

    # archive files
    if self._archiver and self._informer.should_archive_files(exit_code):
        logger.debug(f"Script exit code is '{exit_code}'. Archiving files.")
        self._archive_files_from_work_dir()

    # move results back to the input (submission) directory
    if self._use_scratch:
        if not self._informer.should_transfer_files(exit_code):
            # copy only the runtime files to the input directory
            # and keep the working directory
            self._copy_runtime_files_to_input_dir(retry=True)
        else:
            logger.debug(
                f"Script exit code is '{exit_code}'. Transferring files from working directory."
            )

            sync_back = Retryer(
                self._batch_system.sync_with_exclusions,
                self._work_dir,
                self._input_dir,
                socket.getfqdn(),
                self._informer.info.input_machine,
                # files copied into the working directory from outside the input
                # directory (--include option) were never part of the input
                # directory, so they must not be copied into it
                self._get_explicitly_included_files_in_work_dir(),
                max_tries=CFG.runner.retry_tries,
                wait_seconds=CFG.runner.retry_wait,
            )
            sync_back.run()

            # the data is back in the input directory; remove the working
            # directory from scratch
            self._delete_work_dir()

    if exit_code != 0:
        # record the failure in the qqinfo file
        self._update_info_failed(exit_code)
    else:
        # record the successful completion in the qqinfo file
        self._update_info_finished()

        # loop/continuous jobs proceed to resubmission
        if self._informer.info.job_type in (JobType.LOOP, JobType.CONTINUOUS):
            self._resubmit()

    logger.info(f"Job completed with an exit code of {exit_code}.")
Finalize the execution of the job script.
Handles post-processing of the job based on the script's exit code and the configured transfer and archive modes. The specific actions taken depend on the job's transfer mode, its archive mode, and whether a scratch directory is being used.
Specifically, this method:
- Archives files from the working directory if archiving is enabled and the archive mode allows it for the given exit code (loop jobs only).
- Transfers or handles files based on whether scratch directory is used:
- If using scratch and transfer mode allows: Syncs the entire working directory back to the input directory (skipping files that were copied into the working directory from outside the input directory via the `--include` option) and removes the working directory from scratch.
- If using scratch and transfer mode disallows: Copies only runtime files to the input directory and preserves the working directory.
- If not using scratch: No files are transferred (archiving from the previous step may still take place).
- Updates the qq info file to "finished" (exit code 0) or "failed" (non-zero exit code).
- Resubmits the job if it is a loop or continuous job and was completed successfully.
Raises:
- QQError: If copying, deletion, or archiving of files fails or if the resubmission fails.
def log_failure_and_exit(self, exception: BaseException) -> NoReturn:
    """
    Mark the job as failed in the qq info file and terminate the program.

    The exit code is read from the `exit_code` attribute of the exception;
    if the exception has no such attribute, `CFG.exit_codes.unexpected_error`
    is used instead.

    Args:
        exception (BaseException): The exception that caused the failure.

    Raises:
        SystemExit: Raised with the resolved exit code once the failure has
            been written to the info file and the exception has been logged.
    """
    code = getattr(exception, "exit_code", CFG.exit_codes.unexpected_error)
    try:
        self._update_info_failed(code)
        logger.error(exception)
        # SystemExit does not derive from Exception, so the handler below
        # does not intercept this exit
        sys.exit(code)
    except Exception as err:
        # the failure state could not be recorded or logged
        log_fatal_error_and_exit(err)  # exits here
Record a failure state into the qq info file and exit the program.
Arguments:
- exception (BaseException): The exception to log.
Raises:
- SystemExit: Always exits with the exit code associated with the given exception.