qq_lib.batch.slurm
Slurm backend for qq: job submission, monitoring, and cluster-resource access.
This module implements qq's full integration with the Slurm batch system.
It provides:
- The Slurm batch-system backend, implementing job submission, killing, remote file access and synchronization, resource translation, dependency formatting, and all Slurm-specific environment propagation.
- SlurmJob, SlurmNode, and SlurmQueue, concrete implementations of qq's job/node/queue interfaces, responsible for parsing Slurm command output and exposing normalized metadata to the rest of qq.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Slurm backend for qq: job submission, monitoring, and cluster-resource access. 6 7This module implements qq's full integration with the Slurm batch system. 8 9It provides: 10 11- The `Slurm` batch-system backend, implementing job submission, killing, 12 remote file access and synchronization, resource translation, dependency formatting, 13 and all Slurm-specific environment propagation. 14 15- `SlurmJob`, `SlurmNode`, and `SlurmQueue`, concrete implementations of qq's 16 job/node/queue interfaces, responsible for parsing Slurm command output and exposing 17 normalized metadata to the rest of qq. 18""" 19 20from .job import SlurmJob 21from .node import SlurmNode 22from .queue import SlurmQueue 23from .slurm import Slurm 24 25__all__ = [ 26 "SlurmJob", 27 "SlurmNode", 28 "SlurmQueue", 29 "Slurm", 30]
class SlurmJob(BatchJobInterface):
    """
    Implementation of BatchJobInterface for Slurm.
    Stores metadata for a single Slurm job.

    The metadata is kept in `_info`, a flat string-to-string dictionary that is filled
    either from `scontrol show job` or, if that command fails, from `sacct`.
    """

    # converts from Slurm state names to qq BatchStates
    _STATE_CONVERTER: dict[str, BatchState] = {
        "BOOT_FAIL": BatchState.FAILED,
        "CANCELLED": BatchState.FAILED,
        "COMPLETED": BatchState.FINISHED,
        "DEADLINE": BatchState.FAILED,
        "FAILED": BatchState.FAILED,
        "NODE_FAIL": BatchState.FAILED,
        "OUT_OF_MEMORY": BatchState.FAILED,
        "PENDING": BatchState.QUEUED,
        "PREEMPTED": BatchState.SUSPENDED,
        "RUNNING": BatchState.RUNNING,
        "SUSPENDED": BatchState.SUSPENDED,
        "TIMEOUT": BatchState.FAILED,
    }

    def __init__(self, job_id: str):
        """Query the batch system for information about the job with the specified ID."""
        self._job_id = job_id
        self._info: dict[str, str] = {}

        self.update()

    def is_empty(self) -> bool:
        """Return True if no information about the job is stored."""
        return not self._info

    def get_id(self) -> str:
        """Return the job ID this object was created with."""
        return self._job_id

    def get_account(self) -> str | None:
        """Return the `Account` property of the job, or None if it is not present."""
        return self._info.get("Account")

    def update(self) -> None:
        """
        Refresh the stored job information from the batch system.

        `scontrol show job` is queried first; if it exits with a non-zero code, `sacct`
        is used instead. When neither command provides a record, the stored information
        is reset to an empty dictionary.

        Raises:
            QQError: If `sacct` prints a record that `from_sacct_string` cannot parse.
        """
        # first try `scontrol`
        result = SlurmJob._run_in_bash(f"scontrol show job {self._job_id} -o")
        if result.returncode == 0:
            self._info = parse_slurm_dump_to_dictionary(result.stdout)
            return

        logger.debug(
            f"scontrol failed for job '{self._job_id}' ({result.stderr.strip()}); trying sacct"
        )

        # if `scontrol` fails, try `sacct`
        result = SlurmJob._run_in_bash(
            f"sacct --allocations --noheader --parsable2 -j {self._job_id} --format={SACCT_FIELDS} "
        )
        if result.returncode != 0:
            # if sacct fails, information is empty
            logger.debug(
                f"both scontrol and sacct failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
            )
            self._info = {}
            return

        record = result.stdout.strip()
        if not record:
            # an empty output has no fields, so `from_sacct_string` would raise QQError;
            # treat it as "no information" instead
            logger.debug(
                f"sacct returned no record for job '{self._job_id}': no information is available"
            )
            self._info = {}
            return

        self._info = SlurmJob.from_sacct_string(record)._info

    def get_state(self) -> BatchState:
        """
        Return the job state translated to a qq BatchState.

        Returns:
            BatchState: UNKNOWN if the state is missing or not listed in `_STATE_CONVERTER`;
            HELD if the job is queued and its reason mentions a dependency.
        """
        if not (raw_state := self._info.get("JobState")):
            return BatchState.UNKNOWN

        converted_state = SlurmJob._STATE_CONVERTER.get(raw_state) or BatchState.UNKNOWN

        # if the job is queued due to depending on another job, it should be considered "held"
        if (
            converted_state == BatchState.QUEUED
            and (comment := self.get_comment())
            and "Dependency" in comment
        ):
            return BatchState.HELD

        return converted_state

    def get_comment(self) -> str | None:
        """Return the `Reason` property formatted as a comment, or None if it is missing or "None"."""
        if (reason := self._info.get("Reason")) and reason != "None":
            return f"Reason: {reason}"

        return None

    def get_estimated(self) -> tuple[datetime, str] | None:
        """
        Return the estimated start time and the scheduled node list of the job.

        Returns:
            tuple[datetime, str] | None: The parsed `StartTime` and the raw `SchedNodeList`,
            or None if either of them is unavailable.
        """
        # use "StartTime" as an estimate; get_start_time already maps missing values to None
        if not (time := self.get_start_time()):
            return None

        if not (node_list := self._info.get("SchedNodeList")) or "None" in node_list:
            return None

        return (time, node_list)

    def get_main_node(self) -> str | None:
        """Return `BatchHost`, falling back to the first node of `NodeList`, or None."""
        if (main_node := self._info.get("BatchHost")) and "None" not in main_node:
            return main_node

        # if BatchHost does not exist, use the first node from NodeList
        if nodes := self.get_nodes():
            return nodes[0]

        return None

    def get_nodes(self) -> list[str] | None:
        """Return the expanded `NodeList` of the job, or None if it is not available."""
        if (node_list := self._info.get("NodeList")) and "None" not in node_list:
            return SlurmJob._expand_node_list(node_list)

        return None

    def get_short_nodes(self) -> list[str] | None:
        """Return the unexpanded `NodeList` as a single-element list, or None if not available."""
        # treat all nodes as a single node, without expanding
        # this assumes that get_short_nodes is only used in qq jobs and qq stat
        if (node_list := self._info.get("NodeList")) and "None" not in node_list:
            return [node_list]

        return None

    def get_name(self) -> str | None:
        """Return the `JobName` property, or None if it is not available."""
        if not (name := self._info.get("JobName")):
            logger.debug(f"Could not get job name for '{self._job_id}'.")
            return None

        return name

    def get_n_cpus(self) -> int | None:
        """
        Return the number of CPUs of the job.

        Returns:
            int | None: The larger of `NumCPUs` and `MinCPUsNode` times the number of nodes,
            or None if `NumCPUs` is missing, invalid or zero.
        """
        min_cpus = (
            self._get_int_property("MinCPUsNode", "the minimum number of CPUs per node")
            or 0
        ) * (self.get_n_nodes() or 0)

        if not (cpus := self._get_int_property("NumCPUs", "the number of CPUs")):
            return None

        return max(min_cpus, cpus)

    def get_n_gpus(self) -> int | None:
        """
        Return the number of GPUs listed in the TRES string of the job.

        Returns:
            int | None: The count of the first item starting with "gpu" or "gres/gpu",
            or None if there is no such item or its count cannot be parsed.
        """
        for item in self._get_tres().split(","):
            if not item.startswith(("gpu", "gres/gpu")):
                continue

            try:
                return int(item.split("=")[1])
            except (ValueError, IndexError) as e:
                # IndexError: the item carries no '=' and therefore no count
                logger.warning(
                    f"Could not parse the number of GPUs from '{item}': {e}."
                )
                return None

        return None

    def get_n_nodes(self) -> int | None:
        """Return the `NumNodes` property as an integer, or None if not available."""
        return self._get_int_property("NumNodes", "the number of nodes")

    def get_mem(self) -> Size | None:
        """Return the memory parsed from the "mem=" item of the TRES string, or None."""
        tres = self._get_tres()
        for item in tres.split(","):
            if item.startswith("mem="):
                try:
                    return Size.from_string(item.split("=", 1)[1])
                except Exception as e:
                    logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
                    return None

        logger.debug(f"Memory not available for '{self._job_id}'.")
        return None

    def get_start_time(self) -> datetime | None:
        """Return the parsed `StartTime` property, or None if not available."""
        return self._get_datetime_property("StartTime", "the job start time")

    def get_submission_time(self) -> datetime | None:
        """Return the parsed `SubmitTime` property, or None if not available."""
        return self._get_datetime_property("SubmitTime", "the job submission time")

    def get_completion_time(self) -> datetime | None:
        """Return the parsed `EndTime` property, or None if not available."""
        # the property EndTime is available for running jobs as well (estimated completion time)
        # but that should not matter for our purposes
        return self._get_datetime_property("EndTime", "the job completion time")

    def get_modification_time(self) -> datetime | None:
        """Return the completion time, or the submission time if there is no completion time."""
        # assuming this is only used for completed jobs
        return self.get_completion_time() or self.get_submission_time()

    def get_user(self) -> str | None:
        """Return the part of `UserId` preceding the first "(", or None if not available."""
        if not (user := self._info.get("UserId")):
            logger.debug(f"Could not get user for '{self._job_id}'.")
            return None

        return user.split("(")[0]

    def get_walltime(self) -> timedelta | None:
        """Return the `TimeLimit` property as a duration, or None if missing or unparsable."""
        if not (walltime := self._info.get("TimeLimit")):
            logger.debug(f"Could not get walltime for '{self._job_id}'.")
            return None

        try:
            return dhhmmss_to_duration(walltime)
        except QQError as e:
            logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
            return None

    def get_queue(self) -> str | None:
        """Return the `Partition` property, or None if not available."""
        if not (queue := self._info.get("Partition")):
            logger.debug(f"Could not get queue for '{self._job_id}'.")
            return None

        return queue

    def get_util_cpu(self) -> int | None:
        """Return None: CPU utilization is not available in Slurm."""
        # not available in Slurm
        return None

    def get_util_mem(self) -> int | None:
        """Return None: memory utilization is not available in Slurm."""
        # not available in Slurm
        return None

    def get_exit_code(self) -> int | None:
        """
        Return the exit code of the job parsed from the `ExitCode` property ("CODE:SIGNAL").

        Returns:
            int | None: The first non-zero value of the pair, 0 if both are zero,
            or None if the property is missing or cannot be parsed.
        """
        if not (raw_exit := self._info.get("ExitCode")):
            return None

        try:
            # Slurm reports two exit codes; the first one is exit code of the script
            # the second one is a signal
            # we return the first non-zero exit code or 0 if both exit codes are 0
            code, signal = map(int, raw_exit.split(":"))
            return code or signal
        except Exception as e:
            logger.debug(f"Could not parse exit codes '{raw_exit}': {e}.")
            return None

    def get_input_machine(self) -> str | None:
        """Return None: the submission machine is not available for Slurm."""
        # not available for Slurm
        return None

    def get_input_dir(self) -> Path | None:
        """Return the `WorkDir` property passed through `logical_resolve`, or None if missing."""
        if not (raw_dir := self._info.get("WorkDir")):
            logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
            return None

        return logical_resolve(Path(raw_dir))

    def get_info_file(self) -> Path | None:
        """
        Return the path of the qq info file expected for this job.

        The path is the input directory joined with the job name, with the suffix replaced
        by `CFG.suffixes.qq_info`.

        Returns:
            Path | None: The path, or None if the input directory or job name is unknown,
            or if the file does not exist or cannot be accessed.
        """
        if not (input_dir := self.get_input_dir()) or not (name := self.get_name()):
            return None

        info_file = (input_dir / name).with_suffix(CFG.suffixes.qq_info)

        # we need to check whether the info file actually exists
        # (or rather if it is available to the user)
        try:
            if not info_file.is_file():
                return None
        except PermissionError:
            return None

        return info_file

    def to_yaml(self) -> str:
        """Return the stored job information dumped as YAML, keeping the key order."""
        return yaml.dump(
            self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
        )

    def get_steps(self) -> Sequence[Self]:
        """
        Return the steps of the job as reported by `sacct`.

        Returns:
            Sequence[Self]: One SlurmJob per output line whose step ID is numeric;
            an empty list if `sacct` fails.

        Raises:
            QQError: If an output line does not have the expected number of fields.
        """
        result = SlurmJob._run_in_bash(
            f"sacct -j {self._job_id} --parsable2 --format={SACCT_STEP_FIELDS}"
        )

        if result.returncode != 0:
            logger.debug(f"Could not get steps for a job '{self._job_id}'.")
            return []

        jobs = []
        for sacct_string in result.stdout.split("\n"):
            if sacct_string.strip() == "":
                continue

            job = SlurmJob._step_from_sacct_string(sacct_string)
            # only consider job steps with numeric indices
            if (step_id := job.get_step_id()) and step_id.isnumeric():
                jobs.append(job)

        return jobs

    def get_step_id(self) -> str | None:
        """Return the part of the job ID following the first ".", or None if there is no "."."""
        try:
            (_, step) = self._job_id.split(".", maxsplit=1)
            return step
        except ValueError:
            return None

    def is_array_job(self) -> bool:
        """Return False unconditionally."""
        return False

    @classmethod
    def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
        """
        Construct a new instance of SlurmJob from a job ID and a dictionary of job information.

        This method bypasses the standard initializer and directly sets the `_job_id` and `_info`
        attributes of the new instance.

        Args:
            job_id (str): The unique identifier of the job.
            info (dict[str, str]): A dictionary containing Slurm job metadata as key-value pairs.

        Returns:
            Self: A new instance of SlurmJob.

        Note:
            This method does not perform any validation or processing of the provided dictionary.
        """
        job_info = cls.__new__(cls)
        job_info._job_id = job_id
        job_info._info = info

        return job_info

    @classmethod
    def from_sacct_string(cls, string: str) -> Self:
        """
        Construct a new instance of SlurmJob using a string from sacct.

        Args:
            string (str): String describing the job properties obtained using sacct.

        Returns:
            Self: A new instance of SlurmJob.

        Raises:
            QQError: If the number of "|"-separated items differs from the expected number.
        """
        fields: list[str] = [
            "JobId",
            "Account",
            "JobState",
            "UserId",
            "JobName",
            "Partition",
            "WorkDir",
            "AllocCPUs",
            "ReqCPUs",
            "AllocTRES",
            "ReqTRES",
            "AllocNodes",
            "ReqNodes",
            "SubmitTime",
            "StartTime",
            "EndTime",
            "TimeLimit",
            "NodeList",
            "Reason",
            "ExitCode",
        ]

        split = string.split("|")
        if len(fields) != len(split):
            raise QQError(
                f"Number of items in a sacct string '{string}' ('{len(split)}') does not match the expected number of items ('{len(fields)}'). This is a bug, please report it!"
            )

        info: dict[str, str] = dict(zip(fields, split))

        # only take the first word from JobState
        # other words may contain useless additional information
        # (an empty state is left as is instead of failing on the index)
        if state_words := info["JobState"].split():
            info["JobState"] = state_words[0]

        SlurmJob._assign_if_allocated(info, "AllocCPUs", "ReqCPUs", "NumCPUs")
        SlurmJob._assign_if_allocated(info, "AllocNodes", "ReqNodes", "NumNodes")

        return cls.from_dict(info["JobId"], info)

    @classmethod
    def _step_from_sacct_string(cls, string: str) -> Self:
        """
        Construct a new instance of SlurmJob step using a string from sacct.

        Args:
            string (str): String describing the job properties obtained using sacct.

        Returns:
            Self: A new instance of SlurmJob for a job step.

        Raises:
            QQError: If the number of "|"-separated items differs from the expected number.
        """
        fields: list[str] = [
            "JobId",
            "JobState",
            "StartTime",
            "EndTime",
        ]

        split = string.split("|")
        if len(fields) != len(split):
            raise QQError(
                f"Number of items in a sacct string for a slurm step '{string}' ('{len(split)}') does not match the expected number of items ('{len(fields)}'). This is a bug, please report it!"
            )

        info: dict[str, str] = dict(zip(fields, split))

        # only take the first word from JobState
        # other words may contain useless additional information
        # (an empty state is left as is instead of failing on the index)
        if state_words := info["JobState"].split():
            info["JobState"] = state_words[0]

        return cls.from_dict(info["JobId"], info)

    def get_ids_for_sorting(self) -> list[int]:
        """
        Extract numeric components of the job ID for sorting.

        The method retrieves the leading numeric portion of the job ID, which may
        contain multiple integer groups separated by underscores. Parsing stops
        when a non-digit and non-underscore character is encountered.

        Returns:
            list[int]: A list of integer components extracted from the job ID,
            or [0] if no valid numeric portion is found.
        """
        # get the numerical portion of the job ID (may contain underscores)
        match = re.match(r"(\d+(?:_\d+)*)", self.get_id())
        if not match:
            return [0]

        # split the matched portion into digit groups
        groups = match.group(1).split("_")
        return [int(g) for g in groups]

    @staticmethod
    def _run_in_bash(command: str) -> subprocess.CompletedProcess[str]:
        """
        Log a shell command and execute it by piping it to `bash`.

        Args:
            command (str): The command line to execute.

        Returns:
            subprocess.CompletedProcess[str]: The finished process with captured text output.
            A non-zero exit code does not raise; callers inspect `returncode`.
        """
        logger.debug(command)

        return subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

    @staticmethod
    def _expand_node_list(compact: str) -> list[str]:
        """
        Expand a compact Slurm node list expression into individual hostnames.

        This method uses the Slurm `scontrol show hostnames` command to translate
        a compact node list (e.g., "node[01-03]") into an explicit list of node names.
        If the expansion fails, the original compact string is returned as a single-element list.

        Args:
            compact (str): The compact Slurm node list expression to expand.

        Returns:
            list[str]: A list of fully expanded node hostnames. If expansion fails
            or produces no hostname, returns a list containing the original input string.
        """
        result = SlurmJob._run_in_bash(f"scontrol show hostnames {compact}")

        if result.returncode != 0:
            logger.warning(
                f"Could not expand '{compact}' into a list of nodes: {result.stderr.strip()}"
            )
            # use unexpanded string
            return [compact]

        # drop blank lines so that an empty output does not yield a hostname of ""
        nodes = [line.strip() for line in result.stdout.splitlines() if line.strip()]
        return nodes or [compact]

    def _get_int_property(self, property: str, property_name: str) -> int | None:
        """
        Retrieve an integer property value from the job information.

        If the property contains a range (e.g., "MIN-MAX"), only the minimum value
        is returned. If the property cannot be retrieved or converted to an integer,
        `None` is returned.

        Args:
            property (str): The key identifying the property in the job information.
            property_name (str): A human-readable name of the property for logging.

        Returns:
            int | None: The integer value of the property, or `None` if unavailable or invalid.
        """
        try:
            # we split by '-' because pending jobs may have this property shown as MIN-MAX
            # we show the value of the minimum
            return int(self._info[property].split("-")[0])
        except Exception:
            logger.debug(
                f"Could not get information about {property_name} from the batch system for '{self._job_id}'."
            )
            return None

    def _get_datetime_property(
        self, property: str, property_name: str
    ) -> datetime | None:
        """
        Retrieve and parse a datetime property from the job information.

        If the property is missing, empty, or marked as unknown, None is returned.
        A warning is logged if parsing fails.

        Args:
            property (str): The key identifying the property in the job information.
            property_name (str): A human-readable name of the property for logging.

        Returns:
            datetime | None: A datetime object if parsing succeeds, otherwise None.
        """
        if not (raw_datetime := self._info.get(property)) or raw_datetime.lower() in [
            "unknown",
            "n/a",
            "none",
            "",
        ]:
            return None

        try:
            return datetime.strptime(raw_datetime, CFG.date_formats.slurm)
        except Exception as e:
            logger.warning(
                f"Could not parse information about {property_name} for '{self._job_id}': {e}."
            )
            return None

    def _get_tres(self) -> str:
        """
        Return the AllocTRES property or ReqTRES property, depending on which of them is available.
        Note that the resources specified in ReqTRES can potentially be different than the resources in AllocTRES.
        """
        tres = self._info.get("AllocTRES")
        if not tres or "null" in tres or "None" in tres or "N/A" in tres:
            tres = self._info.get("ReqTRES", "")

        return tres

    @staticmethod
    def _assign_if_allocated(
        info: dict[str, str], alloc_key: str, req_key: str, target_key: str
    ) -> None:
        """
        Assigns a value to a target key in the `info` dictionary, preferring an allocated value
        if it exists and is valid; otherwise, falls back to the requested value.

        Args:
            info (dict[str, str]): The dictionary containing allocation and request data.
            alloc_key (str): The key for the allocated resource (e.g., "AllocCPUs").
            req_key (str): The key for the requested resource (e.g., "ReqCPUs").
            target_key (str): The key under which the resolved value should be stored (e.g., "NumCPUs").

        Notes:
            - A value is considered invalid if it is `None`, the string `"None"`,
              an empty string `""`, or `"0"`.
            - If the requested value is missing as well, `"0"` is stored.
            - The function updates `info` in place.
        """
        value = info.get(alloc_key)
        info[target_key] = (
            value if value not in (None, "None", "", "0") else info.get(req_key, "0")
        )
