qq_lib.batch.slurmlumi
SlurmLumi backend for qq: job submission, monitoring, and LUMI-specific scratch/flash storage handling.
This module integrates qq with the Slurm environment deployed on the LUMI supercomputer. It extends the IT4I Slurm backend with all LUMI-specific behavior, most importantly the dual-tier temporary storage model and queue-resource conventions.
It provides SlurmLumi, the batch-system backend that implements job submission, dependency handling, resource translation, scratch/flash directory creation, and file/directory operations on LUMI's fully shared storage.
    # Released under MIT License.
    # Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab

    """
    SlurmLumi backend for qq: job submission, monitoring, and LUMI-specific
    scratch/flash storage handling.

    This module integrates qq with the Slurm environment deployed on the LUMI
    supercomputer. It extends the IT4I Slurm backend with all LUMI-specific
    behavior, most importantly the dual-tier temporary storage model and
    queue-resource conventions.

    - `SlurmLumi`, the batch-system backend implementing job submission,
      dependency handling, resource translation, scratch/flash directory creation,
      and file/directory operations on LUMI's fully shared storage.
    """

    from .slurm import SlurmLumi

    __all__ = [
        "SlurmLumi",
    ]
    class SlurmLumi(SlurmIT4I):
        """
        Implementation of BatchInterface for Slurm on the LUMI supercomputer.
        """

        # all scratch directory types supported by SlurmLumi
        SUPPORTED_SCRATCHES = ["scratch", "flash"]

        @classmethod
        def env_name(cls) -> str:
            return "SlurmLumi"

        @classmethod
        def is_available(cls) -> bool:
            return shutil.which("lumi-allocations") is not None

        @classmethod
        def job_submit(
            cls,
            res: Resources,
            queue: str,
            script: Path,
            job_name: str,
            depend: list[Depend],
            env_vars: dict[str, str],
            account: str | None = None,
            server: str | None = None,
            remote_host: str | None = None,
        ) -> str:
            # set the 'lumi_scratch_type' env var to be able to decide in get_scratch_dir
            # whether to create a scratch directory on /scratch or on /flash
            if res.uses_scratch():
                assert res.work_dir is not None
                env_vars[CFG.env_vars.lumi_scratch_type] = res.work_dir

            return super().job_submit(
                res, queue, script, job_name, depend, env_vars, account, server, remote_host
            )

        @classmethod
        def create_work_dir_on_scratch(cls, job_id: str) -> Path:
            if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
                raise QQError(f"No account is defined for job '{job_id}'.")

            # get the storage type (scratch or flash)
            if not (storage_type := os.environ.get(CFG.env_vars.lumi_scratch_type)):
                raise QQError(
                    f"Environment variable '{CFG.env_vars.lumi_scratch_type}' is not defined. This is a bug!"
                )

            user = getpass.getuser()

            # we attempt to create the scratch directory multiple times in different user directory;
            # if the user directory is already created but the user does not have permissions
            # to write into it, we append a number to the user's name and try creating a new directory
            last_exception = None
            for attempt in range(CFG.slurm_lumi_options.scratch_dir_attempts):
                user_component = (
                    user if attempt == 0 else f"{user}{attempt + 1}"
                )  # appended number is 2 for the second attempt

                scratch = Path(
                    f"/{storage_type}/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
                )
                logger.debug(
                    f"Creating directory '{str(scratch)}' on '{storage_type}' storage."
                )

                try:
                    scratch.mkdir(parents=True, exist_ok=True)
                    return scratch
                except Exception as e:
                    last_exception = e

            # if all attempts failed
            raise QQError(
                f"Could not create a working directory on {storage_type} for job '{job_id}' after {CFG.slurm_lumi_options.scratch_dir_attempts} attempts: {last_exception}"
            ) from last_exception

        @classmethod
        def get_nodes(cls, server: str | None = None) -> list[SlurmLumiNode]:  # ty: ignore[invalid-method-override]
            nodes = Slurm.get_nodes(server)
            for node in nodes:
                node.__class__ = SlurmLumiNode

            return cast("list[SlurmLumiNode]", nodes)

        @classmethod
        def get_supported_work_dir_types(cls) -> list[str]:
            return cls.SUPPORTED_SCRATCHES + [
                "input_dir",
                "job_dir",  # same as input_dir
            ]

        @classmethod
        def _get_default_resources(cls) -> Resources:
            return Resources(
                nnodes=1,
                ncpus_per_node=128,
                mem_per_cpu="500mb",
                work_dir="scratch",
                walltime="1d",
            )
Implementation of BatchInterface for Slurm on the LUMI supercomputer.
Return the name of the batch system environment.
Returns:
str: The batch system name.
    @classmethod
    def is_available(cls) -> bool:
        return shutil.which("lumi-allocations") is not None
Determine whether the batch system is available on the current host.
Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators. SlurmLumi checks that the 'lumi-allocations' command can be found on the PATH.
Returns:
bool: True if the batch system is available, False otherwise.
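The PATH lookup behind this check can be sketched in isolation. This is a minimal illustration, not part of qq: the helper name 'command_available' is hypothetical, and only 'shutil.which' and the 'lumi-allocations' command name come from the source above.

```python
import shutil


def command_available(command: str) -> bool:
    """Return True if `command` resolves to an executable on the current PATH.

    Hypothetical helper mirroring the check in SlurmLumi.is_available.
    """
    return shutil.which(command) is not None


# Expected to be True on a LUMI login node and False on most other machines.
on_lumi = command_available("lumi-allocations")
```

Because the check only inspects the PATH, it is cheap enough to run for every backend when deciding which batch system to use.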
    @classmethod
    def job_submit(
        cls,
        res: Resources,
        queue: str,
        script: Path,
        job_name: str,
        depend: list[Depend],
        env_vars: dict[str, str],
        account: str | None = None,
        server: str | None = None,
        remote_host: str | None = None,
    ) -> str:
        # set the 'lumi_scratch_type' env var to be able to decide in get_scratch_dir
        # whether to create a scratch directory on /scratch or on /flash
        if res.uses_scratch():
            assert res.work_dir is not None
            env_vars[CFG.env_vars.lumi_scratch_type] = res.work_dir

        return super().job_submit(
            res, queue, script, job_name, depend, env_vars, account, server, remote_host
        )
Submit a job to the batch system.
Can also perform additional validation of the job's resources.
This method is NOT guaranteed to be thread-safe.
Arguments:
- res (Resources): Resources required for the job.
- queue (str): Target queue for the job submission.
- script (Path): Path to the script to execute.
- job_name (str): Name of the job to use.
- depend (list[Depend]): List of job dependencies.
- env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
- account (str | None): Optional account name to use for the job.
- server (str | None): Optional name of the server to submit the job to.
- remote_host (str | None): Optional name of the machine to submit the job from.
Returns:
str: Unique ID of the submitted job.
Raises:
- QQError: If the job submission fails.
    @classmethod
    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
        if not (account := os.environ.get(CFG.env_vars.slurm_job_account)):
            raise QQError(f"No account is defined for job '{job_id}'.")

        # get the storage type (scratch or flash)
        if not (storage_type := os.environ.get(CFG.env_vars.lumi_scratch_type)):
            raise QQError(
                f"Environment variable '{CFG.env_vars.lumi_scratch_type}' is not defined. This is a bug!"
            )

        user = getpass.getuser()

        # we attempt to create the scratch directory multiple times in different user directory;
        # if the user directory is already created but the user does not have permissions
        # to write into it, we append a number to the user's name and try creating a new directory
        last_exception = None
        for attempt in range(CFG.slurm_lumi_options.scratch_dir_attempts):
            user_component = (
                user if attempt == 0 else f"{user}{attempt + 1}"
            )  # appended number is 2 for the second attempt

            scratch = Path(
                f"/{storage_type}/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
            )
            logger.debug(
                f"Creating directory '{str(scratch)}' on '{storage_type}' storage."
            )

            try:
                scratch.mkdir(parents=True, exist_ok=True)
                return scratch
            except Exception as e:
                last_exception = e

        # if all attempts failed
        raise QQError(
            f"Could not create a working directory on {storage_type} for job '{job_id}' after {CFG.slurm_lumi_options.scratch_dir_attempts} attempts: {last_exception}"
        ) from last_exception
Create the working directory on scratch for the given job.
Arguments:
- job_id (str): Unique identifier of the job.
Returns:
Path: Absolute path to the working directory on scratch.
Raises:
- QQError: If the working directory could not be created.
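To make the fallback naming concrete, the sketch below lists the directories the method would try, in order, without creating anything. The function 'candidate_scratch_dirs' is a hypothetical helper written for this illustration, and the account, user, and job ID in the example are made-up values; the path layout and the numbering of the user component follow the source above.

```python
from pathlib import Path


def candidate_scratch_dirs(
    storage_type: str, account: str, user: str, job_id: str, attempts: int
) -> list[Path]:
    """List the working directories that would be tried, in order."""
    candidates = []
    for attempt in range(attempts):
        # First attempt uses the plain user name; later ones append 2, 3, ...
        user_component = user if attempt == 0 else f"{user}{attempt + 1}"
        candidates.append(
            Path(
                f"/{storage_type}/{account.lower()}/{user_component}/qq-jobs/job_{job_id}"
            )
        )
    return candidates


# Made-up example values.
for path in candidate_scratch_dirs("flash", "Project_465000000", "alice", "123456", 3):
    print(path.as_posix())
# /flash/project_465000000/alice/qq-jobs/job_123456
# /flash/project_465000000/alice2/qq-jobs/job_123456
# /flash/project_465000000/alice3/qq-jobs/job_123456
```

The real method stops at the first candidate for which 'mkdir' succeeds and raises QQError only after every candidate has failed.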
    @classmethod
    def get_nodes(cls, server: str | None = None) -> list[SlurmLumiNode]:  # ty: ignore[invalid-method-override]
        nodes = Slurm.get_nodes(server)
        for node in nodes:
            node.__class__ = SlurmLumiNode

        return cast("list[SlurmLumiNode]", nodes)
Retrieve all nodes managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get nodes from.
Returns:
list[SlurmLumiNode]: A list of node objects existing in the batch system.
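The source above retypes the nodes returned by the generic Slurm backend by reassigning each object's '__class__' instead of constructing new objects. A toy illustration of that technique follows; the classes 'Node' and 'LumiNode' are stand-ins invented for this sketch and are not the real qq node classes.

```python
class Node:
    """Stand-in for a generic node object (hypothetical)."""

    def __init__(self, name: str) -> None:
        self.name = name

    def describe(self) -> str:
        return f"node {self.name}"


class LumiNode(Node):
    """Stand-in for a LUMI-specific node with overridden behavior (hypothetical)."""

    def describe(self) -> str:
        return f"LUMI node {self.name}"


nodes = [Node("nid001"), Node("nid002")]
for node in nodes:
    # Swap the class in place: instance attributes are kept, methods now
    # resolve through LumiNode, and LumiNode.__init__ is never called.
    node.__class__ = LumiNode
```

This only works safely when the subclass adds behavior rather than new instance state, since the subclass initializer does not run.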
    @classmethod
    def get_supported_work_dir_types(cls) -> list[str]:
        return cls.SUPPORTED_SCRATCHES + [
            "input_dir",
            "job_dir",  # same as input_dir
        ]
Retrieve the list of supported types of working directories
(i.e., strings that can be used with the --work-dir option).
Returns:
list[str]: A list of supported types of working directories.
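One plausible use of this list is validating a user-supplied --work-dir value before submission. The sketch below assumes that use; 'check_work_dir' is a hypothetical helper, and raising ValueError is a choice made for the example rather than qq's actual error handling.

```python
SUPPORTED_SCRATCHES = ["scratch", "flash"]


def supported_work_dir_types() -> list[str]:
    """Mirror of the list built in the source above."""
    return SUPPORTED_SCRATCHES + ["input_dir", "job_dir"]


def check_work_dir(value: str) -> str:
    """Return `value` unchanged if it is a supported type, else raise ValueError."""
    supported = supported_work_dir_types()
    if value not in supported:
        raise ValueError(
            f"Unsupported working directory type '{value}'. "
            f"Supported types: {', '.join(supported)}."
        )
    return value
```

For instance, 'check_work_dir("flash")' returns "flash", while 'check_work_dir("tmp")' raises ValueError.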