qq_lib.batch.pbs
PBS backend for qq: job submission, monitoring, and cluster-resource access.
This module implements qq's full integration with the PBS Pro batch system as configured on the Metacentrum-family clusters.
It provides:
- The `PBS` batch-system backend, implementing job submission, killing, file synchronization (local and remote), work-directory handling, resource translation, dependency formatting, and scratch-directory logic.
- `PBSJob`, `PBSNode`, and `PBSQueue`, concrete implementations of qq's job/node/queue interfaces, responsible for parsing PBS command output and exposing normalized metadata to the rest of qq.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5PBS backend for qq: job submission, monitoring, and cluster-resource access. 6 7This module implements qq's full integration with the PBS Pro batch system 8as configured on the Metacentrum-family clusters. 9 10It provides: 11 12- The `PBS` batch-system backend, implementing job submission, killing, file 13 synchronization (local and remote), work-directory handling, resource 14 translation, dependency formatting, and scratch-directory logic. 15 16- `PBSJob`, `PBSNode`, and `PBSQueue`, concrete implementations of qq's 17 job/node/queue interfaces, responsible for parsing PBS command output and 18 exposing normalized metadata to the rest of qq. 19""" 20 21from .job import PBSJob 22from .node import PBSNode 23from .pbs import PBS 24from .queue import PBSQueue 25 26__all__ = [ 27 "PBSJob", 28 "PBSNode", 29 "PBS", 30 "PBSQueue", 31]
class PBSJob(BatchJobInterface):
    """
    Implementation of BatchJobInterface for PBS.
    Stores metadata for a single PBS job.

    The metadata is kept as a flat `dict[str, str]` produced from the output
    of `qstat -fxw`; all getters read from that dictionary.
    """

    def __init__(self, job_id: str):
        """Query the batch system for information about the job with the specified ID."""
        self._job_id = job_id
        self._info: dict[str, str] = {}

        self.update()

    def is_empty(self) -> bool:
        """Return True if no information about the job is stored."""
        return not self._info

    def get_id(self) -> str:
        """Return the job ID this object was created with."""
        return self._job_id

    def get_account(self) -> str | None:
        """Return None: this backend does not report an account."""
        return None

    def update(self) -> None:
        """
        Refresh the stored job information by running `qstat -fxw` for this job.

        If `qstat` exits with a non-zero code, the stored information is
        cleared (and a debug message is logged) instead of raising.
        """
        # imported locally so that this class does not depend on a change
        # to the module-level import block
        import shlex

        # The command is interpreted by bash, so the job ID has to be quoted:
        # array job IDs contain brackets (e.g. '123[].server') which bash would
        # treat as a glob pattern, and any shell metacharacter in an unquoted
        # ID would be executed. IDs made of safe characters are left unchanged.
        command = f"qstat -fxw {shlex.quote(self._job_id)}"

        result = subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

        if result.returncode != 0:
            # if qstat fails, information is empty
            logger.debug(
                f"qstat failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
            )
            self._info = {}
        else:
            self._info = parse_pbs_dump_to_dictionary(result.stdout)

    def get_state(self) -> BatchState:
        """
        Return the state of the job.

        Returns:
            BatchState: `UNKNOWN` if no `job_state` is stored, `FAILED` for a
            finished job whose exit code is missing or non-zero, otherwise the
            state obtained from the PBS state code.
        """
        if not (state := self._info.get("job_state")):
            return BatchState.UNKNOWN

        # X is used by PBS to indicate finished tasks in unfinished array jobs,
        # but qq uses X to indicate failure
        if state == "X":
            state = "F"

        # a finished job is successful only if it has a zero exit code;
        # if the exit code does not exist, the job never ran and was likely killed
        if state == "F" and self.get_exit_code() != 0:
            return BatchState.FAILED

        return BatchState.from_code(state)

    def get_comment(self) -> str | None:
        """Return the stored `comment` attribute, or None if there is none."""
        return self._info.get("comment")

    def get_estimated(self) -> tuple[datetime, str] | None:
        """
        Return the estimated start time and execution vnode(s) of the job.

        Returns:
            tuple[datetime, str] | None: The estimated start time (never earlier
            than the current time) and the cleaned vnode names joined by ' + ',
            or None if either attribute is missing or the time cannot be parsed.
        """
        if not (raw_time := self._info.get("estimated.start_time")):
            logger.debug("No 'estimated.start_time' found.")
            return None

        try:
            time = datetime.strptime(raw_time, CFG.date_formats.pbs)
            # if the estimated start time is in the past, use the current time
            if (current_time := datetime.now()) > time:
                time = current_time
        except Exception as e:
            logger.debug(f"Could not parse 'estimated.start_time': {e}.")
            return None

        if not (raw_vnode := self._info.get("estimated.exec_vnode")):
            logger.debug("No 'estimated.exec_vnode' found.")
            return None

        vnodes = [
            PBSJob._clean_node_name(part.strip()) for part in raw_vnode.split("+")
        ]

        return (time, " + ".join(vnodes))

    def get_main_node(self) -> str | None:
        """Return the cleaned name of the first entry of `exec_host2`, or None if it is not set."""
        if raw_node := self._info.get("exec_host2"):
            return PBSJob._clean_node_name(raw_node.split("+")[0].strip())

        return None

    def get_nodes(self) -> list[str] | None:
        """Return the cleaned names of all entries of `exec_host2`, or None if it is not set."""
        if not (raw_nodes := self._info.get("exec_host2")):
            return None

        return [PBSJob._clean_node_name(node.strip()) for node in raw_nodes.split("+")]

    def get_short_nodes(self) -> list[str] | None:
        """Return the cleaned names of all entries of `exec_host`, or None if it is not set."""
        if not (raw_nodes := self._info.get("exec_host")):
            return None

        return [PBSJob._clean_node_name(node.strip()) for node in raw_nodes.split("+")]

    def get_name(self) -> str | None:
        """Return the stored `Job_Name` attribute, or None if it is not set."""
        return self._info.get("Job_Name")

    def get_n_cpus(self) -> int | None:
        """Return `Resource_List.ncpus` as an integer, or None if missing or invalid."""
        return self._get_int_property("Resource_List.ncpus", "the number of CPUs")

    def get_n_gpus(self) -> int | None:
        """Return `Resource_List.ngpus` as an integer, or None if missing or invalid."""
        return self._get_int_property("Resource_List.ngpus", "the number of GPUs")

    def get_n_nodes(self) -> int | None:
        """Return `Resource_List.nodect` as an integer, or None if missing or invalid."""
        return self._get_int_property("Resource_List.nodect", "the number of nodes")

    def get_mem(self) -> Size | None:
        """Return `Resource_List.mem` parsed as a Size, or None if missing or unparsable."""
        if not (mem := self._info.get("Resource_List.mem")):
            logger.debug(
                f"Could not get information about the amount of memory from the batch system for '{self._job_id}'."
            )
            return None

        try:
            return Size.from_string(mem)
        except Exception as e:
            logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
            return None

    def get_start_time(self) -> datetime | None:
        """Return the parsed `stime` attribute, or None if missing or unparsable."""
        return self._get_datetime_property("stime", "the job start time")

    def get_submission_time(self) -> datetime | None:
        """Return the parsed `ctime` attribute, or None if missing or unparsable."""
        return self._get_datetime_property("ctime", "the job submission time")

    def get_completion_time(self) -> datetime | None:
        """Return the parsed `obittime` attribute, or None if missing or unparsable."""
        return self._get_datetime_property("obittime", "the job completion time")

    def get_modification_time(self) -> datetime | None:
        """Return the parsed `mtime` attribute, falling back to the submission time."""
        return (
            self._get_datetime_property("mtime", "the job modification time")
            or self.get_submission_time()
        )

    def get_user(self) -> str | None:
        """Return the part of `Job_Owner` before the first '@', or None if it is not set."""
        if not (user := self._info.get("Job_Owner")):
            return None

        return user.split("@")[0]

    def get_walltime(self) -> timedelta | None:
        """Return `Resource_List.walltime` as a timedelta, or None if missing or unparsable."""
        if not (walltime := self._info.get("Resource_List.walltime")):
            return None

        try:
            return hhmmss_to_duration(walltime)
        except QQError as e:
            logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
            return None

    def get_queue(self) -> str | None:
        """Return the stored `queue` attribute, or None if it is not set."""
        return self._info.get("queue")

    def get_util_cpu(self) -> int | None:
        """
        Return the CPU utilization per requested CPU.

        Returns:
            int | None: `resources_used.cpupercent` integer-divided by the number
            of requested CPUs, or None if either value is unavailable or invalid.
        """
        if not (util_cpu := self._info.get("resources_used.cpupercent")):
            logger.debug(
                f"Information about CPU utilization is not available for '{self._job_id}'."
            )
            return None

        # also rejects zero CPUs, which would make the division below fail
        if not (ncpus := self.get_n_cpus()):
            logger.debug(
                f"Information about the number of CPUs is not available for '{self._job_id}'."
            )
            return None

        try:
            # PBS reports CPU utilization in the same way as `top` - we have to divide by number of CPUs
            return int(util_cpu) // ncpus
        except Exception as e:
            logger.warning(
                f"Could not parse information about CPU utilization for '{self._job_id}': {e}."
            )
            return None

    def get_util_mem(self) -> int | None:
        """
        Return the memory utilization as a percentage of the requested memory.

        Returns:
            int | None: `resources_used.mem` relative to the requested memory,
            truncated to an integer, or None if either value is unavailable
            or cannot be parsed.
        """
        if not (util_mem := self._info.get("resources_used.mem")):
            logger.debug(
                f"Information about memory utilization is not available for '{self._job_id}'."
            )
            return None

        if not (mem := self.get_mem()):
            logger.debug(
                f"Information about the amount of memory is not available for '{self._job_id}'."
            )
            return None

        try:
            used = Size.from_string(util_mem).value
            return int(used / mem.value * 100.0)
        except Exception as e:
            logger.warning(
                f"Could not parse information about memory utilization for '{self._job_id}': {e}."
            )
            return None

    def get_exit_code(self) -> int | None:
        """Return `Exit_status` as an integer, or None if missing or invalid."""
        # named 'raw_exit' so that the builtin 'exit' is not shadowed
        if not (raw_exit := self._info.get("Exit_status")):
            return None

        try:
            return int(raw_exit)
        except Exception as e:
            logger.warning(f"Could not parse exit code for '{self._job_id}': {e}.")
            return None

    def get_input_machine(self) -> str | None:
        """Return the stored `Submit_Host` attribute, or None if it is not set."""
        return self._info.get("Submit_Host")

    def get_input_dir(self) -> Path | None:
        """
        Return the directory the job was submitted from.

        The directory is read from the job's environment variables; the first
        non-empty one of the qq variable, `PBS_O_WORKDIR` and `INF_INPUT_DIR` is used.

        Returns:
            Path | None: The logically resolved input directory, or None if
            no environment variables or none of the candidates are available.
        """
        if not (env_vars := self._get_env_vars()):
            logger.debug(
                f"Could not get list of environment variables for '{self._job_id}'."
            )
            return None

        if not (
            # try qq first
            # we always try qq first because it provides the most reliable information
            # PBS sometimes sets the input directory to absolute path (resolving symlinks)
            # which is not necessarily what we want
            input_dir := env_vars.get(CFG.env_vars.input_dir)
            or env_vars.get("PBS_O_WORKDIR")  # if this fails, try PBS
            or env_vars.get("INF_INPUT_DIR")  # if this fails, try Infinity
        ):
            logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
            return None

        return logical_resolve(Path(input_dir))

    def get_info_file(self) -> Path | None:
        """Return the path stored in the qq info-file environment variable, or None if it is not set."""
        if not (env_vars := self._get_env_vars()):
            logger.debug(
                f"Could not get list of environment variables for '{self._job_id}'."
            )
            return None

        if not (info_file := env_vars.get(CFG.env_vars.info_file)):
            logger.debug(
                f"Job '{self._job_id}' does not have an assigned qq info file."
            )
            return None

        return Path(info_file)

    def to_yaml(self) -> str:
        """Return the job ID followed by all stored attributes as a YAML string."""
        # we need to add job id to the start of the dictionary
        to_dump = {"Job Id": self._job_id} | self._info
        return yaml.dump(
            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
        )

    def get_steps(self) -> Sequence[Self]:
        """Return an empty list: job steps are not available for PBS."""
        return []

    def get_step_id(self) -> str | None:
        """Return None: there are no job steps for PBS."""
        return None

    def is_array_job(self) -> bool:
        """Return True if the stored `array` attribute equals 'true' (case-insensitive)."""
        return (
            array := self._info.get("array")
        ) is not None and array.lower() == "true"

    @classmethod
    def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
        """
        Construct a new instance of PBSJob from a job ID and a dictionary of job information.

        This method bypasses the standard initializer and directly sets the `_job_id` and `_info`
        attributes of the new instance.

        Args:
            job_id (str): The unique identifier of the job.
            info (dict[str, str]): A dictionary containing PBS job metadata as key-value pairs.

        Returns:
            Self: A new instance of PBSJob.

        Note:
            This method does not perform any validation or processing of the provided dictionary.
        """
        job_info = cls.__new__(cls)
        job_info._job_id = job_id
        job_info._info = info

        return job_info

    def get_id_int(self) -> int | None:
        """
        Extract the leading numeric portion of the job ID and return it as an integer.

        Returns:
            int | None: The integer value of the leading digits in the job ID,
            or `None` if the job ID does not start with a digit.
        """
        match = re.match(r"\d+", self.get_id())
        return int(match.group()) if match else None

    def _get_env_vars(self) -> dict[str, str] | None:
        """
        Retrieve environment variables associated with the job.

        Returns:
            dict[str, str] | None: A dictionary of environment variables, or None
            if no variable list is available.
        """
        if not (variable_list := self._info.get("Variable_List")):
            return None

        # NOTE(review): the list is split on every comma, so a value that itself
        # contains a comma would be cut short - confirm how PBS escapes commas.
        return dict(
            item.split("=", 1) for item in variable_list.split(",") if "=" in item
        )

    def _get_int_property(self, property: str, property_name: str) -> int | None:
        """
        Retrieve an integer property value from the job information.

        If the property is missing or cannot be converted, `None` is returned.

        Args:
            property (str): The key identifying the property in the job information.
            property_name (str): A human-readable name of the property for logging.

        Returns:
            int | None: The integer value of the property, or `None` if unavailable or invalid.
        """
        try:
            return int(self._info[property])
        except Exception:
            logger.debug(
                f"Could not get information about {property_name} from the batch system for '{self._job_id}'."
            )
            return None

    def _get_datetime_property(
        self, property: str, property_name: str
    ) -> datetime | None:
        """
        Retrieve and parse a datetime property from the job information.

        Args:
            property (str): The key identifying the property in the job information.
            property_name (str): A human-readable name of the property for logging.

        Returns:
            datetime | None: A datetime object if parsing succeeds, otherwise None.
        """
        if not (raw_datetime := self._info.get(property)):
            return None

        try:
            return datetime.strptime(raw_datetime, CFG.date_formats.pbs)
        except Exception:
            logger.warning(
                f"Could not parse information about {property_name} for '{self._job_id}'."
            )
            return None

    @staticmethod
    def _clean_node_name(raw: str) -> str:
        """
        Normalize a raw node string to extract the clean hostname.

        Everything from the first ':' or '/' onwards is dropped and
        parentheses are removed.

        Args:
            raw (str): Raw node string reported by the batch system.

        Returns:
            str: Cleaned node name.
        """
        return raw.split(":", 1)[0].split("/", 1)[0].replace("(", "").replace(")", "")
Implementation of BatchJobInterface for PBS. Stores metadata for a single PBS job.
def __init__(self, job_id: str):
    """
    Create the job object and immediately load its metadata.

    Args:
        job_id (str): Identifier of the job in the batch system.
    """
    # start with no metadata; `update()` fills it from the batch system
    self._info: dict[str, str] = {}
    self._job_id = job_id
    self.update()
