qq_lib.batch.slurmit4i

SlurmIT4I backend for qq: job submission, monitoring, and IT4I-specific scratch and resource handling.

This module provides qq's full integration with the Slurm batch system as configured on IT4Innovations clusters (e.g., Karolina, Barbora). It extends the generic Slurm backend with all IT4I-specific behavior:

  • SlurmIT4I, the batch-system backend implementing job submission, killing, resource translation, local/remote file access, scratch-directory creation, and work-directory selection logic.
 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5SlurmIT4I backend for qq: job submission, monitoring, and IT4I-specific
 6scratch and resource handling.
 7
 8This module provides qq's full integration with the Slurm batch system as
 9configured on IT4Innovations clusters (e.g., Karolina, Barbora). It extends the
10generic Slurm backend with all IT4I-specific behavior:
11
12- `SlurmIT4I`, the batch-system backend implementing job submission, killing,
13  resource translation, local/remote file access, scratch-directory creation,
14  and work-directory selection logic.
15"""
16
17from .slurm import SlurmIT4I
18
19__all__ = [
20    "SlurmIT4I",
21]
class SlurmIT4I(qq_lib.batch.interface.interface.BatchInterface[qq_lib.batch.slurm.job.SlurmJob, qq_lib.batch.slurm.queue.SlurmQueue, qq_lib.batch.slurm.node.SlurmNode]):
 23class SlurmIT4I(Slurm):
 24    """
 25    Implementation of BatchInterface for Slurm on IT4I clusters.
 26    """
 27
 28    # all scratch directory types supported by SlurmIT4I
 29    SUPPORTED_SCRATCHES = ["scratch"]
 30
 31    @classmethod
 32    def env_name(cls) -> str:
 33        return "SlurmIT4I"
 34
 35    @classmethod
 36    def is_available(cls) -> bool:
 37        return shutil.which("it4ifree") is not None
 38
 39    @classmethod
 40    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
 41        if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
 42            raise QQError(f"No account is defined for job '{job_id}'.")
 43
 44        user = getpass.getuser()
 45
 46        # we attempt to create the scratch directory multiple times in different user directories;
 47        # if the user directory is already created but the user does not have permissions
 48        # to write into it, we append a number to the user's name and try creating a new directory
 49        last_exception = None
 50        for attempt in range(CFG.slurm_it4i_options.scratch_dir_attempts):
 51            user_component = (
 52                user if attempt == 0 else f"{user}{attempt + 1}"
 53            )  # appended number is 2 for the second attempt
 54
 55            scratch = Path(
 56                f"/scratch/project/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
 57            )
 58
 59            try:
 60                scratch.mkdir(parents=True, exist_ok=True)
 61                return scratch
 62            except Exception as e:
 63                last_exception = e
 64
 65        # if all attempts failed
 66        raise QQError(
 67            f"Could not create a working directory on scratch for job '{job_id}' after {CFG.slurm_it4i_options.scratch_dir_attempts} attempts: {last_exception}"
 68        ) from last_exception
 69
 70    @classmethod
 71    def get_supported_work_dir_types(cls) -> list[str]:
 72        return cls.SUPPORTED_SCRATCHES + [
 73            "input_dir",
 74            "job_dir",  # same as input_dir
 75        ]
 76
 77    @classmethod
 78    def navigate_to_destination(cls, host: str, directory: Path) -> None:
 79        logger.info(
 80            f"Host '{host}' is not reachable in this environment. Navigating to '{directory}' on the current machine."
 81        )
 82        BatchInterface._navigate_same_host(directory)
 83
 84    @classmethod
 85    def read_remote_file(cls, host: str, file: Path) -> str:
 86        # file is always on shared storage
 87        _ = host
 88        try:
 89            return file.read_text()
 90        except Exception as e:
 91            raise QQError(f"Could not read file '{file}': {e}.") from e
 92
 93    @classmethod
 94    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
 95        # file is always on shared storage
 96        _ = host
 97        try:
 98            file.write_text(content)
 99        except Exception as e:
100            raise QQError(f"Could not write file '{file}': {e}.") from e
101
102    @classmethod
103    def make_remote_dir(cls, host: str, directory: Path) -> None:
104        # directory is always on shared storage
105        _ = host
106        try:
107            directory.mkdir(exist_ok=True)
108        except Exception as e:
109            raise QQError(f"Could not create a directory '{directory}': {e}.") from e
110
111    @classmethod
112    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
113        # directory is always on shared storage
114        _ = host
115        try:
116            return list(directory.iterdir())
117        except Exception as e:
118            raise QQError(f"Could not list a directory '{directory}': {e}.") from e
119
120    @classmethod
121    def delete_remote_dir(cls, host: str, directory: Path) -> None:
122        # directory is always on shared storage
123        _ = host
124        try:
125            shutil.rmtree(directory)
126        except Exception as e:
127            raise QQError(f"Could not delete directory '{directory}': {e}.") from e
128
129    @classmethod
130    def move_remote_files(
131        cls, host: str, files: list[Path], moved_files: list[Path]
132    ) -> None:
133        if len(files) != len(moved_files):
134            raise QQError(
135                "The provided 'files' and 'moved_files' must have the same length."
136            )
137
138        # always on shared storage
139        _ = host
140        for src, dst in zip(files, moved_files):
141            shutil.move(str(src), str(dst))
142
143    @classmethod
144    def sync_with_exclusions(
145        cls,
146        src_dir: Path,
147        dest_dir: Path,
148        src_host: str | None,
149        dest_host: str | None,
150        exclude_files: list[Path] | None = None,
151    ) -> None:
152        # always on shared storage
153        _ = src_host
154        _ = dest_host
155        BatchInterface.sync_with_exclusions(
156            src_dir, dest_dir, None, None, exclude_files
157        )
158
159    @classmethod
160    def sync_selected(
161        cls,
162        src_dir: Path,
163        dest_dir: Path,
164        src_host: str | None,
165        dest_host: str | None,
166        include_files: list[Path] | None = None,
167    ) -> None:
168        # always on shared storage
169        _ = src_host
170        _ = dest_host
171        BatchInterface.sync_selected(src_dir, dest_dir, None, None, include_files)
172
173    @classmethod
174    def transform_resources(
175        cls, queue: str, server: str | None, provided_resources: Resources
176    ) -> Resources:
177        # server is unused
178        _ = server
179
180        # default resources of the queue
181        default_queue_resources = SlurmQueue(queue).get_default_resources()
182        # default server or hard-coded resources
183        default_batch_resources = cls._get_default_server_resources()
184
185        # fill in default parameters
186        resources = Resources.merge_resources(
187            provided_resources, default_queue_resources, default_batch_resources
188        )
189        if not resources.work_dir:
190            raise QQError(
191                "Work-dir is not set after filling in default attributes. This is a bug."
192            )
193
194        if provided_resources.work_size_per_cpu or provided_resources.work_size:
195            logger.warning(
196                "Setting work-size is not supported in this environment. Working directory has a virtually unlimited capacity."
197            )
198
199        if not any(
200            equals_normalized(resources.work_dir, dir)
201            for dir in cls.get_supported_work_dir_types()
202        ):
203            raise QQError(
204                f"Unknown working directory type specified: work-dir='{resources.work_dir}'. Supported types for {cls.env_name()} are: {' '.join(cls.get_supported_work_dir_types())}."
205            )
206
207        return resources
208
209    @classmethod
210    def is_shared(cls, directory: Path) -> bool:
211        _ = directory
212        # always on shared storage
213        return True
214
215    @classmethod
216    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
217        return [WorkHost()]
218
219    @classmethod
220    def _get_default_resources(cls) -> Resources:
221        return Resources(
222            nnodes=1,
223            ncpus_per_node=128,
224            mem_per_cpu="1gb",
225            work_dir="scratch",
226            walltime="1d",
227        )

Implementation of BatchInterface for Slurm on IT4I clusters.

SUPPORTED_SCRATCHES = ['scratch']
@classmethod
def env_name(cls) -> str:
31    @classmethod
32    def env_name(cls) -> str:
33        return "SlurmIT4I"

Return the name of the batch system environment.

Returns:

str: The batch system name.

@classmethod
def is_available(cls) -> bool:
35    @classmethod
36    def is_available(cls) -> bool:
37        return shutil.which("it4ifree") is not None

Determine whether the batch system is available on the current host.

Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.

Returns:

bool: True if the batch system is available, False otherwise.

@classmethod
def create_work_dir_on_scratch(cls, job_id: str) -> pathlib._local.Path:
39    @classmethod
40    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
41        if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
42            raise QQError(f"No account is defined for job '{job_id}'.")
43
44        user = getpass.getuser()
45
 46        # we attempt to create the scratch directory multiple times in different user directories;
47        # if the user directory is already created but the user does not have permissions
48        # to write into it, we append a number to the user's name and try creating a new directory
49        last_exception = None
50        for attempt in range(CFG.slurm_it4i_options.scratch_dir_attempts):
51            user_component = (
52                user if attempt == 0 else f"{user}{attempt + 1}"
53            )  # appended number is 2 for the second attempt
54
55            scratch = Path(
56                f"/scratch/project/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
57            )
58
59            try:
60                scratch.mkdir(parents=True, exist_ok=True)
61                return scratch
62            except Exception as e:
63                last_exception = e
64
65        # if all attempts failed
66        raise QQError(
67            f"Could not create a working directory on scratch for job '{job_id}' after {CFG.slurm_it4i_options.scratch_dir_attempts} attempts: {last_exception}"
68        ) from last_exception

Create the working directory on scratch for the given job.

Arguments:
  • job_id (str): Unique identifier of the job.
Returns:

Path: Absolute path to the working directory on scratch.

Raises:
  • QQError: If the working directory could not be created.
@classmethod
def get_supported_work_dir_types(cls) -> list[str]:
70    @classmethod
71    def get_supported_work_dir_types(cls) -> list[str]:
72        return cls.SUPPORTED_SCRATCHES + [
73            "input_dir",
74            "job_dir",  # same as input_dir
75        ]

Retrieve the list of supported types of working directories (i.e., strings that can be used with the --work-dir option).

Returns:

list[str]: A list of supported types of working directories.

@classmethod
def navigate_to_destination(cls, host: str, directory: pathlib._local.Path) -> None:
77    @classmethod
78    def navigate_to_destination(cls, host: str, directory: Path) -> None:
79        logger.info(
80            f"Host '{host}' is not reachable in this environment. Navigating to '{directory}' on the current machine."
81        )
82        BatchInterface._navigate_same_host(directory)

Open a new terminal on the specified host and change the working directory to the given path, handing control over to the user.

Default behavior:
  • If the target host is different from the current host, SSH is used to connect and cd is executed to switch to the directory. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
  • If the target host matches the current host, only cd is used.

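A new terminal should always be opened, regardless of the host. Note that the SlurmIT4I override shown above does not use SSH: it logs that the host is not reachable in this environment and navigates to the directory on the current machine.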

Arguments:
  • host (str): Hostname where the directory is located.
  • directory (Path): Directory path to navigate to.
Raises:
  • QQError: If the navigation fails.
@classmethod
def read_remote_file(cls, host: str, file: pathlib._local.Path) -> str:
84    @classmethod
85    def read_remote_file(cls, host: str, file: Path) -> str:
86        # file is always on shared storage
87        _ = host
88        try:
89            return file.read_text()
90        except Exception as e:
91            raise QQError(f"Could not read file '{file}': {e}.") from e

Read the contents of a file on a remote host and return it as a string.

The default implementation uses SSH to retrieve the file contents. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
Returns:

str: The contents of the remote file.

Raises:
  • QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: pathlib._local.Path, content: str) -> None:
 93    @classmethod
 94    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
 95        # file is always on shared storage
 96        _ = host
 97        try:
 98            file.write_text(content)
 99        except Exception as e:
100            raise QQError(f"Could not write file '{file}': {e}.") from e

Write the given content to a file on a remote host, overwriting it if it exists.

The default implementation uses SSH to send the content to the remote file. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
  • content (str): The content to write to the remote file.
Raises:
  • QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
102    @classmethod
103    def make_remote_dir(cls, host: str, directory: Path) -> None:
104        # directory is always on shared storage
105        _ = host
106        try:
107            directory.mkdir(exist_ok=True)
108        except Exception as e:
109            raise QQError(f"Could not create a directory '{directory}': {e}.") from e

Create a directory at the specified path on a remote host.

The default implementation uses SSH to run mkdir on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory should be created.
  • directory (Path): The path of the directory to create on the remote host.
Raises:
  • QQError: If the directory does not already exist and cannot be created, or if the SSH command fails.
@classmethod
def list_remote_dir( cls, host: str, directory: pathlib._local.Path) -> list[pathlib._local.Path]:
111    @classmethod
112    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
113        # directory is always on shared storage
114        _ = host
115        try:
116            return list(directory.iterdir())
117        except Exception as e:
118            raise QQError(f"Could not list a directory '{directory}': {e}.") from e

List all files and directories (absolute paths) in the specified directory on a remote host.

The default implementation uses SSH to run ls -A on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to list.
Returns:

list[Path]: A list of Path objects representing the entries inside the directory. Each entry is a full path that includes the given directory (absolute if the given directory is absolute).

Raises:
  • QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
120    @classmethod
121    def delete_remote_dir(cls, host: str, directory: Path) -> None:
122        # directory is always on shared storage
123        _ = host
124        try:
125            shutil.rmtree(directory)
126        except Exception as e:
127            raise QQError(f"Could not delete directory '{directory}': {e}.") from e

Delete a directory on a remote host.

The default implementation uses SSH to run rm -r on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to delete.
Raises:
  • QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files( cls, host: str, files: list[pathlib._local.Path], moved_files: list[pathlib._local.Path]) -> None:
129    @classmethod
130    def move_remote_files(
131        cls, host: str, files: list[Path], moved_files: list[Path]
132    ) -> None:
133        if len(files) != len(moved_files):
134            raise QQError(
135                "The provided 'files' and 'moved_files' must have the same length."
136            )
137
138        # always on shared storage
139        _ = host
140        for src, dst in zip(files, moved_files):
141            shutil.move(str(src), str(dst))

Move files on a remote host from their current paths to new paths.

The default implementation uses SSH to run a sequence of mv commands on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the files reside.
  • files (list[Path]): A list of source file paths on the remote host.
  • moved_files (list[Path]): A list of destination file paths on the remote host. Must be the same length as files.
Raises:
  • QQError: If the SSH command fails, the files cannot be moved or the length of files does not match the length of moved_files.
@classmethod
def sync_with_exclusions( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, exclude_files: list[pathlib._local.Path] | None = None) -> None:
143    @classmethod
144    def sync_with_exclusions(
145        cls,
146        src_dir: Path,
147        dest_dir: Path,
148        src_host: str | None,
149        dest_host: str | None,
150        exclude_files: list[Path] | None = None,
151    ) -> None:
152        # always on shared storage
153        _ = src_host
154        _ = dest_host
155        BatchInterface.sync_with_exclusions(
156            src_dir, dest_dir, None, None, exclude_files
157        )

Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.

All files and directories in src_dir are copied to dest_dir except those listed in exclude_files. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing. These will be converted to paths relative to src_dir.
Raises:
  • QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, include_files: list[pathlib._local.Path] | None = None) -> None:
159    @classmethod
160    def sync_selected(
161        cls,
162        src_dir: Path,
163        dest_dir: Path,
164        src_host: str | None,
165        dest_host: str | None,
166        include_files: list[Path] | None = None,
167    ) -> None:
168        # always on shared storage
169        _ = src_host
170        _ = dest_host
171        BatchInterface.sync_selected(src_dir, dest_dir, None, None, include_files)

Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.

Only files listed in include_files are copied from src_dir to dest_dir. Files not listed are ignored. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • include_files (list[Path] | None): Optional list of absolute file paths to include in syncing. These paths are converted to paths relative to src_dir. This argument is optional only for consistency with sync_with_exclusions.
Raises:
  • QQError: If the rsync command fails or times out.
@classmethod
def transform_resources( cls, queue: str, server: str | None, provided_resources: qq_lib.properties.resources.Resources) -> qq_lib.properties.resources.Resources:
173    @classmethod
174    def transform_resources(
175        cls, queue: str, server: str | None, provided_resources: Resources
176    ) -> Resources:
177        # server is unused
178        _ = server
179
180        # default resources of the queue
181        default_queue_resources = SlurmQueue(queue).get_default_resources()
182        # default server or hard-coded resources
183        default_batch_resources = cls._get_default_server_resources()
184
185        # fill in default parameters
186        resources = Resources.merge_resources(
187            provided_resources, default_queue_resources, default_batch_resources
188        )
189        if not resources.work_dir:
190            raise QQError(
191                "Work-dir is not set after filling in default attributes. This is a bug."
192            )
193
194        if provided_resources.work_size_per_cpu or provided_resources.work_size:
195            logger.warning(
196                "Setting work-size is not supported in this environment. Working directory has a virtually unlimited capacity."
197            )
198
199        if not any(
200            equals_normalized(resources.work_dir, dir)
201            for dir in cls.get_supported_work_dir_types()
202        ):
203            raise QQError(
204                f"Unknown working directory type specified: work-dir='{resources.work_dir}'. Supported types for {cls.env_name()} are: {' '.join(cls.get_supported_work_dir_types())}."
205            )
206
207        return resources

Transform user-provided Resources into a batch system-specific Resources instance.

This method takes the resources provided during submission and returns a new Resources object with any necessary modifications or defaults applied for the target batch system. The original provided_resources object is not modified.

Arguments:
  • queue (str): The name of the queue for which the resources are being adapted.
  • server (str | None): Name of the server on which the queue is located. If None, the queue is treated as being located on the current server.
  • provided_resources (Resources): The raw resources specified by the user.
Returns:

Resources: A new Resources instance with batch system-specific adjustments, fully constructed and validated.

Raises:
  • QQError: If any of the provided parameters are invalid or inconsistent.
@classmethod
def is_shared(cls, directory: pathlib._local.Path) -> bool:
209    @classmethod
210    def is_shared(cls, directory: Path) -> bool:
211        _ = directory
212        # always on shared storage
213        return True

Determine whether a given directory resides on a shared filesystem.

Arguments:
  • directory (Path): The directory to check.
Returns:

bool: True if the directory is on a shared filesystem, False if it is local.

@classmethod
def get_default_resubmit_hosts(cls) -> list[qq_lib.properties.resubmit_host.ResubmitHost]:
215    @classmethod
216    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
217        return [WorkHost()]

Get the default job resubmission hosts for this batch system.

In the default implementation, resubmission from the input machine is attempted. The SlurmIT4I override shown above instead returns a single WorkHost.

Returns:

list[ResubmitHost]: A list of resubmission hosts.