qq_lib.batch.slurmit4i
SlurmIT4I backend for qq: job submission, monitoring, and IT4I-specific scratch and resource handling.
This module provides qq's full integration with the Slurm batch system as configured on IT4Innovations clusters (e.g., Karolina, Barbora). It extends the generic Slurm backend with all IT4I-specific behavior:
SlurmIT4I, the batch-system backend implementing job submission, killing, resource translation, local/remote file access, scratch-directory creation, and work-directory selection logic.
# Released under MIT License.
# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab

"""
SlurmIT4I backend for qq: job submission, monitoring, and IT4I-specific
scratch and resource handling.

This module provides qq's full integration with the Slurm batch system as
configured on IT4Innovations clusters (e.g., Karolina, Barbora). It extends the
generic Slurm backend with all IT4I-specific behavior:

- `SlurmIT4I`, the batch-system backend implementing job submission, killing,
  resource translation, local/remote file access, scratch-directory creation,
  and work-directory selection logic.
"""

from .slurm import SlurmIT4I

# Public API of the package: only the backend class is re-exported.
__all__ = ["SlurmIT4I"]
class SlurmIT4I(Slurm):
    """
    Implementation of BatchInterface for Slurm on IT4I clusters.

    All file and directory operations defined here act on the local
    filesystem and ignore their `host` arguments; per the in-code comments,
    files are always on shared storage in this environment.
    """

    # all scratch directory types supported by SlurmIT4I
    SUPPORTED_SCRATCHES = ["scratch"]

    @classmethod
    def env_name(cls) -> str:
        """Return the name of this batch system environment ("SlurmIT4I")."""
        return "SlurmIT4I"

    @classmethod
    def is_available(cls) -> bool:
        """Return True if the `it4ifree` executable can be found on PATH."""
        return shutil.which("it4ifree") is not None

    @classmethod
    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
        """
        Create the working directory on scratch for the given job.

        The directory is
        `/scratch/project/<account>/<user>/qq-jobs/job_<job_id>`, where
        `<account>` is the lower-cased value of the environment variable named
        by `CFG.env_vars.slurm_job_account`.

        Args:
            job_id (str): Identifier of the job.

        Returns:
            Path: Path to the created working directory.

        Raises:
            QQError: If no account is defined in the environment or if the
                directory could not be created in any of the attempts.
        """
        if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
            raise QQError(f"No account is defined for job '{job_id}'.")

        user = getpass.getuser()

        # we attempt to create the scratch directory multiple times in different user directories;
        # if the user directory is already created but the user does not have permissions
        # to write into it, we append a number to the user's name and try creating a new directory
        last_exception = None
        for attempt in range(CFG.slurm_it4i_options.scratch_dir_attempts):
            user_component = (
                user if attempt == 0 else f"{user}{attempt + 1}"
            )  # appended number is 2 for the second attempt

            scratch = Path(
                f"/scratch/project/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
            )

            try:
                scratch.mkdir(parents=True, exist_ok=True)
                return scratch
            except Exception as e:
                last_exception = e

        # if all attempts failed
        raise QQError(
            f"Could not create a working directory on scratch for job '{job_id}' after {CFG.slurm_it4i_options.scratch_dir_attempts} attempts: {last_exception}"
        ) from last_exception

    @classmethod
    def get_supported_work_dir_types(cls) -> list[str]:
        """
        Return the supported working-directory types.

        Returns:
            list[str]: `SUPPORTED_SCRATCHES` followed by "input_dir" and
                "job_dir".
        """
        return cls.SUPPORTED_SCRATCHES + [
            "input_dir",
            "job_dir",  # same as input_dir
        ]

    @classmethod
    def navigate_to_destination(cls, host: str, directory: Path) -> None:
        """
        Navigate to `directory` on the current machine.

        `host` is only mentioned in the log message; navigation is delegated
        to `BatchInterface._navigate_same_host`.

        Args:
            host (str): Name of the host the directory was requested on.
            directory (Path): Directory to navigate to.
        """
        logger.info(
            f"Host '{host}' is not reachable in this environment. Navigating to '{directory}' on the current machine."
        )
        BatchInterface._navigate_same_host(directory)

    @classmethod
    def read_remote_file(cls, host: str, file: Path) -> str:
        """
        Read `file` from the local filesystem and return its text.

        Args:
            host (str): Ignored.
            file (Path): File to read.

        Returns:
            str: Contents of the file.

        Raises:
            QQError: If the file cannot be read.
        """
        # file is always on shared storage
        _ = host
        try:
            return file.read_text()
        except Exception as e:
            raise QQError(f"Could not read file '{file}': {e}.") from e

    @classmethod
    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
        """
        Write `content` to `file` on the local filesystem.

        Args:
            host (str): Ignored.
            file (Path): File to write.
            content (str): Text to write.

        Raises:
            QQError: If the file cannot be written.
        """
        # file is always on shared storage
        _ = host
        try:
            file.write_text(content)
        except Exception as e:
            raise QQError(f"Could not write file '{file}': {e}.") from e

    @classmethod
    def make_remote_dir(cls, host: str, directory: Path) -> None:
        """
        Create `directory` on the local filesystem; an existing directory is
        not an error. Missing parent directories are not created.

        Args:
            host (str): Ignored.
            directory (Path): Directory to create.

        Raises:
            QQError: If the directory cannot be created.
        """
        # directory is always on shared storage
        _ = host
        try:
            directory.mkdir(exist_ok=True)
        except Exception as e:
            raise QQError(f"Could not create a directory '{directory}': {e}.") from e

    @classmethod
    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
        """
        List the entries of `directory` on the local filesystem.

        Args:
            host (str): Ignored.
            directory (Path): Directory to list.

        Returns:
            list[Path]: The paths yielded by `directory.iterdir()`.

        Raises:
            QQError: If the directory cannot be listed.
        """
        # directory is always on shared storage
        _ = host
        try:
            return list(directory.iterdir())
        except Exception as e:
            raise QQError(f"Could not list a directory '{directory}': {e}.") from e

    @classmethod
    def delete_remote_dir(cls, host: str, directory: Path) -> None:
        """
        Recursively delete `directory` from the local filesystem.

        Args:
            host (str): Ignored.
            directory (Path): Directory to delete.

        Raises:
            QQError: If the directory cannot be deleted.
        """
        # directory is always on shared storage
        _ = host
        try:
            shutil.rmtree(directory)
        except Exception as e:
            raise QQError(f"Could not delete directory '{directory}': {e}.") from e

    @classmethod
    def move_remote_files(
        cls, host: str, files: list[Path], moved_files: list[Path]
    ) -> None:
        """
        Move each path in `files` to the path at the same index of
        `moved_files`, on the local filesystem.

        Files are moved one by one; a failure stops the loop, so earlier
        files stay moved.

        Args:
            host (str): Ignored.
            files (list[Path]): Source paths.
            moved_files (list[Path]): Destination paths; must have the same
                length as `files`.

        Raises:
            QQError: If the lists differ in length or a file cannot be moved.
        """
        if len(files) != len(moved_files):
            raise QQError(
                "The provided 'files' and 'moved_files' must have the same length."
            )

        # always on shared storage
        _ = host
        for src, dst in zip(files, moved_files):
            # wrap failures in QQError, consistently with the other file operations
            try:
                shutil.move(str(src), str(dst))
            except Exception as e:
                raise QQError(f"Could not move file '{src}' to '{dst}': {e}.") from e

    @classmethod
    def sync_with_exclusions(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        exclude_files: list[Path] | None = None,
    ) -> None:
        """
        Delegate to `BatchInterface.sync_with_exclusions`, passing None for
        both hosts.

        Args:
            src_dir (Path): Source directory.
            dest_dir (Path): Destination directory.
            src_host (str | None): Ignored.
            dest_host (str | None): Ignored.
            exclude_files (list[Path] | None): Forwarded unchanged.
        """
        # always on shared storage
        _ = src_host
        _ = dest_host
        BatchInterface.sync_with_exclusions(
            src_dir, dest_dir, None, None, exclude_files
        )

    @classmethod
    def sync_selected(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        include_files: list[Path] | None = None,
    ) -> None:
        """
        Delegate to `BatchInterface.sync_selected`, passing None for both
        hosts.

        Args:
            src_dir (Path): Source directory.
            dest_dir (Path): Destination directory.
            src_host (str | None): Ignored.
            dest_host (str | None): Ignored.
            include_files (list[Path] | None): Forwarded unchanged.
        """
        # always on shared storage
        _ = src_host
        _ = dest_host
        BatchInterface.sync_selected(src_dir, dest_dir, None, None, include_files)

    @classmethod
    def transform_resources(
        cls, queue: str, server: str | None, provided_resources: Resources
    ) -> Resources:
        """
        Fill in default resources and validate the working-directory type.

        The provided resources are merged (via `Resources.merge_resources`)
        with the queue defaults and the batch-system defaults, in that
        argument order. A warning is logged if a work-size was requested.

        Args:
            queue (str): Name of the queue whose defaults are used.
            server (str | None): Ignored.
            provided_resources (Resources): Resources specified by the user.

        Returns:
            Resources: The merged resources.

        Raises:
            QQError: If `work_dir` is unset after merging or does not match
                any supported working-directory type.
        """
        # server is unused
        _ = server

        # default resources of the queue
        default_queue_resources = SlurmQueue(queue).get_default_resources()
        # default server or hard-coded resources
        default_batch_resources = cls._get_default_server_resources()

        # fill in default parameters
        resources = Resources.merge_resources(
            provided_resources, default_queue_resources, default_batch_resources
        )
        if not resources.work_dir:
            raise QQError(
                "Work-dir is not set after filling in default attributes. This is a bug."
            )

        if provided_resources.work_size_per_cpu or provided_resources.work_size:
            logger.warning(
                "Setting work-size is not supported in this environment. Working directory has a virtually unlimited capacity."
            )

        if not any(
            equals_normalized(resources.work_dir, work_dir_type)
            for work_dir_type in cls.get_supported_work_dir_types()
        ):
            raise QQError(
                f"Unknown working directory type specified: work-dir='{resources.work_dir}'. Supported types for {cls.env_name()} are: {' '.join(cls.get_supported_work_dir_types())}."
            )

        return resources

    @classmethod
    def is_shared(cls, directory: Path) -> bool:
        """Return True for any `directory`; the argument is ignored."""
        _ = directory
        # always on shared storage
        return True

    @classmethod
    def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
        """Return the default resubmission hosts: a single `WorkHost()`."""
        return [WorkHost()]

    @classmethod
    def _get_default_resources(cls) -> Resources:
        """Return the hard-coded default resources for this backend."""
        return Resources(
            nnodes=1,
            ncpus_per_node=128,
            mem_per_cpu="1gb",
            work_dir="scratch",
            walltime="1d",
        )