Query the batch system for information about the job with the specified ID.
Check whether the job contains any information. This should return True if the job does not exist in the batch system.
Returns:
bool: True if the job contains no information.
Return the account under which the job is submitted.
Returns:
str | None: Account associated with the job or None if no account is defined.
def update(self) -> None:
    """
    Refresh the stored job information by running `qstat -fxw` for this job.

    If `qstat` exits with a non-zero code, the stored information is
    cleared (and a debug message is logged) instead of raising.
    """
    # imported locally so that this method does not depend on a change
    # to the module-level import block
    import shlex

    # The command is interpreted by bash, so the job ID has to be quoted:
    # array job IDs contain brackets (e.g. '123[].server') which bash would
    # treat as a glob pattern, and any shell metacharacter in an unquoted
    # ID would be executed. IDs made of safe characters are left unchanged.
    command = f"qstat -fxw {shlex.quote(self._job_id)}"

    result = subprocess.run(
        ["bash"],
        input=command,
        text=True,
        check=False,
        capture_output=True,
        errors="replace",
    )

    if result.returncode != 0:
        # if qstat fails, information is empty
        logger.debug(
            f"qstat failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
        )
        self._info = {}
    else:
        self._info = parse_pbs_dump_to_dictionary(result.stdout)
Refresh the stored job information from the batch system.
Raises:
- QQError: If the job cannot be queried or its info updated.
def get_state(self) -> BatchState:
    """
    Return the state of the job.

    Returns:
        BatchState: `UNKNOWN` if no `job_state` is stored, `FAILED` for a
        finished job whose exit code is missing or non-zero, otherwise the
        state obtained from the PBS state code.
    """
    raw_state = self._info.get("job_state")
    if not raw_state:
        return BatchState.UNKNOWN

    # PBS marks finished tasks of an unfinished array job with X,
    # but qq uses X to indicate failure, so X is handled as F
    code = "F" if raw_state == "X" else raw_state

    # a finished job is successful only with a zero exit code; a missing
    # exit code means the job never ran and was likely killed
    if code == "F" and self.get_exit_code() != 0:
        return BatchState.FAILED

    return BatchState.from_code(code)
Return the current state of the job as reported by the batch system.
If the job information is no longer available, return BatchState.UNKNOWN.
Returns:
BatchState: The job state according to the batch system.
Retrieve the batch system-provided comment for the job.
Returns:
str | None: The job's comment string if available, or None if the batch system has not attached a comment.
def get_estimated(self) -> tuple[datetime, str] | None:
    """
    Return the estimated start time and execution vnode(s) of the job.

    Returns:
        tuple[datetime, str] | None: The estimated start time (never earlier
        than the current time) and the cleaned vnode names joined by ' + ',
        or None if either attribute is missing or the time cannot be parsed.
    """
    raw_time = self._info.get("estimated.start_time")
    if not raw_time:
        logger.debug("No 'estimated.start_time' found.")
        return None

    try:
        parsed = datetime.strptime(raw_time, CFG.date_formats.pbs)
        now = datetime.now()
        # an estimate that already lies in the past is replaced by the current time
        start = now if now > parsed else parsed
    except Exception as e:
        logger.debug(f"Could not parse 'estimated.start_time': {e}.")
        return None

    raw_vnode = self._info.get("estimated.exec_vnode")
    if not raw_vnode:
        logger.debug("No 'estimated.exec_vnode' found.")
        return None

    cleaned = [PBSJob._clean_node_name(part.strip()) for part in raw_vnode.split("+")]
    return (start, " + ".join(cleaned))
Retrieve the batch system's estimated job start time and execution node.
Returns:
tuple[datetime, str] | None: A tuple containing: - datetime: The estimated start time of the job. - str: The name of the node where the job is expected to run. Returns None if either estimate is unavailable.
120 def get_main_node(self) -> str | None: 121 if raw_node := self._info.get("exec_host2"): 122 return PBSJob._clean_node_name(raw_node.split("+")[0].strip()) 123 124 return None
Retrieve the hostname of the main execution node for the job.
Returns:
str | None: The hostname of the main execution node, or
None if unavailable or not applicable.
126 def get_nodes(self) -> list[str] | None: 127 if not (raw_nodes := self._info.get("exec_host2")): 128 return None 129 130 nodes = [] 131 for node in raw_nodes.split("+"): 132 nodes.append(PBSJob._clean_node_name(node.strip())) 133 134 return nodes
Retrieve the hostnames of all execution nodes allocated for the job.
Returns:
list[str] | None: A list of hostnames or node identifiers used by the job, or
None if node information is not available.
136 def get_short_nodes(self) -> list[str] | None: 137 if not (raw_nodes := self._info.get("exec_host")): 138 return None 139 140 nodes = [] 141 for node in raw_nodes.split("+"): 142 nodes.append(PBSJob._clean_node_name(node.strip())) 143 144 return nodes
Retrieve the short hostnames of all execution nodes allocated for the job.
Returns:
list[str] | None: A list of short hostnames used by the job, or
None if node information is not available.
Return the name of the job.
Returns:
str | None: The name of the submitted job or
None if not available.
149 def get_n_cpus(self) -> int | None: 150 return self._get_int_property("Resource_List.ncpus", "the number of CPUs")
Return the number of CPU cores allocated for the job.
Returns:
int | None: Number of CPUs allocated for the job or
None if not available.
152 def get_n_gpus(self) -> int | None: 153 return self._get_int_property("Resource_List.ngpus", "the number of GPUs")
Return the number of GPUs allocated for the job.
Returns:
int | None: Number of GPUs allocated for the job or
None if not available.
155 def get_n_nodes(self) -> int | None: 156 return self._get_int_property("Resource_List.nodect", "the number of nodes")
Return the number of compute nodes assigned to the job.
Returns:
int | None: Number of nodes used by the job or
None if not available.
def get_mem(self) -> Size | None:
    """
    Return the amount of memory requested for the job.

    Returns:
        Size | None: `Resource_List.mem` parsed as a Size, or None if the
        attribute is missing or cannot be parsed.
    """
    raw_mem = self._info.get("Resource_List.mem")
    if not raw_mem:
        logger.debug(
            f"Could not get information about the amount of memory from the batch system for '{self._job_id}'."
        )
        return None

    try:
        requested = Size.from_string(raw_mem)
    except Exception as e:
        logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
        return None

    return requested
Return the amount of memory allocated for the job.
Returns:
Size | None: Amount of memory allocated for the job or
None if not available.
171 def get_start_time(self) -> datetime | None: 172 return self._get_datetime_property("stime", "the job start time")
Return the timestamp when the job started execution.
Returns:
datetime | None: Time when the job began running or
None if the job has not yet started.
174 def get_submission_time(self) -> datetime | None: 175 return self._get_datetime_property("ctime", "the job submission time")
Return the timestamp when the job was submitted.
Returns:
datetime | None: Time when the job was submitted to the batch system or
None if not available.
177 def get_completion_time(self) -> datetime | None: 178 return self._get_datetime_property("obittime", "the job completion time")
Return the timestamp when the job was completed.
Returns:
datetime | None: Time when the job completed or
None if the job has not yet completed.
180 def get_modification_time(self) -> datetime | None: 181 return ( 182 self._get_datetime_property("mtime", "the job modification time") 183 or self.get_submission_time() 184 )
Return the timestamp at which the job was last modified.
Returns:
datetime | None: Time when the job was last modified or
None if the information is not available.
186 def get_user(self) -> str | None: 187 if not (user := self._info.get("Job_Owner")): 188 return None 189 190 return user.split("@")[0]
Return the username of the job owner.
Returns:
str | None: Username of the user who owns the job or
None if not available.
192 def get_walltime(self) -> timedelta | None: 193 if not (walltime := self._info.get("Resource_List.walltime")): 194 return None 195 196 try: 197 return hhmmss_to_duration(walltime) 198 except QQError as e: 199 logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.") 200 return None
Return the walltime limit of the job.
Returns:
timedelta | None: Walltime for the job or
None if not available.
Return the submission queue of the job.
Returns:
str | None: The queue this job is part of or
None if not available.
205 def get_util_cpu(self) -> int | None: 206 if not (util_cpu := self._info.get("resources_used.cpupercent")): 207 logger.debug( 208 f"Information about CPU utilization is not available for '{self._job_id}'." 209 ) 210 return None 211 212 if not (ncpus := self.get_n_cpus()): 213 logger.debug( 214 f"Information about the number of CPUs is not available for '{self._job_id}'." 215 ) 216 return None 217 218 try: 219 # PBS report CPU utilization in the same way as `top` - we have to divide by number of CPUs 220 return int(util_cpu) // ncpus 221 except Exception as e: 222 # this catches both invalid util_cpu and invalid get_n_cpus 223 logger.warning( 224 f"Could not parse information about CPU utilization for '{self._job_id}': {e}." 225 ) 226 return None
Return the utilization of requested CPUs in percents (0-100).
Returns:
int | None: Utilization of requested CPUs or
None if not available.
def get_util_mem(self) -> int | None:
    """
    Return the memory utilization as a percentage of the requested memory.

    Returns:
        int | None: `resources_used.mem` relative to the requested memory,
        truncated to an integer, or None if either value is unavailable
        or cannot be parsed.
    """
    raw_used = self._info.get("resources_used.mem")
    if not raw_used:
        logger.debug(
            f"Information about memory utilization is not available for '{self._job_id}'."
        )
        return None

    requested = self.get_mem()
    if not requested:
        logger.debug(
            f"Information about the amount of memory is not available for '{self._job_id}'."
        )
        return None

    try:
        used_value = Size.from_string(raw_used).value
        fraction = used_value / requested.value
        return int(fraction * 100.0)
    except Exception as e:
        logger.warning(
            f"Could not parse information about memory utilization for '{self._job_id}': {e}."
        )
        return None
Return the utilization of requested memory in percents (0-100).
Returns:
int | None: Utilization of requested memory or
None if not available.
250 def get_exit_code(self) -> int | None: 251 if not (exit := self._info.get("Exit_status")): 252 return None 253 254 try: 255 return int(exit) 256 except Exception as e: 257 logger.warning(f"Could not parse exit code for '{self._job_id}': {e}.") 258 return None
Return the exit code of the job.
Returns:
int | None: Exit code of the job or
None if exit code is not assigned.
Return the hostname of the submission machine.
Returns:
str | None: Hostname of the submission machine or
None if not available.
def get_input_dir(self) -> Path | None:
    """
    Return the directory the job was submitted from.

    The directory is read from the job's environment variables; the first
    non-empty one of the qq variable, `PBS_O_WORKDIR` and `INF_INPUT_DIR` is used.

    Returns:
        Path | None: The logically resolved input directory, or None if
        no environment variables or none of the candidates are available.
    """
    env_vars = self._get_env_vars()
    if not env_vars:
        logger.debug(
            f"Could not get list of environment variables for '{self._job_id}'."
        )
        return None

    # Order matters: the qq variable is the most reliable source because PBS
    # sometimes stores the input directory as an absolute path with symlinks
    # resolved, which is not necessarily what we want. PBS is tried second,
    # Infinity last.
    candidates = (CFG.env_vars.input_dir, "PBS_O_WORKDIR", "INF_INPUT_DIR")
    for variable in candidates:
        if input_dir := env_vars.get(variable):
            return logical_resolve(Path(input_dir))

    logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
    return None
Return path to the directory from which the job was submitted.
Returns:
Path | None: Path to the submission directory or
None if not available.
def get_info_file(self) -> Path | None:
    """
    Return the path to the qq info file associated with the job.

    Returns:
        Path | None: Value of the qq info-file environment variable as a
        path, or None if the job has no environment variables or the
        variable is not set.
    """
    env_vars = self._get_env_vars()
    if not env_vars:
        logger.debug(
            f"Could not get list of environment variables for '{self._job_id}'."
        )
        return None

    info_file = env_vars.get(CFG.env_vars.info_file)
    if info_file:
        return Path(info_file)

    logger.debug(
        f"Job '{self._job_id}' does not have an assigned qq info file."
    )
    return None
Return path to the info file associated with this job.
Returns:
Path | None: Path to the qq info file or
None if this is not a qq job.
def to_yaml(self) -> str:
    """
    Serialize all stored job attributes to YAML.

    Returns:
        str: YAML document with the job ID as the first entry, followed by
        the stored attributes in their original order.
    """
    # the job ID is not part of the stored attributes, so it is put in front of them
    payload = {"Job Id": self._job_id, **self._info}
    return yaml.dump(
        payload,
        default_flow_style=False,
        sort_keys=False,
        Dumper=Dumper,
    )
Return all information about the job from the batch system in YAML format.
Returns:
str: YAML-formatted string of job metadata.
Return a list of steps associated with this job.
Note that job step is represented by BatchJobInterface, but may not contain all the values that a proper BatchJobInterface contains.
Returns:
Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.
Return the step index if this job is a job step.
Returns:
str | None: Job step index or
None if this is not a job step.
def is_array_job(self) -> bool:
    """
    Check whether the job is flagged as an array job.

    Returns:
        bool: True if the stored `array` attribute equals 'true'
        (case-insensitive), False if it differs or is not set.
    """
    array_flag = self._info.get("array")
    if array_flag is None:
        return False

    return array_flag.lower() == "true"
