qq_lib.batch.interface

Abstractions for integrating qq with HPC batch scheduling systems.

This module defines the core interfaces that allow qq to interact with multiple batch systems through a unified API. It provides:

  • BatchInterface: the central abstract interface that every batch-system backend implements. It defines operations such as job submission, job querying, directory synchronization, remote file access, resubmission, and navigation to compute nodes.

  • BatchJobInterface, BatchNodeInterface, and BatchQueueInterface: lightweight abstractions representing jobs, nodes, and queues as reported by the underlying scheduler. These interfaces expose normalized metadata and allow qq to present consistent job/queue/node information regardless of scheduler differences.

  • AnyBatchClass: a type alias for batch system implementations.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Abstractions for integrating qq with HPC batch scheduling systems.
 6
 7This module defines the core interfaces that allow qq to interact with
 8multiple batch systems through a unified API. It provides:
 9
10- `BatchInterface`: the central abstract interface that every batch-system
11  backend implements. It defines operations such as job submission, job
12  querying, directory synchronization, remote file access, resubmission, and
13  navigation to compute nodes.
14
15- `BatchJobInterface`, `BatchNodeInterface`, and `BatchQueueInterface`:
16  lightweight abstractions representing jobs, nodes, and queues as reported
17  by the underlying scheduler. These interfaces expose normalized metadata
18  and allow qq to present consistent job/queue/node information regardless
19  of scheduler differences.
20
21- `AnyBatchClass`: a type alias for batch system implementations.
22"""
23
24from .interface import AnyBatchClass, BatchInterface
25from .job import BatchJobInterface
26from .node import BatchNodeInterface
27from .queue import BatchQueueInterface
28
# Public API of the package: the names re-exported from the submodules
# imported above. The order is kept as-is since documentation tools may
# use it to order members.
__all__ = [
    "BatchInterface",
    "BatchJobInterface",
    "BatchNodeInterface",
    "BatchQueueInterface",
    "AnyBatchClass",
]
class BatchInterface(abc.ABC, typing.Generic[TBatchJob, TBatchQueue, TBatchNode]):
 133class BatchInterface[
 134    TBatchJob: BatchJobInterface = BatchJobInterface,
 135    TBatchQueue: BatchQueueInterface = BatchQueueInterface,
 136    TBatchNode: BatchNodeInterface = BatchNodeInterface,
 137](ABC, metaclass=_BatchMeta):
 138    """
 139    Abstract base class for batch system integrations.
 140
 141    Concrete batch system classes must implement these methods to allow
 142    qq to interact with different batch systems uniformly.
 143
 144    All functions should raise QQError when encountering an error.
 145    """
 146
 147    # magic number indicating unreachable directory when navigating to it
 148    _CD_FAIL = 94
 149    # exit code of ssh if connection fails
 150    _SSH_FAIL = 255
 151
 152    @classmethod
 153    def env_name(cls) -> str:
 154        """
 155        Return the name of the batch system environment.
 156
 157        Returns:
 158            str: The batch system name.
 159        """
 160        raise NotImplementedError(
 161            f"env_name method is not implemented for {cls.__name__}"
 162        )
 163
 164    @classmethod
 165    def is_available(cls) -> bool:
 166        """
 167        Determine whether the batch system is available on the current host.
 168
 169        Implementations typically verify this by checking for the presence
 170        of required commands or other environment-specific indicators.
 171
 172        Returns:
 173            bool: True if the batch system is available, False otherwise.
 174        """
 175        raise NotImplementedError(
 176            f"is_available method is not implemented for {cls.__name__}"
 177        )
 178
 179    @classmethod
 180    def get_job_id(cls) -> str | None:
 181        """
 182        Get the id of the current job from the corresponding batch system's environment variable.
 183
 184        For this method to work, it has to be called from the inside of an active job.
 185
 186        Returns:
 187            str | None: Index of the job or None if the collective variable is not set.
 188        """
 189        raise NotImplementedError(
 190            f"get_job_id method is not implemented for {cls.__name__}"
 191        )
 192
 193    @classmethod
 194    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
 195        """
 196        Create the working directory on scratch for the given job.
 197
 198        Args:
 199            job_id (int): Unique identifier of the job.
 200
 201        Returns:
 202            Path: Absolute path to the working directory on scratch.
 203
 204        Raises:
 205            QQError: If the working directory could not be created.
 206        """
 207        raise NotImplementedError(
 208            f"create_work_dir_on_scratch method is not implemented for {cls.__name__}"
 209        )
 210
 211    @classmethod
 212    def job_submit(
 213        cls,
 214        res: Resources,
 215        queue: str,
 216        script: Path,
 217        job_name: str,
 218        depend: list[Depend],
 219        env_vars: dict[str, str],
 220        account: str | None = None,
 221        server: str | None = None,
 222        remote_host: str | None = None,
 223    ) -> str:
 224        """
 225        Submit a job to the batch system.
 226
 227        Can also perform additional validation of the job's resources.
 228
 229        This method is NOT guaranteed to be thread-safe.
 230
 231        Args:
 232            res (Resources): Resources required for the job.
 233            queue (str): Target queue for the job submission.
 234            script (Path): Path to the script to execute.
 235            job_name (str): Name of the job to use.
 236            depend (list[Depend]): List of job dependencies.
 237            env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
 238            account (str | None): Optional account name to use for the job.
 239            server (str | None): Optional name of the server to submit the job to.
 240            remote_host (str | None): Optional name of the machine to submit the job from.
 241
 242        Returns:
 243            str: Unique ID of the submitted job.
 244
 245        Raises:
 246            QQError: If the job submission fails.
 247        """
 248        raise NotImplementedError(
 249            f"job_submit method is not implemented for {cls.__name__}"
 250        )
 251
 252    @classmethod
 253    def job_kill(cls, job_id: str) -> None:
 254        """
 255        Terminate a job gracefully. The job should have time for proper cleanup.
 256
 257        Args:
 258            job_id (str): Identifier of the job to terminate.
 259
 260        Raises:
 261            QQError: If the job could not be killed.
 262        """
 263        raise NotImplementedError(
 264            f"job_kill method is not implemented for {cls.__name__}"
 265        )
 266
 267    @classmethod
 268    def job_kill_force(cls, job_id: str) -> None:
 269        """
 270        Forcefully terminate a job. There may be no time for proper cleanup.
 271
 272        Args:
 273            job_id (str): Identifier of the job to forcefully terminate.
 274
 275        Raises:
 276            QQError: If the job could not be killed.
 277        """
 278        raise NotImplementedError(
 279            f"job_kill_force method is not implemented for {cls.__name__}"
 280        )
 281
 282    @classmethod
 283    def get_batch_job(cls, job_id: str) -> TBatchJob:
 284        """
 285        Retrieve information about a job from the batch system.
 286
 287        The returned object should be fully initialized, even if the job
 288        no longer exists or its information is unavailable.
 289
 290        Args:
 291            job_id (str): Identifier of the job.
 292
 293        Returns:
 294            BatchJobInterface: Object containing the job's metadata and state.
 295        """
 296        raise NotImplementedError(
 297            f"get_batch_job method is not implemented for {cls.__name__}"
 298        )
 299
 300    @classmethod
 301    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[TBatchJob]:
 302        """
 303        Retrieve information about multiple jobs from the batch system.
 304
 305        Batch jobs should be returned in the same order as they appear in `job_ids`.
 306        A TBatchJob object should be returned for each job id, even if the job
 307        no longer exists or its information is unavailable.
 308
 309        Array jobs should NOT be expanded into their individual tasks.
 310
 311        The default implementation is to call `get_batch_job` for each job id.
 312        This implementation may be inefficient for large numbers of job ids and
 313        should be overriden by subclasses.
 314
 315        Args:
 316            job_ids (list[str]): List of job identifiers.
 317
 318        Returns:
 319            list[TBatchJob]: List of TBatchJob objects, one for each job id.
 320        """
 321        return [cls.get_batch_job(id) for id in job_ids]
 322
 323    @classmethod
 324    def get_unfinished_batch_jobs(
 325        cls, user: str, server: str | None = None
 326    ) -> list[TBatchJob]:
 327        """
 328        Retrieve information about all uncompleted jobs submitted by `user`
 329        on the specified or default batch server.
 330
 331        The jobs can be returned in arbitrary order.
 332
 333        Args:
 334            user (str): Username for which to fetch uncompleted jobs.
 335            server (str | None): Optional name of the batch server to get jobs from.
 336
 337        Returns:
 338            list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
 339        """
 340        raise NotImplementedError(
 341            f"get_unfinished_batch_jobs method is not implemented for {cls.__name__}"
 342        )
 343
 344    @classmethod
 345    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[TBatchJob]:
 346        """
 347        Retrieve information about all jobs submitted by a specific user (including finished jobs)
 348        on the specified or default batch server.
 349
 350        The jobs can be returned in arbitrary order.
 351
 352        Args:
 353            user (str): Username for which to fetch all jobs.
 354            server (str | None): Optional name of the batch server to get jobs from.
 355
 356        Returns:
 357            list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
 358        """
 359        raise NotImplementedError(
 360            f"get_batch_jobs method is not implemented for {cls.__name__}"
 361        )
 362
 363    @classmethod
 364    def get_all_unfinished_batch_jobs(
 365        cls, server: str | None = None
 366    ) -> list[TBatchJob]:
 367        """
 368        Retrieve information about uncompleted jobs of all users on the specified or default batch server.
 369
 370        The jobs can be returned in arbitrary order.
 371
 372        Args:
 373            server (str | None): Optional name of the batch server to get jobs from.
 374
 375        Returns:
 376            list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
 377        """
 378        raise NotImplementedError(
 379            f"get_all_unfinished_batch_jobs method is not implemented for {cls.__name__}"
 380        )
 381
 382    @classmethod
 383    def get_all_batch_jobs(cls, server: str | None = None) -> list[TBatchJob]:
 384        """
 385        Retrieve information about all jobs of all users on the specified or default batch server.
 386
 387        The jobs can be returned in arbitrary order.
 388
 389        Args:
 390            server (str | None): Optional name of the batch server to get jobs from.
 391
 392        Returns:
 393            list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
 394        """
 395        raise NotImplementedError(
 396            f"get_all_batch_jobs method is not implemented for {cls.__name__}"
 397        )
 398
 399    @classmethod
 400    def get_queues(cls, server: str | None = None) -> list[TBatchQueue]:
 401        """
 402        Retrieve all queues managed by the batch system on the specified or default batch server.
 403
 404        Args:
 405            server (str | None): Optional name of the batch server to get queues from.
 406
 407        Returns:
 408            list[BatchQueueInterface]: A list of queue objects existing in the batch system.
 409        """
 410        raise NotImplementedError(
 411            f"get_queues method is not implemented for {cls.__name__}"
 412        )
 413
 414    @classmethod
 415    def get_nodes(cls, server: str | None = None) -> list[TBatchNode]:
 416        """
 417        Retrieve all nodes managed by the batch system on the specified or default batch server.
 418
 419        Args:
 420            server (str | None): Optional name of the batch server to get nodes from.
 421
 422        Returns:
 423            list[BatchNodeInterface]: A list of node objects existing in the batch system.
 424        """
 425        raise NotImplementedError(
 426            f"get_nodes method is not implemented for {cls.__name__}"
 427        )
 428
 429    @classmethod
 430    def get_supported_work_dir_types(cls) -> list[str]:
 431        """
 432        Retrieve the list of supported types of working directories
 433        (i.e., strings that can be used with the `--work-dir` option).
 434
 435        Returns:
 436            list[str]: A list of supported types of working directories.
 437        """
 438        raise NotImplementedError(
 439            f"get_supported_work_dir_types method is not implemented for {cls.__name__}"
 440        )
 441
 442    @classmethod
 443    def navigate_to_destination(cls, host: str, directory: Path) -> None:
 444        """
 445        Open a new terminal on the specified host and change the working directory
 446        to the given path, handing control over to the user.
 447
 448        Default behavior:
 449            - If the target host is different from the current host, SSH is used
 450            to connect and `cd` is executed to switch to the directory.
 451            Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 452            - If the target host matches the current host, only `cd` is used.
 453
 454        A new terminal should always be opened, regardless of the host.
 455
 456        Args:
 457            host (str): Hostname where the directory is located.
 458            directory (Path): Directory path to navigate to.
 459
 460        Raises:
 461            QQError: If the navigation fails.
 462        """
 463        # if the directory is on the current host, we do not need to use ssh
 464        if host == socket.getfqdn():
 465            cls._navigate_same_host(directory)
 466            return
 467
 468        # the directory is on an another node
 469        ssh_command = cls._translate_ssh_command(host, directory)
 470        logger.debug(f"Using ssh: '{' '.join(ssh_command)}'")
 471        result = subprocess.run(ssh_command)
 472
 473        # the subprocess exit code can come from:
 474        # - SSH itself failing - returns _SSH_FAIL
 475        # - the explicit exit code we set if 'cd' to the directory fails - returns _CD_FAIL
 476        # - the exit code of the last command the user runs in the interactive shell
 477        #
 478        # we ignore user exit codes entirely and only treat _SSH_FAIL and _CD_FAIL as errors
 479        if result.returncode == cls._SSH_FAIL:
 480            raise QQError(
 481                f"Could not reach '{host}:{str(directory)}': Could not connect to host."
 482            )
 483        if result.returncode == cls._CD_FAIL:
 484            raise QQError(
 485                f"Could not reach '{host}:{str(directory)}': Could not change directory."
 486            )
 487
 488    @classmethod
 489    def read_remote_file(cls, host: str, file: Path) -> str:
 490        """
 491        Read the contents of a file on a remote host and return it as a string.
 492
 493        The default implementation uses SSH to retrieve the file contents.
 494        This approach may be inefficient on shared storage or high-latency networks.
 495        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 496
 497        Subclasses should override this method to provide a more efficient implementation
 498        if possible.
 499
 500        Args:
 501            host (str): The hostname of the remote machine where the file resides.
 502            file (Path): The path to the file on the remote host.
 503
 504        Returns:
 505            str: The contents of the remote file.
 506
 507        Raises:
 508            QQError: If the file cannot be read or SSH fails.
 509        """
 510        result = subprocess.run(
 511            [
 512                "ssh",
 513                "-o PasswordAuthentication=no",
 514                "-o GSSAPIAuthentication=yes",
 515                "-o StrictHostKeyChecking=no",  # allow unknown hosts
 516                f"-o ConnectTimeout={CFG.timeouts.ssh}",
 517                "-q",  # suppress some SSH messages
 518                host,
 519                f"cat {file}",
 520            ],
 521            capture_output=True,
 522            text=True,
 523        )
 524
 525        if result.returncode != 0:
 526            raise QQError(
 527                f"Could not read remote file '{file}' on '{host}': {result.stderr.strip()}."
 528            )
 529        return result.stdout
 530
 531    @classmethod
 532    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
 533        """
 534        Write the given content to a file on a remote host, overwriting it if it exists.
 535
 536        The default implementation uses SSH to send the content to the remote file.
 537        This approach may be inefficient on shared storage or high-latency networks.
 538        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 539
 540        Subclasses should override this method to provide a more efficient implementation
 541        if possible.
 542
 543        Args:
 544            host (str): The hostname of the remote machine where the file resides.
 545            file (Path): The path to the file on the remote host.
 546            content (str): The content to write to the remote file.
 547
 548        Raises:
 549            QQError: If the file cannot be written or SSH fails.
 550        """
 551
 552        result = subprocess.run(
 553            [
 554                "ssh",
 555                "-o PasswordAuthentication=no",
 556                "-o GSSAPIAuthentication=yes",
 557                "-o StrictHostKeyChecking=no",  # allow unknown hosts
 558                f"-o ConnectTimeout={CFG.timeouts.ssh}",
 559                host,
 560                f"cat > {file}",
 561            ],
 562            input=content,
 563            capture_output=True,
 564            text=True,
 565        )
 566
 567        if result.returncode != 0:
 568            raise QQError(
 569                f"Could not write to remote file '{file}' on '{host}': {result.stderr.strip()}."
 570            )
 571
 572    @classmethod
 573    def make_remote_dir(cls, host: str, directory: Path) -> None:
 574        """
 575        Create a directory at the specified path on a remote host.
 576
 577        The default implementation uses SSH to run `mkdir` on the remote host.
 578        This approach may be inefficient on shared storage or high-latency networks.
 579        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 580
 581        Subclasses should override this method to provide a more efficient implementation
 582        if possible.
 583
 584        Args:
 585            host (str): The hostname of the remote machine where the directory should be created.
 586            directory (Path): The path of the directory to create on the remote host.
 587
 588        Raises:
 589            QQError: If the directory cannot be created but does not already exist or the SSH command fails.
 590        """
 591        result = subprocess.run(
 592            [
 593                "ssh",
 594                "-o PasswordAuthentication=no",
 595                "-o GSSAPIAuthentication=yes",
 596                "-o StrictHostKeyChecking=no",  # allow unknown hosts
 597                f"-o ConnectTimeout={CFG.timeouts.ssh}",
 598                host,
 599                # ignore an error if the directory already exists
 600                f"mkdir {directory} 2>/dev/null || [ -d {directory} ]",
 601            ],
 602            capture_output=True,
 603            text=True,
 604        )
 605
 606        if result.returncode != 0:
 607            raise QQError(
 608                f"Could not make remote directory '{directory}' on '{host}': {result.stderr.strip()}."
 609            )
 610
 611    @classmethod
 612    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
 613        """
 614        List all files and directories (absolute paths) in the specified directory on a remote host.
 615
 616        The default implementation uses SSH to run `ls -A` on the remote host.
 617        This approach may be inefficient on shared storage or high-latency networks.
 618        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 619
 620        Subclasses should override this method to provide a more efficient implementation
 621        if possible.
 622
 623        Args:
 624            host (str): The hostname of the remote machine where the directory resides.
 625            directory (Path): The remote directory to list.
 626
 627        Returns:
 628            list[Path]: A list of `Path` objects representing the entries inside the directory.
 629                        Entries are relative to the given `directory`.
 630
 631        Raises:
 632            QQError: If the directory cannot be listed or the SSH command fails.
 633        """
 634        result = subprocess.run(
 635            [
 636                "ssh",
 637                "-o PasswordAuthentication=no",
 638                "-o GSSAPIAuthentication=yes",
 639                "-o StrictHostKeyChecking=no",  # allow unknown hosts
 640                f"-o ConnectTimeout={CFG.timeouts.ssh}",
 641                host,
 642                f"ls -A {directory}",
 643            ],
 644            capture_output=True,
 645            text=True,
 646        )
 647
 648        if result.returncode != 0:
 649            raise QQError(
 650                f"Could not list remote directory '{directory}' on '{host}': {result.stderr.strip()}."
 651            )
 652
 653        # split by newline and filter out empty lines
 654        return [
 655            logical_resolve(Path(directory) / line)
 656            for line in result.stdout.splitlines()
 657            if line.strip()
 658        ]
 659
 660    @classmethod
 661    def delete_remote_dir(cls, host: str, directory: Path) -> None:
 662        """
 663        Delete a directory on a remote host.
 664
 665        The default implementation uses SSH to run `rm -r` on the remote host.
 666        This approach may be inefficient on shared storage or high-latency networks.
 667        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 668
 669        Subclasses should override this method to provide a more efficient implementation
 670        if possible.
 671
 672        Args:
 673            host (str): The hostname of the remote machine where the directory resides.
 674            directory (Path): The remote directory to delete.
 675
 676        Raises:
 677            QQError: If the directory cannot be deleted or the SSH command fails.
 678        """
 679        result = subprocess.run(
 680            [
 681                "ssh",
 682                "-o PasswordAuthentication=no",
 683                "-o GSSAPIAuthentication=yes",
 684                "-o StrictHostKeyChecking=no",  # allow unknown hosts
 685                f"-o ConnectTimeout={CFG.timeouts.ssh}",
 686                host,
 687                f"yes | rm -r {directory}",
 688            ],
 689            capture_output=True,
 690            text=True,
 691        )
 692
 693        if result.returncode != 0:
 694            raise QQError(
 695                f"Could not delete remote directory '{directory}' on '{host}': {result.stderr.strip()}."
 696            )
 697
 698    @classmethod
 699    def move_remote_files(
 700        cls, host: str, files: list[Path], moved_files: list[Path]
 701    ) -> None:
 702        """
 703        Move files on a remote host from their current paths to new paths.
 704
 705        The default implementation uses SSH to run a sequence of `mv` commands on the remote host.
 706        This approach may be inefficient on shared storage or high-latency networks.
 707        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
 708
 709        Subclasses should override this method to provide a more efficient implementation
 710        if possible.
 711
 712        Args:
 713            host (str): The hostname of the remote machine where the files reside.
 714            files (list[Path]): A list of source file paths on the remote host.
 715            moved_files (list[Path]): A list of destination file paths on the remote host.
 716                                    Must be the same length as `files`.
 717
 718        Raises:
 719            QQError: If the SSH command fails, the files cannot be moved or
 720                    the length of `files` does not match the length of `moved_files`.
 721        """
 722        mv_command = cls._translate_move_command(files, moved_files)
 723
 724        result = subprocess.run(
 725            [
 726                "ssh",
 727                "-o PasswordAuthentication=no",
 728                "-o GSSAPIAuthentication=yes",
 729                "-o StrictHostKeyChecking=no",  # allow unknown hosts
 730                f"-o ConnectTimeout={CFG.timeouts.ssh}",
 731                host,
 732                mv_command,
 733            ],
 734            capture_output=True,
 735            text=True,
 736        )
 737
 738        if result.returncode != 0:
 739            raise QQError(
 740                f"Could not move files on a remote host '{host}': {result.stderr.strip()}."
 741            )
 742
 743    @classmethod
 744    def sync_with_exclusions(
 745        cls,
 746        src_dir: Path,
 747        dest_dir: Path,
 748        src_host: str | None,
 749        dest_host: str | None,
 750        exclude_files: list[Path] | None = None,
 751    ) -> None:
 752        """
 753        Synchronize the contents of two directories using rsync, optionally across remote hosts,
 754        while excluding specified files or subdirectories.
 755
 756        All files and directories in `src_dir` are copied to `dest_dir` except
 757        those listed in `exclude_files`. Files are never removed from the destination.
 758
 759        Args:
 760            src_dir (Path): Source directory to sync from.
 761            dest_dir (Path): Destination directory to sync to.
 762            src_host (str | None): Optional hostname of the source machine if remote;
 763                None if the source is local.
 764            dest_host (str | None): Optional hostname of the destination machine if remote;
 765                None if the destination is local.
 766            exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
 767                These will be converted to paths relative to `src_dir`.
 768
 769        Raises:
 770            QQError: If the rsync command fails for any reason or timeouts.
 771        """
 772        # convert absolute paths of files to exclude into relative to src_dir
 773        relative_excluded = (
 774            convert_absolute_to_relative(exclude_files, src_dir)
 775            if exclude_files
 776            else []
 777        )
 778
 779        command = cls._translate_rsync_excluded_command(
 780            src_dir, dest_dir, src_host, dest_host, relative_excluded
 781        )
 782        logger.debug(f"Rsync command: {command}.")
 783
 784        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)
 785
 786    @classmethod
 787    def sync_selected(
 788        cls,
 789        src_dir: Path,
 790        dest_dir: Path,
 791        src_host: str | None,
 792        dest_host: str | None,
 793        include_files: list[Path] | None = None,
 794    ) -> None:
 795        """
 796        Synchronize only the explicitly selected files and directories from the source
 797        to the destination, optionally across remote hosts.
 798
 799        Only files listed in `include_files` are copied from `src_dir` to `dest_dir`.
 800        Files not listed are ignored. Files are never removed from the destination.
 801
 802        Args:
 803            src_dir (Path): Source directory to sync from.
 804            dest_dir (Path): Destination directory to sync to.
 805            src_host (str | None): Optional hostname of the source machine if remote;
 806                None if the source is local.
 807            dest_host (str | None): Optional hostname of the destination machine if remote;
 808                None if the destination is local.
 809            include_files (list[Path] | None): Optional list of absolute file paths to include in syncing.
 810                These paths are converted relative to `src_dir`.
 811                This argument is optional only for consistency with sync_with_exclusions.
 812
 813        Raises:
 814            QQError: If the rsync command fails or times out.
 815        """
 816        # convert absolute paths of files to include relative to src_dir
 817        relative_included = (
 818            convert_absolute_to_relative(include_files, src_dir)
 819            if include_files
 820            else []
 821        )
 822
 823        command = cls._translate_rsync_included_command(
 824            src_dir, dest_dir, src_host, dest_host, relative_included
 825        )
 826        logger.debug(f"Rsync command: {command}.")
 827
 828        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)
 829
 830    @classmethod
 831    def transform_resources(
 832        cls, queue: str, server: str | None, provided_resources: Resources
 833    ) -> Resources:
 834        """
 835        Transform user-provided Resources into a batch system-specific Resources instance.
 836
 837        This method takes the resources provided during submission and returns a new
 838        Resources object with any necessary modifications or defaults applied for
 839        the target batch system. The original `provided_resources` object is not modified.
 840
 841        Args:
 842            queue (str): The name of the queue for which the resources are being adapted.
 843            server (str | None): Name of the server on which the queue is located.
 844                If `None`, the queue is treated as being located on the current server.
 845            provided_resources (Resources): The raw resources specified by the user.
 846
 847        Returns:
 848            Resources: A new Resources instance with batch system-specific adjustments,
 849                        fully constructed and validated.
 850
 851        Raises:
 852            QQError: If any of the provided parameters are invalid or inconsistent.
 853        """
 854        raise NotImplementedError(
 855            f"transform_resources method is not implemented for {cls.__name__}"
 856        )
 857
 858    @classmethod
 859    def is_shared(cls, directory: Path) -> bool:
 860        """
 861        Determine whether a given directory resides on a shared filesystem.
 862
 863        Args:
 864            directory (Path): The directory to check.
 865
 866        Returns:
 867            bool: True if the directory is on a shared filesystem, False if it is local.
 868        """
 869        # df -l exits with zero if the filesystem is local; otherwise it exits with a non-zero code
 870        result = subprocess.run(
 871            ["df", "-l", directory],
 872            stdout=subprocess.DEVNULL,
 873            stderr=subprocess.DEVNULL,
 874        )
 875
 876        return result.returncode != 0
 877
 878    @classmethod
 879    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
 880        """
 881        Get the default job resubmission hosts for this batch system.
 882
 883        In the default implementation, resubmission from the input machine is attempted.
 884
 885        Returns:
 886            list[ResubmitHost]: A list of resubmission hosts.
 887        """
 888        return [InputHost()]
 889
 890    @classmethod
 891    def sort_jobs(cls, jobs: list[TBatchJob]) -> None:
 892        """
 893        Sort a list of batch system jobs by a defined attribute.
 894
 895        The default implementation sorts the jobs alphabetically by their job ID,
 896        as returned by `job.get_id()`. Subclasses may override this method to
 897        implement custom sorting logic.
 898
 899        Args:
 900            jobs (list[BatchJobInterface]): A list of batch job objects to be sorted
 901                in-place.
 902        """
 903        jobs.sort(key=lambda job: job.get_id())
 904
 905    @classmethod
 906    def jobs_presenter_columns_to_show(cls) -> set[str]:
 907        """
 908        Get a set of columns that should be shown in the output of JobsPresenter (`qq jobs`)
 909        for this batch system.
 910
 911        In the default implementation, all columns are shown.
 912
 913        Note that the 'Exit' column is not shown when printing queued and running jobs,
 914        even if you specify it here.
 915
 916        Args:
 917            set[str]: Set of column titles that should be shown.
 918        """
 919        return {
 920            "S",
 921            "Job ID",
 922            "User",
 923            "Job Name",
 924            "Queue",
 925            "NCPUs",
 926            "NGPUs",
 927            "NNodes",
 928            "Times",
 929            "Node",
 930            "%CPU",
 931            "%Mem",
 932            "Exit",
 933        }
 934
 935    @classmethod
 936    def _translate_ssh_command(cls, host: str, directory: Path) -> list[str]:
 937        """
 938        Construct the SSH command to navigate to a remote directory.
 939
 940        This is an internal method of `BatchInterface`; you typically should not override it.
 941
 942        Args:
 943            host (str): The hostname of the remote machine.
 944            directory (Path): The target directory to navigate to.
 945
 946        Returns:
 947            list[str]: SSH command as a list suitable for subprocess execution.
 948        """
 949        return [
 950            "ssh",
 951            "-o PasswordAuthentication=no",  # never ask for password
 952            "-o GSSAPIAuthentication=yes",  # allow Kerberos tickets
 953            "-o StrictHostKeyChecking=no",  # allow unknown hosts
 954            f"-o ConnectTimeout={CFG.timeouts.ssh}",
 955            host,
 956            "-t",
 957            f"cd {directory} || exit {cls._CD_FAIL} && exec bash -l",
 958        ]
 959
 960    @classmethod
 961    def _navigate_same_host(cls, directory: Path) -> None:
 962        """
 963        Navigate to a directory on the current host using a subprocess.
 964
 965        This is an internal method of `BatchInterface`; you typically should not override it.
 966
 967        Args:
 968            directory (Path): Directory to navigate to.
 969        """
 970        logger.debug("Current host is the same as target host. Using 'cd'.")
 971        if not directory.is_dir():
 972            raise QQError(
 973                f"Could not reach '{socket.getfqdn()}:{str(directory)}': Could not change directory."
 974            )
 975
 976        subprocess.run(["bash"], cwd=directory)
 977
 978        # if the directory exists, always report success,
 979        # no matter what the user does inside the terminal
 980
 981    @classmethod
 982    def _translate_move_command(cls, files: list[Path], moved_files: list[Path]) -> str:
 983        """
 984        Translate lists of source and destination file paths into a single shell
 985        command string for moving the files.
 986
 987        This is an internal method of `BatchInterface`; you typically should not override it.
 988
 989        Args:
 990            files (list[Path]): A list of source file paths to be moved.
 991            moved_files (list[Path]): A list of destination file paths of the same
 992                length as `files`.
 993
 994        Returns:
 995            str: A single shell command string consisting of `mv` commands joined
 996            with `&&`.
 997
 998        Raises:
 999            QQError: If `files` and `moved_files` do not have the same length.
1000        """
1001        if len(files) != len(moved_files):
1002            raise QQError(
1003                "The provided 'files' and 'moved_files' must have the same length."
1004            )
1005
1006        mv_commands: list[str] = []
1007        for src, dst in zip(files, moved_files):
1008            mv_commands.append(f"mv '{src}' '{dst}'")
1009
1010        return " && ".join(mv_commands)
1011
1012    @classmethod
1013    def _translate_rsync_excluded_command(
1014        cls,
1015        src_dir: Path,
1016        dest_dir: Path,
1017        src_host: str | None,
1018        dest_host: str | None,
1019        relative_excluded: list[Path],
1020    ) -> list[str]:
1021        """
1022        Build an rsync command to synchronize a directory while excluding specific files.
1023
1024        Both `src_host` and `dest_host` should not be set simultaneously,
1025        otherwise the resulting rsync command will be invalid.
1026
1027        This is an internal method of `BatchInterface`; you typically should not override it.
1028
1029        Args:
1030            src_dir (Path): Source directory path.
1031            dest_dir (Path): Destination directory path.
1032            src_host (str | None): Hostname of the source machine if remote;
1033                None if the source is local.
1034            dest_host (str | None): Hostname of the destination machine if remote;
1035                None if the destination is local.
1036            relative_excluded (list[Path]): List of paths relative to `src_dir`
1037                to exclude from syncing.
1038
1039        Returns:
1040            list[str]: List of command arguments for rsync, suitable for `subprocess.run`.
1041        """
1042        # syncing recursively (-r), symlinks copied as symlinks (-l),
1043        # preserving times (-t), preserving device/special files (-D),
1044        # but not preserving owners and groups
1045        # not using --checksum nor --ignore-times for performance reasons
1046        # some files may potentially not be correctly synced if they were
1047        # modified in both src_dir and dest_dir at the same time and have
1048        # the same size -> this should be so extremely rare that we do not care
1049        command = [
1050            "rsync",
1051            "-e",
1052            # allow Kerberos tickets, never ask for password, allow unknown hosts
1053            "ssh -o GSSAPIAuthentication=yes -o PasswordAuthentication=no -o StrictHostKeyChecking=no",
1054            "-rltD",
1055        ]
1056        for file in relative_excluded:
1057            command.extend(["--exclude", str(file)])
1058
1059        src = src_host + ":" + str(src_dir) + "/" if src_host else str(src_dir) + "/"
1060        dest = dest_host + ":" + str(dest_dir) if dest_host else str(dest_dir)
1061        command.extend([src, dest])
1062
1063        return command
1064
1065    @classmethod
1066    def _translate_rsync_included_command(
1067        cls,
1068        src_dir: Path,
1069        dest_dir: Path,
1070        src_host: str | None,
1071        dest_host: str | None,
1072        relative_included: list[Path],
1073    ) -> list[str]:
1074        """
1075        Build an rsync command to synchronize only the explicitly included files.
1076
1077        Both `src_host` and `dest_host` should not be set simultaneously,
1078        otherwise the resulting rsync command will be invalid.
1079
1080        This is an internal method of `BatchInterface`; you typically should not override it.
1081
1082        Args:
1083            src_dir (Path): Source directory path.
1084            dest_dir (Path): Destination directory path.
1085            src_host (str | None): Hostname of the source machine if remote;
1086                None if the source is local.
1087            dest_host (str | None): Hostname of the destination machine if remote;
1088                None if the destination is local.
1089            relative_included (list[Path]): List of paths relative to `src_dir`
1090                that should be included in the sync.
1091
1092        Returns:
1093            list[str]: List of command arguments for rsync, suitable for `subprocess.run`.
1094        """
1095
1096        command = [
1097            "rsync",
1098            "-e",
1099            # allow Kerberos tickets, never ask for password, allow unknown hosts
1100            "ssh -o GSSAPIAuthentication=yes -o PasswordAuthentication=no -o StrictHostKeyChecking=no",
1101            "-rltD",
1102        ]
1103        for file in relative_included:
1104            # if `file` is a file
1105            command.extend(["--include", str(file)])
1106            # if `file` is a directory
1107            # it's okay to include both patterns - if it is invalid, it's ignored
1108            command.extend(["--include", f"{str(file)}/***"])
1109        # exclude all files not specifically included
1110        command.extend(["--exclude", "*"])
1111
1112        src = src_host + ":" + str(src_dir) + "/" if src_host else str(src_dir) + "/"
1113        dest = dest_host + ":" + str(dest_dir) if dest_host else str(dest_dir)
1114        command.extend([src, dest])
1115
1116        return command
1117
1118    @classmethod
1119    def _run_rsync(
1120        cls,
1121        src_dir: Path,
1122        dest_dir: Path,
1123        src_host: str | None,
1124        dest_host: str | None,
1125        command: list[str],
1126    ) -> None:
1127        """
1128        Execute an rsync command to synchronize files between source and destination.
1129
1130        This is an internal method of `BatchInterface`; you typically should not override it.
1131
1132        Args:
1133            src_dir (Path): Source directory path.
1134            dest_dir (Path): Destination directory path.
1135            src_host (str | None): Optional hostname of the source machine if remote;
1136                None if the source is local.
1137            dest_host (str | None): Optional hostname of the destination machine if remote;
1138                None if the destination is local.
1139            command (list[str]): List of command-line arguments for rsync, typically
1140                generated by `_translate_rsync_excluded_command` or `_translate_rsync_included_command`.
1141
1142        Raises:
1143            QQError: If the rsync command fails (non-zero exit code) or
1144                if the command times out after `CFG.timeouts.rsync` seconds.
1145        """
1146        src = f"{src_host}:{str(src_dir)}" if src_host else str(src_dir)
1147        dest = f"{dest_host}:{str(dest_dir)}" if dest_host else str(dest_dir)
1148
1149        try:
1150            result = subprocess.run(
1151                command, capture_output=True, text=True, timeout=CFG.timeouts.rsync
1152            )
1153        except subprocess.TimeoutExpired as e:
1154            raise QQError(
1155                f"Could not rsync files between '{src}' and '{dest}': Connection timed out after {CFG.timeouts.rsync} seconds."
1156            ) from e
1157
1158        if result.returncode != 0:
1159            raise QQError(
1160                f"Could not rsync files between '{src}' and '{dest}': {result.stderr.strip()}."
1161            )

Abstract base class for batch system integrations.

Concrete batch system classes must implement these methods to allow qq to interact with different batch systems uniformly.

All functions should raise QQError when encountering an error.

@classmethod
def env_name(cls) -> str:
152    @classmethod
153    def env_name(cls) -> str:
154        """
155        Return the name of the batch system environment.
156
157        Returns:
158            str: The batch system name.
159        """
160        raise NotImplementedError(
161            f"env_name method is not implemented for {cls.__name__}"
162        )

Return the name of the batch system environment.

Returns:

str: The batch system name.

@classmethod
def is_available(cls) -> bool:
164    @classmethod
165    def is_available(cls) -> bool:
166        """
167        Determine whether the batch system is available on the current host.
168
169        Implementations typically verify this by checking for the presence
170        of required commands or other environment-specific indicators.
171
172        Returns:
173            bool: True if the batch system is available, False otherwise.
174        """
175        raise NotImplementedError(
176            f"is_available method is not implemented for {cls.__name__}"
177        )

Determine whether the batch system is available on the current host.

Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.

Returns:

bool: True if the batch system is available, False otherwise.

@classmethod
def get_job_id(cls) -> str | None:
179    @classmethod
180    def get_job_id(cls) -> str | None:
181        """
182        Get the id of the current job from the corresponding batch system's environment variable.
183
184        For this method to work, it has to be called from the inside of an active job.
185
186        Returns:
187            str | None: Index of the job or None if the collective variable is not set.
188        """
189        raise NotImplementedError(
190            f"get_job_id method is not implemented for {cls.__name__}"
191        )

Get the id of the current job from the corresponding batch system's environment variable.

For this method to work, it has to be called from the inside of an active job.

Returns:

str | None: ID of the current job, or None if the corresponding environment variable is not set.

@classmethod
def create_work_dir_on_scratch(cls, job_id: str) -> pathlib.Path:
193    @classmethod
194    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
195        """
196        Create the working directory on scratch for the given job.
197
198        Args:
199            job_id (int): Unique identifier of the job.
200
201        Returns:
202            Path: Absolute path to the working directory on scratch.
203
204        Raises:
205            QQError: If the working directory could not be created.
206        """
207        raise NotImplementedError(
208            f"create_work_dir_on_scratch method is not implemented for {cls.__name__}"
209        )

Create the working directory on scratch for the given job.

Arguments:
  • job_id (str): Unique identifier of the job.
Returns:

Path: Absolute path to the working directory on scratch.

Raises:
  • QQError: If the working directory could not be created.
@classmethod
def job_submit( cls, res: qq_lib.properties.resources.Resources, queue: str, script: pathlib.Path, job_name: str, depend: list[qq_lib.properties.depend.Depend], env_vars: dict[str, str], account: str | None = None, server: str | None = None, remote_host: str | None = None) -> str:
211    @classmethod
212    def job_submit(
213        cls,
214        res: Resources,
215        queue: str,
216        script: Path,
217        job_name: str,
218        depend: list[Depend],
219        env_vars: dict[str, str],
220        account: str | None = None,
221        server: str | None = None,
222        remote_host: str | None = None,
223    ) -> str:
224        """
225        Submit a job to the batch system.
226
227        Can also perform additional validation of the job's resources.
228
229        This method is NOT guaranteed to be thread-safe.
230
231        Args:
232            res (Resources): Resources required for the job.
233            queue (str): Target queue for the job submission.
234            script (Path): Path to the script to execute.
235            job_name (str): Name of the job to use.
236            depend (list[Depend]): List of job dependencies.
237            env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
238            account (str | None): Optional account name to use for the job.
239            server (str | None): Optional name of the server to submit the job to.
240            remote_host (str | None): Optional name of the machine to submit the job from.
241
242        Returns:
243            str: Unique ID of the submitted job.
244
245        Raises:
246            QQError: If the job submission fails.
247        """
248        raise NotImplementedError(
249            f"job_submit method is not implemented for {cls.__name__}"
250        )

Submit a job to the batch system.

Can also perform additional validation of the job's resources.

This method is NOT guaranteed to be thread-safe.

Arguments:
  • res (Resources): Resources required for the job.
  • queue (str): Target queue for the job submission.
  • script (Path): Path to the script to execute.
  • job_name (str): Name of the job to use.
  • depend (list[Depend]): List of job dependencies.
  • env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
  • account (str | None): Optional account name to use for the job.
  • server (str | None): Optional name of the server to submit the job to.
  • remote_host (str | None): Optional name of the machine to submit the job from.
Returns:

str: Unique ID of the submitted job.

Raises:
  • QQError: If the job submission fails.
@classmethod
def job_kill(cls, job_id: str) -> None:
252    @classmethod
253    def job_kill(cls, job_id: str) -> None:
254        """
255        Terminate a job gracefully. The job should have time for proper cleanup.
256
257        Args:
258            job_id (str): Identifier of the job to terminate.
259
260        Raises:
261            QQError: If the job could not be killed.
262        """
263        raise NotImplementedError(
264            f"job_kill method is not implemented for {cls.__name__}"
265        )

Terminate a job gracefully. The job should have time for proper cleanup.

Arguments:
  • job_id (str): Identifier of the job to terminate.
Raises:
  • QQError: If the job could not be killed.
@classmethod
def job_kill_force(cls, job_id: str) -> None:
267    @classmethod
268    def job_kill_force(cls, job_id: str) -> None:
269        """
270        Forcefully terminate a job. There may be no time for proper cleanup.
271
272        Args:
273            job_id (str): Identifier of the job to forcefully terminate.
274
275        Raises:
276            QQError: If the job could not be killed.
277        """
278        raise NotImplementedError(
279            f"job_kill_force method is not implemented for {cls.__name__}"
280        )

Forcefully terminate a job. There may be no time for proper cleanup.

Arguments:
  • job_id (str): Identifier of the job to forcefully terminate.
Raises:
  • QQError: If the job could not be killed.
@classmethod
def get_batch_job(cls, job_id: str) -> 'TBatchJob':
282    @classmethod
283    def get_batch_job(cls, job_id: str) -> TBatchJob:
284        """
285        Retrieve information about a job from the batch system.
286
287        The returned object should be fully initialized, even if the job
288        no longer exists or its information is unavailable.
289
290        Args:
291            job_id (str): Identifier of the job.
292
293        Returns:
294            BatchJobInterface: Object containing the job's metadata and state.
295        """
296        raise NotImplementedError(
297            f"get_batch_job method is not implemented for {cls.__name__}"
298        )

Retrieve information about a job from the batch system.

The returned object should be fully initialized, even if the job no longer exists or its information is unavailable.

Arguments:
  • job_id (str): Identifier of the job.
Returns:

BatchJobInterface: Object containing the job's metadata and state.

@classmethod
def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> 'list[TBatchJob]':
300    @classmethod
301    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[TBatchJob]:
302        """
303        Retrieve information about multiple jobs from the batch system.
304
305        Batch jobs should be returned in the same order as they appear in `job_ids`.
306        A TBatchJob object should be returned for each job id, even if the job
307        no longer exists or its information is unavailable.
308
309        Array jobs should NOT be expanded into their individual tasks.
310
311        The default implementation is to call `get_batch_job` for each job id.
312        This implementation may be inefficient for large numbers of job ids and
313        should be overriden by subclasses.
314
315        Args:
316            job_ids (list[str]): List of job identifiers.
317
318        Returns:
319            list[TBatchJob]: List of TBatchJob objects, one for each job id.
320        """
321        return [cls.get_batch_job(id) for id in job_ids]

Retrieve information about multiple jobs from the batch system.

Batch jobs should be returned in the same order as they appear in job_ids. A TBatchJob object should be returned for each job id, even if the job no longer exists or its information is unavailable.

Array jobs should NOT be expanded into their individual tasks.

The default implementation calls get_batch_job for each job id. This may be inefficient for large numbers of job ids and should be overridden by subclasses.

Arguments:
  • job_ids (list[str]): List of job identifiers.
Returns:

list[TBatchJob]: List of TBatchJob objects, one for each job id.

@classmethod
def get_unfinished_batch_jobs(cls, user: str, server: str | None = None) -> 'list[TBatchJob]':
323    @classmethod
324    def get_unfinished_batch_jobs(
325        cls, user: str, server: str | None = None
326    ) -> list[TBatchJob]:
327        """
328        Retrieve information about all uncompleted jobs submitted by `user`
329        on the specified or default batch server.
330
331        The jobs can be returned in arbitrary order.
332
333        Args:
334            user (str): Username for which to fetch uncompleted jobs.
335            server (str | None): Optional name of the batch server to get jobs from.
336
337        Returns:
338            list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
339        """
340        raise NotImplementedError(
341            f"get_unfinished_batch_jobs method is not implemented for {cls.__name__}"
342        )

Retrieve information about all uncompleted jobs submitted by user on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • user (str): Username for which to fetch uncompleted jobs.
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.

@classmethod
def get_batch_jobs(cls, user: str, server: str | None = None) -> 'list[TBatchJob]':
344    @classmethod
345    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[TBatchJob]:
346        """
347        Retrieve information about all jobs submitted by a specific user (including finished jobs)
348        on the specified or default batch server.
349
350        The jobs can be returned in arbitrary order.
351
352        Args:
353            user (str): Username for which to fetch all jobs.
354            server (str | None): Optional name of the batch server to get jobs from.
355
356        Returns:
357            list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
358        """
359        raise NotImplementedError(
360            f"get_batch_jobs method is not implemented for {cls.__name__}"
361        )

Retrieve information about all jobs submitted by a specific user (including finished jobs) on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • user (str): Username for which to fetch all jobs.
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing all jobs of the user.

@classmethod
def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> 'list[TBatchJob]':
363    @classmethod
364    def get_all_unfinished_batch_jobs(
365        cls, server: str | None = None
366    ) -> list[TBatchJob]:
367        """
368        Retrieve information about uncompleted jobs of all users on the specified or default batch server.
369
370        The jobs can be returned in arbitrary order.
371
372        Args:
373            server (str | None): Optional name of the batch server to get jobs from.
374
375        Returns:
376            list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
377        """
378        raise NotImplementedError(
379            f"get_all_unfinished_batch_jobs method is not implemented for {cls.__name__}"
380        )

Retrieve information about uncompleted jobs of all users on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.

@classmethod
def get_all_batch_jobs(cls, server: str | None = None) -> 'list[TBatchJob]':
382    @classmethod
383    def get_all_batch_jobs(cls, server: str | None = None) -> list[TBatchJob]:
384        """
385        Retrieve information about all jobs of all users on the specified or default batch server.
386
387        The jobs can be returned in arbitrary order.
388
389        Args:
390            server (str | None): Optional name of the batch server to get jobs from.
391
392        Returns:
393            list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
394        """
395        raise NotImplementedError(
396            f"get_all_batch_jobs method is not implemented for {cls.__name__}"
397        )

Retrieve information about all jobs of all users on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing all jobs of all users.

@classmethod
def get_queues(cls, server: str | None = None) -> 'list[TBatchQueue]':
399    @classmethod
400    def get_queues(cls, server: str | None = None) -> list[TBatchQueue]:
401        """
402        Retrieve all queues managed by the batch system on the specified or default batch server.
403
404        Args:
405            server (str | None): Optional name of the batch server to get queues from.
406
407        Returns:
408            list[BatchQueueInterface]: A list of queue objects existing in the batch system.
409        """
410        raise NotImplementedError(
411            f"get_queues method is not implemented for {cls.__name__}"
412        )

Retrieve all queues managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get queues from.
Returns:

list[BatchQueueInterface]: A list of queue objects existing in the batch system.

@classmethod
def get_nodes(cls, server: str | None = None) -> 'list[TBatchNode]':
414    @classmethod
415    def get_nodes(cls, server: str | None = None) -> list[TBatchNode]:
416        """
417        Retrieve all nodes managed by the batch system on the specified or default batch server.
418
419        Args:
420            server (str | None): Optional name of the batch server to get nodes from.
421
422        Returns:
423            list[BatchNodeInterface]: A list of node objects existing in the batch system.
424        """
425        raise NotImplementedError(
426            f"get_nodes method is not implemented for {cls.__name__}"
427        )

Retrieve all nodes managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get nodes from.
Returns:

list[BatchNodeInterface]: A list of node objects existing in the batch system.

@classmethod
def get_supported_work_dir_types(cls) -> list[str]:
429    @classmethod
430    def get_supported_work_dir_types(cls) -> list[str]:
431        """
432        Retrieve the list of supported types of working directories
433        (i.e., strings that can be used with the `--work-dir` option).
434
435        Returns:
436            list[str]: A list of supported types of working directories.
437        """
438        raise NotImplementedError(
439            f"get_supported_work_dir_types method is not implemented for {cls.__name__}"
440        )

Retrieve the list of supported types of working directories (i.e., strings that can be used with the --work-dir option).

Returns:

list[str]: A list of supported types of working directories.

@classmethod
def navigate_to_destination(cls, host: str, directory: pathlib._local.Path) -> None:
442    @classmethod
443    def navigate_to_destination(cls, host: str, directory: Path) -> None:
444        """
445        Open a new terminal on the specified host and change the working directory
446        to the given path, handing control over to the user.
447
448        Default behavior:
449            - If the target host is different from the current host, SSH is used
450            to connect and `cd` is executed to switch to the directory.
451            Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
452            - If the target host matches the current host, only `cd` is used.
453
454        A new terminal should always be opened, regardless of the host.
455
456        Args:
457            host (str): Hostname where the directory is located.
458            directory (Path): Directory path to navigate to.
459
460        Raises:
461            QQError: If the navigation fails.
462        """
463        # if the directory is on the current host, we do not need to use ssh
464        if host == socket.getfqdn():
465            cls._navigate_same_host(directory)
466            return
467
468        # the directory is on an another node
469        ssh_command = cls._translate_ssh_command(host, directory)
470        logger.debug(f"Using ssh: '{' '.join(ssh_command)}'")
471        result = subprocess.run(ssh_command)
472
473        # the subprocess exit code can come from:
474        # - SSH itself failing - returns _SSH_FAIL
475        # - the explicit exit code we set if 'cd' to the directory fails - returns _CD_FAIL
476        # - the exit code of the last command the user runs in the interactive shell
477        #
478        # we ignore user exit codes entirely and only treat _SSH_FAIL and _CD_FAIL as errors
479        if result.returncode == cls._SSH_FAIL:
480            raise QQError(
481                f"Could not reach '{host}:{str(directory)}': Could not connect to host."
482            )
483        if result.returncode == cls._CD_FAIL:
484            raise QQError(
485                f"Could not reach '{host}:{str(directory)}': Could not change directory."
486            )

Open a new terminal on the specified host and change the working directory to the given path, handing control over to the user.

Default behavior:
  • If the target host is different from the current host, SSH is used to connect and cd is executed to switch to the directory. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
  • If the target host matches the current host, only cd is used.

A new terminal should always be opened, regardless of the host.

Arguments:
  • host (str): Hostname where the directory is located.
  • directory (Path): Directory path to navigate to.
Raises:
  • QQError: If the navigation fails.
@classmethod
def read_remote_file(cls, host: str, file: pathlib._local.Path) -> str:
488    @classmethod
489    def read_remote_file(cls, host: str, file: Path) -> str:
490        """
491        Read the contents of a file on a remote host and return it as a string.
492
493        The default implementation uses SSH to retrieve the file contents.
494        This approach may be inefficient on shared storage or high-latency networks.
495        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
496
497        Subclasses should override this method to provide a more efficient implementation
498        if possible.
499
500        Args:
501            host (str): The hostname of the remote machine where the file resides.
502            file (Path): The path to the file on the remote host.
503
504        Returns:
505            str: The contents of the remote file.
506
507        Raises:
508            QQError: If the file cannot be read or SSH fails.
509        """
510        result = subprocess.run(
511            [
512                "ssh",
513                "-o PasswordAuthentication=no",
514                "-o GSSAPIAuthentication=yes",
515                "-o StrictHostKeyChecking=no",  # allow unknown hosts
516                f"-o ConnectTimeout={CFG.timeouts.ssh}",
517                "-q",  # suppress some SSH messages
518                host,
519                f"cat {file}",
520            ],
521            capture_output=True,
522            text=True,
523        )
524
525        if result.returncode != 0:
526            raise QQError(
527                f"Could not read remote file '{file}' on '{host}': {result.stderr.strip()}."
528            )
529        return result.stdout

Read the contents of a file on a remote host and return it as a string.

The default implementation uses SSH to retrieve the file contents. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
Returns:

str: The contents of the remote file.

Raises:
  • QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: pathlib._local.Path, content: str) -> None:
531    @classmethod
532    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
533        """
534        Write the given content to a file on a remote host, overwriting it if it exists.
535
536        The default implementation uses SSH to send the content to the remote file.
537        This approach may be inefficient on shared storage or high-latency networks.
538        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
539
540        Subclasses should override this method to provide a more efficient implementation
541        if possible.
542
543        Args:
544            host (str): The hostname of the remote machine where the file resides.
545            file (Path): The path to the file on the remote host.
546            content (str): The content to write to the remote file.
547
548        Raises:
549            QQError: If the file cannot be written or SSH fails.
550        """
551
552        result = subprocess.run(
553            [
554                "ssh",
555                "-o PasswordAuthentication=no",
556                "-o GSSAPIAuthentication=yes",
557                "-o StrictHostKeyChecking=no",  # allow unknown hosts
558                f"-o ConnectTimeout={CFG.timeouts.ssh}",
559                host,
560                f"cat > {file}",
561            ],
562            input=content,
563            capture_output=True,
564            text=True,
565        )
566
567        if result.returncode != 0:
568            raise QQError(
569                f"Could not write to remote file '{file}' on '{host}': {result.stderr.strip()}."
570            )

Write the given content to a file on a remote host, overwriting it if it exists.

The default implementation uses SSH to send the content to the remote file. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
  • content (str): The content to write to the remote file.
Raises:
  • QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
572    @classmethod
573    def make_remote_dir(cls, host: str, directory: Path) -> None:
574        """
575        Create a directory at the specified path on a remote host.
576
577        The default implementation uses SSH to run `mkdir` on the remote host.
578        This approach may be inefficient on shared storage or high-latency networks.
579        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
580
581        Subclasses should override this method to provide a more efficient implementation
582        if possible.
583
584        Args:
585            host (str): The hostname of the remote machine where the directory should be created.
586            directory (Path): The path of the directory to create on the remote host.
587
588        Raises:
589            QQError: If the directory cannot be created but does not already exist or the SSH command fails.
590        """
591        result = subprocess.run(
592            [
593                "ssh",
594                "-o PasswordAuthentication=no",
595                "-o GSSAPIAuthentication=yes",
596                "-o StrictHostKeyChecking=no",  # allow unknown hosts
597                f"-o ConnectTimeout={CFG.timeouts.ssh}",
598                host,
599                # ignore an error if the directory already exists
600                f"mkdir {directory} 2>/dev/null || [ -d {directory} ]",
601            ],
602            capture_output=True,
603            text=True,
604        )
605
606        if result.returncode != 0:
607            raise QQError(
608                f"Could not make remote directory '{directory}' on '{host}': {result.stderr.strip()}."
609            )

Create a directory at the specified path on a remote host.

The default implementation uses SSH to run mkdir on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory should be created.
  • directory (Path): The path of the directory to create on the remote host.
Raises:
  • QQError: If the directory cannot be created and does not already exist, or if the SSH command fails.
@classmethod
def list_remote_dir( cls, host: str, directory: pathlib._local.Path) -> list[pathlib._local.Path]:
611    @classmethod
612    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
613        """
614        List all files and directories (absolute paths) in the specified directory on a remote host.
615
616        The default implementation uses SSH to run `ls -A` on the remote host.
617        This approach may be inefficient on shared storage or high-latency networks.
618        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
619
620        Subclasses should override this method to provide a more efficient implementation
621        if possible.
622
623        Args:
624            host (str): The hostname of the remote machine where the directory resides.
625            directory (Path): The remote directory to list.
626
627        Returns:
628            list[Path]: A list of `Path` objects representing the entries inside the directory.
629                        Entries are relative to the given `directory`.
630
631        Raises:
632            QQError: If the directory cannot be listed or the SSH command fails.
633        """
634        result = subprocess.run(
635            [
636                "ssh",
637                "-o PasswordAuthentication=no",
638                "-o GSSAPIAuthentication=yes",
639                "-o StrictHostKeyChecking=no",  # allow unknown hosts
640                f"-o ConnectTimeout={CFG.timeouts.ssh}",
641                host,
642                f"ls -A {directory}",
643            ],
644            capture_output=True,
645            text=True,
646        )
647
648        if result.returncode != 0:
649            raise QQError(
650                f"Could not list remote directory '{directory}' on '{host}': {result.stderr.strip()}."
651            )
652
653        # split by newline and filter out empty lines
654        return [
655            logical_resolve(Path(directory) / line)
656            for line in result.stdout.splitlines()
657            if line.strip()
658        ]

List all files and directories (absolute paths) in the specified directory on a remote host.

The default implementation uses SSH to run ls -A on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to list.
Returns:

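list[Path]: A list of Path objects representing the entries inside the directory, including hidden entries but not . and .. (the listing uses ls -A). Each entry is built by joining the given directory with the entry name, so the entries are full (absolute) paths rather than names relative to the given directory.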

Raises:
  • QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
660    @classmethod
661    def delete_remote_dir(cls, host: str, directory: Path) -> None:
662        """
663        Delete a directory on a remote host.
664
665        The default implementation uses SSH to run `rm -r` on the remote host.
666        This approach may be inefficient on shared storage or high-latency networks.
667        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
668
669        Subclasses should override this method to provide a more efficient implementation
670        if possible.
671
672        Args:
673            host (str): The hostname of the remote machine where the directory resides.
674            directory (Path): The remote directory to delete.
675
676        Raises:
677            QQError: If the directory cannot be deleted or the SSH command fails.
678        """
679        result = subprocess.run(
680            [
681                "ssh",
682                "-o PasswordAuthentication=no",
683                "-o GSSAPIAuthentication=yes",
684                "-o StrictHostKeyChecking=no",  # allow unknown hosts
685                f"-o ConnectTimeout={CFG.timeouts.ssh}",
686                host,
687                f"yes | rm -r {directory}",
688            ],
689            capture_output=True,
690            text=True,
691        )
692
693        if result.returncode != 0:
694            raise QQError(
695                f"Could not delete remote directory '{directory}' on '{host}': {result.stderr.strip()}."
696            )

Delete a directory, together with all of its contents, on a remote host.

The default implementation uses SSH to run rm -r on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to delete.
Raises:
  • QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files( cls, host: str, files: list[pathlib._local.Path], moved_files: list[pathlib._local.Path]) -> None:
698    @classmethod
699    def move_remote_files(
700        cls, host: str, files: list[Path], moved_files: list[Path]
701    ) -> None:
702        """
703        Move files on a remote host from their current paths to new paths.
704
705        The default implementation uses SSH to run a sequence of `mv` commands on the remote host.
706        This approach may be inefficient on shared storage or high-latency networks.
707        Note that the timeout for the SSH connection is set to `CFG.timeouts.ssh` seconds.
708
709        Subclasses should override this method to provide a more efficient implementation
710        if possible.
711
712        Args:
713            host (str): The hostname of the remote machine where the files reside.
714            files (list[Path]): A list of source file paths on the remote host.
715            moved_files (list[Path]): A list of destination file paths on the remote host.
716                                    Must be the same length as `files`.
717
718        Raises:
719            QQError: If the SSH command fails, the files cannot be moved or
720                    the length of `files` does not match the length of `moved_files`.
721        """
722        mv_command = cls._translate_move_command(files, moved_files)
723
724        result = subprocess.run(
725            [
726                "ssh",
727                "-o PasswordAuthentication=no",
728                "-o GSSAPIAuthentication=yes",
729                "-o StrictHostKeyChecking=no",  # allow unknown hosts
730                f"-o ConnectTimeout={CFG.timeouts.ssh}",
731                host,
732                mv_command,
733            ],
734            capture_output=True,
735            text=True,
736        )
737
738        if result.returncode != 0:
739            raise QQError(
740                f"Could not move files on a remote host '{host}': {result.stderr.strip()}."
741            )

Move files on a remote host from their current paths to new paths.

The default implementation uses SSH to run a sequence of mv commands on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the files reside.
  • files (list[Path]): A list of source file paths on the remote host.
  • moved_files (list[Path]): A list of destination file paths on the remote host. Must be the same length as files.
Raises:
  • QQError: If the SSH command fails, the files cannot be moved, or the length of files does not match the length of moved_files.
@classmethod
def sync_with_exclusions( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, exclude_files: list[pathlib._local.Path] | None = None) -> None:
743    @classmethod
744    def sync_with_exclusions(
745        cls,
746        src_dir: Path,
747        dest_dir: Path,
748        src_host: str | None,
749        dest_host: str | None,
750        exclude_files: list[Path] | None = None,
751    ) -> None:
752        """
753        Synchronize the contents of two directories using rsync, optionally across remote hosts,
754        while excluding specified files or subdirectories.
755
756        All files and directories in `src_dir` are copied to `dest_dir` except
757        those listed in `exclude_files`. Files are never removed from the destination.
758
759        Args:
760            src_dir (Path): Source directory to sync from.
761            dest_dir (Path): Destination directory to sync to.
762            src_host (str | None): Optional hostname of the source machine if remote;
763                None if the source is local.
764            dest_host (str | None): Optional hostname of the destination machine if remote;
765                None if the destination is local.
766            exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
767                These will be converted to paths relative to `src_dir`.
768
769        Raises:
770            QQError: If the rsync command fails for any reason or timeouts.
771        """
772        # convert absolute paths of files to exclude into relative to src_dir
773        relative_excluded = (
774            convert_absolute_to_relative(exclude_files, src_dir)
775            if exclude_files
776            else []
777        )
778
779        command = cls._translate_rsync_excluded_command(
780            src_dir, dest_dir, src_host, dest_host, relative_excluded
781        )
782        logger.debug(f"Rsync command: {command}.")
783
784        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)

Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.

All files and directories in src_dir are copied to dest_dir except those listed in exclude_files. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing. These will be converted to paths relative to src_dir.
Raises:
  • QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, include_files: list[pathlib._local.Path] | None = None) -> None:
786    @classmethod
787    def sync_selected(
788        cls,
789        src_dir: Path,
790        dest_dir: Path,
791        src_host: str | None,
792        dest_host: str | None,
793        include_files: list[Path] | None = None,
794    ) -> None:
795        """
796        Synchronize only the explicitly selected files and directories from the source
797        to the destination, optionally across remote hosts.
798
799        Only files listed in `include_files` are copied from `src_dir` to `dest_dir`.
800        Files not listed are ignored. Files are never removed from the destination.
801
802        Args:
803            src_dir (Path): Source directory to sync from.
804            dest_dir (Path): Destination directory to sync to.
805            src_host (str | None): Optional hostname of the source machine if remote;
806                None if the source is local.
807            dest_host (str | None): Optional hostname of the destination machine if remote;
808                None if the destination is local.
809            include_files (list[Path] | None): Optional list of absolute file paths to include in syncing.
810                These paths are converted relative to `src_dir`.
811                This argument is optional only for consistency with sync_with_exclusions.
812
813        Raises:
814            QQError: If the rsync command fails or times out.
815        """
816        # convert absolute paths of files to include relative to src_dir
817        relative_included = (
818            convert_absolute_to_relative(include_files, src_dir)
819            if include_files
820            else []
821        )
822
823        command = cls._translate_rsync_included_command(
824            src_dir, dest_dir, src_host, dest_host, relative_included
825        )
826        logger.debug(f"Rsync command: {command}.")
827
828        cls._run_rsync(src_dir, dest_dir, src_host, dest_host, command)

Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.

Only files listed in include_files are copied from src_dir to dest_dir. Files not listed are ignored. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • include_files (list[Path] | None): Optional list of absolute file paths to include in syncing. These paths are converted relative to src_dir. This argument is optional only for consistency with sync_with_exclusions.
Raises:
  • QQError: If the rsync command fails or times out.
@classmethod
def transform_resources( cls, queue: str, server: str | None, provided_resources: qq_lib.properties.resources.Resources) -> qq_lib.properties.resources.Resources:
830    @classmethod
831    def transform_resources(
832        cls, queue: str, server: str | None, provided_resources: Resources
833    ) -> Resources:
834        """
835        Transform user-provided Resources into a batch system-specific Resources instance.
836
837        This method takes the resources provided during submission and returns a new
838        Resources object with any necessary modifications or defaults applied for
839        the target batch system. The original `provided_resources` object is not modified.
840
841        Args:
842            queue (str): The name of the queue for which the resources are being adapted.
843            server (str | None): Name of the server on which the queue is located.
844                If `None`, the queue is treated as being located on the current server.
845            provided_resources (Resources): The raw resources specified by the user.
846
847        Returns:
848            Resources: A new Resources instance with batch system-specific adjustments,
849                        fully constructed and validated.
850
851        Raises:
852            QQError: If any of the provided parameters are invalid or inconsistent.
853        """
854        raise NotImplementedError(
855            f"transform_resources method is not implemented for {cls.__name__}"
856        )

Transform user-provided Resources into a batch system-specific Resources instance.

This method takes the resources provided during submission and returns a new Resources object with any necessary modifications or defaults applied for the target batch system. The original provided_resources object is not modified.

Arguments:
  • queue (str): The name of the queue for which the resources are being adapted.
  • server (str | None): Name of the server on which the queue is located. If None, the queue is treated as being located on the current server.
  • provided_resources (Resources): The raw resources specified by the user.
Returns:

Resources: A new Resources instance with batch system-specific adjustments, fully constructed and validated.

Raises:
  • QQError: If any of the provided parameters are invalid or inconsistent.
@classmethod
def is_shared(cls, directory: pathlib._local.Path) -> bool:
858    @classmethod
859    def is_shared(cls, directory: Path) -> bool:
860        """
861        Determine whether a given directory resides on a shared filesystem.
862
863        Args:
864            directory (Path): The directory to check.
865
866        Returns:
867            bool: True if the directory is on a shared filesystem, False if it is local.
868        """
869        # df -l exits with zero if the filesystem is local; otherwise it exits with a non-zero code
870        result = subprocess.run(
871            ["df", "-l", directory],
872            stdout=subprocess.DEVNULL,
873            stderr=subprocess.DEVNULL,
874        )
875
876        return result.returncode != 0

Determine whether a given directory resides on a shared filesystem.

Arguments:
  • directory (Path): The directory to check.
Returns:

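bool: True if the directory is on a shared filesystem, False if it is local. The check relies on the exit code of df -l, so any failure of that command (for example, a directory that does not exist) is also reported as True.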

@classmethod
def get_default_resubmit_hosts(cls) -> list[qq_lib.properties.resubmit_host.ResubmitHost]:
878    @classmethod
879    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
880        """
881        Get the default job resubmission hosts for this batch system.
882
883        In the default implementation, resubmission from the input machine is attempted.
884
885        Returns:
886            list[ResubmitHost]: A list of resubmission hosts.
887        """
888        return [InputHost()]

Get the default job resubmission hosts for this batch system.

In the default implementation, resubmission from the input machine is attempted.

Returns:

list[ResubmitHost]: A list of resubmission hosts.

@classmethod
def sort_jobs(cls, jobs: 'list[TBatchJob]') -> None:
890    @classmethod
891    def sort_jobs(cls, jobs: list[TBatchJob]) -> None:
892        """
893        Sort a list of batch system jobs by a defined attribute.
894
895        The default implementation sorts the jobs alphabetically by their job ID,
896        as returned by `job.get_id()`. Subclasses may override this method to
897        implement custom sorting logic.
898
899        Args:
900            jobs (list[BatchJobInterface]): A list of batch job objects to be sorted
901                in-place.
902        """
903        jobs.sort(key=lambda job: job.get_id())

Sort a list of batch system jobs by a defined attribute.

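The default implementation sorts the jobs by their job ID string, as returned by job.get_id(). The comparison is lexicographic, so IDs of different lengths are not ordered numerically (e.g. "10" sorts before "9"). Subclasses may override this method to implement custom sorting logic.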

Arguments:
  • jobs (list[BatchJobInterface]): A list of batch job objects to be sorted in-place.
@classmethod
def jobs_presenter_columns_to_show(cls) -> set[str]:
905    @classmethod
906    def jobs_presenter_columns_to_show(cls) -> set[str]:
907        """
908        Get a set of columns that should be shown in the output of JobsPresenter (`qq jobs`)
909        for this batch system.
910
911        In the default implementation, all columns are shown.
912
913        Note that the 'Exit' column is not shown when printing queued and running jobs,
914        even if you specify it here.
915
916        Args:
917            set[str]: Set of column titles that should be shown.
918        """
919        return {
920            "S",
921            "Job ID",
922            "User",
923            "Job Name",
924            "Queue",
925            "NCPUs",
926            "NGPUs",
927            "NNodes",
928            "Times",
929            "Node",
930            "%CPU",
931            "%Mem",
932            "Exit",
933        }

Get a set of columns that should be shown in the output of JobsPresenter (qq jobs) for this batch system.

In the default implementation, all columns are shown.

Note that the 'Exit' column is not shown when printing queued and running jobs, even if you specify it here.

Returns:

set[str]: Set of column titles that should be shown.

class BatchJobInterface(abc.ABC):
 16class BatchJobInterface(ABC):
 17    """
 18    Abstract base class for retrieving and maintaining job information
 19    from a batch scheduling system.
 20
 21    Must support situations where the job information no longer exists.
 22
 23    The implementation of the constructor is arbitrary and should only
 24    be used inside the corresponding implementation of `BatchInterface.get_batch_job`.
 25    """
 26
 27    @abstractmethod
 28    def is_empty(self) -> bool:
 29        """
 30        Check whether the job contains any information.
 31        This should return True if the job does not exist in the batch system.
 32
 33        Returns:
 34            bool: True if the job contains no information.
 35        """
 36
 37    @abstractmethod
 38    def get_id(self) -> str:
 39        """
 40        Return the ID of the job.
 41
 42        Returns:
 43            str: The ID of the job.
 44        """
 45
 46    @abstractmethod
 47    def get_account(self) -> str | None:
 48        """
 49        Return the account under which the job is submitted.
 50
 51        Returns:
 52            str | None: Account associated with the job or None if no
 53            account is defined.
 54        """
 55
 56    @abstractmethod
 57    def update(self) -> None:
 58        """
 59        Refresh the stored job information from the batch system.
 60
 61        Raises:
 62            QQError: If the job cannot be queried or its info updated.
 63        """
 64
 65    @abstractmethod
 66    def get_state(self) -> BatchState:
 67        """
 68        Return the current state of the job as reported by the batch system.
 69
 70        If the job information is no longer available, return `BatchState.UNKNOWN`.
 71
 72        Returns:
 73            BatchState: The job state according to the batch system.
 74        """
 75
 76    @abstractmethod
 77    def get_comment(self) -> str | None:
 78        """
 79        Retrieve the batch system-provided comment for the job.
 80
 81        Returns:
 82            str | None: The job's comment string if available, or None if the
 83            batch system has not attached a comment.
 84        """
 85
 86    @abstractmethod
 87    def get_estimated(self) -> tuple[datetime, str] | None:
 88        """
 89        Retrieve the batch system's estimated job start time and execution node.
 90
 91        Returns:
 92            tuple[datetime, str] | None: A tuple containing:
 93                - datetime: The estimated start time of the job.
 94                - str: The name of the node where the job is expected to run.
 95            Returns None if either estimate is unavailable.
 96        """
 97
 98    @abstractmethod
 99    def get_main_node(self) -> str | None:
100        """
101        Retrieve the hostname of the main execution node for the job.
102
103        Returns:
104            str | None: The hostname of the main execution node, or ``None``
105            if unavailable or not applicable.
106        """
107
108    @abstractmethod
109    def get_nodes(self) -> list[str] | None:
110        """
111        Retrieve the hostnames of all execution nodes allocated for the job.
112
113        Returns:
114            list[str] | None:
115                A list of hostnames or node identifiers used by the job,
116                or `None` if node information is not available.
117        """
118
119    @abstractmethod
120    def get_short_nodes(self) -> list[str] | None:
121        """
122        Retrieve the short hostnames of all execution nodes allocated for the job.
123
124        Returns:
125            list[str] | None:
126                A list of short hostnames used by the job, or `None` if node information
127                is not available.
128        """
129
130    @abstractmethod
131    def get_user(self) -> str | None:
132        """
133        Return the username of the job owner.
134
135        Returns:
136            str | None: Username of the user who owns the job or `None` if not available.
137        """
138
139    @abstractmethod
140    def get_n_cpus(self) -> int | None:
141        """
142        Return the number of CPU cores allocated for the job.
143
144        Returns:
145            int | None: Number of CPUs allocated for the job or `None` if not available.
146        """
147
148    @abstractmethod
149    def get_n_gpus(self) -> int | None:
150        """
151        Return the number of GPUs allocated for the job.
152
153        Returns:
154            int | None: Number of GPUs allocated for the job or `None` if not available.
155        """
156
157    @abstractmethod
158    def get_n_nodes(self) -> int | None:
159        """
160        Return the number of compute nodes assigned to the job.
161
162        Returns:
163            int | None: Number of nodes used by the job or `None` if not available.
164        """
165
166    @abstractmethod
167    def get_mem(self) -> Size | None:
168        """
169        Return the amount of memory allocated for the job.
170
171        Returns:
172            Size | None: Amount of memory allocated for the job or `None` if not available.
173        """
174
175    @abstractmethod
176    def get_name(self) -> str | None:
177        """
178        Return the name of the job.
179
180        Returns:
181            str | None: The name of the submitted job or `None` if not available.
182        """
183
184    @abstractmethod
185    def get_submission_time(self) -> datetime | None:
186        """
187        Return the timestamp when the job was submitted.
188
189        Returns:
190            datetime | None: Time when the job was submitted to the batch system
191            or `None` if not available.
192        """
193
194    @abstractmethod
195    def get_start_time(self) -> datetime | None:
196        """
197        Return the timestamp when the job started execution.
198
199        Returns:
200            datetime | None: Time when the job began running or
201            `None` if the job has not yet started.
202        """
203
204    @abstractmethod
205    def get_completion_time(self) -> datetime | None:
206        """
207        Return the timestamp when the job was completed.
208
209        Returns:
210            datetime | None: Time when the job completed or
211            `None` if the job has not yet completed.
212        """
213
214    @abstractmethod
215    def get_modification_time(self) -> datetime | None:
216        """
217        Return the timestamp at which the job was last modified.
218
219        Returns:
220            datetime | None: Time when the job was last modified or `None`
221            if the information is not available.
222        """
223
224    @abstractmethod
225    def get_walltime(self) -> timedelta | None:
226        """
227        Return the walltime limit of the job.
228
229        Returns:
230            timedelta | None: Walltime for the job or `None` if not available.
231        """
232
233    @abstractmethod
234    def get_queue(self) -> str | None:
235        """
236        Return the submission queue of the job.
237
238        Returns:
239            str | None: The queue this job is part of or `None` if not available.
240        """
241
242    @abstractmethod
243    def get_util_cpu(self) -> int | None:
244        """
245        Return the utilization of requested CPUs in percents (0-100).
246
247        Returns:
248            int | None: Utilization of requested CPUs or `None` if not available.
249        """
250
251    @abstractmethod
252    def get_util_mem(self) -> int | None:
253        """
254        Return the utilization of requested memory in percents (0-100).
255
256        Returns:
257            int | None: Utilization of requested memory or `None` if not available.
258        """
259
260    @abstractmethod
261    def get_exit_code(self) -> int | None:
262        """
263        Return the exit code of the job.
264
265        Returns:
266            int | None: Exit code of the job or `None` if exit code is not assigned.
267        """
268
269    @abstractmethod
270    def get_input_dir(self) -> Path | None:
271        """
272        Return path to the directory from which the job was submitted.
273
274        Returns:
275            Path | None: Path to the submission directory or `None` if not available.
276        """
277
278    @abstractmethod
279    def get_input_machine(self) -> str | None:
280        """
281        Return the hostname of the submission machine.
282
283        Returns:
284            str | None: Hostname of the submission machine or `None` if not available.
285        """
286
287    @abstractmethod
288    def get_info_file(self) -> Path | None:
289        """
290        Return path to the info file associated with this job.
291
292        Returns:
293            Path | None: Path to the qq info file or `None` if
294            this is not a qq job.
295        """
296
297    @abstractmethod
298    def to_yaml(self) -> str:
299        """
300        Return all information about the job from the batch system in YAML format.
301
302        Returns:
303            str: YAML-formatted string of job metadata.
304        """
305
306    @abstractmethod
307    def get_steps(self) -> Sequence[Self]:
308        """
309        Return a list of steps associated with this job.
310
311        Note that job step is represented by BatchJobInterface, but
312        may not contain all the values that a proper BatchJobInterface contains.
313
314        Returns:
315            Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.
316        """
317
318    @abstractmethod
319    def get_step_id(self) -> str | None:
320        """
321        Return the step index if this job is a job step.
322
323        Returns:
324            str | None: Job step index or `None` if this is not a job step.
325        """
326
327    @abstractmethod
328    def is_array_job(self) -> bool:
329        """
330        Return `True` if the job is a top-level array job (not a sub-job).
331
332        Returns:
333            bool: `True` if the job is a top-level array job, else `False`.
334        """
335
336    def is_completed(self) -> bool:
337        """
338        Return `True` if the job is completed according to the batch system.
339
340        By default, a completed job is a job that is FINISHED or FAILED.
341
342        Returns:
343            bool: `True` if the job is completed, else `False`.
344        """
345        return self.get_state() in {BatchState.FINISHED, BatchState.FAILED}

Abstract base class for retrieving and maintaining job information from a batch scheduling system.

Must support situations where the job information no longer exists.

The implementation of the constructor is arbitrary and should only be used inside the corresponding implementation of BatchInterface.get_batch_job.

@abstractmethod
def is_empty(self) -> bool:
27    @abstractmethod
28    def is_empty(self) -> bool:
29        """
30        Check whether the job contains any information.
31        This should return True if the job does not exist in the batch system.
32
33        Returns:
34            bool: True if the job contains no information.
35        """

Check whether the job contains any information. This should return True if the job does not exist in the batch system.

Returns:

bool: True if the job contains no information.

@abstractmethod
def get_id(self) -> str:
37    @abstractmethod
38    def get_id(self) -> str:
39        """
40        Return the ID of the job.
41
42        Returns:
43            str: The ID of the job.
44        """

Return the ID of the job.

Returns:

str: The ID of the job.

@abstractmethod
def get_account(self) -> str | None:
46    @abstractmethod
47    def get_account(self) -> str | None:
48        """
49        Return the account under which the job is submitted.
50
51        Returns:
52            str | None: Account associated with the job or None if no
53            account is defined.
54        """

Return the account under which the job is submitted.

Returns:

str | None: Account associated with the job or None if no account is defined.

@abstractmethod
def update(self) -> None:
56    @abstractmethod
57    def update(self) -> None:
58        """
59        Refresh the stored job information from the batch system.
60
61        Raises:
62            QQError: If the job cannot be queried or its info updated.
63        """

Refresh the stored job information from the batch system.

Raises:
  • QQError: If the job cannot be queried or its info updated.
@abstractmethod
def get_state(self) -> qq_lib.properties.states.BatchState:
65    @abstractmethod
66    def get_state(self) -> BatchState:
67        """
68        Return the current state of the job as reported by the batch system.
69
70        If the job information is no longer available, return `BatchState.UNKNOWN`.
71
72        Returns:
73            BatchState: The job state according to the batch system.
74        """

Return the current state of the job as reported by the batch system.

If the job information is no longer available, return BatchState.UNKNOWN.

Returns:

BatchState: The job state according to the batch system.

@abstractmethod
def get_comment(self) -> str | None:
76    @abstractmethod
77    def get_comment(self) -> str | None:
78        """
79        Retrieve the batch system-provided comment for the job.
80
81        Returns:
82            str | None: The job's comment string if available, or None if the
83            batch system has not attached a comment.
84        """

Retrieve the batch system-provided comment for the job.

Returns:

str | None: The job's comment string if available, or None if the batch system has not attached a comment.

@abstractmethod
def get_estimated(self) -> tuple[datetime.datetime, str] | None:
86    @abstractmethod
87    def get_estimated(self) -> tuple[datetime, str] | None:
88        """
89        Retrieve the batch system's estimated job start time and execution node.
90
91        Returns:
92            tuple[datetime, str] | None: A tuple containing:
93                - datetime: The estimated start time of the job.
94                - str: The name of the node where the job is expected to run.
95            Returns None if either estimate is unavailable.
96        """

Retrieve the batch system's estimated job start time and execution node.

Returns:

tuple[datetime, str] | None: A tuple containing the estimated start time of the job (datetime) and the name of the node where the job is expected to run (str), or None if either estimate is unavailable.

@abstractmethod
def get_main_node(self) -> str | None:
 98    @abstractmethod
 99    def get_main_node(self) -> str | None:
100        """
101        Retrieve the hostname of the main execution node for the job.
102
103        Returns:
104            str | None: The hostname of the main execution node, or ``None``
105            if unavailable or not applicable.
106        """

Retrieve the hostname of the main execution node for the job.

Returns:

str | None: The hostname of the main execution node, or None if unavailable or not applicable.

@abstractmethod
def get_nodes(self) -> list[str] | None:
108    @abstractmethod
109    def get_nodes(self) -> list[str] | None:
110        """
111        Retrieve the hostnames of all execution nodes allocated for the job.
112
113        Returns:
114            list[str] | None:
115                A list of hostnames or node identifiers used by the job,
116                or `None` if node information is not available.
117        """

Retrieve the hostnames of all execution nodes allocated for the job.

Returns:

list[str] | None: A list of hostnames or node identifiers used by the job, or None if node information is not available.

@abstractmethod
def get_short_nodes(self) -> list[str] | None:
119    @abstractmethod
120    def get_short_nodes(self) -> list[str] | None:
121        """
122        Retrieve the short hostnames of all execution nodes allocated for the job.
123
124        Returns:
125            list[str] | None:
126                A list of short hostnames used by the job, or `None` if node information
127                is not available.
128        """

Retrieve the short hostnames of all execution nodes allocated for the job.

Returns:

list[str] | None: A list of short hostnames used by the job, or None if node information is not available.

@abstractmethod
def get_user(self) -> str | None:
130    @abstractmethod
131    def get_user(self) -> str | None:
132        """
133        Return the username of the job owner.
134
135        Returns:
136            str | None: Username of the user who owns the job or `None` if not available.
137        """

Return the username of the job owner.

Returns:

str | None: Username of the user who owns the job or None if not available.

@abstractmethod
def get_n_cpus(self) -> int | None:
139    @abstractmethod
140    def get_n_cpus(self) -> int | None:
141        """
142        Return the number of CPU cores allocated for the job.
143
144        Returns:
145            int | None: Number of CPUs allocated for the job or `None` if not available.
146        """

Return the number of CPU cores allocated for the job.

Returns:

int | None: Number of CPUs allocated for the job or None if not available.

@abstractmethod
def get_n_gpus(self) -> int | None:
148    @abstractmethod
149    def get_n_gpus(self) -> int | None:
150        """
151        Return the number of GPUs allocated for the job.
152
153        Returns:
154            int | None: Number of GPUs allocated for the job or `None` if not available.
155        """

Return the number of GPUs allocated for the job.

Returns:

int | None: Number of GPUs allocated for the job or None if not available.

@abstractmethod
def get_n_nodes(self) -> int | None:
157    @abstractmethod
158    def get_n_nodes(self) -> int | None:
159        """
160        Return the number of compute nodes assigned to the job.
161
162        Returns:
163            int | None: Number of nodes used by the job or `None` if not available.
164        """

Return the number of compute nodes assigned to the job.

Returns:

int | None: Number of nodes used by the job or None if not available.

@abstractmethod
def get_mem(self) -> qq_lib.properties.size.Size | None:
166    @abstractmethod
167    def get_mem(self) -> Size | None:
168        """
169        Return the amount of memory allocated for the job.
170
171        Returns:
172            Size | None: Amount of memory allocated for the job or `None` if not available.
173        """

Return the amount of memory allocated for the job.

Returns:

Size | None: Amount of memory allocated for the job or None if not available.

@abstractmethod
def get_name(self) -> str | None:
175    @abstractmethod
176    def get_name(self) -> str | None:
177        """
178        Return the name of the job.
179
180        Returns:
181            str | None: The name of the submitted job or `None` if not available.
182        """

Return the name of the job.

Returns:

str | None: The name of the submitted job or None if not available.

@abstractmethod
def get_submission_time(self) -> datetime.datetime | None:
184    @abstractmethod
185    def get_submission_time(self) -> datetime | None:
186        """
187        Return the timestamp when the job was submitted.
188
189        Returns:
190            datetime | None: Time when the job was submitted to the batch system
191            or `None` if not available.
192        """

Return the timestamp when the job was submitted.

Returns:

datetime | None: Time when the job was submitted to the batch system or None if not available.

@abstractmethod
def get_start_time(self) -> datetime.datetime | None:
194    @abstractmethod
195    def get_start_time(self) -> datetime | None:
196        """
197        Return the timestamp when the job started execution.
198
199        Returns:
200            datetime | None: Time when the job began running or
201            `None` if the job has not yet started.
202        """

Return the timestamp when the job started execution.

Returns:

datetime | None: Time when the job began running or None if the job has not yet started.

@abstractmethod
def get_completion_time(self) -> datetime.datetime | None:
204    @abstractmethod
205    def get_completion_time(self) -> datetime | None:
206        """
207        Return the timestamp when the job was completed.
208
209        Returns:
210            datetime | None: Time when the job completed or
211            `None` if the job has not yet completed.
212        """

Return the timestamp when the job was completed.

Returns:

datetime | None: Time when the job completed or None if the job has not yet completed.

@abstractmethod
def get_modification_time(self) -> datetime.datetime | None:
214    @abstractmethod
215    def get_modification_time(self) -> datetime | None:
216        """
217        Return the timestamp at which the job was last modified.
218
219        Returns:
220            datetime | None: Time when the job was last modified or `None`
221            if the information is not available.
222        """

Return the timestamp at which the job was last modified.

Returns:

datetime | None: Time when the job was last modified or None if the information is not available.

@abstractmethod
def get_walltime(self) -> datetime.timedelta | None:
224    @abstractmethod
225    def get_walltime(self) -> timedelta | None:
226        """
227        Return the walltime limit of the job.
228
229        Returns:
230            timedelta | None: Walltime for the job or `None` if not available.
231        """

Return the walltime limit of the job.

Returns:

timedelta | None: Walltime for the job or None if not available.

@abstractmethod
def get_queue(self) -> str | None:
233    @abstractmethod
234    def get_queue(self) -> str | None:
235        """
236        Return the submission queue of the job.
237
238        Returns:
239            str | None: The queue this job is part of or `None` if not available.
240        """

Return the submission queue of the job.

Returns:

str | None: The queue this job is part of or None if not available.

@abstractmethod
def get_util_cpu(self) -> int | None:
242    @abstractmethod
243    def get_util_cpu(self) -> int | None:
244        """
245        Return the utilization of requested CPUs in percents (0-100).
246
247        Returns:
248            int | None: Utilization of requested CPUs or `None` if not available.
249        """

Return the utilization of requested CPUs as a percentage (0-100).

Returns:

int | None: Utilization of requested CPUs or None if not available.

@abstractmethod
def get_util_mem(self) -> int | None:
251    @abstractmethod
252    def get_util_mem(self) -> int | None:
253        """
254        Return the utilization of requested memory in percents (0-100).
255
256        Returns:
257            int | None: Utilization of requested memory or `None` if not available.
258        """

Return the utilization of requested memory as a percentage (0-100).

Returns:

int | None: Utilization of requested memory or None if not available.

@abstractmethod
def get_exit_code(self) -> int | None:
260    @abstractmethod
261    def get_exit_code(self) -> int | None:
262        """
263        Return the exit code of the job.
264
265        Returns:
266            int | None: Exit code of the job or `None` if exit code is not assigned.
267        """

Return the exit code of the job.

Returns:

int | None: Exit code of the job or None if exit code is not assigned.

@abstractmethod
def get_input_dir(self) -> pathlib._local.Path | None:
269    @abstractmethod
270    def get_input_dir(self) -> Path | None:
271        """
272        Return path to the directory from which the job was submitted.
273
274        Returns:
275            Path | None: Path to the submission directory or `None` if not available.
276        """

Return path to the directory from which the job was submitted.

Returns:

Path | None: Path to the submission directory or None if not available.

@abstractmethod
def get_input_machine(self) -> str | None:
278    @abstractmethod
279    def get_input_machine(self) -> str | None:
280        """
281        Return the hostname of the submission machine.
282
283        Returns:
284            str | None: Hostname of the submission machine or `None` if not available.
285        """

Return the hostname of the submission machine.

Returns:

str | None: Hostname of the submission machine or None if not available.

@abstractmethod
def get_info_file(self) -> pathlib._local.Path | None:
287    @abstractmethod
288    def get_info_file(self) -> Path | None:
289        """
290        Return path to the info file associated with this job.
291
292        Returns:
293            Path | None: Path to the qq info file or `None` if
294            this is not a qq job.
295        """

Return path to the info file associated with this job.

Returns:

Path | None: Path to the qq info file or None if this is not a qq job.

@abstractmethod
def to_yaml(self) -> str:
297    @abstractmethod
298    def to_yaml(self) -> str:
299        """
300        Return all information about the job from the batch system in YAML format.
301
302        Returns:
303            str: YAML-formatted string of job metadata.
304        """

Return all information about the job from the batch system in YAML format.

Returns:

str: YAML-formatted string of job metadata.

@abstractmethod
def get_steps(self) -> Sequence[typing.Self]:
306    @abstractmethod
307    def get_steps(self) -> Sequence[Self]:
308        """
309        Return a list of steps associated with this job.
310
311        Note that job step is represented by BatchJobInterface, but
312        may not contain all the values that a proper BatchJobInterface contains.
313
314        Returns:
315            Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.
316        """

Return a list of steps associated with this job.

Note that a job step is represented by a BatchJobInterface, but it may not contain all the values that a full BatchJobInterface contains.

Returns:

Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.

@abstractmethod
def get_step_id(self) -> str | None:
318    @abstractmethod
319    def get_step_id(self) -> str | None:
320        """
321        Return the step index if this job is a job step.
322
323        Returns:
324            str | None: Job step index or `None` if this is not a job step.
325        """

Return the step index if this job is a job step.

Returns:

str | None: Job step index or None if this is not a job step.

@abstractmethod
def is_array_job(self) -> bool:
327    @abstractmethod
328    def is_array_job(self) -> bool:
329        """
330        Return `True` if the job is a top-level array job (not a sub-job).
331
332        Returns:
333            bool: `True` if the job is a top-level array job, else `False`.
334        """

Return True if the job is a top-level array job (not a sub-job).

Returns:

bool: True if the job is a top-level array job, else False.

def is_completed(self) -> bool:
336    def is_completed(self) -> bool:
337        """
338        Return `True` if the job is completed according to the batch system.
339
340        By default, a completed job is a job that is FINISHED or FAILED.
341
342        Returns:
343            bool: `True` if the job is completed, else `False`.
344        """
345        return self.get_state() in {BatchState.FINISHED, BatchState.FAILED}

Return True if the job is completed according to the batch system.

By default, a completed job is a job that is FINISHED or FAILED.

Returns:

bool: True if the job is completed, else False.

class BatchNodeInterface(abc.ABC):
 10class BatchNodeInterface(ABC):
 11    """
 12    Abstract base class for obtaining information about compute nodes.
 13
 14    The implementation of the constructor is arbitrary and should only
 15    be used inside the corresponding implementation of `BatchInterface.get_nodes`.
 16    """
 17
 18    @abstractmethod
 19    def update(self) -> None:
 20        """
 21        Refresh the stored node information from the batch system.
 22
 23        Raises:
 24            QQError: If the node cannot be queried or its info updated.
 25        """
 26
 27    @abstractmethod
 28    def get_name(self) -> str:
 29        """
 30        Retrieve the name of the node.
 31
 32        Returns:
 33            str: The name identifying the node in the batch system.
 34        """
 35
 36    @abstractmethod
 37    def get_n_cpus(self) -> int | None:
 38        """
 39        Retrieve the total number of CPU cores available on the node.
 40
 41        Returns:
 42            int | None: Total CPU core count or `None` if not available.
 43        """
 44
 45    @abstractmethod
 46    def get_n_free_cpus(self) -> int | None:
 47        """
 48        Retrieve the number of currently available (unallocated) CPU cores.
 49
 50        Returns:
 51            int | None: Number of free CPU cores or `None` if not available.
 52        """
 53
 54    @abstractmethod
 55    def get_n_gpus(self) -> int | None:
 56        """
 57        Retrieve the total number of GPUs available on the node.
 58
 59        Returns:
 60            int | None: Total GPU count or `None` if not available.
 61        """
 62
 63    @abstractmethod
 64    def get_n_free_gpus(self) -> int | None:
 65        """
 66        Retrieve the number of currently available (unallocated) GPUs.
 67
 68        Returns:
 69            int | None: Number of free GPUs or `None` if not available.
 70        """
 71
 72    @abstractmethod
 73    def get_cpu_memory(self) -> Size | None:
 74        """
 75        Retrieve the total CPU memory capacity of the node.
 76
 77        Returns:
 78            Size | None: Total CPU memory available on the node or `None` if not available.
 79        """
 80
 81    @abstractmethod
 82    def get_free_cpu_memory(self) -> Size | None:
 83        """
 84        Retrieve the currently available CPU memory.
 85
 86        Returns:
 87            Size | None: Free (unused) CPU memory or `None` if not available.
 88        """
 89
 90    @abstractmethod
 91    def get_gpu_memory(self) -> Size | None:
 92        """
 93        Retrieve the total GPU memory capacity of the node.
 94
 95        Returns:
 96            Size | None: Total GPU memory available or `None` if not available.
 97        """
 98
 99    @abstractmethod
100    def get_free_gpu_memory(self) -> Size | None:
101        """
102        Retrieve the currently available GPU memory.
103
104        Returns:
105            Size | None: Free (unused) GPU memory or `None` if not available.
106        """
107
108    @abstractmethod
109    def get_local_scratch(self) -> Size | None:
110        """
111        Retrieve the total local scratch storage capacity of the node.
112
113        Returns:
114            Size | None: Total size of local scratch space or `None` if not available.
115        """
116
117    @abstractmethod
118    def get_free_local_scratch(self) -> Size | None:
119        """
120        Retrieve the available local scratch storage space.
121
122        Returns:
123            Size | None: Free local scratch space or `None` if not available.
124        """
125
126    @abstractmethod
127    def get_ssd_scratch(self) -> Size | None:
128        """
129        Retrieve the total SSD-based scratch storage capacity.
130
131        Returns:
132            Size | None: Total SSD scratch capacity or `None` if not available.
133        """
134
135    @abstractmethod
136    def get_free_ssd_scratch(self) -> Size | None:
137        """
138        Retrieve the currently available SSD-based scratch storage space.
139
140        Returns:
141            Size | None: Free SSD scratch space or `None` if not available.
142        """
143
144    @abstractmethod
145    def get_shared_scratch(self) -> Size | None:
146        """
147        Retrieve the total capacity of shared scratch storage accessible from the node.
148
149        Returns:
150            Size | None: Total shared scratch capacity or `None` if not available.
151        """
152
153    @abstractmethod
154    def get_free_shared_scratch(self) -> Size | None:
155        """
156        Retrieve the available space in shared scratch storage.
157
158        Returns:
159            Size | None: Free shared scratch space or `None` if not available.
160        """
161
162    @abstractmethod
163    def get_properties(self) -> list[str]:
164        """
165        Get the list of properties or labels assigned to the node.
166
167        Returns:
168            list[str]: List of node property strings.
169        """
170
171    @abstractmethod
172    def is_available_to_user(self, user: str) -> bool:
173        """
174        Check if the node is available to the specified user.
175
176        Args:
177            user (str): The username to check access for.
178
179        Returns:
180            bool: True if the node is up and schedulable, False otherwise.
181        """
182
183    @abstractmethod
184    def to_yaml(self) -> str:
185        """
186        Return all information about the node in YAML format.
187
188        Returns:
189            str: YAML-formatted string of node metadata.
190        """

Abstract base class for obtaining information about compute nodes.

The implementation of the constructor is arbitrary and should only be used inside the corresponding implementation of BatchInterface.get_nodes.

@abstractmethod
def update(self) -> None:
18    @abstractmethod
19    def update(self) -> None:
20        """
21        Refresh the stored node information from the batch system.
22
23        Raises:
24            QQError: If the node cannot be queried or its info updated.
25        """

Refresh the stored node information from the batch system.

Raises:
  • QQError: If the node cannot be queried or its info updated.
@abstractmethod
def get_name(self) -> str:
27    @abstractmethod
28    def get_name(self) -> str:
29        """
30        Retrieve the name of the node.
31
32        Returns:
33            str: The name identifying the node in the batch system.
34        """

Retrieve the name of the node.

Returns:

str: The name identifying the node in the batch system.

@abstractmethod
def get_n_cpus(self) -> int | None:
36    @abstractmethod
37    def get_n_cpus(self) -> int | None:
38        """
39        Retrieve the total number of CPU cores available on the node.
40
41        Returns:
42            int | None: Total CPU core count or `None` if not available.
43        """

Retrieve the total number of CPU cores available on the node.

Returns:

int | None: Total CPU core count or None if not available.

@abstractmethod
def get_n_free_cpus(self) -> int | None:
45    @abstractmethod
46    def get_n_free_cpus(self) -> int | None:
47        """
48        Retrieve the number of currently available (unallocated) CPU cores.
49
50        Returns:
51            int | None: Number of free CPU cores or `None` if not available.
52        """

Retrieve the number of currently available (unallocated) CPU cores.

Returns:

int | None: Number of free CPU cores or None if not available.

@abstractmethod
def get_n_gpus(self) -> int | None:
54    @abstractmethod
55    def get_n_gpus(self) -> int | None:
56        """
57        Retrieve the total number of GPUs available on the node.
58
59        Returns:
60            int | None: Total GPU count or `None` if not available.
61        """

Retrieve the total number of GPUs available on the node.

Returns:

int | None: Total GPU count or None if not available.

@abstractmethod
def get_n_free_gpus(self) -> int | None:
63    @abstractmethod
64    def get_n_free_gpus(self) -> int | None:
65        """
66        Retrieve the number of currently available (unallocated) GPUs.
67
68        Returns:
69            int | None: Number of free GPUs or `None` if not available.
70        """

Retrieve the number of currently available (unallocated) GPUs.

Returns:

int | None: Number of free GPUs or None if not available.

@abstractmethod
def get_cpu_memory(self) -> qq_lib.properties.size.Size | None:
72    @abstractmethod
73    def get_cpu_memory(self) -> Size | None:
74        """
75        Retrieve the total CPU memory capacity of the node.
76
77        Returns:
78            Size | None: Total CPU memory available on the node or `None` if not available.
79        """

Retrieve the total CPU memory capacity of the node.

Returns:

Size | None: Total CPU memory available on the node or None if not available.

@abstractmethod
def get_free_cpu_memory(self) -> qq_lib.properties.size.Size | None:
81    @abstractmethod
82    def get_free_cpu_memory(self) -> Size | None:
83        """
84        Retrieve the currently available CPU memory.
85
86        Returns:
87            Size | None: Free (unused) CPU memory or `None` if not available.
88        """

Retrieve the currently available CPU memory.

Returns:

Size | None: Free (unused) CPU memory or None if not available.

@abstractmethod
def get_gpu_memory(self) -> qq_lib.properties.size.Size | None:
90    @abstractmethod
91    def get_gpu_memory(self) -> Size | None:
92        """
93        Retrieve the total GPU memory capacity of the node.
94
95        Returns:
96            Size | None: Total GPU memory available or `None` if not available.
97        """

Retrieve the total GPU memory capacity of the node.

Returns:

Size | None: Total GPU memory available or None if not available.

@abstractmethod
def get_free_gpu_memory(self) -> qq_lib.properties.size.Size | None:
 99    @abstractmethod
100    def get_free_gpu_memory(self) -> Size | None:
101        """
102        Retrieve the currently available GPU memory.
103
104        Returns:
105            Size | None: Free (unused) GPU memory or `None` if not available.
106        """

Retrieve the currently available GPU memory.

Returns:

Size | None: Free (unused) GPU memory or None if not available.

@abstractmethod
def get_local_scratch(self) -> qq_lib.properties.size.Size | None:
108    @abstractmethod
109    def get_local_scratch(self) -> Size | None:
110        """
111        Retrieve the total local scratch storage capacity of the node.
112
113        Returns:
114            Size | None: Total size of local scratch space or `None` if not available.
115        """

Retrieve the total local scratch storage capacity of the node.

Returns:

Size | None: Total size of local scratch space or None if not available.

@abstractmethod
def get_free_local_scratch(self) -> qq_lib.properties.size.Size | None:
117    @abstractmethod
118    def get_free_local_scratch(self) -> Size | None:
119        """
120        Retrieve the available local scratch storage space.
121
122        Returns:
123            Size | None: Free local scratch space or `None` if not available.
124        """

Retrieve the available local scratch storage space.

Returns:

Size | None: Free local scratch space or None if not available.

@abstractmethod
def get_ssd_scratch(self) -> qq_lib.properties.size.Size | None:
126    @abstractmethod
127    def get_ssd_scratch(self) -> Size | None:
128        """
129        Retrieve the total SSD-based scratch storage capacity.
130
131        Returns:
132            Size | None: Total SSD scratch capacity or `None` if not available.
133        """

Retrieve the total SSD-based scratch storage capacity.

Returns:

Size | None: Total SSD scratch capacity or None if not available.

@abstractmethod
def get_free_ssd_scratch(self) -> qq_lib.properties.size.Size | None:
135    @abstractmethod
136    def get_free_ssd_scratch(self) -> Size | None:
137        """
138        Retrieve the currently available SSD-based scratch storage space.
139
140        Returns:
141            Size | None: Free SSD scratch space or `None` if not available.
142        """

Retrieve the currently available SSD-based scratch storage space.

Returns:

Size | None: Free SSD scratch space or None if not available.

@abstractmethod
def get_shared_scratch(self) -> qq_lib.properties.size.Size | None:
144    @abstractmethod
145    def get_shared_scratch(self) -> Size | None:
146        """
147        Retrieve the total capacity of shared scratch storage accessible from the node.
148
149        Returns:
150            Size | None: Total shared scratch capacity or `None` if not available.
151        """

Retrieve the total capacity of shared scratch storage accessible from the node.

Returns:

Size | None: Total shared scratch capacity or None if not available.

@abstractmethod
def get_free_shared_scratch(self) -> qq_lib.properties.size.Size | None:
153    @abstractmethod
154    def get_free_shared_scratch(self) -> Size | None:
155        """
156        Retrieve the available space in shared scratch storage.
157
158        Returns:
159            Size | None: Free shared scratch space or `None` if not available.
160        """

Retrieve the available space in shared scratch storage.

Returns:

Size | None: Free shared scratch space or None if not available.

@abstractmethod
def get_properties(self) -> list[str]:
162    @abstractmethod
163    def get_properties(self) -> list[str]:
164        """
165        Get the list of properties or labels assigned to the node.
166
167        Returns:
168            list[str]: List of node property strings.
169        """

Get the list of properties or labels assigned to the node.

Returns:

list[str]: List of node property strings.

@abstractmethod
def is_available_to_user(self, user: str) -> bool:
171    @abstractmethod
172    def is_available_to_user(self, user: str) -> bool:
173        """
174        Check if the node is available to the specified user.
175
176        Args:
177            user (str): The username to check access for.
178
179        Returns:
180            bool: True if the node is up and schedulable, False otherwise.
181        """

Check if the node is available to the specified user.

Arguments:
  • user (str): The username to check access for.
Returns:

bool: True if the node is up and schedulable, False otherwise.

@abstractmethod
def to_yaml(self) -> str:
183    @abstractmethod
184    def to_yaml(self) -> str:
185        """
186        Return all information about the node in YAML format.
187
188        Returns:
189            str: YAML-formatted string of node metadata.
190        """

Return all information about the node in YAML format.

Returns:

str: YAML-formatted string of node metadata.

class BatchQueueInterface(abc.ABC):
 11class BatchQueueInterface(ABC):
 12    """
 13    Abstract base class for retrieving and maintaining queue information
 14    from a batch scheduling system.
 15
 16    The implementation of the constructor is arbitrary and should only
 17    be used inside the corresponding implementation of `BatchInterface.get_queues`.
 18    """
 19
 20    @abstractmethod
 21    def update(self) -> None:
 22        """
 23        Refresh the stored queue information from the batch system.
 24
 25        Raises:
 26            QQError: If the queue cannot be queried or its info updated.
 27        """
 28
 29    @abstractmethod
 30    def get_name(self) -> str:
 31        """
 32        Retrieve the name of the queue.
 33
 34        Returns:
 35            str: The name identifying this queue in the batch system.
 36        """
 37
 38    @abstractmethod
 39    def get_priority(self) -> str | None:
 40        """
 41        Retrieve the scheduling priority of the queue.
 42
 43        Returns:
 44            str | None: The queue priority, or None if priority information
 45            is not available.
 46        """
 47
 48    @abstractmethod
 49    def get_total_jobs(self) -> int | None:
 50        """
 51        Retrieve the total number of jobs currently in the queue.
 52
 53        Returns:
 54            int | None: The total count of jobs, regardless of status,
 55            or `None` if the information is not available.
 56        """
 57
 58    @abstractmethod
 59    def get_running_jobs(self) -> int | None:
 60        """
 61        Retrieve the number of jobs currently running in the queue.
 62
 63        Returns:
 64            int | None: The number of running jobs or `None`
 65            if the information is not available.
 66        """
 67
 68    @abstractmethod
 69    def get_queued_jobs(self) -> int | None:
 70        """
 71        Retrieve the number of jobs waiting to start in the queue.
 72
 73        Returns:
 74            int | None: The number of queued jobs or `None`
 75            if the information is not available.
 76        """
 77
 78    @abstractmethod
 79    def get_other_jobs(self) -> int | None:
 80        """
 81        Retrieve the number of jobs in other states (non-running and non-queued).
 82
 83        Returns:
 84            int | None: The number of jobs that are neither running nor queued,
 85            such as exiting or suspended jobs.
 86            Returns `None` if the information is not available.
 87        """
 88
 89    @abstractmethod
 90    def get_max_walltime(self) -> timedelta | None:
 91        """
 92        Retrieve the maximum walltime allowed for jobs in the queue.
 93
 94        Returns:
 95            timedelta | None: The walltime limit, or None if unlimited or unknown.
 96        """
 97
 98    @abstractmethod
 99    def get_max_n_nodes(self) -> int | None:
100        """
101        Retrieve the maximum number of nodes that can be requested in the queue.
102
103        Returns:
104            int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.
105        """
106
107    @abstractmethod
108    def get_comment(self) -> str | None:
109        """
110        Retrieve the comment or description associated with the queue.
111
112        Returns:
113            str | None: The human-readable comment or note about the queue
114            or `None` if the information is not available.
115        """
116
117    @abstractmethod
118    def is_available_to_user(self, user: str) -> bool:
119        """
120        Check whether the specified user has access to this queue.
121
122        Args:
123            user (str): The username to check access for.
124
125        Returns:
126            bool: True if the user can submit jobs to this queue, False otherwise.
127        """
128
129    @abstractmethod
130    def get_destinations(self) -> list[str]:
131        """
132        Retrieve all destinations available for this queue route.
133
134        Returns:
135            list[str]: A list of destination queue names associated with the queue.
136        """
137
138    @abstractmethod
139    def from_route_only(self) -> bool:
140        """
141        Determine whether this queue can only be accessed via a route.
142
143        Returns:
144            bool: True if the queue is accessible exclusively through a route,
145            False otherwise.
146        """
147
148    @abstractmethod
149    def to_yaml(self) -> str:
150        """
151        Return all information about the queue from the batch system in YAML format.
152
153        Returns:
154            str: YAML-formatted string of queue metadata.
155        """
156
157    @abstractmethod
158    def get_default_resources(self) -> Resources:
159        """
160        Return the default resource definitions for this queue.
161
162        Returns:
163            Resources: Default resources allocated for jobs submitted to this queue.
164        """

Abstract base class for retrieving and maintaining queue information from a batch scheduling system.

The constructor is implementation-specific and should only be called from within the corresponding implementation of BatchInterface.get_queues.

@abstractmethod
def update(self) -> None:
20    @abstractmethod
21    def update(self) -> None:
22        """
23        Refresh the stored queue information from the batch system.
24
25        Raises:
26            QQError: If the queue cannot be queried or its info updated.
27        """

Refresh the stored queue information from the batch system.

Raises:
  • QQError: If the queue cannot be queried or its info updated.
@abstractmethod
def get_name(self) -> str:
29    @abstractmethod
30    def get_name(self) -> str:
31        """
32        Retrieve the name of the queue.
33
34        Returns:
35            str: The name identifying this queue in the batch system.
36        """

Retrieve the name of the queue.

Returns:

str: The name identifying this queue in the batch system.

@abstractmethod
def get_priority(self) -> str | None:
38    @abstractmethod
39    def get_priority(self) -> str | None:
40        """
41        Retrieve the scheduling priority of the queue.
42
43        Returns:
44            str | None: The queue priority, or None if priority information
45            is not available.
46        """

Retrieve the scheduling priority of the queue.

Returns:

str | None: The queue priority, or None if priority information is not available.

@abstractmethod
def get_total_jobs(self) -> int | None:
48    @abstractmethod
49    def get_total_jobs(self) -> int | None:
50        """
51        Retrieve the total number of jobs currently in the queue.
52
53        Returns:
54            int | None: The total count of jobs, regardless of status,
55            or `None` if the information is not available.
56        """

Retrieve the total number of jobs currently in the queue.

Returns:

int | None: The total count of jobs, regardless of status, or None if the information is not available.

@abstractmethod
def get_running_jobs(self) -> int | None:
58    @abstractmethod
59    def get_running_jobs(self) -> int | None:
60        """
61        Retrieve the number of jobs currently running in the queue.
62
63        Returns:
64            int | None: The number of running jobs or `None`
65            if the information is not available.
66        """

Retrieve the number of jobs currently running in the queue.

Returns:

int | None: The number of running jobs or None if the information is not available.

@abstractmethod
def get_queued_jobs(self) -> int | None:
68    @abstractmethod
69    def get_queued_jobs(self) -> int | None:
70        """
71        Retrieve the number of jobs waiting to start in the queue.
72
73        Returns:
74            int | None: The number of queued jobs or `None`
75            if the information is not available.
76        """

Retrieve the number of jobs waiting to start in the queue.

Returns:

int | None: The number of queued jobs or None if the information is not available.

@abstractmethod
def get_other_jobs(self) -> int | None:
78    @abstractmethod
79    def get_other_jobs(self) -> int | None:
80        """
81        Retrieve the number of jobs in other states (non-running and non-queued).
82
83        Returns:
84            int | None: The number of jobs that are neither running nor queued,
85            such as exiting or suspended jobs.
86            Returns `None` if the information is not available.
87        """

Retrieve the number of jobs in other states (non-running and non-queued).

Returns:

int | None: The number of jobs that are neither running nor queued, such as exiting or suspended jobs. Returns None if the information is not available.

@abstractmethod
def get_max_walltime(self) -> datetime.timedelta | None:
89    @abstractmethod
90    def get_max_walltime(self) -> timedelta | None:
91        """
92        Retrieve the maximum walltime allowed for jobs in the queue.
93
94        Returns:
95            timedelta | None: The walltime limit, or None if unlimited or unknown.
96        """

Retrieve the maximum walltime allowed for jobs in the queue.

Returns:

timedelta | None: The walltime limit, or None if unlimited or unknown.

@abstractmethod
def get_max_n_nodes(self) -> int | None:
 98    @abstractmethod
 99    def get_max_n_nodes(self) -> int | None:
100        """
101        Retrieve the maximum number of nodes that can be requested in the queue.
102
103        Returns:
104            int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.
105        """

Retrieve the maximum number of nodes that can be requested in the queue.

Returns:

int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.

@abstractmethod
def get_comment(self) -> str | None:
107    @abstractmethod
108    def get_comment(self) -> str | None:
109        """
110        Retrieve the comment or description associated with the queue.
111
112        Returns:
113            str | None: The human-readable comment or note about the queue
114            or `None` if the information is not available.
115        """

Retrieve the comment or description associated with the queue.

Returns:

str | None: The human-readable comment or note about the queue or None if the information is not available.

@abstractmethod
def is_available_to_user(self, user: str) -> bool:
117    @abstractmethod
118    def is_available_to_user(self, user: str) -> bool:
119        """
120        Check whether the specified user has access to this queue.
121
122        Args:
123            user (str): The username to check access for.
124
125        Returns:
126            bool: True if the user can submit jobs to this queue, False otherwise.
127        """

Check whether the specified user has access to this queue.

Arguments:
  • user (str): The username to check access for.
Returns:

bool: True if the user can submit jobs to this queue, False otherwise.

@abstractmethod
def get_destinations(self) -> list[str]:
129    @abstractmethod
130    def get_destinations(self) -> list[str]:
131        """
132        Retrieve all destinations available for this queue route.
133
134        Returns:
135            list[str]: A list of destination queue names associated with the queue.
136        """

Retrieve all destinations available for this queue route.

Returns:

list[str]: A list of destination queue names associated with the queue.

@abstractmethod
def from_route_only(self) -> bool:
138    @abstractmethod
139    def from_route_only(self) -> bool:
140        """
141        Determine whether this queue can only be accessed via a route.
142
143        Returns:
144            bool: True if the queue is accessible exclusively through a route,
145            False otherwise.
146        """

Determine whether this queue can only be accessed via a route.

Returns:

bool: True if the queue is accessible exclusively through a route, False otherwise.

@abstractmethod
def to_yaml(self) -> str:
148    @abstractmethod
149    def to_yaml(self) -> str:
150        """
151        Return all information about the queue from the batch system in YAML format.
152
153        Returns:
154            str: YAML-formatted string of queue metadata.
155        """

Return all information about the queue from the batch system in YAML format.

Returns:

str: YAML-formatted string of queue metadata.

@abstractmethod
def get_default_resources(self) -> qq_lib.properties.resources.Resources:
157    @abstractmethod
158    def get_default_resources(self) -> Resources:
159        """
160        Return the default resource definitions for this queue.
161
162        Returns:
163            Resources: Default resources allocated for jobs submitted to this queue.
164        """

Return the default resource definitions for this queue.

Returns:

Resources: Default resources allocated for jobs submitted to this queue.

type AnyBatchClass = type[BatchInterface[typing.Any, typing.Any, typing.Any]]