qq_lib.batch.slurmlumi

SlurmLumi backend for qq: job submission, monitoring, and LUMI-specific scratch/flash storage handling.

This module integrates qq with the Slurm environment deployed on the LUMI supercomputer. It extends the IT4I Slurm backend with all LUMI-specific behavior, most importantly the dual-tier temporary storage model and queue-resource conventions.

It provides:

  • SlurmLumi, the batch-system backend implementing job submission, dependency handling, resource translation, scratch/flash directory creation, and file/directory operations on LUMI's fully shared storage.
 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5SlurmLumi backend for qq: job submission, monitoring, and LUMI-specific
 6scratch/flash storage handling.
 7
 8This module integrates qq with the Slurm environment deployed on the LUMI
 9supercomputer. It extends the IT4I Slurm backend with all LUMI-specific
10behavior, most importantly the dual-tier temporary storage model and
11queue-resource conventions.
12
13- `SlurmLumi`, the batch-system backend implementing job submission,
14  dependency handling, resource translation, scratch/flash directory creation,
15  and file/directory operations on LUMI's fully shared storage.
16"""
17
18from .slurm import SlurmLumi
19
# Names exported as the public API of this package.
__all__ = [
    "SlurmLumi",
]
class SlurmLumi(qq_lib.batch.interface.interface.BatchInterface[qq_lib.batch.slurm.job.SlurmJob, qq_lib.batch.slurm.queue.SlurmQueue, qq_lib.batch.slurm.node.SlurmNode]):
 23class SlurmLumi(SlurmIT4I):
 24    """
 25    Implementation of BatchInterface for Slurm on the LUMI supercomputer.
 26    """
 27
 28    # all scratch directory types supported by SlurmLumi
 29    SUPPORTED_SCRATCHES = ["scratch", "flash"]
 30
 31    @classmethod
 32    def env_name(cls) -> str:
 33        return "SlurmLumi"
 34
 35    @classmethod
 36    def is_available(cls) -> bool:
 37        return shutil.which("lumi-allocations") is not None
 38
 39    @classmethod
 40    def job_submit(
 41        cls,
 42        res: Resources,
 43        queue: str,
 44        script: Path,
 45        job_name: str,
 46        depend: list[Depend],
 47        env_vars: dict[str, str],
 48        account: str | None = None,
 49        server: str | None = None,
 50        remote_host: str | None = None,
 51    ) -> str:
 52        # set the 'lumi_scratch_type' env var to be able to decide in get_scratch_dir
 53        # whether to create a scratch directory on /scratch or on /flash
 54        if res.uses_scratch():
 55            assert res.work_dir is not None
 56            env_vars[CFG.env_vars.lumi_scratch_type] = res.work_dir
 57
 58        return super().job_submit(
 59            res, queue, script, job_name, depend, env_vars, account, server, remote_host
 60        )
 61
 62    @classmethod
 63    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
 64        if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
 65            raise QQError(f"No account is defined for job '{job_id}'.")
 66
 67        # get the storage type (scratch or flash)
 68        if not (storage_type := os.environ.get(CFG.env_vars.lumi_scratch_type)):
 69            raise QQError(
 70                f"Environment variable '{CFG.env_vars.lumi_scratch_type}' is not defined. This is a bug!"
 71            )
 72
 73        user = getpass.getuser()
 74
 75        # we attempt to create the scratch directory multiple times in different user directory;
 76        # if the user directory is already created but the user does not have permissions
 77        # to write into it, we append a number to the user's name and try creating a new directory
 78        last_exception = None
 79        for attempt in range(CFG.slurm_lumi_options.scratch_dir_attempts):
 80            user_component = (
 81                user if attempt == 0 else f"{user}{attempt + 1}"
 82            )  # appended number is 2 for the second attempt
 83
 84            scratch = Path(
 85                f"/{storage_type}/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
 86            )
 87            logger.debug(
 88                f"Creating directory '{str(scratch)}' on '{storage_type}' storage."
 89            )
 90
 91            try:
 92                scratch.mkdir(parents=True, exist_ok=True)
 93                return scratch
 94            except Exception as e:
 95                last_exception = e
 96
 97        # if all attempts failed
 98        raise QQError(
 99            f"Could not create a working directory on {storage_type} for job '{job_id}' after {CFG.slurm_lumi_options.scratch_dir_attempts} attempts: {last_exception}"
100        ) from last_exception
101
102    @classmethod
103    def get_nodes(cls, server: str | None = None) -> list[SlurmLumiNode]:  # ty: ignore[invalid-method-override]
104        nodes = Slurm.get_nodes(server)
105        for node in nodes:
106            node.__class__ = SlurmLumiNode
107
108        return cast("list[SlurmLumiNode]", nodes)
109
110    @classmethod
111    def get_supported_work_dir_types(cls) -> list[str]:
112        return cls.SUPPORTED_SCRATCHES + [
113            "input_dir",
114            "job_dir",  # same as input_dir
115        ]
116
117    @classmethod
118    def _get_default_resources(cls) -> Resources:
119        return Resources(
120            nnodes=1,
121            ncpus_per_node=128,
122            mem_per_cpu="500mb",
123            work_dir="scratch",
124            walltime="1d",
125        )

Implementation of BatchInterface for Slurm on the LUMI supercomputer.

SUPPORTED_SCRATCHES = ['scratch', 'flash']
@classmethod
def env_name(cls) -> str:
    @classmethod
    def env_name(cls) -> str:
        """
        Return the name of the batch system environment.

        Returns:
            str: The batch system name, always "SlurmLumi".
        """
        return "SlurmLumi"

Return the name of the batch system environment.

Returns:

str: The batch system name.

@classmethod
def is_available(cls) -> bool:
35    @classmethod
36    def is_available(cls) -> bool:
37        return shutil.which("lumi-allocations") is not None

Determine whether the batch system is available on the current host.

Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.

Returns:

bool: True if the batch system is available, False otherwise.

@classmethod
def job_submit( cls, res: qq_lib.properties.resources.Resources, queue: str, script: pathlib._local.Path, job_name: str, depend: list[qq_lib.properties.depend.Depend], env_vars: dict[str, str], account: str | None = None, server: str | None = None, remote_host: str | None = None) -> str:
39    @classmethod
40    def job_submit(
41        cls,
42        res: Resources,
43        queue: str,
44        script: Path,
45        job_name: str,
46        depend: list[Depend],
47        env_vars: dict[str, str],
48        account: str | None = None,
49        server: str | None = None,
50        remote_host: str | None = None,
51    ) -> str:
52        # set the 'lumi_scratch_type' env var to be able to decide in get_scratch_dir
53        # whether to create a scratch directory on /scratch or on /flash
54        if res.uses_scratch():
55            assert res.work_dir is not None
56            env_vars[CFG.env_vars.lumi_scratch_type] = res.work_dir
57
58        return super().job_submit(
59            res, queue, script, job_name, depend, env_vars, account, server, remote_host
60        )

Submit a job to the batch system.

Can also perform additional validation of the job's resources.

This method is NOT guaranteed to be thread-safe.

Arguments:
  • res (Resources): Resources required for the job.
  • queue (str): Target queue for the job submission.
  • script (Path): Path to the script to execute.
  • job_name (str): Name of the job to use.
  • depend (list[Depend]): List of job dependencies.
  • env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
  • account (str | None): Optional account name to use for the job.
  • server (str | None): Optional name of the server to submit the job to.
  • remote_host (str | None): Optional name of the machine to submit the job from.
Returns:

str: Unique ID of the submitted job.

Raises:
  • QQError: If the job submission fails.
@classmethod
def create_work_dir_on_scratch(cls, job_id: str) -> pathlib._local.Path:
 62    @classmethod
 63    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
 64        if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
 65            raise QQError(f"No account is defined for job '{job_id}'.")
 66
 67        # get the storage type (scratch or flash)
 68        if not (storage_type := os.environ.get(CFG.env_vars.lumi_scratch_type)):
 69            raise QQError(
 70                f"Environment variable '{CFG.env_vars.lumi_scratch_type}' is not defined. This is a bug!"
 71            )
 72
 73        user = getpass.getuser()
 74
 75        # we attempt to create the scratch directory multiple times in different user directory;
 76        # if the user directory is already created but the user does not have permissions
 77        # to write into it, we append a number to the user's name and try creating a new directory
 78        last_exception = None
 79        for attempt in range(CFG.slurm_lumi_options.scratch_dir_attempts):
 80            user_component = (
 81                user if attempt == 0 else f"{user}{attempt + 1}"
 82            )  # appended number is 2 for the second attempt
 83
 84            scratch = Path(
 85                f"/{storage_type}/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
 86            )
 87            logger.debug(
 88                f"Creating directory '{str(scratch)}' on '{storage_type}' storage."
 89            )
 90
 91            try:
 92                scratch.mkdir(parents=True, exist_ok=True)
 93                return scratch
 94            except Exception as e:
 95                last_exception = e
 96
 97        # if all attempts failed
 98        raise QQError(
 99            f"Could not create a working directory on {storage_type} for job '{job_id}' after {CFG.slurm_lumi_options.scratch_dir_attempts} attempts: {last_exception}"
100        ) from last_exception

Create the working directory on scratch for the given job.

Arguments:
  • job_id (str): Unique identifier of the job.
Returns:

Path: Absolute path to the working directory on scratch.

Raises:
  • QQError: If the working directory could not be created.
@classmethod
def get_nodes( cls, server: str | None = None) -> list[qq_lib.batch.slurmlumi.node.SlurmLumiNode]:
102    @classmethod
103    def get_nodes(cls, server: str | None = None) -> list[SlurmLumiNode]:  # ty: ignore[invalid-method-override]
104        nodes = Slurm.get_nodes(server)
105        for node in nodes:
106            node.__class__ = SlurmLumiNode
107
108        return cast("list[SlurmLumiNode]", nodes)

Retrieve all nodes managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get nodes from.
Returns:

list[SlurmLumiNode]: A list of node objects existing in the batch system.

@classmethod
def get_supported_work_dir_types(cls) -> list[str]:
110    @classmethod
111    def get_supported_work_dir_types(cls) -> list[str]:
112        return cls.SUPPORTED_SCRATCHES + [
113            "input_dir",
114            "job_dir",  # same as input_dir
115        ]

Retrieve the list of supported types of working directories (i.e., strings that can be used with the --work-dir option).

Returns:

list[str]: A list of supported types of working directories.