Return True if the job is a top-level array job (not a sub-job).
Returns:
bool:
True if the job is a top-level array job, else False.
319 @classmethod 320 def from_dict(cls, job_id: str, info: dict[str, str]) -> Self: 321 """ 322 Construct a new instance of PBSJob from a job ID and a dictionary of job information. 323 324 This method bypasses the standard initializer and directly sets the `_job_id` and `_info` 325 attributes of the new instance. 326 327 Args: 328 job_id (str): The unique identifier of the job. 329 info (dict[str, str]): A dictionary containing PBS job metadata as key-value pairs. 330 331 Returns: 332 Self: A new instance of PBSJob. 333 334 Note: 335 This method does not perform any validation or processing of the provided dictionary. 336 """ 337 job_info = cls.__new__(cls) 338 job_info._job_id = job_id 339 job_info._info = info 340 341 return job_info
Construct a new instance of PBSJob from a job ID and a dictionary of job information.
This method bypasses the standard initializer and directly sets the _job_id and _info
attributes of the new instance.
Arguments:
- job_id (str): The unique identifier of the job.
- info (dict[str, str]): A dictionary containing PBS job metadata as key-value pairs.
Returns:
Self: A new instance of PBSJob.
Note:
This method does not perform any validation or processing of the provided dictionary.
343 def get_id_int(self) -> int | None: 344 """ 345 Extract the leading numeric portion of the job ID and return it as an integer. 346 347 Returns: 348 int | None: The integer value of the leading digits in the job ID, 349 or `None` if no valid digits are found or conversion fails. 350 """ 351 match = re.match(r"\d+", self.get_id()) 352 return int(match.group()) if match else None
Extract the leading numeric portion of the job ID and return it as an integer.
Returns:
int | None: The integer value of the leading digits in the job ID, or
`None` if no valid digits are found or conversion fails.
class PBSNode(BatchNodeInterface):
    """
    Implementation of BatchNodeInterface for PBS.
    Stores metadata for a single PBS node.

    The metadata is kept in `_info`, a flat dictionary of strings produced from
    the output of `pbsnodes`; the getters below read and convert its entries.
    """

    def __init__(self, name: str, server: str | None = None):
        """
        Query the batch system for information about the specified node.

        Args:
            name (str): Name of the node.
            server (str | None): Server to query. If falsy, no `-s` option is passed to `pbsnodes`.

        Raises:
            QQError: If the node information cannot be obtained.
        """
        self._name = name
        self._server = server
        self._info: dict[str, str] = {}

        self.update()

    def update(self) -> None:
        """
        Refresh the stored node information from the batch system.

        Raises:
            QQError: If the `pbsnodes` command exits with a non-zero return code.
        """
        # get node info from PBS
        command = f"pbsnodes -v {self._name}"
        if self._server:
            command += f" -s {self._server}"

        result = subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

        if result.returncode != 0:
            # keep the actual reason available for debugging; the raised message is generic
            logger.debug(
                f"pbsnodes failed for node '{self._name}': {result.stderr.strip()}"
            )
            raise QQError(f"Node '{self._name}' does not exist.")

        self._info = parse_pbs_dump_to_dictionary(result.stdout)

    def get_name(self) -> str:
        """Return the name of the node."""
        return self._name

    def get_n_cpus(self) -> int | None:
        """Return `resources_available.ncpus` as an integer, or `None` if missing or unparsable."""
        return self._get_int_resource("resources_available.ncpus")

    def get_n_free_cpus(self) -> int | None:
        """Return the number of unassigned CPU cores, or `None` if the total is not available."""
        return self._get_free_int_resource("ncpus")

    def get_n_gpus(self) -> int | None:
        """Return `resources_available.ngpus` as an integer, or `None` if missing or unparsable."""
        return self._get_int_resource("resources_available.ngpus")

    def get_n_free_gpus(self) -> int | None:
        """Return the number of unassigned GPUs, or `None` if the total is not available."""
        return self._get_free_int_resource("ngpus")

    def get_cpu_memory(self) -> Size | None:
        """Return `resources_available.mem` as a Size, or `None` if missing or unparsable."""
        return self._get_size_resource("resources_available.mem")

    def get_free_cpu_memory(self) -> Size | None:
        """Return the unassigned CPU memory, or `None` if the total is not available."""
        return self._get_free_size_resource("mem")

    def get_gpu_memory(self) -> Size | None:
        """Return `resources_available.gpu_mem` as a Size, or `None` if missing or unparsable."""
        return self._get_size_resource("resources_available.gpu_mem")

    def get_free_gpu_memory(self) -> Size | None:
        """Return the unassigned GPU memory, or `None` if the total is not available."""
        return self._get_free_size_resource("gpu_mem")

    def get_local_scratch(self) -> Size | None:
        """Return `resources_available.scratch_local` as a Size, or `None` if missing or unparsable."""
        return self._get_size_resource("resources_available.scratch_local")

    def get_free_local_scratch(self) -> Size | None:
        """Return the unassigned local scratch space, or `None` if the total is not available."""
        return self._get_free_size_resource("scratch_local")

    def get_ssd_scratch(self) -> Size | None:
        """Return `resources_available.scratch_ssd` as a Size, or `None` if missing or unparsable."""
        return self._get_size_resource("resources_available.scratch_ssd")

    def get_free_ssd_scratch(self) -> Size | None:
        """Return the unassigned SSD scratch space, or `None` if the total is not available."""
        return self._get_free_size_resource("scratch_ssd")

    def get_shared_scratch(self) -> Size | None:
        """Return `resources_available.scratch_shared` as a Size, or `None` if missing or unparsable."""
        return self._get_size_resource("resources_available.scratch_shared")

    def get_free_shared_scratch(self) -> Size | None:
        """Return the unassigned shared scratch space, or `None` if the total is not available."""
        return self._get_free_size_resource("scratch_shared")

    def get_properties(self) -> list[str]:
        """
        Get the list of properties assigned to the node.

        A property is any entry whose key contains `resources_available` and
        whose value is the string "True"; the returned name is the part of the
        key following the first dot.

        Returns:
            list[str]: List of node property strings.
        """
        return [
            key.split(".", 1)[-1]
            for key in self._info
            if "resources_available" in key and self._info[key] == "True"
        ]

    def is_available_to_user(self, user: str) -> bool:
        """
        Check if the node is available to the specified user.

        Args:
            user (str): The username to check access for.

        Returns:
            bool: `False` if the node state is missing or contains one of the
            disabling states. If the node has a `queue` entry, the result of
            `QueuesAvailability.get_or_init` for that queue. `True` otherwise.
        """
        if not (state := self._info.get("state")):
            logger.debug(f"Could not get state information for node '{self._name}'.")
            return False

        # the state is matched by substring, so a combined state string is caught as well
        if any(
            disabled_state in state
            for disabled_state in {"down", "unknown", "unresolvable", "resv-exclusive"}
        ):
            return False

        if queue := self._info.get("queue"):
            return QueuesAvailability.get_or_init(queue, user, self._server)

        return True

    @classmethod
    def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
        """
        Construct a new instance of PBSNode from node name and a dictionary of node information.

        This method bypasses the standard initializer, so PBS is not queried.

        Args:
            name (str): The unique name of the node.
            server (str | None): Server on which the node is located. If `None`, assumes the current server.
            info (dict[str, str]): A dictionary containing PBS node metadata as key-value pairs.

        Returns:
            Self: A new instance of PBSNode.

        Note:
            This method does not perform any validation or processing of the provided dictionary.
        """
        node = cls.__new__(cls)
        node._name = name
        node._server = server
        node._info = info

        return node

    def to_yaml(self) -> str:
        """
        Return all information about the node in YAML format.

        Returns:
            str: YAML-formatted string of node metadata, starting with the `Node` entry.
        """
        # we need to add node name to the start of the dictionary
        to_dump = {"Node": self._name} | self._info
        return yaml.dump(
            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
        )

    def _get_int_resource(self, res: str) -> int | None:
        """
        Retrieve an integer-valued resource from the node information.

        Args:
            res (str): The resource key to retrieve (e.g., "resources_available.ncpus").

        Returns:
            int | None: The integer value of the resource, or `None` if unavailable or invalid.
        """
        if not (val := self._info.get(res)):
            return None
        try:
            return int(val)
        except Exception as e:
            logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
            return None

    def _get_free_int_resource(self, res: str) -> int | None:
        """
        Compute the number of free units for an integer-valued resource.

        Calculates the difference between the total available (`resources_available.<res>`)
        and the assigned (`resources_assigned.<res>`) quantities. If the computed
        difference is negative, returns 0. If the information is not available, returns None.

        Args:
            res (str): The base resource name (e.g., "ncpus", "ngpus").

        Returns:
            int | None: The number of unallocated (free) resource units, or None if unavailable.
        """
        full = self._get_int_resource(f"resources_available.{res}")
        # compare with None explicitly: a reported total of 0 is valid information
        # and must yield 0 free units, not "unavailable"
        if full is None:
            return None

        # if the `resources_assigned` property is missing, we assume it means that there are no resources assigned
        assigned = self._get_int_resource(f"resources_assigned.{res}") or 0

        # never report a negative number of free units
        return max(full - assigned, 0)

    def _get_size_resource(self, res: str) -> Size | None:
        """
        Retrieve a Size resource from the node information.

        Args:
            res (str): The resource key to retrieve (e.g., "resources_available.mem").

        Returns:
            Size | None: The parsed Size, or `None` if unavailable or invalid.
        """
        if not (val := self._info.get(res)):
            return None

        try:
            return Size.from_string(val)
        except Exception as e:
            logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
            return None

    def _get_free_size_resource(self, res: str) -> Size | None:
        """
        Compute the amount of free space for a Size resource.

        Calculates the difference between total available (`resources_available.<res>`)
        and assigned (`resources_assigned.<res>`) values. If the subtraction raises
        a `ValueError`, returns a zero-size object. If the information is not available,
        returns None.

        Args:
            res (str): The base resource name (e.g., "mem", "scratch_local").

        Returns:
            Size | None: The available (free) size for the resource, or `None` if unavailable.
        """
        # NOTE(review): this is a truthiness check; how a zero Size behaves here
        # depends on Size.__bool__, which is not visible from this class - confirm
        if not (full := self._get_size_resource(f"resources_available.{res}")):
            return None

        # if the `resources_assigned` property is missing, we assume it means that there are no resources assigned
        assigned = self._get_size_resource(f"resources_assigned.{res}") or Size(0, "kb")

        try:
            return full - assigned
        except ValueError as e:
            logger.debug(f"Negative free size of resource '{res}': {e}.")
            return Size(0, "kb")
Implementation of BatchNodeInterface for PBS. Stores metadata for a single PBS node.
def update(self) -> None:
    """
    Refresh the stored node information by querying PBS.

    Runs `pbsnodes -v <name>` (with `-s <server>` appended when a server is set)
    through bash and replaces `_info` with the parsed output.

    Raises:
        QQError: If the command exits with a non-zero return code.
    """
    # the server option is only added when a server was specified
    server_option = f" -s {self._server}" if self._server else ""
    query = f"pbsnodes -v {self._name}{server_option}"

    completed = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        check=False,
        text=True,
        errors="replace",
    )

    if completed.returncode != 0:
        raise QQError(f"Node '{self._name}' does not exist.")

    self._info = parse_pbs_dump_to_dictionary(completed.stdout)
Refresh the stored node information from the batch system.
Raises:
- QQError: If the node cannot be queried or its info updated.
Retrieve the name of the node.
Returns:
str: The name identifying the node in the batch system.
126 def get_n_cpus(self) -> int | None: 127 return self._get_int_resource("resources_available.ncpus")
Retrieve the total number of CPU cores available on the node.
Returns:
int | None: Total CPU core count or
`None` if not available.
Retrieve the number of currently available (unallocated) CPU cores.
Returns:
int | None: Number of free CPU cores or
`None` if not available.
132 def get_n_gpus(self) -> int | None: 133 return self._get_int_resource("resources_available.ngpus")
Retrieve the total number of GPUs available on the node.
Returns:
int | None: Total GPU count or
`None` if not available.
Retrieve the number of currently available (unallocated) GPUs.
Returns:
int | None: Number of free GPUs or
`None` if not available.
138 def get_cpu_memory(self) -> Size | None: 139 return self._get_size_resource("resources_available.mem")
Retrieve the total CPU memory capacity of the node.
Returns:
Size | None: Total CPU memory available on the node or
`None` if not available.
Retrieve the currently available CPU memory.
Returns:
Size | None: Free (unused) CPU memory or
`None` if not available.
144 def get_gpu_memory(self) -> Size | None: 145 return self._get_size_resource("resources_available.gpu_mem")
Retrieve the total GPU memory capacity of the node.
Returns:
Size | None: Total GPU memory available or
`None` if not available.
147 def get_free_gpu_memory(self) -> Size | None: 148 return self._get_free_size_resource("gpu_mem")
Retrieve the currently available GPU memory.
Returns:
Size | None: Free (unused) GPU memory or
`None` if not available.
150 def get_local_scratch(self) -> Size | None: 151 return self._get_size_resource("resources_available.scratch_local")
Retrieve the total local scratch storage capacity of the node.
Returns:
Size | None: Total size of local scratch space or
`None` if not available.
153 def get_free_local_scratch(self) -> Size | None: 154 return self._get_free_size_resource("scratch_local")
Retrieve the available local scratch storage space.
Returns:
Size | None: Free local scratch space or
`None` if not available.
156 def get_ssd_scratch(self) -> Size | None: 157 return self._get_size_resource("resources_available.scratch_ssd")
Retrieve the total SSD-based scratch storage capacity.
Returns:
Size | None: Total SSD scratch capacity or
`None` if not available.
159 def get_free_ssd_scratch(self) -> Size | None: 160 return self._get_free_size_resource("scratch_ssd")
Retrieve the currently available SSD-based scratch storage space.
Returns:
Size | None: Free SSD scratch space or
`None` if not available.
def get_properties(self) -> list[str]:
    """
    Collect the properties assigned to the node.

    An entry of the node information counts as a property when its key
    contains `resources_available` and its value is the string "True".
    The reported name is the part of the key after the first dot.

    Returns:
        list[str]: Property names, in the order they appear in the node information.
    """
    properties: list[str] = []
    for key, value in self._info.items():
        if "resources_available" not in key:
            continue
        if value == "True":
            # drop everything up to (and including) the first dot
            properties.append(key.split(".", 1)[-1])

    return properties
Get the list of properties or labels assigned to the node.
Returns:
list[str]: List of node property strings.
def is_available_to_user(self, user: str) -> bool:
    """
    Decide whether the specified user can use this node.

    Args:
        user (str): The username to check access for.

    Returns:
        bool: `False` if the node state is missing or contains one of the
        disabling states. If the node has a `queue` entry, the result of
        `QueuesAvailability.get_or_init` for that queue. `True` otherwise.
    """
    state = self._info.get("state")
    if not state:
        logger.debug(f"Could not get state information for node '{self._name}'.")
        return False

    # matched by substring, so a combined state string is caught as well
    for disabled_state in ("down", "unknown", "unresolvable", "resv-exclusive"):
        if disabled_state in state:
            return False

    queue = self._info.get("queue")
    if not queue:
        return True

    # the node belongs to a queue: availability is decided by the queue
    return QueuesAvailability.get_or_init(queue, user, self._server)
Check if the node is available to the specified user.
Arguments:
- user (str): The username to check access for.
Returns:
bool: True if the node is up and schedulable, False otherwise.
191 @classmethod 192 def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self: 193 """ 194 Construct a new instance of PBSNode from node name and a dictionary of node information. 195 196 Args: 197 name (str): The unique name of the node. 198 server (str | None): Server on which the node is located. If `None`, assumes the current server. 199 info (dict[str, str]): A dictionary containing PBS node metadata as key-value pairs. 200 201 Returns: 202 Self: A new instance of PBSNode. 203 204 Note: 205 This method does not perform any validation or processing of the provided dictionary. 206 """ 207 node = cls.__new__(cls) 208 node._name = name 209 node._server = server 210 node._info = info 211 212 return node
Construct a new instance of PBSNode from node name and a dictionary of node information.
Arguments:
- name (str): The unique name of the node.
- server (str | None): Server on which the node is located. If `None`, assumes the current server.
- info (dict[str, str]): A dictionary containing PBS node metadata as key-value pairs.
Returns:
Self: A new instance of PBSNode.
Note:
This method does not perform any validation or processing of the provided dictionary.
def to_yaml(self) -> str:
    """
    Serialize all stored information about the node to YAML.

    Returns:
        str: YAML-formatted node metadata, with the `Node` entry placed first
        and the remaining entries kept in their stored order.
    """
    # the node name has to come first, followed by the stored entries
    payload = {"Node": self._name, **self._info}

    return yaml.dump(
        payload,
        Dumper=Dumper,
        sort_keys=False,
        default_flow_style=False,
    )
