qq_lib.batch.interface
Abstractions for integrating qq with HPC batch scheduling systems.
This module defines the core interfaces that allow qq to interact with multiple batch systems through a unified API. It provides:
- BatchInterface: the central abstract interface that every batch-system backend implements. It defines operations such as job submission, job querying, directory synchronization, remote file access, resubmission, and navigation to compute nodes.
- BatchJobInterface, BatchNodeInterface, and BatchQueueInterface: lightweight abstractions representing jobs, nodes, and queues as reported by the underlying scheduler. These interfaces expose normalized metadata and allow qq to present consistent job/queue/node information regardless of scheduler differences.
- AnyBatchClass: a type alias for batch system implementations.
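The BatchInterface source on this page defines every operation as a classmethod, so a backend is used as a class rather than as an instance. The following is a minimal sketch of that pattern. It is not qq's code: SketchBatchInterface, OfflineBackend, EnvVarBackend, pick_backend and the SKETCH_JOB_ID variable are hypothetical names, and the "first backend that reports itself available wins" selection rule is an assumption made only for illustration.

```python
from __future__ import annotations

import os
from abc import ABC


class SketchBatchInterface(ABC):
    """Hypothetical stand-in for BatchInterface: every operation is a classmethod."""

    @classmethod
    def env_name(cls) -> str:
        raise NotImplementedError(f"env_name method is not implemented for {cls.__name__}")

    @classmethod
    def is_available(cls) -> bool:
        raise NotImplementedError(f"is_available method is not implemented for {cls.__name__}")

    @classmethod
    def get_job_id(cls) -> str | None:
        raise NotImplementedError(f"get_job_id method is not implemented for {cls.__name__}")


class OfflineBackend(SketchBatchInterface):
    """Hypothetical backend whose scheduler commands are missing on this host."""

    @classmethod
    def env_name(cls) -> str:
        return "Offline"

    @classmethod
    def is_available(cls) -> bool:
        return False


class EnvVarBackend(SketchBatchInterface):
    """Hypothetical backend that reads the current job ID from an environment variable."""

    JOB_ID_VAR = "SKETCH_JOB_ID"  # hypothetical variable name

    @classmethod
    def env_name(cls) -> str:
        return "EnvVar"

    @classmethod
    def is_available(cls) -> bool:
        return True

    @classmethod
    def get_job_id(cls) -> str | None:
        # returns None outside a job, mirroring the documented get_job_id contract
        return os.environ.get(cls.JOB_ID_VAR)


def pick_backend(candidates: list[type[SketchBatchInterface]]) -> type[SketchBatchInterface]:
    """Hypothetical helper: return the first backend class that reports itself available."""
    for backend in candidates:
        if backend.is_available():
            return backend
    raise RuntimeError("No supported batch system is available on this host.")


# the backend is a class object; its classmethods are called directly on it
backend = pick_backend([OfflineBackend, EnvVarBackend])
print(backend.env_name(), backend.get_job_id())
```

Keeping all operations on the class means callers can pass a backend around as a plain `type[...]` value without constructing any state, which fits the description of AnyBatchClass as a type alias for batch system implementations.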
```python
# Released under MIT License.
# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab

"""
Abstractions for integrating qq with HPC batch scheduling systems.

This module defines the core interfaces that allow qq to interact with
multiple batch systems through a unified API. It provides:

- `BatchInterface`: the central abstract interface that every batch-system
  backend implements. It defines operations such as job submission, job
  querying, directory synchronization, remote file access, resubmission, and
  navigation to compute nodes.

- `BatchJobInterface`, `BatchNodeInterface`, and `BatchQueueInterface`:
  lightweight abstractions representing jobs, nodes, and queues as reported
  by the underlying scheduler. These interfaces expose normalized metadata
  and allow qq to present consistent job/queue/node information regardless
  of scheduler differences.

- `AnyBatchClass`: a type alias for batch system implementations.
"""

from .interface import AnyBatchClass, BatchInterface
from .job import BatchJobInterface
from .node import BatchNodeInterface
from .queue import BatchQueueInterface

__all__ = [
    "BatchInterface",
    "BatchJobInterface",
    "BatchNodeInterface",
    "BatchQueueInterface",
    "AnyBatchClass",
]
```
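In the BatchInterface source that follows, navigate_to_destination runs an interactive ssh session whose exit status can come from three places: ssh itself (OpenSSH exits with 255 when the connection fails), the explicit `exit 94` (`_CD_FAIL`) issued when `cd` fails on the remote side, or the last command the user runs in the shell. The sketch below isolates that classification as standalone functions; remote_cd_command and classify_exit are hypothetical names, not part of qq.

```python
from __future__ import annotations

CD_FAIL = 94    # mirrors BatchInterface._CD_FAIL
SSH_FAIL = 255  # ssh exits with 255 when the connection itself fails


def remote_cd_command(directory: str) -> str:
    # In POSIX sh, `||` and `&&` have equal precedence and associate left,
    # so this runs as `(cd DIR || exit 94) && exec bash -l`.
    return f"cd {directory} || exit {CD_FAIL} && exec bash -l"


def classify_exit(returncode: int) -> str | None:
    """Return an error reason for qq-level failures, or None to ignore the code."""
    if returncode == SSH_FAIL:
        return "Could not connect to host."
    if returncode == CD_FAIL:
        return "Could not change directory."
    # anything else is the exit status of the user's interactive shell
    return None
```

One consequence of this scheme, shared by the original code, is that an interactive session the user ends with exit status 94 or 255 is indistinguishable from a genuine navigation failure.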
```python
class BatchInterface[
    TBatchJob: BatchJobInterface = BatchJobInterface,
    TBatchQueue: BatchQueueInterface = BatchQueueInterface,
    TBatchNode: BatchNodeInterface = BatchNodeInterface,
](ABC, metaclass=_BatchMeta):
    """
    Abstract base class for batch system integrations.

    Concrete batch system classes must implement these methods to allow
    qq to interact with different batch systems uniformly.

    All functions should raise QQError when encountering an error.
    """

    # magic number indicating unreachable directory when navigating to it
    _CD_FAIL = 94
    # exit code of ssh if connection fails
    _SSH_FAIL = 255

    @classmethod
    def env_name(cls) -> str:
        """
        Return the name of the batch system environment.

        Returns:
            str: The batch system name.
        """
        raise NotImplementedError(
            f"env_name method is not implemented for {cls.__name__}"
        )

    @classmethod
    def is_available(cls) -> bool:
        """
        Determine whether the batch system is available on the current host.

        Implementations typically verify this by checking for the presence
        of required commands or other environment-specific indicators.

        Returns:
            bool: True if the batch system is available, False otherwise.
        """
        raise NotImplementedError(
            f"is_available method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_job_id(cls) -> str | None:
        """
        Get the ID of the current job from the corresponding batch system's environment variable.

        For this method to work, it must be called from inside an active job.

        Returns:
            str | None: ID of the job, or None if the corresponding variable is not set.
        """
        raise NotImplementedError(
            f"get_job_id method is not implemented for {cls.__name__}"
        )

    @classmethod
    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
        """
        Create the working directory on scratch for the given job.

        Args:
            job_id (str): Unique identifier of the job.

        Returns:
            Path: Absolute path to the working directory on scratch.

        Raises:
            QQError: If the working directory could not be created.
        """
        raise NotImplementedError(
            f"create_work_dir_on_scratch method is not implemented for {cls.__name__}"
        )

    @classmethod
    def job_submit(
        cls,
        res: Resources,
        queue: str,
        script: Path,
        job_name: str,
        depend: list[Depend],
        env_vars: dict[str, str],
        account: str | None = None,
        server: str | None = None,
        remote_host: str | None = None,
    ) -> str:
        """
        Submit a job to the batch system.

        Can also perform additional validation of the job's resources.

        This method is NOT guaranteed to be thread-safe.

        Args:
            res (Resources): Resources required for the job.
            queue (str): Target queue for the job submission.
            script (Path): Path to the script to execute.
            job_name (str): Name of the job to use.
            depend (list[Depend]): List of job dependencies.
            env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
            account (str | None): Optional account name to use for the job.
            server (str | None): Optional name of the server to submit the job to.
            remote_host (str | None): Optional name of the machine to submit the job from.

        Returns:
            str: Unique ID of the submitted job.

        Raises:
            QQError: If the job submission fails.
        """
        raise NotImplementedError(
            f"job_submit method is not implemented for {cls.__name__}"
        )

    @classmethod
    def job_kill(cls, job_id: str) -> None:
        """
        Terminate a job gracefully. The job should have time for proper cleanup.

        Args:
            job_id (str): Identifier of the job to terminate.

        Raises:
            QQError: If the job could not be killed.
        """
        raise NotImplementedError(
            f"job_kill method is not implemented for {cls.__name__}"
        )

    @classmethod
    def job_kill_force(cls, job_id: str) -> None:
        """
        Forcefully terminate a job. There may be no time for proper cleanup.

        Args:
            job_id (str): Identifier of the job to forcefully terminate.

        Raises:
            QQError: If the job could not be killed.
        """
        raise NotImplementedError(
            f"job_kill_force method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_batch_job(cls, job_id: str) -> TBatchJob:
        """
        Retrieve information about a job from the batch system.

        The returned object should be fully initialized, even if the job
        no longer exists or its information is unavailable.

        Args:
            job_id (str): Identifier of the job.

        Returns:
            BatchJobInterface: Object containing the job's metadata and state.
        """
        raise NotImplementedError(
            f"get_batch_job method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[TBatchJob]:
        """
        Retrieve information about multiple jobs from the batch system.

        Batch jobs should be returned in the same order as they appear in `job_ids`.
        A TBatchJob object should be returned for each job id, even if the job
        no longer exists or its information is unavailable.

        Array jobs should NOT be expanded into their individual tasks.

        The default implementation is to call `get_batch_job` for each job id.
        This implementation may be inefficient for large numbers of job ids and
        should be overridden by subclasses.

        Args:
            job_ids (list[str]): List of job identifiers.

        Returns:
            list[TBatchJob]: List of TBatchJob objects, one for each job id.
        """
        return [cls.get_batch_job(id) for id in job_ids]

    @classmethod
    def get_unfinished_batch_jobs(
        cls, user: str, server: str | None = None
    ) -> list[TBatchJob]:
        """
        Retrieve information about all uncompleted jobs submitted by `user`
        on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            user (str): Username for which to fetch uncompleted jobs.
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
        """
        raise NotImplementedError(
            f"get_unfinished_batch_jobs method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[TBatchJob]:
        """
        Retrieve information about all jobs submitted by a specific user (including finished jobs)
        on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            user (str): Username for which to fetch all jobs.
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
        """
        raise NotImplementedError(
            f"get_batch_jobs method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_all_unfinished_batch_jobs(
        cls, server: str | None = None
    ) -> list[TBatchJob]:
        """
        Retrieve information about uncompleted jobs of all users on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
        """
        raise NotImplementedError(
            f"get_all_unfinished_batch_jobs method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_all_batch_jobs(cls, server: str | None = None) -> list[TBatchJob]:
        """
        Retrieve information about all jobs of all users on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
        """
        raise NotImplementedError(
            f"get_all_batch_jobs method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_queues(cls, server: str | None = None) -> list[TBatchQueue]:
        """
        Retrieve all queues managed by the batch system on the specified or default batch server.

        Args:
            server (str | None): Optional name of the batch server to get queues from.

        Returns:
            list[BatchQueueInterface]: A list of queue objects existing in the batch system.
        """
        raise NotImplementedError(
            f"get_queues method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_nodes(cls, server: str | None = None) -> list[TBatchNode]:
        """
        Retrieve all nodes managed by the batch system on the specified or default batch server.

        Args:
            server (str | None): Optional name of the batch server to get nodes from.

        Returns:
            list[BatchNodeInterface]: A list of node objects existing in the batch system.
        """
        raise NotImplementedError(
            f"get_nodes method is not implemented for {cls.__name__}"
        )

    @classmethod
    def get_supported_work_dir_types(cls) -> list[str]:
        """
        Retrieve the list of supported types of working directories
        (i.e., strings that can be used with the `--work-dir` option).

        Returns:
            list[str]: A list of supported types of working directories.
        """
        raise NotImplementedError(
            f"get_supported_work_dir_types method is not implemented for {cls.__name__}"
        )

    @classmethod
    def navigate_to_destination(cls, host: str, directory: Path) -> None:
        """
        Open a new terminal on the specified host and change the working directory
        to the given path, handing control over to the user.

        Default behavior:
        - If the target host is different from the current host, SSH is used
          to connect and `cd` is executed to switch to the directory.
          Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
        - If the target host matches the current host, only `cd` is used.

        A new terminal should always be opened, regardless of the host.

        Args:
            host (str): Hostname where the directory is located.
            directory (Path): Directory path to navigate to.

        Raises:
            QQError: If the navigation fails.
        """
        # if the directory is on the current host, we do not need to use ssh
        if host == socket.getfqdn():
            cls._navigate_same_host(directory)
            return

        # the directory is on another node
        ssh_command = cls._translate_ssh_command(host, directory)
        logger.debug(f"Using ssh: '{' '.join(ssh_command)}'")
        result = subprocess.run(ssh_command)

        # the subprocess exit code can come from:
        # - SSH itself failing - returns _SSH_FAIL
        # - the explicit exit code we set if 'cd' to the directory fails - returns _CD_FAIL
        # - the exit code of the last command the user runs in the interactive shell
        #
        # we ignore user exit codes entirely and only treat _SSH_FAIL and _CD_FAIL as errors
        if result.returncode == cls._SSH_FAIL:
            raise QQError(
                f"Could not reach '{host}:{str(directory)}': Could not connect to host."
            )
        if result.returncode == cls._CD_FAIL:
            raise QQError(
                f"Could not reach '{host}:{str(directory)}': Could not change directory."
            )

    @classmethod
    def read_remote_file(cls, host: str, file: Path) -> str:
        """
        Read the contents of a file on a remote host and return it as a string.

        The default implementation uses SSH to retrieve the file contents.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the file resides.
            file (Path): The path to the file on the remote host.

        Returns:
            str: The contents of the remote file.

        Raises:
            QQError: If the file cannot be read or SSH fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                "-q",  # suppress some SSH messages
                host,
                f"cat {file}",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not read remote file '{file}' on '{host}': {result.stderr.strip()}."
            )
        return result.stdout

    @classmethod
    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
        """
        Write the given content to a file on a remote host, overwriting it if it exists.

        The default implementation uses SSH to send the content to the remote file.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the file resides.
            file (Path): The path to the file on the remote host.
            content (str): The content to write to the remote file.

        Raises:
            QQError: If the file cannot be written or SSH fails.
        """

        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                f"cat > {file}",
            ],
            input=content,
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not write to remote file '{file}' on '{host}': {result.stderr.strip()}."
            )

    @classmethod
    def make_remote_dir(cls, host: str, directory: Path) -> None:
        """
        Create a directory at the specified path on a remote host.

        The default implementation uses SSH to run `mkdir` on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the directory should be created.
            directory (Path): The path of the directory to create on the remote host.

        Raises:
            QQError: If the directory cannot be created and does not already exist,
                or if the SSH command fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                # ignore an error if the directory already exists
                f"mkdir {directory} 2>/dev/null || [ -d {directory} ]",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not make remote directory '{directory}' on '{host}': {result.stderr.strip()}."
            )

    @classmethod
    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
        """
        List all files and directories in the specified directory on a remote host.

        The default implementation uses SSH to run `ls -A` on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the directory resides.
            directory (Path): The remote directory to list.

        Returns:
            list[Path]: A list of `Path` objects representing the entries inside the directory.
                Each entry is the given `directory` joined with the entry name.

        Raises:
            QQError: If the directory cannot be listed or the SSH command fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                f"ls -A {directory}",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not list remote directory '{directory}' on '{host}': {result.stderr.strip()}."
            )

        # split by newline and filter out empty lines
        return [
            logical_resolve(Path(directory) / line)
            for line in result.stdout.splitlines()
            if line.strip()
        ]

    @classmethod
    def delete_remote_dir(cls, host: str, directory: Path) -> None:
        """
        Delete a directory on a remote host.

        The default implementation uses SSH to run `rm -r` on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the directory resides.
            directory (Path): The remote directory to delete.

        Raises:
            QQError: If the directory cannot be deleted or the SSH command fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                f"yes | rm -r {directory}",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not delete remote directory '{directory}' on '{host}': {result.stderr.strip()}."
            )

    @classmethod
    def move_remote_files(
        cls, host: str, files: list[Path], moved_files: list[Path]
    ) -> None:
        """
        Move files on a remote host from their current paths to new paths.

        The default implementation uses SSH to run a sequence of `mv` commands on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the files reside.
            files (list[Path]): A list of source file paths on the remote host.
            moved_files (list[Path]): A list of destination file paths on the remote host.
                Must be the same length as `files`.

        Raises:
            QQError: If the SSH command fails, the files cannot be moved, or
                `files` and `moved_files` differ in length.
        """
        mv_command = cls._translate_move_command(files, moved_files)

        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                mv_command,
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not move files on a remote host '{host}': {result.stderr.strip()}."
            )

    @classmethod
    def sync_with_exclusions(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        exclude_files: list[Path] | None = None,
    ) -> None:
        """
        Synchronize the contents of two directories using rsync, optionally across remote hosts,
        while excluding specified files or subdirectories.

        All files and directories in `src_dir` are copied to `dest_dir` except
        those listed in `exclude_files`. Files are never removed from the destination.

        Args:
            src_dir (Path): Source directory to sync from.
            dest_dir (Path): Destination directory to sync to.
            src_host (str | None): Optional hostname of the source machine if remote;
                None if the source is local.
            dest_host (str | None): Optional hostname of the destination machine if remote;
                None if the destination is local.
            exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
                These will be converted to paths relative to `src_dir`.

        Raises:
            QQError: If the rsync command fails for any reason or times out.
        """
        # convert absolute paths of files to exclude into paths relative to src_dir
        relative_excluded = (
            convert_absolute_to_relative(exclude_files, src_dir)
            if exclude_files
            else []
        )

        command = cls._translate_rsync_excluded_command(
            src_dir, dest_dir, src_host, dest_host, relative_excluded
        )
        logger.debug(f"Rsync command: {command}.")

        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)

    @classmethod
    def sync_selected(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        include_files: list[Path] | None = None,
    ) -> None:
        """
        Synchronize only the explicitly selected files and directories from the source
        to the destination, optionally across remote hosts.

        Only files listed in `include_files` are copied from `src_dir` to `dest_dir`.
        Files not listed are ignored. Files are never removed from the destination.

        Args:
            src_dir (Path): Source directory to sync from.
            dest_dir (Path): Destination directory to sync to.
            src_host (str | None): Optional hostname of the source machine if remote;
                None if the source is local.
            dest_host (str | None): Optional hostname of the destination machine if remote;
                None if the destination is local.
            include_files (list[Path] | None): Optional list of absolute file paths to include in syncing.
                These are converted to paths relative to `src_dir`.
                This argument is optional only for consistency with `sync_with_exclusions`.

        Raises:
            QQError: If the rsync command fails or times out.
        """
        # convert absolute paths of files to include into paths relative to src_dir
        relative_included = (
            convert_absolute_to_relative(include_files, src_dir)
            if include_files
            else []
        )

        command = cls._translate_rsync_included_command(
            src_dir, dest_dir, src_host, dest_host, relative_included
        )
        logger.debug(f"Rsync command: {command}.")

        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)

    @classmethod
    def transform_resources(
        cls, queue: str, server: str | None, provided_resources: Resources
    ) -> Resources:
        """
        Transform user-provided Resources into a batch system-specific Resources instance.

        This method takes the resources provided during submission and returns a new
        Resources object with any necessary modifications or defaults applied for
        the target batch system. The original `provided_resources` object is not modified.

        Args:
            queue (str): The name of the queue for which the resources are being adapted.
            server (str | None): Name of the server on which the queue is located.
                If `None`, the queue is treated as being located on the current server.
            provided_resources (Resources): The raw resources specified by the user.

        Returns:
            Resources: A new Resources instance with batch system-specific adjustments,
                fully constructed and validated.

        Raises:
            QQError: If any of the provided parameters are invalid or inconsistent.
        """
        raise NotImplementedError(
            f"transform_resources method is not implemented for {cls.__name__}"
        )

    @classmethod
    def is_shared(cls, directory: Path) -> bool:
        """
        Determine whether a given directory resides on a shared filesystem.

        Args:
            directory (Path): The directory to check.

        Returns:
            bool: True if the directory is on a shared filesystem, False if it is local.
        """
        # df -l exits with zero if the filesystem is local; otherwise it exits with a non-zero code
        result = subprocess.run(
            ["df", "-l", directory],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        return result.returncode != 0

    @classmethod
    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
        """
        Get the default job resubmission hosts for this batch system.

        In the default implementation, resubmission from the input machine is attempted.

        Returns:
            list[ResubmitHost]: A list of resubmission hosts.
        """
        return [InputHost()]

    @classmethod
    def sort_jobs(cls, jobs: list[TBatchJob]) -> None:
        """
        Sort a list of batch system jobs by a defined attribute.

        The default implementation sorts the jobs alphabetically by their job ID,
        as returned by `job.get_id()`. Subclasses may override this method to
        implement custom sorting logic.

        Args:
            jobs (list[BatchJobInterface]): A list of batch job objects to be sorted
                in-place.
        """
        jobs.sort(key=lambda job: job.get_id())

    @classmethod
    def jobs_presenter_columns_to_show(cls) -> set[str]:
        """
        Get a set of columns that should be shown in the output of JobsPresenter (`qq jobs`)
        for this batch system.

        In the default implementation, all columns are shown.

        Note that the 'Exit' column is not shown when printing queued and running jobs,
        even if you specify it here.

        Returns:
            set[str]: Set of column titles that should be shown.
        """
        return {
            "S",
            "Job ID",
            "User",
            "Job Name",
            "Queue",
            "NCPUs",
            "NGPUs",
            "NNodes",
            "Times",
            "Node",
            "%CPU",
            "%Mem",
            "Exit",
        }

    @classmethod
    def _translate_ssh_command(cls, host: str, directory: Path) -> list[str]:
        """
        Construct the SSH command to navigate to a remote directory.

        This is an internal method of `BatchInterface`; you typically should not override it.

        Args:
            host (str): The hostname of the remote machine.
            directory (Path): The target directory to navigate to.

        Returns:
            list[str]: SSH command as a list suitable for subprocess execution.
        """
        return [
            "ssh",
            "-o PasswordAuthentication=no",  # never ask for password
            "-o GSSAPIAuthentication=yes",  # allow Kerberos tickets
            "-o StrictHostKeyChecking=no",  # allow unknown hosts
            f"-o ConnectTimeout={CFG.timeouts.ssh}",
            host,
            "-t",
            f"cd {directory} || exit {cls._CD_FAIL} && exec bash -l",
        ]

    @classmethod
    def _navigate_same_host(cls, directory: Path) -> None:
        """
        Navigate to a directory on the current host using a subprocess.

        This is an internal method of `BatchInterface`; you typically should not override it.

        Args:
            directory (Path): Directory to navigate to.
        """
        logger.debug("Current host is the same as target host. Using 'cd'.")
        if not directory.is_dir():
            raise QQError(
                f"Could not reach '{socket.getfqdn()}:{str(directory)}': Could not change directory."
            )

        subprocess.run(["bash"], cwd=directory)

        # if the directory exists, always report success,
        # no matter what the user does inside the terminal

    @classmethod
    def _translate_move_command(cls, files: list[Path], moved_files: list[Path]) -> str:
        """
        Translate lists of source and destination file paths into a single shell
        command string for moving the files.

        This is an internal method of `BatchInterface`; you typically should not override it.

        Args:
            files (list[Path]): A list of source file paths to be moved.
            moved_files (list[Path]): A list of destination file paths of the same
                length as `files`.

        Returns:
            str: A single shell command string consisting of `mv` commands joined
                with `&&`.

        Raises:
            QQError: If `files` and `moved_files` do not have the same length.
        """
        if len(files) != len(moved_files):
            raise QQError(
                "The provided 'files' and 'moved_files' must have the same length."
            )

        mv_commands: list[str] = []
        for src, dst in zip(files, moved_files):
            mv_commands.append(f"mv '{src}' '{dst}'")

        return " && ".join(mv_commands)

    @classmethod
    def _translate_rsync_excluded_command(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        relative_excluded: list[Path],
    ) -> list[str]:
        """
        Build an rsync command to synchronize a directory while excluding specific files.

        `src_host` and `dest_host` must not both be set;
        otherwise the resulting rsync command will be invalid.

        This is an internal method of `BatchInterface`; you typically should not override it.

        Args:
            src_dir (Path): Source directory path.
            dest_dir (Path): Destination directory path.
            src_host (str | None): Hostname of the source machine if remote;
                None if the source is local.
            dest_host (str | None): Hostname of the destination machine if remote;
                None if the destination is local.
            relative_excluded (list[Path]): List of paths relative to `src_dir`
                to exclude from syncing.

        Returns:
            list[str]: List of command arguments for rsync, suitable for `subprocess.run`.
        """
        # syncing recursively (-r), copying symlinks as symlinks (-l),
        # preserving times (-t) and device/special files (-D),
        # but not preserving owners and groups;
        # neither --checksum nor --ignore-times is used, for performance reasons:
        # a file may not be synced correctly if it was modified in both src_dir
        # and dest_dir at the same time and kept the same size -> this should be
        # so extremely rare that we do not care
        command = [
            "rsync",
            "-e",
            # allow Kerberos tickets, never ask for password, allow unknown hosts
            "ssh -o GSSAPIAuthentication=yes -o PasswordAuthentication=no -o StrictHostKeyChecking=no",
            "-rltD",
        ]
        for file in relative_excluded:
            command.extend(["--exclude", str(file)])

        src = src_host + ":" + str(src_dir) + "/" if src_host else str(src_dir) + "/"
        dest = dest_host + ":" + str(dest_dir) if dest_host else str(dest_dir)
        command.extend([src, dest])

        return command

    @classmethod
    def _translate_rsync_included_command(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        relative_included: list[Path],
    ) -> list[str]:
        """
        Build an rsync command to synchronize only the explicitly included files.

        `src_host` and `dest_host` must not both be set;
        otherwise the resulting rsync command will be invalid.

        This is an internal method of `BatchInterface`; you typically should not override it.

        Args:
            src_dir (Path): Source directory path.
            dest_dir (Path): Destination directory path.
            src_host (str | None): Hostname of the source machine if remote;
                None if the source is local.
            dest_host (str | None): Hostname of the destination machine if remote;
                None if the destination is local.
            relative_included (list[Path]): List of paths relative to `src_dir`
                that should be included in the sync.

        Returns:
            list[str]: List of command arguments for rsync, suitable for `subprocess.run`.
        """

        command = [
            "rsync",
            "-e",
            # allow Kerberos tickets, never ask for password, allow unknown hosts
            "ssh -o GSSAPIAuthentication=yes -o PasswordAuthentication=no -o StrictHostKeyChecking=no",
            "-rltD",
        ]
        for file in relative_included:
            # if `file` is a file
            command.extend(["--include", str(file)])
            # if `file` is a directory
            # including both patterns is fine: a pattern that matches nothing has no effect
            command.extend(["--include", f"{str(file)}/***"])
        # exclude all files not specifically included
        command.extend(["--exclude", "*"])

        src = src_host + ":" + str(src_dir) + "/" if src_host else str(src_dir) + "/"
        dest = dest_host + ":" + str(dest_dir) if dest_host else str(dest_dir)
        command.extend([src, dest])

        return command

    @classmethod
    def _run_rsync(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        command: list[str],
    ) -> None:
        """
        Execute an rsync command to synchronize files between source and destination.

        This is an internal method of `BatchInterface`; you typically should not override it.

        Args:
            src_dir (Path): Source directory path.
            dest_dir (Path): Destination directory path.
            src_host (str | None): Optional hostname of the source machine if remote;
                None if the source is local.
            dest_host (str | None): Optional hostname of the destination machine if remote;
                None if the destination is local.
            command (list[str]): List of command-line arguments for rsync, typically
                generated by `_translate_rsync_excluded_command` or `_translate_rsync_included_command`.
```
1141 1142 Raises: 1143 QQError: If the rsync command fails (non-zero exit code) or 1144 if the command times out after `CFG.timeouts.rsync` seconds. 1145 """ 1146 src = f"{src_host}:{str(src_dir)}" if src_host else str(src_dir) 1147 dest = f"{dest_host}:{str(dest_dir)}" if dest_host else str(dest_dir) 1148 1149 try: 1150 result = subprocess.run( 1151 command, capture_output=True, text=True, timeout=CFG.timeouts.rsync 1152 ) 1153 except subprocess.TimeoutExpired as e: 1154 raise QQError( 1155 f"Could not rsync files between '{src}' and '{dest}': Connection timed out after {CFG.timeouts.rsync} seconds." 1156 ) from e 1157 1158 if result.returncode != 0: 1159 raise QQError( 1160 f"Could not rsync files between '{src}' and '{dest}': {result.stderr.strip()}." 1161 )
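`_run_rsync` follows a common pattern: run the subprocess with a timeout, then turn both `subprocess.TimeoutExpired` and a non-zero exit code into one error type. The sketch below shows that pattern in isolation from qq. `run_checked` and `SyncError` are hypothetical names, and `SyncError` stands in for `QQError`.

```python
import subprocess


class SyncError(Exception):
    """Hypothetical stand-in for qq's QQError."""


def run_checked(command: list[str], timeout: float, what: str) -> str:
    """Run `command` and map timeouts and non-zero exit codes to SyncError."""
    try:
        result = subprocess.run(
            command, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired as e:
        # subprocess.run kills the child before re-raising TimeoutExpired
        raise SyncError(f"Could not {what}: timed out after {timeout} seconds.") from e

    if result.returncode != 0:
        # surface the tool's own diagnostics, as _run_rsync does with stderr
        raise SyncError(f"Could not {what}: {result.stderr.strip()}.")
    return result.stdout
```

Because the timeout error is chained with `from e`, callers can still inspect the original exception.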
Abstract base class for batch system integrations.
Concrete batch system classes must implement these methods to allow qq to interact with different batch systems uniformly.
All methods should raise QQError when they encounter an error.
    @classmethod
    def env_name(cls) -> str:
        """
        Return the name of the batch system environment.

        Returns:
            str: The batch system name.
        """
        raise NotImplementedError(
            f"env_name method is not implemented for {cls.__name__}"
        )
Return the name of the batch system environment.
Returns:
str: The batch system name.
    @classmethod
    def is_available(cls) -> bool:
        """
        Determine whether the batch system is available on the current host.

        Implementations typically verify this by checking for the presence
        of required commands or other environment-specific indicators.

        Returns:
            bool: True if the batch system is available, False otherwise.
        """
        raise NotImplementedError(
            f"is_available method is not implemented for {cls.__name__}"
        )
Determine whether the batch system is available on the current host.
Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.
Returns:
bool: True if the batch system is available, False otherwise.
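One way to implement `is_available` is to check that the scheduler's client commands are on `PATH`. The sketch below assumes a PBS-style system is recognized by `qsub`, `qstat` and `qdel`, and a Slurm-style one by `sbatch`, `squeue` and `scancel`. The real qq backends may rely on different indicators, and the function names are hypothetical.

```python
import shutil


def commands_available(commands: list[str]) -> bool:
    """Return True only if every command in `commands` is found on PATH."""
    return all(shutil.which(cmd) is not None for cmd in commands)


def pbs_available() -> bool:
    # hypothetical check for a PBS-style scheduler
    return commands_available(["qsub", "qstat", "qdel"])


def slurm_available() -> bool:
    # hypothetical check for a Slurm-style scheduler
    return commands_available(["sbatch", "squeue", "scancel"])
```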
    @classmethod
    def get_job_id(cls) -> str | None:
        """
        Get the id of the current job from the corresponding batch system's environment variable.

        For this method to work, it must be called from inside an active job.

        Returns:
            str | None: ID of the job, or None if the corresponding variable is not set.
        """
        raise NotImplementedError(
            f"get_job_id method is not implemented for {cls.__name__}"
        )
Get the id of the current job from the corresponding batch system's environment variable.
For this method to work, it must be called from inside an active job.
Returns:
str | None: ID of the job, or None if the corresponding variable is not set.
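Inside a running job, PBS exports the job ID as `PBS_JOBID` and Slurm exports it as `SLURM_JOB_ID`. The sketch below reads such a variable. Which variable each qq backend actually uses is an assumption here, and `job_id_from_env` is a hypothetical helper.

```python
from __future__ import annotations

import os

# variables exported inside running jobs by two common schedulers
PBS_JOB_ID_VAR = "PBS_JOBID"
SLURM_JOB_ID_VAR = "SLURM_JOB_ID"


def job_id_from_env(var_name: str) -> str | None:
    """Return the job ID stored in `var_name`, or None when not inside a job."""
    value = os.environ.get(var_name)
    # an empty value is treated the same as an unset variable
    return value or None
```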
    @classmethod
    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
        """
        Create the working directory on scratch for the given job.

        Args:
            job_id (str): Unique identifier of the job.

        Returns:
            Path: Absolute path to the working directory on scratch.

        Raises:
            QQError: If the working directory could not be created.
        """
        raise NotImplementedError(
            f"create_work_dir_on_scratch method is not implemented for {cls.__name__}"
        )
Create the working directory on scratch for the given job.
Arguments:
- job_id (str): Unique identifier of the job.
Returns:
Path: Absolute path to the working directory on scratch.
Raises:
- QQError: If the working directory could not be created.
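This sketch of `create_work_dir_on_scratch` assumes the backend already knows a scratch root and names the working directory after the job ID. The `<scratch_root>/<name>-<job_id>` layout, `create_work_dir`, and `WorkDirError` (standing in for `QQError`) are all hypothetical.

```python
from __future__ import annotations

from pathlib import Path


class WorkDirError(Exception):
    """Hypothetical stand-in for QQError."""


def create_work_dir(scratch_root: Path, job_id: str, name: str = "qq") -> Path:
    """Create <scratch_root>/<name>-<job_id> and return its absolute path."""
    work_dir = (scratch_root / f"{name}-{job_id}").resolve()
    try:
        # parents=True creates missing intermediate directories;
        # exist_ok=True makes repeated calls harmless
        work_dir.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        raise WorkDirError(
            f"Could not create working directory '{work_dir}': {e}"
        ) from e
    return work_dir
```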
    @classmethod
    def job_submit(
        cls,
        res: Resources,
        queue: str,
        script: Path,
        job_name: str,
        depend: list[Depend],
        env_vars: dict[str, str],
        account: str | None = None,
        server: str | None = None,
        remote_host: str | None = None,
    ) -> str:
        """
        Submit a job to the batch system.

        Can also perform additional validation of the job's resources.

        This method is NOT guaranteed to be thread-safe.

        Args:
            res (Resources): Resources required for the job.
            queue (str): Target queue for the job submission.
            script (Path): Path to the script to execute.
            job_name (str): Name of the job to use.
            depend (list[Depend]): List of job dependencies.
            env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
            account (str | None): Optional account name to use for the job.
            server (str | None): Optional name of the server to submit the job to.
            remote_host (str | None): Optional name of the machine to submit the job from.

        Returns:
            str: Unique ID of the submitted job.

        Raises:
            QQError: If the job submission fails.
        """
        raise NotImplementedError(
            f"job_submit method is not implemented for {cls.__name__}"
        )
Submit a job to the batch system.
Can also perform additional validation of the job's resources.
This method is NOT guaranteed to be thread-safe.
Arguments:
- res (Resources): Resources required for the job.
- queue (str): Target queue for the job submission.
- script (Path): Path to the script to execute.
- job_name (str): Name of the job to use.
- depend (list[Depend]): List of job dependencies.
- env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
- account (str | None): Optional account name to use for the job.
- server (str | None): Optional name of the server to submit the job to.
- remote_host (str | None): Optional name of the machine to submit the job from.
Returns:
str: Unique ID of the submitted job.
Raises:
- QQError: If the job submission fails.
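The following sketch shows how a PBS-style backend might map part of `job_submit`'s parameters onto `qsub` options. It uses `-q` (destination, optionally `queue@server`), `-N` (job name), `-A` (account), `-v` (comma-separated environment variables) and `-W depend=...`. It is not qq's actual command construction. Resources and `remote_host` are left out, and dependencies are modelled as hypothetical `(type, [job ids])` tuples rather than qq's `Depend` class.

```python
from __future__ import annotations

from pathlib import Path


def build_qsub_command(
    queue: str,
    script: Path,
    job_name: str,
    depend: list[tuple[str, list[str]]],
    env_vars: dict[str, str],
    account: str | None = None,
    server: str | None = None,
) -> list[str]:
    """Assemble a simplified PBS-style qsub argv (resources omitted)."""
    # PBS accepts a destination of the form queue@server
    destination = f"{queue}@{server}" if server else queue
    command = ["qsub", "-q", destination, "-N", job_name]
    if account:
        command.extend(["-A", account])
    if env_vars:
        # -v takes a comma-separated list of NAME=value pairs
        command.extend(["-v", ",".join(f"{k}={v}" for k, v in env_vars.items())])
    if depend:
        # e.g. depend=afterok:1.srv:2.srv,afterany:3.srv
        spec = ",".join(f"{kind}:{':'.join(ids)}" for kind, ids in depend)
        command.extend(["-W", f"depend={spec}"])
    command.append(str(script))
    return command
```

Values that contain commas would need special handling with `-v`. The sketch does not cover that case.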
    @classmethod
    def job_kill(cls, job_id: str) -> None:
        """
        Terminate a job gracefully. The job should have time for proper cleanup.

        Args:
            job_id (str): Identifier of the job to terminate.

        Raises:
            QQError: If the job could not be killed.
        """
        raise NotImplementedError(
            f"job_kill method is not implemented for {cls.__name__}"
        )
Terminate a job gracefully. The job should have time for proper cleanup.
Arguments:
- job_id (str): Identifier of the job to terminate.
Raises:
- QQError: If the job could not be killed.
    @classmethod
    def job_kill_force(cls, job_id: str) -> None:
        """
        Forcefully terminate a job. There may be no time for proper cleanup.

        Args:
            job_id (str): Identifier of the job to forcefully terminate.

        Raises:
            QQError: If the job could not be killed.
        """
        raise NotImplementedError(
            f"job_kill_force method is not implemented for {cls.__name__}"
        )
Forcefully terminate a job. There may be no time for proper cleanup.
Arguments:
- job_id (str): Identifier of the job to forcefully terminate.
Raises:
- QQError: If the job could not be killed.
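On a PBS Professional system, the graceful and forced variants could map to `qdel <id>` and `qdel -W force <id>`. The sketch below assumes that mapping and only builds the argument lists. Other schedulers, and the actual qq backends, may behave differently. `build_qdel_command` is a hypothetical helper.

```python
def build_qdel_command(job_id: str, force: bool = False) -> list[str]:
    """Return a PBS Pro qdel argv for a graceful or forced deletion."""
    command = ["qdel"]
    if force:
        # -W force removes the job even if its execution host does not respond
        command.extend(["-W", "force"])
    command.append(job_id)
    return command
```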
    @classmethod
    def get_batch_job(cls, job_id: str) -> TBatchJob:
        """
        Retrieve information about a job from the batch system.

        The returned object should be fully initialized, even if the job
        no longer exists or its information is unavailable.

        Args:
            job_id (str): Identifier of the job.

        Returns:
            BatchJobInterface: Object containing the job's metadata and state.
        """
        raise NotImplementedError(
            f"get_batch_job method is not implemented for {cls.__name__}"
        )
Retrieve information about a job from the batch system.
The returned object should be fully initialized, even if the job no longer exists or its information is unavailable.
Arguments:
- job_id (str): Identifier of the job.
Returns:
BatchJobInterface: Object containing the job's metadata and state.
    @classmethod
    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[TBatchJob]:
        """
        Retrieve information about multiple jobs from the batch system.

        Batch jobs should be returned in the same order as they appear in `job_ids`.
        A TBatchJob object should be returned for each job id, even if the job
        no longer exists or its information is unavailable.

        Array jobs should NOT be expanded into their individual tasks.

        The default implementation is to call `get_batch_job` for each job id.
        This implementation may be inefficient for large numbers of job ids and
        should be overridden by subclasses.

        Args:
            job_ids (list[str]): List of job identifiers.

        Returns:
            list[TBatchJob]: List of TBatchJob objects, one for each job id.
        """
        return [cls.get_batch_job(id) for id in job_ids]
Retrieve information about multiple jobs from the batch system.
Batch jobs should be returned in the same order as they appear in job_ids. A TBatchJob object should be returned for each job id, even if the job no longer exists or its information is unavailable.
Array jobs should NOT be expanded into their individual tasks.
The default implementation calls get_batch_job for each job id. This may be inefficient for large numbers of job ids, so subclasses should override it.
Arguments:
- job_ids (list[str]): List of job identifiers.
Returns:
list[TBatchJob]: List of TBatchJob objects, one for each job id.
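A subclass override could query all IDs in a single call and then restore the caller's order. Any job the scheduler did not report gets a placeholder. In the sketch, `JobInfo` is a hypothetical minimal stand-in for a `BatchJobInterface` implementation, and `fetch_many` stands for whatever bulk query the backend supports.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass
class JobInfo:
    """Hypothetical, minimal stand-in for a BatchJobInterface implementation."""

    job_id: str
    state: str  # "unknown" when the scheduler has no record of the job


def jobs_from_ids(
    job_ids: list[str], fetch_many: Callable[[list[str]], list[JobInfo]]
) -> list[JobInfo]:
    """Query all IDs at once, then return results in the order of `job_ids`."""
    # one round-trip to the scheduler instead of one per job
    found = {job.job_id: job for job in fetch_many(job_ids)}
    # keep the caller's order and produce a placeholder for missing jobs
    return [found.get(job_id, JobInfo(job_id, "unknown")) for job_id in job_ids]
```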
    @classmethod
    def get_unfinished_batch_jobs(
        cls, user: str, server: str | None = None
    ) -> list[TBatchJob]:
        """
        Retrieve information about all uncompleted jobs submitted by `user`
        on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            user (str): Username for which to fetch uncompleted jobs.
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
        """
        raise NotImplementedError(
            f"get_unfinished_batch_jobs method is not implemented for {cls.__name__}"
        )
Retrieve information about all uncompleted jobs submitted by user on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- user (str): Username for which to fetch uncompleted jobs.
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
    @classmethod
    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[TBatchJob]:
        """
        Retrieve information about all jobs submitted by a specific user (including finished jobs)
        on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            user (str): Username for which to fetch all jobs.
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
        """
        raise NotImplementedError(
            f"get_batch_jobs method is not implemented for {cls.__name__}"
        )
Retrieve information about all jobs submitted by a specific user (including finished jobs) on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- user (str): Username for which to fetch all jobs.
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
    @classmethod
    def get_all_unfinished_batch_jobs(
        cls, server: str | None = None
    ) -> list[TBatchJob]:
        """
        Retrieve information about uncompleted jobs of all users on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
        """
        raise NotImplementedError(
            f"get_all_unfinished_batch_jobs method is not implemented for {cls.__name__}"
        )
Retrieve information about uncompleted jobs of all users on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
    @classmethod
    def get_all_batch_jobs(cls, server: str | None = None) -> list[TBatchJob]:
        """
        Retrieve information about all jobs of all users on the specified or default batch server.

        The jobs can be returned in arbitrary order.

        Args:
            server (str | None): Optional name of the batch server to get jobs from.

        Returns:
            list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
        """
        raise NotImplementedError(
            f"get_all_batch_jobs method is not implemented for {cls.__name__}"
        )
Retrieve information about all jobs of all users on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
    @classmethod
    def get_queues(cls, server: str | None = None) -> list[TBatchQueue]:
        """
        Retrieve all queues managed by the batch system on the specified or default batch server.

        Args:
            server (str | None): Optional name of the batch server to get queues from.

        Returns:
            list[BatchQueueInterface]: A list of queue objects existing in the batch system.
        """
        raise NotImplementedError(
            f"get_queues method is not implemented for {cls.__name__}"
        )
Retrieve all queues managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get queues from.
Returns:
list[BatchQueueInterface]: A list of queue objects existing in the batch system.
    @classmethod
    def get_nodes(cls, server: str | None = None) -> list[TBatchNode]:
        """
        Retrieve all nodes managed by the batch system on the specified or default batch server.

        Args:
            server (str | None): Optional name of the batch server to get nodes from.

        Returns:
            list[BatchNodeInterface]: A list of node objects existing in the batch system.
        """
        raise NotImplementedError(
            f"get_nodes method is not implemented for {cls.__name__}"
        )
Retrieve all nodes managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get nodes from.
Returns:
list[BatchNodeInterface]: A list of node objects existing in the batch system.
    @classmethod
    def get_supported_work_dir_types(cls) -> list[str]:
        """
        Retrieve the list of supported types of working directories
        (i.e., strings that can be used with the `--work-dir` option).

        Returns:
            list[str]: A list of supported types of working directories.
        """
        raise NotImplementedError(
            f"get_supported_work_dir_types method is not implemented for {cls.__name__}"
        )
Retrieve the list of supported types of working directories (i.e., strings that can be used with the --work-dir option).
Returns:
list[str]: A list of supported types of working directories.
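A caller can use the returned list to validate a `--work-dir` value before submitting a job. In this sketch, `validate_work_dir` is hypothetical, the case-insensitive comparison is an assumption, and `ValueError` stands in for `QQError`. The type names in the usage example are placeholders, not qq's actual values.

```python
def validate_work_dir(requested: str, supported: list[str]) -> str:
    """Normalize a --work-dir value and check it against the supported types."""
    # assumption: types are compared case-insensitively
    normalized = requested.strip().lower()
    if normalized not in supported:
        raise ValueError(
            f"Unsupported working directory type '{requested}'. "
            f"Supported types: {', '.join(supported)}."
        )
    return normalized
```

For example, `validate_work_dir(" Scratch ", ["scratch", "input_dir"])` returns `"scratch"`.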
    @classmethod
    def read_remote_file(cls, host: str, file: Path) -> str:
        """
        Read the contents of a file on a remote host and return it as a string.

        The default implementation uses SSH to retrieve the file contents.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the file resides.
            file (Path): The path to the file on the remote host.

        Returns:
            str: The contents of the remote file.

        Raises:
            QQError: If the file cannot be read or SSH fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                "-q",  # suppress some SSH messages
                host,
                f"cat {file}",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not read remote file '{file}' on '{host}': {result.stderr.strip()}."
            )
        return result.stdout
Read the contents of a file on a remote host and return it as a string.
The default implementation uses SSH to retrieve the file contents.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
Returns:
str: The contents of the remote file.
Raises:
- QQError: If the file cannot be read or SSH fails.
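The default implementation puts the path straight into the remote command (`cat {file}`). The remote shell parses that string, so a path containing spaces or shell metacharacters would be split or misinterpreted. The sketch below builds the same kind of `ssh` argv but quotes the path with `shlex.quote`, and passes each `-o` option as a separate argument. It is a possible variant, not qq's code, and `build_ssh_cat_command` is a hypothetical name.

```python
import shlex
from pathlib import Path


def build_ssh_cat_command(host: str, file: Path, connect_timeout: int) -> list[str]:
    """Build an ssh argv that prints a remote file, quoting the path for the remote shell."""
    return [
        "ssh",
        "-o", "PasswordAuthentication=no",
        "-o", "GSSAPIAuthentication=yes",
        "-o", "StrictHostKeyChecking=no",
        "-o", f"ConnectTimeout={connect_timeout}",
        "-q",
        host,
        # the remote command string is parsed by a shell, so quote the path
        f"cat {shlex.quote(str(file))}",
    ]
```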
    @classmethod
    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
        """
        Write the given content to a file on a remote host, overwriting it if it exists.

        The default implementation uses SSH to send the content to the remote file.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the file resides.
            file (Path): The path to the file on the remote host.
            content (str): The content to write to the remote file.

        Raises:
            QQError: If the file cannot be written or SSH fails.
        """

        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                f"cat > {file}",
            ],
            input=content,
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not write to remote file '{file}' on '{host}': {result.stderr.strip()}."
            )
Write the given content to a file on a remote host, overwriting it if it exists.
The default implementation uses SSH to send the content to the remote file.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
- content (str): The content to write to the remote file.
Raises:
- QQError: If the file cannot be written or SSH fails.
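The default `write_remote_file` streams the content through the process's standard input (`input=content`) into `cat > file` on the remote side. The sketch below runs the same shell command locally with `sh -c` instead of over `ssh`, so the mechanism can be tried without a remote host. `write_via_stdin` is a hypothetical helper, and it quotes the path, which the original does not.

```python
import shlex
import subprocess
from pathlib import Path


def write_via_stdin(file: Path, content: str) -> None:
    """Write `content` to `file` by piping it into `cat > file` in a local shell."""
    result = subprocess.run(
        ["sh", "-c", f"cat > {shlex.quote(str(file))}"],
        input=content,  # delivered to cat's standard input
        capture_output=True,
        text=True,
    )
    if result.returncode != 0:
        raise OSError(f"Could not write '{file}': {result.stderr.strip()}")
```

Because of the `>` redirection, an existing file is truncated and overwritten, which matches the documented behaviour.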
    @classmethod
    def make_remote_dir(cls, host: str, directory: Path) -> None:
        """
        Create a directory at the specified path on a remote host.

        The default implementation uses SSH to run `mkdir` on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the directory should be created.
            directory (Path): The path of the directory to create on the remote host.

        Raises:
            QQError: If the directory cannot be created and does not already exist,
                or if the SSH command fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                # ignore an error if the directory already exists
                f"mkdir {directory} 2>/dev/null || [ -d {directory} ]",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not make remote directory '{directory}' on '{host}': {result.stderr.strip()}."
            )
Create a directory at the specified path on a remote host.
The default implementation uses SSH to run mkdir on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory should be created.
- directory (Path): The path of the directory to create on the remote host.
Raises:
- QQError: If the directory cannot be created and does not already exist, or if the SSH command fails.
    @classmethod
    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
        """
        List all files and directories (absolute paths) in the specified directory on a remote host.

        The default implementation uses SSH to run `ls -A` on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the directory resides.
            directory (Path): The remote directory to list.

        Returns:
            list[Path]: A list of `Path` objects representing the entries inside the directory.
                Entries are absolute paths formed by joining `directory` with each entry name.

        Raises:
            QQError: If the directory cannot be listed or the SSH command fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                f"ls -A {directory}",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not list remote directory '{directory}' on '{host}': {result.stderr.strip()}."
            )

        # split by newline and filter out empty lines
        return [
            logical_resolve(Path(directory) / line)
            for line in result.stdout.splitlines()
            if line.strip()
        ]
List all files and directories (absolute paths) in the specified directory on a remote host.
The default implementation uses SSH to run ls -A on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to list.
Returns:
list[Path]: A list of Path objects representing the entries inside the directory. Entries are absolute paths formed by joining directory with each entry name.
Raises:
- QQError: If the directory cannot be listed or the SSH command fails.
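The parsing step of `list_remote_dir` turns each non-empty line of `ls -A` output into a path under the listed directory. The sketch below does the same on a captured output string. qq's `logical_resolve` helper is not shown here, so the sketch uses `posixpath.normpath` as a stand-in. It is an assumption that the two behave the same; `normpath` collapses `.` and `..` purely lexically, without touching the filesystem. `parse_ls_output` is a hypothetical name.

```python
import posixpath
from pathlib import PurePosixPath


def parse_ls_output(directory: str, stdout: str) -> list[PurePosixPath]:
    """Turn `ls -A` output into absolute paths, skipping blank lines."""
    return [
        # normpath collapses "." and ".." lexically, without filesystem access
        PurePosixPath(posixpath.normpath(posixpath.join(directory, line)))
        for line in stdout.splitlines()
        if line.strip()
    ]
```

Line-based parsing of `ls` output cannot represent file names that contain newlines. The original implementation has the same limitation.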
    @classmethod
    def delete_remote_dir(cls, host: str, directory: Path) -> None:
        """
        Delete a directory on a remote host.

        The default implementation uses SSH to run `rm -r` on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the directory resides.
            directory (Path): The remote directory to delete.

        Raises:
            QQError: If the directory cannot be deleted or the SSH command fails.
        """
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                f"yes | rm -r {directory}",
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not delete remote directory '{directory}' on '{host}': {result.stderr.strip()}."
            )
Delete a directory on a remote host.
The default implementation uses SSH to run rm -r on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to delete.
Raises:
- QQError: If the directory cannot be deleted or the SSH command fails.
    @classmethod
    def move_remote_files(
        cls, host: str, files: list[Path], moved_files: list[Path]
    ) -> None:
        """
        Move files on a remote host from their current paths to new paths.

        The default implementation uses SSH to run a sequence of `mv` commands on the remote host.
        This approach may be inefficient on shared storage or high-latency networks.
        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.

        Subclasses should override this method to provide a more efficient implementation
        if possible.

        Args:
            host (str): The hostname of the remote machine where the files reside.
            files (list[Path]): A list of source file paths on the remote host.
            moved_files (list[Path]): A list of destination file paths on the remote host.
                Must be the same length as `files`.

        Raises:
            QQError: If the SSH command fails, the files cannot be moved or
                the length of `files` does not match the length of `moved_files`.
        """
        mv_command = cls._translate_move_command(files, moved_files)

        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                host,
                mv_command,
            ],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            raise QQError(
                f"Could not move files on a remote host '{host}': {result.stderr.strip()}."
            )
Move files on a remote host from their current paths to new paths.
The default implementation uses SSH to run a sequence of mv commands on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the files reside.
- files (list[Path]): A list of source file paths on the remote host.
- moved_files (list[Path]): A list of destination file paths on the remote host. Must be the same length as files.
Raises:
- QQError: If the SSH command fails, the files cannot be moved, or the length of files does not match the length of moved_files.
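The move command passed over SSH comes from `_translate_move_command`, which wraps each path in single quotes. That quoting breaks for a path that itself contains a single quote. The sketch below builds the same `mv ... && mv ...` chain with `shlex.quote`, which also handles that case. It is a possible variant, not qq's implementation, and it raises `ValueError` where qq raises `QQError`.

```python
import shlex
from pathlib import Path


def translate_move_command(files: list[Path], moved_files: list[Path]) -> str:
    """Join one `mv` per (source, destination) pair with `&&`."""
    if len(files) != len(moved_files):
        raise ValueError("'files' and 'moved_files' must have the same length.")
    # shlex.quote leaves safe paths untouched and escapes everything else
    return " && ".join(
        f"mv {shlex.quote(str(src))} {shlex.quote(str(dst))}"
        for src, dst in zip(files, moved_files)
    )
```

Chaining with `&&` means the remote shell stops at the first failed `mv`, so later moves are skipped.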
    @classmethod
    def sync_with_exclusions(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        exclude_files: list[Path] | None = None,
    ) -> None:
        """
        Synchronize the contents of two directories using rsync, optionally across remote hosts,
        while excluding specified files or subdirectories.

        All files and directories in `src_dir` are copied to `dest_dir` except
        those listed in `exclude_files`. Files are never removed from the destination.

        Args:
            src_dir (Path): Source directory to sync from.
            dest_dir (Path): Destination directory to sync to.
            src_host (str | None): Optional hostname of the source machine if remote;
                None if the source is local.
            dest_host (str | None): Optional hostname of the destination machine if remote;
                None if the destination is local.
            exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
                These will be converted to paths relative to `src_dir`.

        Raises:
            QQError: If the rsync command fails for any reason or times out.
        """
        # convert absolute paths of files to exclude into relative to src_dir
        relative_excluded = (
            convert_absolute_to_relative(exclude_files, src_dir)
            if exclude_files
            else []
        )

        command = cls._translate_rsync_excluded_command(
            src_dir, dest_dir, src_host, dest_host, relative_excluded
        )
        logger.debug(f"Rsync command: {command}.")

        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)
Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.
All files and directories in src_dir are copied to dest_dir except
those listed in exclude_files. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing. These will be converted to paths relative to src_dir.
Raises:
- QQError: If the rsync command fails for any reason or times out.
```python
@classmethod
def sync_selected(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    include_files: list[Path] | None = None,
) -> None:
    # convert absolute paths of files to include relative to src_dir
    relative_included = (
        convert_absolute_to_relative(include_files, src_dir)
        if include_files
        else []
    )

    command = cls._translate_rsync_included_command(
        src_dir, dest_dir, src_host, dest_host, relative_included
    )
    logger.debug(f"Rsync command: {command}.")

    cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)
```
Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.
Only files listed in include_files are copied from src_dir to dest_dir.
Files not listed are ignored. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- include_files (list[Path] | None): Optional list of absolute file paths to include in syncing. These paths are converted to paths relative to src_dir. This argument is optional only for consistency with sync_with_exclusions.
Raises:
- QQError: If the rsync command fails or times out.
```python
@classmethod
def transform_resources(
    cls, queue: str, server: str | None, provided_resources: Resources
) -> Resources:
    raise NotImplementedError(
        f"transform_resources method is not implemented for {cls.__name__}"
    )
```
Transform user-provided Resources into a batch system-specific Resources instance.
This method takes the resources provided during submission and returns a new
Resources object with any necessary modifications or defaults applied for
the target batch system. The original provided_resources object is not modified.
Arguments:
- queue (str): The name of the queue for which the resources are being adapted.
- server (str | None): Name of the server on which the queue is located. If None, the queue is treated as being located on the current server.
- provided_resources (Resources): The raw resources specified by the user.
Returns:
Resources: A new Resources instance with batch system-specific adjustments, fully constructed and validated.
Raises:
- QQError: If any of the provided parameters are invalid or inconsistent.
```python
@classmethod
def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
    return [InputHost()]
```
Get the default job resubmission hosts for this batch system.
In the default implementation, resubmission from the input machine is attempted.
Returns:
list[ResubmitHost]: A list of resubmission hosts.
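A backend that prefers a different default could override this classmethod. In the sketch below, `ResubmitHost` and `InputHost` are minimal local stand-ins for qq's classes, while `NamedHost`, both backend classes, and the hostname `login.example.org` are hypothetical. The sketch also assumes the returned hosts are tried in list order.

```python
class ResubmitHost:
    """Stand-in for qq's ResubmitHost base type."""


class InputHost(ResubmitHost):
    """Stand-in: resubmit from the machine the job was submitted from."""


class NamedHost(ResubmitHost):
    """Hypothetical: resubmit from a fixed, named host."""

    def __init__(self, hostname: str) -> None:
        self.hostname = hostname


class DefaultBackend:
    @classmethod
    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
        # Mirrors the default implementation: the input machine only.
        return [InputHost()]


class LoginNodeBackend(DefaultBackend):
    @classmethod
    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
        # Assuming in-order attempts: a dedicated login node first, then the input machine.
        return [NamedHost("login.example.org"), *super().get_default_resubmit_hosts()]
```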
```python
@classmethod
def sort_jobs(cls, jobs: list[TBatchJob]) -> None:
    jobs.sort(key=lambda job: job.get_id())
```
Sort a list of batch system jobs by a defined attribute.
The default implementation sorts the jobs alphabetically by their job ID,
as returned by job.get_id(). Subclasses may override this method to
implement custom sorting logic.
Arguments:
- jobs (list[BatchJobInterface]): A list of batch job objects to be sorted in-place.
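Alphabetical ordering of IDs is not numeric ordering: with IDs of the form `<number>.<server>` (an assumed format), "10.pbs" sorts before "9.pbs". A sketch of an override that sorts by the leading integer instead, placing IDs without a numeric prefix last in alphabetical order. `FakeJob` is a minimal stand-in exposing only `get_id()`, and the helper names are hypothetical.

```python
import re


class FakeJob:
    """Minimal stand-in for a BatchJobInterface implementation."""

    def __init__(self, job_id: str) -> None:
        self._id = job_id

    def get_id(self) -> str:
        return self._id


_LEADING_NUMBER = re.compile(r"^(\d+)")


def numeric_key(job: FakeJob) -> tuple[int, int, str]:
    job_id = job.get_id()
    match = _LEADING_NUMBER.match(job_id)
    if match:
        # Numeric IDs come first, ordered by their integer value.
        return (0, int(match.group(1)), job_id)
    # IDs without a numeric prefix go last, in alphabetical order.
    return (1, 0, job_id)


def sort_jobs_numerically(jobs: list[FakeJob]) -> None:
    # Sorts in place, like the default implementation.
    jobs.sort(key=numeric_key)
```

With IDs "10.pbs", "9.pbs", and "100.pbs", the default alphabetical key yields "10.pbs", "100.pbs", "9.pbs", while `numeric_key` yields "9.pbs", "10.pbs", "100.pbs".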
```python
@classmethod
def jobs_presenter_columns_to_show(cls) -> set[str]:
    return {
        "S",
        "Job ID",
        "User",
        "Job Name",
        "Queue",
        "NCPUs",
        "NGPUs",
        "NNodes",
        "Times",
        "Node",
        "%CPU",
        "%Mem",
        "Exit",
    }
```
Get a set of columns that should be shown in the output of JobsPresenter (qq jobs) for this batch system.
In the default implementation, all columns are shown.
Note that the 'Exit' column is not shown when printing queued and running jobs, even if you specify it here.
Returns:
set[str]: Set of column titles that should be shown.
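A backend whose scheduler does not report GPU counts or utilization could hide those columns by subtracting them from the default set. `BaseBackend` below is a stand-in that reproduces the default column set; `NoGpuBackend` and its choice of hidden columns are hypothetical.

```python
class BaseBackend:
    @classmethod
    def jobs_presenter_columns_to_show(cls) -> set[str]:
        # Default: every column.
        return {
            "S", "Job ID", "User", "Job Name", "Queue", "NCPUs", "NGPUs",
            "NNodes", "Times", "Node", "%CPU", "%Mem", "Exit",
        }


class NoGpuBackend(BaseBackend):
    @classmethod
    def jobs_presenter_columns_to_show(cls) -> set[str]:
        # Hide the columns this hypothetical scheduler cannot fill.
        return super().jobs_presenter_columns_to_show() - {"NGPUs", "%CPU", "%Mem"}
```

Deriving the set from the parent keeps the override correct if new default columns are added later.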
class BatchJobInterface(ABC):
Abstract base class for retrieving and maintaining job information from a batch scheduling system.
Must support situations where the job information no longer exists.
The implementation of the constructor is arbitrary and should only
be used inside the corresponding implementation of BatchInterface.get_batch_job.
@abstractmethod def is_empty(self) -> bool:
Check whether the job contains any information. This should return True if the job does not exist in the batch system.
Returns:
bool: True if the job contains no information.
@abstractmethod def get_id(self) -> str:
Return the ID of the job.
Returns:
str: The ID of the job.
@abstractmethod def get_account(self) -> str | None:
Return the account under which the job is submitted.
Returns:
str | None: Account associated with the job or None if no account is defined.
@abstractmethod def update(self) -> None:
Refresh the stored job information from the batch system.
Raises:
- QQError: If the job cannot be queried or its info updated.
@abstractmethod def get_state(self) -> BatchState:
Return the current state of the job as reported by the batch system.
If the job information is no longer available, return BatchState.UNKNOWN.
Returns:
BatchState: The job state according to the batch system.
@abstractmethod def get_comment(self) -> str | None:
Retrieve the batch system-provided comment for the job.
Returns:
str | None: The job's comment string if available, or None if the batch system has not attached a comment.
@abstractmethod def get_estimated(self) -> tuple[datetime, str] | None:
Retrieve the batch system's estimated job start time and execution node.
Returns:
tuple[datetime, str] | None: A tuple containing:
- datetime: The estimated start time of the job.
- str: The name of the node where the job is expected to run.
Returns None if either estimate is unavailable.
@abstractmethod def get_main_node(self) -> str | None:
Retrieve the hostname of the main execution node for the job.
Returns:
str | None: The hostname of the main execution node, or None if unavailable or not applicable.
@abstractmethod def get_nodes(self) -> list[str] | None:
Retrieve the hostnames of all execution nodes allocated for the job.
Returns:
list[str] | None: A list of hostnames or node identifiers used by the job, or None if node information is not available.
@abstractmethod def get_short_nodes(self) -> list[str] | None:
Retrieve the short hostnames of all execution nodes allocated for the job.
Returns:
list[str] | None: A list of short hostnames used by the job, or None if node information is not available.
@abstractmethod def get_user(self) -> str | None:
Return the username of the job owner.
Returns:
str | None: Username of the user who owns the job or None if not available.
@abstractmethod def get_n_cpus(self) -> int | None:
Return the number of CPU cores allocated for the job.
Returns:
int | None: Number of CPUs allocated for the job or None if not available.
@abstractmethod def get_n_gpus(self) -> int | None:
Return the number of GPUs allocated for the job.
Returns:
int | None: Number of GPUs allocated for the job or None if not available.
@abstractmethod def get_n_nodes(self) -> int | None:
Return the number of compute nodes assigned to the job.
Returns:
int | None: Number of nodes used by the job or None if not available.
@abstractmethod def get_mem(self) -> Size | None:
Return the amount of memory allocated for the job.
Returns:
Size | None: Amount of memory allocated for the job or None if not available.
@abstractmethod def get_name(self) -> str | None:
Return the name of the job.
Returns:
str | None: The name of the submitted job or None if not available.
@abstractmethod def get_submission_time(self) -> datetime | None:
Return the timestamp when the job was submitted.
Returns:
datetime | None: Time when the job was submitted to the batch system or None if not available.
@abstractmethod def get_start_time(self) -> datetime | None:
Return the timestamp when the job started execution.
Returns:
datetime | None: Time when the job began running or None if the job has not yet started.
@abstractmethod def get_completion_time(self) -> datetime | None:
Return the timestamp when the job was completed.
Returns:
datetime | None: Time when the job completed or None if the job has not yet completed.
@abstractmethod def get_modification_time(self) -> datetime | None:
Return the timestamp at which the job was last modified.
Returns:
datetime | None: Time when the job was last modified or None if the information is not available.
@abstractmethod def get_walltime(self) -> timedelta | None:
Return the walltime limit of the job.
Returns:
timedelta | None: Walltime for the job or None if not available.
@abstractmethod def get_queue(self) -> str | None:
Return the submission queue of the job.
Returns:
str | None: The queue this job is part of or None if not available.
@abstractmethod def get_util_cpu(self) -> int | None:
Return the utilization of the requested CPUs as a percentage (0-100).
Returns:
int | None: Utilization of requested CPUs or None if not available.
@abstractmethod def get_util_mem(self) -> int | None:
Return the utilization of the requested memory as a percentage (0-100).
Returns:
int | None: Utilization of requested memory or None if not available.
@abstractmethod def get_exit_code(self) -> int | None:
Return the exit code of the job.
Returns:
int | None: Exit code of the job or None if no exit code has been assigned.
@abstractmethod def get_input_dir(self) -> Path | None:
Return the path to the directory from which the job was submitted.
Returns:
Path | None: Path to the submission directory or None if not available.
@abstractmethod def get_input_machine(self) -> str | None:
Return the hostname of the submission machine.
Returns:
str | None: Hostname of the submission machine or None if not available.
@abstractmethod def get_info_file(self) -> Path | None:
Return the path to the info file associated with this job.
Returns:
Path | None: Path to the qq info file or None if this is not a qq job.
@abstractmethod def to_yaml(self) -> str:
Return all information about the job from the batch system in YAML format.
Returns:
str: YAML-formatted string of job metadata.
@abstractmethod def get_steps(self) -> Sequence[Self]:
Return a list of steps associated with this job.
Note that a job step is represented by a BatchJobInterface but may not contain all the values that a full BatchJobInterface provides.
Returns:
Sequence[BatchJobInterface]: List of job steps, or an empty list if there are none.
@abstractmethod def get_step_id(self) -> str | None:
Return the step index if this job is a job step.
Returns:
str | None: Job step index or None if this is not a job step.
@abstractmethod def is_array_job(self) -> bool:
Return True if the job is a top-level array job (not a sub-job).
Returns:
bool: True if the job is a top-level array job, else False.
```python
def is_completed(self) -> bool:
    return self.get_state() in {BatchState.FINISHED, BatchState.FAILED}
```
Return True if the job is completed according to the batch system.
By default, a completed job is a job that is FINISHED or FAILED.
Returns:
bool: True if the job is completed, else False.
class BatchNodeInterface(ABC):
Abstract base class for obtaining information about compute nodes.
The implementation of the constructor is arbitrary and should only
be used inside the corresponding implementation of BatchInterface.get_nodes.
```python
@abstractmethod
def update(self) -> None:
```
Refresh the stored node information from the batch system.
Raises:
- QQError: If the node cannot be queried or its info updated.
```python
@abstractmethod
def get_name(self) -> str:
```
Retrieve the name of the node.
Returns:
str: The name identifying the node in the batch system.
```python
@abstractmethod
def get_n_cpus(self) -> int | None:
```
Retrieve the total number of CPU cores available on the node.
Returns:
int | None: Total CPU core count or None if not available.
```python
@abstractmethod
def get_n_free_cpus(self) -> int | None:
```
Retrieve the number of currently available (unallocated) CPU cores.
Returns:
int | None: Number of free CPU cores or None if not available.
```python
@abstractmethod
def get_n_gpus(self) -> int | None:
```
Retrieve the total number of GPUs available on the node.
Returns:
int | None: Total GPU count or None if not available.
```python
@abstractmethod
def get_n_free_gpus(self) -> int | None:
```
Retrieve the number of currently available (unallocated) GPUs.
Returns:
int | None: Number of free GPUs or None if not available.
```python
@abstractmethod
def get_cpu_memory(self) -> Size | None:
```
Retrieve the total CPU memory capacity of the node.
Returns:
Size | None: Total CPU memory available on the node or None if not available.
```python
@abstractmethod
def get_free_cpu_memory(self) -> Size | None:
```
Retrieve the currently available CPU memory.
Returns:
Size | None: Free (unused) CPU memory or None if not available.
```python
@abstractmethod
def get_gpu_memory(self) -> Size | None:
```
Retrieve the total GPU memory capacity of the node.
Returns:
Size | None: Total GPU memory available or None if not available.
```python
@abstractmethod
def get_free_gpu_memory(self) -> Size | None:
```
Retrieve the currently available GPU memory.
Returns:
Size | None: Free (unused) GPU memory or None if not available.
```python
@abstractmethod
def get_local_scratch(self) -> Size | None:
```
Retrieve the total local scratch storage capacity of the node.
Returns:
Size | None: Total size of local scratch space or None if not available.
```python
@abstractmethod
def get_free_local_scratch(self) -> Size | None:
```
Retrieve the available local scratch storage space.
Returns:
Size | None: Free local scratch space or None if not available.
```python
@abstractmethod
def get_ssd_scratch(self) -> Size | None:
```
Retrieve the total SSD-based scratch storage capacity.
Returns:
Size | None: Total SSD scratch capacity or None if not available.
```python
@abstractmethod
def get_free_ssd_scratch(self) -> Size | None:
```
Retrieve the currently available SSD-based scratch storage space.
Returns:
Size | None: Free SSD scratch space or None if not available.
```python
@abstractmethod
def get_properties(self) -> list[str]:
```
Get the list of properties or labels assigned to the node.
Returns:
list[str]: List of node property strings.
```python
@abstractmethod
def is_available_to_user(self, user: str) -> bool:
```
Check if the node is available to the specified user.
Arguments:
- user (str): The username to check access for.
Returns:
bool: True if the node is up and schedulable, False otherwise.
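Availability and properties are often checked together when picking nodes. The sketch below keeps the names of nodes that are available to a given user and carry every required property. `SelectableNode` is a `Protocol` for the assumed subset of `BatchNodeInterface`. The demo objects, user name, and property labels are hypothetical.

```python
from types import SimpleNamespace
from typing import Protocol


class SelectableNode(Protocol):
    # Assumed subset of BatchNodeInterface used by this sketch.
    def get_name(self) -> str: ...
    def get_properties(self) -> list[str]: ...
    def is_available_to_user(self, user: str) -> bool: ...


def usable_nodes(nodes: list[SelectableNode], user: str, required: set[str]) -> list[str]:
    """Names of nodes the user may use that carry all required properties."""
    return [
        node.get_name()
        for node in nodes
        if node.is_available_to_user(user) and required <= set(node.get_properties())
    ]


# Hypothetical stand-ins: n1 is usable, n2 lacks "infiniband", n3 is unavailable.
demo_nodes = [
    SimpleNamespace(get_name=lambda: "n1", get_properties=lambda: ["infiniband", "avx512"],
                    is_available_to_user=lambda user: True),
    SimpleNamespace(get_name=lambda: "n2", get_properties=lambda: ["avx512"],
                    is_available_to_user=lambda user: True),
    SimpleNamespace(get_name=lambda: "n3", get_properties=lambda: ["infiniband"],
                    is_available_to_user=lambda user: False),
]
```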
```python
@abstractmethod
def to_yaml(self) -> str:
```
Return all information about the node in YAML format.
Returns:
str: YAML-formatted string of node metadata.
```python
class BatchQueueInterface(ABC):
    """
    Abstract base class for retrieving and maintaining queue information
    from a batch scheduling system.

    The implementation of the constructor is arbitrary and should only
    be used inside the corresponding implementation of `BatchInterface.get_queues`.
    """

    @abstractmethod
    def update(self) -> None:
        """
        Refresh the stored queue information from the batch system.

        Raises:
            QQError: If the queue cannot be queried or its info updated.
        """

    @abstractmethod
    def get_name(self) -> str:
        """
        Retrieve the name of the queue.

        Returns:
            str: The name identifying this queue in the batch system.
        """

    @abstractmethod
    def get_priority(self) -> str | None:
        """
        Retrieve the scheduling priority of the queue.

        Returns:
            str | None: The queue priority, or None if priority information
                is not available.
        """

    @abstractmethod
    def get_total_jobs(self) -> int | None:
        """
        Retrieve the total number of jobs currently in the queue.

        Returns:
            int | None: The total count of jobs, regardless of status
                or `None` if the information is not available.
        """

    @abstractmethod
    def get_running_jobs(self) -> int | None:
        """
        Retrieve the number of jobs currently running in the queue.

        Returns:
            int | None: The number of running jobs or `None`
                if the information is not available.
        """

    @abstractmethod
    def get_queued_jobs(self) -> int | None:
        """
        Retrieve the number of jobs waiting to start in the queue.

        Returns:
            int | None: The number of queued jobs or `None`
                if the information is not available.
        """

    @abstractmethod
    def get_other_jobs(self) -> int | None:
        """
        Retrieve the number of jobs in other states (non-running and non-queued).

        Returns:
            int | None: The number of jobs that are neither running nor queued,
                such as exiting or suspended jobs.
                Returns `None` if the information is not available.
        """

    @abstractmethod
    def get_max_walltime(self) -> timedelta | None:
        """
        Retrieve the maximum walltime allowed for jobs in the queue.

        Returns:
            timedelta | None: The walltime limit, or None if unlimited or unknown.
        """

    @abstractmethod
    def get_max_n_nodes(self) -> int | None:
        """
        Retrieve the maximum number of nodes that can be requested in the queue.

        Returns:
            int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.
        """

    @abstractmethod
    def get_comment(self) -> str | None:
        """
        Retrieve the comment or description associated with the queue.

        Returns:
            str | None: The human-readable comment or note about the queue
                or `None` if the information is not available.
        """

    @abstractmethod
    def is_available_to_user(self, user: str) -> bool:
        """
        Check whether the specified user has access to this queue.

        Args:
            user (str): The username to check access for.

        Returns:
            bool: True if the user can submit jobs to this queue, False otherwise.
        """

    @abstractmethod
    def get_destinations(self) -> list[str]:
        """
        Retrieve all destinations available for this queue route.

        Returns:
            list[str]: A list of destination queue names associated with the queue.
        """

    @abstractmethod
    def from_route_only(self) -> bool:
        """
        Determine whether this queue can only be accessed via a route.

        Returns:
            bool: True if the queue is accessible exclusively through a route,
                False otherwise.
        """

    @abstractmethod
    def to_yaml(self) -> str:
        """
        Return all information about the queue from the batch system in YAML format.

        Returns:
            str: YAML-formatted string of queue metadata.
        """

    @abstractmethod
    def get_default_resources(self) -> Resources:
        """
        Return the default resource definitions for this queue.

        Returns:
            Resources: Default resources allocated for jobs submitted to this queue.
        """
```
Abstract base class for retrieving and maintaining queue information from a batch scheduling system.
The implementation of the constructor is arbitrary and should only
be used inside the corresponding implementation of BatchInterface.get_queues.
```python
@abstractmethod
def update(self) -> None:
```
Refresh the stored queue information from the batch system.
Raises:
- QQError: If the queue cannot be queried or its info updated.
```python
@abstractmethod
def get_name(self) -> str:
```
Retrieve the name of the queue.
Returns:
str: The name identifying this queue in the batch system.
```python
@abstractmethod
def get_priority(self) -> str | None:
```
Retrieve the scheduling priority of the queue.
Returns:
str | None: The queue priority, or None if priority information is not available.
```python
@abstractmethod
def get_total_jobs(self) -> int | None:
```
Retrieve the total number of jobs currently in the queue.
Returns:
int | None: The total count of jobs regardless of status, or None if the information is not available.
```python
@abstractmethod
def get_running_jobs(self) -> int | None:
```
Retrieve the number of jobs currently running in the queue.
Returns:
int | None: The number of running jobs or None if the information is not available.
```python
@abstractmethod
def get_queued_jobs(self) -> int | None:
```
Retrieve the number of jobs waiting to start in the queue.
Returns:
int | None: The number of queued jobs or None if the information is not available.
```python
@abstractmethod
def get_other_jobs(self) -> int | None:
```
Retrieve the number of jobs in other states (non-running and non-queued).
Returns:
int | None: The number of jobs that are neither running nor queued, such as exiting or suspended jobs. Returns None if the information is not available.
```python
@abstractmethod
def get_max_walltime(self) -> timedelta | None:
```
Retrieve the maximum walltime allowed for jobs in the queue.
Returns:
timedelta | None: The walltime limit, or None if unlimited or unknown.
```python
@abstractmethod
def get_max_n_nodes(self) -> int | None:
```
Retrieve the maximum number of nodes that can be requested in the queue.
Returns:
int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.
```python
@abstractmethod
def get_comment(self) -> str | None:
```
Retrieve the comment or description associated with the queue.
Returns:
str | None: The human-readable comment or note about the queue, or None if the information is not available.
```python
@abstractmethod
def is_available_to_user(self, user: str) -> bool:
```
Check whether the specified user has access to this queue.
Arguments:
- user (str): The username to check access for.
Returns:
bool: True if the user can submit jobs to this queue, False otherwise.
```python
@abstractmethod
def get_destinations(self) -> list[str]:
```
Retrieve all destinations available for this queue route.
Returns:
list[str]: A list of destination queue names associated with the queue.
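A routing queue's destinations may themselves be routing queues, so finding every queue a submission could end up in is a graph walk. The sketch below performs an iterative depth-first traversal with cycle protection over a plain mapping of queue names to `get_destinations()` results. It assumes that a non-routing queue reports an empty destination list, which this interface does not state. The queue names are illustrative, and `reachable_queues` is a hypothetical helper.

```python
def reachable_queues(start: str, destinations: dict[str, list[str]]) -> list[str]:
    """Queues with no further destinations that are reachable from `start`.

    `destinations` maps each queue name to what its get_destinations() returned.
    """
    seen: set[str] = set()
    result: list[str] = []
    stack = [start]
    while stack:
        name = stack.pop()
        if name in seen:
            continue  # guard against routing cycles
        seen.add(name)
        targets = destinations.get(name, [])
        if not targets:
            result.append(name)  # assumed to be an execution (non-routing) queue
        else:
            # Push in reverse so destinations are visited in their listed order.
            stack.extend(reversed(targets))
    return result
```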
```python
@abstractmethod
def from_route_only(self) -> bool:
```
Determine whether this queue can only be accessed via a route.
Returns:
bool: True if the queue is accessible exclusively through a route, False otherwise.
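Combining `is_available_to_user` with `from_route_only` yields the queues that a user could name directly at submission. The following is a minimal sketch. It uses a `Protocol` for the assumed subset of `BatchQueueInterface`, and the stand-in queues and user names are hypothetical.

```python
from types import SimpleNamespace
from typing import Protocol


class RoutableQueue(Protocol):
    # Assumed subset of BatchQueueInterface used by this sketch.
    def get_name(self) -> str: ...
    def is_available_to_user(self, user: str) -> bool: ...
    def from_route_only(self) -> bool: ...


def submittable_queues(queues: list[RoutableQueue], user: str) -> list[str]:
    """Queues the user may submit to directly, excluding route-only ones."""
    return [
        queue.get_name()
        for queue in queues
        if queue.is_available_to_user(user) and not queue.from_route_only()
    ]


# Hypothetical stand-ins: "short" is reachable only via a route,
# and "gpu" is restricted to the user "alice".
demo_queues = [
    SimpleNamespace(get_name=lambda: "default", is_available_to_user=lambda user: True,
                    from_route_only=lambda: False),
    SimpleNamespace(get_name=lambda: "short", is_available_to_user=lambda user: True,
                    from_route_only=lambda: True),
    SimpleNamespace(get_name=lambda: "gpu", is_available_to_user=lambda user: user == "alice",
                    from_route_only=lambda: False),
]
```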
```python
@abstractmethod
def to_yaml(self) -> str:
```
Return all information about the queue from the batch system in YAML format.
Returns:
str: YAML-formatted string of queue metadata.
```python
@abstractmethod
def get_default_resources(self) -> Resources:
```
Return the default resource definitions for this queue.
Returns:
Resources: Default resources allocated for jobs submitted to this queue.
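The sketch below shows how default resources might be combined with a user's request. It assumes that explicitly requested values take precedence and that unset values fall back to the queue defaults. Plain dictionaries stand in for qq's `Resources` type, with None marking an unset field. The field names and the `apply_queue_defaults` helper are hypothetical.

```python
def apply_queue_defaults(
    requested: dict[str, object], defaults: dict[str, object]
) -> dict[str, object]:
    """Fill fields the user left unset (None) from the queue's defaults."""
    merged = dict(defaults)  # start from the queue defaults
    # Explicitly requested values override the defaults; None means "not set".
    merged.update({key: value for key, value in requested.items() if value is not None})
    return merged
```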