Implementation of BatchInterface for Slurm on IT4I clusters.
Return the name of the batch system environment.
Returns:
str: The batch system name.
Determine whether the batch system is available on the current host.
Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.
Returns:
bool: True if the batch system is available, False otherwise.
@classmethod
def create_work_dir_on_scratch(cls, job_id: str) -> Path:
    """
    Create the scratch working directory for a job and return its path.

    The account is read from the environment variable named by
    `CFG.env_vars.slurm_job_account`; up to
    `CFG.slurm_it4i_options.scratch_dir_attempts` candidate directories are
    tried.

    Args:
        job_id (str): Identifier of the job.

    Returns:
        Path: The first candidate directory that could be created.

    Raises:
        QQError: If no account is defined or every attempt failed.
    """
    account = os.environ.get(CFG.env_vars.slurm_job_account)
    if not account:
        raise QQError(f"No account is defined for job '{job_id}'.")

    username = getpass.getuser()
    max_attempts = CFG.slurm_it4i_options.scratch_dir_attempts

    # The first attempt uses the plain user name; later attempts append
    # 2, 3, ... so that a user directory we cannot write into is side-stepped.
    failure = None
    for attempt in range(max_attempts):
        if attempt == 0:
            user_dir = username
        else:
            user_dir = f"{username}{attempt + 1}"

        candidate = Path(
            f"/scratch/project/{account.lower()}/{user_dir}/qq-jobs/job_{job_id}"
        )

        try:
            candidate.mkdir(parents=True, exist_ok=True)
        except Exception as err:
            failure = err
        else:
            return candidate

    # every attempt failed
    raise QQError(
        f"Could not create a working directory on scratch for job '{job_id}' after {CFG.slurm_it4i_options.scratch_dir_attempts} attempts: {failure}"
    ) from failure