Return all information about the node in YAML format.
Returns:
str: YAML-formatted string of node metadata.
30class PBS(BatchInterface[PBSJob, PBSQueue, PBSNode]): 31 """ 32 Implementation of BatchInterface for PBS Pro batch system. 33 """ 34 35 # all standard scratch directory (excl. in RAM scratch) types supported by PBS 36 SUPPORTED_SCRATCHES = ["scratch_local", "scratch_ssd", "scratch_shared"] 37 38 @classmethod 39 def env_name(cls) -> str: 40 return "PBS" 41 42 @classmethod 43 def is_available(cls) -> bool: 44 return shutil.which("qsub") is not None 45 46 @classmethod 47 def get_job_id(cls) -> str | None: 48 return os.environ.get("PBS_JOBID") 49 50 @classmethod 51 def create_work_dir_on_scratch(cls, job_id: str) -> Path: 52 scratch_dir = cls._get_scratch_dir(job_id) 53 54 # create working directory inside the scratch directory allocated by the batch system 55 # we create this directory because other processes may write files 56 # into the allocated scratch directory and we do not want these files 57 # to affect the job execution or be copied back to input_dir 58 # this also simplifies deletion of the working directory 59 # (the allocated scratch dir cannot be deleted) 60 work_dir = logical_resolve(scratch_dir / CFG.pbs_options.scratch_dir_inner) 61 62 logger.debug(f"Creating working directory '{str(work_dir)}'.") 63 work_dir.mkdir(exist_ok=True) 64 65 return work_dir 66 67 @classmethod 68 def job_submit( 69 cls, 70 res: Resources, 71 queue: str, 72 script: Path, 73 job_name: str, 74 depend: list[Depend], 75 env_vars: dict[str, str], 76 account: str | None = None, 77 server: str | None = None, 78 remote_host: str | None = None, 79 ) -> str: 80 # account unused 81 _ = account 82 83 input_dir = script.parent 84 logger.debug(f"Job submission: input directory is '{str(input_dir)}'.") 85 86 cls._shared_guard(input_dir, res, env_vars, server, remote_host) 87 88 # set env vars required for Infinity modules 89 # this can be removed once Infinity stops being supported 90 env_vars.update(cls._collect_ams_env_vars()) 91 92 # if we are submitting to a different server, we need 
to change the AMS site 93 # this can be removed once Infinity stops being supported 94 if server: 95 cls._modify_ams_env_vars(env_vars, server) 96 97 # get the submission command 98 command = cls._translate_submit( 99 res, 100 queue, 101 server, 102 script.parent, 103 str(script), 104 job_name, 105 depend, 106 env_vars, 107 ) 108 logger.debug(command) 109 110 if not remote_host: 111 # submit the script from the current host 112 result = subprocess.run( 113 ["bash"], 114 input=command, 115 text=True, 116 check=False, 117 capture_output=True, 118 errors="replace", 119 ) 120 else: 121 # submit the script from the remote host 122 logger.debug( 123 f"Navigating to '{remote_host}' to execute the submission command '{command}'." 124 ) 125 result = subprocess.run( 126 [ 127 "ssh", 128 "-o PasswordAuthentication=no", 129 "-o GSSAPIAuthentication=yes", 130 "-o StrictHostKeyChecking=no", # allow unknown hosts 131 f"-o ConnectTimeout={CFG.timeouts.ssh}", 132 "-q", # suppress some SSH messages 133 remote_host, 134 command, 135 ], 136 capture_output=True, 137 text=True, 138 ) 139 140 if result.returncode != 0: 141 raise QQError( 142 f"Failed to submit script '{str(script)}': {result.stderr.strip()}." 
143 ) 144 145 return result.stdout.strip() 146 147 @classmethod 148 def job_kill(cls, job_id: str) -> None: 149 command = cls._translate_kill(job_id) 150 logger.debug(command) 151 152 # run the kill command 153 result = subprocess.run( 154 ["bash"], 155 input=command, 156 text=True, 157 check=False, 158 capture_output=True, 159 errors="replace", 160 ) 161 162 if result.returncode != 0: 163 raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.") 164 165 @classmethod 166 def job_kill_force(cls, job_id: str) -> None: 167 command = cls._translate_kill_force(job_id) 168 logger.debug(command) 169 170 # run the kill command 171 result = subprocess.run( 172 ["bash"], 173 input=command, 174 text=True, 175 check=False, 176 capture_output=True, 177 errors="replace", 178 ) 179 180 if result.returncode != 0: 181 raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.") 182 183 @classmethod 184 def get_batch_job(cls, job_id: str) -> PBSJob: 185 return PBSJob(job_id) 186 187 @classmethod 188 def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[PBSJob]: 189 if not job_ids: 190 return [] 191 192 command = f"qstat -fxw {' '.join(job_ids)}" 193 logger.debug(command) 194 return cls._get_batch_jobs_using_command( 195 command, 196 include_completed=True, 197 include_top_level_array=True, 198 ignore_exit_code=True, 199 ) 200 201 @classmethod 202 def get_unfinished_batch_jobs( 203 cls, user: str, server: str | None = None 204 ) -> list[PBSJob]: 205 command = f"qstat -fwtu {user}" 206 if server: 207 command += f" @{server}" 208 logger.debug(command) 209 return cls._get_batch_jobs_using_command( 210 command, 211 include_completed=False, 212 include_top_level_array=True, 213 ignore_exit_code=False, 214 ) 215 216 @classmethod 217 def get_batch_jobs(cls, user: str, server: str | None = None) -> list[PBSJob]: 218 command = f"qstat -fwxtu {user}" 219 if server: 220 command += f" @{server}" 221 logger.debug(command) 222 return 
cls._get_batch_jobs_using_command( 223 command, 224 include_completed=True, 225 include_top_level_array=True, 226 ignore_exit_code=False, 227 ) 228 229 @classmethod 230 def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[PBSJob]: 231 command = "qstat -fwt" 232 if server: 233 command += f" @{server}" 234 logger.debug(command) 235 return cls._get_batch_jobs_using_command( 236 command, 237 include_completed=False, 238 include_top_level_array=True, 239 ignore_exit_code=False, 240 ) 241 242 @classmethod 243 def get_all_batch_jobs(cls, server: str | None = None) -> list[PBSJob]: 244 command = "qstat -fxwt" 245 if server: 246 command += f" @{server}" 247 logger.debug(command) 248 return cls._get_batch_jobs_using_command( 249 command, 250 include_completed=True, 251 include_top_level_array=True, 252 ignore_exit_code=False, 253 ) 254 255 @classmethod 256 def get_queues(cls, server: str | None = None) -> list[PBSQueue]: 257 command = "qstat -Qfw" 258 if server: 259 command += f" @{server}" 260 logger.debug(command) 261 262 result = subprocess.run( 263 ["bash"], 264 input=command, 265 text=True, 266 check=False, 267 capture_output=True, 268 errors="replace", 269 ) 270 271 if result.returncode != 0: 272 raise QQError( 273 f"Could not retrieve information about queues: {result.stderr.strip()}." 
274 ) 275 276 queues = [] 277 for data, name in parse_multi_pbs_dump_to_dictionaries( 278 result.stdout.strip(), "Queue" 279 ): 280 queues.append(PBSQueue.from_dict(name, server, data)) 281 282 return queues 283 284 @classmethod 285 def get_nodes(cls, server: str | None = None) -> list[PBSNode]: 286 command = "pbsnodes -a" 287 if server: 288 command += f" -s {server}" 289 logger.debug(command) 290 291 result = subprocess.run( 292 ["bash"], 293 input=command, 294 text=True, 295 check=False, 296 capture_output=True, 297 errors="replace", 298 ) 299 300 if result.returncode != 0: 301 raise QQError( 302 f"Could not retrieve information about nodes: {result.stderr.strip()}." 303 ) 304 305 queues = [] 306 for data, name in parse_multi_pbs_dump_to_dictionaries( 307 result.stdout.strip(), None 308 ): 309 queues.append(PBSNode.from_dict(name, server, data)) 310 311 return queues 312 313 @classmethod 314 def get_supported_work_dir_types(cls) -> list[str]: 315 return cls.SUPPORTED_SCRATCHES + [ 316 "scratch_shm", 317 "input_dir", 318 "job_dir", # same as input_dir 319 ] 320 321 @classmethod 322 def read_remote_file(cls, host: str, file: Path) -> str: 323 if os.environ.get(CFG.env_vars.shared_submit): 324 # file is on shared storage, we can read it directly 325 # this assumes that this method is only used to read files in input_dir 326 logger.debug(f"Reading a file '{file}' from shared storage.") 327 try: 328 return file.read_text() 329 except Exception as e: 330 raise QQError(f"Could not read file '{file}': {e}.") from e 331 else: 332 # otherwise, we fall back to the default implementation 333 logger.debug(f"Reading a remote file '{file}' on '{host}'.") 334 return super().read_remote_file(host, file) 335 336 @classmethod 337 def write_remote_file(cls, host: str, file: Path, content: str) -> None: 338 if os.environ.get(CFG.env_vars.shared_submit): 339 # file should be written to shared storage 340 # this assumes that the method is only used to write files into input_dir 341 
logger.debug(f"Writing a file '{file}' to shared storage.") 342 try: 343 file.write_text(content) 344 except Exception as e: 345 raise QQError(f"Could not write file '{file}': {e}.") from e 346 else: 347 # otherwise, we fall back to the default implementation 348 logger.debug(f"Writing a remote file '{file}' on '{host}'.") 349 super().write_remote_file(host, file, content) 350 351 @classmethod 352 def make_remote_dir(cls, host: str, directory: Path) -> None: 353 if os.environ.get(CFG.env_vars.shared_submit): 354 # assuming the directory is created in input_dir 355 logger.debug(f"Creating a directory '{directory}' on shared storage.") 356 try: 357 directory.mkdir(exist_ok=True) 358 except Exception as e: 359 raise QQError( 360 f"Could not create a directory '{directory}': {e}." 361 ) from e 362 else: 363 # otherwise we fall back to the default implementation 364 logger.debug(f"Creating a directory '{directory}' on '{host}'.") 365 super().make_remote_dir(host, directory) 366 367 @classmethod 368 def list_remote_dir(cls, host: str, directory: Path) -> list[Path]: 369 if os.environ.get(CFG.env_vars.shared_submit): 370 # assuming we are listing input_dir or another directory on shared storage 371 logger.debug(f"Listing a directory '{directory}' on shared storage.") 372 try: 373 return list(directory.iterdir()) 374 except Exception as e: 375 raise QQError(f"Could not list a directory '{directory}': {e}.") from e 376 else: 377 # otherwise we fall back to the default implementation 378 logger.debug(f"Listing a directory '{directory}' on '{host}'.") 379 return super().list_remote_dir(host, directory) 380 381 @classmethod 382 def delete_remote_dir(cls, host: str, directory: Path) -> None: 383 if host == socket.getfqdn(): 384 # directory is available on the current host 385 logger.debug(f"Deleting a directory '{directory}' on local host.") 386 try: 387 shutil.rmtree(directory) 388 except Exception as e: 389 raise QQError(f"Could not delete directory '{directory}': {e}.") from 
e 390 else: 391 # otherwise we fall back to the default implementation 392 logger.debug(f"Deleting a directory '{directory}' on '{host}'.") 393 return super().delete_remote_dir(host, directory) 394 395 @classmethod 396 def move_remote_files( 397 cls, host: str, files: list[Path], moved_files: list[Path] 398 ) -> None: 399 if len(files) != len(moved_files): 400 raise QQError( 401 "The provided 'files' and 'moved_files' must have the same length." 402 ) 403 404 if os.environ.get(CFG.env_vars.shared_submit): 405 # assuming we are moving files inside input_dir or another directory on shared storage 406 logger.debug( 407 f"Moving files '{files}' -> '{moved_files}' on a shared storage." 408 ) 409 for src, dst in zip(files, moved_files): 410 shutil.move(str(src), str(dst)) 411 else: 412 # otherwise we fall back to the default implementation 413 logger.debug(f"Moving files '{files}' -> '{moved_files}' on '{host}'.") 414 super().move_remote_files(host, files, moved_files) 415 416 @classmethod 417 def sync_with_exclusions( 418 cls, 419 src_dir: Path, 420 dest_dir: Path, 421 src_host: str | None, 422 dest_host: str | None, 423 exclude_files: list[Path] | None = None, 424 ) -> None: 425 cls._sync_directories( 426 src_dir, 427 dest_dir, 428 src_host, 429 dest_host, 430 exclude_files, 431 super().sync_with_exclusions, 432 ) 433 434 @classmethod 435 def sync_selected( 436 cls, 437 src_dir: Path, 438 dest_dir: Path, 439 src_host: str | None, 440 dest_host: str | None, 441 include_files: list[Path] | None = None, 442 ) -> None: 443 cls._sync_directories( 444 src_dir, 445 dest_dir, 446 src_host, 447 dest_host, 448 include_files, 449 super().sync_selected, 450 ) 451 452 @classmethod 453 def transform_resources( 454 cls, queue: str, server: str | None, provided_resources: Resources 455 ) -> Resources: 456 # default resources of the queue 457 default_queue_resources = PBSQueue(queue, server).get_default_resources() 458 # default hard-coded resources 459 default_batch_resources = 
cls._get_default_server_resources() 460 461 # fill in default parameters 462 resources = Resources.merge_resources( 463 provided_resources, default_queue_resources, default_batch_resources 464 ) 465 if not resources.work_dir: 466 raise QQError( 467 "Work-dir is not set after filling in default attributes. This is a bug." 468 ) 469 470 # sanity check input_dir 471 if equals_normalized(resources.work_dir, "job_dir") or equals_normalized( 472 resources.work_dir, "input_dir" 473 ): 474 # work-size should not be used with job_dir 475 if provided_resources.work_size: 476 logger.warning( 477 "Setting work-size is not supported for work-dir='job_dir' or 'input_dir'.\n" 478 "Job will run in the submission directory with unlimited capacity.\n" 479 "The work-size attribute will be ignored." 480 ) 481 482 resources.work_dir = "input_dir" 483 resources.work_size = None 484 resources.work_size_per_cpu = None 485 return resources 486 487 # scratch in RAM (https://docs.metacentrum.cz/en/docs/computing/infrastructure/scratch-storages#scratch-in-ram) 488 if equals_normalized(resources.work_dir, "scratch_shm"): 489 # work-size should not be used with scratch_shm 490 if provided_resources.work_size: 491 logger.warning( 492 "Setting work-size is not supported for work-dir='scratch_shm'.\n" 493 "Size of the in-RAM scratch is specified using the --mem property.\n" 494 "The work-size attribute will be ignored." 495 ) 496 497 resources.work_dir = "scratch_shm" 498 resources.work_size = None 499 resources.work_size_per_cpu = None 500 return resources 501 502 # if work-dir matches any of the "standard" scratches supported by PBS 503 if match := next( 504 ( 505 x 506 for x in cls.SUPPORTED_SCRATCHES 507 if equals_normalized(x, resources.work_dir) 508 ), 509 None, 510 ): 511 resources.work_dir = match 512 return resources 513 514 # unknown work-dir type 515 raise QQError( 516 f"Unknown working directory type specified: work-dir='{resources.work_dir}'. 
Supported types for {cls.env_name()} are: '{' '.join(cls.get_supported_work_dir_types())}'." 517 ) 518 519 @classmethod 520 def sort_jobs(cls, jobs: list[PBSJob]) -> None: 521 # jobs with invalid ID get assigned an ID of 0 for sorting => they are sorted to the start 522 # and therefore are displayed at the top in the qq jobs / qq stat output 523 jobs.sort(key=lambda job: job.get_id_int() or 0) 524 525 @classmethod 526 def _get_scratch_dir(cls, job_id: str) -> Path: 527 """ 528 Get the path to the scratch directory allocated by PBS. 529 """ 530 scratch_dir = os.environ.get(CFG.env_vars.pbs_scratch_dir) 531 if not scratch_dir: 532 raise QQError(f"Scratch directory for job '{job_id}' is undefined") 533 534 return Path(scratch_dir) 535 536 @classmethod 537 def _shared_guard( 538 cls, 539 input_dir: Path, 540 res: Resources, 541 env_vars: dict[str, str], 542 server: str | None, 543 remote_host: str | None, 544 ) -> None: 545 """ 546 Ensure correct handling of shared vs. local submission directories. 547 548 If the job's input directory is on shared storage, adds the 549 environment variable `SHARED_SUBMIT` to the list of env vars to propagate to the job. 550 This environment variable is later used e.g. to select the appropriate data copying method. 551 552 If the job is configured to use the submission directory as a working directory 553 (`work-dir=input_dir` or 'job_dir') but that directory is not shared, a `QQError` is raised. 554 555 If the job is to be submitted to a potentially non-local server, 556 but the input directory is not shared, a `QQError` is raised. 557 558 If the job is to be submitted on a remote host, 559 but the input directory is not shared, a `QQError` is raised. 560 561 Args: 562 input_dir (Path): Path to the input directory of the job. 563 res (Resources): The job's resource configuration. 564 env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job. 
565 server (str | None): The target PBS server, or None if submitting to the default server. 566 remote_host (str | None): Host from which the submission is performed, or None if sumitting from the current machine. 567 568 Raises: 569 QQError: If the job is set to run directly in the submission 570 directory while submission is from a non-shared filesystem. 571 QQError: If the job is set to run on a non-default server while 572 submission is from a non-shared filesystem. 573 """ 574 if cls.is_shared(input_dir): 575 env_vars[CFG.env_vars.shared_submit] = "true" 576 elif not res.uses_scratch(): 577 # if job directory is used as working directory, it must always be shared 578 raise QQError( 579 "Job was requested to run directly in the submission directory (work-dir='job_dir' or 'input_dir'), but submission is done from a local filesystem." 580 ) 581 elif server is not None: 582 # if we are submitting to a different server 583 raise QQError( 584 f"Job was requested to be submitted to server '{server}' which is potentially non-local, but the submission is done from a local filesystem." 585 ) 586 elif ( 587 remote_host is not None and socket.getfqdn(remote_host) != socket.getfqdn() 588 ): 589 # if we are submitting from a different host than the current one 590 raise QQError( 591 f"Job was requested to be submitted from host '{remote_host}', but the submission is done from a local filesystem." 592 ) 593 594 @classmethod 595 def _translate_submit( 596 cls, 597 res: Resources, 598 queue: str, 599 server: str | None, 600 input_dir: Path, 601 script: str, 602 job_name: str, 603 depend: list[Depend], 604 env_vars: dict[str, str], 605 ) -> str: 606 """ 607 Generate the PBS submission command for a job. 608 609 Args: 610 res (Resources): The resources requested for the job. 611 queue (str): The queue name to submit to. 612 server (str): Optional name of the server to submit the job to. 613 input_dir (Path): The directory from which the job is being submitted. 
614 script (str): Path to the job script. 615 job_name (str): Name of the job. 616 depend (list[Depend]): List of dependencies of the job. 617 env_vars (dict[str, str]): Dictionary of environment variables to set. 618 619 Returns: 620 str: The fully constructed qsub command string. 621 """ 622 command = f"qsub -N {job_name} {cls._translate_queue_server(queue, server)} {cls._translate_output_server(input_dir, job_name, server)} " 623 624 # translate environment variables 625 if env_vars: 626 command += f"-v {cls._translate_env_vars(env_vars)} " 627 628 # handle per-chunk resources, incl. workdir 629 translated = cls._translate_per_chunk_resources(res) 630 631 # handle properties 632 if res.props: 633 translated.extend([f"{k}={v}" for k, v in res.props.items()]) 634 635 if len(translated) > 0 and res.nnodes and res.nnodes > 1: 636 # we only use the select syntax when multiple nodes are requested 637 command += f"-l select={res.nnodes}:" 638 join_char = ":" 639 else: 640 command += "-l " 641 join_char = "," 642 643 command += join_char.join(translated) + " " 644 645 # handle walltime 646 if res.walltime: 647 command += f"-l walltime={res.walltime} " 648 649 if res.nnodes and res.nnodes > 1: 650 # 'place=scatter' causes each chunk to be placed on a different node 651 command += "-l place=vscatter " 652 653 # handle dependencies 654 if converted_depend := cls._translate_dependencies(depend): 655 command += f"-W depend={converted_depend} " 656 657 # add script 658 command += str(input_dir / script) 659 660 return command 661 662 @classmethod 663 def _translate_queue_server(cls, queue: str, server: str | None) -> str: 664 """ 665 Build the PBS queue destination argument for qsub. 666 667 Constructs the `-q` string from the queue name and optional server. 668 If a server is specified, the destination is formatted as `-q queue@server`, 669 otherwise only the queue name is used. 670 671 Args: 672 queue (str): The name of the target queue. 
673 server (str | None): The target PBS server, or None if submitting to the default server. 674 675 Returns: 676 str: The PBS queue destination argument, e.g. `-q queue@server` or `-q queue`. 677 """ 678 if server: 679 return f"-q {queue}@{server}" 680 681 return f"-q {queue}" 682 683 @classmethod 684 def _translate_output_server( 685 cls, input_dir: Path, job_name: str, server: str | None 686 ) -> str: 687 """ 688 Build the PBS output redirection arguments for qsub. 689 690 Constructs the `-j eo -e` string pointing to the job's `.qqout` file. 691 If a server is specified and has a configured output host, that host is 692 prepended to the path as `host:path`. Otherwise, PBS will default to 693 delivering the output file to the submitting host. 694 695 Args: 696 input_dir (Path): Directory in which the output file will be placed. 697 job_name (str): Name of the job, used to construct the output filename. 698 server (str | None): The target PBS server, or None if submitting to the default server. 699 700 Returns: 701 str: The PBS output redirection arguments. 702 """ 703 qq_output = str((input_dir / job_name).with_suffix(CFG.suffixes.qq_out)) 704 if server: 705 if output_host := CFG.batch_servers_options.known_output_hosts.get(server): 706 logger.debug( 707 f"Using output host '{output_host}' for server '{server}'." 708 ) 709 return f"-j eo -e {output_host}:{qq_output}" 710 711 logger.warning( 712 f"No output host configured for server '{server}'. " 713 "PBS will deliver qqout file to the submitting host (this desktop), " 714 "which may fail if it is not accessible from the working node." 715 ) 716 717 return f"-j eo -e {qq_output}" 718 719 @classmethod 720 def _translate_env_vars(cls, env_vars: dict[str, str]) -> str: 721 """ 722 Convert a dictionary of environment variables into a formatted string. 723 724 Args: 725 env_vars (dict[str, str]): A mapping of environment variable names 726 to their corresponding values. 
727 728 Returns: 729 str: A comma-separated string of environment variable assignments, 730 suitable for inclusion in the qsub command. 731 """ 732 converted = [] 733 for key, value in env_vars.items(): 734 converted.append(f"\"{key}='{value}'\"") 735 736 return ",".join(converted) 737 738 @classmethod 739 def _translate_per_chunk_resources(cls, res: Resources) -> list[str]: 740 """ 741 Convert a Resources object into a list of per-node resource specifications. 742 743 Each resource that can be divided by the number of nodes (nnodes) is split 744 accordingly. 745 746 Args: 747 res (Resources): The resource specification for the job. 748 749 Returns: 750 list[str]: A list of per-node resource strings suitable for inclusion 751 in a PBS submission command. 752 753 Raises: 754 QQError: If sanity checks fail or required memory attributes are missing. 755 """ 756 757 trans_res = [] 758 759 # sanity checking per-chunk resources 760 if not res.nnodes: 761 raise QQError( 762 "Attribute 'nnodes' should not be undefined. This is a bug, please report it." 763 ) 764 if res.nnodes == 0: 765 raise QQError("Attribute 'nnodes' cannot be 0.") 766 767 if res.ncpus and res.ncpus != 0 and res.ncpus % res.nnodes != 0: 768 raise QQError( 769 f"Attribute 'ncpus' ({res.ncpus}) must be divisible by 'nnodes' ({res.nnodes})." 770 ) 771 if res.ngpus and res.ngpus != 0 and res.ngpus % res.nnodes != 0: 772 raise QQError( 773 f"Attribute 'ngpus' ({res.ngpus}) must be divisible by 'nnodes' ({res.nnodes})." 
774 ) 775 776 # translate per-chunk resources 777 if res.ncpus: 778 trans_res.append(f"ncpus={res.ncpus // res.nnodes}") 779 # we need to specify the number of MPI processes so that mpirun uses the correct 780 # number of sockets; this does not mean that the run script has to use one MPI 781 # process per CPU core, this value can be overriden 782 trans_res.append(f"mpiprocs={res.ncpus // res.nnodes}") 783 elif res.ncpus_per_node: 784 trans_res.append(f"ncpus={res.ncpus_per_node}") 785 trans_res.append(f"mpiprocs={res.ncpus_per_node}") 786 787 if res.mem: 788 trans_res.append(f"mem={(res.mem // res.nnodes).to_str_exact()}") 789 elif res.mem_per_node: 790 trans_res.append(f"mem={res.mem_per_node.to_str_exact()}") 791 elif res.mem_per_cpu: 792 if res.ncpus: 793 trans_res.append( 794 f"mem={(res.mem_per_cpu * res.ncpus // res.nnodes).to_str_exact()}" 795 ) 796 elif res.ncpus_per_node: 797 trans_res.append( 798 f"mem={(res.mem_per_cpu * res.ncpus_per_node).to_str_exact()}" 799 ) 800 else: 801 raise QQError( 802 "Attribute 'mem-per-cpu' requires attributes 'ncpus' or 'ncpus-per-node' to be defined." 803 ) 804 else: 805 # memory not set in any way 806 raise QQError( 807 "None of the attributes 'mem', 'mem-per-node', or 'mem-per-cpu' is defined." 808 ) 809 810 if res.ngpus: 811 trans_res.append(f"ngpus={res.ngpus // res.nnodes}") 812 elif res.ngpus_per_node: 813 trans_res.append(f"ngpus={res.ngpus_per_node}") 814 815 # translate work-dir 816 if workdir := cls._translate_work_dir(res): 817 trans_res.append(workdir) 818 819 return trans_res 820 821 @classmethod 822 def _translate_work_dir(cls, res: Resources) -> str | None: 823 """ 824 Translate the working directory and its requested size into a PBS resource string. 825 826 Args: 827 res (Resources): The resources requested for the job. 828 829 Returns: 830 str | None: Resource string specifying the working directory, or None if input_dir is used. 
831 """ 832 assert res.nnodes is not None 833 834 if res.work_dir == "job_dir" or res.work_dir == "input_dir": 835 return None 836 837 if res.work_dir == "scratch_shm": 838 return f"{res.work_dir}=true" 839 840 if res.work_size: 841 return f"{res.work_dir}={(res.work_size // res.nnodes).to_str_exact()}" 842 if res.work_size_per_node: 843 return f"{res.work_dir}={res.work_size_per_node.to_str_exact()}" 844 if res.work_size_per_cpu: 845 if res.ncpus: 846 return f"{res.work_dir}={(res.work_size_per_cpu * res.ncpus // res.nnodes).to_str_exact()}" 847 if res.ncpus_per_node: 848 return f"{res.work_dir}={(res.work_size_per_cpu * res.ncpus_per_node).to_str_exact()}" 849 850 raise QQError( 851 "Attribute 'work-size-per-cpu' requires attributes 'ncpus' or 'ncpus-per-node' to be defined." 852 ) 853 854 raise QQError( 855 "None of the attributes 'work-size', 'work-size-per-node', or 'work-size-per-cpu' is defined." 856 ) 857 858 @classmethod 859 def _translate_dependencies(cls, depend: list[Depend]) -> str | None: 860 """ 861 Convert a list of `Depend` objects into a PBS-compatible dependency string. 862 863 Args: 864 depend (list[Depend]): List of dependency objects to translate. 865 866 Returns: 867 str | None: PBS-style dependency string (e.g., "after:12345,afterok:1:2:3"), 868 or None if the input list is empty. 869 """ 870 if not depend: 871 return None 872 873 return ",".join(Depend.to_str(x).replace("=", ":") for x in depend) 874 875 @classmethod 876 def _collect_ams_env_vars(cls) -> dict[str, str]: 877 """ 878 Collect environment variables for Infinity AMS. 879 This allows importing Infinity AMS modules in qq jobs. 880 881 Returns: 882 dict[str, str]: Dictionary of AMS environment variables and their values. 
883 """ 884 ams_vars = { 885 key: value 886 for key, value in os.environ.items() 887 if key 888 in { 889 "AMS_ACTIVE_MODULES", 890 "AMS_SITE", 891 "AMS_SITE_SUPPORT", 892 "AMS_EXIT_CODE", 893 "AMS_USER_CONFIG_DIR", 894 "AMS_GROUPNS", 895 "AMS_BUNDLE_NAME", 896 "AMS_HOST_GROUP", 897 "AMS_ROOT", 898 "AMS_ROOT_V9", 899 "AMS_BUNDLE_PATH", 900 } 901 } 902 logger.debug(f"AMS vars: {ams_vars}") 903 904 return ams_vars 905 906 @classmethod 907 def _modify_ams_env_vars(cls, env_vars: dict[str, str], server: str) -> None: 908 """ 909 Modify environment variables for Infinity AMS if the job is submitted to a different server. 910 """ 911 # bleh, seriously can't wait to get rid of having to support AMS... 912 # this is so needlessly complicated 913 914 ams_site_converter = { 915 "robox-pro.ceitec.muni.cz": "robox", 916 "sokar-pbs.ncbr.muni.cz": "sokar", 917 "pbs-m1.metacentrum.cz": "metavo24", 918 } 919 920 if server not in ams_site_converter: 921 logger.warning( 922 f"Server '{server}' is not supported by the qq-AMS translation layer. The job will not have access to AMS modules. Please report this issue." 923 ) 924 925 if "AMS_SITE" in env_vars: 926 env_vars["AMS_SITE"] = ams_site_converter[server] 927 928 ams_site_support_converter = { 929 "robox-pro.ceitec.muni.cz": "linuxsupport@ics.muni.cz", 930 "sokar-pbs.ncbr.muni.cz": "support@lcc.ncbr.muni.cz", 931 "pbs-m1.metacentrum.cz": "support@lcc.ncbr.muni.cz", 932 } 933 934 if "AMS_SITE_SUPPORT" in env_vars: 935 env_vars["AMS_SITE_SUPPORT"] = ams_site_support_converter[server] 936 937 ams_groupns_converter = { 938 "robox-pro.ceitec.muni.cz": "uvt", 939 "sokar-pbs.ncbr.muni.cz": "ncbr", 940 "pbs-m1.metacentrum.cz": "ics", 941 } 942 943 if "AMS_GROUPNS" in env_vars: 944 env_vars["AMS_GROUPNS"] = ams_groupns_converter[server] 945 946 @classmethod 947 def _get_default_server_resources(cls) -> Resources: 948 """ 949 Return a Resources object representing the default resources for a batch job. 
950 951 Returns: 952 Resources: Default batch job resources with predefined settings. 953 """ 954 return Resources( 955 nnodes=1, 956 ncpus=1, 957 mem_per_cpu="1gb", 958 work_dir="scratch_local", 959 work_size_per_cpu="1gb", 960 walltime="1d", 961 ) 962 963 @classmethod 964 def _translate_kill_force(cls, job_id: str) -> str: 965 """ 966 Generate the PBS force kill command for a job. 967 968 Args: 969 job_id (str): The ID of the job to kill. 970 971 Returns: 972 str: The qdel command with force flag. 973 """ 974 return f"qdel -W force {job_id}" 975 976 @classmethod 977 def _translate_kill(cls, job_id: str) -> str: 978 """ 979 Generate the standard PBS kill command for a job. 980 981 Args: 982 job_id (str): The ID of the job to kill. 983 984 Returns: 985 str: The qdel command without force flag. 986 """ 987 return f"qdel {job_id}" 988 989 @classmethod 990 def _sync_directories( 991 cls, 992 src_dir: Path, 993 dest_dir: Path, 994 src_host: str | None, 995 dest_host: str | None, 996 files: list[Path] | None, 997 sync_function: Callable[ 998 [Path, Path, str | None, str | None, list[Path] | None], None 999 ], 1000 ) -> None: 1001 """ 1002 Synchronize directories either locally or across remote hosts, depending on the environment and setup. 1003 1004 Args: 1005 src_dir (Path): Source directory to sync from. 1006 dest_dir (Path): Destination directory to sync to. 1007 src_host (str | None): Hostname of the source machine if remote; None if local. 1008 dest_host (str | None): Hostname of the destination machine if remote; None if local. 1009 files (list[Path] | None): Optional list of file paths to include or exclude, depending on `sync_function`. 1010 sync_function (Callable): Function to perform the actual synchronization. 1011 1012 Raises: 1013 QQError: If both source and destination hosts are remote and cannot be 1014 accessed simultaneously, or if syncing fails internally. 
1015 """ 1016 if os.environ.get(CFG.env_vars.shared_submit): 1017 # input_dir is on shared storage -> we can copy files from/to it without connecting to the remote host 1018 logger.debug("Syncing directories on local and shared filesystem.") 1019 sync_function(src_dir, dest_dir, None, None, files) 1020 else: 1021 # input_dir is not on shared storage -> fall back to the default implementation 1022 logger.debug("Syncing directories on local filesystems.") 1023 1024 # convert local hosts to none 1025 local_hostname = socket.getfqdn() 1026 src = None if src_host == local_hostname else src_host 1027 dest = None if dest_host == local_hostname else dest_host 1028 1029 if src is None or dest is None: 1030 sync_function(src_dir, dest_dir, src, dest, files) 1031 else: 1032 raise QQError( 1033 f"The source '{src_host}' and destination '{dest_host}' cannot be both remote." 1034 ) 1035 1036 @classmethod 1037 def _get_batch_jobs_using_command( 1038 cls, 1039 command: str, 1040 include_completed: bool, 1041 include_top_level_array: bool, 1042 ignore_exit_code: bool, 1043 ) -> list[PBSJob]: 1044 """ 1045 Execute a shell command to retrieve information about PBS jobs and parse it. 1046 1047 Args: 1048 command (str): The shell command to execute, typically a PBS query command. 1049 include_completed (bool): Include both completed and uncompleted jobs. 1050 If `False`, completed jobs are filtered out from the output. 1051 include_top_level_array (bool): Include top-level array jobs in the output. 1052 If `False`, top-level array jobs are filtered out from the output. 1053 ignore_exit_code (bool): Ignore the exit code of the command. 1054 If `False`, the command must return a zero exit code. 1055 1056 Returns: 1057 list[PBSJob]: A list of `PBSJob` instances corresponding to the jobs 1058 returned by the command. 1059 1060 Raises: 1061 QQError: If the command fails (non-zero return code) or if the output 1062 cannot be parsed into valid job information. 1063 """ 1064 ... 
1065 result = subprocess.run( 1066 # -oL (line-buffer stdout), -eL (line-buffer stderr) 1067 # necessary for stdout and stderr merging 1068 ["stdbuf", "-oL", "-eL", "bash"], 1069 input=command, 1070 text=True, 1071 check=False, 1072 stdout=subprocess.PIPE, 1073 stderr=subprocess.STDOUT, 1074 errors="replace", 1075 ) 1076 1077 if not ignore_exit_code and result.returncode != 0: 1078 raise QQError( 1079 # standard error is written to stdout 1080 f"Could not retrieve information about jobs: {result.stdout.strip()}." 1081 ) 1082 1083 jobs = [] 1084 for data, job_id in parse_multi_pbs_dump_to_dictionaries( 1085 result.stdout.strip(), "Job Id" 1086 ): 1087 job = PBSJob.from_dict(job_id, data) 1088 1089 if not include_top_level_array and job.is_array_job(): 1090 continue 1091 1092 # unless all jobs are to be shown, filter out completed jobs 1093 # this is necessary because the output from PBS will always contain 1094 # completed job tasks from array jobs that are uncompleted 1095 # we do not want qq to show them 1096 if not include_completed and job.is_completed(): 1097 continue 1098 1099 jobs.append(job) 1100 1101 return jobs
Implementation of BatchInterface for PBS Pro batch system.
Return the name of the batch system environment.
Returns:
str: The batch system name.
Determine whether the batch system is available on the current host.
Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.
Returns:
bool: True if the batch system is available, False otherwise.
Get the id of the current job from the corresponding batch system's environment variable.
For this method to work, it has to be called from the inside of an active job.
Returns:
str | None: ID of the job or None if the environment variable is not set.
@classmethod
def create_work_dir_on_scratch(cls, job_id: str) -> Path:
    """
    Create the working directory inside the scratch directory of the given job.

    Args:
        job_id (str): Identifier of the job.

    Returns:
        Path: Path to the working directory on scratch.

    Raises:
        QQError: If the working directory could not be created.
    """
    scratch_dir = cls._get_scratch_dir(job_id)

    # create working directory inside the scratch directory allocated by the batch system
    # we create this directory because other processes may write files
    # into the allocated scratch directory and we do not want these files
    # to affect the job execution or be copied back to input_dir
    # this also simplifies deletion of the working directory
    # (the allocated scratch dir cannot be deleted)
    work_dir = logical_resolve(scratch_dir / CFG.pbs_options.scratch_dir_inner)

    logger.debug(f"Creating working directory '{str(work_dir)}'.")
    try:
        work_dir.mkdir(exist_ok=True)
    except OSError as e:
        # report filesystem failures through the error type the interface documents
        raise QQError(
            f"Could not create working directory '{work_dir}': {e}."
        ) from e

    return work_dir
Create the working directory on scratch for the given job.
Arguments:
- job_id (str): Unique identifier of the job.
Returns:
Path: Absolute path to the working directory on scratch.
Raises:
- QQError: If the working directory could not be created.
@classmethod
def job_submit(
    cls,
    res: Resources,
    queue: str,
    script: Path,
    job_name: str,
    depend: list[Depend],
    env_vars: dict[str, str],
    account: str | None = None,
    server: str | None = None,
    remote_host: str | None = None,
) -> str:
    """
    Submit a job to PBS using `qsub`, either locally or over SSH from `remote_host`.

    Note that `env_vars` is modified in place: the AMS variables collected from
    the current environment are added to it before the command is built.

    Args:
        res (Resources): Resources required for the job.
        queue (str): Target queue for the job submission.
        script (Path): Path to the script to execute.
        job_name (str): Name of the job to use.
        depend (list[Depend]): List of job dependencies.
        env_vars (dict[str, str]): Environment variables to propagate to the job.
        account (str | None): Ignored by this backend.
        server (str | None): Optional name of the server to submit the job to.
        remote_host (str | None): Optional name of the machine to submit the job from.

    Returns:
        str: Standard output of the submission command, stripped of surrounding whitespace.

    Raises:
        QQError: If the submission command returns a non-zero exit code.
    """
    # account unused
    _ = account

    input_dir = script.parent
    logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")

    cls._shared_guard(input_dir, res, env_vars, server, remote_host)

    # set env vars required for Infinity modules
    # this can be removed once Infinity stops being supported
    env_vars.update(cls._collect_ams_env_vars())

    # if we are submitting to a different server, we need to change the AMS site
    # this can be removed once Infinity stops being supported
    if server:
        cls._modify_ams_env_vars(env_vars, server)

    # get the submission command
    # only the file name is passed: the script path is rebuilt as input_dir / name,
    # so passing the full path would duplicate the directory for relative paths
    command = cls._translate_submit(
        res,
        queue,
        server,
        input_dir,
        script.name,
        job_name,
        depend,
        env_vars,
    )
    logger.debug(command)

    if not remote_host:
        # submit the script from the current host
        result = subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )
    else:
        # submit the script from the remote host
        logger.debug(
            f"Navigating to '{remote_host}' to execute the submission command '{command}'."
        )
        result = subprocess.run(
            [
                "ssh",
                "-o PasswordAuthentication=no",
                "-o GSSAPIAuthentication=yes",
                "-o StrictHostKeyChecking=no",  # allow unknown hosts
                f"-o ConnectTimeout={CFG.timeouts.ssh}",
                "-q",  # suppress some SSH messages
                remote_host,
                command,
            ],
            capture_output=True,
            text=True,
            check=False,
            # same decoding policy as the local branch: never fail on undecodable output
            errors="replace",
        )

    if result.returncode != 0:
        raise QQError(
            f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
        )

    return result.stdout.strip()
Submit a job to the batch system.
Can also perform additional validation of the job's resources.
This method is NOT guaranteed to be thread-safe.
Arguments:
- res (Resources): Resources required for the job.
- queue (str): Target queue for the job submission.
- script (Path): Path to the script to execute.
- job_name (str): Name of the job to use.
- depend (list[Depend]): List of job dependencies.
- env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
- account (str | None): Optional account name to use for the job.
- server (str | None): Optional name of the server to submit the job to.
- remote_host (str | None): Optional name of the machine to submit the job from.
Returns:
str: Unique ID of the submitted job.
Raises:
- QQError: If the job submission fails.
@classmethod
def job_kill(cls, job_id: str) -> None:
    """
    Terminate a job using the standard (non-forced) PBS kill command.

    Args:
        job_id (str): Identifier of the job to terminate.

    Raises:
        QQError: If the kill command returns a non-zero exit code.
    """
    kill_command = cls._translate_kill(job_id)
    logger.debug(kill_command)

    # the command is fed to bash through standard input
    outcome = subprocess.run(
        ["bash"],
        input=kill_command,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if outcome.returncode == 0:
        return

    raise QQError(f"Failed to kill job '{job_id}': {outcome.stderr.strip()}.")
Terminate a job gracefully. The job should have time for proper cleanup.
Arguments:
- job_id (str): Identifier of the job to terminate.
Raises:
- QQError: If the job could not be killed.
@classmethod
def job_kill_force(cls, job_id: str) -> None:
    """
    Terminate a job using the forced PBS kill command.

    Args:
        job_id (str): Identifier of the job to forcefully terminate.

    Raises:
        QQError: If the kill command returns a non-zero exit code.
    """
    kill_command = cls._translate_kill_force(job_id)
    logger.debug(kill_command)

    # the command is fed to bash through standard input
    outcome = subprocess.run(
        ["bash"],
        input=kill_command,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if outcome.returncode == 0:
        return

    raise QQError(f"Failed to kill job '{job_id}': {outcome.stderr.strip()}.")
Forcefully terminate a job. There may be no time for proper cleanup.
Arguments:
- job_id (str): Identifier of the job to forcefully terminate.
Raises:
- QQError: If the job could not be killed.
Retrieve information about a job from the batch system.
The returned object should be fully initialized, even if the job no longer exists or its information is unavailable.
Arguments:
- job_id (str): Identifier of the job.
Returns:
BatchJobInterface: Object containing the job's metadata and state.
@classmethod
def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[PBSJob]:
    """
    Retrieve information about several jobs with a single `qstat` call.

    The exit code of `qstat` is ignored and the jobs parsed from its output
    are returned, including completed jobs and top-level array jobs.

    Args:
        job_ids (list[str]): List of job identifiers.

    Returns:
        list[PBSJob]: Jobs parsed from the `qstat` output; an empty list if
        `job_ids` is empty.
    """
    if len(job_ids) == 0:
        return []

    ids = " ".join(job_ids)
    query = f"qstat -fxw {ids}"
    logger.debug(query)

    return cls._get_batch_jobs_using_command(
        query,
        include_completed=True,
        include_top_level_array=True,
        ignore_exit_code=True,
    )
Retrieve information about multiple jobs from the batch system.
Batch jobs should be returned in the same order as they appear in job_ids.
A TBatchJob object should be returned for each job id, even if the job
no longer exists or its information is unavailable.
Array jobs should NOT be expanded into their individual tasks.
The default implementation is to call get_batch_job for each job id.
This implementation may be inefficient for large numbers of job ids and
should be overridden by subclasses.
Arguments:
- job_ids (list[str]): List of job identifiers.
Returns:
list[TBatchJob]: List of TBatchJob objects, one for each job id.
@classmethod
def get_unfinished_batch_jobs(
    cls, user: str, server: str | None = None
) -> list[PBSJob]:
    """
    Retrieve the uncompleted jobs of a user from the specified or default server.

    Args:
        user (str): Username for which to fetch uncompleted jobs.
        server (str | None): Optional name of the batch server to get jobs from.

    Returns:
        list[PBSJob]: The user's uncompleted jobs.
    """
    # the server, if given, is appended as a '@server' destination
    destination = f" @{server}" if server else ""
    query = f"qstat -fwtu {user}{destination}"
    logger.debug(query)

    return cls._get_batch_jobs_using_command(
        query,
        include_completed=False,
        include_top_level_array=True,
        ignore_exit_code=False,
    )
Retrieve information about all uncompleted jobs submitted by user
on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- user (str): Username for which to fetch uncompleted jobs.
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.
@classmethod
def get_batch_jobs(cls, user: str, server: str | None = None) -> list[PBSJob]:
    """
    Retrieve all jobs of a user (including finished ones) from the specified or default server.

    Args:
        user (str): Username for which to fetch all jobs.
        server (str | None): Optional name of the batch server to get jobs from.

    Returns:
        list[PBSJob]: All jobs of the user.
    """
    # the server, if given, is appended as a '@server' destination
    destination = f" @{server}" if server else ""
    query = f"qstat -fwxtu {user}{destination}"
    logger.debug(query)

    return cls._get_batch_jobs_using_command(
        query,
        include_completed=True,
        include_top_level_array=True,
        ignore_exit_code=False,
    )
Retrieve information about all jobs submitted by a specific user (including finished jobs) on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- user (str): Username for which to fetch all jobs.
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing all jobs of the user.
@classmethod
def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
    """
    Retrieve the uncompleted jobs of all users from the specified or default server.

    Args:
        server (str | None): Optional name of the batch server to get jobs from.

    Returns:
        list[PBSJob]: Uncompleted jobs of all users.
    """
    # the server, if given, is appended as a '@server' destination
    destination = f" @{server}" if server else ""
    query = "qstat -fwt" + destination
    logger.debug(query)

    return cls._get_batch_jobs_using_command(
        query,
        include_completed=False,
        include_top_level_array=True,
        ignore_exit_code=False,
    )
Retrieve information about uncompleted jobs of all users on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.
@classmethod
def get_all_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
    """
    Retrieve all jobs of all users (including finished ones) from the specified or default server.

    Args:
        server (str | None): Optional name of the batch server to get jobs from.

    Returns:
        list[PBSJob]: All jobs of all users.
    """
    # the server, if given, is appended as a '@server' destination
    destination = f" @{server}" if server else ""
    query = "qstat -fxwt" + destination
    logger.debug(query)

    return cls._get_batch_jobs_using_command(
        query,
        include_completed=True,
        include_top_level_array=True,
        ignore_exit_code=False,
    )
Retrieve information about all jobs of all users on the specified or default batch server.
The jobs can be returned in arbitrary order.
Arguments:
- server (str | None): Optional name of the batch server to get jobs from.
Returns:
list[BatchJobInterface]: A list of job info objects representing all jobs of all users.
@classmethod
def get_queues(cls, server: str | None = None) -> list[PBSQueue]:
    """
    Retrieve all queues from the specified or default batch server using `qstat -Qfw`.

    Args:
        server (str | None): Optional name of the batch server to get queues from.

    Returns:
        list[PBSQueue]: Queues parsed from the command output.

    Raises:
        QQError: If the command returns a non-zero exit code.
    """
    # the server, if given, is appended as a '@server' destination
    destination = f" @{server}" if server else ""
    query = "qstat -Qfw" + destination
    logger.debug(query)

    outcome = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if outcome.returncode != 0:
        raise QQError(
            f"Could not retrieve information about queues: {outcome.stderr.strip()}."
        )

    return [
        PBSQueue.from_dict(name, server, data)
        for data, name in parse_multi_pbs_dump_to_dictionaries(
            outcome.stdout.strip(), "Queue"
        )
    ]
Retrieve all queues managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get queues from.
Returns:
list[BatchQueueInterface]: A list of queue objects existing in the batch system.
@classmethod
def get_nodes(cls, server: str | None = None) -> list[PBSNode]:
    """
    Retrieve all nodes from the specified or default batch server using `pbsnodes -a`.

    Args:
        server (str | None): Optional name of the batch server to get nodes from.

    Returns:
        list[PBSNode]: Nodes parsed from the command output.

    Raises:
        QQError: If the command returns a non-zero exit code.
    """
    # unlike qstat, pbsnodes takes the server through the '-s' option
    server_option = f" -s {server}" if server else ""
    query = "pbsnodes -a" + server_option
    logger.debug(query)

    outcome = subprocess.run(
        ["bash"],
        input=query,
        capture_output=True,
        text=True,
        errors="replace",
        check=False,
    )

    if outcome.returncode != 0:
        raise QQError(
            f"Could not retrieve information about nodes: {outcome.stderr.strip()}."
        )

    return [
        PBSNode.from_dict(name, server, data)
        for data, name in parse_multi_pbs_dump_to_dictionaries(
            outcome.stdout.strip(), None
        )
    ]
Retrieve all nodes managed by the batch system on the specified or default batch server.
Arguments:
- server (str | None): Optional name of the batch server to get nodes from.
Returns:
list[BatchNodeInterface]: A list of node objects existing in the batch system.
@classmethod
def get_supported_work_dir_types(cls) -> list[str]:
    """
    Return the working-directory types supported by this backend.

    Returns:
        list[str]: The entries of `SUPPORTED_SCRATCHES` followed by
        'scratch_shm', 'input_dir', and 'job_dir'.
    """
    non_scratch_types = [
        "scratch_shm",
        "input_dir",
        "job_dir",  # same as input_dir
    ]
    return cls.SUPPORTED_SCRATCHES + non_scratch_types
Retrieve the list of supported types of working directories
(i.e., strings that can be used with the --work-dir option).
Returns:
list[str]: A list of supported types of working directories.
@classmethod
def read_remote_file(cls, host: str, file: Path) -> str:
    """
    Read a file, directly if the submission directory is on shared storage.

    If the shared-submit environment variable is not set, the call is
    delegated to the parent implementation.

    Args:
        host (str): The hostname of the remote machine where the file resides.
        file (Path): The path to the file.

    Returns:
        str: The contents of the file.

    Raises:
        QQError: If the file on shared storage cannot be read.
    """
    if not os.environ.get(CFG.env_vars.shared_submit):
        # not on shared storage: fall back to the default implementation
        logger.debug(f"Reading a remote file '{file}' on '{host}'.")
        return super().read_remote_file(host, file)

    # file is on shared storage, we can read it directly
    # this assumes that this method is only used to read files in input_dir
    logger.debug(f"Reading a file '{file}' from shared storage.")
    try:
        content = file.read_text()
    except Exception as e:
        raise QQError(f"Could not read file '{file}': {e}.") from e

    return content
Read the contents of a file on a remote host and return it as a string.
The default implementation uses SSH to retrieve the file contents.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
Returns:
str: The contents of the remote file.
Raises:
- QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: Path, content: str) -> None:
    """
    Write a file, directly if the submission directory is on shared storage.

    If the shared-submit environment variable is not set, the call is
    delegated to the parent implementation.

    Args:
        host (str): The hostname of the remote machine where the file resides.
        file (Path): The path to the file.
        content (str): The content to write to the file.

    Raises:
        QQError: If the file on shared storage cannot be written.
    """
    if not os.environ.get(CFG.env_vars.shared_submit):
        # not on shared storage: fall back to the default implementation
        logger.debug(f"Writing a remote file '{file}' on '{host}'.")
        super().write_remote_file(host, file, content)
        return

    # file should be written to shared storage
    # this assumes that the method is only used to write files into input_dir
    logger.debug(f"Writing a file '{file}' to shared storage.")
    try:
        file.write_text(content)
    except Exception as e:
        raise QQError(f"Could not write file '{file}': {e}.") from e
Write the given content to a file on a remote host, overwriting it if it exists.
The default implementation uses SSH to send the content to the remote file.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the file resides.
- file (Path): The path to the file on the remote host.
- content (str): The content to write to the remote file.
Raises:
- QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: Path) -> None:
    """
    Create a directory, directly if the submission directory is on shared storage.

    If the shared-submit environment variable is not set, the call is
    delegated to the parent implementation.

    Args:
        host (str): The hostname of the remote machine where the directory should be created.
        directory (Path): The path of the directory to create.

    Raises:
        QQError: If the directory on shared storage cannot be created.
    """
    if not os.environ.get(CFG.env_vars.shared_submit):
        # not on shared storage: fall back to the default implementation
        logger.debug(f"Creating a directory '{directory}' on '{host}'.")
        super().make_remote_dir(host, directory)
        return

    # assuming the directory is created in input_dir
    logger.debug(f"Creating a directory '{directory}' on shared storage.")
    try:
        directory.mkdir(exist_ok=True)
    except Exception as e:
        raise QQError(
            f"Could not create a directory '{directory}': {e}."
        ) from e
Create a directory at the specified path on a remote host.
The default implementation uses SSH to run mkdir on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory should be created.
- directory (Path): The path of the directory to create on the remote host.
Raises:
- QQError: If the directory cannot be created but does not already exist or the SSH command fails.
@classmethod
def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
    """
    List a directory, directly if the submission directory is on shared storage.

    If the shared-submit environment variable is not set, the call is
    delegated to the parent implementation.

    Args:
        host (str): The hostname of the remote machine where the directory resides.
        directory (Path): The directory to list.

    Returns:
        list[Path]: The entries of the directory; on shared storage these are
        the paths produced by `directory.iterdir()`.

    Raises:
        QQError: If the directory on shared storage cannot be listed.
    """
    if not os.environ.get(CFG.env_vars.shared_submit):
        # not on shared storage: fall back to the default implementation
        logger.debug(f"Listing a directory '{directory}' on '{host}'.")
        return super().list_remote_dir(host, directory)

    # assuming we are listing input_dir or another directory on shared storage
    logger.debug(f"Listing a directory '{directory}' on shared storage.")
    try:
        entries = list(directory.iterdir())
    except Exception as e:
        raise QQError(f"Could not list a directory '{directory}': {e}.") from e

    return entries
List all files and directories (absolute paths) in the specified directory on a remote host.
The default implementation uses SSH to run ls -A on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to list.
Returns:
list[Path]: A list of
Path objects representing the entries inside the directory. Entries are relative to the given directory.
Raises:
- QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: Path) -> None:
    """
    Delete a directory, directly if `host` is the current machine.

    If `host` differs from the fully qualified name of the current host,
    the call is delegated to the parent implementation.

    Args:
        host (str): The hostname of the machine where the directory resides.
        directory (Path): The directory to delete.

    Raises:
        QQError: If the local directory cannot be deleted.
    """
    if host != socket.getfqdn():
        # directory is on another machine: fall back to the default implementation
        logger.debug(f"Deleting a directory '{directory}' on '{host}'.")
        return super().delete_remote_dir(host, directory)

    # directory is available on the current host
    logger.debug(f"Deleting a directory '{directory}' on local host.")
    try:
        shutil.rmtree(directory)
    except Exception as e:
        raise QQError(f"Could not delete directory '{directory}': {e}.") from e
Delete a directory on a remote host.
The default implementation uses SSH to run rm -r on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the directory resides.
- directory (Path): The remote directory to delete.
Raises:
- QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files(
    cls, host: str, files: list[Path], moved_files: list[Path]
) -> None:
    """
    Move files to new paths, moving them directly when the submission
    storage is shared.

    Args:
        host (str): Hostname on which the files reside. Only used when
            delegating to the parent implementation.
        files (list[Path]): Source paths.
        moved_files (list[Path]): Destination paths, paired with `files` by position.

    Raises:
        QQError: If `files` and `moved_files` differ in length, or if a file
            cannot be moved on the shared storage.
    """
    if len(files) != len(moved_files):
        raise QQError(
            "The provided 'files' and 'moved_files' must have the same length."
        )

    if os.environ.get(CFG.env_vars.shared_submit):
        # assuming we are moving files inside input_dir or another directory on shared storage
        logger.debug(
            f"Moving files '{files}' -> '{moved_files}' on a shared storage."
        )
        for src, dst in zip(files, moved_files):
            try:
                shutil.move(str(src), str(dst))
            except Exception as e:
                # report failures as QQError, like the other shared-storage
                # operations (listing and deleting directories) do
                raise QQError(f"Could not move '{src}' to '{dst}': {e}.") from e
    else:
        # otherwise we fall back to the default implementation
        logger.debug(f"Moving files '{files}' -> '{moved_files}' on '{host}'.")
        super().move_remote_files(host, files, moved_files)
Move files on a remote host from their current paths to new paths.
The default implementation uses SSH to run a sequence of mv commands on the remote host.
This approach may be inefficient on shared storage or high-latency networks.
Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.
Subclasses should override this method to provide a more efficient implementation if possible.
Arguments:
- host (str): The hostname of the remote machine where the files reside.
- files (list[Path]): A list of source file paths on the remote host.
- moved_files (list[Path]): A list of destination file paths on the remote host.
Must be the same length as
files.
Raises:
- QQError: If the SSH command fails, the files cannot be moved or
the length of
files does not match the length of moved_files.
@classmethod
def sync_with_exclusions(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    exclude_files: list[Path] | None = None,
) -> None:
    """
    Synchronize two directories while excluding the specified files.

    All arguments are forwarded unchanged to `cls._sync_directories`, together
    with the parent class's `sync_with_exclusions` as the final argument.

    Args:
        src_dir (Path): Source directory.
        dest_dir (Path): Destination directory.
        src_host (str | None): Host of the source directory, if any.
        dest_host (str | None): Host of the destination directory, if any.
        exclude_files (list[Path] | None): Files to exclude from the sync.
    """
    # the parent's implementation is handed over as a bound method
    parent_sync = super().sync_with_exclusions
    cls._sync_directories(
        src_dir, dest_dir, src_host, dest_host, exclude_files, parent_sync
    )
Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.
All files and directories in src_dir are copied to dest_dir except
those listed in exclude_files. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing.
These will be converted to paths relative to
src_dir.
Raises:
- QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected(
    cls,
    src_dir: Path,
    dest_dir: Path,
    src_host: str | None,
    dest_host: str | None,
    include_files: list[Path] | None = None,
) -> None:
    """
    Synchronize only the selected files between two directories.

    All arguments are forwarded unchanged to `cls._sync_directories`, together
    with the parent class's `sync_selected` as the final argument.

    Args:
        src_dir (Path): Source directory.
        dest_dir (Path): Destination directory.
        src_host (str | None): Host of the source directory, if any.
        dest_host (str | None): Host of the destination directory, if any.
        include_files (list[Path] | None): Files to include in the sync.
    """
    # the parent's implementation is handed over as a bound method
    parent_sync = super().sync_selected
    cls._sync_directories(
        src_dir, dest_dir, src_host, dest_host, include_files, parent_sync
    )
Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.
Only files listed in include_files are copied from src_dir to dest_dir.
Files not listed are ignored. Files are never removed from the destination.
Arguments:
- src_dir (Path): Source directory to sync from.
- dest_dir (Path): Destination directory to sync to.
- src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
- dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
- include_files (list[Path] | None): Optional list of absolute file paths to include in syncing.
These paths are converted relative to
src_dir. This argument is optional only for consistency with sync_with_exclusions.
Raises:
- QQError: If the rsync command fails or times out.
@classmethod
def transform_resources(
    cls, queue: str, server: str | None, provided_resources: Resources
) -> Resources:
    """
    Fill in default resources and normalize the working-directory settings.

    Args:
        queue (str): Name of the queue whose default resources are looked up.
        server (str | None): Server on which the queue is located; passed on to `PBSQueue`.
        provided_resources (Resources): Resources specified by the user.

    Returns:
        Resources: The merged resources. `work_dir` is set to 'input_dir',
            'scratch_shm', or the matching entry of `cls.SUPPORTED_SCRATCHES`.
            For the first two, `work_size` and `work_size_per_cpu` are cleared.

    Raises:
        QQError: If no work-dir is set after merging, or if the work-dir type
            is not recognized.
    """
    # default resources of the queue
    queue_defaults = PBSQueue(queue, server).get_default_resources()
    # default hard-coded resources
    batch_defaults = cls._get_default_server_resources()

    # fill in default parameters
    resources = Resources.merge_resources(
        provided_resources, queue_defaults, batch_defaults
    )
    if not resources.work_dir:
        raise QQError(
            "Work-dir is not set after filling in default attributes. This is a bug."
        )

    requested = resources.work_dir

    # 'job_dir' is accepted as an alias and rewritten to 'input_dir'
    if any(equals_normalized(requested, alias) for alias in ("job_dir", "input_dir")):
        # work-size should not be used with job_dir
        if provided_resources.work_size:
            logger.warning(
                "Setting work-size is not supported for work-dir='job_dir' or 'input_dir'.\n"
                "Job will run in the submission directory with unlimited capacity.\n"
                "The work-size attribute will be ignored."
            )

        resources.work_dir = "input_dir"
        resources.work_size = None
        resources.work_size_per_cpu = None
        return resources

    # scratch in RAM (https://docs.metacentrum.cz/en/docs/computing/infrastructure/scratch-storages#scratch-in-ram)
    if equals_normalized(requested, "scratch_shm"):
        # work-size should not be used with scratch_shm
        if provided_resources.work_size:
            logger.warning(
                "Setting work-size is not supported for work-dir='scratch_shm'.\n"
                "Size of the in-RAM scratch is specified using the --mem property.\n"
                "The work-size attribute will be ignored."
            )

        resources.work_dir = "scratch_shm"
        resources.work_size = None
        resources.work_size_per_cpu = None
        return resources

    # if work-dir matches any of the "standard" scratches supported by PBS,
    # store the spelling used in SUPPORTED_SCRATCHES
    matched = next(
        (
            scratch
            for scratch in cls.SUPPORTED_SCRATCHES
            if equals_normalized(scratch, requested)
        ),
        None,
    )
    if matched:
        resources.work_dir = matched
        return resources

    # unknown work-dir type
    raise QQError(
        f"Unknown working directory type specified: work-dir='{resources.work_dir}'. Supported types for {cls.env_name()} are: '{' '.join(cls.get_supported_work_dir_types())}'."
    )
Transform user-provided Resources into a batch system-specific Resources instance.
This method takes the resources provided during submission and returns a new
Resources object with any necessary modifications or defaults applied for
the target batch system. The original provided_resources object is not modified.
Arguments:
- queue (str): The name of the queue for which the resources are being adapted.
- server (str | None): Name of the server on which the queue is located.
If
None, the queue is treated as being located on the current server.
- provided_resources (Resources): The raw resources specified by the user.
Returns:
Resources: A new Resources instance with batch system-specific adjustments, fully constructed and validated.
Raises:
- QQError: If any of the provided parameters are invalid or inconsistent.
@classmethod
def sort_jobs(cls, jobs: list[PBSJob]) -> None:
    """
    Sort jobs in place by the integer returned from `job.get_id_int()`.

    Args:
        jobs (list[PBSJob]): Jobs to sort; the list itself is reordered.
    """

    def sort_key(job) -> int:
        # jobs with invalid ID get assigned an ID of 0 for sorting => they are sorted to the start
        # and therefore are displayed at the top in the qq jobs / qq stat output
        return job.get_id_int() or 0

    jobs.sort(key=sort_key)
Sort a list of batch system jobs by a defined attribute.
The default implementation sorts the jobs alphabetically by their job ID,
as returned by job.get_id(). Subclasses may override this method to
implement custom sorting logic.
Arguments:
- jobs (list[BatchJobInterface]): A list of batch job objects to be sorted in-place.
class PBSQueue(BatchQueueInterface):
    """
    Implementation of BatchQueueInterface for PBS.
    Stores metadata for a single PBS queue.
    """

    def __init__(self, name: str, server: str | None = None):
        """
        Query PBS for information about the specified queue.

        Args:
            name (str): Name of the queue.
            server (str | None): Server on which the queue is located. If set,
                the queue is queried as `name@server`.

        Raises:
            QQError: If `qstat` exits with a non-zero return code.
        """
        self._name = name
        self._server = server
        self._info: dict[str, str] = {}

        self.update()

    def update(self) -> None:
        """
        Reload the queue information by running `qstat -Qfw`.

        Raises:
            QQError: If `qstat` exits with a non-zero return code.
        """
        # get queue info from PBS
        command = f"qstat -Qfw {self._name}"
        if self._server:
            command += f"@{self._server}"

        result = subprocess.run(
            ["bash"],
            input=command,
            text=True,
            check=False,
            capture_output=True,
            errors="replace",
        )

        if result.returncode != 0:
            # keep the real reason around: qstat may fail for reasons
            # other than a missing queue
            logger.debug(
                f"qstat failed for queue '{self._name}': {result.stderr.strip()}"
            )
            raise QQError(f"Queue '{self._name}' does not exist.")

        self._info = parse_pbs_dump_to_dictionary(result.stdout)
        self._set_attributes()

    def get_name(self) -> str:
        """Return the name of the queue."""
        return self._name

    def get_priority(self) -> str | None:
        """Return the raw 'Priority' value of the queue, or `None` if missing."""
        return self._info.get("Priority")

    def get_total_jobs(self) -> int | None:
        """Return 'total_jobs' as an integer, or `None` if missing or not an integer."""
        return PBSQueue._get_int_value(self._info, "total_jobs")

    def get_running_jobs(self) -> int | None:
        """Return the 'Running' job count, or `None` if missing or not an integer."""
        return PBSQueue._get_int_value(self._job_numbers, "Running")

    def get_queued_jobs(self) -> int | None:
        """
        Return the sum of the 'Queued', 'Held' and 'Waiting' job counts.

        Counts that are missing or unparsable contribute 0, so this
        implementation always returns an integer.
        """
        # we count held and waiting jobs as queued for consistency with Slurm
        return (
            (PBSQueue._get_int_value(self._job_numbers, "Queued") or 0)
            + (PBSQueue._get_int_value(self._job_numbers, "Held") or 0)
            + (PBSQueue._get_int_value(self._job_numbers, "Waiting") or 0)
        )

    def get_other_jobs(self) -> int | None:
        """
        Return the sum of the 'Transit', 'Exiting' and 'Begun' job counts.

        Counts that are missing or unparsable contribute 0, so this
        implementation always returns an integer.
        """
        return (
            (PBSQueue._get_int_value(self._job_numbers, "Transit") or 0)
            + (PBSQueue._get_int_value(self._job_numbers, "Exiting") or 0)
            + (PBSQueue._get_int_value(self._job_numbers, "Begun") or 0)
        )

    def get_max_walltime(self) -> timedelta | None:
        """Return 'resources_max.walltime' as a duration, or `None` if missing."""
        if raw_time := self._info.get("resources_max.walltime"):
            return hhmmss_to_duration(raw_time)

        return None

    def get_max_n_nodes(self) -> int | None:
        """Return 'resources_max.nodect' as an integer, or `None` if missing or not an integer."""
        return PBSQueue._get_int_value(self._info, "resources_max.nodect")

    def get_comment(self) -> str | None:
        """Return the queue comment up to the first '|', or `None` if there is no comment."""
        if not (raw_comment := self._info.get("comment")):
            return None

        return raw_comment.split("|", 1)[0]

    def is_available_to_user(self, user: str) -> bool:
        """
        Check whether the queue is enabled, started, and passes every enabled ACL check.

        Args:
            user (str): The username to check access for.

        Returns:
            bool: `False` if the queue is not enabled or not started, or if an
                enabled user/group/host ACL does not list the user, any of the
                groups reported by `ACLData.get_groups_or_init`, or the host
                reported by `ACLData.get_host_or_init`. `True` otherwise.
        """
        # queues that are not enabled or not started are unavailable to all users
        if self._info.get("enabled") != "True" or self._info.get("started") != "True":
            return False

        # check acl users
        if self._info.get("acl_user_enable") == "True":
            if user not in self._acl_users:
                return False

        # check acl groups
        if self._info.get("acl_group_enable") == "True":
            users_acl_groups = ACLData.get_groups_or_init(user)
            if not any(item in self._acl_groups for item in users_acl_groups):
                return False

        # check acl hosts
        if self._info.get("acl_host_enable") == "True":
            host = ACLData.get_host_or_init()
            if host not in self._acl_hosts:
                return False

        return True

    def get_destinations(self) -> list[str]:
        """Return 'route_destinations' split on commas, or an empty list if missing."""
        if raw_destinations := self._info.get("route_destinations"):
            return raw_destinations.split(",")

        return []

    def from_route_only(self) -> bool:
        """Return `True` if the 'from_route_only' attribute equals 'True'."""
        return self._info.get("from_route_only") == "True"

    def to_yaml(self) -> str:
        """Return the queue name and all stored queue attributes as a YAML string."""
        # we need to add queue name to the start of the dictionary
        to_dump = {"Queue": self._name} | self._info
        return yaml.dump(
            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
        )

    def get_default_resources(self) -> Resources:
        """
        Build a `Resources` object from the queue's 'resources_default' attributes.

        Only attributes whose last dot-separated component names a field of
        `Resources` are used.
        """
        default_resources = {}

        for key, value in self._info.items():
            if "resources_default" in key:
                resource = key.split(".")[-1]
                default_resources[resource.strip()] = value.strip()

        # filter resources that are part of Resources
        field_names = {f.name for f in fields(Resources)}
        return Resources(
            **{k: v for k, v in default_resources.items() if k in field_names}
        )

    @staticmethod
    def _get_int_value(dict: dict[str, str], key: str) -> int | None:
        """
        Retrieve an integer value from the provided dictionary.

        Args:
            dict (dict[str, str]): The dictionary to read the value from.
            key (str): The key to look up in the dictionary.

        Returns:
            int | None: The integer value if the key is present with a non-empty
                value and conversion succeeds, otherwise `None`.
        """
        if not (raw := dict.get(key)):
            return None

        try:
            return int(raw)
        except ValueError as e:
            logger.debug(
                f"Could not parse '{key}' value of '{raw}' as an integer: {e}."
            )
            return None

    @staticmethod
    def _split_acl_list(raw: str | None) -> list[str]:
        """
        Split a comma-separated ACL attribute into its entries.

        Entries are stripped of surrounding whitespace and empty entries are
        dropped, so a missing or empty attribute yields an empty list instead
        of a list holding one empty string.
        """
        if not raw:
            return []

        return [entry for item in raw.split(",") if (entry := item.strip())]

    def _set_attributes(self) -> None:
        """
        Initialize derived queue attributes to avoid redundant parsing.
        """
        self._set_job_numbers()
        self._acl_users = PBSQueue._split_acl_list(self._info.get("acl_users"))
        self._acl_groups = PBSQueue._split_acl_list(self._info.get("acl_groups"))
        self._acl_hosts = PBSQueue._split_acl_list(self._info.get("acl_hosts"))

    @classmethod
    def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
        """
        Construct a new instance of PBSQueue from a queue name and a dictionary of queue information.

        Args:
            name (str): The unique name of the queue.
            server (str | None): Server on which the queue is located. If `None`, assumes the current server.
            info (dict[str, str]): A dictionary containing PBS queue metadata as key-value pairs.

        Returns:
            Self: A new instance of PBSQueue.

        Note:
            This method does not perform any validation or processing of the provided dictionary.
        """
        # bypass __init__ so that no qstat query is issued
        queue = cls.__new__(cls)
        queue._name = name
        queue._server = server
        queue._info = info
        queue._set_attributes()

        return queue

    def _set_job_numbers(self) -> None:
        """
        Parse and store job counts by state from the 'state_count' field.

        If parsing fails or the field is missing, `_job_numbers` is set to an empty dictionary.
        """
        if not (state_count := self._info.get("state_count")):
            self._job_numbers: dict[str, str] = {}
            return

        try:
            # 'state_count' is split on whitespace into 'State:count' pairs
            self._job_numbers = dict(p.split(":") for p in state_count.split())
        except Exception as e:
            logger.warning(f"Could not get job counts for queue '{self._name}': {e}.")
            self._job_numbers: dict[str, str] = {}
Implementation of BatchQueueInterface for PBS. Stores metadata for a single PBS queue.
def update(self) -> None:
    """
    Reload the queue information by running `qstat -Qfw`.

    The queue is queried as `name@server` when a server is set. On success the
    parsed output replaces `self._info` and the derived attributes are rebuilt.

    Raises:
        QQError: If `qstat` exits with a non-zero return code.
    """
    # get queue info from PBS
    command = f"qstat -Qfw {self._name}"
    if self._server:
        command += f"@{self._server}"

    result = subprocess.run(
        ["bash"],
        input=command,
        text=True,
        check=False,
        capture_output=True,
        errors="replace",
    )

    if result.returncode != 0:
        # keep the real reason around: qstat may fail for reasons
        # other than a missing queue
        logger.debug(
            f"qstat failed for queue '{self._name}': {result.stderr.strip()}"
        )
        raise QQError(f"Queue '{self._name}' does not exist.")

    self._info = parse_pbs_dump_to_dictionary(result.stdout)
    self._set_attributes()
Refresh the stored queue information from the batch system.
Raises:
- QQError: If the queue cannot be queried or its info updated.
Retrieve the name of the queue.
Returns:
str: The name identifying this queue in the batch system.
Retrieve the scheduling priority of the queue.
Returns:
str | None: The queue priority, or None if priority information is not available.
124 def get_total_jobs(self) -> int | None: 125 return PBSQueue._get_int_value(self._info, "total_jobs")
Retrieve the total number of jobs currently in the queue.
Returns:
int | None: The total count of jobs, regardless of status or
None if the information is not available.
127 def get_running_jobs(self) -> int | None: 128 return PBSQueue._get_int_value(self._job_numbers, "Running")
Retrieve the number of jobs currently running in the queue.
Returns:
int | None: The number of running jobs or
None if the information is not available.
130 def get_queued_jobs(self) -> int | None: 131 # we count held and waiting jobs as queued for consistency with Slurm 132 return ( 133 (PBSQueue._get_int_value(self._job_numbers, "Queued") or 0) 134 + (PBSQueue._get_int_value(self._job_numbers, "Held") or 0) 135 + (PBSQueue._get_int_value(self._job_numbers, "Waiting") or 0) 136 )
Retrieve the number of jobs waiting to start in the queue.
Returns:
int | None: The number of queued jobs or
None if the information is not available.
138 def get_other_jobs(self) -> int | None: 139 return ( 140 (PBSQueue._get_int_value(self._job_numbers, "Transit") or 0) 141 + (PBSQueue._get_int_value(self._job_numbers, "Exiting") or 0) 142 + (PBSQueue._get_int_value(self._job_numbers, "Begun") or 0) 143 )
Retrieve the number of jobs in other states (non-running and non-queued).
Returns:
int | None: The number of jobs that are neither running nor queued, such as exiting or suspended jobs. Returns
None if the information is not available.
145 def get_max_walltime(self) -> timedelta | None: 146 if raw_time := self._info.get("resources_max.walltime"): 147 return hhmmss_to_duration(raw_time) 148 149 return None
Retrieve the maximum walltime allowed for jobs in the queue.
Returns:
timedelta | None: The walltime limit, or None if unlimited or unknown.
151 def get_max_n_nodes(self) -> int | None: 152 return PBSQueue._get_int_value(self._info, "resources_max.nodect")
Retrieve the maximum number of nodes that can be requested in the queue.
Returns:
int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.
154 def get_comment(self) -> str | None: 155 if not (raw_comment := self._info.get("comment")): 156 return None 157 158 return raw_comment.split("|", 1)[0]
Retrieve the comment or description associated with the queue.
Returns:
str | None: The human-readable comment or note about the queue or
None if the information is not available.
def is_available_to_user(self, user: str) -> bool:
    """
    Check whether the queue is enabled, started, and passes every enabled ACL check.

    Args:
        user (str): The username to check access for.

    Returns:
        bool: `False` if the queue is not enabled or not started, or if an
            enabled user/group/host ACL does not list the user, any of the
            groups reported by `ACLData.get_groups_or_init`, or the host
            reported by `ACLData.get_host_or_init`. `True` otherwise.
    """
    # queues that are not enabled or not started are unavailable to all users
    if self._info.get("enabled") != "True" or self._info.get("started") != "True":
        return False

    # check acl users
    if self._info.get("acl_user_enable") == "True" and user not in self._acl_users:
        return False

    # check acl groups
    if self._info.get("acl_group_enable") == "True":
        user_groups = ACLData.get_groups_or_init(user)
        if not any(group in self._acl_groups for group in user_groups):
            return False

    # check acl hosts
    if self._info.get("acl_host_enable") == "True":
        host = ACLData.get_host_or_init()
        if host not in self._acl_hosts:
            return False

    return True
Check whether the specified user has access to this queue.
Arguments:
- user (str): The username to check access for.
Returns:
bool: True if the user can submit jobs to this queue, False otherwise.
def get_destinations(self) -> list[str]:
    """
    Return the destinations listed in the queue's 'route_destinations' attribute.

    Returns:
        list[str]: The attribute split on commas, or an empty list if the
            attribute is missing or empty.
    """
    raw = self._info.get("route_destinations")
    return raw.split(",") if raw else []
Retrieve all destinations available for this queue route.
Returns:
list[str]: A list of destination queue names associated with the queue.
Determine whether this queue can only be accessed via a route.
Returns:
bool: True if the queue is accessible exclusively through a route, False otherwise.
def to_yaml(self) -> str:
    """
    Serialize the queue name and all stored queue attributes to YAML.

    Returns:
        str: Block-style YAML with the 'Queue' entry first, followed by the
            stored attributes in their original order.
    """
    # we need to add queue name to the start of the dictionary
    header = {"Queue": self._name}
    document = header | self._info
    return yaml.dump(
        document,
        Dumper=Dumper,
        default_flow_style=False,
        sort_keys=False,
    )
Return all information about the queue from the batch system in YAML format.
Returns:
str: YAML-formatted string of queue metadata.
def get_default_resources(self) -> Resources:
    """
    Build a `Resources` object from the queue's default resource attributes.

    Every attribute whose key contains 'resources_default' is considered. The
    resource name is the last dot-separated component of the key. Names that
    are not fields of `Resources` are discarded.

    Returns:
        Resources: Resources constructed from the recognized default values.
    """
    # only names that are fields of Resources may be passed to its constructor
    known_fields = {field.name for field in fields(Resources)}

    recognized = {}
    for attribute, raw_value in self._info.items():
        if "resources_default" not in attribute:
            continue

        name = attribute.rsplit(".", 1)[-1].strip()
        if name in known_fields:
            recognized[name] = raw_value.strip()

    return Resources(**recognized)
Return the default resource definitions for this queue.
Returns:
Resources: Default resources allocated for jobs submitted to this queue.
248 @classmethod 249 def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self: 250 """ 251 Construct a new instance of PBSQueue from a queue name and a dictionary of queue information. 252 253 254 Args: 255 name (str): The unique name of the queue. 256 server (str | None): Server on which the queue is located. If `None`, assumes the current server. 257 info (dict[str, str]): A dictionary containing PBS queue metadata as key-value pairs. 258 259 Returns: 260 Self: A new instance of PBSQueue. 261 262 Note: 263 This method does not perform any validation or processing of the provided dictionary. 264 """ 265 queue = cls.__new__(cls) 266 queue._name = name 267 queue._server = server 268 queue._info = info 269 queue._set_attributes() 270 271 return queue
Construct a new instance of PBSQueue from a queue name and a dictionary of queue information.
Arguments:
- name (str): The unique name of the queue.
- server (str | None): Server on which the queue is located. If
None, assumes the current server.
- info (dict[str, str]): A dictionary containing PBS queue metadata as key-value pairs.
Returns:
Self: A new instance of PBSQueue.
Note:
This method does not perform any validation or processing of the provided dictionary.