qq_lib.batch.pbs

PBS backend for qq: job submission, monitoring, and cluster-resource access.

This module implements qq's full integration with the PBS Pro batch system as configured on the Metacentrum-family clusters.

It provides:

  • The PBS batch-system backend, implementing job submission, killing, file synchronization (local and remote), work-directory handling, resource translation, dependency formatting, and scratch-directory logic.

  • PBSJob, PBSNode, and PBSQueue, concrete implementations of qq's job/node/queue interfaces, responsible for parsing PBS command output and exposing normalized metadata to the rest of qq.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5PBS backend for qq: job submission, monitoring, and cluster-resource access.
 6
 7This module implements qq's full integration with the PBS Pro batch system
 8as configured on the Metacentrum-family clusters.
 9
10It provides:
11
12- The `PBS` batch-system backend, implementing job submission, killing, file
13  synchronization (local and remote), work-directory handling, resource
14  translation, dependency formatting, and scratch-directory logic.
15
16- `PBSJob`, `PBSNode`, and `PBSQueue`, concrete implementations of qq's
17  job/node/queue interfaces, responsible for parsing PBS command output and
18  exposing normalized metadata to the rest of qq.
19"""
20
21from .job import PBSJob
22from .node import PBSNode
23from .pbs import PBS
24from .queue import PBSQueue
25
26__all__ = [
27    "PBSJob",
28    "PBSNode",
29    "PBS",
30    "PBSQueue",
31]
class PBSJob(qq_lib.batch.interface.job.BatchJobInterface):
 31class PBSJob(BatchJobInterface):
 32    """
 33    Implementation of BatchJobInterface for PBS.
 34    Stores metadata for a single PBS job.
 35    """
 36
 37    def __init__(self, job_id: str):
 38        """Query the batch system for information about the job with the specified ID."""
 39        self._job_id = job_id
 40        self._info: dict[str, str] = {}
 41
 42        self.update()
 43
 44    def is_empty(self) -> bool:
 45        return not self._info
 46
 47    def get_id(self) -> str:
 48        return self._job_id
 49
 50    def get_account(self) -> str | None:
 51        return None
 52
 53    def update(self) -> None:
 54        # get job info from PBS
 55        command = f"qstat -fxw {self._job_id}"
 56
 57        result = subprocess.run(
 58            ["bash"],
 59            input=command,
 60            text=True,
 61            check=False,
 62            capture_output=True,
 63            errors="replace",
 64        )
 65
 66        if result.returncode != 0:
 67            # if qstat fails, information is empty
 68            logger.debug(
 69                f"qstat failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
 70            )
 71            self._info: dict[str, str] = {}
 72        else:
 73            self._info = parse_pbs_dump_to_dictionary(result.stdout)
 74
 75    def get_state(self) -> BatchState:
 76        if not (state := self._info.get("job_state")):
 77            return BatchState.UNKNOWN
 78
 79        # X is used by PBS to indicate finished tasks in unfinished array jobs,
 80        # but qq uses X to indicate failure
 81        if state == "X":
 82            state = "F"
 83
 84        # if the job is finished and the return code is not zero, return FAILED
 85        if state == "F":
 86            exit_code = self.get_exit_code()
 87            # if exit code does not exist, the job never ran and was likely killed
 88            if exit_code is None or exit_code != 0:
 89                return BatchState.FAILED
 90
 91        return BatchState.from_code(state)
 92
 93    def get_comment(self) -> str | None:
 94        return self._info.get("comment")
 95
 96    def get_estimated(self) -> tuple[datetime, str] | None:
 97        if not (raw_time := self._info.get("estimated.start_time")):
 98            logger.debug("No 'estimated.start_time' found.")
 99            return None
100
101        try:
102            time = datetime.strptime(raw_time, CFG.date_formats.pbs)
103            # if the estimated start time is in the past, use the current time
104            if (current_time := datetime.now()) > time:
105                time = current_time
106        except Exception as e:
107            logger.debug(f"Could not parse 'estimated.start_time': {e}.")
108            return None
109
110        if not (raw_vnode := self._info.get("estimated.exec_vnode")):
111            logger.debug("No 'estimated.exec_vnode' found.")
112            return None
113
114        vnodes = []
115        for split in raw_vnode.split("+"):
116            vnodes.append(PBSJob._clean_node_name(split.strip()))
117
118        return (time, " + ".join(vnodes))
119
120    def get_main_node(self) -> str | None:
121        if raw_node := self._info.get("exec_host2"):
122            return PBSJob._clean_node_name(raw_node.split("+")[0].strip())
123
124        return None
125
126    def get_nodes(self) -> list[str] | None:
127        if not (raw_nodes := self._info.get("exec_host2")):
128            return None
129
130        nodes = []
131        for node in raw_nodes.split("+"):
132            nodes.append(PBSJob._clean_node_name(node.strip()))
133
134        return nodes
135
136    def get_short_nodes(self) -> list[str] | None:
137        if not (raw_nodes := self._info.get("exec_host")):
138            return None
139
140        nodes = []
141        for node in raw_nodes.split("+"):
142            nodes.append(PBSJob._clean_node_name(node.strip()))
143
144        return nodes
145
146    def get_name(self) -> str | None:
147        return self._info.get("Job_Name")
148
149    def get_n_cpus(self) -> int | None:
150        return self._get_int_property("Resource_List.ncpus", "the number of CPUs")
151
152    def get_n_gpus(self) -> int | None:
153        return self._get_int_property("Resource_List.ngpus", "the number of GPUs")
154
155    def get_n_nodes(self) -> int | None:
156        return self._get_int_property("Resource_List.nodect", "the number of nodes")
157
158    def get_mem(self) -> Size | None:
159        if not (mem := self._info.get("Resource_List.mem")):
160            logger.debug(
161                f"Could not get information about the amount of memory from the batch system for '{self._job_id}'."
162            )
163            return None
164
165        try:
166            return Size.from_string(mem)
167        except Exception as e:
168            logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
169            return None
170
171    def get_start_time(self) -> datetime | None:
172        return self._get_datetime_property("stime", "the job start time")
173
174    def get_submission_time(self) -> datetime | None:
175        return self._get_datetime_property("ctime", "the job submission time")
176
177    def get_completion_time(self) -> datetime | None:
178        return self._get_datetime_property("obittime", "the job completion time")
179
180    def get_modification_time(self) -> datetime | None:
181        return (
182            self._get_datetime_property("mtime", "the job modification time")
183            or self.get_submission_time()
184        )
185
186    def get_user(self) -> str | None:
187        if not (user := self._info.get("Job_Owner")):
188            return None
189
190        return user.split("@")[0]
191
192    def get_walltime(self) -> timedelta | None:
193        if not (walltime := self._info.get("Resource_List.walltime")):
194            return None
195
196        try:
197            return hhmmss_to_duration(walltime)
198        except QQError as e:
199            logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
200            return None
201
202    def get_queue(self) -> str | None:
203        return self._info.get("queue")
204
205    def get_util_cpu(self) -> int | None:
206        if not (util_cpu := self._info.get("resources_used.cpupercent")):
207            logger.debug(
208                f"Information about CPU utilization is not available for '{self._job_id}'."
209            )
210            return None
211
212        if not (ncpus := self.get_n_cpus()):
213            logger.debug(
214                f"Information about the number of CPUs is not available for '{self._job_id}'."
215            )
216            return None
217
218        try:
219            # PBS report CPU utilization in the same way as `top` - we have to divide by number of CPUs
220            return int(util_cpu) // ncpus
221        except Exception as e:
222            # this catches both invalid util_cpu and invalid get_n_cpus
223            logger.warning(
224                f"Could not parse information about CPU utilization for '{self._job_id}': {e}."
225            )
226            return None
227
228    def get_util_mem(self) -> int | None:
229        if not (util_mem := self._info.get("resources_used.mem")):
230            logger.debug(
231                f"Information about memory utilization is not available for '{self._job_id}'."
232            )
233            return None
234
235        if not (mem := self.get_mem()):
236            logger.debug(
237                f"Information about the amount of memory is not available for '{self._job_id}'."
238            )
239            return None
240
241        try:
242            util_mem_kb = Size.from_string(util_mem).value
243            return int(util_mem_kb / mem.value * 100.0)
244        except Exception as e:
245            logger.warning(
246                f"Could not parse information about memory utilization for '{self._job_id}': {e}."
247            )
248            return None
249
250    def get_exit_code(self) -> int | None:
251        if not (exit := self._info.get("Exit_status")):
252            return None
253
254        try:
255            return int(exit)
256        except Exception as e:
257            logger.warning(f"Could not parse exit code for '{self._job_id}': {e}.")
258            return None
259
260    def get_input_machine(self) -> str | None:
261        return self._info.get("Submit_Host")
262
263    def get_input_dir(self) -> Path | None:
264        if not (env_vars := self._get_env_vars()):
265            logger.debug(
266                f"Could not get list of environment variables for '{self._job_id}'."
267            )
268            return None
269
270        if not (
271            # try qq first
272            # we always try qq first because it provides the most reliable information
273            # PBS sometimes sets the input directory to absolute path (resolving symlinks)
274            # which is not necessarily what we want
275            input_dir := env_vars.get(CFG.env_vars.input_dir)
276            or env_vars.get("PBS_O_WORKDIR")  # if this fails, try PBS
277            or env_vars.get("INF_INPUT_DIR")  # if this fails, try Infinity
278        ):
279            logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
280            return None
281
282        return logical_resolve(Path(input_dir))
283
284    def get_info_file(self) -> Path | None:
285        if not (env_vars := self._get_env_vars()):
286            logger.debug(
287                f"Could not get list of environment variables for '{self._job_id}'."
288            )
289            return None
290
291        if not (info_file := env_vars.get(CFG.env_vars.info_file)):
292            logger.debug(
293                f"Job '{self._job_id}' does not have an assigned qq info file."
294            )
295            return None
296
297        return Path(info_file)
298
299    def to_yaml(self) -> str:
300        # we need to add job id to the start of the dictionary
301        to_dump = {"Job Id": self._job_id} | self._info
302        return yaml.dump(
303            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
304        )
305
306    def get_steps(self) -> Sequence[Self]:
307        # not available for PBS
308        return []
309
310    def get_step_id(self) -> str | None:
311        # no job steps for PBS
312        return None
313
314    def is_array_job(self) -> bool:
315        return (
316            array := self._info.get("array")
317        ) is not None and array.lower() == "true"
318
319    @classmethod
320    def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
321        """
322        Construct a new instance of PBSJob from a job ID and a dictionary of job information.
323
324        This method bypasses the standard initializer and directly sets the `_job_id` and `_info`
325        attributes of the new instance.
326
327        Args:
328            job_id (str): The unique identifier of the job.
329            info (dict[str, str]): A dictionary containing PBS job metadata as key-value pairs.
330
331        Returns:
332            Self: A new instance of PBSJob.
333
334        Note:
335            This method does not perform any validation or processing of the provided dictionary.
336        """
337        job_info = cls.__new__(cls)
338        job_info._job_id = job_id
339        job_info._info = info
340
341        return job_info
342
343    def get_id_int(self) -> int | None:
344        """
345        Extract the leading numeric portion of the job ID and return it as an integer.
346
347        Returns:
348            int | None: The integer value of the leading digits in the job ID,
349            or `None` if no valid digits are found or conversion fails.
350        """
351        match = re.match(r"\d+", self.get_id())
352        return int(match.group()) if match else None
353
354    def _get_env_vars(self) -> dict[str, str] | None:
355        """
356        Retrieve environment variables associated with the job.
357
358        Returns:
359            dict[str, str] | None: A dictionary of environment variables, or None
360            if no variable list is available.
361        """
362        if not (variable_list := self._info.get("Variable_List")):
363            return None
364
365        return dict(
366            item.split("=", 1) for item in variable_list.split(",") if "=" in item
367        )
368
369    def _get_int_property(self, property: str, property_name: str) -> int | None:
370        """
371        Retrieve an integer property value from the job information.
372
373        If the property is missing or cannot be converted, `None` is returned.
374
375        Args:
376            property (str): The key identifying the property in the job information.
377            property_name (str): A human-readable name of the property for logging.
378
379        Returns:
380            int | None: The integer value of the property, or `None` if unavailable or invalid.
381        """
382        try:
383            return int(self._info[property])
384        except Exception:
385            logger.debug(
386                f"Could not get information about {property_name} from the batch system for '{self._job_id}'."
387            )
388            return None
389
390    def _get_datetime_property(
391        self, property: str, property_name: str
392    ) -> datetime | None:
393        """
394        Retrieve and parse a datetime property from the job information.
395
396        Args:
397            property (str): The key identifying the property in the job information.
398            property_name (str): A human-readable name of the property for logging.
399
400        Returns:
401            datetime | None: A datetime object if parsing succeeds, otherwise None.
402        """
403        if not (raw_datetime := self._info.get(property)):
404            return None
405
406        try:
407            return datetime.strptime(raw_datetime, CFG.date_formats.pbs)
408        except Exception:
409            logger.warning(
410                f"Could not parse information about {property_name} for '{self._job_id}'."
411            )
412            return None
413
414    @staticmethod
415    def _clean_node_name(raw: str) -> str:
416        """
417        Normalize a raw node string to extract the clean hostname.
418
419        Args:
420            raw (str): Raw node string reported by the batch system.
421
422        Returns:
423            str: Cleaned node name.
424        """
425        return raw.split(":", 1)[0].split("/", 1)[0].replace("(", "").replace(")", "")

Implementation of BatchJobInterface for PBS. Stores metadata for a single PBS job.

PBSJob(job_id: str)
37    def __init__(self, job_id: str):
38        """Query the batch system for information about the job with the specified ID."""
39        self._job_id = job_id
40        self._info: dict[str, str] = {}
41
42        self.update()

Query the batch system for information about the job with the specified ID.

def is_empty(self) -> bool:
44    def is_empty(self) -> bool:
45        return not self._info

Check whether the job contains any information. This should return True if the job does not exist in the batch system.

Returns:

bool: True if the job contains no information.

def get_id(self) -> str:
47    def get_id(self) -> str:
48        return self._job_id

Return the ID of the job.

Returns:

str: The ID of the job.

def get_account(self) -> str | None:
50    def get_account(self) -> str | None:
51        return None

Return the account under which the job is submitted.

Returns:

str | None: Account associated with the job or None if no account is defined.

def update(self) -> None:
53    def update(self) -> None:
54        # get job info from PBS
55        command = f"qstat -fxw {self._job_id}"
56
57        result = subprocess.run(
58            ["bash"],
59            input=command,
60            text=True,
61            check=False,
62            capture_output=True,
63            errors="replace",
64        )
65
66        if result.returncode != 0:
67            # if qstat fails, information is empty
68            logger.debug(
69                f"qstat failed: no information about job '{self._job_id}' is available: {result.stderr.strip()}"
70            )
71            self._info: dict[str, str] = {}
72        else:
73            self._info = parse_pbs_dump_to_dictionary(result.stdout)

Refresh the stored job information from the batch system.

Raises:
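  • QQError: If the job cannot be queried or its info updated. (The PBS implementation shown above does not raise when `qstat` fails: it logs the failure at debug level and clears the stored job information.)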
def get_state(self) -> qq_lib.properties.states.BatchState:
75    def get_state(self) -> BatchState:
76        if not (state := self._info.get("job_state")):
77            return BatchState.UNKNOWN
78
79        # X is used by PBS to indicate finished tasks in unfinished array jobs,
80        # but qq uses X to indicate failure
81        if state == "X":
82            state = "F"
83
84        # if the job is finished and the return code is not zero, return FAILED
85        if state == "F":
86            exit_code = self.get_exit_code()
87            # if exit code does not exist, the job never ran and was likely killed
88            if exit_code is None or exit_code != 0:
89                return BatchState.FAILED
90
91        return BatchState.from_code(state)

Return the current state of the job as reported by the batch system.

If the job information is no longer available, return BatchState.UNKNOWN.

Returns:

BatchState: The job state according to the batch system.

def get_comment(self) -> str | None:
93    def get_comment(self) -> str | None:
94        return self._info.get("comment")

Retrieve the batch system-provided comment for the job.

Returns:

str | None: The job's comment string if available, or None if the batch system has not attached a comment.

def get_estimated(self) -> tuple[datetime.datetime, str] | None:
 96    def get_estimated(self) -> tuple[datetime, str] | None:
 97        if not (raw_time := self._info.get("estimated.start_time")):
 98            logger.debug("No 'estimated.start_time' found.")
 99            return None
100
101        try:
102            time = datetime.strptime(raw_time, CFG.date_formats.pbs)
103            # if the estimated start time is in the past, use the current time
104            if (current_time := datetime.now()) > time:
105                time = current_time
106        except Exception as e:
107            logger.debug(f"Could not parse 'estimated.start_time': {e}.")
108            return None
109
110        if not (raw_vnode := self._info.get("estimated.exec_vnode")):
111            logger.debug("No 'estimated.exec_vnode' found.")
112            return None
113
114        vnodes = []
115        for split in raw_vnode.split("+"):
116            vnodes.append(PBSJob._clean_node_name(split.strip()))
117
118        return (time, " + ".join(vnodes))

Retrieve the batch system's estimated job start time and execution node.

Returns:

tuple[datetime, str] | None: A tuple containing:

  • datetime: The estimated start time of the job.
  • str: The name of the node where the job is expected to run (for PBS, multiple nodes are joined with " + ").

Returns None if either estimate is unavailable.

def get_main_node(self) -> str | None:
120    def get_main_node(self) -> str | None:
121        if raw_node := self._info.get("exec_host2"):
122            return PBSJob._clean_node_name(raw_node.split("+")[0].strip())
123
124        return None

Retrieve the hostname of the main execution node for the job.

Returns:

str | None: The hostname of the main execution node, or None if unavailable or not applicable.

def get_nodes(self) -> list[str] | None:
126    def get_nodes(self) -> list[str] | None:
127        if not (raw_nodes := self._info.get("exec_host2")):
128            return None
129
130        nodes = []
131        for node in raw_nodes.split("+"):
132            nodes.append(PBSJob._clean_node_name(node.strip()))
133
134        return nodes

Retrieve the hostnames of all execution nodes allocated for the job.

Returns:

list[str] | None: A list of hostnames or node identifiers used by the job, or None if node information is not available.

def get_short_nodes(self) -> list[str] | None:
136    def get_short_nodes(self) -> list[str] | None:
137        if not (raw_nodes := self._info.get("exec_host")):
138            return None
139
140        nodes = []
141        for node in raw_nodes.split("+"):
142            nodes.append(PBSJob._clean_node_name(node.strip()))
143
144        return nodes

Retrieve the short hostnames of all execution nodes allocated for the job.

Returns:

list[str] | None: A list of short hostnames used by the job, or None if node information is not available.

def get_name(self) -> str | None:
146    def get_name(self) -> str | None:
147        return self._info.get("Job_Name")

Return the name of the job.

Returns:

str | None: The name of the submitted job or None if not available.

def get_n_cpus(self) -> int | None:
149    def get_n_cpus(self) -> int | None:
150        return self._get_int_property("Resource_List.ncpus", "the number of CPUs")

Return the number of CPU cores allocated for the job.

Returns:

int | None: Number of CPUs allocated for the job or None if not available.

def get_n_gpus(self) -> int | None:
152    def get_n_gpus(self) -> int | None:
153        return self._get_int_property("Resource_List.ngpus", "the number of GPUs")

Return the number of GPUs allocated for the job.

Returns:

int | None: Number of GPUs allocated for the job or None if not available.

def get_n_nodes(self) -> int | None:
155    def get_n_nodes(self) -> int | None:
156        return self._get_int_property("Resource_List.nodect", "the number of nodes")

Return the number of compute nodes assigned to the job.

Returns:

int | None: Number of nodes used by the job or None if not available.

def get_mem(self) -> qq_lib.properties.size.Size | None:
158    def get_mem(self) -> Size | None:
159        if not (mem := self._info.get("Resource_List.mem")):
160            logger.debug(
161                f"Could not get information about the amount of memory from the batch system for '{self._job_id}'."
162            )
163            return None
164
165        try:
166            return Size.from_string(mem)
167        except Exception as e:
168            logger.warning(f"Could not parse memory for '{self._job_id}': {e}.")
169            return None

Return the amount of memory allocated for the job.

Returns:

Size | None: Amount of memory allocated for the job or None if not available.

def get_start_time(self) -> datetime.datetime | None:
171    def get_start_time(self) -> datetime | None:
172        return self._get_datetime_property("stime", "the job start time")

Return the timestamp when the job started execution.

Returns:

datetime | None: Time when the job began running or None if the job has not yet started.

def get_submission_time(self) -> datetime.datetime | None:
174    def get_submission_time(self) -> datetime | None:
175        return self._get_datetime_property("ctime", "the job submission time")

Return the timestamp when the job was submitted.

Returns:

datetime | None: Time when the job was submitted to the batch system or None if not available.

def get_completion_time(self) -> datetime.datetime | None:
177    def get_completion_time(self) -> datetime | None:
178        return self._get_datetime_property("obittime", "the job completion time")

Return the timestamp when the job was completed.

Returns:

datetime | None: Time when the job completed or None if the job has not yet completed.

def get_modification_time(self) -> datetime.datetime | None:
180    def get_modification_time(self) -> datetime | None:
181        return (
182            self._get_datetime_property("mtime", "the job modification time")
183            or self.get_submission_time()
184        )

Return the timestamp at which the job was last modified.

Returns:

datetime | None: Time when the job was last modified or None if the information is not available.

def get_user(self) -> str | None:
186    def get_user(self) -> str | None:
187        if not (user := self._info.get("Job_Owner")):
188            return None
189
190        return user.split("@")[0]

Return the username of the job owner.

Returns:

str | None: Username of the user who owns the job or None if not available.

def get_walltime(self) -> datetime.timedelta | None:
192    def get_walltime(self) -> timedelta | None:
193        if not (walltime := self._info.get("Resource_List.walltime")):
194            return None
195
196        try:
197            return hhmmss_to_duration(walltime)
198        except QQError as e:
199            logger.warning(f"Could not parse walltime for '{self._job_id}': {e}.")
200            return None

Return the walltime limit of the job.

Returns:

timedelta | None: Walltime for the job or None if not available.

def get_queue(self) -> str | None:
202    def get_queue(self) -> str | None:
203        return self._info.get("queue")

Return the submission queue of the job.

Returns:

str | None: The queue this job is part of or None if not available.

def get_util_cpu(self) -> int | None:
205    def get_util_cpu(self) -> int | None:
206        if not (util_cpu := self._info.get("resources_used.cpupercent")):
207            logger.debug(
208                f"Information about CPU utilization is not available for '{self._job_id}'."
209            )
210            return None
211
212        if not (ncpus := self.get_n_cpus()):
213            logger.debug(
214                f"Information about the number of CPUs is not available for '{self._job_id}'."
215            )
216            return None
217
218        try:
219            # PBS report CPU utilization in the same way as `top` - we have to divide by number of CPUs
220            return int(util_cpu) // ncpus
221        except Exception as e:
222            # this catches both invalid util_cpu and invalid get_n_cpus
223            logger.warning(
224                f"Could not parse information about CPU utilization for '{self._job_id}': {e}."
225            )
226            return None

Return the utilization of the requested CPUs as a percentage (0-100).

Returns:

int | None: Utilization of requested CPUs or None if not available.

def get_util_mem(self) -> int | None:
228    def get_util_mem(self) -> int | None:
229        if not (util_mem := self._info.get("resources_used.mem")):
230            logger.debug(
231                f"Information about memory utilization is not available for '{self._job_id}'."
232            )
233            return None
234
235        if not (mem := self.get_mem()):
236            logger.debug(
237                f"Information about the amount of memory is not available for '{self._job_id}'."
238            )
239            return None
240
241        try:
242            util_mem_kb = Size.from_string(util_mem).value
243            return int(util_mem_kb / mem.value * 100.0)
244        except Exception as e:
245            logger.warning(
246                f"Could not parse information about memory utilization for '{self._job_id}': {e}."
247            )
248            return None

Return the utilization of the requested memory as a percentage (0-100).

Returns:

int | None: Utilization of requested memory or None if not available.

def get_exit_code(self) -> int | None:
250    def get_exit_code(self) -> int | None:
251        if not (exit := self._info.get("Exit_status")):
252            return None
253
254        try:
255            return int(exit)
256        except Exception as e:
257            logger.warning(f"Could not parse exit code for '{self._job_id}': {e}.")
258            return None

Return the exit code of the job.

Returns:

int | None: Exit code of the job or None if exit code is not assigned.

def get_input_machine(self) -> str | None:
260    def get_input_machine(self) -> str | None:
261        return self._info.get("Submit_Host")

Return the hostname of the submission machine.

Returns:

str | None: Hostname of the submission machine or None if not available.

def get_input_dir(self) -> pathlib._local.Path | None:
263    def get_input_dir(self) -> Path | None:
264        if not (env_vars := self._get_env_vars()):
265            logger.debug(
266                f"Could not get list of environment variables for '{self._job_id}'."
267            )
268            return None
269
270        if not (
271            # try qq first
272            # we always try qq first because it provides the most reliable information
273            # PBS sometimes sets the input directory to absolute path (resolving symlinks)
274            # which is not necessarily what we want
275            input_dir := env_vars.get(CFG.env_vars.input_dir)
276            or env_vars.get("PBS_O_WORKDIR")  # if this fails, try PBS
277            or env_vars.get("INF_INPUT_DIR")  # if this fails, try Infinity
278        ):
279            logger.debug(f"Could not obtain input directory for '{self._job_id}'.")
280            return None
281
282        return logical_resolve(Path(input_dir))

Return path to the directory from which the job was submitted.

Returns:

Path | None: Path to the submission directory or None if not available.

def get_info_file(self) -> pathlib._local.Path | None:
284    def get_info_file(self) -> Path | None:
285        if not (env_vars := self._get_env_vars()):
286            logger.debug(
287                f"Could not get list of environment variables for '{self._job_id}'."
288            )
289            return None
290
291        if not (info_file := env_vars.get(CFG.env_vars.info_file)):
292            logger.debug(
293                f"Job '{self._job_id}' does not have an assigned qq info file."
294            )
295            return None
296
297        return Path(info_file)

Return path to the info file associated with this job.

Returns:

Path | None: Path to the qq info file or None if this is not a qq job.

def to_yaml(self) -> str:
299    def to_yaml(self) -> str:
300        # we need to add job id to the start of the dictionary
301        to_dump = {"Job Id": self._job_id} | self._info
302        return yaml.dump(
303            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
304        )

Return all information about the job from the batch system in YAML format.

Returns:

str: YAML-formatted string of job metadata.

def get_steps(self) -> Sequence[typing.Self]:
306    def get_steps(self) -> Sequence[Self]:
307        # not available for PBS
308        return []

Return a list of steps associated with this job.

Note that a job step is represented by BatchJobInterface, but it may not contain all the values that a proper BatchJobInterface contains.

Returns:

Sequence[BatchJobInterface]: List of job steps. An empty list if there are none.

def get_step_id(self) -> str | None:
310    def get_step_id(self) -> str | None:
311        # no job steps for PBS
312        return None

Return the step index if this job is a job step.

Returns:

str | None: Job step index or None if this is not a job step.

def is_array_job(self) -> bool:
314    def is_array_job(self) -> bool:
315        return (
316            array := self._info.get("array")
317        ) is not None and array.lower() == "true"

Return True if the job is a top-level array job (not a sub-job).

Returns:

bool: True if the job is a top-level array job, else False.

@classmethod
def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
319    @classmethod
320    def from_dict(cls, job_id: str, info: dict[str, str]) -> Self:
321        """
322        Construct a new instance of PBSJob from a job ID and a dictionary of job information.
323
324        This method bypasses the standard initializer and directly sets the `_job_id` and `_info`
325        attributes of the new instance.
326
327        Args:
328            job_id (str): The unique identifier of the job.
329            info (dict[str, str]): A dictionary containing PBS job metadata as key-value pairs.
330
331        Returns:
332            Self: A new instance of PBSJob.
333
334        Note:
335            This method does not perform any validation or processing of the provided dictionary.
336        """
337        job_info = cls.__new__(cls)
338        job_info._job_id = job_id
339        job_info._info = info
340
341        return job_info

Construct a new instance of PBSJob from a job ID and a dictionary of job information.

This method bypasses the standard initializer and directly sets the _job_id and _info attributes of the new instance.

Arguments:
  • job_id (str): The unique identifier of the job.
  • info (dict[str, str]): A dictionary containing PBS job metadata as key-value pairs.
Returns:

Self: A new instance of PBSJob.

Note:

This method does not perform any validation or processing of the provided dictionary.

def get_id_int(self) -> int | None:
343    def get_id_int(self) -> int | None:
344        """
345        Extract the leading numeric portion of the job ID and return it as an integer.
346
347        Returns:
348            int | None: The integer value of the leading digits in the job ID,
349            or `None` if no valid digits are found or conversion fails.
350        """
351        match = re.match(r"\d+", self.get_id())
352        return int(match.group()) if match else None

Extract the leading numeric portion of the job ID and return it as an integer.

Returns:

int | None: The integer value of the leading digits in the job ID, or None if no valid digits are found or conversion fails.

class PBSNode(qq_lib.batch.interface.node.BatchNodeInterface):
 90class PBSNode(BatchNodeInterface):
 91    """
 92    Implementation of BatchNodeInterface for PBS.
 93    Stores metadata for a single PBS node.
 94    """
 95
 96    def __init__(self, name: str, server: str | None = None):
 97        self._name = name
 98        self._server = server
 99        self._info: dict[str, str] = {}
100
101        self.update()
102
103    def update(self) -> None:
104        # get node info from PBS
105        command = f"pbsnodes -v {self._name}"
106        if self._server:
107            command += f" -s {self._server}"
108
109        result = subprocess.run(
110            ["bash"],
111            input=command,
112            text=True,
113            check=False,
114            capture_output=True,
115            errors="replace",
116        )
117
118        if result.returncode != 0:
119            raise QQError(f"Node '{self._name}' does not exist.")
120
121        self._info = parse_pbs_dump_to_dictionary(result.stdout)
122
123    def get_name(self) -> str:
124        return self._name
125
126    def get_n_cpus(self) -> int | None:
127        return self._get_int_resource("resources_available.ncpus")
128
129    def get_n_free_cpus(self) -> int | None:
130        return self._get_free_int_resource("ncpus")
131
132    def get_n_gpus(self) -> int | None:
133        return self._get_int_resource("resources_available.ngpus")
134
135    def get_n_free_gpus(self) -> int | None:
136        return self._get_free_int_resource("ngpus")
137
138    def get_cpu_memory(self) -> Size | None:
139        return self._get_size_resource("resources_available.mem")
140
141    def get_free_cpu_memory(self) -> Size | None:
142        return self._get_free_size_resource("mem")
143
144    def get_gpu_memory(self) -> Size | None:
145        return self._get_size_resource("resources_available.gpu_mem")
146
147    def get_free_gpu_memory(self) -> Size | None:
148        return self._get_free_size_resource("gpu_mem")
149
150    def get_local_scratch(self) -> Size | None:
151        return self._get_size_resource("resources_available.scratch_local")
152
153    def get_free_local_scratch(self) -> Size | None:
154        return self._get_free_size_resource("scratch_local")
155
156    def get_ssd_scratch(self) -> Size | None:
157        return self._get_size_resource("resources_available.scratch_ssd")
158
159    def get_free_ssd_scratch(self) -> Size | None:
160        return self._get_free_size_resource("scratch_ssd")
161
162    def get_shared_scratch(self) -> Size | None:
163        return self._get_size_resource("resources_available.scratch_shared")
164
165    def get_free_shared_scratch(self) -> Size | None:
166        return self._get_free_size_resource("scratch_shared")
167
168    def get_properties(self) -> list[str]:
169        return [
170            key.split(".", 1)[-1]
171            for key in self._info
172            if "resources_available" in key and self._info[key] == "True"
173        ]
174
175    def is_available_to_user(self, user: str) -> bool:
176        if not (state := self._info.get("state")):
177            logger.debug(f"Could not get state information for node '{self._name}'.")
178            return False
179
180        if any(
181            disabled_state in state
182            for disabled_state in {"down", "unknown", "unresolvable", "resv-exclusive"}
183        ):
184            return False
185
186        if queue := self._info.get("queue"):
187            return QueuesAvailability.get_or_init(queue, user, self._server)
188
189        return True
190
191    @classmethod
192    def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
193        """
194        Construct a new instance of PBSNode from node name and a dictionary of node information.
195
196        Args:
197            name (str): The unique name of the node.
198            server (str | None): Server on which the node is located. If `None`, assumes the current server.
199            info (dict[str, str]): A dictionary containing PBS node metadata as key-value pairs.
200
201        Returns:
202            Self: A new instance of PBSNode.
203
204        Note:
205            This method does not perform any validation or processing of the provided dictionary.
206        """
207        node = cls.__new__(cls)
208        node._name = name
209        node._server = server
210        node._info = info
211
212        return node
213
214    def to_yaml(self) -> str:
215        # we need to add node name to the start of the dictionary
216        to_dump = {"Node": self._name} | self._info
217        return yaml.dump(
218            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
219        )
220
221    def _get_int_resource(self, res: str) -> int | None:
222        """
223        Retrieve an integer-valued resource from the node information.
224
225        Args:
226            res (str): The resource key to retrieve (e.g., "resources_available.ncpus").
227
228        Returns:
229            int: The integer value of the resource, or `None` if unavailable or invalid.
230        """
231        if not (val := self._info.get(res)):
232            return None
233        try:
234            return int(val)
235        except Exception as e:
236            logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
237            return None
238
239    def _get_free_int_resource(self, res: str) -> int | None:
240        """
241        Compute the number of free units for an integer-valued resource.
242
243        Calculates the difference between the total available (`resources_available.<res>`)
244        and the assigned (`resources_assigned.<res>`) quantities. If the computed
245        difference is negative, returns 0. If the information is not available, returns None.
246
247        Args:
248            res (str): The base resource name (e.g., "ncpus", "ngpus").
249
250        Returns:
251            int | None: The number of unallocated (free) resource units, or None if unavailable.
252        """
253        if not (full := self._get_int_resource(f"resources_available.{res}")):
254            return None
255
256        # if the `resources_assigned` property is missing, we assume it means that there are no resources assigned
257        assigned = self._get_int_resource(f"resources_assigned.{res}") or 0
258
259        if (diff := full - assigned) >= 0:
260            return diff
261
262        return 0
263
264    def _get_size_resource(self, res: str) -> Size | None:
265        """
266        Retrieve a Size resource from the node information.
267
268        Args:
269            res (str): The resource key to retrieve (e.g., "resources_available.mem").
270
271        Returns:
272            Size | None: The parsed Size, or `None` if unavailable or invalid.
273        """
274        if not (val := self._info.get(res)):
275            return None
276
277        try:
278            return Size.from_string(val)
279        except Exception as e:
280            logger.debug(f"Could not parse the value '{val}' of resource '{res}': {e}.")
281            return None
282
283    def _get_free_size_resource(self, res: str) -> Size | None:
284        """
285        Compute the amount of free space for a Size resource.
286
287        Calculates the difference between total available (`resources_available.<res>`)
288        and assigned (`resources_assigned.<res>`) values. If subtraction results in a negative size,
289        returns a zero-size object. If the information are not available, returns None.
290
291        Args:
292            res (str): The base resource name (e.g., "mem", "scratch_local").
293
294        Returns:
295            Size | None: The available (free) size for the resource, or `None` if unavailable.
296        """
297        if not (full := self._get_size_resource(f"resources_available.{res}")):
298            return None
299
300        # if the `resources_assigned` property is missing, we assume it means that there are no resources assigned
301        assigned = self._get_size_resource(f"resources_assigned.{res}") or Size(0, "kb")
302
303        try:
304            return full - assigned
305        except ValueError as e:
306            logger.debug(f"Negative free size of resource '{res}': {e}.")
307            return Size(0, "kb")

Implementation of BatchNodeInterface for PBS. Stores metadata for a single PBS node.

PBSNode(name: str, server: str | None = None)
 96    def __init__(self, name: str, server: str | None = None):
 97        self._name = name
 98        self._server = server
 99        self._info: dict[str, str] = {}
100
101        self.update()
def update(self) -> None:
103    def update(self) -> None:
104        # get node info from PBS
105        command = f"pbsnodes -v {self._name}"
106        if self._server:
107            command += f" -s {self._server}"
108
109        result = subprocess.run(
110            ["bash"],
111            input=command,
112            text=True,
113            check=False,
114            capture_output=True,
115            errors="replace",
116        )
117
118        if result.returncode != 0:
119            raise QQError(f"Node '{self._name}' does not exist.")
120
121        self._info = parse_pbs_dump_to_dictionary(result.stdout)

Refresh the stored node information from the batch system.

Raises:
  • QQError: If the node cannot be queried or its info updated.
def get_name(self) -> str:
123    def get_name(self) -> str:
124        return self._name

Retrieve the name of the node.

Returns:

str: The name identifying the node in the batch system.

def get_n_cpus(self) -> int | None:
126    def get_n_cpus(self) -> int | None:
127        return self._get_int_resource("resources_available.ncpus")

Retrieve the total number of CPU cores available on the node.

Returns:

int | None: Total CPU core count or None if not available.

def get_n_free_cpus(self) -> int | None:
129    def get_n_free_cpus(self) -> int | None:
130        return self._get_free_int_resource("ncpus")

Retrieve the number of currently available (unallocated) CPU cores.

Returns:

int | None: Number of free CPU cores or None if not available.

def get_n_gpus(self) -> int | None:
132    def get_n_gpus(self) -> int | None:
133        return self._get_int_resource("resources_available.ngpus")

Retrieve the total number of GPUs available on the node.

Returns:

int | None: Total GPU count or None if not available.

def get_n_free_gpus(self) -> int | None:
135    def get_n_free_gpus(self) -> int | None:
136        return self._get_free_int_resource("ngpus")

Retrieve the number of currently available (unallocated) GPUs.

Returns:

int | None: Number of free GPUs or None if not available.

def get_cpu_memory(self) -> qq_lib.properties.size.Size | None:
138    def get_cpu_memory(self) -> Size | None:
139        return self._get_size_resource("resources_available.mem")

Retrieve the total CPU memory capacity of the node.

Returns:

Size | None: Total CPU memory available on the node or None if not available.

def get_free_cpu_memory(self) -> qq_lib.properties.size.Size | None:
141    def get_free_cpu_memory(self) -> Size | None:
142        return self._get_free_size_resource("mem")

Retrieve the currently available CPU memory.

Returns:

Size | None: Free (unused) CPU memory or None if not available.

def get_gpu_memory(self) -> qq_lib.properties.size.Size | None:
144    def get_gpu_memory(self) -> Size | None:
145        return self._get_size_resource("resources_available.gpu_mem")

Retrieve the total GPU memory capacity of the node.

Returns:

Size | None: Total GPU memory available or None if not available.

def get_free_gpu_memory(self) -> qq_lib.properties.size.Size | None:
147    def get_free_gpu_memory(self) -> Size | None:
148        return self._get_free_size_resource("gpu_mem")

Retrieve the currently available GPU memory.

Returns:

Size | None: Free (unused) GPU memory or None if not available.

def get_local_scratch(self) -> qq_lib.properties.size.Size | None:
150    def get_local_scratch(self) -> Size | None:
151        return self._get_size_resource("resources_available.scratch_local")

Retrieve the total local scratch storage capacity of the node.

Returns:

Size | None: Total size of local scratch space or None if not available.

def get_free_local_scratch(self) -> qq_lib.properties.size.Size | None:
153    def get_free_local_scratch(self) -> Size | None:
154        return self._get_free_size_resource("scratch_local")

Retrieve the available local scratch storage space.

Returns:

Size | None: Free local scratch space or None if not available.

def get_ssd_scratch(self) -> qq_lib.properties.size.Size | None:
156    def get_ssd_scratch(self) -> Size | None:
157        return self._get_size_resource("resources_available.scratch_ssd")

Retrieve the total SSD-based scratch storage capacity.

Returns:

Size | None: Total SSD scratch capacity or None if not available.

def get_free_ssd_scratch(self) -> qq_lib.properties.size.Size | None:
159    def get_free_ssd_scratch(self) -> Size | None:
160        return self._get_free_size_resource("scratch_ssd")

Retrieve the currently available SSD-based scratch storage space.

Returns:

Size | None: Free SSD scratch space or None if not available.

def get_shared_scratch(self) -> qq_lib.properties.size.Size | None:
162    def get_shared_scratch(self) -> Size | None:
163        return self._get_size_resource("resources_available.scratch_shared")

Retrieve the total capacity of shared scratch storage accessible from the node.

Returns:

Size | None: Total shared scratch capacity or None if not available.

def get_free_shared_scratch(self) -> qq_lib.properties.size.Size | None:
165    def get_free_shared_scratch(self) -> Size | None:
166        return self._get_free_size_resource("scratch_shared")

Retrieve the available space in shared scratch storage.

Returns:

Size | None: Free shared scratch space or None if not available.

def get_properties(self) -> list[str]:
168    def get_properties(self) -> list[str]:
169        return [
170            key.split(".", 1)[-1]
171            for key in self._info
172            if "resources_available" in key and self._info[key] == "True"
173        ]

Get the list of properties or labels assigned to the node.

Returns:

list[str]: List of node property strings.

def is_available_to_user(self, user: str) -> bool:
175    def is_available_to_user(self, user: str) -> bool:
176        if not (state := self._info.get("state")):
177            logger.debug(f"Could not get state information for node '{self._name}'.")
178            return False
179
180        if any(
181            disabled_state in state
182            for disabled_state in {"down", "unknown", "unresolvable", "resv-exclusive"}
183        ):
184            return False
185
186        if queue := self._info.get("queue"):
187            return QueuesAvailability.get_or_init(queue, user, self._server)
188
189        return True

Check if the node is available to the specified user.

Arguments:
  • user (str): The username to check access for.
Returns:

bool: True if the node is up and schedulable, False otherwise.

@classmethod
def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
191    @classmethod
192    def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
193        """
194        Construct a new instance of PBSNode from node name and a dictionary of node information.
195
196        Args:
197            name (str): The unique name of the node.
198            server (str | None): Server on which the node is located. If `None`, assumes the current server.
199            info (dict[str, str]): A dictionary containing PBS node metadata as key-value pairs.
200
201        Returns:
202            Self: A new instance of PBSNode.
203
204        Note:
205            This method does not perform any validation or processing of the provided dictionary.
206        """
207        node = cls.__new__(cls)
208        node._name = name
209        node._server = server
210        node._info = info
211
212        return node

Construct a new instance of PBSNode from node name and a dictionary of node information.

Arguments:
  • name (str): The unique name of the node.
  • server (str | None): Server on which the node is located. If None, assumes the current server.
  • info (dict[str, str]): A dictionary containing PBS node metadata as key-value pairs.
Returns:

Self: A new instance of PBSNode.

Note:

This method does not perform any validation or processing of the provided dictionary.

def to_yaml(self) -> str:
214    def to_yaml(self) -> str:
215        # we need to add node name to the start of the dictionary
216        to_dump = {"Node": self._name} | self._info
217        return yaml.dump(
218            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
219        )

Return all information about the node in YAML format.

Returns:

str: YAML-formatted string of node metadata.

class PBS(qq_lib.batch.interface.interface.BatchInterface[qq_lib.batch.pbs.job.PBSJob, qq_lib.batch.pbs.queue.PBSQueue, qq_lib.batch.pbs.node.PBSNode]):
  30class PBS(BatchInterface[PBSJob, PBSQueue, PBSNode]):
  31    """
  32    Implementation of BatchInterface for PBS Pro batch system.
  33    """
  34
  35    # all standard scratch directory (excl. in RAM scratch) types supported by PBS
  36    SUPPORTED_SCRATCHES = ["scratch_local", "scratch_ssd", "scratch_shared"]
  37
  38    @classmethod
  39    def env_name(cls) -> str:
  40        return "PBS"
  41
  42    @classmethod
  43    def is_available(cls) -> bool:
  44        return shutil.which("qsub") is not None
  45
  46    @classmethod
  47    def get_job_id(cls) -> str | None:
  48        return os.environ.get("PBS_JOBID")
  49
  50    @classmethod
  51    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
  52        scratch_dir = cls._get_scratch_dir(job_id)
  53
  54        # create working directory inside the scratch directory allocated by the batch system
  55        # we create this directory because other processes may write files
  56        # into the allocated scratch directory and we do not want these files
  57        # to affect the job execution or be copied back to input_dir
  58        # this also simplifies deletion of the working directory
  59        # (the allocated scratch dir cannot be deleted)
  60        work_dir = logical_resolve(scratch_dir / CFG.pbs_options.scratch_dir_inner)
  61
  62        logger.debug(f"Creating working directory '{str(work_dir)}'.")
  63        work_dir.mkdir(exist_ok=True)
  64
  65        return work_dir
  66
  67    @classmethod
  68    def job_submit(
  69        cls,
  70        res: Resources,
  71        queue: str,
  72        script: Path,
  73        job_name: str,
  74        depend: list[Depend],
  75        env_vars: dict[str, str],
  76        account: str | None = None,
  77        server: str | None = None,
  78        remote_host: str | None = None,
  79    ) -> str:
  80        # account unused
  81        _ = account
  82
  83        input_dir = script.parent
  84        logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")
  85
  86        cls._shared_guard(input_dir, res, env_vars, server, remote_host)
  87
  88        # set env vars required for Infinity modules
  89        # this can be removed once Infinity stops being supported
  90        env_vars.update(cls._collect_ams_env_vars())
  91
  92        # if we are submitting to a different server, we need to change the AMS site
  93        # this can be removed once Infinity stops being supported
  94        if server:
  95            cls._modify_ams_env_vars(env_vars, server)
  96
  97        # get the submission command
  98        command = cls._translate_submit(
  99            res,
 100            queue,
 101            server,
 102            script.parent,
 103            str(script),
 104            job_name,
 105            depend,
 106            env_vars,
 107        )
 108        logger.debug(command)
 109
 110        if not remote_host:
 111            # submit the script from the current host
 112            result = subprocess.run(
 113                ["bash"],
 114                input=command,
 115                text=True,
 116                check=False,
 117                capture_output=True,
 118                errors="replace",
 119            )
 120        else:
 121            # submit the script from the remote host
 122            logger.debug(
 123                f"Navigating to '{remote_host}' to execute the submission command '{command}'."
 124            )
 125            result = subprocess.run(
 126                [
 127                    "ssh",
 128                    "-o PasswordAuthentication=no",
 129                    "-o GSSAPIAuthentication=yes",
 130                    "-o StrictHostKeyChecking=no",  # allow unknown hosts
 131                    f"-o ConnectTimeout={CFG.timeouts.ssh}",
 132                    "-q",  # suppress some SSH messages
 133                    remote_host,
 134                    command,
 135                ],
 136                capture_output=True,
 137                text=True,
 138            )
 139
 140        if result.returncode != 0:
 141            raise QQError(
 142                f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
 143            )
 144
 145        return result.stdout.strip()
 146
 147    @classmethod
 148    def job_kill(cls, job_id: str) -> None:
 149        command = cls._translate_kill(job_id)
 150        logger.debug(command)
 151
 152        # run the kill command
 153        result = subprocess.run(
 154            ["bash"],
 155            input=command,
 156            text=True,
 157            check=False,
 158            capture_output=True,
 159            errors="replace",
 160        )
 161
 162        if result.returncode != 0:
 163            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")
 164
 165    @classmethod
 166    def job_kill_force(cls, job_id: str) -> None:
 167        command = cls._translate_kill_force(job_id)
 168        logger.debug(command)
 169
 170        # run the kill command
 171        result = subprocess.run(
 172            ["bash"],
 173            input=command,
 174            text=True,
 175            check=False,
 176            capture_output=True,
 177            errors="replace",
 178        )
 179
 180        if result.returncode != 0:
 181            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")
 182
 183    @classmethod
 184    def get_batch_job(cls, job_id: str) -> PBSJob:
 185        return PBSJob(job_id)
 186
 187    @classmethod
 188    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[PBSJob]:
 189        if not job_ids:
 190            return []
 191
 192        command = f"qstat -fxw {' '.join(job_ids)}"
 193        logger.debug(command)
 194        return cls._get_batch_jobs_using_command(
 195            command,
 196            include_completed=True,
 197            include_top_level_array=True,
 198            ignore_exit_code=True,
 199        )
 200
 201    @classmethod
 202    def get_unfinished_batch_jobs(
 203        cls, user: str, server: str | None = None
 204    ) -> list[PBSJob]:
 205        command = f"qstat -fwtu {user}"
 206        if server:
 207            command += f" @{server}"
 208        logger.debug(command)
 209        return cls._get_batch_jobs_using_command(
 210            command,
 211            include_completed=False,
 212            include_top_level_array=True,
 213            ignore_exit_code=False,
 214        )
 215
 216    @classmethod
 217    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[PBSJob]:
 218        command = f"qstat -fwxtu {user}"
 219        if server:
 220            command += f" @{server}"
 221        logger.debug(command)
 222        return cls._get_batch_jobs_using_command(
 223            command,
 224            include_completed=True,
 225            include_top_level_array=True,
 226            ignore_exit_code=False,
 227        )
 228
 229    @classmethod
 230    def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
 231        command = "qstat -fwt"
 232        if server:
 233            command += f" @{server}"
 234        logger.debug(command)
 235        return cls._get_batch_jobs_using_command(
 236            command,
 237            include_completed=False,
 238            include_top_level_array=True,
 239            ignore_exit_code=False,
 240        )
 241
 242    @classmethod
 243    def get_all_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
 244        command = "qstat -fxwt"
 245        if server:
 246            command += f" @{server}"
 247        logger.debug(command)
 248        return cls._get_batch_jobs_using_command(
 249            command,
 250            include_completed=True,
 251            include_top_level_array=True,
 252            ignore_exit_code=False,
 253        )
 254
 255    @classmethod
 256    def get_queues(cls, server: str | None = None) -> list[PBSQueue]:
 257        command = "qstat -Qfw"
 258        if server:
 259            command += f" @{server}"
 260        logger.debug(command)
 261
 262        result = subprocess.run(
 263            ["bash"],
 264            input=command,
 265            text=True,
 266            check=False,
 267            capture_output=True,
 268            errors="replace",
 269        )
 270
 271        if result.returncode != 0:
 272            raise QQError(
 273                f"Could not retrieve information about queues: {result.stderr.strip()}."
 274            )
 275
 276        queues = []
 277        for data, name in parse_multi_pbs_dump_to_dictionaries(
 278            result.stdout.strip(), "Queue"
 279        ):
 280            queues.append(PBSQueue.from_dict(name, server, data))
 281
 282        return queues
 283
 284    @classmethod
 285    def get_nodes(cls, server: str | None = None) -> list[PBSNode]:
 286        command = "pbsnodes -a"
 287        if server:
 288            command += f" -s {server}"
 289        logger.debug(command)
 290
 291        result = subprocess.run(
 292            ["bash"],
 293            input=command,
 294            text=True,
 295            check=False,
 296            capture_output=True,
 297            errors="replace",
 298        )
 299
 300        if result.returncode != 0:
 301            raise QQError(
 302                f"Could not retrieve information about nodes: {result.stderr.strip()}."
 303            )
 304
 305        queues = []
 306        for data, name in parse_multi_pbs_dump_to_dictionaries(
 307            result.stdout.strip(), None
 308        ):
 309            queues.append(PBSNode.from_dict(name, server, data))
 310
 311        return queues
 312
 313    @classmethod
 314    def get_supported_work_dir_types(cls) -> list[str]:
 315        return cls.SUPPORTED_SCRATCHES + [
 316            "scratch_shm",
 317            "input_dir",
 318            "job_dir",  # same as input_dir
 319        ]
 320
 321    @classmethod
 322    def read_remote_file(cls, host: str, file: Path) -> str:
 323        if os.environ.get(CFG.env_vars.shared_submit):
 324            # file is on shared storage, we can read it directly
 325            # this assumes that this method is only used to read files in input_dir
 326            logger.debug(f"Reading a file '{file}' from shared storage.")
 327            try:
 328                return file.read_text()
 329            except Exception as e:
 330                raise QQError(f"Could not read file '{file}': {e}.") from e
 331        else:
 332            # otherwise, we fall back to the default implementation
 333            logger.debug(f"Reading a remote file '{file}' on '{host}'.")
 334            return super().read_remote_file(host, file)
 335
 336    @classmethod
 337    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
 338        if os.environ.get(CFG.env_vars.shared_submit):
 339            # file should be written to shared storage
 340            # this assumes that the method is only used to write files into input_dir
 341            logger.debug(f"Writing a file '{file}' to shared storage.")
 342            try:
 343                file.write_text(content)
 344            except Exception as e:
 345                raise QQError(f"Could not write file '{file}': {e}.") from e
 346        else:
 347            # otherwise, we fall back to the default implementation
 348            logger.debug(f"Writing a remote file '{file}' on '{host}'.")
 349            super().write_remote_file(host, file, content)
 350
 351    @classmethod
 352    def make_remote_dir(cls, host: str, directory: Path) -> None:
 353        if os.environ.get(CFG.env_vars.shared_submit):
 354            # assuming the directory is created in input_dir
 355            logger.debug(f"Creating a directory '{directory}' on shared storage.")
 356            try:
 357                directory.mkdir(exist_ok=True)
 358            except Exception as e:
 359                raise QQError(
 360                    f"Could not create a directory '{directory}': {e}."
 361                ) from e
 362        else:
 363            # otherwise we fall back to the default implementation
 364            logger.debug(f"Creating a directory '{directory}' on '{host}'.")
 365            super().make_remote_dir(host, directory)
 366
 367    @classmethod
 368    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
 369        if os.environ.get(CFG.env_vars.shared_submit):
 370            # assuming we are listing input_dir or another directory on shared storage
 371            logger.debug(f"Listing a directory '{directory}' on shared storage.")
 372            try:
 373                return list(directory.iterdir())
 374            except Exception as e:
 375                raise QQError(f"Could not list a directory '{directory}': {e}.") from e
 376        else:
 377            # otherwise we fall back to the default implementation
 378            logger.debug(f"Listing a directory '{directory}' on '{host}'.")
 379            return super().list_remote_dir(host, directory)
 380
 381    @classmethod
 382    def delete_remote_dir(cls, host: str, directory: Path) -> None:
 383        if host == socket.getfqdn():
 384            # directory is available on the current host
 385            logger.debug(f"Deleting a directory '{directory}' on local host.")
 386            try:
 387                shutil.rmtree(directory)
 388            except Exception as e:
 389                raise QQError(f"Could not delete directory '{directory}': {e}.") from e
 390        else:
 391            # otherwise we fall back to the default implementation
 392            logger.debug(f"Deleting a directory '{directory}' on '{host}'.")
 393            return super().delete_remote_dir(host, directory)
 394
 395    @classmethod
 396    def move_remote_files(
 397        cls, host: str, files: list[Path], moved_files: list[Path]
 398    ) -> None:
 399        if len(files) != len(moved_files):
 400            raise QQError(
 401                "The provided 'files' and 'moved_files' must have the same length."
 402            )
 403
 404        if os.environ.get(CFG.env_vars.shared_submit):
 405            # assuming we are moving files inside input_dir or another directory on shared storage
 406            logger.debug(
 407                f"Moving files '{files}' -> '{moved_files}' on a shared storage."
 408            )
 409            for src, dst in zip(files, moved_files):
 410                shutil.move(str(src), str(dst))
 411        else:
 412            # otherwise we fall back to the default implementation
 413            logger.debug(f"Moving files '{files}' -> '{moved_files}' on '{host}'.")
 414            super().move_remote_files(host, files, moved_files)
 415
 416    @classmethod
 417    def sync_with_exclusions(
 418        cls,
 419        src_dir: Path,
 420        dest_dir: Path,
 421        src_host: str | None,
 422        dest_host: str | None,
 423        exclude_files: list[Path] | None = None,
 424    ) -> None:
 425        cls._sync_directories(
 426            src_dir,
 427            dest_dir,
 428            src_host,
 429            dest_host,
 430            exclude_files,
 431            super().sync_with_exclusions,
 432        )
 433
 434    @classmethod
 435    def sync_selected(
 436        cls,
 437        src_dir: Path,
 438        dest_dir: Path,
 439        src_host: str | None,
 440        dest_host: str | None,
 441        include_files: list[Path] | None = None,
 442    ) -> None:
 443        cls._sync_directories(
 444            src_dir,
 445            dest_dir,
 446            src_host,
 447            dest_host,
 448            include_files,
 449            super().sync_selected,
 450        )
 451
 452    @classmethod
 453    def transform_resources(
 454        cls, queue: str, server: str | None, provided_resources: Resources
 455    ) -> Resources:
 456        # default resources of the queue
 457        default_queue_resources = PBSQueue(queue, server).get_default_resources()
 458        # default hard-coded resources
 459        default_batch_resources = cls._get_default_server_resources()
 460
 461        # fill in default parameters
 462        resources = Resources.merge_resources(
 463            provided_resources, default_queue_resources, default_batch_resources
 464        )
 465        if not resources.work_dir:
 466            raise QQError(
 467                "Work-dir is not set after filling in default attributes. This is a bug."
 468            )
 469
 470        # sanity check input_dir
 471        if equals_normalized(resources.work_dir, "job_dir") or equals_normalized(
 472            resources.work_dir, "input_dir"
 473        ):
 474            # work-size should not be used with job_dir
 475            if provided_resources.work_size:
 476                logger.warning(
 477                    "Setting work-size is not supported for work-dir='job_dir' or 'input_dir'.\n"
 478                    "Job will run in the submission directory with unlimited capacity.\n"
 479                    "The work-size attribute will be ignored."
 480                )
 481
 482            resources.work_dir = "input_dir"
 483            resources.work_size = None
 484            resources.work_size_per_cpu = None
 485            return resources
 486
 487        # scratch in RAM (https://docs.metacentrum.cz/en/docs/computing/infrastructure/scratch-storages#scratch-in-ram)
 488        if equals_normalized(resources.work_dir, "scratch_shm"):
 489            # work-size should not be used with scratch_shm
 490            if provided_resources.work_size:
 491                logger.warning(
 492                    "Setting work-size is not supported for work-dir='scratch_shm'.\n"
 493                    "Size of the in-RAM scratch is specified using the --mem property.\n"
 494                    "The work-size attribute will be ignored."
 495                )
 496
 497            resources.work_dir = "scratch_shm"
 498            resources.work_size = None
 499            resources.work_size_per_cpu = None
 500            return resources
 501
 502        # if work-dir matches any of the "standard" scratches supported by PBS
 503        if match := next(
 504            (
 505                x
 506                for x in cls.SUPPORTED_SCRATCHES
 507                if equals_normalized(x, resources.work_dir)
 508            ),
 509            None,
 510        ):
 511            resources.work_dir = match
 512            return resources
 513
 514        # unknown work-dir type
 515        raise QQError(
 516            f"Unknown working directory type specified: work-dir='{resources.work_dir}'. Supported types for {cls.env_name()} are: '{' '.join(cls.get_supported_work_dir_types())}'."
 517        )
 518
 519    @classmethod
 520    def sort_jobs(cls, jobs: list[PBSJob]) -> None:
 521        # jobs with invalid ID get assigned an ID of 0 for sorting => they are sorted to the start
 522        # and therefore are displayed at the top in the qq jobs / qq stat output
 523        jobs.sort(key=lambda job: job.get_id_int() or 0)
 524
 525    @classmethod
 526    def _get_scratch_dir(cls, job_id: str) -> Path:
 527        """
 528        Get the path to the scratch directory allocated by PBS.
 529        """
 530        scratch_dir = os.environ.get(CFG.env_vars.pbs_scratch_dir)
 531        if not scratch_dir:
 532            raise QQError(f"Scratch directory for job '{job_id}' is undefined")
 533
 534        return Path(scratch_dir)
 535
 536    @classmethod
 537    def _shared_guard(
 538        cls,
 539        input_dir: Path,
 540        res: Resources,
 541        env_vars: dict[str, str],
 542        server: str | None,
 543        remote_host: str | None,
 544    ) -> None:
 545        """
 546        Ensure correct handling of shared vs. local submission directories.
 547
 548        If the job's input directory is on shared storage, adds the
 549        environment variable `SHARED_SUBMIT` to the list of env vars to propagate to the job.
 550        This environment variable is later used e.g. to select the appropriate data copying method.
 551
 552        If the job is configured to use the submission directory as a working directory
 553        (`work-dir=input_dir` or 'job_dir') but that directory is not shared, a `QQError` is raised.
 554
 555        If the job is to be submitted to a potentially non-local server,
 556        but the input directory is not shared, a `QQError` is raised.
 557
 558        If the job is to be submitted on a remote host,
 559        but the input directory is not shared, a `QQError` is raised.
 560
 561        Args:
 562            input_dir (Path): Path to the input directory of the job.
 563            res (Resources): The job's resource configuration.
 564            env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
 565            server (str | None): The target PBS server, or None if submitting to the default server.
 566            remote_host (str | None): Host from which the submission is performed, or None if sumitting from the current machine.
 567
 568        Raises:
 569            QQError: If the job is set to run directly in the submission
 570                    directory while submission is from a non-shared filesystem.
 571            QQError: If the job is set to run on a non-default server while
 572                submission is from a non-shared filesystem.
 573        """
 574        if cls.is_shared(input_dir):
 575            env_vars[CFG.env_vars.shared_submit] = "true"
 576        elif not res.uses_scratch():
 577            # if job directory is used as working directory, it must always be shared
 578            raise QQError(
 579                "Job was requested to run directly in the submission directory (work-dir='job_dir' or 'input_dir'), but submission is done from a local filesystem."
 580            )
 581        elif server is not None:
 582            # if we are submitting to a different server
 583            raise QQError(
 584                f"Job was requested to be submitted to server '{server}' which is potentially non-local, but the submission is done from a local filesystem."
 585            )
 586        elif (
 587            remote_host is not None and socket.getfqdn(remote_host) != socket.getfqdn()
 588        ):
 589            # if we are submitting from a different host than the current one
 590            raise QQError(
 591                f"Job was requested to be submitted from host '{remote_host}', but the submission is done from a local filesystem."
 592            )
 593
 594    @classmethod
 595    def _translate_submit(
 596        cls,
 597        res: Resources,
 598        queue: str,
 599        server: str | None,
 600        input_dir: Path,
 601        script: str,
 602        job_name: str,
 603        depend: list[Depend],
 604        env_vars: dict[str, str],
 605    ) -> str:
 606        """
 607        Generate the PBS submission command for a job.
 608
 609        Args:
 610            res (Resources): The resources requested for the job.
 611            queue (str): The queue name to submit to.
 612            server (str): Optional name of the server to submit the job to.
 613            input_dir (Path): The directory from which the job is being submitted.
 614            script (str): Path to the job script.
 615            job_name (str): Name of the job.
 616            depend (list[Depend]): List of dependencies of the job.
 617            env_vars (dict[str, str]): Dictionary of environment variables to set.
 618
 619        Returns:
 620            str: The fully constructed qsub command string.
 621        """
 622        command = f"qsub -N {job_name} {cls._translate_queue_server(queue, server)} {cls._translate_output_server(input_dir, job_name, server)} "
 623
 624        # translate environment variables
 625        if env_vars:
 626            command += f"-v {cls._translate_env_vars(env_vars)} "
 627
 628        # handle per-chunk resources, incl. workdir
 629        translated = cls._translate_per_chunk_resources(res)
 630
 631        # handle properties
 632        if res.props:
 633            translated.extend([f"{k}={v}" for k, v in res.props.items()])
 634
 635        if len(translated) > 0 and res.nnodes and res.nnodes > 1:
 636            # we only use the select syntax when multiple nodes are requested
 637            command += f"-l select={res.nnodes}:"
 638            join_char = ":"
 639        else:
 640            command += "-l "
 641            join_char = ","
 642
 643        command += join_char.join(translated) + " "
 644
 645        # handle walltime
 646        if res.walltime:
 647            command += f"-l walltime={res.walltime} "
 648
 649        if res.nnodes and res.nnodes > 1:
 650            # 'place=scatter' causes each chunk to be placed on a different node
 651            command += "-l place=vscatter "
 652
 653        # handle dependencies
 654        if converted_depend := cls._translate_dependencies(depend):
 655            command += f"-W depend={converted_depend} "
 656
 657        # add script
 658        command += str(input_dir / script)
 659
 660        return command
 661
 662    @classmethod
 663    def _translate_queue_server(cls, queue: str, server: str | None) -> str:
 664        """
 665        Build the PBS queue destination argument for qsub.
 666
 667        Constructs the `-q` string from the queue name and optional server.
 668        If a server is specified, the destination is formatted as `-q queue@server`,
 669        otherwise only the queue name is used.
 670
 671        Args:
 672            queue (str): The name of the target queue.
 673            server (str | None): The target PBS server, or None if submitting to the default server.
 674
 675        Returns:
 676            str: The PBS queue destination argument, e.g. `-q queue@server` or `-q queue`.
 677        """
 678        if server:
 679            return f"-q {queue}@{server}"
 680
 681        return f"-q {queue}"
 682
 683    @classmethod
 684    def _translate_output_server(
 685        cls, input_dir: Path, job_name: str, server: str | None
 686    ) -> str:
 687        """
 688        Build the PBS output redirection arguments for qsub.
 689
 690        Constructs the `-j eo -e` string pointing to the job's `.qqout` file.
 691        If a server is specified and has a configured output host, that host is
 692        prepended to the path as `host:path`. Otherwise, PBS will default to
 693        delivering the output file to the submitting host.
 694
 695        Args:
 696            input_dir (Path): Directory in which the output file will be placed.
 697            job_name (str): Name of the job, used to construct the output filename.
 698            server (str | None): The target PBS server, or None if submitting to the default server.
 699
 700        Returns:
 701            str: The PBS output redirection arguments.
 702        """
 703        qq_output = str((input_dir / job_name).with_suffix(CFG.suffixes.qq_out))
 704        if server:
 705            if output_host := CFG.batch_servers_options.known_output_hosts.get(server):
 706                logger.debug(
 707                    f"Using output host '{output_host}' for server '{server}'."
 708                )
 709                return f"-j eo -e {output_host}:{qq_output}"
 710
 711            logger.warning(
 712                f"No output host configured for server '{server}'. "
 713                "PBS will deliver qqout file to the submitting host (this desktop), "
 714                "which may fail if it is not accessible from the working node."
 715            )
 716
 717        return f"-j eo -e {qq_output}"
 718
 719    @classmethod
 720    def _translate_env_vars(cls, env_vars: dict[str, str]) -> str:
 721        """
 722        Convert a dictionary of environment variables into a formatted string.
 723
 724        Args:
 725            env_vars (dict[str, str]): A mapping of environment variable names
 726                to their corresponding values.
 727
 728        Returns:
 729            str: A comma-separated string of environment variable assignments,
 730                suitable for inclusion in the qsub command.
 731        """
 732        converted = []
 733        for key, value in env_vars.items():
 734            converted.append(f"\"{key}='{value}'\"")
 735
 736        return ",".join(converted)
 737
 738    @classmethod
 739    def _translate_per_chunk_resources(cls, res: Resources) -> list[str]:
 740        """
 741        Convert a Resources object into a list of per-node resource specifications.
 742
 743        Each resource that can be divided by the number of nodes (nnodes) is split
 744        accordingly.
 745
 746        Args:
 747            res (Resources): The resource specification for the job.
 748
 749        Returns:
 750            list[str]: A list of per-node resource strings suitable for inclusion
 751                    in a PBS submission command.
 752
 753        Raises:
 754            QQError: If sanity checks fail or required memory attributes are missing.
 755        """
 756
 757        trans_res = []
 758
 759        # sanity checking per-chunk resources
 760        if not res.nnodes:
 761            raise QQError(
 762                "Attribute 'nnodes' should not be undefined. This is a bug, please report it."
 763            )
 764        if res.nnodes == 0:
 765            raise QQError("Attribute 'nnodes' cannot be 0.")
 766
 767        if res.ncpus and res.ncpus != 0 and res.ncpus % res.nnodes != 0:
 768            raise QQError(
 769                f"Attribute 'ncpus' ({res.ncpus}) must be divisible by 'nnodes' ({res.nnodes})."
 770            )
 771        if res.ngpus and res.ngpus != 0 and res.ngpus % res.nnodes != 0:
 772            raise QQError(
 773                f"Attribute 'ngpus' ({res.ngpus}) must be divisible by 'nnodes' ({res.nnodes})."
 774            )
 775
 776        # translate per-chunk resources
 777        if res.ncpus:
 778            trans_res.append(f"ncpus={res.ncpus // res.nnodes}")
 779            # we need to specify the number of MPI processes so that mpirun uses the correct
 780            # number of sockets; this does not mean that the run script has to use one MPI
 781            # process per CPU core, this value can be overriden
 782            trans_res.append(f"mpiprocs={res.ncpus // res.nnodes}")
 783        elif res.ncpus_per_node:
 784            trans_res.append(f"ncpus={res.ncpus_per_node}")
 785            trans_res.append(f"mpiprocs={res.ncpus_per_node}")
 786
 787        if res.mem:
 788            trans_res.append(f"mem={(res.mem // res.nnodes).to_str_exact()}")
 789        elif res.mem_per_node:
 790            trans_res.append(f"mem={res.mem_per_node.to_str_exact()}")
 791        elif res.mem_per_cpu:
 792            if res.ncpus:
 793                trans_res.append(
 794                    f"mem={(res.mem_per_cpu * res.ncpus // res.nnodes).to_str_exact()}"
 795                )
 796            elif res.ncpus_per_node:
 797                trans_res.append(
 798                    f"mem={(res.mem_per_cpu * res.ncpus_per_node).to_str_exact()}"
 799                )
 800            else:
 801                raise QQError(
 802                    "Attribute 'mem-per-cpu' requires attributes 'ncpus' or 'ncpus-per-node' to be defined."
 803                )
 804        else:
 805            # memory not set in any way
 806            raise QQError(
 807                "None of the attributes 'mem', 'mem-per-node', or 'mem-per-cpu' is defined."
 808            )
 809
 810        if res.ngpus:
 811            trans_res.append(f"ngpus={res.ngpus // res.nnodes}")
 812        elif res.ngpus_per_node:
 813            trans_res.append(f"ngpus={res.ngpus_per_node}")
 814
 815        # translate work-dir
 816        if workdir := cls._translate_work_dir(res):
 817            trans_res.append(workdir)
 818
 819        return trans_res
 820
 821    @classmethod
 822    def _translate_work_dir(cls, res: Resources) -> str | None:
 823        """
 824        Translate the working directory and its requested size into a PBS resource string.
 825
 826        Args:
 827            res (Resources): The resources requested for the job.
 828
 829        Returns:
 830            str | None: Resource string specifying the working directory, or None if input_dir is used.
 831        """
 832        assert res.nnodes is not None
 833
 834        if res.work_dir == "job_dir" or res.work_dir == "input_dir":
 835            return None
 836
 837        if res.work_dir == "scratch_shm":
 838            return f"{res.work_dir}=true"
 839
 840        if res.work_size:
 841            return f"{res.work_dir}={(res.work_size // res.nnodes).to_str_exact()}"
 842        if res.work_size_per_node:
 843            return f"{res.work_dir}={res.work_size_per_node.to_str_exact()}"
 844        if res.work_size_per_cpu:
 845            if res.ncpus:
 846                return f"{res.work_dir}={(res.work_size_per_cpu * res.ncpus // res.nnodes).to_str_exact()}"
 847            if res.ncpus_per_node:
 848                return f"{res.work_dir}={(res.work_size_per_cpu * res.ncpus_per_node).to_str_exact()}"
 849
 850            raise QQError(
 851                "Attribute 'work-size-per-cpu' requires attributes 'ncpus' or 'ncpus-per-node' to be defined."
 852            )
 853
 854        raise QQError(
 855            "None of the attributes 'work-size', 'work-size-per-node', or 'work-size-per-cpu' is defined."
 856        )
 857
 858    @classmethod
 859    def _translate_dependencies(cls, depend: list[Depend]) -> str | None:
 860        """
 861        Convert a list of `Depend` objects into a PBS-compatible dependency string.
 862
 863        Args:
 864            depend (list[Depend]): List of dependency objects to translate.
 865
 866        Returns:
 867            str | None: PBS-style dependency string (e.g., "after:12345,afterok:1:2:3"),
 868                        or None if the input list is empty.
 869        """
 870        if not depend:
 871            return None
 872
 873        return ",".join(Depend.to_str(x).replace("=", ":") for x in depend)
 874
 875    @classmethod
 876    def _collect_ams_env_vars(cls) -> dict[str, str]:
 877        """
 878        Collect environment variables for Infinity AMS.
 879        This allows importing Infinity AMS modules in qq jobs.
 880
 881        Returns:
 882            dict[str, str]: Dictionary of AMS environment variables and their values.
 883        """
 884        ams_vars = {
 885            key: value
 886            for key, value in os.environ.items()
 887            if key
 888            in {
 889                "AMS_ACTIVE_MODULES",
 890                "AMS_SITE",
 891                "AMS_SITE_SUPPORT",
 892                "AMS_EXIT_CODE",
 893                "AMS_USER_CONFIG_DIR",
 894                "AMS_GROUPNS",
 895                "AMS_BUNDLE_NAME",
 896                "AMS_HOST_GROUP",
 897                "AMS_ROOT",
 898                "AMS_ROOT_V9",
 899                "AMS_BUNDLE_PATH",
 900            }
 901        }
 902        logger.debug(f"AMS vars: {ams_vars}")
 903
 904        return ams_vars
 905
 906    @classmethod
 907    def _modify_ams_env_vars(cls, env_vars: dict[str, str], server: str) -> None:
 908        """
 909        Modify environment variables for Infinity AMS if the job is submitted to a different server.
 910        """
 911        # bleh, seriously can't wait to get rid of having to support AMS...
 912        # this is so needlessly complicated
 913
 914        ams_site_converter = {
 915            "robox-pro.ceitec.muni.cz": "robox",
 916            "sokar-pbs.ncbr.muni.cz": "sokar",
 917            "pbs-m1.metacentrum.cz": "metavo24",
 918        }
 919
 920        if server not in ams_site_converter:
 921            logger.warning(
 922                f"Server '{server}' is not supported by the qq-AMS translation layer. The job will not have access to AMS modules. Please report this issue."
 923            )
 924
 925        if "AMS_SITE" in env_vars:
 926            env_vars["AMS_SITE"] = ams_site_converter[server]
 927
 928        ams_site_support_converter = {
 929            "robox-pro.ceitec.muni.cz": "linuxsupport@ics.muni.cz",
 930            "sokar-pbs.ncbr.muni.cz": "support@lcc.ncbr.muni.cz",
 931            "pbs-m1.metacentrum.cz": "support@lcc.ncbr.muni.cz",
 932        }
 933
 934        if "AMS_SITE_SUPPORT" in env_vars:
 935            env_vars["AMS_SITE_SUPPORT"] = ams_site_support_converter[server]
 936
 937        ams_groupns_converter = {
 938            "robox-pro.ceitec.muni.cz": "uvt",
 939            "sokar-pbs.ncbr.muni.cz": "ncbr",
 940            "pbs-m1.metacentrum.cz": "ics",
 941        }
 942
 943        if "AMS_GROUPNS" in env_vars:
 944            env_vars["AMS_GROUPNS"] = ams_groupns_converter[server]
 945
 946    @classmethod
 947    def _get_default_server_resources(cls) -> Resources:
 948        """
 949        Return a Resources object representing the default resources for a batch job.
 950
 951        Returns:
 952            Resources: Default batch job resources with predefined settings.
 953        """
 954        return Resources(
 955            nnodes=1,
 956            ncpus=1,
 957            mem_per_cpu="1gb",
 958            work_dir="scratch_local",
 959            work_size_per_cpu="1gb",
 960            walltime="1d",
 961        )
 962
 963    @classmethod
 964    def _translate_kill_force(cls, job_id: str) -> str:
 965        """
 966        Generate the PBS force kill command for a job.
 967
 968        Args:
 969            job_id (str): The ID of the job to kill.
 970
 971        Returns:
 972            str: The qdel command with force flag.
 973        """
 974        return f"qdel -W force {job_id}"
 975
 976    @classmethod
 977    def _translate_kill(cls, job_id: str) -> str:
 978        """
 979        Generate the standard PBS kill command for a job.
 980
 981        Args:
 982            job_id (str): The ID of the job to kill.
 983
 984        Returns:
 985            str: The qdel command without force flag.
 986        """
 987        return f"qdel {job_id}"
 988
 989    @classmethod
 990    def _sync_directories(
 991        cls,
 992        src_dir: Path,
 993        dest_dir: Path,
 994        src_host: str | None,
 995        dest_host: str | None,
 996        files: list[Path] | None,
 997        sync_function: Callable[
 998            [Path, Path, str | None, str | None, list[Path] | None], None
 999        ],
1000    ) -> None:
1001        """
1002        Synchronize directories either locally or across remote hosts, depending on the environment and setup.
1003
1004        Args:
1005            src_dir (Path): Source directory to sync from.
1006            dest_dir (Path): Destination directory to sync to.
1007            src_host (str | None): Hostname of the source machine if remote; None if local.
1008            dest_host (str | None): Hostname of the destination machine if remote; None if local.
1009            files (list[Path] | None): Optional list of file paths to include or exclude, depending on `sync_function`.
1010            sync_function (Callable): Function to perform the actual synchronization.
1011
1012        Raises:
1013            QQError: If both source and destination hosts are remote and cannot be
1014                accessed simultaneously, or if syncing fails internally.
1015        """
1016        if os.environ.get(CFG.env_vars.shared_submit):
1017            # input_dir is on shared storage -> we can copy files from/to it without connecting to the remote host
1018            logger.debug("Syncing directories on local and shared filesystem.")
1019            sync_function(src_dir, dest_dir, None, None, files)
1020        else:
1021            # input_dir is not on shared storage -> fall back to the default implementation
1022            logger.debug("Syncing directories on local filesystems.")
1023
1024            # convert local hosts to none
1025            local_hostname = socket.getfqdn()
1026            src = None if src_host == local_hostname else src_host
1027            dest = None if dest_host == local_hostname else dest_host
1028
1029            if src is None or dest is None:
1030                sync_function(src_dir, dest_dir, src, dest, files)
1031            else:
1032                raise QQError(
1033                    f"The source '{src_host}' and destination '{dest_host}' cannot be both remote."
1034                )
1035
1036    @classmethod
1037    def _get_batch_jobs_using_command(
1038        cls,
1039        command: str,
1040        include_completed: bool,
1041        include_top_level_array: bool,
1042        ignore_exit_code: bool,
1043    ) -> list[PBSJob]:
1044        """
1045        Execute a shell command to retrieve information about PBS jobs and parse it.
1046
1047        Args:
1048            command (str): The shell command to execute, typically a PBS query command.
1049            include_completed (bool): Include both completed and uncompleted jobs.
1050                If `False`, completed jobs are filtered out from the output.
1051            include_top_level_array (bool): Include top-level array jobs in the output.
1052                If `False`, top-level array jobs are filtered out from the output.
1053            ignore_exit_code (bool): Ignore the exit code of the command.
1054                If `False`, the command must return a zero exit code.
1055
1056        Returns:
1057            list[PBSJob]: A list of `PBSJob` instances corresponding to the jobs
1058                            returned by the command.
1059
1060        Raises:
1061            QQError: If the command fails (non-zero return code) or if the output
1062                    cannot be parsed into valid job information.
1063        """
1064        ...
1065        result = subprocess.run(
1066            # -oL (line-buffer stdout), -eL (line-buffer stderr)
1067            # necessary for stdout and stderr merging
1068            ["stdbuf", "-oL", "-eL", "bash"],
1069            input=command,
1070            text=True,
1071            check=False,
1072            stdout=subprocess.PIPE,
1073            stderr=subprocess.STDOUT,
1074            errors="replace",
1075        )
1076
1077        if not ignore_exit_code and result.returncode != 0:
1078            raise QQError(
1079                # standard error is written to stdout
1080                f"Could not retrieve information about jobs: {result.stdout.strip()}."
1081            )
1082
1083        jobs = []
1084        for data, job_id in parse_multi_pbs_dump_to_dictionaries(
1085            result.stdout.strip(), "Job Id"
1086        ):
1087            job = PBSJob.from_dict(job_id, data)
1088
1089            if not include_top_level_array and job.is_array_job():
1090                continue
1091
1092            # unless all jobs are to be shown, filter out completed jobs
1093            # this is necessary because the output from PBS will always contain
1094            # completed job tasks from array jobs that are uncompleted
1095            # we do not want qq to show them
1096            if not include_completed and job.is_completed():
1097                continue
1098
1099            jobs.append(job)
1100
1101        return jobs

Implementation of BatchInterface for PBS Pro batch system.

SUPPORTED_SCRATCHES = ['scratch_local', 'scratch_ssd', 'scratch_shared']
@classmethod
def env_name(cls) -> str:
38    @classmethod
39    def env_name(cls) -> str:
40        return "PBS"

Return the name of the batch system environment.

Returns:

str: The batch system name.

@classmethod
def is_available(cls) -> bool:
42    @classmethod
43    def is_available(cls) -> bool:
44        return shutil.which("qsub") is not None

Determine whether the batch system is available on the current host.

Implementations typically verify this by checking for the presence of required commands or other environment-specific indicators.

Returns:

bool: True if the batch system is available, False otherwise.

@classmethod
def get_job_id(cls) -> str | None:
46    @classmethod
47    def get_job_id(cls) -> str | None:
48        return os.environ.get("PBS_JOBID")

Get the id of the current job from the corresponding batch system's environment variable.

For this method to work, it has to be called from the inside of an active job.

Returns:

str | None: ID of the job, or None if the environment variable is not set.

@classmethod
def create_work_dir_on_scratch(cls, job_id: str) -> pathlib._local.Path:
50    @classmethod
51    def create_work_dir_on_scratch(cls, job_id: str) -> Path:
52        scratch_dir = cls._get_scratch_dir(job_id)
53
54        # create working directory inside the scratch directory allocated by the batch system
55        # we create this directory because other processes may write files
56        # into the allocated scratch directory and we do not want these files
57        # to affect the job execution or be copied back to input_dir
58        # this also simplifies deletion of the working directory
59        # (the allocated scratch dir cannot be deleted)
60        work_dir = logical_resolve(scratch_dir / CFG.pbs_options.scratch_dir_inner)
61
62        logger.debug(f"Creating working directory '{str(work_dir)}'.")
63        work_dir.mkdir(exist_ok=True)
64
65        return work_dir

Create the working directory on scratch for the given job.

Arguments:
  • job_id (str): Unique identifier of the job.
Returns:

Path: Absolute path to the working directory on scratch.

Raises:
  • QQError: If the working directory could not be created.
@classmethod
def job_submit( cls, res: qq_lib.properties.resources.Resources, queue: str, script: pathlib._local.Path, job_name: str, depend: list[qq_lib.properties.depend.Depend], env_vars: dict[str, str], account: str | None = None, server: str | None = None, remote_host: str | None = None) -> str:
 67    @classmethod
 68    def job_submit(
 69        cls,
 70        res: Resources,
 71        queue: str,
 72        script: Path,
 73        job_name: str,
 74        depend: list[Depend],
 75        env_vars: dict[str, str],
 76        account: str | None = None,
 77        server: str | None = None,
 78        remote_host: str | None = None,
 79    ) -> str:
 80        # account unused
 81        _ = account
 82
 83        input_dir = script.parent
 84        logger.debug(f"Job submission: input directory is '{str(input_dir)}'.")
 85
 86        cls._shared_guard(input_dir, res, env_vars, server, remote_host)
 87
 88        # set env vars required for Infinity modules
 89        # this can be removed once Infinity stops being supported
 90        env_vars.update(cls._collect_ams_env_vars())
 91
 92        # if we are submitting to a different server, we need to change the AMS site
 93        # this can be removed once Infinity stops being supported
 94        if server:
 95            cls._modify_ams_env_vars(env_vars, server)
 96
 97        # get the submission command
 98        command = cls._translate_submit(
 99            res,
100            queue,
101            server,
102            script.parent,
103            str(script),
104            job_name,
105            depend,
106            env_vars,
107        )
108        logger.debug(command)
109
110        if not remote_host:
111            # submit the script from the current host
112            result = subprocess.run(
113                ["bash"],
114                input=command,
115                text=True,
116                check=False,
117                capture_output=True,
118                errors="replace",
119            )
120        else:
121            # submit the script from the remote host
122            logger.debug(
123                f"Navigating to '{remote_host}' to execute the submission command '{command}'."
124            )
125            result = subprocess.run(
126                [
127                    "ssh",
128                    "-o PasswordAuthentication=no",
129                    "-o GSSAPIAuthentication=yes",
130                    "-o StrictHostKeyChecking=no",  # allow unknown hosts
131                    f"-o ConnectTimeout={CFG.timeouts.ssh}",
132                    "-q",  # suppress some SSH messages
133                    remote_host,
134                    command,
135                ],
136                capture_output=True,
137                text=True,
138            )
139
140        if result.returncode != 0:
141            raise QQError(
142                f"Failed to submit script '{str(script)}': {result.stderr.strip()}."
143            )
144
145        return result.stdout.strip()

Submit a job to the batch system.

Can also perform additional validation of the job's resources.

This method is NOT guaranteed to be thread-safe.

Arguments:
  • res (Resources): Resources required for the job.
  • queue (str): Target queue for the job submission.
  • script (Path): Path to the script to execute.
  • job_name (str): Name of the job to use.
  • depend (list[Depend]): List of job dependencies.
  • env_vars (dict[str, str]): Dictionary of environment variables to propagate to the job.
  • account (str | None): Optional account name to use for the job.
  • server (str | None): Optional name of the server to submit the job to.
  • remote_host (str | None): Optional name of the machine to submit the job from.
Returns:

str: Unique ID of the submitted job.

Raises:
  • QQError: If the job submission fails.
@classmethod
def job_kill(cls, job_id: str) -> None:
147    @classmethod
148    def job_kill(cls, job_id: str) -> None:
149        command = cls._translate_kill(job_id)
150        logger.debug(command)
151
152        # run the kill command
153        result = subprocess.run(
154            ["bash"],
155            input=command,
156            text=True,
157            check=False,
158            capture_output=True,
159            errors="replace",
160        )
161
162        if result.returncode != 0:
163            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")

Terminate a job gracefully. The job should have time for proper cleanup.

Arguments:
  • job_id (str): Identifier of the job to terminate.
Raises:
  • QQError: If the job could not be killed.
@classmethod
def job_kill_force(cls, job_id: str) -> None:
165    @classmethod
166    def job_kill_force(cls, job_id: str) -> None:
167        command = cls._translate_kill_force(job_id)
168        logger.debug(command)
169
170        # run the kill command
171        result = subprocess.run(
172            ["bash"],
173            input=command,
174            text=True,
175            check=False,
176            capture_output=True,
177            errors="replace",
178        )
179
180        if result.returncode != 0:
181            raise QQError(f"Failed to kill job '{job_id}': {result.stderr.strip()}.")

Forcefully terminate a job. There may be no time for proper cleanup.

Arguments:
  • job_id (str): Identifier of the job to forcefully terminate.
Raises:
  • QQError: If the job could not be killed.
@classmethod
def get_batch_job(cls, job_id: str) -> PBSJob:
183    @classmethod
184    def get_batch_job(cls, job_id: str) -> PBSJob:
185        return PBSJob(job_id)

Retrieve information about a job from the batch system.

The returned object should be fully initialized, even if the job no longer exists or its information is unavailable.

Arguments:
  • job_id (str): Identifier of the job.
Returns:

BatchJobInterface: Object containing the job's metadata and state.

@classmethod
def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[PBSJob]:
187    @classmethod
188    def get_batch_jobs_from_ids(cls, job_ids: list[str]) -> list[PBSJob]:
189        if not job_ids:
190            return []
191
192        command = f"qstat -fxw {' '.join(job_ids)}"
193        logger.debug(command)
194        return cls._get_batch_jobs_using_command(
195            command,
196            include_completed=True,
197            include_top_level_array=True,
198            ignore_exit_code=True,
199        )

Retrieve information about multiple jobs from the batch system.

Batch jobs should be returned in the same order as they appear in job_ids. A TBatchJob object should be returned for each job id, even if the job no longer exists or its information is unavailable.

Array jobs should NOT be expanded into their individual tasks.

The default implementation is to call get_batch_job for each job id. This implementation may be inefficient for large numbers of job ids and should be overridden by subclasses.

Arguments:
  • job_ids (list[str]): List of job identifiers.
Returns:

list[TBatchJob]: List of TBatchJob objects, one for each job id.

@classmethod
def get_unfinished_batch_jobs( cls, user: str, server: str | None = None) -> list[PBSJob]:
201    @classmethod
202    def get_unfinished_batch_jobs(
203        cls, user: str, server: str | None = None
204    ) -> list[PBSJob]:
205        command = f"qstat -fwtu {user}"
206        if server:
207            command += f" @{server}"
208        logger.debug(command)
209        return cls._get_batch_jobs_using_command(
210            command,
211            include_completed=False,
212            include_top_level_array=True,
213            ignore_exit_code=False,
214        )

Retrieve information about all uncompleted jobs submitted by user on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • user (str): Username for which to fetch uncompleted jobs.
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing the user's uncompleted jobs.

@classmethod
def get_batch_jobs( cls, user: str, server: str | None = None) -> list[PBSJob]:
216    @classmethod
217    def get_batch_jobs(cls, user: str, server: str | None = None) -> list[PBSJob]:
218        command = f"qstat -fwxtu {user}"
219        if server:
220            command += f" @{server}"
221        logger.debug(command)
222        return cls._get_batch_jobs_using_command(
223            command,
224            include_completed=True,
225            include_top_level_array=True,
226            ignore_exit_code=False,
227        )

Retrieve information about all jobs submitted by a specific user (including finished jobs) on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • user (str): Username for which to fetch all jobs.
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing all jobs of the user.

@classmethod
def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
229    @classmethod
230    def get_all_unfinished_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
231        command = "qstat -fwt"
232        if server:
233            command += f" @{server}"
234        logger.debug(command)
235        return cls._get_batch_jobs_using_command(
236            command,
237            include_completed=False,
238            include_top_level_array=True,
239            ignore_exit_code=False,
240        )

Retrieve information about uncompleted jobs of all users on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing uncompleted jobs of all users.

@classmethod
def get_all_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
242    @classmethod
243    def get_all_batch_jobs(cls, server: str | None = None) -> list[PBSJob]:
244        command = "qstat -fxwt"
245        if server:
246            command += f" @{server}"
247        logger.debug(command)
248        return cls._get_batch_jobs_using_command(
249            command,
250            include_completed=True,
251            include_top_level_array=True,
252            ignore_exit_code=False,
253        )

Retrieve information about all jobs of all users on the specified or default batch server.

The jobs can be returned in arbitrary order.

Arguments:
  • server (str | None): Optional name of the batch server to get jobs from.
Returns:

list[BatchJobInterface]: A list of job info objects representing all jobs of all users.

@classmethod
def get_queues(cls, server: str | None = None) -> list[PBSQueue]:
255    @classmethod
256    def get_queues(cls, server: str | None = None) -> list[PBSQueue]:
257        command = "qstat -Qfw"
258        if server:
259            command += f" @{server}"
260        logger.debug(command)
261
262        result = subprocess.run(
263            ["bash"],
264            input=command,
265            text=True,
266            check=False,
267            capture_output=True,
268            errors="replace",
269        )
270
271        if result.returncode != 0:
272            raise QQError(
273                f"Could not retrieve information about queues: {result.stderr.strip()}."
274            )
275
276        queues = []
277        for data, name in parse_multi_pbs_dump_to_dictionaries(
278            result.stdout.strip(), "Queue"
279        ):
280            queues.append(PBSQueue.from_dict(name, server, data))
281
282        return queues

Retrieve all queues managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get queues from.
Returns:

list[BatchQueueInterface]: A list of queue objects existing in the batch system.

@classmethod
def get_nodes(cls, server: str | None = None) -> list[PBSNode]:
284    @classmethod
285    def get_nodes(cls, server: str | None = None) -> list[PBSNode]:
286        command = "pbsnodes -a"
287        if server:
288            command += f" -s {server}"
289        logger.debug(command)
290
291        result = subprocess.run(
292            ["bash"],
293            input=command,
294            text=True,
295            check=False,
296            capture_output=True,
297            errors="replace",
298        )
299
300        if result.returncode != 0:
301            raise QQError(
302                f"Could not retrieve information about nodes: {result.stderr.strip()}."
303            )
304
305        queues = []
306        for data, name in parse_multi_pbs_dump_to_dictionaries(
307            result.stdout.strip(), None
308        ):
309            queues.append(PBSNode.from_dict(name, server, data))
310
311        return queues

Retrieve all nodes managed by the batch system on the specified or default batch server.

Arguments:
  • server (str | None): Optional name of the batch server to get nodes from.
Returns:

list[BatchNodeInterface]: A list of node objects existing in the batch system.

@classmethod
def get_supported_work_dir_types(cls) -> list[str]:
313    @classmethod
314    def get_supported_work_dir_types(cls) -> list[str]:
315        return cls.SUPPORTED_SCRATCHES + [
316            "scratch_shm",
317            "input_dir",
318            "job_dir",  # same as input_dir
319        ]

Retrieve the list of supported types of working directories (i.e., strings that can be used with the --work-dir option).

Returns:

list[str]: A list of supported types of working directories.

@classmethod
def read_remote_file(cls, host: str, file: pathlib._local.Path) -> str:
321    @classmethod
322    def read_remote_file(cls, host: str, file: Path) -> str:
323        if os.environ.get(CFG.env_vars.shared_submit):
324            # file is on shared storage, we can read it directly
325            # this assumes that this method is only used to read files in input_dir
326            logger.debug(f"Reading a file '{file}' from shared storage.")
327            try:
328                return file.read_text()
329            except Exception as e:
330                raise QQError(f"Could not read file '{file}': {e}.") from e
331        else:
332            # otherwise, we fall back to the default implementation
333            logger.debug(f"Reading a remote file '{file}' on '{host}'.")
334            return super().read_remote_file(host, file)

Read the contents of a file on a remote host and return it as a string.

The default implementation uses SSH to retrieve the file contents. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
Returns:

str: The contents of the remote file.

Raises:
  • QQError: If the file cannot be read or SSH fails.
@classmethod
def write_remote_file(cls, host: str, file: pathlib._local.Path, content: str) -> None:
336    @classmethod
337    def write_remote_file(cls, host: str, file: Path, content: str) -> None:
338        if os.environ.get(CFG.env_vars.shared_submit):
339            # file should be written to shared storage
340            # this assumes that the method is only used to write files into input_dir
341            logger.debug(f"Writing a file '{file}' to shared storage.")
342            try:
343                file.write_text(content)
344            except Exception as e:
345                raise QQError(f"Could not write file '{file}': {e}.") from e
346        else:
347            # otherwise, we fall back to the default implementation
348            logger.debug(f"Writing a remote file '{file}' on '{host}'.")
349            super().write_remote_file(host, file, content)

Write the given content to a file on a remote host, overwriting it if it exists.

The default implementation uses SSH to send the content to the remote file. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the file resides.
  • file (Path): The path to the file on the remote host.
  • content (str): The content to write to the remote file.
Raises:
  • QQError: If the file cannot be written or SSH fails.
@classmethod
def make_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
351    @classmethod
352    def make_remote_dir(cls, host: str, directory: Path) -> None:
353        if os.environ.get(CFG.env_vars.shared_submit):
354            # assuming the directory is created in input_dir
355            logger.debug(f"Creating a directory '{directory}' on shared storage.")
356            try:
357                directory.mkdir(exist_ok=True)
358            except Exception as e:
359                raise QQError(
360                    f"Could not create a directory '{directory}': {e}."
361                ) from e
362        else:
363            # otherwise we fall back to the default implementation
364            logger.debug(f"Creating a directory '{directory}' on '{host}'.")
365            super().make_remote_dir(host, directory)

Create a directory at the specified path on a remote host.

The default implementation uses SSH to run mkdir on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory should be created.
  • directory (Path): The path of the directory to create on the remote host.
Raises:
  • QQError: If the directory cannot be created but does not already exist or the SSH command fails.
@classmethod
def list_remote_dir( cls, host: str, directory: pathlib._local.Path) -> list[pathlib._local.Path]:
367    @classmethod
368    def list_remote_dir(cls, host: str, directory: Path) -> list[Path]:
369        if os.environ.get(CFG.env_vars.shared_submit):
370            # assuming we are listing input_dir or another directory on shared storage
371            logger.debug(f"Listing a directory '{directory}' on shared storage.")
372            try:
373                return list(directory.iterdir())
374            except Exception as e:
375                raise QQError(f"Could not list a directory '{directory}': {e}.") from e
376        else:
377            # otherwise we fall back to the default implementation
378            logger.debug(f"Listing a directory '{directory}' on '{host}'.")
379            return super().list_remote_dir(host, directory)

List all files and directories (absolute paths) in the specified directory on a remote host.

The default implementation uses SSH to run ls -A on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to list.
Returns:

list[Path]: A list of Path objects representing the entries inside the directory. Each entry is a full path located inside the given directory (absolute when the directory is given as an absolute path).

Raises:
  • QQError: If the directory cannot be listed or the SSH command fails.
@classmethod
def delete_remote_dir(cls, host: str, directory: pathlib._local.Path) -> None:
381    @classmethod
382    def delete_remote_dir(cls, host: str, directory: Path) -> None:
383        if host == socket.getfqdn():
384            # directory is available on the current host
385            logger.debug(f"Deleting a directory '{directory}' on local host.")
386            try:
387                shutil.rmtree(directory)
388            except Exception as e:
389                raise QQError(f"Could not delete directory '{directory}': {e}.") from e
390        else:
391            # otherwise we fall back to the default implementation
392            logger.debug(f"Deleting a directory '{directory}' on '{host}'.")
393            return super().delete_remote_dir(host, directory)

Delete a directory on a remote host.

The default implementation uses SSH to run rm -r on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the directory resides.
  • directory (Path): The remote directory to delete.
Raises:
  • QQError: If the directory cannot be deleted or the SSH command fails.
@classmethod
def move_remote_files( cls, host: str, files: list[pathlib._local.Path], moved_files: list[pathlib._local.Path]) -> None:
395    @classmethod
396    def move_remote_files(
397        cls, host: str, files: list[Path], moved_files: list[Path]
398    ) -> None:
399        if len(files) != len(moved_files):
400            raise QQError(
401                "The provided 'files' and 'moved_files' must have the same length."
402            )
403
404        if os.environ.get(CFG.env_vars.shared_submit):
405            # assuming we are moving files inside input_dir or another directory on shared storage
406            logger.debug(
407                f"Moving files '{files}' -> '{moved_files}' on a shared storage."
408            )
409            for src, dst in zip(files, moved_files):
410                shutil.move(str(src), str(dst))
411        else:
412            # otherwise we fall back to the default implementation
413            logger.debug(f"Moving files '{files}' -> '{moved_files}' on '{host}'.")
414            super().move_remote_files(host, files, moved_files)

Move files on a remote host from their current paths to new paths.

The default implementation uses SSH to run a sequence of mv commands on the remote host. This approach may be inefficient on shared storage or high-latency networks. Note that the timeout for the SSH connection is set to CFG.timeouts.ssh seconds.

Subclasses should override this method to provide a more efficient implementation if possible.

Arguments:
  • host (str): The hostname of the remote machine where the files reside.
  • files (list[Path]): A list of source file paths on the remote host.
  • moved_files (list[Path]): A list of destination file paths on the remote host. Must be the same length as files.
Raises:
  • QQError: If the SSH command fails, the files cannot be moved or the length of files does not match the length of moved_files.
@classmethod
def sync_with_exclusions( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, exclude_files: list[pathlib._local.Path] | None = None) -> None:
416    @classmethod
417    def sync_with_exclusions(
418        cls,
419        src_dir: Path,
420        dest_dir: Path,
421        src_host: str | None,
422        dest_host: str | None,
423        exclude_files: list[Path] | None = None,
424    ) -> None:
425        cls._sync_directories(
426            src_dir,
427            dest_dir,
428            src_host,
429            dest_host,
430            exclude_files,
431            super().sync_with_exclusions,
432        )

Synchronize the contents of two directories using rsync, optionally across remote hosts, while excluding specified files or subdirectories.

All files and directories in src_dir are copied to dest_dir except those listed in exclude_files. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • exclude_files (list[Path] | None): Optional list of absolute file paths to exclude from syncing. These will be converted to paths relative to src_dir.
Raises:
  • QQError: If the rsync command fails for any reason or times out.
@classmethod
def sync_selected( cls, src_dir: pathlib._local.Path, dest_dir: pathlib._local.Path, src_host: str | None, dest_host: str | None, include_files: list[pathlib._local.Path] | None = None) -> None:
434    @classmethod
435    def sync_selected(
436        cls,
437        src_dir: Path,
438        dest_dir: Path,
439        src_host: str | None,
440        dest_host: str | None,
441        include_files: list[Path] | None = None,
442    ) -> None:
443        cls._sync_directories(
444            src_dir,
445            dest_dir,
446            src_host,
447            dest_host,
448            include_files,
449            super().sync_selected,
450        )

Synchronize only the explicitly selected files and directories from the source to the destination, optionally across remote hosts.

Only files listed in include_files are copied from src_dir to dest_dir. Files not listed are ignored. Files are never removed from the destination.

Arguments:
  • src_dir (Path): Source directory to sync from.
  • dest_dir (Path): Destination directory to sync to.
  • src_host (str | None): Optional hostname of the source machine if remote; None if the source is local.
  • dest_host (str | None): Optional hostname of the destination machine if remote; None if the destination is local.
  • include_files (list[Path] | None): Optional list of absolute file paths to include in syncing. These paths are converted relative to src_dir. This argument is optional only for consistency with sync_with_exclusions.
Raises:
  • QQError: If the rsync command fails or times out.
@classmethod
def transform_resources( cls, queue: str, server: str | None, provided_resources: qq_lib.properties.resources.Resources) -> qq_lib.properties.resources.Resources:
452    @classmethod
453    def transform_resources(
454        cls, queue: str, server: str | None, provided_resources: Resources
455    ) -> Resources:
456        # default resources of the queue
457        default_queue_resources = PBSQueue(queue, server).get_default_resources()
458        # default hard-coded resources
459        default_batch_resources = cls._get_default_server_resources()
460
461        # fill in default parameters
462        resources = Resources.merge_resources(
463            provided_resources, default_queue_resources, default_batch_resources
464        )
465        if not resources.work_dir:
466            raise QQError(
467                "Work-dir is not set after filling in default attributes. This is a bug."
468            )
469
470        # sanity check input_dir
471        if equals_normalized(resources.work_dir, "job_dir") or equals_normalized(
472            resources.work_dir, "input_dir"
473        ):
474            # work-size should not be used with job_dir
475            if provided_resources.work_size:
476                logger.warning(
477                    "Setting work-size is not supported for work-dir='job_dir' or 'input_dir'.\n"
478                    "Job will run in the submission directory with unlimited capacity.\n"
479                    "The work-size attribute will be ignored."
480                )
481
482            resources.work_dir = "input_dir"
483            resources.work_size = None
484            resources.work_size_per_cpu = None
485            return resources
486
487        # scratch in RAM (https://docs.metacentrum.cz/en/docs/computing/infrastructure/scratch-storages#scratch-in-ram)
488        if equals_normalized(resources.work_dir, "scratch_shm"):
489            # work-size should not be used with scratch_shm
490            if provided_resources.work_size:
491                logger.warning(
492                    "Setting work-size is not supported for work-dir='scratch_shm'.\n"
493                    "Size of the in-RAM scratch is specified using the --mem property.\n"
494                    "The work-size attribute will be ignored."
495                )
496
497            resources.work_dir = "scratch_shm"
498            resources.work_size = None
499            resources.work_size_per_cpu = None
500            return resources
501
502        # if work-dir matches any of the "standard" scratches supported by PBS
503        if match := next(
504            (
505                x
506                for x in cls.SUPPORTED_SCRATCHES
507                if equals_normalized(x, resources.work_dir)
508            ),
509            None,
510        ):
511            resources.work_dir = match
512            return resources
513
514        # unknown work-dir type
515        raise QQError(
516            f"Unknown working directory type specified: work-dir='{resources.work_dir}'. Supported types for {cls.env_name()} are: '{' '.join(cls.get_supported_work_dir_types())}'."
517        )

Transform user-provided Resources into a batch system-specific Resources instance.

This method takes the resources provided during submission and returns a new Resources object with any necessary modifications or defaults applied for the target batch system. The original provided_resources object is not modified.

Arguments:
  • queue (str): The name of the queue for which the resources are being adapted.
  • server (str | None): Name of the server on which the queue is located. If None, the queue is treated as being located on the current server.
  • provided_resources (Resources): The raw resources specified by the user.
Returns:

Resources: A new Resources instance with batch system-specific adjustments, fully constructed and validated.

Raises:
  • QQError: If any of the provided parameters are invalid or inconsistent.
@classmethod
def sort_jobs(cls, jobs: list[PBSJob]) -> None:
519    @classmethod
520    def sort_jobs(cls, jobs: list[PBSJob]) -> None:
521        # jobs with invalid ID get assigned an ID of 0 for sorting => they are sorted to the start
522        # and therefore are displayed at the top in the qq jobs / qq stat output
523        jobs.sort(key=lambda job: job.get_id_int() or 0)

Sort a list of batch system jobs by a defined attribute.

The default implementation sorts the jobs alphabetically by their job ID, as returned by job.get_id(). Subclasses may override this method to implement custom sorting logic.

Arguments:
  • jobs (list[BatchJobInterface]): A list of batch job objects to be sorted in-place.
class PBSQueue(qq_lib.batch.interface.queue.BatchQueueInterface):
 84class PBSQueue(BatchQueueInterface):
 85    """
 86    Implementation of BatchQueueInterface for PBS.
 87    Stores metadata for a single PBS queue.
 88    """
 89
 90    def __init__(self, name: str, server: str | None = None):
 91        self._name = name
 92        self._server = server
 93        self._info: dict[str, str] = {}
 94
 95        self.update()
 96
 97    def update(self) -> None:
 98        # get queue info from PBS
 99        command = f"qstat -Qfw {self._name}"
100        if self._server:
101            command += f"@{self._server}"
102
103        result = subprocess.run(
104            ["bash"],
105            input=command,
106            text=True,
107            check=False,
108            capture_output=True,
109            errors="replace",
110        )
111
112        if result.returncode != 0:
113            raise QQError(f"Queue '{self._name}' does not exist.")
114
115        self._info = parse_pbs_dump_to_dictionary(result.stdout)
116        self._set_attributes()
117
118    def get_name(self) -> str:
119        return self._name
120
121    def get_priority(self) -> str | None:
122        return self._info.get("Priority")
123
124    def get_total_jobs(self) -> int | None:
125        return PBSQueue._get_int_value(self._info, "total_jobs")
126
127    def get_running_jobs(self) -> int | None:
128        return PBSQueue._get_int_value(self._job_numbers, "Running")
129
130    def get_queued_jobs(self) -> int | None:
131        # we count held and waiting jobs as queued for consistency with Slurm
132        return (
133            (PBSQueue._get_int_value(self._job_numbers, "Queued") or 0)
134            + (PBSQueue._get_int_value(self._job_numbers, "Held") or 0)
135            + (PBSQueue._get_int_value(self._job_numbers, "Waiting") or 0)
136        )
137
138    def get_other_jobs(self) -> int | None:
139        return (
140            (PBSQueue._get_int_value(self._job_numbers, "Transit") or 0)
141            + (PBSQueue._get_int_value(self._job_numbers, "Exiting") or 0)
142            + (PBSQueue._get_int_value(self._job_numbers, "Begun") or 0)
143        )
144
145    def get_max_walltime(self) -> timedelta | None:
146        if raw_time := self._info.get("resources_max.walltime"):
147            return hhmmss_to_duration(raw_time)
148
149        return None
150
151    def get_max_n_nodes(self) -> int | None:
152        return PBSQueue._get_int_value(self._info, "resources_max.nodect")
153
154    def get_comment(self) -> str | None:
155        if not (raw_comment := self._info.get("comment")):
156            return None
157
158        return raw_comment.split("|", 1)[0]
159
160    def is_available_to_user(self, user: str) -> bool:
161        # queues that are not enabled or not started are unavailable to all users
162        if self._info.get("enabled") != "True" or self._info.get("started") != "True":
163            return False
164
165        # check acl users
166        if self._info.get("acl_user_enable") == "True":
167            acl_users = self._acl_users
168            if user not in acl_users:
169                return False
170
171        # check acl groups
172        if self._info.get("acl_group_enable") == "True":
173            expected_acl_groups = self._acl_groups
174            users_acl_groups = ACLData.get_groups_or_init(user)
175            if not any(item in expected_acl_groups for item in users_acl_groups):
176                return False
177
178        # check acl hosts
179        if (host := self._info.get("acl_host_enable")) == "True":
180            acl_hosts = self._acl_hosts
181            host = ACLData.get_host_or_init()
182            if host not in acl_hosts:
183                return False
184
185        return True
186
187    def get_destinations(self) -> list[str]:
188        if raw_destinations := self._info.get("route_destinations"):
189            return raw_destinations.split(",")
190
191        return []
192
193    def from_route_only(self) -> bool:
194        return self._info.get("from_route_only") == "True"
195
196    def to_yaml(self) -> str:
197        # we need to add queue name to the start of the dictionary
198        to_dump = {"Queue": self._name} | self._info
199        return yaml.dump(
200            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
201        )
202
203    def get_default_resources(self) -> Resources:
204        default_resources = {}
205
206        for key, value in self._info.items():
207            if "resources_default" in key:
208                resource = key.split(".")[-1]
209                default_resources[resource.strip()] = value.strip()
210
211        # filter resources that are part of Resources
212        field_names = {f.name for f in fields(Resources)}
213        return Resources(
214            **{k: v for k, v in default_resources.items() if k in field_names}
215        )
216
217    @staticmethod
218    def _get_int_value(dict: dict[str, str], key: str) -> int | None:
219        """
220        Retrieve an integer value from the provided dictionary.
221
222        Args:
223            key (str): The key to look up in the dictionary.
224
225        Returns:
226            int | None: The integer value if conversion succeeds, otherwise `None`.
227        """
228        if not (raw := dict.get(key)):
229            return None
230
231        try:
232            return int(raw)
233        except ValueError as e:
234            logger.debug(
235                f"Could not parse '{key}' value of '{raw}' as an integer: {e}."
236            )
237            return None
238
239    def _set_attributes(self) -> None:
240        """
241        Initialize derived queue attributes to avoid redundant parsing.
242        """
243        self._set_job_numbers()
244        self._acl_users = self._info.get("acl_users", "").split(",")
245        self._acl_groups = self._info.get("acl_groups", "").split(",")
246        self._acl_hosts = self._info.get("acl_hosts", "").split(",")
247
248    @classmethod
249    def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
250        """
251        Construct a new instance of PBSQueue from a queue name and a dictionary of queue information.
252
253
254        Args:
255            name (str): The unique name of the queue.
256            server (str | None): Server on which the queue is located. If `None`, assumes the current server.
257            info (dict[str, str]): A dictionary containing PBS queue metadata as key-value pairs.
258
259        Returns:
260            Self: A new instance of PBSQueue.
261
262        Note:
263            This method does not perform any validation or processing of the provided dictionary.
264        """
265        queue = cls.__new__(cls)
266        queue._name = name
267        queue._server = server
268        queue._info = info
269        queue._set_attributes()
270
271        return queue
272
273    def _set_job_numbers(self) -> None:
274        """
275        Parse and store job counts by state from the 'state_count' field.
276
277        If parsing fails or the field is missing, `_job_numbers` is set to an empty dictionary.
278        """
279        if not (state_count := self._info.get("state_count")):
280            self._job_numbers: dict[str, str] = {}
281            return
282
283        try:
284            self._job_numbers = dict(p.split(":") for p in state_count.split())
285        except Exception as e:
286            logger.warning(f"Could not get job counts for queue '{self._name}': {e}.")
287            self._job_numbers: dict[str, str] = {}

Implementation of BatchQueueInterface for PBS. Stores metadata for a single PBS queue.

PBSQueue(name: str, server: str | None = None)
    def __init__(self, name: str, server: str | None = None):
        """
        Create the queue object and immediately load its metadata via `update()`.

        Args:
            name (str): Name of the queue.
            server (str | None): Server on which the queue is located.
                If `None`, no server suffix is added to the query.
        """
        self._name = name
        self._server = server
        # raw key-value metadata of the queue; filled in by update()
        self._info: dict[str, str] = {}

        self.update()
def update(self) -> None:
 97    def update(self) -> None:
 98        # get queue info from PBS
 99        command = f"qstat -Qfw {self._name}"
100        if self._server:
101            command += f"@{self._server}"
102
103        result = subprocess.run(
104            ["bash"],
105            input=command,
106            text=True,
107            check=False,
108            capture_output=True,
109            errors="replace",
110        )
111
112        if result.returncode != 0:
113            raise QQError(f"Queue '{self._name}' does not exist.")
114
115        self._info = parse_pbs_dump_to_dictionary(result.stdout)
116        self._set_attributes()

Refresh the stored queue information from the batch system.

Raises:
  • QQError: If the queue cannot be queried or its info updated.
def get_name(self) -> str:
    def get_name(self) -> str:
        """Return the name of the queue."""
        return self._name

Retrieve the name of the queue.

Returns:

str: The name identifying this queue in the batch system.

def get_priority(self) -> str | None:
    def get_priority(self) -> str | None:
        """Return the raw 'Priority' value of the queue, or `None` if it is missing."""
        return self._info.get("Priority")

Retrieve the scheduling priority of the queue.

Returns:

str | None: The queue priority, or None if priority information is not available.

def get_total_jobs(self) -> int | None:
    def get_total_jobs(self) -> int | None:
        """Return the 'total_jobs' value parsed by `_get_int_value`, which may be `None`."""
        return PBSQueue._get_int_value(self._info, "total_jobs")

Retrieve the total number of jobs currently in the queue.

Returns:

int | None: The total count of jobs, regardless of status, or None if the information is not available.

def get_running_jobs(self) -> int | None:
    def get_running_jobs(self) -> int | None:
        """Return the 'Running' job count parsed by `_get_int_value`, which may be `None`."""
        return PBSQueue._get_int_value(self._job_numbers, "Running")

Retrieve the number of jobs currently running in the queue.

Returns:

int | None: The number of running jobs or None if the information is not available.

def get_queued_jobs(self) -> int | None:
130    def get_queued_jobs(self) -> int | None:
131        # we count held and waiting jobs as queued for consistency with Slurm
132        return (
133            (PBSQueue._get_int_value(self._job_numbers, "Queued") or 0)
134            + (PBSQueue._get_int_value(self._job_numbers, "Held") or 0)
135            + (PBSQueue._get_int_value(self._job_numbers, "Waiting") or 0)
136        )

Retrieve the number of jobs waiting to start in the queue.

Returns:

int | None: The number of queued jobs or None if the information is not available.

def get_other_jobs(self) -> int | None:
138    def get_other_jobs(self) -> int | None:
139        return (
140            (PBSQueue._get_int_value(self._job_numbers, "Transit") or 0)
141            + (PBSQueue._get_int_value(self._job_numbers, "Exiting") or 0)
142            + (PBSQueue._get_int_value(self._job_numbers, "Begun") or 0)
143        )

Retrieve the number of jobs in other states (non-running and non-queued).

Returns:

int | None: The number of jobs that are neither running nor queued, such as exiting or suspended jobs. Returns None if the information is not available.

def get_max_walltime(self) -> datetime.timedelta | None:
145    def get_max_walltime(self) -> timedelta | None:
146        if raw_time := self._info.get("resources_max.walltime"):
147            return hhmmss_to_duration(raw_time)
148
149        return None

Retrieve the maximum walltime allowed for jobs in the queue.

Returns:

timedelta | None: The walltime limit, or None if unlimited or unknown.

def get_max_n_nodes(self) -> int | None:
    def get_max_n_nodes(self) -> int | None:
        """Return the 'resources_max.nodect' value parsed by `_get_int_value`, which may be `None`."""
        return PBSQueue._get_int_value(self._info, "resources_max.nodect")

Retrieve the maximum number of nodes that can be requested in the queue.

Returns:

int | None: The maximum number of nodes that can be requested, or None if unlimited or unknown.

def get_comment(self) -> str | None:
154    def get_comment(self) -> str | None:
155        if not (raw_comment := self._info.get("comment")):
156            return None
157
158        return raw_comment.split("|", 1)[0]

Retrieve the comment or description associated with the queue.

Returns:

str | None: The human-readable comment or note about the queue, or None if the information is not available.

def is_available_to_user(self, user: str) -> bool:
160    def is_available_to_user(self, user: str) -> bool:
161        # queues that are not enabled or not started are unavailable to all users
162        if self._info.get("enabled") != "True" or self._info.get("started") != "True":
163            return False
164
165        # check acl users
166        if self._info.get("acl_user_enable") == "True":
167            acl_users = self._acl_users
168            if user not in acl_users:
169                return False
170
171        # check acl groups
172        if self._info.get("acl_group_enable") == "True":
173            expected_acl_groups = self._acl_groups
174            users_acl_groups = ACLData.get_groups_or_init(user)
175            if not any(item in expected_acl_groups for item in users_acl_groups):
176                return False
177
178        # check acl hosts
179        if (host := self._info.get("acl_host_enable")) == "True":
180            acl_hosts = self._acl_hosts
181            host = ACLData.get_host_or_init()
182            if host not in acl_hosts:
183                return False
184
185        return True

Check whether the specified user has access to this queue.

Arguments:
  • user (str): The username to check access for.
Returns:

bool: True if the user can submit jobs to this queue, False otherwise.

def get_destinations(self) -> list[str]:
187    def get_destinations(self) -> list[str]:
188        if raw_destinations := self._info.get("route_destinations"):
189            return raw_destinations.split(",")
190
191        return []

Retrieve all destinations available for this queue route.

Returns:

list[str]: A list of destination queue names associated with the queue.

def from_route_only(self) -> bool:
    def from_route_only(self) -> bool:
        """Return `True` if the queue's 'from_route_only' attribute is set to 'True'."""
        return self._info.get("from_route_only") == "True"

Determine whether this queue can only be accessed via a route.

Returns:

bool: True if the queue is accessible exclusively through a route, False otherwise.

def to_yaml(self) -> str:
196    def to_yaml(self) -> str:
197        # we need to add queue name to the start of the dictionary
198        to_dump = {"Queue": self._name} | self._info
199        return yaml.dump(
200            to_dump, default_flow_style=False, sort_keys=False, Dumper=Dumper
201        )

Return all information about the queue from the batch system in YAML format.

Returns:

str: YAML-formatted string of queue metadata.

def get_default_resources(self) -> qq_lib.properties.resources.Resources:
203    def get_default_resources(self) -> Resources:
204        default_resources = {}
205
206        for key, value in self._info.items():
207            if "resources_default" in key:
208                resource = key.split(".")[-1]
209                default_resources[resource.strip()] = value.strip()
210
211        # filter resources that are part of Resources
212        field_names = {f.name for f in fields(Resources)}
213        return Resources(
214            **{k: v for k, v in default_resources.items() if k in field_names}
215        )

Return the default resource definitions for this queue.

Returns:

Resources: Default resources allocated for jobs submitted to this queue.

@classmethod
def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
248    @classmethod
249    def from_dict(cls, name: str, server: str | None, info: dict[str, str]) -> Self:
250        """
251        Construct a new instance of PBSQueue from a queue name and a dictionary of queue information.
252
253
254        Args:
255            name (str): The unique name of the queue.
256            server (str | None): Server on which the queue is located. If `None`, assumes the current server.
257            info (dict[str, str]): A dictionary containing PBS queue metadata as key-value pairs.
258
259        Returns:
260            Self: A new instance of PBSQueue.
261
262        Note:
263            This method does not perform any validation or processing of the provided dictionary.
264        """
265        queue = cls.__new__(cls)
266        queue._name = name
267        queue._server = server
268        queue._info = info
269        queue._set_attributes()
270
271        return queue

Construct a new instance of PBSQueue from a queue name and a dictionary of queue information.

Arguments:
  • name (str): The unique name of the queue.
  • server (str | None): Server on which the queue is located. If None, assumes the current server.
  • info (dict[str, str]): A dictionary containing PBS queue metadata as key-value pairs.
Returns:

Self: A new instance of PBSQueue.

Note:

This method does not perform any validation or processing of the provided dictionary.