Create the working directory on scratch for the given job.
Arguments:
- job_id (str): Unique identifier of the job.
Returns:
Path: Absolute path to the working directory on scratch.
Raises:
- QQError: If the working directory could not be created.
@classmethod
def get_supported_work_dir_types(cls) -> list[str]:
    """
    Return the supported working-directory types.

    Returns:
        list[str]: The entries of `cls.SUPPORTED_SCRATCHES` followed by
            "input_dir" and "job_dir".
    """
    # "job_dir" is the same as "input_dir"
    return [*cls.SUPPORTED_SCRATCHES, "input_dir", "job_dir"]
Retrieve the list of supported types of working directories
(i.e., strings that can be used with the --work-dir option).
Returns:
list[str]: A list of supported types of working directories.
@classmethod
def read_remote_file(cls, host: str, file: Path) -> str:
    """
    Read `file` from the local filesystem and return its text.

    Args:
        host (str): Ignored; the file is always on shared storage.
        file (Path): File to read.

    Returns:
        str: Contents of the file.

    Raises:
        QQError: If the file cannot be read.
    """
    del host  # unused
    try:
        text = file.read_text()
    except Exception as err:
        raise QQError(f"Could not read file '{file}': {err}.") from err
    return text
Read the contents of a file on a remote host and return it as a string.
The default implementation uses SSH to retrieve the file contents.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
Returns:
str: The contents of the remote file.
Raises:
- QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: Path, content: str) -> None:
    """
    Write `content` to `file` on the local filesystem.

    Args:
        host (str): Ignored; the file is always on shared storage.
        file (Path): File to write.
        content (str): Text to write.

    Raises:
        QQError: If the file cannot be written.
    """
    del host  # unused
    try:
        file.write_text(content)
    except Exception as err:
        raise QQError(f"Could not write file '{file}': {err}.") from err