Implementation of BatchJobInterface for Slurm. Stores metadata for a single Slurm job.
def __init__(self, job_id: str):
    """
    Create the job object and immediately load its information.

    Args:
        job_id (str): ID of the job to query in the batch system.
    """
    # start without any information; update() fills the dictionary in
    self._info: dict[str, str] = {}
    self._job_id = job_id

    self.update()
Query the batch system for information about the job with the specified ID.
Check whether the job contains any information. This should return True if the job does not exist in the batch system.
Returns:
bool: True if the job contains no information.
Return the account under which the job is submitted.
Returns:
str | None: Account associated with the job or None if no account is defined.
def update(self) -> None:
    """
    Refresh the stored job information from the batch system.

    `scontrol show job` is queried first; if it exits with a non-zero code, `sacct`
    is used instead. When neither command provides a record, the stored information
    is reset to an empty dictionary.

    Raises:
        QQError: If `sacct` prints a record that `from_sacct_string` cannot parse.
    """

    def run_in_bash(command: str) -> subprocess.CompletedProcess:
        # log the command and execute it by piping it to bash; failures do not raise
        logger.debug(command)
        return subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

    # first try `scontrol`
    result = run_in_bash(f"scontrol show job {self._job_id} -o")
    if result.returncode == 0:
        self._info = parse_slurm_dump_to_dictionary(result.stdout)
        return

    logger.debug(
        f"scontrol failed for job '{self._job_id}' ({result.stderr.strip()}); trying sacct"
    )

    # if `scontrol` fails, try `sacct`
    result = run_in_bash(
        f"sacct --allocations --noheader --parsable2 -j {self._job_id} --format={SACCT_FIELDS} "
    )
    if result.returncode != 0:
        # if sacct fails, information is empty
        logger.debug(
            f"both scontrol and sacct failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
        )
        self._info = {}
        return

    record = result.stdout.strip()
    if not record:
        # an empty output has no fields, so `from_sacct_string` would raise QQError;
        # treat it as "no information" instead
        logger.debug(
            f"sacct returned no record for job '{self._job_id}': no information is available"
        )
        self._info = {}
        return

    self._info = SlurmJob.from_sacct_string(record)._info
Refresh the stored job information from the batch system.
Raises:
- QQError: If the job cannot be queried or its info updated.
def get_state(self) -> BatchState:
    """
    Translate the Slurm `JobState` property into a qq BatchState.

    Returns:
        BatchState: UNKNOWN if the state is missing or not listed in `_STATE_CONVERTER`;
        HELD if the job is queued and its comment mentions a dependency;
        otherwise the converted state.
    """
    slurm_state = self._info.get("JobState")
    if not slurm_state:
        return BatchState.UNKNOWN

    state = SlurmJob._STATE_CONVERTER.get(slurm_state) or BatchState.UNKNOWN
    if state != BatchState.QUEUED:
        return state

    # a job that is queued because it depends on another job is reported as "held"
    comment = self.get_comment()
    if comment and "Dependency" in comment:
        return BatchState.HELD

    return state
Return the current state of the job as reported by the batch system.
If the job information is no longer available, return BatchState.UNKNOWN.
Returns:
BatchState: The job state according to the batch system.
130 def get_comment(self) -> str | None: 131 if (reason := self._info.get("Reason")) and reason != "None": 132 return f"Reason: {reason}" 133 134 return None
Retrieve the batch system-provided comment for the job.
Returns:
str | None: The job's comment string if available, or None if the batch system has not attached a comment.
136 def get_estimated(self) -> tuple[datetime, str] | None: 137 # use "StartTime" as an estimate 138 if not (time := self.get_start_time()) or time == "None": 139 return None 140 141 if not (node_list := self._info.get("SchedNodeList")) or "None" in node_list: 142 return None 143 144 return (time, node_list)
Retrieve the batch system's estimated job start time and execution node.
Returns:
tuple[datetime, str] | None: A tuple containing: - datetime: The estimated start time of the job. - str: The name of the node where the job is expected to run. Returns None if either estimate is unavailable.
146 def get_main_node(self) -> str | None: 147 if (main_node := self._info.get("BatchHost")) and "None" not in main_node: 148 return main_node 149 150 # if BatchHost does not exist, use the first node from NodeList 151 if nodes := self.get_nodes(): 152 return nodes[0] 153 154 return None
Retrieve the hostname of the main execution node for the job.
Returns:
str | None: The hostname of the main execution node, or
None if unavailable or not applicable.
156 def get_nodes(self) -> list[str] | None: 157 if (node_list := self._info.get("NodeList")) and "None" not in node_list: 158 return SlurmJob._expand_node_list(node_list) 159 160 return None
Retrieve the hostnames of all execution nodes allocated for the job.
Returns:
list[str] | None: A list of hostnames or node identifiers used by the job, or
None if node information is not available.
162 def get_short_nodes(self) -> list[str] | None: 163 # treat all nodes a single node, without expanding 164 # this assumes that get_short_nodes is only used in qq jobs and qq stat 165 if (node_list := self._info.get("NodeList")) and "None" not in node_list: 166 return [node_list] 167 168 return None
Retrieve the short hostnames of all execution nodes allocated for the job.
Returns:
list[str] | None: A list of short hostnames used by the job, or
None if node information is not available.
170 def get_name(self) -> str | None: 171 if not (name := self._info.get("JobName")): 172 logger.debug(f"Could not get job name for '{self._job_id}'.") 173 return None 174 175 return name
Return the name of the job.
Returns:
str | None: The name of the submitted job or
None if not available.
177 def get_n_cpus(self) -> int | None: 178 min_cpus = ( 179 self._get_int_property("MinCPUsNode", "the minimum number of CPUs per node") 180 or 0 181 ) * (self.get_n_nodes() or 0) 182 183 if not (cpus := self._get_int_property("NumCPUs", "the number of CPUs")): 184 return None 185 186 return max(min_cpus, cpus)
Return the number of CPU cores allocated for the job.
Returns:
int | None: Number of CPUs allocated for the job or
None if not available.
188 def get_n_gpus(self) -> int | None: 189 tres = self._get_tres() 190 for item in tres.split(","): 191 if item.startswith("gpu") or item.startswith("gres/gpu"): 192 try: 193 return int(item.split("=")[1]) 194 except ValueError as e: 195 logger.warning( 196 f"Could not parse the number of GPUs from '{item}': {e}." 197 ) 198 return None 199 200 return None
Return the number of GPUs allocated for the job.
Returns:
int | None: Number of GPUs allocated for the job or
None if not available.
202 def get_n_nodes(self) -> int | None: 203 return self._get_int_property("NumNodes", "the number of nodes")
Return the number of compute nodes assigned to the job.
Returns:
int | None: Number of nodes used by the job or
None if not available.
def get_mem(self) -> Size | None:
    """
    Return the memory of the job parsed from its TRES string.

    Returns:
        Size | None: The value of the first "mem=" item converted by `Size.from_string`,
        or None if there is no such item or the conversion fails.
    """
    tres_items = self._get_tres().split(",")
    mem_item = next((item for item in tres_items if item.startswith("mem=")), None)

    if mem_item is None:
        logger.debug(f"Memory not available for '{self._job_id}'.")
        return None

    try:
        return Size.from_string(mem_item.removeprefix("mem="))
    except Exception as e:
        logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
        return None
