qq_lib.batch.slurm

Slurm backend for qq: job submission, monitoring, and cluster-resource access.

This module implements qq's full integration with the Slurm batch system.

It provides:

  • The Slurm batch-system backend, implementing job submission, killing, remote file access and synchronization, resource translation, dependency formatting, and all Slurm-specific environment propagation.

  • SlurmJob, SlurmNode, and SlurmQueue, concrete implementations of qq's job/node/queue interfaces, responsible for parsing Slurm command output and exposing normalized metadata to the rest of qq.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Slurm backend for qq: job submission, monitoring, and cluster-resource access.
 6
 7This module implements qq's full integration with the Slurm batch system.
 8
 9It provides:
10
11- The `Slurm` batch-system backend, implementing job submission, killing,
12  remote file access and synchronization, resource translation, dependency formatting,
13  and all Slurm-specific environment propagation.
14
15- `SlurmJob`, `SlurmNode`, and `SlurmQueue`, concrete implementations of qq's
16  job/node/queue interfaces, responsible for parsing Slurm command output and exposing
17  normalized metadata to the rest of qq.
18"""
19
20from .job import SlurmJob
21from .node import SlurmNode
22from .queue import SlurmQueue
23from .slurm import Slurm
24
25__all__ = [
26    "SlurmJob",
27    "SlurmNode",
28    "SlurmQueue",
29    "Slurm",
30]
class SlurmJob(qq_lib.batch.interface.job.BatchJobInterface):
 30class SlurmJob(BatchJobInterface):
 31    """
 32    Implementation of BatchJobInterface for Slurm.
 33    Stores metadata for a single Slurm job.
 34    """
 35
 36    # converts from Slurm state names to qq BatchStates
 37    _STATE_CONVERTER: dict[str, BatchState] = {
 38        "BOOT_FAIL": BatchState.FAILED,
 39        "CANCELLED": BatchState.FAILED,
 40        "COMPLETED": BatchState.FINISHED,
 41        "DEADLINE": BatchState.FAILED,
 42        "FAILED": BatchState.FAILED,
 43        "NODE_FAIL": BatchState.FAILED,
 44        "OUT_OF_MEMORY": BatchState.FAILED,
 45        "PENDING": BatchState.QUEUED,
 46        "PREEMPTED": BatchState.SUSPENDED,
 47        "RUNNING": BatchState.RUNNING,
 48        "SUSPENDED": BatchState.SUSPENDED,
 49        "TIMEOUT": BatchState.FAILED,
 50    }
 51
 52    def __init__(self, job_id: str):
 53        """Query the batch system for information about the job with the specified ID."""
 54        self._job_id = job_id
 55        self._info: dict[str, str] = {}
 56
 57        self.update()
 58
 59    def is_empty(self) -> bool:
 60        return not self._info
 61
 62    def get_id(self) -> str:
 63        return self._job_id
 64
 65    def get_account(self) -> str | None:
 66        return self._info.get("Account")
 67
 68    def update(self) -> None:
 69        # first try `scontrol`
 70        command = f"scontrol show job {self._job_id} -o"
 71        logger.debug(command)
 72
 73        result = subprocess.run(
 74            ["bash"],
 75            input=command,
 76            text=True,
 77            check=False,
 78            capture_output=True,
 79            errors="replace",
 80        )
 81
 82        if result.returncode != 0:
 83            # if scontrol fails, try sacct
 84            logger.debug(
 85                f"scontrol failed for job '{self._job_id}' ({result.stderr.strip()}); trying sacct"
 86            )
 87        else:
 88            self._info: dict[str, str] = parse_slurm_dump_to_dictionary(result.stdout)
 89            return
 90
 91        # if `scontrol` fails, try `sacct`
 92        command = f"sacct --allocations --noheader --parsable2 -j {self._job_id} --format={SACCT_FIELDS} "
 93        logger.debug(command)
 94
 95        result = subprocess.run(
 96            ["bash"],
 97            input=command,
 98            text=True,
 99            check=False,
100            capture_output=True,
101            errors="replace",
102        )
103
104        if result.returncode != 0:
105            # if sacct fails, information is empty
106            logger.debug(
107                f"both scontrol and sacct failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
108            )
109            self._info: dict[str, str] = {}
110        else:
111            job: SlurmJob = SlurmJob.from_sacct_string(result.stdout.strip())
112            self._info: dict[str, str] = job._info
113
114    def get_state(self) -> BatchState:
115        if not (raw_state := self._info.get("JobState")):
116            return BatchState.UNKNOWN
117
118        converted_state = SlurmJob._STATE_CONVERTER.get(raw_state) or BatchState.UNKNOWN
119
120        # if the job is queued due to depending on another job, it should be considered "held"
121        if (
122            converted_state == BatchState.QUEUED
123            and (comment := self.get_comment())
124            and "Dependency" in comment
125        ):
126            return BatchState.HELD
127
128        return converted_state
129
130    def get_comment(self) -> str | None:
131        if (reason := self._info.get("Reason")) and reason != "None":
132            return f"Reason: {reason}"
133
134        return None
135
136    def get_estimated(self) -> tuple[datetime, str] | None:
137        # use "StartTime" as an estimate
138        if not (time := self.get_start_time()) or time == "None":
139            return None
140
141        if not (node_list := self._info.get("SchedNodeList")) or "None" in node_list:
142            return None
143
144        return (time, node_list)
145
146    def get_main_node(self) -> str | None:
147        if (main_node := self._info.get("BatchHost")) and "None" not in main_node:
148            return main_node
149
150        # if BatchHost does not exist, use the first node from NodeList
151        if nodes := self.get_nodes():
152            return nodes[0]
153
154        return None
155
156    def get_nodes(self) -> list[str] | None:
157        if (node_list := self._info.get("NodeList")) and "None" not in node_list:
158            return SlurmJob._expand_node_list(node_list)
159
160        return None
161
162    def get_short_nodes(self) -> list[str] | None:
163        # treat all nodes a single node, without expanding
164        # this assumes that get_short_nodes is only used in qq jobs and qq stat
165        if (node_list := self._info.get("NodeList")) and "None" not in node_list:
166            return [node_list]
167
168        return None
169
170    def get_name(self) -> str | None:
171        if not (name := self._info.get("JobName")):
172            logger.debug(f"Could not get job name for '{self._job_id}'.")
173            return None
174
175        return name
176
177    def get_n_cpus(self) -> int | None:
178        min_cpus = (
179            self._get_int_property("MinCPUsNode", "the minimum number of CPUs per node")
180            or 0
181        ) * (self.get_n_nodes() or 0)
182
183        if not (cpus := self._get_int_property("NumCPUs", "the number of CPUs")):
184            return None
185
186        return max(min_cpus, cpus)
187
188    def get_n_gpus(self) -> int | None:
189        tres = self._get_tres()
190        for item in tres.split(","):
191            if item.startswith("gpu") or item.startswith("gres/gpu"):
192                try:
193                    return int(item.split("=")[1])
194                except ValueError as e:
195                    logger.warning(
196                        f"Could not parse the number of GPUs from '{item}': {e}."
197                    )
198                    return None
199
200        return None
201
202    def get_n_nodes(self) -> int | None:
203        return self._get_int_property("NumNodes", "the number of nodes")
204
205    def get_mem(self) -> Size | None:
206        tres = self._get_tres()
207        for item in tres.split(","):
208            if item.startswith("mem="):
209                try:
210                    return Size.from_string(item.split("=", 1)[1])
211                except Exception as e:
212                    logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
213                    return None
214
215        logger.debug(f"Memory not available for '{self._job_id}'.")
216        return None
217
218    def get_start_time(self) -> datetime | None:
219        return self._get_datetime_property("StartTime", "the job start time")
220
221    def get_submission_time(self) -> datetime | None:
222        return self._get_datetime_property("SubmitTime", "the job submission time")
223
224    def get_completion_time(self) -> datetime | None:
225        # the property EndTime is available for running jobs as well (estimated completion time)
226        # but that should not matter for our purposes
227        return self._get_datetime_property("EndTime", "the job completion time")
228
229    def get_modification_time(self) -> datetime | None:
230        # assuming this is only used for completed jobs
231        return self.get_completion_time() or self.get_submission_time()
232
233    def get_user(self) -> str | None:
234        if not (user := self._info.get("UserId")):
235            logger.debug(f"Could not get user for '{self._job_id}'.")
236            return None
237
238        return user.split("(")[0]
239
240    def get_walltime(self) -> timedelta | None:
241        if not (walltime := self._info.get("TimeLimit")):
242            logger.debug(f"Could not get walltime for '{self._job_id}'.")
243            return None
244
245        try:
246            return dhhmmss_to_duration(walltime)
247        except QQError as e:
248            logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
249            return None
250
251    def get_queue(self) -> str | None:
252        if not (queue := self._info.get("Partition")):
253            logger.debug(f"Could not get queue for '{self._job_id}'.")
254            return None
255
256        return queue
257
258    def get_util_cpu(self) -> int | None:
259        # not available in Slurm
260        return None
261
262    def get_util_mem(self) -> int | None:
263        # not available in Slurm
264        return None
265
266    def get_exit_code(self) -> int | None:
267        if not (raw_exit := self._info.get("ExitCode")):
268            return None
269
270        try:
271            # Slurm reports two exit codes; the first one is exit code of the script
272            # the second one is a signal
273            # we return the first non-zero exit code or 0 if both exit codes are 0
274            code, signal = map(int, raw_exit.split(":"))
275            return code or signal
276        except Exception as e:
277            logger.debug(f"Could not parse exit codes '{raw_exit}': {e}.")
278            return None
279
280    def get_input_machine(self) -> str | None:
281        # not available for Slurm
282        return None
283
284    def get_input_dir(self) -> Path | None:
285        if not (raw_dir := self._info.get("WorkDir")):
286            logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
287            return None
288
289        return logical_resolve(Path(raw_dir))
290
291    def get_info_file(self) -> Path | None:
292        if not (input_dir := self.get_input_dir()) or not (name := self.get_name()):
293            return None
294
295        info_file = (input_dir / name).with_suffix(CFG.suffixes.qq_info)
296
297        # we need to check whether the info file actually exists
298        # (or rather if it is available to the user)
299        try:
300            if not info_file.is_file():
301                return None
302        except PermissionError:
303            return None
304
305        return info_file
306
307    def to_yaml(self) -> str:
308        return yaml.dump(
309            self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
310        )
311
312    def get_steps(self) -> Sequence[Self]:
313        command = f"sacct -j {self._job_id} --parsable2 --format={SACCT_STEP_FIELDS}"
314        logger.debug(command)
315
316        result = subprocess.run(
317            ["bash"],
318            input=command,
319            text=True,
320            check=False,
321            capture_output=True,
322            errors="replace",
323        )
324
325        if result.returncode != 0:
326            logger.debug(f"Could not get steps for a job '{self._job_id}'.")
327            return []
328
329        jobs = []
330        for sacct_string in result.stdout.split("\n"):
331            if sacct_string.strip() == "":
332                continue
333
334            job = SlurmJob._step_from_sacct_string(sacct_string)
335            # only consider job steps with numeric indices
336            if (step_id := job.get_step_id()) and step_id.isnumeric():
337                jobs.append(job)
338
339        return jobs
340
341    def get_step_id(self) -> str | None:
342        try:
343            (_, step) = self._job_id.split(".", maxsplit=1)
344            return step
345        except ValueError:
346            return None
347
348    def is_array_job(self) -> bool:
349        return False
350
351    @classmethod
352    def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
353        """
354        Construct a new instance of SlurmJob from a job ID and a dictionary of job information.
355
356        This method bypasses the standard initializer and directly sets the `_job_id` and `_info`
357        attributes of the new instance.
358
359        Args:
360            job_id (str): The unique identifier of the job.
361            info (dict[str, str]): A dictionary containing Slurm job metadata as key-value pairs.
362
363        Returns:
364            Self: A new instance of SlurmJob.
365
366        Note:
367            This method does not perform any validation or processing of the provided dictionary.
368        """
369        job_info = cls.__new__(cls)
370        job_info._job_id = job_id
371        job_info._info = info
372
373        return job_info
374
375    @classmethod
376    def from_sacct_string(cls, string: str) -> Self:
377        """
378        Construct a new instance of SlurmJob using a string from sacct.
379
380        Args:
381            string (str): String describing the job properties obtained using sacct.
382
383        Returns:
384            Self: A new instance of SlurmJob.
385        """
386        fields: list[str] = [
387            "JobId",
388            "Account",
389            "JobState",
390            "UserId",
391            "JobName",
392            "Partition",
393            "WorkDir",
394            "AllocCPUs",
395            "ReqCPUs",
396            "AllocTRES",
397            "ReqTRES",
398            "AllocNodes",
399            "ReqNodes",
400            "SubmitTime",
401            "StartTime",
402            "EndTime",
403            "TimeLimit",
404            "NodeList",
405            "Reason",
406            "ExitCode",
407        ]
408
409        split = string.split("|")
410        if len(fields) != len(split):
411            raise QQError(
412                f"Number of items in a sacct string '{string}' ('{len(split)}') does not match the expected number of items ('{len(fields)}'). This is a bug, please report it!"
413            )
414
415        info: dict[str, str] = dict(zip(fields, split))
416
417        # only take the first word from JobState
418        # other words may contain useless additional information
419        info["JobState"] = info["JobState"].split()[0]
420
421        SlurmJob._assign_if_allocated(info, "AllocCPUs", "ReqCPUs", "NumCPUs")
422        SlurmJob._assign_if_allocated(info, "AllocNodes", "ReqNodes", "NumNodes")
423
424        return cls.from_dict(info["JobId"], info)
425
426    @classmethod
427    def _step_from_sacct_string(cls, string: str) -> Self:
428        """
429        Construct a new instance of SlurmJob step using a string from sacct.
430
431        Args:
432            string (str): String describing the job properties obtained using sacct.
433
434        Returns:
435            Self: A new instance of SlurmJob for a job step.
436        """
437        fields: list[str] = [
438            "JobId",
439            "JobState",
440            "StartTime",
441            "EndTime",
442        ]
443
444        split = string.split("|")
445        if len(fields) != len(split):
446            raise QQError(
447                f"Number of items in a sacct string for a slurm step '{string}' ('{len(split)}') does not match the expected number of items ('{len(fields)}'). This is a bug, please report it!"
448            )
449
450        info: dict[str, str] = dict(zip(fields, split))
451
452        # only take the first word from JobState
453        # other words may contain useless additional information
454        info["JobState"] = info["JobState"].split()[0]
455
456        return cls.from_dict(info["JobId"], info)
457
458    def get_ids_for_sorting(self) -> list[int]:
459        """
460        Extract numeric components of the job ID for sorting.
461
462        The method retrieves the leading numeric portion of the job ID, which may
463        contain multiple integer groups separated by underscores. Parsing stops
464        when a non-digit and non-underscore character is encountered.
465
466        Returns:
467            list[int]: A list of integer components extracted from the job ID,
468                or [0] if no valid numeric portion is found.
469        """
470        # get the numerical portion of the job ID (may contain underscores)
471        match = re.match(r"(\d+(?:_\d+)*)", self.get_id())
472        if not match:
473            return [0]
474
475        # split the matched portion into digit groups
476        groups = match.group(1).split("_")
477        return [int(g) for g in groups]
478
479    @staticmethod
480    def _expand_node_list(compact: str) -> list[str]:
481        """
482        Expand a compact Slurm node list expression into individual hostnames.
483
484        This method uses the Slurm `scontrol show hostnames` command to translate
485        a compact node list (e.g., "node[01-03]") into an explicit list of node names.
486        If the expansion fails, the original compact string is returned as a single-element list.
487
488        Args:
489            compact (str): The compact Slurm node list expression to expand.
490
491        Returns:
492            list[str]: A list of fully expanded node hostnames. If expansion fails,
493                returns a list containing the original input string.
494        """
495        command = f"scontrol show hostnames {compact}"
496        logger.debug(command)
497
498        result = subprocess.run(
499            ["bash"],
500            input=command,
501            text=True,
502            check=False,
503            capture_output=True,
504            errors="replace",
505        )
506
507        if result.returncode != 0:
508            logger.warning(
509                f"Could not expand '{compact}' into a list of nodes: {result.stderr.strip()}"
510            )
511            # use unexpanded string
512            return [compact]
513
514        return result.stdout.strip().split("\n")
515
516    def _get_int_property(self, property: str, property_name: str) -> int | None:
517        """
518        Retrieve an integer property value from the job information.
519
520        If the property contains a range (e.g., "MIN-MAX"), only the minimum value
521        is returned. If the property cannot be retrieved or converted to an integer,
522        `None` is returned.
523
524        Args:
525            property (str): The key identifying the property in the job information.
526            property_name (str): A human-readable name of the property for logging.
527
528        Returns:
529            int: The integer value of the property, or `None` if unavailable or invalid.
530        """
531        try:
532            # we split by '-' because pending jobs may have this property shown as MIN-MAX
533            # we show the value of the minimum
534            return int(self._info[property].split("-")[0])
535        except Exception:
536            logger.debug(
537                f"Could not get information about {property_name} from the batch system for '{self._job_id}'."
538            )
539            return None
540
541    def _get_datetime_property(
542        self, property: str, property_name: str
543    ) -> datetime | None:
544        """
545        Retrieve and parse a datetime property from the job information.
546
547        If the property is missing, empty, or marked as unknown, None is returned.
548        A warning is logged if parsing fails.
549
550        Args:
551            property (str): The key identifying the property in the job information.
552            property_name (str): A human-readable name of the property for logging.
553
554        Returns:
555            datetime | None: A datetime object if parsing succeeds, otherwise None.
556        """
557        if not (raw_datetime := self._info.get(property)) or raw_datetime.lower() in [
558            "unknown",
559            "n/a",
560            "none",
561            "",
562        ]:
563            return None
564
565        try:
566            return datetime.strptime(raw_datetime, CFG.date_formats.slurm)
567        except Exception as e:
568            logger.warning(
569                f"Could not parse information about {property_name} for '{self._job_id}': {e}."
570            )
571            return None
572
573    def _get_tres(self) -> str:
574        """
575        Return the AllocTRES property or ReqTRES property, depending on which of them is available.
576        Note that the resources specified in ReqTRES can potentially be different than the resources in AllocTRES.
577        """
578        tres = self._info.get("AllocTRES")
579        if not tres or "null" in tres or "None" in tres or "N/A" in tres:
580            tres = self._info.get("ReqTRES", "")
581
582        return tres
583
584    @staticmethod
585    def _assign_if_allocated(
586        info: dict[str, str], alloc_key: str, req_key: str, target_key: str
587    ) -> None:
588        """
589        Assigns a value to a target key in the `info` dictionary, preferring an allocated value
590        if it exists and is valid; otherwise, falls back to the requested value.
591
592        Args:
593            info (dict[str, str]): The dictionary containing allocation and request data.
594            alloc_key (str): The key for the allocated resource (e.g., "AllocCPUs").
595            req_key (str): The key for the requested resource (e.g., "ReqCPUs").
596            target_key (str): The key under which the resolved value should be stored (e.g., "NumCPUs").
597
598        Notes:
599            - A value is considered invalid if it is `None`, an empty string `""`, or `"0"`.
600            - The function updates `info` in place.
601        """
602        value = info.get(alloc_key)
603        info[target_key] = (
604            value if value not in (None, "None", "", "0") else info.get(req_key, "0")
605        )

Implementation of BatchJobInterface for Slurm. Stores metadata for a single Slurm job.

SlurmJob(job_id: str)
52    def __init__(self, job_id: str):
53        """Query the batch system for information about the job with the specified ID."""
54        self._job_id = job_id
55        self._info: dict[str, str] = {}
56
57        self.update()

Query the batch system for information about the job with the specified ID.

def is_empty(self) -> bool:
59    def is_empty(self) -> bool:
60        return not self._info

Check whether the job contains any information. This should return True if the job does not exist in the batch system.

Returns:

bool: True if the job contains no information.

def get_id(self) -> str:
62    def get_id(self) -> str:
63        return self._job_id

Return the ID of the job.

Returns:

str: The ID of the job.

def get_account(self) -> str | None:
65    def get_account(self) -> str | None:
66        return self._info.get("Account")

Return the account under which the job is submitted.

Returns:

str | None: Account associated with the job or None if no account is defined.

def update(self) -> None:
 68    def update(self) -> None:
 69        # first try `scontrol`
 70        command = f"scontrol show job {self._job_id} -o"
 71        logger.debug(command)
 72
 73        result = subprocess.run(
 74            ["bash"],
 75            input=command,
 76            text=True,
 77            check=False,
 78            capture_output=True,
 79            errors="replace",
 80        )
 81
 82        if result.returncode != 0:
 83            # if scontrol fails, try sacct
 84            logger.debug(
 85                f"scontrol failed for job '{self._job_id}' ({result.stderr.strip()}); trying sacct"
 86            )
 87        else:
 88            self._info: dict[str, str] = parse_slurm_dump_to_dictionary(result.stdout)
 89            return
 90
 91        # if `scontrol` fails, try `sacct`
 92        command = f"sacct --allocations --noheader --parsable2 -j {self._job_id} --format={SACCT_FIELDS} "
 93        logger.debug(command)
 94
 95        result = subprocess.run(
 96            ["bash"],
 97            input=command,
 98            text=True,
 99            check=False,
100            capture_output=True,
101            errors="replace",
102        )
103
104        if result.returncode != 0:
105            # if sacct fails, information is empty
106            logger.debug(
107                f"both scontrol and sacct failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
108            )
109            self._info: dict[str, str] = {}
110        else:
111            job: SlurmJob = SlurmJob.from_sacct_string(result.stdout.strip())
112            self._info: dict[str, str] = job._info

Refresh the stored job information from the batch system.

Raises:
  • QQError: If the job cannot be queried or its info updated.
def get_state(self) -> qq_lib.properties.states.BatchState:
114    def get_state(self) -> BatchState:
115        if not (raw_state := self._info.get("JobState")):
116            return BatchState.UNKNOWN
117
118        converted_state = SlurmJob._STATE_CONVERTER.get(raw_state) or BatchState.UNKNOWN
119
120        # if the job is queued due to depending on another job, it should be considered "held"
121        if (
122            converted_state == BatchState.QUEUED
123            and (comment := self.get_comment())
124            and "Dependency" in comment
125        ):
126            return BatchState.HELD
127
128        return converted_state

Return the current state of the job as reported by the batch system.

If the job information is no longer available, return BatchState.UNKNOWN.

Returns:

BatchState: The job state according to the batch system.

def get_comment(self) -> str | None:
130    def get_comment(self) -> str | None:
131        if (reason := self._info.get("Reason")) and reason != "None":
132            return f"Reason: {reason}"
133
134        return None

Retrieve the batch system-provided comment for the job.

Returns:

str | None: The job's comment string if available, or None if the batch system has not attached a comment.

def get_estimated(self) -> tuple[datetime.datetime, str] | None:
136    def get_estimated(self) -> tuple[datetime, str] | None:
137        # use "StartTime" as an estimate
138        if not (time := self.get_start_time()) or time == "None":
139            return None
140
141        if not (node_list := self._info.get("SchedNodeList")) or "None" in node_list:
142            return None
143
144        return (time, node_list)

Retrieve the batch system's estimated job start time and execution node.

Returns:

tuple[datetime, str] | None: A tuple of (datetime, str), where the datetime is the estimated start time of the job and the str is the name of the node where the job is expected to run. Returns None if either estimate is unavailable.

def get_main_node(self) -> str | None:
146    def get_main_node(self) -> str | None:
147        if (main_node := self._info.get("BatchHost")) and "None" not in main_node:
148            return main_node
149
150        # if BatchHost does not exist, use the first node from NodeList
151        if nodes := self.get_nodes():
152            return nodes[0]
153
154        return None

Retrieve the hostname of the main execution node for the job.

Returns:

str | None: The hostname of the main execution node, or None if unavailable or not applicable.

def get_nodes(self) -> list[str] | None:
156    def get_nodes(self) -> list[str] | None:
157        if (node_list := self._info.get("NodeList")) and "None" not in node_list:
158            return SlurmJob._expand_node_list(node_list)
159
160        return None

Retrieve the hostnames of all execution nodes allocated for the job.

Returns:

list[str] | None: A list of hostnames or node identifiers used by the job, or None if node information is not available.

def get_short_nodes(self) -> list[str] | None:
162    def get_short_nodes(self) -> list[str] | None:
163        # treat all nodes a single node, without expanding
164        # this assumes that get_short_nodes is only used in qq jobs and qq stat
165        if (node_list := self._info.get("NodeList")) and "None" not in node_list:
166            return [node_list]
167
168        return None

Retrieve the short hostnames of all execution nodes allocated for the job.

Returns:

list[str] | None: A list of short hostnames used by the job, or None if node information is not available.

def get_name(self) -> str | None:
170    def get_name(self) -> str | None:
171        if not (name := self._info.get("JobName")):
172            logger.debug(f"Could not get job name for '{self._job_id}'.")
173            return None
174
175        return name

Return the name of the job.

Returns:

str | None: The name of the submitted job or None if not available.

def get_n_cpus(self) -> int | None:
177    def get_n_cpus(self) -> int | None:
178        min_cpus = (
179            self._get_int_property("MinCPUsNode", "the minimum number of CPUs per node")
180            or 0
181        ) * (self.get_n_nodes() or 0)
182
183        if not (cpus := self._get_int_property("NumCPUs", "the number of CPUs")):
184            return None
185
186        return max(min_cpus, cpus)

Return the number of CPU cores allocated for the job.

Returns:

int | None: Number of CPUs allocated for the job or None if not available.

def get_n_gpus(self) -> int | None:
188    def get_n_gpus(self) -> int | None:
189        tres = self._get_tres()
190        for item in tres.split(","):
191            if item.startswith("gpu") or item.startswith("gres/gpu"):
192                try:
193                    return int(item.split("=")[1])
194                except ValueError as e:
195                    logger.warning(
196                        f"Could not parse the number of GPUs from '{item}': {e}."
197                    )
198                    return None
199
200        return None

Return the number of GPUs allocated for the job.

Returns:

int | None: Number of GPUs allocated for the job or None if not available.

def get_n_nodes(self) -> int | None:
202    def get_n_nodes(self) -> int | None:
203        return self._get_int_property("NumNodes", "the number of nodes")

Return the number of compute nodes assigned to the job.

Returns:

int | None: Number of nodes used by the job or None if not available.

def get_mem(self) -> qq_lib.properties.size.Size | None:
def get_mem(self) -> Size | None:
    """
    Return the amount of memory allocated for the job.

    The value is taken from the first "mem=" entry of the job's TRES string.

    Returns:
        Size | None: Amount of memory allocated for the job or None if not available.
    """
    mem_entry = next(
        (entry for entry in self._get_tres().split(",") if entry.startswith("mem=")),
        None,
    )

    if mem_entry is None:
        logger.debug(f"Memory not available for '{self._job_id}'.")
        return None

    try:
        return Size.from_string(mem_entry.split("=", 1)[1])
    except Exception as e:
        # an unparsable size is reported but never propagated to the caller
        logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
        return None

Return the amount of memory allocated for the job.

Returns:

Size | None: Amount of memory allocated for the job or None if not available.

def get_start_time(self) -> datetime.datetime | None:
def get_start_time(self) -> datetime | None:
    """
    Return the timestamp when the job started execution.

    Returns:
        datetime | None: Time when the job began running or None if the job has not yet started.
    """
    return self._get_datetime_property("StartTime", "the job start time")

Return the timestamp when the job started execution.

Returns:

datetime | None: Time when the job began running or None if the job has not yet started.

def get_submission_time(self) -> datetime.datetime | None:
def get_submission_time(self) -> datetime | None:
    """
    Return the timestamp when the job was submitted.

    Returns:
        datetime | None: Time when the job was submitted to the batch system or None if not available.
    """
    return self._get_datetime_property("SubmitTime", "the job submission time")

Return the timestamp when the job was submitted.

Returns:

datetime | None: Time when the job was submitted to the batch system or None if not available.

def get_completion_time(self) -> datetime.datetime | None:
def get_completion_time(self) -> datetime | None:
    """
    Return the timestamp when the job was completed.

    Returns:
        datetime | None: Time when the job completed or None if the job has not yet completed.
    """
    # the property EndTime is available for running jobs as well (estimated completion time)
    # but that should not matter for our purposes
    return self._get_datetime_property("EndTime", "the job completion time")

Return the timestamp when the job was completed.

Returns:

datetime | None: Time when the job completed or None if the job has not yet completed.

def get_modification_time(self) -> datetime.datetime | None:
def get_modification_time(self) -> datetime | None:
    """
    Return the timestamp at which the job was last modified.

    The completion time is used when available; otherwise the submission time.

    Returns:
        datetime | None: Time when the job was last modified or None if the information is not available.
    """
    # assuming this is only used for completed jobs
    return self.get_completion_time() or self.get_submission_time()

Return the timestamp at which the job was last modified.

Returns:

datetime | None: Time when the job was last modified or None if the information is not available.

def get_user(self) -> str | None:
233    def get_user(self) -> str | None:
234        if not (user := self._info.get("UserId")):
235            logger.debug(f"Could not get user for '{self._job_id}'.")
236            return None
237
238        return user.split("(")[0]

Return the username of the job owner.

Returns:

str | None: Username of the user who owns the job or None if not available.

def get_walltime(self) -> datetime.timedelta | None:
def get_walltime(self) -> timedelta | None:
    """
    Return the walltime limit of the job.

    Returns:
        timedelta | None: Walltime for the job or None if not available
            (missing "TimeLimit" entry or a value that cannot be parsed).
    """
    raw_limit = self._info.get("TimeLimit")
    if not raw_limit:
        logger.debug(f"Could not get walltime for '{self._job_id}'.")
        return None

    try:
        parsed = dhhmmss_to_duration(raw_limit)
    except QQError as e:
        logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
        return None
    else:
        return parsed

Return the walltime limit of the job.

Returns:

timedelta | None: Walltime for the job or None if not available.

def get_queue(self) -> str | None:
251    def get_queue(self) -> str | None:
252        if not (queue := self._info.get("Partition")):
253            logger.debug(f"Could not get queue for '{self._job_id}'.")
254            return None
255
256        return queue

Return the submission queue of the job.

Returns:

str | None: The queue this job is part of or None if not available.

def get_util_cpu(self) -> int | None:
def get_util_cpu(self) -> int | None:
    """
    Return the utilization of requested CPUs in percents (0-100).

    Returns:
        int | None: Always None for this backend.
    """
    # not available in Slurm
    return None

Return the utilization of requested CPUs in percents (0-100).

Returns:

int | None: Utilization of requested CPUs or None if not available.

def get_util_mem(self) -> int | None:
def get_util_mem(self) -> int | None:
    """
    Return the utilization of requested memory in percents (0-100).

    Returns:
        int | None: Always None for this backend.
    """
    # not available in Slurm
    return None

Return the utilization of requested memory in percents (0-100).

Returns:

int | None: Utilization of requested memory or None if not available.

def get_exit_code(self) -> int | None:
266    def get_exit_code(self) -> int | None:
267        if not (raw_exit := self._info.get("ExitCode")):
268            return None
269
270        try:
271            # Slurm reports two exit codes; the first one is exit code of the script
272            # the second one is a signal
273            # we return the first non-zero exit code or 0 if both exit codes are 0
274            code, signal = map(int, raw_exit.split(":"))
275            return code or signal
276        except Exception as e:
277            logger.debug(f"Could not parse exit codes '{raw_exit}': {e}.")
278            return None

Return the exit code of the job.

Returns:

int | None: Exit code of the job or None if exit code is not assigned.

def get_input_machine(self) -> str | None:
def get_input_machine(self) -> str | None:
    """
    Return the hostname of the submission machine.

    Returns:
        str | None: Always None for this backend.
    """
    # not available for Slurm
    return None

Return the hostname of the submission machine.

Returns:

str | None: Hostname of the submission machine or None if not available.

def get_input_dir(self) -> pathlib._local.Path | None:
def get_input_dir(self) -> Path | None:
    """
    Return path to the directory from which the job was submitted.

    The directory is read from the "WorkDir" entry of the job information.

    Returns:
        Path | None: Path to the submission directory or None if not available.
    """
    work_dir = self._info.get("WorkDir")
    if work_dir:
        return logical_resolve(Path(work_dir))

    logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
    return None

Return path to the directory from which the job was submitted.

Returns:

Path | None: Path to the submission directory or None if not available.

def get_info_file(self) -> pathlib._local.Path | None:
def get_info_file(self) -> Path | None:
    """
    Return path to the info file associated with this job.

    The candidate path is the job name inside the input directory, with its
    suffix replaced by the configured qq info suffix.

    Returns:
        Path | None: Path to the qq info file or None if this is not a qq job
            (directory or name unknown, or the file is missing or inaccessible).
    """
    input_dir = self.get_input_dir()
    if not input_dir:
        return None

    name = self.get_name()
    if not name:
        return None

    candidate = (input_dir / name).with_suffix(CFG.suffixes.qq_info)

    # the info file must actually exist and be available to the user
    try:
        present = candidate.is_file()
    except PermissionError:
        return None

    return candidate if present else None

Return path to the info file associated with this job.

Returns:

Path | None: Path to the qq info file or None if this is not a qq job.

def to_yaml(self) -> str:
def to_yaml(self) -> str:
    """
    Return all information about the job from the batch system in YAML format.

    Returns:
        str: YAML-formatted string of job metadata.
    """
    # block style, with keys kept in their stored order
    return yaml.dump(
        self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
    )

Return all information about the job from the batch system in YAML format.

Returns:

str: YAML-formatted string of job metadata.

def get_steps(self) -> Sequence[typing.Self]:
def get_steps(self) -> Sequence[Self]:
    """
    Return a list of steps associated with this job.

    The steps are obtained by running `sacct` for this job ID. A job step is
    represented by the same class as a job, but may not contain all the values
    that a proper job contains.

    Returns:
        Sequence[Self]: List of job steps. An empty list if there are none
            or if `sacct` fails.
    """
    command = f"sacct -j {self._job_id} --parsable2 --format={SACCT_STEP_FIELDS}"
    logger.debug(command)

    # the command is fed to bash through standard input
    completed = subprocess.run(
        ["bash"],
        input=command,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if completed.returncode != 0:
        logger.debug(f"Could not get steps for a job '{self._job_id}'.")
        return []

    steps = []
    for line in completed.stdout.split("\n"):
        if not line.strip():
            continue

        candidate = SlurmJob._step_from_sacct_string(line)
        # only consider job steps with numeric indices
        step_id = candidate.get_step_id()
        if step_id and step_id.isnumeric():
            steps.append(candidate)

    return steps

Return a list of steps associated with this job.

Note that a job step is represented by a BatchJobInterface, but it may not contain all the values that a proper BatchJobInterface contains.

Returns:

Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.

def get_step_id(self) -> str | None:
341    def get_step_id(self) -> str | None:
342        try:
343            (_, step) = self._job_id.split(".", maxsplit=1)
344            return step
345        except ValueError:
346            return None

Return the step index if this job is a job step.

Returns:

str | None: Job step index or None if this is not a job step.

def is_array_job(self) -> bool:
def is_array_job(self) -> bool:
    """
    Return True if the job is a top-level array job (not a sub-job).

    Returns:
        bool: Always False in this implementation.
    """
    return False

Return True if the job is a top-level array job (not a sub-job).

Returns:

bool: True if the job is a top-level array job, else False.

@classmethod
def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
351    @classmethod
352    def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
353        """
354        Construct a new instance of SlurmJob from a job ID and a dictionary of job information.
355
356        This method bypasses the standard initializer and directly sets the `_job_id` and `_info`
357        attributes of the new instance.
358
359        Args:
360            job_id (str): The unique identifier of the job.
361            info (dict[str, str]): A dictionary containing Slurm job metadata as key-value pairs.
362
363        Returns:
364            Self: A new instance of SlurmJob.
365
366        Note:
367            This method does not perform any validation or processing of the provided dictionary.
368        """
369        job_info = cls.__new__(cls)
370        job_info._job_id = job_id
371        job_info._info = info
372
373        return job_info

Construct a new instance of SlurmJob from a job ID and a dictionary of job information.

This method bypasses the standard initializer and directly sets the _job_id and _info attributes of the new instance.

Arguments:
  • job_id (str): The unique identifier of the job.
  • info (dict[str, str]): A dictionary containing Slurm job metadata as key-value pairs.
Returns:

Self: A new instance of SlurmJob.

Note:

This method does not perform any validation or processing of the provided dictionary.

@classmethod
def from_sacct_string(cls, string: str) -> Self:
@classmethod
def from_sacct_string(cls, string: str) -> Self:
    """
    Construct a new instance of SlurmJob using a string from sacct.

    The string is split on '|' and its items are assigned, in order, to a
    fixed list of field names. Only the first word of the job state is kept,
    and the number of CPUs and nodes is stored under "NumCPUs" / "NumNodes".

    Args:
        string (str): String describing the job properties obtained using sacct.

    Returns:
        Self: A new instance of SlurmJob.

    Raises:
        QQError: If the number of '|'-separated items does not match the
            number of expected fields.
    """
    # names assigned, in order, to the '|'-separated items of the sacct string
    fields: list[str] = [
        "JobId",
        "Account",
        "JobState",
        "UserId",
        "JobName",
        "Partition",
        "WorkDir",
        "AllocCPUs",
        "ReqCPUs",
        "AllocTRES",
        "ReqTRES",
        "AllocNodes",
        "ReqNodes",
        "SubmitTime",
        "StartTime",
        "EndTime",
        "TimeLimit",
        "NodeList",
        "Reason",
        "ExitCode",
    ]

    split = string.split("|")
    if len(fields) != len(split):
        raise QQError(
            f"Number of items in a sacct string '{string}' ('{len(split)}') does not match the expected number of items ('{len(fields)}'). This is a bug, please report it!"
        )

    info: dict[str, str] = dict(zip(fields, split))

    # only take the first word from JobState
    # other words may contain useless additional information;
    # an empty or blank state is left untouched instead of raising IndexError
    if state_words := info["JobState"].split():
        info["JobState"] = state_words[0]

    SlurmJob._assign_if_allocated(info, "AllocCPUs", "ReqCPUs", "NumCPUs")
    SlurmJob._assign_if_allocated(info, "AllocNodes", "ReqNodes", "NumNodes")

    return cls.from_dict(info["JobId"], info)

Construct a new instance of SlurmJob using a string from sacct.

Arguments:
  • string (str): String describing the job properties obtained using sacct.
Returns:

Self: A new instance of SlurmJob.

def get_ids_for_sorting(self) -> list[int]:
def get_ids_for_sorting(self) -> list[int]:
    """
    Extract numeric components of the job ID for sorting.

    Only the leading part of the job ID is considered: one or more groups of
    digits joined by underscores. Anything from the first other character
    onwards is ignored.

    Returns:
        list[int]: A list of integer components extracted from the job ID,
            or [0] if the job ID does not start with a digit.
    """
    # leading numerical portion of the job ID (may contain underscores)
    leading = re.match(r"(\d+(?:_\d+)*)", self.get_id())
    if leading is None:
        return [0]

    return list(map(int, leading.group(1).split("_")))

Extract numeric components of the job ID for sorting.

The method retrieves the leading numeric portion of the job ID, which may contain multiple integer groups separated by underscores. Parsing stops when a non-digit and non-underscore character is encountered.

Returns:

list[int]: A list of integer components extracted from the job ID, or [0] if no valid numeric portion is found.

class SlurmNode(qq_lib.batch.interface.node.BatchNodeInterface):
 class SlurmNode(BatchNodeInterface):
     """
     Implementation of BatchNodeInterface for Slurm.
     Stores metadata for a single Slurm node.
     """

     def __init__(self, name: str):
         """
         Create the node and immediately load its information from Slurm.

         Args:
             name (str): Name of the node.

         Raises:
             QQError: If the node cannot be queried (raised by `update`).
         """
         self._name = name
         self._info: dict[str, str] = {}

         self.update()

     def update(self) -> None:
         """
         Refresh the stored node information from the batch system.

         Raises:
             QQError: If `scontrol` exits with a non-zero return code.
         """
         # get node info from Slurm
         command = f"scontrol show node {self._name} -o"
         # log the command, as the other Slurm queries in this package do
         logger.debug(command)

         result = subprocess.run(
             ["bash"],
             input=command,
             text=True,
             check=False,
             capture_output=True,
             errors="replace",
         )

         if result.returncode != 0:
             raise QQError(f"Node '{self._name}' does not exist.")

         self._info = parse_slurm_dump_to_dictionary(result.stdout)

     def get_name(self) -> str:
         """Return the name identifying the node in the batch system."""
         return self._name

     def get_n_cpus(self) -> int | None:
         """Return the total number of CPU cores of the node or None if not available."""
         return self._get_int_resource("CPUTot")

     def get_n_free_cpus(self) -> int | None:
         """Return the number of unallocated CPU cores or None if not available."""
         if not (cpus := self.get_n_cpus()):
             return None

         # a missing "CPUAlloc" value is treated as no allocation
         return cpus - (self._get_int_resource("CPUAlloc") or 0)

     def get_n_gpus(self) -> int | None:
         """Return the total number of GPUs of the node or None if not available."""
         return self._get_int_from_tres("CfgTRES", "gpu")

     def get_n_free_gpus(self) -> int | None:
         """Return the number of unallocated GPUs or None if not available."""
         if not (gpus := self.get_n_gpus()):
             return None

         # a missing GPU entry in "AllocTRES" is treated as no allocation
         return gpus - (self._get_int_from_tres("AllocTRES", "gpu") or 0)

     def get_cpu_memory(self) -> Size | None:
         """Return the total CPU memory of the node or None if not available."""
         # RealMemory corresponds to memory configured in slurm.conf
         return self._get_size_resource("RealMemory")

     def get_free_cpu_memory(self) -> Size | None:
         """Return the unallocated CPU memory of the node or None if not available."""
         if not (mem := self.get_cpu_memory()):
             return None

         # we do not use the FreeMem property as it corresponds to the ACTUAL free memory on the machine
         # and can be higher than RealMemory - AllocMem (e.g. if the jobs don't use all the allocated memory)
         return mem - (self._get_size_resource("AllocMem") or Size(0, "kb"))

     def get_gpu_memory(self) -> Size | None:
         """Return the total GPU memory; always None in this implementation."""
         return None

     def get_free_gpu_memory(self) -> Size | None:
         """Return the free GPU memory; always None in this implementation."""
         return None

     def get_local_scratch(self) -> Size | None:
         """Return the total local scratch capacity ("TmpDisk") or None if not available."""
         return self._get_size_resource("TmpDisk")

     def get_free_local_scratch(self) -> Size | None:
         """Return the available local scratch space or None if not available."""
         # NOTE(review): this returns the same "TmpDisk" value as get_local_scratch,
         # presumably because no separate "free" figure is available - confirm
         return self._get_size_resource("TmpDisk")

     def get_ssd_scratch(self) -> Size | None:
         """Return the total SSD scratch capacity; always None in this implementation."""
         return None

     def get_free_ssd_scratch(self) -> Size | None:
         """Return the free SSD scratch space; always None in this implementation."""
         return None

     def get_shared_scratch(self) -> Size | None:
         """Return the total shared scratch capacity; always None in this implementation."""
         return None

     def get_free_shared_scratch(self) -> Size | None:
         """Return the free shared scratch space; always None in this implementation."""
         return None

     def get_properties(self) -> list[str]:
         """Return the comma-separated "AvailableFeatures" of the node as a list."""
         if not (raw := self._info.get("AvailableFeatures")):
             return []

         return raw.split(",")

     def is_available_to_user(self, user: str) -> bool:
         """
         Check if the node is available to the specified user.

         The decision is based only on the node state; the user is ignored.

         Args:
             user (str): The username to check access for.

         Returns:
             bool: True if the node is up and schedulable, False otherwise
                 (including when no state information is stored).
         """
         _ = user

         if not (state := self._info.get("State")):
             logger.debug(f"Could not get state information for node '{self._name}'.")
             return False

         # "DRAIN" is the flag spelling of a drained/draining node (e.g. "IDLE+DRAIN")
         invalid_states = {
             "DOWN",
             "DRAIN",
             "DRAINED",
             "FAIL",
             "FUTURE",
             "INVAL",
             "MAINT",
             "PERFCTRS",
             "POWERED_DOWN",
             "POWERING_DOWN",
             "RESERVED",
             "UNKNOWN",
         }

         # the state may be compound ("<base>+<flag>+...") and each part may carry
         # a one-character status suffix such as '*' or '~'; the node is unavailable
         # if ANY part is invalid, so the raw string must not be compared directly
         components = (part.rstrip("*~#!%$@^-") for part in state.split("+"))
         return not any(component in invalid_states for component in components)

     @classmethod
     def from_dict(cls, name: str, info: dict[str, str]) -> Self:
         """
         Construct a new instance of SlurmNode from node name and a dictionary of node information.

         Args:
             name (str): The unique name of the node.
             info (dict[str, str]): A dictionary containing Slurm node metadata as key-value pairs.

         Returns:
             Self: A new instance of SlurmNode.

         Note:
             This method does not perform any validation or processing of the provided dictionary.
         """
         node = cls.__new__(cls)
         node._name = name
         node._info = info

         return node

     def to_yaml(self) -> str:
         """Return all information about the node as a YAML-formatted string."""
         return yaml.dump(
             self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
         )

     def _get_int_resource(self, res: str) -> int | None:
         """
         Retrieve an integer-valued resource from the node information.

         Args:
             res (str): The resource key to retrieve.

         Returns:
             int | None: The integer value of the resource, or `None` if unavailable or invalid.
         """
         if not (val := self._info.get(res)):
             return None
         try:
             return int(val)
         except Exception as e:
             logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
             return None

     def _get_int_from_tres(self, tres_key: str, res: str) -> int | None:
         """
         Retrieve an integer-valued resource from TRES.

         The first TRES item containing `res` that can be parsed is used;
         items that cannot be parsed are logged and skipped.

         Args:
             tres_key (str): The tres key to use.
             res (str): The resource key to retrieve.

         Returns:
             int | None: The integer value of the resource, or `None` if unavailable or invalid.
         """
         tres = self._info.get(tres_key, "")

         for item in tres.split(","):
             if res not in item:
                 continue

             try:
                 return int(item.split("=")[1])
             except (IndexError, ValueError) as e:
                 # IndexError: the item has no '='; ValueError: the value is not an integer
                 logger.debug(
                     f"Could not parse the property '{res}' from '{item}': {e}."
                 )

         return None

     def _get_size_resource(self, res: str) -> Size | None:
         """
         Retrieve a Size resource from the node information.

         Args:
             res (str): The resource key to retrieve.

         Returns:
             Size | None: The parsed Size, or `None` if unavailable or invalid.
         """
         if not (val := self._info.get(res)):
             return None

         try:
             # a bare number gets the "M" unit suffix appended before parsing
             if val.isnumeric():
                 val += "M"
             return Size.from_string(val)
         except Exception as e:
             logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
             return None

Implementation of BatchNodeInterface for Slurm. Stores metadata for a single Slurm node.

SlurmNode(name: str)
def __init__(self, name: str):
    """
    Create the node and immediately load its information via `update()`.

    Args:
        name (str): Name of the node.
    """
    self._name = name
    self._info: dict[str, str] = {}

    self.update()
def update(self) -> None:
def update(self) -> None:
    """
    Refresh the stored node information from the batch system.

    Raises:
        QQError: If `scontrol` exits with a non-zero return code.
    """
    # get node info from Slurm
    command = f"scontrol show node {self._name} -o"
    # log the command, as the other Slurm queries in this package do
    logger.debug(command)

    result = subprocess.run(
        ["bash"],
        input=command,
        text=True,
        check=False,
        capture_output=True,
        errors="replace",
    )

    if result.returncode != 0:
        raise QQError(f"Node '{self._name}' does not exist.")

    self._info = parse_slurm_dump_to_dictionary(result.stdout)

Refresh the stored node information from the batch system.

Raises:
  • QQError: If the node cannot be queried or its info updated.
def get_name(self) -> str:
def get_name(self) -> str:
    """
    Retrieve the name of the node.

    Returns:
        str: The name identifying the node in the batch system.
    """
    return self._name

Retrieve the name of the node.

Returns:

str: The name identifying the node in the batch system.

def get_n_cpus(self) -> int | None:
def get_n_cpus(self) -> int | None:
    """
    Retrieve the total number of CPU cores available on the node.

    Returns:
        int | None: Total CPU core count ("CPUTot") or None if not available.
    """
    return self._get_int_resource("CPUTot")

Retrieve the total number of CPU cores available on the node.

Returns:

int | None: Total CPU core count or None if not available.

def get_n_free_cpus(self) -> int | None:
57    def get_n_free_cpus(self) -> int | None:
58        if not (cpus := self.get_n_cpus()):
59            return None
60
61        return cpus - (self._get_int_resource("CPUAlloc") or 0)

Retrieve the number of currently available (unallocated) CPU cores.

Returns:

int | None: Number of free CPU cores or None if not available.

def get_n_gpus(self) -> int | None:
def get_n_gpus(self) -> int | None:
    """
    Retrieve the total number of GPUs available on the node.

    Returns:
        int | None: Total GPU count (from "CfgTRES") or None if not available.
    """
    return self._get_int_from_tres("CfgTRES", "gpu")

Retrieve the total number of GPUs available on the node.

Returns:

int | None: Total GPU count or None if not available.

def get_n_free_gpus(self) -> int | None:
66    def get_n_free_gpus(self) -> int | None:
67        if not (gpus := self.get_n_gpus()):
68            return None
69
70        return gpus - (self._get_int_from_tres("AllocTRES", "gpu") or 0)

Retrieve the number of currently available (unallocated) GPUs.

Returns:

int | None: Number of free GPUs or None if not available.

def get_cpu_memory(self) -> qq_lib.properties.size.Size | None:
def get_cpu_memory(self) -> Size | None:
    """
    Retrieve the total CPU memory capacity of the node.

    Returns:
        Size | None: Total CPU memory available on the node or None if not available.
    """
    # RealMemory corresponds to memory configured in slurm.conf
    return self._get_size_resource("RealMemory")

Retrieve the total CPU memory capacity of the node.

Returns:

Size | None: Total CPU memory available on the node or None if not available.

def get_free_cpu_memory(self) -> qq_lib.properties.size.Size | None:
def get_free_cpu_memory(self) -> Size | None:
    """
    Retrieve the currently available CPU memory.

    The value is the total CPU memory minus the allocated memory ("AllocMem").

    Returns:
        Size | None: Free (unallocated) CPU memory or None if not available.
    """
    total = self.get_cpu_memory()
    if not total:
        return None

    # we do not use the FreeMem property as it corresponds to the ACTUAL free memory on the machine
    # and can be higher than RealMemory - AllocMem (e.g. if the jobs don't use all the allocated memory)
    allocated = self._get_size_resource("AllocMem")
    if not allocated:
        allocated = Size(0, "kb")

    return total - allocated

Retrieve the currently available CPU memory.

Returns:

Size | None: Free (unused) CPU memory or None if not available.

def get_gpu_memory(self) -> qq_lib.properties.size.Size | None:
def get_gpu_memory(self) -> Size | None:
    """
    Retrieve the total GPU memory capacity of the node.

    Returns:
        Size | None: Always None in this implementation.
    """
    return None

Retrieve the total GPU memory capacity of the node.

Returns:

Size | None: Total GPU memory available or None if not available.

def get_free_gpu_memory(self) -> qq_lib.properties.size.Size | None:
def get_free_gpu_memory(self) -> Size | None:
    """
    Retrieve the currently available GPU memory.

    Returns:
        Size | None: Always None in this implementation.
    """
    return None

Retrieve the currently available GPU memory.

Returns:

Size | None: Free (unused) GPU memory or None if not available.

def get_local_scratch(self) -> qq_lib.properties.size.Size | None:
def get_local_scratch(self) -> Size | None:
    """
    Retrieve the total local scratch storage capacity of the node.

    Returns:
        Size | None: Total size of local scratch space ("TmpDisk") or None if not available.
    """
    return self._get_size_resource("TmpDisk")

Retrieve the total local scratch storage capacity of the node.

Returns:

Size | None: Total size of local scratch space or None if not available.

def get_free_local_scratch(self) -> qq_lib.properties.size.Size | None:
def get_free_local_scratch(self) -> Size | None:
    """
    Retrieve the available local scratch storage space.

    Returns:
        Size | None: Free local scratch space or None if not available.
    """
    # NOTE(review): this reads the same "TmpDisk" key as get_local_scratch, so the
    # reported free space equals the total; presumably no separate "free" figure
    # is available - confirm
    return self._get_size_resource("TmpDisk")

Retrieve the available local scratch storage space.

Returns:

Size | None: Free local scratch space or None if not available.

def get_ssd_scratch(self) -> qq_lib.properties.size.Size | None:
def get_ssd_scratch(self) -> Size | None:
    """
    Retrieve the total SSD-based scratch storage capacity.

    Returns:
        Size | None: Always None in this implementation.
    """
    return None

Retrieve the total SSD-based scratch storage capacity.

Returns:

Size | None: Total SSD scratch capacity or None if not available.

def get_free_ssd_scratch(self) -> qq_lib.properties.size.Size | None:
 def get_free_ssd_scratch(self) -> Size | None:
     """
     Retrieve the currently available SSD-based scratch storage space.

     Returns:
         Size | None: Always None in this implementation.
     """
     return None

Retrieve the currently available SSD-based scratch storage space.

Returns:

Size | None: Free SSD scratch space or None if not available.

def get_shared_scratch(self) -> qq_lib.properties.size.Size | None:
def get_shared_scratch(self) -> Size | None:
    """
    Retrieve the total capacity of shared scratch storage accessible from the node.

    Returns:
        Size | None: Always None in this implementation.
    """
    return None

Retrieve the total capacity of shared scratch storage accessible from the node.

Returns:

Size | None: Total shared scratch capacity or None if not available.

def get_free_shared_scratch(self) -> qq_lib.properties.size.Size | None:
def get_free_shared_scratch(self) -> Size | None:
    """
    Retrieve the available space in shared scratch storage.

    Returns:
        Size | None: Always None in this implementation.
    """
    return None

Retrieve the available space in shared scratch storage.

Returns:

Size | None: Free shared scratch space or None if not available.

def get_properties(self) -> list[str]:
def get_properties(self) -> list[str]:
    """
    Get the list of properties or labels assigned to the node.

    The properties are the comma-separated items of "AvailableFeatures".

    Returns:
        list[str]: List of node property strings; empty if none are stored.
    """
    features = self._info.get("AvailableFeatures")
    return features.split(",") if features else []

Get the list of properties or labels assigned to the node.

Returns:

list[str]: List of node property strings.

def is_available_to_user(self, user: str) -> bool:
def is_available_to_user(self, user: str) -> bool:
    """
    Check if the node is available to the specified user.

    The decision is based only on the node state; the user is ignored.

    Args:
        user (str): The username to check access for.

    Returns:
        bool: True if the node is up and schedulable, False otherwise
            (including when no state information is stored).
    """
    _ = user

    if not (state := self._info.get("State")):
        logger.debug(f"Could not get state information for node '{self._name}'.")
        return False

    # "DRAIN" is the flag spelling of a drained/draining node (e.g. "IDLE+DRAIN")
    invalid_states = {
        "DOWN",
        "DRAIN",
        "DRAINED",
        "FAIL",
        "FUTURE",
        "INVAL",
        "MAINT",
        "PERFCTRS",
        "POWERED_DOWN",
        "POWERING_DOWN",
        "RESERVED",
        "UNKNOWN",
    }

    # the state may be compound ("<base>+<flag>+...") and each part may carry
    # a one-character status suffix such as '*' or '~'; the node is unavailable
    # if ANY part is invalid, so the raw string must not be compared directly
    components = (part.rstrip("*~#!%$@^-") for part in state.split("+"))
    return not any(component in invalid_states for component in components)

Check if the node is available to the specified user.

Arguments:
  • user (str): The username to check access for.
Returns:

bool: True if the node is up and schedulable, False otherwise.

@classmethod
def from_dict(cls, name: str, info: dict[str, str]) -> Self:
137    @classmethod
138    def from_dict(cls, name: str, info: dict[str, str]) -> Self:
139        """
140        Construct a new instance of SlurmNode from node name and a dictionary of node information.
141
142        Args:
143            name (str): The unique name of the node.
144            info (dict[str, str]): A dictionary containing Slurm node metadata as key-value pairs.
145
146        Returns:
147            Self: A new instance of SlurmNode.
148
149        Note:
150            This method does not perform any validation or processing of the provided dictionary.
151        """
152        node = cls.__new__(cls)
153        node._name = name
154        node._info = info
155
156        return node

Construct a new instance of SlurmNode from node name and a dictionary of node information.

Arguments:
  • name (str): The unique name of the node.
  • info (dict[str, str]): A dictionary containing Slurm node metadata as key-value pairs.
Returns:

Self: A new instance of SlurmNode.

Note:

This method does not perform any validation or processing of the provided dictionary.

def to_yaml(self) -> str:
def to_yaml(self) -> str:
    """
    Return all information about the node in YAML format.

    Returns:
        str: YAML-formatted string of node metadata.
    """
    # block style, with keys kept in their stored order
    return yaml.dump(
        self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
    )

Return all information about the node in YAML format.

Returns:

str: YAML-formatted string of node metadata.

class SlurmQueue(qq_lib.batch.interface.queue.BatchQueueInterface):
 94class SlurmQueue(BatchQueueInterface):
 95    """
 96    Implementation of BatchQueueInterface for Slurm.
 97    Stores metadata for a single Slurm queue.
 98    """
 99
100    def __init__(self, name: str):
101        self._name = name
102        self._info: dict[str, str] = {}
103
104        self.update()
105
106    def update(self) -> None:
107        # get queue info from Slurm
108        command = f"scontrol show partition {self._name} -o"
109        logger.debug(command)
110
111        result = subprocess.run(
112            ["bash"],
113            input=command,
114            text=True,
115            check=False,
116            capture_output=True,
117            errors="replace",
118        )
119
120        if result.returncode != 0:
121            raise QQError(f"Queue '{self._name}' does not exist.")
122
123        self._info = parse_slurm_dump_to_dictionary(result.stdout)
124        self._set_job_numbers()
125
126    def get_name(self) -> str:
127        return self._name
128
129    def get_priority(self) -> str | None:
130        if not (tier := self._info.get("PriorityTier")):
131            return None
132
133        if not (job_factor := self._info.get("PriorityJobFactor")):
134            return None
135
136        return f"T{tier} ({job_factor})"
137
138    def get_total_jobs(self) -> int | None:
139        return self._running_jobs + self._queued_jobs + self._other_jobs
140
141    def get_running_jobs(self) -> int | None:
142        return self._running_jobs
143
144    def get_queued_jobs(self) -> int | None:
145        return self._queued_jobs
146
147    def get_other_jobs(self) -> int | None:
148        return self._other_jobs
149
150    def get_max_walltime(self) -> timedelta | None:
151        if raw := self._info.get("MaxTime"):
152            return dhhmmss_to_duration(raw)
153
154        return None
155
156    def get_max_n_nodes(self) -> int | None:
157        if not (raw := self._info.get("MaxNodes")):
158            return None
159
160        try:
161            return int(raw)
162        except ValueError as e:
163            logger.debug(f"Could not parse the 'MaxNodes' property as integer: {e}.")
164            return None
165
166    def get_comment(self) -> str | None:
167        return None
168
169    def is_available_to_user(self, user: str) -> bool:
170        # check the queue's state
171        state = self._info.get("State", "DOWN")
172        if state not in ["UP", "DRAIN"]:
173            return False
174
175        def parse_list(value):
176            if not value or value == "(null)" or value == "ALL":
177                return None
178            return [item.strip() for item in value.split(",")]
179
180        # check allowed accounts
181        allow_accounts = parse_list(self._info.get("AllowAccounts", "ALL"))
182        if allow_accounts and user not in allow_accounts:
183            return False
184
185        # check denied accounts
186        deny_accounts = parse_list(self._info.get("DenyAccounts", "(null)"))
187        if deny_accounts and user in deny_accounts:
188            return False
189
190        # check allowed groups
191        user_groups = UserGroups.get_groups_or_init(user)
192        allow_groups = parse_list(self._info.get("AllowGroups", "ALL"))
193        if allow_groups and not any(group in allow_groups for group in user_groups):
194            return False
195
196        # check denied groups
197        deny_groups = parse_list(self._info.get("DenyGroups", "(null)"))
198        if deny_groups and any(group in deny_groups for group in user_groups):
199            return False
200
201        # check allowed QOS
202        user_qos = UserGroups.get_qos_or_init(user)
203        allow_qos = parse_list(self._info.get("AllowQos", "ALL"))
204        if allow_qos and user_qos not in allow_qos:
205            return False
206
207        # check denies QOS
208        deny_qos = parse_list(self._info.get("DenyQos", "(null)"))
209        return not (deny_qos and user_qos in deny_qos)
210
211    def get_destinations(self) -> list[str]:
212        # no destinations
213        return []
214
215    def from_route_only(self) -> bool:
216        return False
217
218    def to_yaml(self) -> str:
219        return yaml.dump(
220            self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
221        )
222
223    def get_default_resources(self) -> Resources:
224        return default_resources_from_dict(self._info)
225
226    @classmethod
227    def from_dict(cls, name: str, info: dict[str, str]) -> Self:
228        """
229        Construct a new instance of SlurmQueue from a queue name and a dictionary of queue information.
230
231
232        Args:
233            name (str): The unique name of the queue.
234            info (dict[str, str]): A dictionary containing Slurm queue metadata as key-value pairs.
235
236        Returns:
237            Self: A new instance of SlurmQueue.
238
239        Note:
240            This method does not perform any validation or processing of the provided dictionary.
241        """
242        queue = cls.__new__(cls)
243        queue._name = name
244        queue._info = info
245        queue._set_job_numbers()
246
247        return queue
248
249    def _set_job_numbers(self) -> None:
250        """
251        Get and set the numbers of jobs in this queue.
252        """
253        self._running_jobs = 0
254        self._queued_jobs = 0
255        self._other_jobs = 0
256
257        # get the numbers of jobs in the queue by individual states
258        command = f'squeue -p {self._name} -h -o "%T" | uniq -c'
259        logger.debug(command)
260
261        result = subprocess.run(
262            ["bash"],
263            input=command,
264            text=True,
265            check=False,
266            capture_output=True,
267            errors="replace",
268        )
269
270        if result.returncode != 0:
271            raise QQError(
272                f"Could not get job numbers for queue '{self._name}': {result.stderr.strip()}."
273            )
274
275        for line in result.stdout.splitlines():
276            if not line.strip():
277                continue
278
279            try:
280                count_str, job_type = line.split()
281                count = int(count_str)
282            except ValueError as e:
283                logger.warning(
284                    f"Could not parse line '{line}' when obtaining job numbers for queue '{self._name}': {e}."
285                )
286                continue
287
288            match job_type:
289                case "RUNNING":
290                    self._running_jobs += count
291                case "PENDING":
292                    self._queued_jobs += count
293                case "SUSPENDED" | "PREEMPTED":
294                    self._other_jobs += count
295                # ignore other jobs

Implementation of BatchQueueInterface for Slurm. Stores metadata for a single Slurm queue.

SlurmQueue(name: str)
100    def __init__(self, name: str):
101        self._name = name
102        self._info: dict[str, str] = {}
103
104        self.update()
def update(self) -> None:
106    def update(self) -> None:
107        # get queue info from Slurm
108        command = f"scontrol show partition {self._name} -o"
109        logger.debug(command)
110
111        result = subprocess.run(
112            ["bash"],
113            input=command,
114            text=True,
115            check=False,
116            capture_output=True,
117            errors="replace",
118        )
119
120        if result.returncode != 0:
121            raise QQError(f"Queue '{self._name}' does not exist.")
122
123        self._info = parse_slurm_dump_to_dictionary(result.stdout)
124        self._set_job_numbers()

Refresh the stored queue information from the batch system.

Raises:
  • QQError: If the queue cannot be queried or its info updated.
def get_name(self) -> str:
126    def get_name(self) -> str:
127        return self._name

Retrieve the name of the queue.

Returns:

str: The name identifying this queue in the batch system.

def get_priority(self) -> str | None:
129    def get_priority(self) -> str | None:
130        if not (tier := self._info.get("PriorityTier")):
131            return None
132
133        if not (job_factor := self._info.get("PriorityJobFactor")):
134            return None
135
136        return f"T{tier} ({job_factor})"

Retrieve the scheduling priority of the queue.

Returns:

str | None: The queue priority, or None if priority information is not available.

def get_total_jobs(self) -> int | None:
138    def get_total_jobs(self) -> int | None:
139        return self._running_jobs + self._queued_jobs + self._other_jobs

Retrieve the total number of jobs currently in the queue.

Returns:

int | None: The total count of jobs, regardless of status, or None if the information is not available.

def get_running_jobs(self) -> int | None:
141    def get_running_jobs(self) -> int | None:
142        return self._running_jobs

Retrieve the number of jobs currently running in the queue.

Returns:

int | None: The number of running jobs or None if the information is not available.

def get_queued_jobs(self) -> int | None:
144    def get_queued_jobs(self) -> int | None:
145        return self._queued_jobs

Retrieve the number of jobs waiting to start in the queue.

Returns:

int | None: The number of queued jobs or None if the information is not available.

def get_other_jobs(self) -> int | None:
147    def get_other_jobs(self) -> int | None:
148        return self._other_jobs

Retrieve the number of jobs in other states (non-running and non-queued).

Returns:

int | None: The number of jobs that are neither running nor queued, such as exiting or suspended jobs. Returns None if the information is not available.

def get_max_walltime(self) -> datetime.timedelta | None:
150    def get_max_walltime(self) -> timedelta | None:
151        if raw := self._info.get("MaxTime"):
152            return dhhmmss_to_duration(raw)
153
154        return None

Retrieve the maximum walltime allowed for jobs in the queue.

Returns:

timedelta | None: The walltime limit, or None if unlimited or unknown.

def get_max_n_nodes(self) -> int | None:
156    def get_max_n_nodes(self) -> int | None:
157        if not (raw := self._info.get("MaxNodes")):
158            return None
159
160        try:
161            return int(raw)
162        except ValueError as e:
163            logger.debug(f"Could not parse the 'MaxNodes' property as integer: {e}.")
164            return None

Retrieve the maximum number of nodes that can be requested in the queue.

Returns:

int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.

def get_comment(self) -> str | None:
166    def get_comment(self) -> str | None:
167        return None

Retrieve the comment or description associated with the queue.

Returns:

str | None: The human-readable comment or note about the queue or None if the information is not available.

def is_available_to_user(self, user: str) -> bool:
169    def is_available_to_user(self, user: str) -> bool:
170        # check the queue's state
171        state = self._info.get("State", "DOWN")
172        if state not in ["UP", "DRAIN"]:
173            return False
174
175        def parse_list(value):
176            if not value or value == "(null)" or value == "ALL":
177                return None
178            return [item.strip() for item in value.split(",")]
179
180        # check allowed accounts
181        allow_accounts = parse_list(self._info.get("AllowAccounts", "ALL"))
182        if allow_accounts and user not in allow_accounts:
183            return False
184
185        # check denied accounts
186        deny_accounts = parse_list(self._info.get("DenyAccounts", "(null)"))
187        if deny_accounts and user in deny_accounts:
188            return False
189
190        # check allowed groups
191        user_groups = UserGroups.get_groups_or_init(user)
192        allow_groups = parse_list(self._info.get("AllowGroups", "ALL"))
193        if allow_groups and not any(group in allow_groups for group in user_groups):
194            return False
195
196        # check denied groups
197        deny_groups = parse_list(self._info.get("DenyGroups", "(null)"))
198        if deny_groups and any(group in deny_groups for group in user_groups):
199            return False
200
201        # check allowed QOS
202        user_qos = UserGroups.get_qos_or_init(user)
203        allow_qos = parse_list(self._info.get("AllowQos", "ALL"))
204        if allow_qos and user_qos not in allow_qos:
205            return False
206
207        # check denies QOS
208        deny_qos = parse_list(self._info.get("DenyQos", "(null)"))
209        return not (deny_qos and user_qos in deny_qos)

Check whether the specified user has access to this queue.

Arguments:
  • user (str): The username to check access for.
Returns:

bool: True if the user can submit jobs to this queue, False otherwise.

def get_destinations(self) -> list[str]:
211    def get_destinations(self) -> list[str]:
212        # no destinations
213        return []

Retrieve all destinations available for this queue route.

Returns:

list[str]: A list of destination queue names associated with the queue.

def from_route_only(self) -> bool:
215    def from_route_only(self) -> bool:
216        return False

Determine whether this queue can only be accessed via a route.

Returns:

bool: True if the queue is accessible exclusively through a route, False otherwise.

def to_yaml(self) -> str:
218    def to_yaml(self) -> str:
219        return yaml.dump(
220            self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
221        )

Return all information about the queue from the batch system in YAML format.

Returns:

str: YAML-formatted string of queue metadata.

def get_default_resources(self) -> qq_lib.properties.resources.Resources:
223    def get_default_resources(self) -> Resources:
224        return default_resources_from_dict(self._info)

Return the default resource definitions for this queue.

Returns:

Resources: Default resources allocated for jobs submitted to this queue.

@classmethod
def from_dict(cls, name: str, info: dict[str, str]) -> Self:
226    @classmethod
227    def from_dict(cls, name: str, info: dict[str, str]) -> Self:
228        """
229        Construct a new instance of SlurmQueue from a queue name and a dictionary of queue information.
230
231
232        Args:
233            name (str): The unique name of the queue.
234            info (dict[str, str]): A dictionary containing Slurm queue metadata as key-value pairs.
235
236        Returns:
237            Self: A new instance of SlurmQueue.
238
239        Note:
240            This method does not perform any validation or processing of the provided dictionary.
241        """
242        queue = cls.__new__(cls)
243        queue._name = name
244        queue._info = info
245        queue._set_job_numbers()
246
247        return queue

Construct a new instance of SlurmQueue from a queue name and a dictionary of queue information.

Arguments:
  • name (str): The unique name of the queue.
  • info (dict[str, str]): A dictionary containing Slurm queue metadata as key-value pairs.
Returns:

Self: A new instance of SlurmQueue.

Note:

This method does not perform any validation or processing of the provided dictionary.

class Slurm(qq_lib.batch.interface.interface.BatchInterface[qq_lib.batch.slurm.job.SlurmJob, qq_lib.batch.slurm.queue.SlurmQueue, qq_lib.batch.slurm.node.SlurmNode]):
 32class Slurm(BatchInterface[SlurmJob, SlurmQueue, SlurmNode]):
 33    """
 34    Implementation of BatchInterface for Slurm batch system.
 35    """
 36
 37    @classmethod
 38    def env_name(cls) -> str:
 39        return "Slurm"
 40
 41    @classmethod
 42    def is_available(cls) -> bool:
 43        return (
 44            shutil.which("sbatch") is not None
 45            and shutil.which("it4ifree") is None
 46            and shutil.which("lumi-allocations") is None
 47        )
 48
 49    @classmethod
 50    def get_job_id(cls) -> str | None:
 51        return os.environ.get("SLURM_JOB_ID")
 52
 53    @classmethod
 54    def job_submit(
 55        cls,
 56        res: Resources,
 57        queue: str,
 58        script: Path,
 59        job_name: str,
 60        depend: list[Depend],
 61        env_vars: dict[str, str],
 62        account: str | None = None,
 63        server: str | None = None,
 64        remote_host: str | None = None,
 65    ) -> str:
 66        # server is unused
 67        if server:
 68            logger.warning("The 'server' option is ignored for Slurm.")
 69
 70        input_dir = script.parent
 71        logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")
 72
 73        # intentionally using PBS
 74        PBS._shared_guard(input_dir, res, env_vars, server, remote_host)
 75
 76        command = cls._translate_submit(
 77            res, queue, script.parent, str(script), job_name, depend, env_vars, account
 78        )
 79
 80        if not remote_host:
 81            logger.debug(f"Submitting job using '{command}'.")
 82            result = subprocess.run(
 83                ["bash"],
 84                input=command,
 85                text=True,
 86                check=False,
 87                capture_output=True,
 88                errors="replace",
 89            )
 90        else:
 91            # submit the script from the remote host
 92            logger.debug(
 93                f"Navigating to '{remote_host}' to execute the submission command '{command}'."
 94            )
 95            result = subprocess.run(
 96                [
 97                    "ssh",
 98                    "-o PasswordAuthentication=no",
 99                    "-o GSSAPIAuthentication=yes",
100                    "-o StrictHostKeyChecking=no",  # allow unknown hosts
101                    f"-o ConnectTimeout={CFG.timeouts.ssh}",
102                    "-q",  # suppress some SSH messages
103                    remote_host,
104                    command,
105                ],
106                capture_output=True,
107                text=True,
108            )
109
110        if result.returncode != 0:
111            raise QQError(
112                f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
113            )
114
115        return result.stdout.split()[-1]
116
117    @classmethod
118    def job_kill(cls, job_id: str) -> None:
119        command = cls._translate_kill(job_id)
120        logger.debug(command)
121
122        # run the kill command
123        result = subprocess.run(
124            ["bash"],
125            input=command,
126            text=True,
127            check=False,
128            capture_output=True,
129            errors="replace",
130        )
131
132        if result.returncode != 0:
133            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")
134
135    @classmethod
136    def job_kill_force(cls, job_id: str) -> None:
137        command = cls._translate_kill_force(job_id)
138        logger.debug(command)
139
140        # run the kill command
141        result = subprocess.run(
142            ["bash"],
143            input=command,
144            text=True,
145            check=False,
146            capture_output=True,
147            errors="replace",
148        )
149
150        if result.returncode != 0:
151            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")
152
153    @classmethod
154    def get_batch_job(cls, job_id: str) -> SlurmJob:
155        return SlurmJob(job_id)
156
157    @classmethod
158    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[SlurmJob]:
159        command = f"sacct --allocations --noheader --parsable2 -j {','.join(job_ids)} --format={SACCT_FIELDS}"
160        logger.debug(command)
161
162        # sacct ignores IDs of nonexistent jobs - it will just not print any output for them
163        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
164        sacct_jobs_dict = {
165            job.get_id(): job
166            for job in sacct_jobs
167            # filter out pending (queued) jobs - we want to use scontrol for them
168            if job.get_state() not in {BatchState.QUEUED, BatchState.HELD}
169        }
170
171        # get information about the remaining jobs using scontrol
172        scontrol_jobs = cls._get_jobs_in_parallel(
173            [id for id in job_ids if id not in sacct_jobs_dict]
174        )
175        scontrol_jobs_dict = {job.get_id(): job for job in scontrol_jobs}
176
177        # merge all jobs but maintain their original order
178        all_jobs = []
179        for id in job_ids:
180            if (job := sacct_jobs_dict.get(id)) or (job := scontrol_jobs_dict.get(id)):
181                all_jobs.append(job)
182
183        return all_jobs
184
185    @classmethod
186    def get_unfinished_batch_jobs(
187        cls, user: str, server: str | None = None
188    ) -> list[SlurmJob]:
189        # server unused
190        _ = server
191
192        # get running jobs from sacct (faster than using squeue and scontrol)
193        command = f"sacct -u {user} --state RUNNING --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
194        logger.debug(command)
195
196        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
197
198        # get pending jobs using squeue
199        command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
200        logger.debug(command)
201
202        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
203
204        # filter out duplicate jobs
205        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
206        return list(merged.values())
207
208    @classmethod
209    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[SlurmJob]:
210        # server unused
211        _ = server
212
213        # get all jobs, except pending for which full information is not available using sacct
214        command = f"sacct -u {user} --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
215        logger.debug(command)
216
217        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
218
219        # get pending jobs using squeue
220        command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
221        logger.debug(command)
222
223        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
224
225        # filter out duplicate jobs
226        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
227        return list(merged.values())
228
229    @classmethod
230    def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
231        # server unused
232        _ = server
233
234        # get running jobs using sacct (faster than using squeue and scontrol)
235        command = f"sacct --state RUNNING --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
236        logger.debug(command)
237
238        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
239
240        # get pending jobs using squeue
241        command = 'squeue --array -t PENDING -h -o "%i"'
242        logger.debug(command)
243
244        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
245
246        # filter out duplicate jobs
247        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
248        return list(merged.values())
249
250    @classmethod
251    def get_all_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
252        # server unused
253        _ = server
254
255        # get all jobs, except pending which are not available from sacct
256        command = f"sacct --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
257        logger.debug(command)
258
259        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
260
261        # get pending jobs using squeue
262        command = 'squeue --array -t PENDING -h -o "%i"'
263        logger.debug(command)
264
265        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
266
267        # filter out duplicate jobs
268        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
269        return list(merged.values())
270
271    @classmethod
272    def get_queues(cls, server: str | None = None) -> list[SlurmQueue]:
273        # server unused
274        _ = server
275
276        command = "scontrol show partition -o"
277        logger.debug(command)
278
279        result = subprocess.run(
280            ["bash"],
281            input=command,
282            text=True,
283            check=False,
284            capture_output=True,
285            errors="replace",
286        )
287
288        if result.returncode != 0:
289            raise QQError(
290                f"Could not retrieve information about queues: {result.stderr.strip()}."
291            )
292
293        queues = []
294        for line in result.stdout.splitlines():
295            info = parse_slurm_dump_to_dictionary(line)
296            queues.append(SlurmQueue.from_dict(info["PartitionName"], info))
297
298        return queues
299
300    @classmethod
301    def get_nodes(cls, server: str | None = None) -> list[SlurmNode]:
302        # server unused
303        _ = server
304
305        command = "scontrol show node -o"
306        logger.debug(command)
307
308        result = subprocess.run(
309            ["bash"],
310            input=command,
311            text=True,
312            check=False,
313            capture_output=True,
314            errors="replace",
315        )
316
317        if result.returncode != 0:
318            raise QQError(
319                f"Could not retrieve information about nodes: {result.stderr.strip()}."
320            )
321
322        nodes = []
323        for line in result.stdout.splitlines():
324            info = parse_slurm_dump_to_dictionary(line)
325            nodes.append(SlurmNode.from_dict(info["NodeName"], info))
326
327        return nodes
328
329    @classmethod
330    def read_remote_file(cls, host: str, file: Path) -> str:
331        return PBS.read_remote_file(host, file)
332
333    @classmethod
334    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
335        PBS.write_remote_file(host, file, content)
336
337    @classmethod
338    def make_remote_dir(cls, host: str, directory: Path) -> None:
339        PBS.make_remote_dir(host, directory)
340
341    @classmethod
342    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
343        return PBS.list_remote_dir(host, directory)
344
345    @classmethod
346    def delete_remote_dir(cls, host: str, directory: Path) -> None:
347        PBS.delete_remote_dir(host, directory)
348
349    @classmethod
350    def move_remote_files(
351        cls, host: str, files: list[Path], moved_files: list[Path]
352    ) -> None:
353        PBS.move_remote_files(host, files, moved_files)
354
355    @classmethod
356    def sync_with_exclusions(
357        cls,
358        src_dir: Path,
359        dest_dir: Path,
360        src_host: str | None,
361        dest_host: str | None,
362        exclude_files: list[Path] | None = None,
363    ) -> None:
364        PBS.sync_with_exclusions(src_dir, dest_dir, src_host, dest_host, exclude_files)
365
366    @classmethod
367    def sync_selected(
368        cls,
369        src_dir: Path,
370        dest_dir: Path,
371        src_host: str | None,
372        dest_host: str | None,
373        include_files: list[Path] | None = None,
374    ) -> None:
375        PBS.sync_selected(src_dir, dest_dir, src_host, dest_host, include_files)
376
377    @classmethod
378    def sort_jobs(cls, jobs: list[SlurmJob]) -> None:
379        jobs.sort(key=lambda job: job.get_ids_for_sorting())
380
381    @classmethod
382    def jobs_presenter_columns_to_show(cls) -> set[str]:
383        return {
384            "S",
385            "Job ID",
386            "User",
387            "Job Name",
388            "Queue",
389            "NCPUs",
390            "NGPUs",
391            "NNodes",
392            "Times",
393            "Node",
394            "Exit",
395        }
396
397    @classmethod
398    def _translate_kill(cls, job_id: str) -> str:
399        """
400        Generate the Slurm kill command for a job using SIGTERM.
401
402        Args:
403            job_id (str): The ID of the job to kill.
404
405        Returns:
406            str: The scancel command sending SIGTERM.
407        """
408        return f"scancel {job_id}"
409
410    @classmethod
411    def _translate_kill_force(cls, job_id: str) -> str:
412        """
413        Generate the Slurm kill command for a job using SIGKILL.
414
415        Args:
416            job_id (str): The ID of the job to kill.
417
418        Returns:
419            str: The scancel command sending SIGKILL.
420        """
421        return f"scancel --signal=KILL {job_id}"
422
423    @classmethod
424    def _translate_submit(
425        cls,
426        res: Resources,
427        queue: str,
428        input_dir: Path,
429        script: str,
430        job_name: str,
431        depend: list[Depend],
432        env_vars: dict[str, str],
433        account: str | None,
434    ) -> str:
435        """
436        Generate the Slurm submission command for a job.
437
438        Args:
439            res (Resources): The resources requested for the job.
440            queue (str): The queue name to submit to.
441            input_dir (Path): The directory from which the job is being submitted.
442            script (str): Path to the job script.
443            job_name (str): Name of the job.
444            depend (list[Depend]): List of dependencies of the job.
445            env_vars (dict[str, str]): Dictionary of environment variables and their values to propagate to the job's environment.
446            account (str | None): Optional name of the account to use for the job.
447
448        Returns:
449            str: The fully constructed sbatch command string.
450        """
451        qq_output = str((input_dir / job_name).with_suffix(CFG.suffixes.qq_out))
452        command = f"sbatch -J {job_name} -p {queue} -e {qq_output} -o {qq_output} "
453
454        if account:
455            command += f"--account {account} "
456
457        # translate environment variables
458        if env_vars:
459            command += f"--export ALL,{cls._translate_env_vars(env_vars)} "
460
461        # handle number of nodes
462        command += f"--nodes {res.nnodes} "
463
464        # handle per-chunk resources
465        translated = cls._translate_per_chunk_resources(res)
466        command += " ".join(translated) + " "
467
468        # handle properties
469        if res.props:
470            constraints = []
471            for k, v in res.props.items():
472                if v != "true":
473                    raise QQError(
474                        f"Slurm only supports properties with a value of 'true', not '{k}={v}'."
475                    )
476                constraints.append(k)
477
478            command += f'--constraint="{"&".join(constraints)}" '
479
480        # handle walltime
481        if res.walltime:
482            command += f"--time={res.walltime} "
483
484        # handle dependencies
485        if converted_depend := cls._translate_dependencies(depend):
486            command += f"--dependency={converted_depend} "
487
488        # set input directory for the job
489        command += f"--chdir={str(input_dir)} "
490
491        # add script
492        command += str(input_dir / script)
493
494        return command
495
496    @classmethod
497    def _translate_env_vars(cls, env_vars: dict[str, str]) -> str:
498        """
499        Convert a dictionary of environment variables into a formatted string.
500
501        Args:
502            env_vars (dict[str, str]): A mapping of environment variable names
503                to their corresponding values.
504
505        Returns:
506            str: A comma-separated string of environment variable assignments,
507                suitable for inclusion in the sbatch command.
508        """
509        converted = []
510        for key, value in env_vars.items():
511            converted.append(f'{key}="{value}"')
512
513        return ",".join(converted)
514
515    @classmethod
516    def _translate_per_chunk_resources(cls, res: Resources) -> list[str]:
517        """
518        Convert a Resources object into a list of per-node resource specifications.
519
520        Each resource that can be divided by the number of nodes (nnodes) is split
521        accordingly.
522
523        Args:
524            res (Resources): The resource specification for the job.
525
526        Returns:
527            list[str]: A list of per-node resource strings suitable for inclusion
528                    in the sbatch command.
529
530        Raises:
531            QQError: If sanity checks fail or required memory attributes are missing.
532        """
533
534        trans_res = []
535
536        # sanity checking per-chunk resources
537        if res.nnodes is None:
538            raise QQError(
539                "Attribute 'nnodes' should not be undefined. This is a bug, please report it."
540            )
541        if res.nnodes == 0:
542            raise QQError("Attribute 'nnodes' cannot be 0.")
543
544        if res.ncpus and res.ncpus != 0 and res.ncpus % res.nnodes != 0:
545            raise QQError(
546                f"Attribute 'ncpus' ({res.ncpus}) must be divisible by 'nnodes' ({res.nnodes})."
547            )
548        if res.ngpus and res.ngpus != 0 and res.ngpus % res.nnodes != 0:
549            raise QQError(
550                f"Attribute 'ngpus' ({res.ngpus}) must be divisible by 'nnodes' ({res.nnodes})."
551            )
552
553        # translate per-chunk resources
554        if res.ncpus:
555            # we set MPI ranks and OpenMP threads here, but these can be overriden
556            # in the body of the script
557            # this setup is here only to allow for better accounting by Slurm
558            trans_res.append("--ntasks-per-node=1")
559            trans_res.append(f"--cpus-per-task={res.ncpus // res.nnodes}")
560        elif res.ncpus_per_node:
561            trans_res.append("--ntasks-per-node=1")
562            trans_res.append(f"--cpus-per-task={res.ncpus_per_node}")
563
564        if res.mem:
565            trans_res.append(f"--mem={(res.mem // res.nnodes).to_str_exact_slurm()}")
566        elif res.mem_per_node:
567            trans_res.append(f"--mem={res.mem_per_node.to_str_exact_slurm()}")
568        elif res.mem_per_cpu:
569            trans_res.append(f"--mem-per-cpu={res.mem_per_cpu.to_str_exact_slurm()}")
570        else:
571            # memory not set in any way
572            raise QQError(
573                "None of the attributes 'mem', 'mem-per-node', or 'mem-per-cpu' is defined."
574            )
575
576        if res.ngpus:
577            trans_res.append(f"--gpus-per-node={res.ngpus // res.nnodes}")
578        elif res.ngpus_per_node:
579            trans_res.append(f"--gpus-per-node={res.ngpus_per_node}")
580
581        return trans_res
582
583    @classmethod
584    def _translate_dependencies(cls, depend: list[Depend]) -> str | None:
585        """
586        Convert a list of `Depend` objects into a Slurm-compatible dependency string.
587
588        Args:
589            depend (list[Depend]): List of dependency objects to translate.
590
591        Returns:
592            str | None: Slurm-style dependency string (e.g., "after:12345,afterok:1:2:3"),
593                        or None if the input list is empty.
594        """
595        if not depend:
596            return None
597
598        return ",".join(Depend.to_str(x).replace("=", ":") for x in depend)
599
600    @classmethod
601    def _get_default_server_resources(cls) -> Resources:
602        """
603        Return a Resources object representing the default resources for a batch job.
604
605        Returns:
606            Resources: Default batch job resources obtained from `slurm.conf`.
607        """
608        command = "scontrol show config"
609
610        result = subprocess.run(
611            ["bash"],
612            input=command,
613            text=True,
614            check=False,
615            capture_output=True,
616            errors="replace",
617        )
618
619        if result.returncode != 0:
620            logger.debug("Could not get server resources. Ignoring.")
621            return Resources()
622
623        info = parse_slurm_dump_to_dictionary(result.stdout, "\n")
624        server_resources = default_resources_from_dict(info)
625
626        return Resources.merge_resources(server_resources, cls._get_default_resources())
627
628    @classmethod
629    def _get_default_resources(cls) -> Resources:
630        """
631        Return a Resources object representing the default, hard-coded resources for a batch job.
632        """
633        return Resources(
634            nnodes=1,
635            ncpus=1,
636            mem_per_cpu="1gb",
637            work_dir="scratch_local",
638            work_size_per_cpu="1gb",
639            walltime="1d",
640        )
641
642    @classmethod
643    def _get_batch_jobs_using_sacct_command(cls, command: str) -> list[SlurmJob]:
644        """
645        Execute `sacct` to retrieve information about Slurm jobs and parse it.
646
647        Args:
648            command (str): A Slurm command to get the relevant jobs.
649
650        Returns:
651            list[SlurmJob]: A list of `SlurmJob` instances corresponding to the jobs
652                            returned by the command.
653
654        Raises:
655            QQError: If the command fails (non-zero return code) or if the output
656                    cannot be parsed into valid job information.
657        """
658        result = subprocess.run(
659            ["bash"],
660            input=command,
661            text=True,
662            check=False,
663            capture_output=True,
664            errors="replace",
665        )
666
667        if result.returncode != 0:
668            raise QQError(
669                f"Could not retrieve information about jobs: {result.stderr.strip()}."
670            )
671
672        jobs = []
673        for sacct_string in result.stdout.split("\n"):
674            if sacct_string.strip() == "":
675                continue
676
677            jobs.append(SlurmJob.from_sacct_string(sacct_string))
678
679        return jobs
680
681    @classmethod
682    def _get_batch_jobs_using_squeue_command(cls, command: str) -> list[SlurmJob]:
683        """
684        Execute `squeue` and `scontrol show job` to retrieve information about Slurm jobs.
685
686        Multiple `scontrol` commands are executed in parallel
687        to increase the speed of collecting the information about jobs.
688
689        Note that the jobs are returned in an arbitrary order.
690
691        Args:
692            command (str): A Slurm command to get the relevant job IDs.
693
694        Returns:
695            list[SlurmJob]: A list of `SlurmJob` instances corresponding to the jobs
696                            returned by the command.
697
698        Raises:
699            QQError: If the command fails (non-zero return code) or if the output
700                    cannot be parsed into valid job information.
701        """
702        result = subprocess.run(
703            ["bash"],
704            input=command,
705            text=True,
706            check=False,
707            capture_output=True,
708            errors="replace",
709        )
710
711        if result.returncode != 0:
712            raise QQError(
713                f"Could not retrieve information about jobs: {result.stderr.strip()}."
714            )
715
716        ids = [line.strip() for line in result.stdout.split("\n") if line.strip()]
717
718        return Slurm._get_jobs_in_parallel(ids)
719
720    @classmethod
721    def _get_jobs_in_parallel(cls, job_ids: list[str]) -> list[SlurmJob]:
722        """
723        Constructs Slurm jobs in parallel.
724
725        Args:
726            job_ids (list[str]): A list of job IDs to collect.
727
728        Returns:
729            list[SlurmJob]: A list of SlurmJob objects.
730        """
731
732        def get_job(job_id: str) -> SlurmJob:
733            return SlurmJob(job_id)
734
735        jobs: list[SlurmJob] = []
736
737        with ThreadPoolExecutor(
738            max_workers=CFG.slurm_options.jobs_scontrol_nthreads
739        ) as executor:
740            future_to_id = {executor.submit(get_job, id): id for id in job_ids}
741
742            for future in as_completed(future_to_id):
743                try:
744                    jobs.append(future.result())
745                except Exception as e:
746                    id = future_to_id[future]
747                    raise QQError(f"Failed to load job {id}: {e}.") from e
748
749        return jobs

Implementation of BatchInterface for Slurm batch system.

@classmethod
def env_name(cls) -> str:
37    @classmethod
38    def env_name(cls) -> str:
39        return "Slurm"

Return the name of the batch system environment.

Returns:

str: The batch system name.

@classmethod
def is_available(cls) -> bool:
41    @classmethod
42    def is_available(cls) -> bool:
43        return (
44            shutil.which("sbatch") is not None
45            and shutil.which("it4ifree") is None
46            and shutil.which("lumi-allocations") is None
47        )

Determine whether the batch system is available on the current host.

Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.

Returns:

bool: True if the batch system is available, False otherwise.

@classmethod
def get_job_id(cls) -> str | None:
49    @classmethod
50    def get_job_id(cls) -> str | None:
51        return os.environ.get("SLURM_JOB_ID")

Get the id of the current job from the corresponding batch system's environment variable.

For this method to work, it has to be called from the inside of an active job.

Returns:

str | None: ID of the job, or None if the environment variable is not set.

@classmethod
def job_submit( cls, res: qq_lib.properties.resources.Resources, queue: str, script: pathlib._local.Path, job_name: str, depend: list[qq_lib.properties.depend.Depend], env_vars: dict[str, str], account: str | None = None, server: str | None = None, remote_host: str | None = None) -> str:
 53    @classmethod
 54    def job_submit(
 55        cls,
 56        res: Resources,
 57        queue: str,
 58        script: Path,
 59        job_name: str,
 60        depend: list[Depend],
 61        env_vars: dict[str, str],
 62        account: str | None = None,
 63        server: str | None = None,
 64        remote_host: str | None = None,
 65    ) -> str:
 66        # server is unused
 67        if server:
 68            logger.warning("The 'server' option is ignored for Slurm.")
 69
 70        input_dir = script.parent
 71        logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")
 72
 73        # intentionally using PBS
 74        PBS._shared_guard(input_dir, res, env_vars, server, remote_host)
 75
 76        command = cls._translate_submit(
 77            res, queue, script.parent, str(script), job_name, depend, env_vars, account
 78        )
 79
 80        if not remote_host:
 81            logger.debug(f"Submitting job using '{command}'.")
 82            result = subprocess.run(
 83                ["bash"],
 84                input=command,
 85                text=True,
 86                check=False,
 87                capture_output=True,
 88                errors="replace",
 89            )
 90        else:
 91            # submit the script from the remote host
 92            logger.debug(
 93                f"Navigating to '{remote_host}' to execute the submission command '{command}'."
 94            )
 95            result = subprocess.run(
 96                [
 97                    "ssh",
 98                    "-o PasswordAuthentication=no",
 99                    "-o GSSAPIAuthentication=yes",
100                    "-o StrictHostKeyChecking=no",  # allow unknown hosts
101                    f"-o ConnectTimeout={CFG.timeouts.ssh}",
102                    "-q",  # suppress some SSH messages
103                    remote_host,
104                    command,
105                ],
106                capture_output=True,
107                text=True,
108            )
109
110        if result.returncode != 0:
111            raise QQError(
112                f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
113            )
114
115        return result.stdout.split()[-1]

Submit a job to the batch system.

Can also perform additional validation of the job's resources.

This method is NOT guaranteed to be thread-safe.

Arguments:
  • res (Resources): Resources required for the job.
  • queue (str): Target queue for the job submission.
  • script (Path): Path to the script to execute.
  • job_name (str): Name of the job to use.
  • depend (list[Depend]): List of job dependencies.
  • env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
  • account (str | None): Optional account name to use for the job.
  • server (str | None): Optional name of the server to submit the job to.
  • remote_host (str | None): Optional name of the machine to submit the job from.
Returns:

str: Unique ID of the submitted job.

Raises:
  • QQError: If the job submission fails.
@classmethod
def job_kill(cls, job_id: str) -> None:
117    @classmethod
118    def job_kill(cls, job_id: str) -> None:
119        command = cls._translate_kill(job_id)
120        logger.debug(command)
121
122        # run the kill command
123        result = subprocess.run(
124            ["bash"],
125            input=command,
126            text=True,
127            check=False,
128            capture_output=True,
129            errors="replace",
130        )
131
132        if result.returncode != 0:
133            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")

Terminate a job gracefully. The job should have time for proper cleanup.

Arguments:
  • job_id (str): Identifier of the job to terminate.
Raises:
  • QQError: If the job could not be killed.
@classmethod
def job_kill_force(cls, job_id: str) -> None:
135    @classmethod
136    def job_kill_force(cls, job_id: str) -> None:
137        command = cls._translate_kill_force(job_id)
138        logger.debug(command)
139
140        # run the kill command
141        result = subprocess.run(
142            ["bash"],
143            input=command,
144            text=True,
145            check=False,
146            capture_output=True,
147            errors="replace",
148        )
149
150        if result.returncode != 0:
151            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")

Forcefully terminate a job. There may be no time for proper cleanup.

Arguments:
  • job_id (str): Identifier of the job to forcefully terminate.
Raises:
  • QQError: If the job could not be killed.
@classmethod
def get_batch_job(cls, job_id: str) -> SlurmJob:
153    @classmethod
154    def get_batch_job(cls, job_id: str) -> SlurmJob:
155        return SlurmJob(job_id)

Retrieve information about a job from the batch system.

The returned object should be fully initialized, even if the job no longer exists or its information is unavailable.

Arguments:
  • job_id (str): Identifier of the job.
Returns:

BatchJobInterface: Object containing the job's metadata and state.

@classmethod
def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[SlurmJob]:
157    @classmethod
158    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[SlurmJob]:
159        command = f"sacct --allocations --noheader --parsable2 -j {','.join(job_ids)} --format={SACCT_FIELDS}"
160        logger.debug(command)
161
162        # sacct ignores IDs of nonexistent jobs - it will just not print any output for them
163        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
164        sacct_jobs_dict = {
165            job.get_id(): job
166            for job in sacct_jobs
167            # filter out pending (queued) jobs - we want to use scontrol for them
168            if job.get_state() not in {BatchState.QUEUED, BatchState.HELD}
169        }
170
171        # get information about the remaining jobs using scontrol
172        scontrol_jobs = cls._get_jobs_in_parallel(
173            [id for id in job_ids if id not in sacct_jobs_dict]
174        )
175        scontrol_jobs_dict = {job.get_id(): job for job in scontrol_jobs}
176
177        # merge all jobs but maintain their original order
178        all_jobs = []
179        for id in job_ids:
180            if (job := sacct_jobs_dict.get(id)) or (job := scontrol_jobs_dict.get(id)):
181                all_jobs.append(job)
182
183        return all_jobs

Retrieve information about multiple jobs from the batch system.

Batch jobs should be returned in the same order as they appear in job_ids. A TBatchJob object should be returned for each job id, even if the job no longer exists or its information is unavailable.

Array jobs should NOT be expanded into their individual tasks.

The default implementation is to call get_batch_job for each job id. This implementation may be inefficient for large numbers of job ids and should be overridden by subclasses.

Arguments:
  • job_ids (list[str]): List of job identifiers.
Returns:

list[TBatchJob]: List of TBatchJob objects, one for each job id.

@classmethod
def get_unfinished_batch_jobs( cls, user: str, server: str | None = None) -> list[SlurmJob]:
185    @classmethod
186    def get_unfinished_batch_jobs(
187        cls, user: str, server: str | None = None
188    ) -> list[SlurmJob]:
189        # server unused
190        _ = server
191
192        # get running jobs from sacct (faster than using squeue and scontrol)
193        command = f"sacct -u {user} --state RUNNING --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
194        logger.debug(command)
195
196        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
197
198        # get pending jobs using squeue
199        command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
200        logger.debug(command)
201
202        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
203
204        # filter out duplicate jobs
205        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
206        return list(merged.values())

Retrieve information about all uncompleted jobs submitted by user on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • user (str): Username for which to fetch uncompleted jobs.
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.

@classmethod
def get_batch_jobs( cls, user: str, server: str | None = None) -> list[SlurmJob]:
208    @classmethod
209    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[SlurmJob]:
210        # server unused
211        _ = server
212
213        # get all jobs, except pending for which full information is not available using sacct
214        command = f"sacct -u {user} --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
215        logger.debug(command)
216
217        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
218
219        # get pending jobs using squeue
220        command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
221        logger.debug(command)
222
223        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
224
225        # filter out duplicate jobs
226        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
227        return list(merged.values())

Retrieve information about all jobs submitted by a specific user (including finished jobs) on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • user (str): Username for which to fetch all jobs.
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing all jobs of the user.

@classmethod
def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
229    @classmethod
230    def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
231        # server unused
232        _ = server
233
234        # get running jobs using sacct (faster than using squeue and scontrol)
235        command = f"sacct --state RUNNING --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
236        logger.debug(command)
237
238        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
239
240        # get pending jobs using squeue
241        command = 'squeue --array -t PENDING -h -o "%i"'
242        logger.debug(command)
243
244        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
245
246        # filter out duplicate jobs
247        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
248        return list(merged.values())

Retrieve information about uncompleted jobs of all users on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.

@classmethod
def get_all_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
250    @classmethod
251    def get_all_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
252        # server unused
253        _ = server
254
255        # get all jobs, except pending which are not available from sacct
256        command = f"sacct --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
257        logger.debug(command)
258
259        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
260
261        # get pending jobs using squeue
262        command = 'squeue --array -t PENDING -h -o "%i"'
263        logger.debug(command)
264
265        squeue_jobs = cls._get_batch_jobs_using_squeue_command(command)
266
267        # filter out duplicate jobs
268        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
269        return list(merged.values())

Retrieve information about all jobs of all users on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing all jobs of all users.

@classmethod
def get_queues( cls, server: str | None = None) -> list[SlurmQueue]:
271    @classmethod
272    def get_queues(cls, server: str | None = None) -> list[SlurmQueue]:
273        # server unused
274        _ = server
275
276        command = "scontrol show partition -o"
277        logger.debug(command)
278
279        result = subprocess.run(
280            ["bash"],
281            input=command,
282            text=True,
283            check=False,
284            capture_output=True,
285            errors="replace",
286        )
287
288        if result.returncode != 0:
289            raise QQError(
290                f"Could not retrieve information about queues: {result.stderr.strip()}."
291            )
292
293        queues = []
294        for line in result.stdout.splitlines():
295            info = parse_slurm_dump_to_dictionary(line)
296            queues.append(SlurmQueue.from_dict(info["PartitionName"], info))
297
298        return queues

Retrieve all queues managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get queues from.
Returns:

list[BatchQueueInterface]: A list of queue objects existing in the batch system.

@classmethod
def get_nodes( cls, server: str | None = None) -> list[SlurmNode]:
300    @classmethod
301    def get_nodes(cls, server: str | None = None) -> list[SlurmNode]:
302        # server unused
303        _ = server
304
305        command = "scontrol show node -o"
306        logger.debug(command)
307
308        result = subprocess.run(
309            ["bash"],
310            input=command,
311            text=True,
312            check=False,
313            capture_output=True,
314            errors="replace",
315        )
316
317        if result.returncode != 0:
318            raise QQError(
319                f"Could not retrieve information about nodes: {result.stderr.strip()}."
320            )
321
322        nodes = []
323        for line in result.stdout.splitlines():
324            info = parse_slurm_dump_to_dictionary(line)
325            nodes.append(SlurmNode.from_dict(info["NodeName"], info))
326
327        return nodes

Retrieve all nodes managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get nodes from.
Returns:

list[BatchNodeInterface]: A list of node objects existing in the batch system.

@classmethod
def read_remote_file(cls, host: str, file: pathlib._local.Path) -> str:
329    @classmethod
330    def read_remote_file(cls, host: str, file: Path) -> str:
331        return PBS.read_remote_file(host, file)

Read the contents of a file on a remote host and return it as a string.

The default implementation uses SSH to retrieve the file contents. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
Returns:

str: The contents of the remote file.

Raises:
  • QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: pathlib._local.Path, content: str) -> None:
333    @classmethod
334    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
335        PBS.write_remote_file(host, file, content)

Write the given content to a file on a remote host, overwriting it if it exists.

The default implementation uses SSH to send the content to the remote file. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
  • content (str): The content to write to the remote file.
Raises:
  • QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
337    @classmethod
338    def make_remote_dir(cls, host: str, directory: Path) -> None:
339        PBS.make_remote_dir(host, directory)

Create a directory at the specified path on a remote host.

The default implementation uses SSH to run mkdir on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory should be created.
  • directory (Path): The path of the directory to create on the remote host.
Raises:
  • QQError: If the directory cannot be created but does not already exist or the SSH command fails.
@classmethod
def list_remote_dir( cls, host: str, directory: pathlib._local.Path) -> list[pathlib._local.Path]:
341    @classmethod
342    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
343        return PBS.list_remote_dir(host, directory)

List all files and directories (absolute paths) in the specified directory on a remote host.

The default implementation uses SSH to run ls -A on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to list.
Returns:

list[Path]: A list of Path objects representing the entries inside the directory. Entries are relative to the given directory.

Raises:
  • QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
345    @classmethod
346    def delete_remote_dir(cls, host: str, directory: Path) -> None:
347        PBS.delete_remote_dir(host, directory)

Delete a directory on a remote host.

The default implementation uses SSH to run rm -r on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to delete.
Raises:
  • QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files( cls, host: str, files: list[pathlib._local.Path], moved_files: list[pathlib._local.Path]) -> None:
349    @classmethod
350    def move_remote_files(
351        cls, host: str, files: list[Path], moved_files: list[Path]
352    ) -> None:
353        PBS.move_remote_files(host, files, moved_files)

Move files on a remote host from their current paths to new paths.

The default implementation uses SSH to run a sequence of mv commands on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the files reside.
  • files (list[Path]): A list of source file paths on the remote host.
  • moved_files (list[Path]): A list of destination file paths on the remote host. Must be the same length as files.
Raises:
  • QQError: If the SSH command fails, the files cannot be moved or the length of files does not match the length of moved_files.
@classmethod
def sync_with_exclusions( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, exclude_files: list[pathlib._local.Path] | None = None) -> None:
355    @classmethod
356    def sync_with_exclusions(
357        cls,
358        src_dir: Path,
359        dest_dir: Path,
360        src_host: str | None,
361        dest_host: str | None,
362        exclude_files: list[Path] | None = None,
363    ) -> None:
364        PBS.sync_with_exclusions(src_dir, dest_dir, src_host, dest_host, exclude_files)

Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.

All files and directories in src_dir are copied to dest_dir except those listed in exclude_files. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing. These will be converted to paths relative to src_dir.
Raises:
  • QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, include_files: list[pathlib._local.Path] | None = None) -> None:
366    @classmethod
367    def sync_selected(
368        cls,
369        src_dir: Path,
370        dest_dir: Path,
371        src_host: str | None,
372        dest_host: str | None,
373        include_files: list[Path] | None = None,
374    ) -> None:
375        PBS.sync_selected(src_dir, dest_dir, src_host, dest_host, include_files)

Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.

Only files listed in include_files are copied from src_dir to dest_dir. Files not listed are ignored. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • include_files (list[Path] | None): Optional list of absolute file paths to include in syncing. These will be converted to paths relative to src_dir. This argument is optional only for consistency with sync_with_exclusions.
Raises:
  • QQError: If the rsync command fails or times out.
@classmethod
def sort_jobs(cls, jobs: list[SlurmJob]) -> None:
377    @classmethod
378    def sort_jobs(cls, jobs: list[SlurmJob]) -> None:
379        jobs.sort(key=lambda job: job.get_ids_for_sorting())

Sort a list of batch system jobs by a defined attribute.

The default implementation sorts the jobs alphabetically by their job ID, as returned by job.get_id(). Subclasses may override this method to implement custom sorting logic.

Arguments:
  • jobs (list[BatchJobInterface]): A list of batch job objects to be sorted in-place.
@classmethod
def jobs_presenter_columns_to_show(cls) -> set[str]:
381    @classmethod
382    def jobs_presenter_columns_to_show(cls) -> set[str]:
383        return {
384            "S",
385            "Job ID",
386            "User",
387            "Job Name",
388            "Queue",
389            "NCPUs",
390            "NGPUs",
391            "NNodes",
392            "Times",
393            "Node",
394            "Exit",
395        }

Get a set of columns that should be shown in the output of JobsPresenter (qq jobs) for this batch system.

In the default implementation, all columns are shown.

Note that the 'Exit' column is not shown when printing queued and running jobs, even if you specify it here.

Returns:
  • set[str]: Set of column titles that should be shown.