Write the given content to a file on a remote host, overwriting it if it exists.
The default implementation uses SSH to send the content to the remote file.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
- content (str): The content to write to the remote file.
Raises:
- QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: Path) -> None:
    """
    Create `directory` on the local filesystem.

    An already existing directory is not an error. Missing parent
    directories are not created.

    Args:
        host (str): Ignored; the directory is always on shared storage.
        directory (Path): Directory to create.

    Raises:
        QQError: If the directory cannot be created.
    """
    del host  # unused
    try:
        directory.mkdir(exist_ok=True)
    except Exception as err:
        raise QQError(f"Could not create a directory '{directory}': {err}.") from err
Create a directory at the specified path on a remote host.
The default implementation uses SSH to run mkdir on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory should be created.
- directory (Path): The path of the directory to create on the remote host.
Raises:
- QQError: If the directory cannot be created but does not already exist or the SSH command fails.
@classmethod
def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
    """
    List the entries of `directory` on the local filesystem.

    Args:
        host (str): Ignored; the directory is always on shared storage.
        directory (Path): Directory to list.

    Returns:
        list[Path]: The paths yielded by `directory.iterdir()`.

    Raises:
        QQError: If the directory cannot be listed.
    """
    del host  # unused
    try:
        entries = list(directory.iterdir())
    except Exception as err:
        raise QQError(f"Could not list a directory '{directory}': {err}.") from err
    return entries
List all files and directories (absolute paths) in the specified directory on a remote host.
The default implementation uses SSH to run ls -A on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to list.
Returns:
list[Path]: A list of `Path` objects representing the entries inside the directory. Entries are relative to the given `directory`.
Raises:
- QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: Path) -> None:
    """
    Recursively delete `directory` from the local filesystem.

    Args:
        host (str): Ignored; the directory is always on shared storage.
        directory (Path): Directory to delete.

    Raises:
        QQError: If the directory cannot be deleted.
    """
    del host  # unused
    try:
        shutil.rmtree(directory)
    except Exception as err:
        raise QQError(f"Could not delete directory '{directory}': {err}.") from err
Delete a directory on a remote host.
The default implementation uses SSH to run rm -r on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to delete.
Raises:
- QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files(
    cls, host: str, files: list[Path], moved_files: list[Path]
) -> None:
    """
    Move each path in `files` to the path at the same index of
    `moved_files`, on the local filesystem.

    Files are moved one by one; a failure stops the loop, so earlier files
    stay moved.

    Args:
        host (str): Ignored; the files are always on shared storage.
        files (list[Path]): Source paths.
        moved_files (list[Path]): Destination paths; must have the same
            length as `files`.

    Raises:
        QQError: If the lists differ in length or a file cannot be moved.
    """
    if len(files) != len(moved_files):
        raise QQError(
            "The provided 'files' and 'moved_files' must have the same length."
        )

    # always on shared storage
    _ = host
    for src, dst in zip(files, moved_files):
        # report failures as QQError, like the other file operations do
        try:
            shutil.move(str(src), str(dst))
        except Exception as e:
            raise QQError(f"Could not move file '{src}' to '{dst}': {e}.") from e