Return the amount of memory allocated for the job.
Returns:
Size | None: Amount of memory allocated for the job or
None if not available.
218 def get_start_time(self) -> datetime | None: 219 return self._get_datetime_property("StartTime", "the job start time")
Return the timestamp when the job started execution.
Returns:
datetime | None: Time when the job began running or
None if the job has not yet started.
221 def get_submission_time(self) -> datetime | None: 222 return self._get_datetime_property("SubmitTime", "the job submission time")
Return the timestamp when the job was submitted.
Returns:
datetime | None: Time when the job was submitted to the batch system or
None if not available.
224 def get_completion_time(self) -> datetime | None: 225 # the property EndTime is available for running jobs as well (estimated completion time) 226 # but that should not matter for our purposes 227 return self._get_datetime_property("EndTime", "the job completion time")
Return the timestamp when the job was completed.
Returns:
datetime | None: Time when the job completed or
None if the job has not yet completed.
229 def get_modification_time(self) -> datetime | None: 230 # assuming this is only used for completed jobs 231 return self.get_completion_time() or self.get_submission_time()
Return the timestamp at which the job was last modified.
Returns:
datetime | None: Time when the job was last modified or
None if the information is not available.
233 def get_user(self) -> str | None: 234 if not (user := self._info.get("UserId")): 235 logger.debug(f"Could not get user for '{self._job_id}'.") 236 return None 237 238 return user.split("(")[0]
Return the username of the job owner.
Returns:
str | None: Username of the user who owns the job or
None if not available.
def get_walltime(self) -> timedelta | None:
    """
    Return the `TimeLimit` property of the job as a duration.

    Returns:
        timedelta | None: The result of `dhhmmss_to_duration`, or None if the property
        is missing or the conversion raises QQError.
    """
    time_limit = self._info.get("TimeLimit")
    if not time_limit:
        logger.debug(f"Could not get walltime for '{self._job_id}'.")
        return None

    try:
        duration = dhhmmss_to_duration(time_limit)
    except QQError as e:
        logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
        return None

    return duration
Return the walltime limit of the job.
Returns:
timedelta | None: Walltime for the job or
None if not available.
251 def get_queue(self) -> str | None: 252 if not (queue := self._info.get("Partition")): 253 logger.debug(f"Could not get queue for '{self._job_id}'.") 254 return None 255 256 return queue
Return the submission queue of the job.
Returns:
str | None: The queue this job is part of or
None if not available.
Return the utilization of requested CPUs in percents (0-100).
Returns:
int | None: Utilization of requested CPUs or
None if not available.
Return the utilization of requested memory in percents (0-100).
Returns:
int | None: Utilization of requested memory or
None if not available.
266 def get_exit_code(self) -> int | None: 267 if not (raw_exit := self._info.get("ExitCode")): 268 return None 269 270 try: 271 # Slurm reports two exit codes; the first one is exit code of the script 272 # the second one is a signal 273 # we return the first non-zero exit code or 0 if both exit codes are 0 274 code, signal = map(int, raw_exit.split(":")) 275 return code or signal 276 except Exception as e: 277 logger.debug(f"Could not parse exit codes '{raw_exit}': {e}.") 278 return None
Return the exit code of the job.
Returns:
int | None: Exit code of the job or
`None` if exit code is not assigned.
Return the hostname of the submission machine.
Returns:
str | None: Hostname of the submission machine or
`None` if not available.
def get_input_dir(self) -> Path | None:
    """
    Return path to the directory from which the job was submitted.

    Returns:
        Path | None: The `WorkDir` property passed through `logical_resolve`,
        or `None` if the property is not available.
    """
    work_dir = self._info.get("WorkDir")
    if work_dir:
        return logical_resolve(Path(work_dir))

    logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
    return None
Return path to the directory from which the job was submitted.
Returns:
Path | None: Path to the submission directory or
`None` if not available.
291 def get_info_file(self) -> Path | None: 292 if not (input_dir := self.get_input_dir()) or not (name := self.get_name()): 293 return None 294 295 info_file = (input_dir / name).with_suffix(CFG.suffixes.qq_info) 296 297 # we need to check whether the info file actually exists 298 # (or rather if it is available to the user) 299 try: 300 if not info_file.is_file(): 301 return None 302 except PermissionError: 303 return None 304 305 return info_file
Return path to the info file associated with this job.
Returns:
Path | None: Path to the qq info file or
`None` if this is not a qq job.
def to_yaml(self) -> str:
    """
    Return all information about the job from the batch system in YAML format.

    Returns:
        str: YAML-formatted string of job metadata.
    """
    # block style output, keys kept in their stored order
    return yaml.dump(
        self._info,
        Dumper=Dumper,
        sort_keys=False,
        default_flow_style=False,
    )
Return all information about the job from the batch system in YAML format.
Returns:
str: YAML-formatted string of job metadata.
def get_steps(self) -> Sequence[Self]:
    """
    Return a list of steps associated with this job.

    The steps are obtained by running `sacct` for this job ID. Only entries
    whose step ID is numeric are kept.

    Returns:
        Sequence[Self]: List of job steps. An empty list if `sacct` fails or
        there are no numeric steps.
    """
    command = f"sacct -j {self._job_id} --parsable2 --format={SACCT_STEP_FIELDS}"
    logger.debug(command)

    completed = subprocess.run(
        ["bash"],
        input=command,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if completed.returncode != 0:
        logger.debug(f"Could not get steps for a job '{self._job_id}'.")
        return []

    steps = []
    for line in completed.stdout.split("\n"):
        if not line.strip():
            continue

        candidate = SlurmJob._step_from_sacct_string(line)
        step_id = candidate.get_step_id()
        # only consider job steps with numeric indices
        if step_id and step_id.isnumeric():
            steps.append(candidate)

    return steps
Return a list of steps associated with this job.
Note that job step is represented by BatchJobInterface, but may not contain all the values that a proper BatchJobInterface contains.
Returns:
Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.
341 def get_step_id(self) -> str | None: 342 try: 343 (_, step) = self._job_id.split(".", maxsplit=1) 344 return step 345 except ValueError: 346 return None
Return the step index if this job is a job step.
Returns:
str | None: Job step index or
`None` if this is not a job step.
Return True if the job is a top-level array job (not a sub-job).
Returns:
bool:
`True` if the job is a top-level array job, else `False`.
351 @classmethod 352 def from_dict(cls, job_id: str, info: dict[str, str]) -> Self: 353 """ 354 Construct a new instance of SlurmJob from a job ID and a dictionary of job information. 355 356 This method bypasses the standard initializer and directly sets the `_job_id` and `_info` 357 attributes of the new instance. 358 359 Args: 360 job_id (str): The unique identifier of the job. 361 info (dict[str, str]): A dictionary containing Slurm job metadata as key-value pairs. 362 363 Returns: 364 Self: A new instance of SlurmJob. 365 366 Note: 367 This method does not perform any validation or processing of the provided dictionary. 368 """ 369 job_info = cls.__new__(cls) 370 job_info._job_id = job_id 371 job_info._info = info 372 373 return job_info
Construct a new instance of SlurmJob from a job ID and a dictionary of job information.
This method bypasses the standard initializer and directly sets the _job_id and _info
attributes of the new instance.
Arguments:
- job_id (str): The unique identifier of the job.
- info (dict[str, str]): A dictionary containing Slurm job metadata as key-value pairs.
Returns:
Self: A new instance of SlurmJob.
Note:
This method does not perform any validation or processing of the provided dictionary.
@classmethod
def from_sacct_string(cls, string: str) -> Self:
    """
    Construct a new instance of SlurmJob using a string from sacct.

    The string is split on '|' and the items are assigned, in order, to a fixed
    list of property names.

    Args:
        string (str): String describing the job properties obtained using sacct.

    Returns:
        Self: A new instance of SlurmJob.

    Raises:
        QQError: If the number of '|'-separated items does not match the number
            of expected properties.
    """
    fields: list[str] = [
        "JobId",
        "Account",
        "JobState",
        "UserId",
        "JobName",
        "Partition",
        "WorkDir",
        "AllocCPUs",
        "ReqCPUs",
        "AllocTRES",
        "ReqTRES",
        "AllocNodes",
        "ReqNodes",
        "SubmitTime",
        "StartTime",
        "EndTime",
        "TimeLimit",
        "NodeList",
        "Reason",
        "ExitCode",
    ]

    split = string.split("|")
    if len(fields) != len(split):
        raise QQError(
            f"Number of items in a sacct string '{string}' ('{len(split)}') does not match the expected number of items ('{len(fields)}'). This is a bug, please report it!"
        )

    info: dict[str, str] = dict(zip(fields, split))

    # only take the first word from JobState
    # other words may contain useless additional information
    # (an empty or whitespace-only state is left untouched instead of raising IndexError)
    if state_words := info["JobState"].split():
        info["JobState"] = state_words[0]

    SlurmJob._assign_if_allocated(info, "AllocCPUs", "ReqCPUs", "NumCPUs")
    SlurmJob._assign_if_allocated(info, "AllocNodes", "ReqNodes", "NumNodes")

    return cls.from_dict(info["JobId"], info)
Construct a new instance of SlurmJob using a string from sacct.
Arguments:
- string (str): String describing the job properties obtained using sacct.
Returns:
Self: A new instance of SlurmJob.
def get_ids_for_sorting(self) -> list[int]:
    """
    Extract numeric components of the job ID for sorting.

    Only the leading part of the job ID is considered: one or more groups of
    digits separated by single underscores. Everything from the first character
    that does not fit this pattern onwards is ignored.

    Returns:
        list[int]: The digit groups converted to integers, or [0] if the job ID
        does not start with a digit.
    """
    leading = re.match(r"(\d+(?:_\d+)*)", self.get_id())
    if leading is None:
        return [0]

    # each underscore-separated group becomes one sorting key
    return list(map(int, leading.group(1).split("_")))
Extract numeric components of the job ID for sorting.
The method retrieves the leading numeric portion of the job ID, which may contain multiple integer groups separated by underscores. Parsing stops when a non-digit and non-underscore character is encountered.
Returns:
list[int]: A list of integer components extracted from the job ID, or [0] if no valid numeric portion is found.
class SlurmNode(BatchNodeInterface):
    """
    Implementation of BatchNodeInterface for Slurm.
    Stores metadata for a single Slurm node.
    """

    # node states (or state flags) in which the node is treated as unavailable
    _UNAVAILABLE_STATES = frozenset(
        {
            "DOWN",
            "DRAINED",
            "FAIL",
            "FUTURE",
            "INVAL",
            "MAINT",
            "PERFCTRS",
            "POWERED_DOWN",
            "POWERING_DOWN",
            "RESERVED",
            "UNKNOWN",
        }
    )

    # status symbols that Slurm tools may append to a node state (e.g. `DOWN*`)
    _STATE_SUFFIXES = "*~#!%$@^-"

    def __init__(self, name: str):
        """Query the batch system for information about the node with the specified name."""
        self._name = name
        self._info: dict[str, str] = {}

        self.update()

    def update(self) -> None:
        """
        Refresh the stored node information using `scontrol show node`.

        Raises:
            QQError: If `scontrol` exits with a non-zero return code.
        """
        # get node info from Slurm
        command = f"scontrol show node {self._name} -o"

        result = subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

        if result.returncode != 0:
            raise QQError(f"Node '{self._name}' does not exist.")

        self._info = parse_slurm_dump_to_dictionary(result.stdout)

    def get_name(self) -> str:
        """Return the name of the node."""
        return self._name

    def get_n_cpus(self) -> int | None:
        """Return the `CPUTot` property as an integer, or `None` if unavailable."""
        return self._get_int_resource("CPUTot")

    def get_n_free_cpus(self) -> int | None:
        """
        Return the total number of CPUs minus `CPUAlloc` (treated as 0 if missing).

        Returns:
            int | None: Number of free CPUs, or `None` if the total is unknown or zero.
        """
        if not (cpus := self.get_n_cpus()):
            return None

        return cpus - (self._get_int_resource("CPUAlloc") or 0)

    def get_n_gpus(self) -> int | None:
        """Return the `gpu` entry of `CfgTRES` as an integer, or `None` if unavailable."""
        return self._get_int_from_tres("CfgTRES", "gpu")

    def get_n_free_gpus(self) -> int | None:
        """
        Return the total number of GPUs minus the `gpu` entry of `AllocTRES`
        (treated as 0 if missing).

        Returns:
            int | None: Number of free GPUs, or `None` if the total is unknown or zero.
        """
        if not (gpus := self.get_n_gpus()):
            return None

        return gpus - (self._get_int_from_tres("AllocTRES", "gpu") or 0)

    def get_cpu_memory(self) -> Size | None:
        """Return the `RealMemory` property as a Size, or `None` if unavailable."""
        # RealMemory corresponds to memory configured in slurm.conf
        return self._get_size_resource("RealMemory")

    def get_free_cpu_memory(self) -> Size | None:
        """
        Return the total CPU memory minus `AllocMem` (treated as zero if missing).

        Returns:
            Size | None: Unallocated CPU memory, or `None` if the total is unavailable.
        """
        if not (mem := self.get_cpu_memory()):
            return None

        # we do not use the FreeMem property as it corresponds to the ACTUAL free memory on the machine
        # and can be higher than RealMemory - AllocMem (e.g. if the jobs don't use all the allocated memory)
        return mem - (self._get_size_resource("AllocMem") or Size(0, "kb"))

    def get_gpu_memory(self) -> Size | None:
        """Always return `None`; GPU memory is not read from the node information."""
        return None

    def get_free_gpu_memory(self) -> Size | None:
        """Always return `None`; free GPU memory is not read from the node information."""
        return None

    def get_local_scratch(self) -> Size | None:
        """Return the `TmpDisk` property as a Size, or `None` if unavailable."""
        return self._get_size_resource("TmpDisk")

    def get_free_local_scratch(self) -> Size | None:
        """
        Return the `TmpDisk` property as a Size, or `None` if unavailable.

        Note that this is the same value as reported by `get_local_scratch`.
        """
        return self._get_size_resource("TmpDisk")

    def get_ssd_scratch(self) -> Size | None:
        """Always return `None`."""
        return None

    def get_free_ssd_scratch(self) -> Size | None:
        """Always return `None`."""
        return None

    def get_shared_scratch(self) -> Size | None:
        """Always return `None`."""
        return None

    def get_free_shared_scratch(self) -> Size | None:
        """Always return `None`."""
        return None

    def get_properties(self) -> list[str]:
        """
        Return the comma-separated `AvailableFeatures` property as a list.

        Returns:
            list[str]: List of node features; empty if the property is missing or empty.
        """
        if not (raw := self._info.get("AvailableFeatures")):
            return []

        return raw.split(",")

    def is_available_to_user(self, user: str) -> bool:
        """
        Check whether the node is in a usable state.

        The decision is based purely on the `State` property; the user is not
        taken into account. The state is split on '+' and trailing status
        symbols are removed so that compound or suffixed states such as
        `IDLE+MAINT` or `DOWN*` are recognized.

        Args:
            user (str): The username to check access for (currently unused).

        Returns:
            bool: `False` if the state is missing or any of its components is
            an unavailable state, `True` otherwise.
        """
        _ = user

        if not (state := self._info.get("State")):
            logger.debug(f"Could not get state information for node '{self._name}'.")
            return False

        components = (
            part.rstrip(self._STATE_SUFFIXES) for part in state.split("+")
        )
        return not any(part in self._UNAVAILABLE_STATES for part in components)

    @classmethod
    def from_dict(cls, name: str, info: dict[str, str]) -> Self:
        """
        Construct a new instance of SlurmNode from node name and a dictionary of node information.

        Args:
            name (str): The unique name of the node.
            info (dict[str, str]): A dictionary containing Slurm node metadata as key-value pairs.

        Returns:
            Self: A new instance of SlurmNode.

        Note:
            This method does not perform any validation or processing of the provided dictionary.
        """
        node = cls.__new__(cls)
        node._name = name
        node._info = info

        return node

    def to_yaml(self) -> str:
        """Return all stored node information in YAML format."""
        return yaml.dump(
            self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper
        )

    def _get_int_resource(self, res: str) -> int | None:
        """
        Retrieve an integer-valued resource from the node information.

        Args:
            res (str): The resource key to retrieve.

        Returns:
            int | None: The integer value of the resource, or `None` if unavailable or invalid.
        """
        if not (val := self._info.get(res)):
            return None
        try:
            return int(val)
        except Exception as e:
            logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
            return None

    def _get_int_from_tres(self, tres_key: str, res: str) -> int | None:
        """
        Retrieve an integer-valued resource from a TRES string.

        The TRES string is split on ',' and the first item that contains `res`
        and has a parsable integer after '=' is used.

        Args:
            tres_key (str): The tres key to use.
            res (str): The resource key to retrieve.

        Returns:
            int | None: The integer value of the resource, or `None` if unavailable or invalid.
        """
        tres = self._info.get(tres_key, "")

        for item in tres.split(","):
            if res in item:
                try:
                    return int(item.split("=")[1])
                except (ValueError, IndexError) as e:
                    # IndexError: the matching item has no '=' at all
                    logger.debug(
                        f"Could not parse the property '{res}' from '{item}': {e}."
                    )

        return None

    def _get_size_resource(self, res: str) -> Size | None:
        """
        Retrieve a Size resource from the node information.

        Purely numeric values get an "M" unit appended before parsing.

        Args:
            res (str): The resource key to retrieve.

        Returns:
            Size | None: The parsed Size, or `None` if unavailable or invalid.
        """
        if not (val := self._info.get(res)):
            return None

        try:
            if val.isnumeric():
                val += "M"
            return Size.from_string(val)
        except Exception as e:
            logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
            return None
Implementation of BatchNodeInterface for Slurm. Stores metadata for a single Slurm node.
def update(self) -> None:
    """
    Refresh the stored node information from the batch system.

    Runs `scontrol show node <name> -o` through bash and stores the parsed output.

    Raises:
        QQError: If `scontrol` exits with a non-zero return code.
    """
    # one-line dump of the node properties
    query = f"scontrol show node {self._name} -o"

    completed = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if completed.returncode == 0:
        self._info = parse_slurm_dump_to_dictionary(completed.stdout)
        return

    raise QQError(f"Node '{self._name}' does not exist.")
Refresh the stored node information from the batch system.
Raises:
- QQError: If the node cannot be queried or its info updated.
Retrieve the name of the node.
Returns:
str: The name identifying the node in the batch system.
Retrieve the total number of CPU cores available on the node.
Returns:
int | None: Total CPU core count or
`None` if not available.
57 def get_n_free_cpus(self) -> int | None: 58 if not (cpus := self.get_n_cpus()): 59 return None 60 61 return cpus - (self._get_int_resource("CPUAlloc") or 0)
Retrieve the number of currently available (unallocated) CPU cores.
Returns:
int | None: Number of free CPU cores or
`None` if not available.
Retrieve the total number of GPUs available on the node.
Returns:
int | None: Total GPU count or
`None` if not available.
66 def get_n_free_gpus(self) -> int | None: 67 if not (gpus := self.get_n_gpus()): 68 return None 69 70 return gpus - (self._get_int_from_tres("AllocTRES", "gpu") or 0)
Retrieve the number of currently available (unallocated) GPUs.
Returns:
int | None: Number of free GPUs or
`None` if not available.
def get_cpu_memory(self) -> Size | None:
    """
    Retrieve the total CPU memory capacity of the node.

    Returns:
        Size | None: The `RealMemory` resource as a Size or `None` if not available.
    """
    # RealMemory corresponds to memory configured in slurm.conf
    total_memory = self._get_size_resource("RealMemory")
    return total_memory
Retrieve the total CPU memory capacity of the node.
Returns:
Size | None: Total CPU memory available on the node or
`None` if not available.
def get_free_cpu_memory(self) -> Size | None:
    """
    Retrieve the currently unallocated CPU memory.

    Computed as the total CPU memory minus the `AllocMem` resource, which is
    treated as zero when missing.

    Returns:
        Size | None: Unallocated CPU memory or `None` if the total is not available.
    """
    total = self.get_cpu_memory()
    if not total:
        return None

    # we do not use the FreeMem property as it corresponds to the ACTUAL free memory on the machine
    # and can be higher than RealMemory - AllocMem (e.g. if the jobs don't use all the allocated memory)
    allocated = self._get_size_resource("AllocMem") or Size(0, "kb")
    return total - allocated
Retrieve the currently available CPU memory.
Returns:
Size | None: Free (unused) CPU memory or
`None` if not available.
Retrieve the total GPU memory capacity of the node.
Returns:
Size | None: Total GPU memory available or
`None` if not available.
Retrieve the currently available GPU memory.
Returns:
Size | None: Free (unused) GPU memory or
`None` if not available.
Retrieve the total local scratch storage capacity of the node.
Returns:
Size | None: Total size of local scratch space or
`None` if not available.
Retrieve the available local scratch storage space.
Returns:
Size | None: Free local scratch space or
`None` if not available.
Retrieve the total SSD-based scratch storage capacity.
Returns:
Size | None: Total SSD scratch capacity or
`None` if not available.
Retrieve the currently available SSD-based scratch storage space.
Returns:
Size | None: Free SSD scratch space or
`None` if not available.
def get_properties(self) -> list[str]:
    """
    Get the list of properties or labels assigned to the node.

    Returns:
        list[str]: The comma-separated `AvailableFeatures` property split into
        a list; empty if the property is missing or empty.
    """
    features = self._info.get("AvailableFeatures")
    return features.split(",") if features else []
Get the list of properties or labels assigned to the node.
Returns:
list[str]: List of node property strings.
def is_available_to_user(self, user: str) -> bool:
    """
    Check if the node is available to the specified user.

    The decision is based purely on the `State` property; the user is not
    taken into account. The state is split on '+' and trailing status symbols
    are removed so that compound or suffixed states such as `IDLE+MAINT` or
    `DOWN*` are recognized.

    Args:
        user (str): The username to check access for (currently unused).

    Returns:
        bool: `False` if the state is missing or any of its components is an
        unavailable state, `True` otherwise.
    """
    _ = user

    if not (state := self._info.get("State")):
        logger.debug(f"Could not get state information for node '{self._name}'.")
        return False

    invalid_states = {
        "DOWN",
        "DRAINED",
        "FAIL",
        "FUTURE",
        "INVAL",
        "MAINT",
        "PERFCTRS",
        "POWERED_DOWN",
        "POWERING_DOWN",
        "RESERVED",
        "UNKNOWN",
    }

    # a state may consist of several '+'-joined parts, each possibly followed
    # by a status symbol (e.g. '*'); every part is checked on its own
    components = (part.rstrip("*~#!%$@^-") for part in state.split("+"))
    return not any(part in invalid_states for part in components)
Check if the node is available to the specified user.
Arguments:
- user (str): The username to check access for.
Returns:
bool: True if the node is up and schedulable, False otherwise.
137 @classmethod 138 def from_dict(cls, name: str, info: dict[str, str]) -> Self: 139 """ 140 Construct a new instance of SlurmNode from node name and a dictionary of node information. 141 142 Args: 143 name (str): The unique name of the node. 144 info (dict[str, str]): A dictionary containing Slurm node metadata as key-value pairs. 145 146 Returns: 147 Self: A new instance of SlurmNode. 148 149 Note: 150 This method does not perform any validation or processing of the provided dictionary. 151 """ 152 node = cls.__new__(cls) 153 node._name = name 154 node._info = info 155 156 return node
Construct a new instance of SlurmNode from node name and a dictionary of node information.
Arguments:
- name (str): The unique name of the node.
- info (dict[str, str]): A dictionary containing Slurm node metadata as key-value pairs.
Returns:
Self: A new instance of SlurmNode.
Note:
This method does not perform any validation or processing of the provided dictionary.
94class SlurmQueue(BatchQueueInterface): 95 """ 96 Implementation of BatchQueueInterface for Slurm. 97 Stores metadata for a single Slurm queue. 98 """ 99 100 def __init__(self, name: str): 101 self._name = name 102 self._info: dict[str, str] = {} 103 104 self.update() 105 106 def update(self) -> None: 107 # get queue info from Slurm 108 command = f"scontrol show partition {self._name} -o" 109 logger.debug(command) 110 111 result = subprocess.run( 112 ["bash"], 113 input=command, 114 text=True, 115 check=False, 116 capture_output=True, 117 errors="replace", 118 ) 119 120 if result.returncode != 0: 121 raise QQError(f"Queue '{self._name}' does not exist.") 122 123 self._info = parse_slurm_dump_to_dictionary(result.stdout) 124 self._set_job_numbers() 125 126 def get_name(self) -> str: 127 return self._name 128 129 def get_priority(self) -> str | None: 130 if not (tier := self._info.get("PriorityTier")): 131 return None 132 133 if not (job_factor := self._info.get("PriorityJobFactor")): 134 return None 135 136 return f"T{tier} ({job_factor})" 137 138 def get_total_jobs(self) -> int | None: 139 return self._running_jobs + self._queued_jobs + self._other_jobs 140 141 def get_running_jobs(self) -> int | None: 142 return self._running_jobs 143 144 def get_queued_jobs(self) -> int | None: 145 return self._queued_jobs 146 147 def get_other_jobs(self) -> int | None: 148 return self._other_jobs 149 150 def get_max_walltime(self) -> timedelta | None: 151 if raw := self._info.get("MaxTime"): 152 return dhhmmss_to_duration(raw) 153 154 return None 155 156 def get_max_n_nodes(self) -> int | None: 157 if not (raw := self._info.get("MaxNodes")): 158 return None 159 160 try: 161 return int(raw) 162 except ValueError as e: 163 logger.debug(f"Could not parse the 'MaxNodes' property as integer: {e}.") 164 return None 165 166 def get_comment(self) -> str | None: 167 return None 168 169 def is_available_to_user(self, user: str) -> bool: 170 # check the queue's state 171 state = 
self._info.get("State", "DOWN") 172 if state not in ["UP", "DRAIN"]: 173 return False 174 175 def parse_list(value): 176 if not value or value == "(null)" or value == "ALL": 177 return None 178 return [item.strip() for item in value.split(",")] 179 180 # check allowed accounts 181 allow_accounts = parse_list(self._info.get("AllowAccounts", "ALL")) 182 if allow_accounts and user not in allow_accounts: 183 return False 184 185 # check denied accounts 186 deny_accounts = parse_list(self._info.get("DenyAccounts", "(null)")) 187 if deny_accounts and user in deny_accounts: 188 return False 189 190 # check allowed groups 191 user_groups = UserGroups.get_groups_or_init(user) 192 allow_groups = parse_list(self._info.get("AllowGroups", "ALL")) 193 if allow_groups and not any(group in allow_groups for group in user_groups): 194 return False 195 196 # check denied groups 197 deny_groups = parse_list(self._info.get("DenyGroups", "(null)")) 198 if deny_groups and any(group in deny_groups for group in user_groups): 199 return False 200 201 # check allowed QOS 202 user_qos = UserGroups.get_qos_or_init(user) 203 allow_qos = parse_list(self._info.get("AllowQos", "ALL")) 204 if allow_qos and user_qos not in allow_qos: 205 return False 206 207 # check denies QOS 208 deny_qos = parse_list(self._info.get("DenyQos", "(null)")) 209 return not (deny_qos and user_qos in deny_qos) 210 211 def get_destinations(self) -> list[str]: 212 # no destinations 213 return [] 214 215 def from_route_only(self) -> bool: 216 return False 217 218 def to_yaml(self) -> str: 219 return yaml.dump( 220 self._info, default_flow_style=False, sort_keys=False, Dumper=Dumper 221 ) 222 223 def get_default_resources(self) -> Resources: 224 return default_resources_from_dict(self._info) 225 226 @classmethod 227 def from_dict(cls, name: str, info: dict[str, str]) -> Self: 228 """ 229 Construct a new instance of SlurmQueue from a queue name and a dictionary of queue information. 
230 231 232 Args: 233 name (str): The unique name of the queue. 234 info (dict[str, str]): A dictionary containing Slurm queue metadata as key-value pairs. 235 236 Returns: 237 Self: A new instance of SlurmQueue. 238 239 Note: 240 This method does not perform any validation or processing of the provided dictionary. 241 """ 242 queue = cls.__new__(cls) 243 queue._name = name 244 queue._info = info 245 queue._set_job_numbers() 246 247 return queue 248 249 def _set_job_numbers(self) -> None: 250 """ 251 Get and set the numbers of jobs in this queue. 252 """ 253 self._running_jobs = 0 254 self._queued_jobs = 0 255 self._other_jobs = 0 256 257 # get the numbers of jobs in the queue by individual states 258 command = f'squeue -p {self._name} -h -o "%T" | uniq -c' 259 logger.debug(command) 260 261 result = subprocess.run( 262 ["bash"], 263 input=command, 264 text=True, 265 check=False, 266 capture_output=True, 267 errors="replace", 268 ) 269 270 if result.returncode != 0: 271 raise QQError( 272 f"Could not get job numbers for queue '{self._name}': {result.stderr.strip()}." 273 ) 274 275 for line in result.stdout.splitlines(): 276 if not line.strip(): 277 continue 278 279 try: 280 count_str, job_type = line.split() 281 count = int(count_str) 282 except ValueError as e: 283 logger.warning( 284 f"Could not parse line '{line}' when obtaining job numbers for queue '{self._name}': {e}." 285 ) 286 continue 287 288 match job_type: 289 case "RUNNING": 290 self._running_jobs += count 291 case "PENDING": 292 self._queued_jobs += count 293 case "SUSPENDED" | "PREEMPTED": 294 self._other_jobs += count 295 # ignore other jobs
Implementation of BatchQueueInterface for Slurm. Stores metadata for a single Slurm queue.
def update(self) -> None:
    """
    Refresh the stored queue information from the batch system.

    Runs `scontrol show partition <name> -o` through bash, stores the parsed
    output, and then recounts the jobs in the queue.

    Raises:
        QQError: If `scontrol` exits with a non-zero return code.
    """
    # one-line dump of the partition properties
    query = f"scontrol show partition {self._name} -o"
    logger.debug(query)

    completed = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if completed.returncode != 0:
        raise QQError(f"Queue '{self._name}' does not exist.")

    self._info = parse_slurm_dump_to_dictionary(completed.stdout)
    self._set_job_numbers()
Refresh the stored queue information from the batch system.
Raises:
- QQError: If the queue cannot be queried or its info updated.
Retrieve the name of the queue.
Returns:
str: The name identifying this queue in the batch system.
129 def get_priority(self) -> str | None: 130 if not (tier := self._info.get("PriorityTier")): 131 return None 132 133 if not (job_factor := self._info.get("PriorityJobFactor")): 134 return None 135 136 return f"T{tier} ({job_factor})"
Retrieve the scheduling priority of the queue.
Returns:
str | None: The queue priority, or None if priority information is not available.
138 def get_total_jobs(self) -> int | None: 139 return self._running_jobs + self._queued_jobs + self._other_jobs
Retrieve the total number of jobs currently in the queue.
Returns:
int | None: The total count of jobs, regardless of status or
`None` if the information is not available.
Retrieve the number of jobs currently running in the queue.
Returns:
int | None: The number of running jobs or
`None` if the information is not available.
Retrieve the number of jobs waiting to start in the queue.
Returns:
int | None: The number of queued jobs or
`None` if the information is not available.
Retrieve the number of jobs in other states (non-running and non-queued).
Returns:
int | None: The number of jobs that are neither running nor queued, such as exiting or suspended jobs. Returns
`None` if the information is not available.
150 def get_max_walltime(self) -> timedelta | None: 151 if raw := self._info.get("MaxTime"): 152 return dhhmmss_to_duration(raw) 153 154 return None
Retrieve the maximum walltime allowed for jobs in the queue.
Returns:
timedelta | None: The walltime limit, or None if unlimited or unknown.
156 def get_max_n_nodes(self) -> int | None: 157 if not (raw := self._info.get("MaxNodes")): 158 return None 159 160 try: 161 return int(raw) 162 except ValueError as e: 163 logger.debug(f"Could not parse the 'MaxNodes' property as integer: {e}.") 164 return None
Retrieve the maximum number of nodes that can be requested in the queue.
Returns:
int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.
Retrieve the comment or description associated with the queue.
Returns:
str | None: The human-readable comment or note about the queue, or
None if the information is not available.
def is_available_to_user(self, user: str) -> bool:
    """
    Decide whether the given user may use this queue.

    The queue has to be in the 'UP' or 'DRAIN' state (a missing state counts as 'DOWN').
    The user is then checked against the allow/deny lists for accounts, groups and QOS
    stored in the queue's properties. A list that is missing, empty, '(null)' or 'ALL'
    imposes no restriction.

    Args:
        user (str): The username to check access for.

    Returns:
        bool: True if none of the checks rejects the user, False otherwise.
    """

    def restriction(key: str, default: str):
        # returns the listed names, or None when the property imposes no restriction
        raw = self._info.get(key, default)
        if not raw or raw in ("(null)", "ALL"):
            return None
        return [name.strip() for name in raw.split(",")]

    # the queue itself must be usable
    if self._info.get("State", "DOWN") not in ("UP", "DRAIN"):
        return False

    # accounts: the username is matched directly against the account lists
    allowed_accounts = restriction("AllowAccounts", "ALL")
    if allowed_accounts and user not in allowed_accounts:
        return False

    denied_accounts = restriction("DenyAccounts", "(null)")
    if denied_accounts and user in denied_accounts:
        return False

    # groups: at least one of the user's groups must be allowed and none may be denied
    groups_of_user = UserGroups.get_groups_or_init(user)

    allowed_groups = restriction("AllowGroups", "ALL")
    if allowed_groups and all(
        group not in allowed_groups for group in groups_of_user
    ):
        return False

    denied_groups = restriction("DenyGroups", "(null)")
    if denied_groups and any(group in denied_groups for group in groups_of_user):
        return False

    # QOS of the user
    qos_of_user = UserGroups.get_qos_or_init(user)

    allowed_qos = restriction("AllowQos", "ALL")
    if allowed_qos and qos_of_user not in allowed_qos:
        return False

    denied_qos = restriction("DenyQos", "(null)")
    if denied_qos and qos_of_user in denied_qos:
        return False

    return True
Check whether the specified user has access to this queue.
Arguments:
- user (str): The username to check access for.
Returns:
bool: True if the user can submit jobs to this queue, False otherwise.
Retrieve all destinations available for this queue route.
Returns:
list[str]: A list of destination queue names associated with the queue.
Determine whether this queue can only be accessed via a route.
Returns:
bool: True if the queue is accessible exclusively through a route, False otherwise.
def to_yaml(self) -> str:
    """
    Serialize the stored queue properties into YAML.

    Returns:
        str: YAML dump of the queue's properties, written without flow style
        and without sorting the keys.
    """
    dump_options = {
        "default_flow_style": False,
        "sort_keys": False,
        "Dumper": Dumper,
    }

    return yaml.dump(self._info, **dump_options)
Return all information about the queue from the batch system in YAML format.
Returns:
str: YAML-formatted string of queue metadata.
def get_default_resources(self) -> Resources:
    """
    Derive the default resources of the queue from its stored properties.

    Returns:
        Resources: The result of `default_resources_from_dict` applied to
        the queue's properties.
    """
    defaults = default_resources_from_dict(self._info)
    return defaults
Return the default resource definitions for this queue.
Returns:
Resources: Default resources allocated for jobs submitted to this queue.
226 @classmethod 227 def from_dict(cls, name: str, info: dict[str, str]) -> Self: 228 """ 229 Construct a new instance of SlurmQueue from a queue name and a dictionary of queue information. 230 231 232 Args: 233 name (str): The unique name of the queue. 234 info (dict[str, str]): A dictionary containing Slurm queue metadata as key-value pairs. 235 236 Returns: 237 Self: A new instance of SlurmQueue. 238 239 Note: 240 This method does not perform any validation or processing of the provided dictionary. 241 """ 242 queue = cls.__new__(cls) 243 queue._name = name 244 queue._info = info 245 queue._set_job_numbers() 246 247 return queue
Construct a new instance of SlurmQueue from a queue name and a dictionary of queue information.
Arguments:
- name (str): The unique name of the queue.
- info (dict[str, str]): A dictionary containing Slurm queue metadata as key-value pairs.
Returns:
Self: A new instance of SlurmQueue.
Note:
This method does not perform any validation or processing of the provided dictionary.
class Slurm(BatchInterface[SlurmJob, SlurmQueue, SlurmNode]):
    """
    Implementation of BatchInterface for Slurm batch system.

    Slurm commands are assembled as strings and executed by passing them to `bash`
    on its standard input. Remote file operations and directory synchronization
    are delegated to the `PBS` backend.
    """

    @classmethod
    def env_name(cls) -> str:
        """Return the name of the batch system environment ("Slurm")."""
        return "Slurm"

    @classmethod
    def is_available(cls) -> bool:
        """
        Determine whether this backend is available on the current host.

        Returns:
            bool: True if `sbatch` is found on PATH and neither `it4ifree`
            nor `lumi-allocations` is.
        """
        return (
            shutil.which("sbatch") is not None
            and shutil.which("it4ifree") is None
            and shutil.which("lumi-allocations") is None
        )

    @classmethod
    def get_job_id(cls) -> str | None:
        """
        Get the ID of the current job.

        Returns:
            str | None: Value of the `SLURM_JOB_ID` environment variable,
            or None if it is not set.
        """
        return os.environ.get("SLURM_JOB_ID")

    @classmethod
    def job_submit(
        cls,
        res: Resources,
        queue: str,
        script: Path,
        job_name: str,
        depend: list[Depend],
        env_vars: dict[str, str],
        account: str | None = None,
        server: str | None = None,
        remote_host: str | None = None,
    ) -> str:
        """
        Submit a job using `sbatch`, either locally or from a remote host over SSH.

        Args:
            res (Resources): Resources required for the job.
            queue (str): Target queue for the job submission.
            script (Path): Path to the script to execute.
            job_name (str): Name of the job to use.
            depend (list[Depend]): List of job dependencies.
            env_vars (dict[str, str]): Environment variables to propagate to the job.
            account (str | None): Optional account name to use for the job.
            server (str | None): Ignored for Slurm; a warning is logged if provided.
            remote_host (str | None): Optional name of the machine to submit the job from.

        Returns:
            str: ID of the submitted job (the last token printed by `sbatch`).

        Raises:
            QQError: If the submission command fails or prints no job ID.
        """
        # server is unused
        if server:
            logger.warning("The 'server' option is ignored for Slurm.")

        input_dir = script.parent
        logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")

        # intentionally using PBS
        PBS._shared_guard(input_dir, res, env_vars, server, remote_host)

        command = cls._translate_submit(
            res, queue, input_dir, str(script), job_name, depend, env_vars, account
        )

        if not remote_host:
            logger.debug(f"Submitting job using '{command}'.")
            result = cls._run_in_bash(command)
        else:
            # submit the script from the remote host
            logger.debug(
                f"Navigating to '{remote_host}' to execute the submission command '{command}'."
            )
            result = subprocess.run(
                [
                    "ssh",
                    "-o PasswordAuthentication=no",
                    "-o GSSAPIAuthentication=yes",
                    "-o StrictHostKeyChecking=no",  # allow unknown hosts
                    f"-o ConnectTimeout={CFG.timeouts.ssh}",
                    "-q",  # suppress some SSH messages
                    remote_host,
                    command,
                ],
                capture_output=True,
                text=True,
                check=False,
                # decode undecodable bytes leniently, like all the other commands here
                errors="replace",
            )

        if result.returncode != 0:
            raise QQError(
                f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
            )

        # the job ID is the last whitespace-separated token of the output
        tokens = result.stdout.split()
        if not tokens:
            raise QQError(
                f"Failed to submit script '{str(script)}': no job ID was reported."
            )

        return tokens[-1]

    @classmethod
    def job_kill(cls, job_id: str) -> None:
        """
        Terminate a job using the command built by `_translate_kill`.

        Args:
            job_id (str): Identifier of the job to terminate.

        Raises:
            QQError: If the kill command fails.
        """
        command = cls._translate_kill(job_id)
        logger.debug(command)

        result = cls._run_in_bash(command)

        if result.returncode != 0:
            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")

    @classmethod
    def job_kill_force(cls, job_id: str) -> None:
        """
        Forcefully terminate a job using the command built by `_translate_kill_force`.

        Args:
            job_id (str): Identifier of the job to forcefully terminate.

        Raises:
            QQError: If the kill command fails.
        """
        command = cls._translate_kill_force(job_id)
        logger.debug(command)

        result = cls._run_in_bash(command)

        if result.returncode != 0:
            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")

    @classmethod
    def get_batch_job(cls, job_id: str) -> SlurmJob:
        """
        Retrieve information about a single job.

        Args:
            job_id (str): Identifier of the job.

        Returns:
            SlurmJob: Job object constructed for the given ID.
        """
        return SlurmJob(job_id)

    @classmethod
    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[SlurmJob]:
        """
        Retrieve information about multiple jobs, keeping the order of `job_ids`.

        Jobs are first collected with a single `sacct` call. Jobs that `sacct` does not
        report, or reports as queued or held, are loaded individually in parallel.

        Args:
            job_ids (list[str]): List of job identifiers.

        Returns:
            list[SlurmJob]: Jobs in the same order as their IDs appear in `job_ids`.

        Raises:
            QQError: If `sacct` fails or any of the individual jobs cannot be loaded.
        """
        # without any IDs the `-j` option of sacct would be left without a job list
        if not job_ids:
            return []

        command = f"sacct --allocations --noheader --parsable2 -j {','.join(job_ids)} --format={SACCT_FIELDS}"
        logger.debug(command)

        # sacct ignores IDs of nonexistent jobs - it will just not print any output for them
        sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
        sacct_jobs_dict = {
            job.get_id(): job
            for job in sacct_jobs
            # filter out pending (queued) jobs - we want to use scontrol for them
            if job.get_state() not in {BatchState.QUEUED, BatchState.HELD}
        }

        # get information about the remaining jobs using scontrol
        scontrol_jobs = cls._get_jobs_in_parallel(
            [job_id for job_id in job_ids if job_id not in sacct_jobs_dict]
        )
        scontrol_jobs_dict = {job.get_id(): job for job in scontrol_jobs}

        # merge all jobs but maintain their original order
        all_jobs = []
        for job_id in job_ids:
            if (job := sacct_jobs_dict.get(job_id)) or (
                job := scontrol_jobs_dict.get(job_id)
            ):
                all_jobs.append(job)

        return all_jobs

    @classmethod
    def get_unfinished_batch_jobs(
        cls, user: str, server: str | None = None
    ) -> list[SlurmJob]:
        """
        Retrieve the running and pending jobs of a user.

        Args:
            user (str): Username for which to fetch the jobs.
            server (str | None): Unused.

        Returns:
            list[SlurmJob]: Running and pending jobs of the user, one per job ID.

        Raises:
            QQError: If any of the underlying commands fails.
        """
        # server unused
        _ = server

        # get running jobs from sacct (faster than using squeue and scontrol)
        # and pending jobs using squeue
        return cls._collect_sacct_and_squeue_jobs(
            f"sacct -u {user} --state RUNNING --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}",
            f'squeue -u {user} --array -t PENDING -h -o "%i"',
        )

    @classmethod
    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[SlurmJob]:
        """
        Retrieve all jobs of a user reported by `sacct`, plus the user's pending jobs.

        Args:
            user (str): Username for which to fetch the jobs.
            server (str | None): Unused.

        Returns:
            list[SlurmJob]: Jobs of the user, one per job ID.

        Raises:
            QQError: If any of the underlying commands fails.
        """
        # server unused
        _ = server

        # get all jobs, except pending for which full information is not available using sacct;
        # pending jobs are obtained using squeue
        return cls._collect_sacct_and_squeue_jobs(
            f"sacct -u {user} --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}",
            f'squeue -u {user} --array -t PENDING -h -o "%i"',
        )

    @classmethod
    def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
        """
        Retrieve the running and pending jobs of all users.

        Args:
            server (str | None): Unused.

        Returns:
            list[SlurmJob]: Running and pending jobs, one per job ID.

        Raises:
            QQError: If any of the underlying commands fails.
        """
        # server unused
        _ = server

        # get running jobs using sacct (faster than using squeue and scontrol)
        # and pending jobs using squeue
        return cls._collect_sacct_and_squeue_jobs(
            f"sacct --state RUNNING --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}",
            'squeue --array -t PENDING -h -o "%i"',
        )

    @classmethod
    def get_all_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
        """
        Retrieve all jobs of all users reported by `sacct`, plus all pending jobs.

        Args:
            server (str | None): Unused.

        Returns:
            list[SlurmJob]: Jobs of all users, one per job ID.

        Raises:
            QQError: If any of the underlying commands fails.
        """
        # server unused
        _ = server

        # get all jobs, except pending which are not available from sacct;
        # pending jobs are obtained using squeue
        return cls._collect_sacct_and_squeue_jobs(
            f"sacct --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}",
            'squeue --array -t PENDING -h -o "%i"',
        )

    @classmethod
    def get_queues(cls, server: str | None = None) -> list[SlurmQueue]:
        """
        Retrieve all partitions reported by `scontrol show partition -o`.

        Args:
            server (str | None): Unused.

        Returns:
            list[SlurmQueue]: One queue per line of the command's output.

        Raises:
            QQError: If the command fails.
        """
        # server unused
        _ = server

        command = "scontrol show partition -o"
        logger.debug(command)

        result = cls._run_in_bash(command)

        if result.returncode != 0:
            raise QQError(
                f"Could not retrieve information about queues: {result.stderr.strip()}."
            )

        queues = []
        for line in result.stdout.splitlines():
            info = parse_slurm_dump_to_dictionary(line)
            queues.append(SlurmQueue.from_dict(info["PartitionName"], info))

        return queues

    @classmethod
    def get_nodes(cls, server: str | None = None) -> list[SlurmNode]:
        """
        Retrieve all nodes reported by `scontrol show node -o`.

        Args:
            server (str | None): Unused.

        Returns:
            list[SlurmNode]: One node per line of the command's output.

        Raises:
            QQError: If the command fails.
        """
        # server unused
        _ = server

        command = "scontrol show node -o"
        logger.debug(command)

        result = cls._run_in_bash(command)

        if result.returncode != 0:
            raise QQError(
                f"Could not retrieve information about nodes: {result.stderr.strip()}."
            )

        nodes = []
        for line in result.stdout.splitlines():
            info = parse_slurm_dump_to_dictionary(line)
            nodes.append(SlurmNode.from_dict(info["NodeName"], info))

        return nodes

    @classmethod
    def read_remote_file(cls, host: str, file: Path) -> str:
        """Read a remote file; delegated to `PBS.read_remote_file`."""
        return PBS.read_remote_file(host, file)

    @classmethod
    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
        """Write a remote file; delegated to `PBS.write_remote_file`."""
        PBS.write_remote_file(host, file, content)

    @classmethod
    def make_remote_dir(cls, host: str, directory: Path) -> None:
        """Create a remote directory; delegated to `PBS.make_remote_dir`."""
        PBS.make_remote_dir(host, directory)

    @classmethod
    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
        """List a remote directory; delegated to `PBS.list_remote_dir`."""
        return PBS.list_remote_dir(host, directory)

    @classmethod
    def delete_remote_dir(cls, host: str, directory: Path) -> None:
        """Delete a remote directory; delegated to `PBS.delete_remote_dir`."""
        PBS.delete_remote_dir(host, directory)

    @classmethod
    def move_remote_files(
        cls, host: str, files: list[Path], moved_files: list[Path]
    ) -> None:
        """Move remote files; delegated to `PBS.move_remote_files`."""
        PBS.move_remote_files(host, files, moved_files)

    @classmethod
    def sync_with_exclusions(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        exclude_files: list[Path] | None = None,
    ) -> None:
        """Synchronize directories; delegated to `PBS.sync_with_exclusions`."""
        PBS.sync_with_exclusions(src_dir, dest_dir, src_host, dest_host, exclude_files)

    @classmethod
    def sync_selected(
        cls,
        src_dir: Path,
        dest_dir: Path,
        src_host: str | None,
        dest_host: str | None,
        include_files: list[Path] | None = None,
    ) -> None:
        """Synchronize selected files; delegated to `PBS.sync_selected`."""
        PBS.sync_selected(src_dir, dest_dir, src_host, dest_host, include_files)

    @classmethod
    def sort_jobs(cls, jobs: list[SlurmJob]) -> None:
        """Sort the jobs in place by the key returned by `SlurmJob.get_ids_for_sorting`."""
        jobs.sort(key=lambda job: job.get_ids_for_sorting())

    @classmethod
    def jobs_presenter_columns_to_show(cls) -> set[str]:
        """Return the names of the columns to show when presenting Slurm jobs."""
        return {
            "S",
            "Job ID",
            "User",
            "Job Name",
            "Queue",
            "NCPUs",
            "NGPUs",
            "NNodes",
            "Times",
            "Node",
            "Exit",
        }

    @classmethod
    def _run_in_bash(cls, command: str) -> subprocess.CompletedProcess:
        """
        Execute a command by passing it to `bash` on its standard input.

        Args:
            command (str): The command to execute.

        Returns:
            subprocess.CompletedProcess: The finished process with captured text output.
            A non-zero return code does not raise; callers check it themselves.
        """
        return subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

    @classmethod
    def _collect_sacct_and_squeue_jobs(
        cls, sacct_command: str, squeue_command: str
    ) -> list[SlurmJob]:
        """
        Collect jobs using a `sacct` command and a `squeue` command and merge them.

        Args:
            sacct_command (str): `sacct` command printing the jobs in the `SACCT_FIELDS` format.
            squeue_command (str): `squeue` command printing job IDs.

        Returns:
            list[SlurmJob]: Merged jobs, one per job ID. If both commands report
            the same ID, the job obtained through `squeue` is kept.

        Raises:
            QQError: If any of the commands fails.
        """
        logger.debug(sacct_command)
        sacct_jobs = cls._get_batch_jobs_using_sacct_command(sacct_command)

        logger.debug(squeue_command)
        squeue_jobs = cls._get_batch_jobs_using_squeue_command(squeue_command)

        # filter out duplicate jobs
        merged = {job.get_id(): job for job in sacct_jobs + squeue_jobs}
        return list(merged.values())

    @classmethod
    def _translate_kill(cls, job_id: str) -> str:
        """
        Generate the Slurm kill command for a job using SIGTERM.

        Args:
            job_id (str): The ID of the job to kill.

        Returns:
            str: The scancel command sending SIGTERM.
        """
        return f"scancel {job_id}"

    @classmethod
    def _translate_kill_force(cls, job_id: str) -> str:
        """
        Generate the Slurm kill command for a job using SIGKILL.

        Args:
            job_id (str): The ID of the job to kill.

        Returns:
            str: The scancel command sending SIGKILL.
        """
        return f"scancel --signal=KILL {job_id}"

    @classmethod
    def _translate_submit(
        cls,
        res: Resources,
        queue: str,
        input_dir: Path,
        script: str,
        job_name: str,
        depend: list[Depend],
        env_vars: dict[str, str],
        account: str | None,
    ) -> str:
        """
        Generate the Slurm submission command for a job.

        Args:
            res (Resources): The resources requested for the job.
            queue (str): The queue name to submit to.
            input_dir (Path): The directory from which the job is being submitted.
            script (str): Path to the job script.
            job_name (str): Name of the job.
            depend (list[Depend]): List of dependencies of the job.
            env_vars (dict[str, str]): Dictionary of environment variables and their values to propagate to the job's environment.
            account (str | None): Optional name of the account to use for the job.

        Returns:
            str: The fully constructed sbatch command string.

        Raises:
            QQError: If a property has a value other than 'true' or if the
                per-chunk resources cannot be translated.
        """
        # standard output and standard error of the job share one file
        qq_output = str((input_dir / job_name).with_suffix(CFG.suffixes.qq_out))
        command = f"sbatch -J {job_name} -p {queue} -e {qq_output} -o {qq_output} "

        if account:
            command += f"--account {account} "

        # translate environment variables
        if env_vars:
            command += f"--export ALL,{cls._translate_env_vars(env_vars)} "

        # handle number of nodes
        command += f"--nodes {res.nnodes} "

        # handle per-chunk resources
        translated = cls._translate_per_chunk_resources(res)
        command += " ".join(translated) + " "

        # handle properties
        if res.props:
            constraints = []
            for k, v in res.props.items():
                if v != "true":
                    raise QQError(
                        f"Slurm only supports properties with a value of 'true', not '{k}={v}'."
                    )
                constraints.append(k)

            command += f'--constraint="{"&".join(constraints)}" '

        # handle walltime
        if res.walltime:
            command += f"--time={res.walltime} "

        # handle dependencies
        if converted_depend := cls._translate_dependencies(depend):
            command += f"--dependency={converted_depend} "

        # set input directory for the job
        command += f"--chdir={str(input_dir)} "

        # add script
        command += str(input_dir / script)

        return command

    @classmethod
    def _translate_env_vars(cls, env_vars: dict[str, str]) -> str:
        """
        Convert a dictionary of environment variables into a formatted string.

        Args:
            env_vars (dict[str, str]): A mapping of environment variable names
                to their corresponding values.

        Returns:
            str: A comma-separated string of environment variable assignments,
                suitable for inclusion in the sbatch command.
        """
        return ",".join(f'{key}="{value}"' for key, value in env_vars.items())

    @classmethod
    def _translate_per_chunk_resources(cls, res: Resources) -> list[str]:
        """
        Convert a Resources object into a list of per-node resource specifications.

        Each resource that can be divided by the number of nodes (nnodes) is split
        accordingly.

        Args:
            res (Resources): The resource specification for the job.

        Returns:
            list[str]: A list of per-node resource strings suitable for inclusion
                in the sbatch command.

        Raises:
            QQError: If sanity checks fail or required memory attributes are missing.
        """
        trans_res = []

        # sanity checking per-chunk resources
        if res.nnodes is None:
            raise QQError(
                "Attribute 'nnodes' should not be undefined. This is a bug, please report it."
            )
        if res.nnodes == 0:
            raise QQError("Attribute 'nnodes' cannot be 0.")

        if res.ncpus and res.ncpus % res.nnodes != 0:
            raise QQError(
                f"Attribute 'ncpus' ({res.ncpus}) must be divisible by 'nnodes' ({res.nnodes})."
            )
        if res.ngpus and res.ngpus % res.nnodes != 0:
            raise QQError(
                f"Attribute 'ngpus' ({res.ngpus}) must be divisible by 'nnodes' ({res.nnodes})."
            )

        # translate per-chunk resources
        if res.ncpus:
            # we set MPI ranks and OpenMP threads here, but these can be overridden
            # in the body of the script
            # this setup is here only to allow for better accounting by Slurm
            trans_res.append("--ntasks-per-node=1")
            trans_res.append(f"--cpus-per-task={res.ncpus // res.nnodes}")
        elif res.ncpus_per_node:
            trans_res.append("--ntasks-per-node=1")
            trans_res.append(f"--cpus-per-task={res.ncpus_per_node}")

        if res.mem:
            trans_res.append(f"--mem={(res.mem // res.nnodes).to_str_exact_slurm()}")
        elif res.mem_per_node:
            trans_res.append(f"--mem={res.mem_per_node.to_str_exact_slurm()}")
        elif res.mem_per_cpu:
            trans_res.append(f"--mem-per-cpu={res.mem_per_cpu.to_str_exact_slurm()}")
        else:
            # memory not set in any way
            raise QQError(
                "None of the attributes 'mem', 'mem-per-node', or 'mem-per-cpu' is defined."
            )

        if res.ngpus:
            trans_res.append(f"--gpus-per-node={res.ngpus // res.nnodes}")
        elif res.ngpus_per_node:
            trans_res.append(f"--gpus-per-node={res.ngpus_per_node}")

        return trans_res

    @classmethod
    def _translate_dependencies(cls, depend: list[Depend]) -> str | None:
        """
        Convert a list of `Depend` objects into a Slurm-compatible dependency string.

        Args:
            depend (list[Depend]): List of dependency objects to translate.

        Returns:
            str | None: Slurm-style dependency string (e.g., "after:12345,afterok:1:2:3"),
                or None if the input list is empty.
        """
        if not depend:
            return None

        return ",".join(Depend.to_str(x).replace("=", ":") for x in depend)

    @classmethod
    def _get_default_server_resources(cls) -> Resources:
        """
        Return a Resources object representing the default resources for a batch job.

        Returns:
            Resources: Resources read from the output of `scontrol show config` merged
            with the hard-coded defaults, or an empty Resources object if the
            command fails.
        """
        command = "scontrol show config"

        result = cls._run_in_bash(command)

        if result.returncode != 0:
            logger.debug("Could not get server resources. Ignoring.")
            return Resources()

        info = parse_slurm_dump_to_dictionary(result.stdout, "\n")
        server_resources = default_resources_from_dict(info)

        return Resources.merge_resources(server_resources, cls._get_default_resources())

    @classmethod
    def _get_default_resources(cls) -> Resources:
        """
        Return a Resources object representing the default, hard-coded resources for a batch job.
        """
        return Resources(
            nnodes=1,
            ncpus=1,
            mem_per_cpu="1gb",
            work_dir="scratch_local",
            work_size_per_cpu="1gb",
            walltime="1d",
        )

    @classmethod
    def _get_batch_jobs_using_sacct_command(cls, command: str) -> list[SlurmJob]:
        """
        Execute `sacct` to retrieve information about Slurm jobs and parse it.

        Args:
            command (str): A Slurm command to get the relevant jobs.

        Returns:
            list[SlurmJob]: A list of `SlurmJob` instances corresponding to the jobs
                returned by the command.

        Raises:
            QQError: If the command fails (non-zero return code) or if the output
                cannot be parsed into valid job information.
        """
        result = cls._run_in_bash(command)

        if result.returncode != 0:
            raise QQError(
                f"Could not retrieve information about jobs: {result.stderr.strip()}."
            )

        # every non-blank line of the output describes one job
        return [
            SlurmJob.from_sacct_string(sacct_string)
            for sacct_string in result.stdout.split("\n")
            if sacct_string.strip() != ""
        ]

    @classmethod
    def _get_batch_jobs_using_squeue_command(cls, command: str) -> list[SlurmJob]:
        """
        Execute `squeue` and `scontrol show job` to retrieve information about Slurm jobs.

        Multiple `scontrol` commands are executed in parallel
        to increase the speed of collecting the information about jobs.

        Note that the jobs are returned in an arbitrary order.

        Args:
            command (str): A Slurm command to get the relevant job IDs.

        Returns:
            list[SlurmJob]: A list of `SlurmJob` instances corresponding to the jobs
                returned by the command.

        Raises:
            QQError: If the command fails (non-zero return code) or if the output
                cannot be parsed into valid job information.
        """
        result = cls._run_in_bash(command)

        if result.returncode != 0:
            raise QQError(
                f"Could not retrieve information about jobs: {result.stderr.strip()}."
            )

        ids = [line.strip() for line in result.stdout.split("\n") if line.strip()]

        return cls._get_jobs_in_parallel(ids)

    @classmethod
    def _get_jobs_in_parallel(cls, job_ids: list[str]) -> list[SlurmJob]:
        """
        Constructs Slurm jobs in parallel.

        The jobs are returned in the order in which their construction finishes.

        Args:
            job_ids (list[str]): A list of job IDs to collect.

        Returns:
            list[SlurmJob]: A list of SlurmJob objects.

        Raises:
            QQError: If the construction of any job raises an exception.
        """
        jobs: list[SlurmJob] = []

        with ThreadPoolExecutor(
            max_workers=CFG.slurm_options.jobs_scontrol_nthreads
        ) as executor:
            future_to_id = {
                executor.submit(SlurmJob, job_id): job_id for job_id in job_ids
            }

            for future in as_completed(future_to_id):
                try:
                    jobs.append(future.result())
                except Exception as e:
                    failed_id = future_to_id[future]
                    raise QQError(f"Failed to load job {failed_id}: {e}.") from e

        return jobs
Implementation of BatchInterface for Slurm batch system.
Return the name of the batch system environment.
Returns:
str: The batch system name.
@classmethod
def is_available(cls) -> bool:
    """
    Determine whether this backend is available on the current host.

    Returns:
        bool: True if `sbatch` is found on PATH and neither `it4ifree`
        nor `lumi-allocations` is; False otherwise.
    """
    # without `sbatch` there is no Slurm to talk to
    if shutil.which("sbatch") is None:
        return False

    # NOTE(review): these commands presumably identify sites handled
    # by other backends - confirm
    excluding_commands = ("it4ifree", "lumi-allocations")
    return all(shutil.which(tool) is None for tool in excluding_commands)
Determine whether the batch system is available on the current host.
Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.
Returns:
bool: True if the batch system is available, False otherwise.
Get the id of the current job from the corresponding batch system's environment variable.
For this method to work, it has to be called from the inside of an active job.
Returns:
str | None: ID of the job, or None if the environment variable is not set.
@classmethod
def job_submit(
    cls,
    res: Resources,
    queue: str,
    script: Path,
    job_name: str,
    depend: list[Depend],
    env_vars: dict[str, str],
    account: str | None = None,
    server: str | None = None,
    remote_host: str | None = None,
) -> str:
    """
    Submit a job using `sbatch`, either locally or from a remote host over SSH.

    Args:
        res (Resources): Resources required for the job.
        queue (str): Target queue for the job submission.
        script (Path): Path to the script to execute.
        job_name (str): Name of the job to use.
        depend (list[Depend]): List of job dependencies.
        env_vars (dict[str, str]): Environment variables to propagate to the job.
        account (str | None): Optional account name to use for the job.
        server (str | None): Ignored for Slurm; a warning is logged if provided.
        remote_host (str | None): Optional name of the machine to submit the job from.

    Returns:
        str: ID of the submitted job (the last token printed by `sbatch`).

    Raises:
        QQError: If the submission command fails or prints no job ID.
    """
    # server is unused
    if server:
        logger.warning("The 'server' option is ignored for Slurm.")

    input_dir = script.parent
    logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")

    # intentionally using PBS
    PBS._shared_guard(input_dir, res, env_vars, server, remote_host)

    command = cls._translate_submit(
        res, queue, input_dir, str(script), job_name, depend, env_vars, account
    )

    if not remote_host:
        logger.debug(f"Submitting job using '{command}'.")
        result = subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )
    else:
        # submit the script from the remote host
        logger.debug(
            f"Navigating to '{remote_host}' to execute the submission command '{command}'."
        )
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                "-q",  # suppress some SSH messages
                remote_host,
                command,
            ],
            capture_output=True,
            text=True,
            check=False,
            # decode undecodable bytes leniently, like the local branch does
            errors="replace",
        )

    if result.returncode != 0:
        raise QQError(
            f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
        )

    # the job ID is the last whitespace-separated token of the output
    tokens = result.stdout.split()
    if not tokens:
        raise QQError(
            f"Failed to submit script '{str(script)}': no job ID was reported."
        )

    return tokens[-1]
Submit a job to the batch system.
Can also perform additional validation of the job's resources.
This method is NOT guaranteed to be thread-safe.
Arguments:
- res (Resources): Resources required for the job.
- queue (str): Target queue for the job submission.
- script (Path): Path to the script to execute.
- job_name (str): Name of the job to use.
- depend (list[Depend]): List of job dependencies.
- env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
- account (str | None): Optional account name to use for the job.
- server (str | None): Optional name of the server to submit the job to.
- remote_host (str | None): Optional name of the machine to submit the job from.
Returns:
str: Unique ID of the submitted job.
Raises:
- QQError: If the job submission fails.
@classmethod
def job_kill(cls, job_id: str) -> None:
    """
    Terminate a job using the command built by `_translate_kill`.

    Args:
        job_id (str): Identifier of the job to terminate.

    Raises:
        QQError: If the kill command exits with a non-zero return code.
    """
    kill_command = cls._translate_kill(job_id)
    logger.debug(kill_command)

    # the command is passed to bash on its standard input
    bash_options = {
        "input": kill_command,
        "text": True,
        "check": False,
        "capture_output": True,
        "errors": "replace",
    }
    outcome = subprocess.run(["bash"], **bash_options)

    if outcome.returncode == 0:
        return

    raise QQError(f"Failed to kill job '{job_id}': {outcome.stderr.strip()}.")
Terminate a job gracefully. The job should have time for proper cleanup.
Arguments:
- job_id (str): Identifier of the job to terminate.
Raises:
- QQError: If the job could not be killed.
@classmethod
def job_kill_force(cls, job_id: str) -> None:
    """
    Forcefully terminate a job using the command built by `_translate_kill_force`.

    Args:
        job_id (str): Identifier of the job to forcefully terminate.

    Raises:
        QQError: If the kill command exits with a non-zero return code.
    """
    kill_command = cls._translate_kill_force(job_id)
    logger.debug(kill_command)

    # the command is passed to bash on its standard input
    bash_options = {
        "input": kill_command,
        "text": True,
        "check": False,
        "capture_output": True,
        "errors": "replace",
    }
    outcome = subprocess.run(["bash"], **bash_options)

    if outcome.returncode == 0:
        return

    raise QQError(f"Failed to kill job '{job_id}': {outcome.stderr.strip()}.")
Forcefully terminate a job. There may be no time for proper cleanup.
Arguments:
- job_id (str): Identifier of the job to forcefully terminate.
Raises:
- QQError: If the job could not be killed.
Retrieve information about a job from the batch system.
The returned object should be fully initialized, even if the job no longer exists or its information is unavailable.
Arguments:
- job_id (str): Identifier of the job.
Returns:
BatchJobInterface: Object containing the job's metadata and state.
@classmethod
def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[SlurmJob]:
    """
    Retrieve information about multiple jobs, keeping the order of `job_ids`.

    Jobs are first collected with a single `sacct` call. Jobs that `sacct` does not
    report, or reports as queued or held, are loaded individually in parallel.

    Args:
        job_ids (list[str]): List of job identifiers.

    Returns:
        list[SlurmJob]: Jobs in the same order as their IDs appear in `job_ids`;
        an empty list if no IDs are provided.

    Raises:
        QQError: If `sacct` fails or any of the individual jobs cannot be loaded.
    """
    # without any IDs the `-j` option of sacct would be left without a job list
    if not job_ids:
        return []

    command = f"sacct --allocations --noheader --parsable2 -j {','.join(job_ids)} --format={SACCT_FIELDS}"
    logger.debug(command)

    # sacct ignores IDs of nonexistent jobs - it will just not print any output for them
    sacct_jobs = cls._get_batch_jobs_using_sacct_command(command)
    sacct_jobs_dict = {
        job.get_id(): job
        for job in sacct_jobs
        # filter out pending (queued) jobs - we want to use scontrol for them
        if job.get_state() not in {BatchState.QUEUED, BatchState.HELD}
    }

    # get information about the remaining jobs using scontrol
    scontrol_jobs = cls._get_jobs_in_parallel(
        [job_id for job_id in job_ids if job_id not in sacct_jobs_dict]
    )
    scontrol_jobs_dict = {job.get_id(): job for job in scontrol_jobs}

    # merge all jobs but maintain their original order
    all_jobs = []
    for job_id in job_ids:
        if (job := sacct_jobs_dict.get(job_id)) or (
            job := scontrol_jobs_dict.get(job_id)
        ):
            all_jobs.append(job)

    return all_jobs
Retrieve information about multiple jobs from the batch system.
Batch jobs should be returned in the same order as they appear in job_ids.
A TBatchJob object should be returned for each job id, even if the job
no longer exists or its information is unavailable.
Array jobs should NOT be expanded into their individual tasks.
The default implementation is to call get_batch_job for each job id.
This implementation may be inefficient for large numbers of job ids and
should be overridden by subclasses.
Arguments:
- job_ids (list[str]): List of job identifiers.
Returns:
list[TBatchJob]: List of TBatchJob objects, one for each job id.
@classmethod
def get_unfinished_batch_jobs(
    cls, user: str, server: str | None = None
) -> list[SlurmJob]:
    """
    Retrieve the running and pending jobs of a user.

    Args:
        user (str): Username for which to fetch the jobs.
        server (str | None): Unused.

    Returns:
        list[SlurmJob]: Running and pending jobs of the user, one per job ID.
    """
    # server unused
    _ = server

    # running jobs are taken from sacct (faster than using squeue and scontrol)
    sacct_command = f"sacct -u {user} --state RUNNING --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
    logger.debug(sacct_command)
    running_jobs = cls._get_batch_jobs_using_sacct_command(sacct_command)

    # pending jobs are taken from squeue
    squeue_command = f'squeue -u {user} --array -t PENDING -h -o "%i"'
    logger.debug(squeue_command)
    pending_jobs = cls._get_batch_jobs_using_squeue_command(squeue_command)

    # key the jobs by their ID so that each job is returned only once;
    # for a job reported by both commands, the squeue entry is kept
    jobs_by_id = {}
    for job in running_jobs + pending_jobs:
        jobs_by_id[job.get_id()] = job

    return list(jobs_by_id.values())
Retrieve information about all uncompleted jobs submitted by user
on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- user (str): Username for which to fetch uncompleted jobs.
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
@classmethod
def get_batch_jobs(cls, user: str, server: str | None = None) -> list[SlurmJob]:
    """
    Collect all jobs of a single user.

    Non-pending jobs are read with `sacct`, pending jobs with `squeue`. Both
    result sets are merged and de-duplicated by job ID.

    Args:
        user (str): Username whose jobs are fetched.
        server (str | None): Not used by this implementation.

    Returns:
        list[SlurmJob]: One job object per distinct job ID.
    """
    # the server argument is accepted for interface compatibility only
    _ = server

    # sacct covers all jobs except pending ones, for which it lacks full information
    history_query = f"sacct -u {user} --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
    logger.debug(history_query)
    recorded = cls._get_batch_jobs_using_sacct_command(history_query)

    # pending jobs come from squeue
    pending_query = f'squeue -u {user} --array -t PENDING -h -o "%i"'
    logger.debug(pending_query)
    pending = cls._get_batch_jobs_using_squeue_command(pending_query)

    # de-duplicate by job ID; for an ID reported by both tools the squeue job is kept
    unique: dict[str, SlurmJob] = {}
    for job in recorded + pending:
        unique[job.get_id()] = job
    return list(unique.values())
Retrieve information about all jobs submitted by a specific user (including finished jobs) on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- user (str): Username for which to fetch all jobs.
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
@classmethod
def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
    """
    Collect the running and pending jobs of all users.

    Running jobs are read with `sacct`, pending jobs with `squeue`. Both
    result sets are merged and de-duplicated by job ID.

    Args:
        server (str | None): Not used by this implementation.

    Returns:
        list[SlurmJob]: One job object per distinct job ID.
    """
    # the server argument is accepted for interface compatibility only
    _ = server

    # running jobs come from sacct (faster than using squeue and scontrol)
    running_query = f"sacct --state RUNNING --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
    logger.debug(running_query)
    running = cls._get_batch_jobs_using_sacct_command(running_query)

    # pending jobs come from squeue
    pending_query = 'squeue --array -t PENDING -h -o "%i"'
    logger.debug(pending_query)
    pending = cls._get_batch_jobs_using_squeue_command(pending_query)

    # de-duplicate by job ID; for an ID reported by both tools the squeue job is kept
    unique: dict[str, SlurmJob] = {}
    for job in running + pending:
        unique[job.get_id()] = job
    return list(unique.values())
Retrieve information about uncompleted jobs of all users on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
@classmethod
def get_all_batch_jobs(cls, server: str | None = None) -> list[SlurmJob]:
    """
    Collect all jobs of all users.

    Non-pending jobs are read with `sacct`, pending jobs with `squeue`. Both
    result sets are merged and de-duplicated by job ID.

    Args:
        server (str | None): Not used by this implementation.

    Returns:
        list[SlurmJob]: One job object per distinct job ID.
    """
    # the server argument is accepted for interface compatibility only
    _ = server

    # sacct covers all jobs except pending ones, which it does not provide
    history_query = f"sacct --allusers --allocations --noheader --parsable2 --array --format={SACCT_FIELDS}"
    logger.debug(history_query)
    recorded = cls._get_batch_jobs_using_sacct_command(history_query)

    # pending jobs come from squeue
    pending_query = 'squeue --array -t PENDING -h -o "%i"'
    logger.debug(pending_query)
    pending = cls._get_batch_jobs_using_squeue_command(pending_query)

    # de-duplicate by job ID; for an ID reported by both tools the squeue job is kept
    unique: dict[str, SlurmJob] = {}
    for job in recorded + pending:
        unique[job.get_id()] = job
    return list(unique.values())
Retrieve information about all jobs of all users on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
@classmethod
def get_queues(cls, server: str | None = None) -> list[SlurmQueue]:
    """
    List the partitions reported by `scontrol show partition -o`.

    Each output line is parsed into a dictionary and turned into a
    `SlurmQueue` named after its `PartitionName` entry.

    Args:
        server (str | None): Not used by this implementation.

    Returns:
        list[SlurmQueue]: One queue per line of the `scontrol` output.

    Raises:
        QQError: If the `scontrol` command exits with a non-zero status.
    """
    # the server argument is accepted for interface compatibility only
    _ = server

    query = "scontrol show partition -o"
    logger.debug(query)

    # the command text is passed to bash through standard input
    completed = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )
    if completed.returncode != 0:
        raise QQError(
            f"Could not retrieve information about queues: {completed.stderr.strip()}."
        )

    # one partition record per output line
    records = (
        parse_slurm_dump_to_dictionary(line) for line in completed.stdout.splitlines()
    )
    return [SlurmQueue.from_dict(record["PartitionName"], record) for record in records]
Retrieve all queues managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get queues from.
Returns:
list[BatchQueueInterface]: A list of queue objects existing in the batch system.
@classmethod
def get_nodes(cls, server: str | None = None) -> list[SlurmNode]:
    """
    List the nodes reported by `scontrol show node -o`.

    Each output line is parsed into a dictionary and turned into a
    `SlurmNode` named after its `NodeName` entry.

    Args:
        server (str | None): Not used by this implementation.

    Returns:
        list[SlurmNode]: One node per line of the `scontrol` output.

    Raises:
        QQError: If the `scontrol` command exits with a non-zero status.
    """
    # the server argument is accepted for interface compatibility only
    _ = server

    query = "scontrol show node -o"
    logger.debug(query)

    # the command text is passed to bash through standard input
    completed = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )
    if completed.returncode != 0:
        raise QQError(
            f"Could not retrieve information about nodes: {completed.stderr.strip()}."
        )

    # one node record per output line
    records = (
        parse_slurm_dump_to_dictionary(line) for line in completed.stdout.splitlines()
    )
    return [SlurmNode.from_dict(record["NodeName"], record) for record in records]
Retrieve all nodes managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get nodes from.
Returns:
list[BatchNodeInterface]: A list of node objects existing in the batch system.
@classmethod
def read_remote_file(cls, host: str, file: Path) -> str:
    """
    Read the contents of a file on a remote host.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.read_remote_file`.

    Args:
        host (str): Hostname of the remote machine where the file resides.
        file (Path): Path to the file on the remote host.

    Returns:
        str: The value returned by `PBS.read_remote_file`.
    """
    return PBS.read_remote_file(host, file)
Read the contents of a file on a remote host and return it as a string.
The default implementation uses SSH to retrieve the file contents.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
Returns:
str: The contents of the remote file.
Raises:
- QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: Path, content: str) -> None:
    """
    Write content to a file on a remote host.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.write_remote_file`.

    Args:
        host (str): Hostname of the remote machine where the file resides.
        file (Path): Path to the file on the remote host.
        content (str): Content to write to the remote file.
    """
    PBS.write_remote_file(host, file, content)
Write the given content to a file on a remote host, overwriting it if it exists.
The default implementation uses SSH to send the content to the remote file.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
- content (str): The content to write to the remote file.
Raises:
- QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: Path) -> None:
    """
    Create a directory on a remote host.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.make_remote_dir`.

    Args:
        host (str): Hostname of the remote machine where the directory should be created.
        directory (Path): Path of the directory to create on the remote host.
    """
    PBS.make_remote_dir(host, directory)
Create a directory at the specified path on a remote host.
The default implementation uses SSH to run mkdir on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory should be created.
- directory (Path): The path of the directory to create on the remote host.
Raises:
- QQError: If the directory cannot be created but does not already exist or the SSH command fails.
@classmethod
def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
    """
    List the entries of a directory on a remote host.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.list_remote_dir`.

    Args:
        host (str): Hostname of the remote machine where the directory resides.
        directory (Path): Remote directory to list.

    Returns:
        list[Path]: The value returned by `PBS.list_remote_dir`.
    """
    return PBS.list_remote_dir(host, directory)
List all files and directories (absolute paths) in the specified directory on a remote host.
The default implementation uses SSH to run ls -A on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to list.
Returns:
list[Path]: A list of
`Path` objects representing the entries inside the directory. Entries are relative to the given `directory`.
Raises:
- QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: Path) -> None:
    """
    Delete a directory on a remote host.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.delete_remote_dir`.

    Args:
        host (str): Hostname of the remote machine where the directory resides.
        directory (Path): Remote directory to delete.
    """
    PBS.delete_remote_dir(host, directory)
Delete a directory on a remote host.
The default implementation uses SSH to run rm -r on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to delete.
Raises:
- QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files(
    cls, host: str, files: list[Path], moved_files: list[Path]
) -> None:
    """
    Move files on a remote host from their current paths to new paths.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.move_remote_files`.

    Args:
        host (str): Hostname of the remote machine where the files reside.
        files (list[Path]): Source file paths on the remote host.
        moved_files (list[Path]): Destination file paths on the remote host.
    """
    PBS.move_remote_files(host, files, moved_files)
Move files on a remote host from their current paths to new paths.
The default implementation uses SSH to run a sequence of mv commands on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the files reside.
- files (list[Path]): A list of source file paths on the remote host.
- moved_files (list[Path]): A list of destination file paths on the remote host.
Must be the same length as `files`.
Raises:
- QQError: If the SSH command fails, the files cannot be moved, or
the length of `files` does not match the length of `moved_files`.
@classmethod
def sync_with_exclusions(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    exclude_files: list[Path] | None = None,
) -> None:
    """
    Synchronize two directories while excluding the specified files.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.sync_with_exclusions`.

    Args:
        src_dir (Path): Source directory to sync from.
        dest_dir (Path): Destination directory to sync to.
        src_host (str | None): Hostname of the source machine, or None if the source is local.
        dest_host (str | None): Hostname of the destination machine, or None if the destination is local.
        exclude_files (list[Path] | None): Optional list of files to exclude from syncing.
    """
    PBS.sync_with_exclusions(src_dir, dest_dir, src_host, dest_host, exclude_files)
Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.
All files and directories in src_dir are copied to dest_dir except
those listed in exclude_files. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
These will be converted to paths relative to `src_dir`.
Raises:
- QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    include_files: list[Path] | None = None,
) -> None:
    """
    Synchronize only the explicitly selected files between two directories.

    The Slurm backend has no implementation of its own here; the call is
    forwarded unchanged to `PBS.sync_selected`.

    Args:
        src_dir (Path): Source directory to sync from.
        dest_dir (Path): Destination directory to sync to.
        src_host (str | None): Hostname of the source machine, or None if the source is local.
        dest_host (str | None): Hostname of the destination machine, or None if the destination is local.
        include_files (list[Path] | None): Optional list of files to include in syncing.
    """
    PBS.sync_selected(src_dir, dest_dir, src_host, dest_host, include_files)
Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.
Only files listed in include_files are copied from src_dir to dest_dir.
Files not listed are ignored. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- include_files (list[Path] | None): Optional list of absolute file paths to include in syncing.
These paths are converted to paths relative to `src_dir`. This argument is optional only for consistency with sync_with_exclusions.
Raises:
- QQError: If the rsync command fails or times out.
@classmethod
def sort_jobs(cls, jobs: list[SlurmJob]) -> None:
    """
    Sort a list of Slurm jobs in place.

    The sort key of each job is the value returned by its
    `get_ids_for_sorting()` method.

    Args:
        jobs (list[SlurmJob]): Jobs to sort; the list itself is reordered.
    """

    def _sorting_key(job):
        # key derived from the job itself rather than from its plain ID string
        return job.get_ids_for_sorting()

    jobs.sort(key=_sorting_key)
Sort a list of batch system jobs by a defined attribute.
The default implementation sorts the jobs alphabetically by their job ID,
as returned by job.get_id(). Subclasses may override this method to
implement custom sorting logic.
Arguments:
- jobs (list[BatchJobInterface]): A list of batch job objects to be sorted in-place.
@classmethod
def jobs_presenter_columns_to_show(cls) -> set[str]:
    """
    Return the titles of the job-listing columns to show for Slurm.

    Returns:
        set[str]: Titles of the columns that should be shown.
    """
    visible_columns = (
        "S",
        "Job ID",
        "User",
        "Job Name",
        "Queue",
        "NCPUs",
        "NGPUs",
        "NNodes",
        "Times",
        "Node",
        "Exit",
    )
    return set(visible_columns)
Get a set of columns that should be shown in the output of JobsPresenter (qq jobs)
for this batch system.
In the default implementation, all columns are shown.
Note that the 'Exit' column is not shown when printing queued and running jobs, even if you specify it here.
Returns:
set[str]: Set of column titles that should be shown.