Move files on a remote host from their current paths to new paths.
The default implementation uses SSH to run a sequence of mv commands on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the files reside.
- files (list[Path]): A list of source file paths on the remote host.
- moved_files (list[Path]): A list of destination file paths on the remote host.
Must be the same length as `files`.
Raises:
- QQError: If the SSH command fails, the files cannot be moved, or the length of `files` does not match the length of `moved_files`.
@classmethod
def sync_with_exclusions(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    exclude_files: list[Path] | None = None,
) -> None:
    """
    Delegate to `BatchInterface.sync_with_exclusions` with both hosts set
    to None.

    Args:
        src_dir (Path): Source directory.
        dest_dir (Path): Destination directory.
        src_host (str | None): Ignored; always on shared storage.
        dest_host (str | None): Ignored; always on shared storage.
        exclude_files (list[Path] | None): Forwarded unchanged.
    """
    del src_host, dest_host  # unused
    local_host = None
    BatchInterface.sync_with_exclusions(
        src_dir, dest_dir, local_host, local_host, exclude_files
    )
Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.
All files and directories in src_dir are copied to dest_dir except
those listed in exclude_files. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
These will be converted to paths relative to `src_dir`.
Raises:
- QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    include_files: list[Path] | None = None,
) -> None:
    """
    Delegate to `BatchInterface.sync_selected` with both hosts set to None.

    Args:
        src_dir (Path): Source directory.
        dest_dir (Path): Destination directory.
        src_host (str | None): Ignored; always on shared storage.
        dest_host (str | None): Ignored; always on shared storage.
        include_files (list[Path] | None): Forwarded unchanged.
    """
    del src_host, dest_host  # unused
    local_host = None
    BatchInterface.sync_selected(
        src_dir, dest_dir, local_host, local_host, include_files
    )
Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.
Only files listed in include_files are copied from src_dir to dest_dir.
Files not listed are ignored. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- include_files (list[Path] | None): Optional list of absolute file paths to include in syncing.
These paths are converted relative to `src_dir`. This argument is optional only for consistency with sync_with_exclusions.
Raises:
- QQError: If the rsync command fails or times out.
@classmethod
def transform_resources(
    cls, queue: str, server: str | None, provided_resources: Resources
) -> Resources:
    """
    Fill in default resources and validate the working-directory type.

    The provided resources are merged (via `Resources.merge_resources`) with
    the queue defaults and the batch-system defaults, in that argument
    order. A warning is logged if a work-size was requested.

    Args:
        queue (str): Name of the queue whose defaults are used.
        server (str | None): Ignored.
        provided_resources (Resources): Resources specified by the user.

    Returns:
        Resources: The merged resources.

    Raises:
        QQError: If `work_dir` is unset after merging or does not match any
            supported working-directory type.
    """
    del server  # unused

    # queue defaults first, then server/hard-coded defaults
    queue_defaults = SlurmQueue(queue).get_default_resources()
    batch_defaults = cls._get_default_server_resources()

    merged = Resources.merge_resources(
        provided_resources, queue_defaults, batch_defaults
    )
    if not merged.work_dir:
        raise QQError(
            "Work-dir is not set after filling in default attributes. This is a bug."
        )

    if provided_resources.work_size_per_cpu or provided_resources.work_size:
        logger.warning(
            "Setting work-size is not supported in this environment. Working directory has a virtually unlimited capacity."
        )

    # accept the work-dir as soon as it matches one supported type
    for supported_type in cls.get_supported_work_dir_types():
        if equals_normalized(merged.work_dir, supported_type):
            return merged

    raise QQError(
        f"Unknown working directory type specified: work-dir='{merged.work_dir}'. Supported types for {cls.env_name()} are: {' '.join(cls.get_supported_work_dir_types())}."
    )
Transform user-provided Resources into a batch system-specific Resources instance.
This method takes the resources provided during submission and returns a new
Resources object with any necessary modifications or defaults applied for
the target batch system. The original provided_resources object is not modified.
Arguments:
- queue (str): The name of the queue for which the resources are being adapted.
- server (str | None): Name of the server on which the queue is located.
If `None`, the queue is treated as being located on the current server.
- provided_resources (Resources): The raw resources specified by the user.
Returns:
Resources: A new Resources instance with batch system-specific adjustments, fully constructed and validated.
Raises:
- QQError: If any of the provided parameters are invalid or inconsistent.
@classmethod
def get_default_resubmit_hosts(cls) -> list[ResubmitHost]:
    """
    Return the default job resubmission hosts for this batch system.

    Returns:
        list[ResubmitHost]: A single-element list holding a `WorkHost()`.
    """
    work_host = WorkHost()
    return [work_host]
Get the default job resubmission hosts for this batch system.
In the default implementation, resubmission from the input machine is attempted.
Returns:
list[ResubmitHost]: A list of resubmission hosts.