qq_lib.info

Facilities for loading, interpreting, and presenting qq job information.

This module defines the Informer class, which loads qq job metadata, combines information from info files and from the batch system, and provides access to runtime details such as working nodes, submission/start/end times, and resources.

It also provides the Presenter class, which formats this information into Rich-based status panels, full job-information views, and compact summaries used throughout qq's CLI.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Facilities for loading, interpreting, and presenting qq job information.
 6
 7This module defines the `Informer` class, which loads qq job metadata,
 8combines information from info files and from the batch system, and provides
 9access to runtime details such as working nodes, submission/start/end times, and resources.
10
11It also provides the `Presenter` class, which formats this information into
12Rich-based status panels, full job-information views, and compact summaries used
13throughout qq's CLI.
14"""
15
16from .informer import Informer
17from .presenter import Presenter
18
19__all__ = ["Informer", "Presenter"]
class Informer:
 23class Informer:
 24    """
 25    Provides an interface to access and manipulate qq job information.
 26    """
 27
 28    def __init__(self, info: Info):
 29        """
 30        Initialize the informer with job information.
 31
 32        Args:
 33            info (Info): An Info object containing raw job data.
 34        """
 35        self.info = info
 36        self._batch_info: BatchJobInterface | None = None
 37
 38    @property
 39    def batch_system(self) -> type[BatchInterface]:
 40        """
 41        Return the batch system class used for this job.
 42
 43        Returns:
 44            type[BatchInterface]: The batch system class used for this job.
 45        """
 46        return self.info.batch_system
 47
 48    @classmethod
 49    def from_file(cls, file: Path, host: str | None = None) -> Self:
 50        """
 51        Create an Informer by loading job information from a file.
 52
 53        If 'host' is provided, the file is read from the remote host; otherwise, it is read locally.
 54
 55        Args:
 56            file (Path): Path to a YAML file containing job information.
 57            host (str | None): Optional remote host from which to read the file.
 58
 59        Returns:
 60            Informer: An instance initialized with the loaded Info.
 61
 62        Raises:
 63            QQError: If the file cannot be read, reached, or parsed correctly.
 64        """
 65        return cls(Info.from_file(file, host))
 66
 67    @classmethod
 68    def from_job_id(cls, job_id: str) -> Self:
 69        """
 70        Load an `Informer` from a job ID.
 71
 72        Retrieves batch metadata for the given job and loads its qq info file.
 73        This is more efficient than resolving the info-file path separately,
 74        as it also sets `_batch_info` without additional batch-system queries.
 75
 76        Args:
 77            job_id (str): The job identifier.
 78
 79        Returns:
 80            Informer: The loaded informer.
 81
 82        Raises:
 83            QQError: If the job does not exist or is not a valid qq job.
 84            QQJobMismatchError: If the info file does not correspond to the job's ID.
 85        """
 86        BatchSystem = BatchInterface.from_env_var_or_guess()
 87        batch_job: BatchJobInterface = BatchSystem.get_batch_job(job_id)
 88
 89        if batch_job.is_empty():
 90            raise QQError(f"Job '{job_id}' does not exist.")
 91
 92        return cls.from_batch_job(batch_job)
 93
 94    @classmethod
 95    def from_batch_job(cls, batch_job: BatchJobInterface) -> Self:
 96        """
 97        Load an `Informer` from batch job information.
 98
 99        Raises an exception if the job is not a qq job.
100        This is more efficient than resolving the info-file path separately,
101        as it also sets `_batch_info` without additional batch-system queries.
102
103        Args:
104            batch_job (BatchJobInterface): The job info provided by the batch system.
105
106        Returns:
107            Informer: The loaded informer.
108
109        Raises:
110            QQError: If the job is not a valid qq job (missing info file).
111            QQJobMismatchError: If the info file does not correspond to the job's ID.
112        """
113        if not (path := batch_job.get_info_file()):
114            raise QQError(f"Job '{batch_job.get_id()}' is not a valid qq job.")
115
116        informer = cls.from_file(path)
117
118        # check that the loaded info file actually corresponds to the batch job's ID
119        if not informer.matches_job(batch_job.get_id()):
120            raise QQJobMismatchError(
121                f"Info file for job '{batch_job.get_id()}' does not exist or is not reachable."
122            )
123
124        informer._batch_info = batch_job
125        return informer
126
127    @staticmethod
128    def set_batch_info_in_bulk(informers: list[Informer]) -> None:
129        """
130        Set the batch info for a list of informers in bulk,
131        by querying the batch system as few times as possible.
132
133        If the corresponding batch job no longer exists, batch info is still set,
134        but the BatchJobInterface is empty.
135
136        Args:
137            informers (list[Informer]): The list of informers to set the batch info for.
138        """
139        if not informers:
140            return
141
142        job_ids = [informer.info.job_id for informer in informers]
143        batch_jobs = informers[0].batch_system.get_batch_jobs_from_ids(job_ids)
144        for informer, batch_job in zip(informers, batch_jobs):
145            informer._batch_info = batch_job
146
147    def to_file(self, file: Path, host: str | None = None) -> None:
148        """
149        Export the job information to a file.
150
151        If `host` is provided, the file is written to the remote host; otherwise, it is written locally.
152
153        Args:
154            file (Path): Path to the output YAML file.
155            host (str | None): Optional remote host where the file should be written.
156
157        Raises:
158            QQError: If the file cannot be created, reached, or written to.
159        """
160        self.info.to_file(file, host)
161
162    def matches_job(self, job_id: str) -> bool:
163        """
164        Determine whether this informer corresponds to the specified job ID.
165
166        Args:
167            job_id (str): The job ID to compare against (e.g., "12345" or "12345.cluster.domain").
168
169        Returns:
170            bool: True if both job IDs refer to the same job (same numeric/job part),
171                False otherwise.
172        """
173        return self.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
174
175    def set_running(
176        self, time: datetime, main_node: str, all_nodes: list[str], work_dir: Path
177    ) -> None:
178        """
179        Mark the job as running and set associated metadata.
180
181        Args:
182            time (datetime): Job start time.
183            main_node (str): Main node assigned to the job.
184            work_dir (Path): Working directory used by the job.
185        """
186        self.info.job_state = NaiveState.RUNNING
187        self.info.start_time = time
188        self.info.main_node = main_node
189        self.info.all_nodes = all_nodes
190        self.info.work_dir = work_dir
191
192    def set_finished(self, time: datetime) -> None:
193        """
194        Mark the job as finished successfully.
195
196        Args:
197            time (datetime): Job completion time.
198        """
199        self.info.job_state = NaiveState.FINISHED
200        self.info.completion_time = time
201        self.info.job_exit_code = 0
202
203    def set_failed(self, time: datetime, exit_code: int) -> None:
204        """
205        Mark the job as failed.
206
207        Args:
208            time (datetime): Job completion (failure) time.
209            exit_code (int): Exit code of the failed job.
210        """
211        self.info.job_state = NaiveState.FAILED
212        self.info.completion_time = time
213        self.info.job_exit_code = exit_code
214
215    def set_killed(self, time: datetime) -> None:
216        """
217        Mark the job as killed.
218
219        Args:
220            time (datetime): Time when the job was killed.
221        """
222        self.info.job_state = NaiveState.KILLED
223        self.info.completion_time = time
224        # no exit code is intentionally set
225
226    def uses_scratch(self) -> bool:
227        """
228        Determine if the job uses a scratch directory.
229
230        Returns:
231            bool: True if a scratch is used, False if it is not.
232        """
233        return self.info.resources.uses_scratch()
234
235    def get_destination(self) -> tuple[str, Path] | None:
236        """
237        Retrieve the job's main node and working directory.
238
239        Returns:
240            tuple[str, Path] | None: A tuple of (main_node, work_dir)
241                if both are set, otherwise None.
242        """
243        if (main_node := self.info.main_node) and (work_dir := self.info.work_dir):
244            return main_node, work_dir
245        return None
246
247    def load_batch_info(self) -> None:
248        """
249        Load the batch job information from the batch system if it's not already loaded.
250        """
251        if self._batch_info is None:
252            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
253
254    def set_batch_info(self, batch_info: BatchJobInterface) -> None:
255        """
256        Set the batch job information.
257        """
258        self._batch_info = batch_info
259
260    def get_batch_state(self) -> BatchState:
261        """
262        Return the job's state as reported by the batch system.
263
264        Uses cached information if available; otherwise queries the batch system
265        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
266
267        Returns:
268            BatchState: The job's state according to the batch system.
269        """
270        if not self._batch_info:
271            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
272
273        return self._batch_info.get_state()
274
275    def get_real_state(self) -> RealState:
276        """
277        Get the job's real state by combining the internal state (`NaiveState`)
278        with the state reported by the batch system (`BatchState`).
279
280        Uses cached information if available; otherwise queries the batch system
281        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
282
283        Returns:
284            RealState: The job's real state obtained by combining information
285            from qq and the batch system.
286        """
287        # shortcut: if the naive state is unknown, there is no need to check batch state
288        if self.info.job_state in {
289            NaiveState.UNKNOWN,
290        }:
291            logger.debug(
292                "Short-circuiting get_real_state: the batch state will not affect the result."
293            )
294            return RealState.from_states(self.info.job_state, BatchState.UNKNOWN)
295
296        return RealState.from_states(self.info.job_state, self.get_batch_state())
297
298    def get_comment(self) -> str | None:
299        """
300        Return the job's comment as reported by the batch system.
301
302        Uses cached information if available; otherwise queries the batch system
303        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
304
305        Returns:
306            str | None: The job comment if available, otherwise None.
307        """
308        if not self._batch_info:
309            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
310
311        return self._batch_info.get_comment()
312
313    def get_estimated(self) -> tuple[datetime, str] | None:
314        """
315        Return the estimated start time and execution node for the job.
316
317        Uses cached information if available; otherwise queries the batch system
318        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
319
320        Returns:
321            tuple[datetime, str] | None: A tuple containing the estimated start time
322            (as a datetime) and the execution node (as a string), or None if the
323            information is not available.
324        """
325        if not self._batch_info:
326            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
327
328        return self._batch_info.get_estimated()
329
330    def get_main_node(self) -> str | None:
331        """
332        Return the main execution node for the job.
333
334        Note that this obtains the node information from the batch system itself!
335
336        Uses cached information if available; otherwise queries the batch system
337        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
338
339        Returns:
340            str | None: The hostname of the main execution node, or None if the
341            information is not available.
342        """
343        if not self._batch_info:
344            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
345
346        return self._batch_info.get_main_node()
347
348    def get_nodes(self) -> list[str] | None:
349        """
350        Retrieve the list of execution nodes on which the job is running.
351
352        Note that this obtains the node information from the batch system itself!
353
354        Uses cached information if available; otherwise queries the batch system
355        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
356
357        Returns:
358            list[str] | None:
359                A list of hostnames (or node identifiers) where the job is running,
360                or `None` if the job has not started or node information is unavailable.
361        """
362        if not self._batch_info:
363            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
364
365        return self._batch_info.get_nodes()
366
367    def get_batch_info(self) -> BatchJobInterface:
368        """
369        Return cached batch job information if available; otherwise fetch it from
370        the batch system, cache it, and return the result.
371
372        Returns:
373            BatchJobInterface: The information about the job from the batch system.
374        """
375        if self._batch_info is None:
376            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
377        return self._batch_info
378
379    def get_info_file(self) -> Path:
380        """
381        Get absolute path to the info file associated with this job.
382
383        Be aware that the info file does not have to exist.
384
385        Returns:
386            Path: Absolute path to the info file.
387        """
388        return construct_info_file_path(self.info.input_dir, self.info.job_name)
389
390    def should_transfer_files(self, exit_code: int) -> bool:
391        """
392        Determine whether files should be transferred from the working directory.
393
394        Checks the configured transfer mode to decide if files should be transferred
395        from the job's working directory to the input directory based on the job's
396        exit code.
397
398        Args:
399            exit_code: The exit code of the completed job.
400
401        Returns:
402            True if files should be transferred, False otherwise.
403        """
404        return any(mode.should_transfer(exit_code) for mode in self.info.transfer_mode)
405
406    def should_archive_files(self, exit_code: int) -> bool:
407        """
408        Determine whether files should be archived from the working directory to the archive directory.
409
410        Checks the configured archive mode to decide if files should be archived
411        from the job's working directory. This only applies to loop jobs.
412
413        Args:
414            exit_code: The exit code of the completed job iteration.
415
416        Returns:
417            True if files should be archived, False if the job is not a loop job or
418            the archive mode does not trigger on the given exit code.
419        """
420        if (loop_info := self.info.loop_info) is None:
421            return False
422
423        return any(mode.should_transfer(exit_code) for mode in loop_info.archive_mode)

Provides an interface to access and manipulate qq job information.

Informer(info: qq_lib.properties.info.Info)
28    def __init__(self, info: Info):
29        """
30        Initialize the informer with job information.
31
32        Args:
33            info (Info): An Info object containing raw job data.
34        """
35        self.info = info
36        self._batch_info: BatchJobInterface | None = None

Initialize the informer with job information.

Arguments:
  • info (Info): An Info object containing raw job data.
info
batch_system: type[qq_lib.batch.interface.BatchInterface]
38    @property
39    def batch_system(self) -> type[BatchInterface]:
40        """
41        Return the batch system class used for this job.
42
43        Returns:
44            type[BatchInterface]: The batch system class used for this job.
45        """
46        return self.info.batch_system

Return the batch system class used for this job.

Returns:

type[BatchInterface]: The batch system class used for this job.

@classmethod
def from_file(cls, file: pathlib._local.Path, host: str | None = None) -> Self:
48    @classmethod
49    def from_file(cls, file: Path, host: str | None = None) -> Self:
50        """
51        Create an Informer by loading job information from a file.
52
53        If 'host' is provided, the file is read from the remote host; otherwise, it is read locally.
54
55        Args:
56            file (Path): Path to a YAML file containing job information.
57            host (str | None): Optional remote host from which to read the file.
58
59        Returns:
60            Informer: An instance initialized with the loaded Info.
61
62        Raises:
63            QQError: If the file cannot be read, reached, or parsed correctly.
64        """
65        return cls(Info.from_file(file, host))

Create an Informer by loading job information from a file.

If 'host' is provided, the file is read from the remote host; otherwise, it is read locally.

Arguments:
  • file (Path): Path to a YAML file containing job information.
  • host (str | None): Optional remote host from which to read the file.
Returns:

Informer: An instance initialized with the loaded Info.

Raises:
  • QQError: If the file cannot be read, reached, or parsed correctly.
@classmethod
def from_job_id(cls, job_id: str) -> Self:
67    @classmethod
68    def from_job_id(cls, job_id: str) -> Self:
69        """
70        Load an `Informer` from a job ID.
71
72        Retrieves batch metadata for the given job and loads its qq info file.
73        This is more efficient than resolving the info-file path separately,
74        as it also sets `_batch_info` without additional batch-system queries.
75
76        Args:
77            job_id (str): The job identifier.
78
79        Returns:
80            Informer: The loaded informer.
81
82        Raises:
83            QQError: If the job does not exist or is not a valid qq job.
84            QQJobMismatchError: If the info file does not correspond to the job's ID.
85        """
86        BatchSystem = BatchInterface.from_env_var_or_guess()
87        batch_job: BatchJobInterface = BatchSystem.get_batch_job(job_id)
88
89        if batch_job.is_empty():
90            raise QQError(f"Job '{job_id}' does not exist.")
91
92        return cls.from_batch_job(batch_job)

Load an Informer from a job ID.

Retrieves batch metadata for the given job and loads its qq info file. This is more efficient than resolving the info-file path separately, as it also sets _batch_info without additional batch-system queries.

Arguments:
  • job_id (str): The job identifier.
Returns:

Informer: The loaded informer.

Raises:
  • QQError: If the job does not exist or is not a valid qq job.
  • QQJobMismatchError: If the info file does not correspond to the job's ID.
@classmethod
def from_batch_job(cls, batch_job: qq_lib.batch.interface.BatchJobInterface) -> Self:
 94    @classmethod
 95    def from_batch_job(cls, batch_job: BatchJobInterface) -> Self:
 96        """
 97        Load an `Informer` from batch job information.
 98
 99        Raises an exception if the job is not a qq job.
100        This is more efficient than resolving the info-file path separately,
101        as it also sets `_batch_info` without additional batch-system queries.
102
103        Args:
104            batch_job (BatchJobInterface): The job info provided by the batch system.
105
106        Returns:
107            Informer: The loaded informer.
108
109        Raises:
110            QQError: If the job is not a valid qq job (missing info file).
111            QQJobMismatchError: If the info file does not correspond to the job's ID.
112        """
113        if not (path := batch_job.get_info_file()):
114            raise QQError(f"Job '{batch_job.get_id()}' is not a valid qq job.")
115
116        informer = cls.from_file(path)
117
118        # check that the loaded info file actually corresponds to the batch job's ID
119        if not informer.matches_job(batch_job.get_id()):
120            raise QQJobMismatchError(
121                f"Info file for job '{batch_job.get_id()}' does not exist or is not reachable."
122            )
123
124        informer._batch_info = batch_job
125        return informer

Load an Informer from batch job information.

Raises an exception if the job is not a qq job. This is more efficient than resolving the info-file path separately, as it also sets _batch_info without additional batch-system queries.

Arguments:
  • batch_job (BatchJobInterface): The job info provided by the batch system.
Returns:

Informer: The loaded informer.

Raises:
  • QQError: If the job is not a valid qq job (missing info file).
  • QQJobMismatchError: If the info file does not correspond to the job's ID.
@staticmethod
def set_batch_info_in_bulk(informers: list[Informer]) -> None:
127    @staticmethod
128    def set_batch_info_in_bulk(informers: list[Informer]) -> None:
129        """
130        Set the batch info for a list of informers in bulk,
131        by querying the batch system as few times as possible.
132
133        If the corresponding batch job no longer exists, batch info is still set,
134        but the BatchJobInterface is empty.
135
136        Args:
137            informers (list[Informer]): The list of informers to set the batch info for.
138        """
139        if not informers:
140            return
141
142        job_ids = [informer.info.job_id for informer in informers]
143        batch_jobs = informers[0].batch_system.get_batch_jobs_from_ids(job_ids)
144        for informer, batch_job in zip(informers, batch_jobs):
145            informer._batch_info = batch_job

Set the batch info for a list of informers in bulk, by querying the batch system as few times as possible.

If the corresponding batch job no longer exists, batch info is still set, but the BatchJobInterface is empty.

Arguments:
  • informers (list[Informer]): The list of informers to set the batch info for.
def to_file(self, file: pathlib._local.Path, host: str | None = None) -> None:
147    def to_file(self, file: Path, host: str | None = None) -> None:
148        """
149        Export the job information to a file.
150
151        If `host` is provided, the file is written to the remote host; otherwise, it is written locally.
152
153        Args:
154            file (Path): Path to the output YAML file.
155            host (str | None): Optional remote host where the file should be written.
156
157        Raises:
158            QQError: If the file cannot be created, reached, or written to.
159        """
160        self.info.to_file(file, host)

Export the job information to a file.

If host is provided, the file is written to the remote host; otherwise, it is written locally.

Arguments:
  • file (Path): Path to the output YAML file.
  • host (str | None): Optional remote host where the file should be written.
Raises:
  • QQError: If the file cannot be created, reached, or written to.
def matches_job(self, job_id: str) -> bool:
162    def matches_job(self, job_id: str) -> bool:
163        """
164        Determine whether this informer corresponds to the specified job ID.
165
166        Args:
167            job_id (str): The job ID to compare against (e.g., "12345" or "12345.cluster.domain").
168
169        Returns:
170            bool: True if both job IDs refer to the same job (same numeric/job part),
171                False otherwise.
172        """
173        return self.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]

Determine whether this informer corresponds to the specified job ID.

Arguments:
  • job_id (str): The job ID to compare against (e.g., "12345" or "12345.cluster.domain").
Returns:

bool: True if both job IDs refer to the same job (same numeric/job part), False otherwise.

def set_running( self, time: datetime.datetime, main_node: str, all_nodes: list[str], work_dir: pathlib._local.Path) -> None:
175    def set_running(
176        self, time: datetime, main_node: str, all_nodes: list[str], work_dir: Path
177    ) -> None:
178        """
179        Mark the job as running and set associated metadata.
180
181        Args:
182            time (datetime): Job start time.
183            main_node (str): Main node assigned to the job.
184            work_dir (Path): Working directory used by the job.
185        """
186        self.info.job_state = NaiveState.RUNNING
187        self.info.start_time = time
188        self.info.main_node = main_node
189        self.info.all_nodes = all_nodes
190        self.info.work_dir = work_dir

Mark the job as running and set associated metadata.

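Arguments:
  • time (datetime): Job start time.
  • main_node (str): Main node assigned to the job.
  • all_nodes (list[str]): Nodes stored as the job's all_nodes (expected to list every node assigned to the job).
  • work_dir (Path): Working directory used by the job.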
def set_finished(self, time: datetime.datetime) -> None:
192    def set_finished(self, time: datetime) -> None:
193        """
194        Mark the job as finished successfully.
195
196        Args:
197            time (datetime): Job completion time.
198        """
199        self.info.job_state = NaiveState.FINISHED
200        self.info.completion_time = time
201        self.info.job_exit_code = 0

Mark the job as finished successfully.

Arguments:
  • time (datetime): Job completion time.
def set_failed(self, time: datetime.datetime, exit_code: int) -> None:
203    def set_failed(self, time: datetime, exit_code: int) -> None:
204        """
205        Mark the job as failed.
206
207        Args:
208            time (datetime): Job completion (failure) time.
209            exit_code (int): Exit code of the failed job.
210        """
211        self.info.job_state = NaiveState.FAILED
212        self.info.completion_time = time
213        self.info.job_exit_code = exit_code

Mark the job as failed.

Arguments:
  • time (datetime): Job completion (failure) time.
  • exit_code (int): Exit code of the failed job.
def set_killed(self, time: datetime.datetime) -> None:
215    def set_killed(self, time: datetime) -> None:
216        """
217        Mark the job as killed.
218
219        Args:
220            time (datetime): Time when the job was killed.
221        """
222        self.info.job_state = NaiveState.KILLED
223        self.info.completion_time = time
224        # no exit code is intentionally set

Mark the job as killed.

Arguments:
  • time (datetime): Time when the job was killed.
def uses_scratch(self) -> bool:
226    def uses_scratch(self) -> bool:
227        """
228        Determine if the job uses a scratch directory.
229
230        Returns:
231            bool: True if a scratch is used, False if it is not.
232        """
233        return self.info.resources.uses_scratch()

Determine if the job uses a scratch directory.

Returns:

bool: True if a scratch is used, False if it is not.

def get_destination(self) -> tuple[str, pathlib._local.Path] | None:
235    def get_destination(self) -> tuple[str, Path] | None:
236        """
237        Retrieve the job's main node and working directory.
238
239        Returns:
240            tuple[str, Path] | None: A tuple of (main_node, work_dir)
241                if both are set, otherwise None.
242        """
243        if (main_node := self.info.main_node) and (work_dir := self.info.work_dir):
244            return main_node, work_dir
245        return None

Retrieve the job's main node and working directory.

Returns:

tuple[str, Path] | None: A tuple of (main_node, work_dir) if both are set, otherwise None.

def load_batch_info(self) -> None:
247    def load_batch_info(self) -> None:
248        """
249        Load the batch job information from the batch system if it's not already loaded.
250        """
251        if self._batch_info is None:
252            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)

Load the batch job information from the batch system if it's not already loaded.

def set_batch_info(self, batch_info: qq_lib.batch.interface.BatchJobInterface) -> None:
254    def set_batch_info(self, batch_info: BatchJobInterface) -> None:
255        """
256        Set the batch job information.
257        """
258        self._batch_info = batch_info

Set the batch job information.

def get_batch_state(self) -> qq_lib.properties.states.BatchState:
260    def get_batch_state(self) -> BatchState:
261        """
262        Return the job's state as reported by the batch system.
263
264        Uses cached information if available; otherwise queries the batch system
265        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
266
267        Returns:
268            BatchState: The job's state according to the batch system.
269        """
270        if not self._batch_info:
271            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
272
273        return self._batch_info.get_state()

Return the job's state as reported by the batch system.

Uses cached information if available; otherwise queries the batch system via batch_system.get_batch_job. This avoids unnecessary remote calls.

Returns:

BatchState: The job's state according to the batch system.

def get_real_state(self) -> qq_lib.properties.states.RealState:
275    def get_real_state(self) -> RealState:
276        """
277        Get the job's real state by combining the internal state (`NaiveState`)
278        with the state reported by the batch system (`BatchState`).
279
280        Uses cached information if available; otherwise queries the batch system
281        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
282
283        Returns:
284            RealState: The job's real state obtained by combining information
285            from qq and the batch system.
286        """
287        # shortcut: if the naive state is unknown, there is no need to check batch state
288        if self.info.job_state in {
289            NaiveState.UNKNOWN,
290        }:
291            logger.debug(
292                "Short-circuiting get_real_state: the batch state will not affect the result."
293            )
294            return RealState.from_states(self.info.job_state, BatchState.UNKNOWN)
295
296        return RealState.from_states(self.info.job_state, self.get_batch_state())

Get the job's real state by combining the internal state (NaiveState) with the state reported by the batch system (BatchState).

Uses cached information if available; otherwise queries the batch system via batch_system.get_batch_job. This avoids unnecessary remote calls.

Returns:

RealState: The job's real state obtained by combining information from qq and the batch system.

def get_comment(self) -> str | None:
298    def get_comment(self) -> str | None:
299        """
300        Return the job's comment as reported by the batch system.
301
302        Uses cached information if available; otherwise queries the batch system
303        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
304
305        Returns:
306            str | None: The job comment if available, otherwise None.
307        """
308        if not self._batch_info:
309            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
310
311        return self._batch_info.get_comment()

Return the job's comment as reported by the batch system.

Uses cached information if available; otherwise queries the batch system via batch_system.get_batch_job. This avoids unnecessary remote calls.

Returns:

str | None: The job comment if available, otherwise None.

def get_estimated(self) -> tuple[datetime.datetime, str] | None:
313    def get_estimated(self) -> tuple[datetime, str] | None:
314        """
315        Return the estimated start time and execution node for the job.
316
317        Uses cached information if available; otherwise queries the batch system
318        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
319
320        Returns:
321            tuple[datetime, str] | None: A tuple containing the estimated start time
322            (as a datetime) and the execution node (as a string), or None if the
323            information is not available.
324        """
325        if not self._batch_info:
326            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
327
328        return self._batch_info.get_estimated()

Return the estimated start time and execution node for the job.

Uses cached information if available; otherwise queries the batch system via batch_system.get_batch_job. This avoids unnecessary remote calls.

Returns:

tuple[datetime, str] | None: A tuple containing the estimated start time (as a datetime) and the execution node (as a string), or None if the information is not available.

def get_main_node(self) -> str | None:
330    def get_main_node(self) -> str | None:
331        """
332        Return the main execution node for the job.
333
334        Note that this obtains the node information from the batch system itself!
335
336        Uses cached information if available; otherwise queries the batch system
337        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
338
339        Returns:
340            str | None: The hostname of the main execution node, or None if the
341            information is not available.
342        """
343        if not self._batch_info:
344            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
345
346        return self._batch_info.get_main_node()

Return the main execution node for the job.

Note that this obtains the node information from the batch system itself!

Uses cached information if available; otherwise queries the batch system via batch_system.get_batch_job. This avoids unnecessary remote calls.

Returns:

str | None: The hostname of the main execution node, or None if the information is not available.

def get_nodes(self) -> list[str] | None:
348    def get_nodes(self) -> list[str] | None:
349        """
350        Retrieve the list of execution nodes on which the job is running.
351
352        Note that this obtains the node information from the batch system itself!
353
354        Uses cached information if available; otherwise queries the batch system
355        via `batch_system.get_batch_job`. This avoids unnecessary remote calls.
356
357        Returns:
358            list[str] | None:
359                A list of hostnames (or node identifiers) where the job is running,
360                or `None` if the job has not started or node information is unavailable.
361        """
362        if not self._batch_info:
363            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
364
365        return self._batch_info.get_nodes()

Retrieve the list of execution nodes on which the job is running.

Note that this obtains the node information from the batch system itself!

Uses cached information if available; otherwise queries the batch system via batch_system.get_batch_job. This avoids unnecessary remote calls.

Returns:

list[str] | None: A list of hostnames (or node identifiers) where the job is running, or None if the job has not started or node information is unavailable.

def get_batch_info(self) -> qq_lib.batch.interface.BatchJobInterface:
367    def get_batch_info(self) -> BatchJobInterface:
368        """
369        Return cached batch job information if available; otherwise fetch it from
370        the batch system, cache it, and return the result.
371
372        Returns:
373            BatchJobInterface: The information about the job from the batch system.
374        """
375        if self._batch_info is None:
376            self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
377        return self._batch_info

Return cached batch job information if available; otherwise fetch it from the batch system, cache it, and return the result.

Returns:

BatchJobInterface: The information about the job from the batch system.

def get_info_file(self) -> pathlib._local.Path:
379    def get_info_file(self) -> Path:
380        """
381        Get absolute path to the info file associated with this job.
382
383        Be aware that the info file does not have to exist.
384
385        Returns:
386            Path: Absolute path to the info file.
387        """
388        return construct_info_file_path(self.info.input_dir, self.info.job_name)

Get absolute path to the info file associated with this job.

Be aware that the info file does not have to exist.

Returns:

Path: Absolute path to the info file.

def should_transfer_files(self, exit_code: int) -> bool:
390    def should_transfer_files(self, exit_code: int) -> bool:
391        """
392        Determine whether files should be transferred from the working directory.
393
394        Checks the configured transfer mode to decide if files should be transferred
395        from the job's working directory to the input directory based on the job's
396        exit code.
397
398        Args:
399            exit_code: The exit code of the completed job.
400
401        Returns:
402            True if files should be transferred, False otherwise.
403        """
404        return any(mode.should_transfer(exit_code) for mode in self.info.transfer_mode)

Determine whether files should be transferred from the working directory.

Checks the configured transfer mode to decide if files should be transferred from the job's working directory to the input directory based on the job's exit code.

Arguments:
  • exit_code: The exit code of the completed job.
Returns:

True if files should be transferred, False otherwise.

def should_archive_files(self, exit_code: int) -> bool:
406    def should_archive_files(self, exit_code: int) -> bool:
407        """
408        Determine whether files should be archived from the working directory to the archive directory.
409
410        Checks the configured archive mode to decide if files should be archived
411        from the job's working directory. This only applies to loop jobs.
412
413        Args:
414            exit_code: The exit code of the completed job iteration.
415
416        Returns:
417            True if files should be archived, False if the job is not a loop job or
418            the archive mode does not trigger on the given exit code.
419        """
420        if (loop_info := self.info.loop_info) is None:
421            return False
422
423        return any(mode.should_transfer(exit_code) for mode in loop_info.archive_mode)

Determine whether files should be archived from the working directory to the archive directory.

Checks the configured archive mode to decide if files should be archived from the job's working directory. This only applies to loop jobs.

Arguments:
  • exit_code: The exit code of the completed job iteration.
Returns:

True if files should be archived, False if the job is not a loop job or the archive mode does not trigger on the given exit code.

class Presenter:
 26class Presenter:
 27    """
 28    Presents information about a qq job.
 29    """
 30
    def __init__(self, informer: Informer):
        """
        Initialize the presenter with an Informer.

        Args:
            informer (Informer): The informer object that provides
                access to qq job metadata and runtime details.
        """
        # stored privately; the panel and table builders read the job data from it
        self._informer = informer
 40
 41    def create_job_status_panel(self, console: Console | None = None) -> Group:
 42        """
 43        Create a standalone status panel for the job.
 44
 45        Args:
 46            console (Console | None): Optional Rich console.
 47                If not provided, a new Console is created.
 48
 49        Returns:
 50            Group: A Rich Group containing the status panel.
 51        """
 52        console = console or Console()
 53
 54        panel = Panel(
 55            self._create_job_status_table(self._informer.get_real_state()),
 56            title=Text(
 57                f"JOB: {self._informer.info.job_id}",
 58                style=CFG.presenter.job_status_panel.title_style,
 59                justify="center",
 60            ),
 61            border_style=CFG.presenter.job_status_panel.border_style,
 62            padding=(1, 2),
 63            width=get_panel_width(
 64                console,
 65                3,
 66                CFG.presenter.job_status_panel.min_width,
 67                CFG.presenter.job_status_panel.max_width,
 68            ),
 69        )
 70
 71        return Group(Text(""), panel, Text(""))
 72
 73    def create_full_info_panel(self, console: Console | None = None) -> Group:
 74        """
 75        Create a full job information panel.
 76
 77        Args:
 78            console (Console | None): Optional Rich console.
 79                If not provided, a new Console is created.
 80
 81        Returns:
 82            Group: A Rich Group containing the full job info panel.
 83        """
 84
 85        console = console or Console()
 86
 87        state = self._informer.get_real_state()
 88        comment, estimated = self._get_comment_and_estimated(state)
 89
 90        content = Group(
 91            Padding(self._create_basic_info_table(), (0, 2)),
 92            Text(""),
 93            Rule(
 94                title=Text(
 95                    "RESOURCES", style=CFG.presenter.full_info_panel.title_style
 96                ),
 97                style=CFG.presenter.full_info_panel.rule_style,
 98            ),
 99            Text(""),
100            Padding(
101                Align.center(self._create_resources_table(console.size.width)),
102                (0, 2),
103            ),
104            Text(""),
105            Rule(
106                title=Text("HISTORY", style=CFG.presenter.full_info_panel.title_style),
107                style=CFG.presenter.full_info_panel.rule_style,
108            ),
109            Text(""),
110            Padding(
111                self._create_job_history_table(
112                    state, self._informer.info.job_exit_code
113                ),
114                (0, 2),
115            ),
116            self._create_job_steps_block(),
117            Text(""),
118            Rule(
119                title=Text("STATE", style=CFG.presenter.full_info_panel.title_style),
120                style=CFG.presenter.full_info_panel.rule_style,
121            ),
122            Text(""),
123            Padding(self._create_job_status_table(state, comment, estimated), (0, 2)),
124        )
125
126        # combine all sections
127        full_panel = Panel(
128            content,
129            title=Text(
130                f"JOB: {self._informer.info.job_id}",
131                style=CFG.presenter.full_info_panel.title_style,
132                justify="center",
133            ),
134            border_style=CFG.presenter.full_info_panel.border_style,
135            # no horizontal padding so Rule reaches borders
136            padding=(1, 0),
137            width=get_panel_width(
138                console,
139                3,
140                CFG.presenter.full_info_panel.min_width,
141                CFG.presenter.full_info_panel.max_width,
142            ),
143        )
144
145        return Group(Text(""), full_panel, Text(""))
146
147    def get_brief_info(self, print_dir: bool) -> Text:
148        """
149        Return a concise, colorized summary of the job's current state.
150
151        Returns:
152            Text: A Rich `Text` object containing brief information about the job.
153        """
154        state = self._informer.get_real_state()
155        job_id = Text(
156            self._informer.info.job_id,
157            style=CFG.presenter.brief_info.job_id_color,
158        )
159        state_text = Text(str(state), style=state.color)
160
161        if print_dir:
162            dir_path = Text(
163                str(
164                    self._informer.info.input_dir.resolve().relative_to(
165                        Path.cwd(), walk_up=True
166                    )
167                ),
168                style=CFG.presenter.brief_info.dir_path_color,
169            )
170            full_text = job_id + "   [" + dir_path + "]   " + state_text
171        else:
172            full_text = job_id + "   " + state_text
173
174        if loop_info := self._informer.info.loop_info:
175            full_text += (
176                Text("   [")
177                + Text(
178                    f"{loop_info.current}/{loop_info.end}",
179                    style=CFG.presenter.brief_info.loop_info_color,
180                )
181                + "]"
182            )
183
184        return full_text
185
    @deprecated("This function has been deprecated, use get_brief_info instead.")
    def get_short_info(self, print_dir: bool) -> Text:
        """
        Return a concise, colorized summary of the job's current state.

        This function is deprecated, use `get_brief_info` instead.

        Args:
            print_dir (bool): Forwarded unchanged to `get_brief_info`.

        Returns:
            Text: A Rich `Text` object containing brief information about the job.
        """
        # thin wrapper: all work is delegated to the replacement method
        return self.get_brief_info(print_dir)
197
198    def _create_basic_info_table(self) -> Table:
199        """
200        Create a table with basic job information.
201
202        Returns:
203            Table: A Rich table with key-value pairs of basic job details.
204        """
205        table = Table(show_header=False, box=None, padding=(0, 1))
206        table.add_column(justify="right", style=CFG.presenter.key_style)
207        table.add_column(
208            justify="left", overflow="fold", style=CFG.presenter.value_style
209        )
210
211        table.add_row("Job name:", Text(self._informer.info.job_name))
212
213        loop_info = self._informer.info.loop_info
214        job_type_str = str(self._informer.info.job_type)
215
216        if loop_info:
217            content = f"{job_type_str} [{loop_info.current}/{loop_info.end}]"
218        else:
219            content = job_type_str
220
221        table.add_row("Job type:", Text(content))
222        table.add_row("Submission queue:", Text(self._informer.info.queue))
223        table.add_row("Input machine:", Text(self._informer.info.input_machine))
224        table.add_row("Input directory:", Text(str(self._informer.info.input_dir)))
225        if self._informer.info.main_node and self._informer.info.all_nodes:
226            if len(self._informer.info.all_nodes) == 1:
227                table.add_row("Working node:", Text(self._informer.info.main_node))
228            else:
229                table.add_row(
230                    "Working nodes:",
231                    Text(" + ".join(self._informer.info.all_nodes)),
232                )
233        if self._informer.info.work_dir:
234            table.add_row(
235                "Working directory:",
236                Text(str(self._informer.info.work_dir)),
237            )
238
239        return table
240
241    def _create_resources_table(self, term_width: int) -> Table:
242        """
243        Create a table displaying job resource requirements.
244
245        Args:
246            term_width (int): Width of the current terminal, used
247                to size the spacer column.
248
249        Returns:
250            Table: A Rich table summarizing resource allocations.
251        """
252        resources = self._informer.info.resources
253        table = Table(show_header=False, box=None, padding=(0, 1))
254
255        table.add_column(justify="right", style=CFG.presenter.key_style, no_wrap=True)
256        table.add_column(
257            justify="left",
258            style=CFG.presenter.value_style,
259            no_wrap=False,
260            overflow="fold",
261        )
262        # spacer column
263        table.add_column(justify="center", width=term_width // 30)
264        table.add_column(justify="right", style=CFG.presenter.key_style, no_wrap=True)
265        table.add_column(
266            justify="left",
267            style=CFG.presenter.value_style,
268            no_wrap=False,
269            overflow="fold",
270        )
271
272        fields = vars(resources)
273
274        # filter out None values
275        items = [
276            (k.replace("_", "-").lower(), str(v))
277            for k, v in fields.items()
278            if v is not None and k != "props"
279        ]
280
281        # translate properties
282        if resources.props:
283            items.extend([(k, str(v)) for k, v in resources.props.items()])
284
285        for i in range(0, len(items), 2):
286            row = items[i]
287            if i + 1 < len(items):
288                row2 = items[i + 1]
289                table.add_row(
290                    row[0] + ":",
291                    Text(row[1]),
292                    "",
293                    row2[0] + ":",
294                    Text(row2[1]),
295                )
296            else:
297                # only one item left
298                table.add_row(row[0] + ":", Text(row[1]), "", "", "")
299
300        return table
301
302    def _create_job_history_table(
303        self, state: RealState, exit_code: int | None
304    ) -> Table:
305        """
306        Create a table summarizing the job timeline.
307
308        Args:
309            state (RealState): State of the job.
310
311        Returns:
312            Table: A Rich table showing the chronological job history.
313        """
314        submitted = self._informer.info.submission_time
315        started = self._informer.info.start_time
316        completed = self._informer.info.completion_time
317
318        table = Table(show_header=False, box=None, padding=(0, 1))
319
320        table.add_column(justify="right", style=CFG.presenter.key_style)
321        table.add_column(
322            justify="left", style=CFG.presenter.value_style, overflow="fold"
323        )
324
325        table.add_row("Submitted at:", Text(f"{submitted}"))
326        # job started
327        if started:
328            table.add_row(
329                "",
330                Text(
331                    f"was queued for {format_duration_wdhhmmss(started - submitted)}",
332                    style=CFG.presenter.notes_style,
333                ),
334            )
335            table.add_row("Started at:", Text(f"{started}"))
336        # job is completed (or was killed after start)
337        if started and completed:
338            table.add_row(
339                "",
340                Text(
341                    f"was running for {format_duration_wdhhmmss(completed - started)}",
342                    style=CFG.presenter.notes_style,
343                ),
344            )
345            table.add_row(
346                f"{Presenter._translate_state_to_completed_msg(state, exit_code).title()} at:",
347                Text(f"{completed}"),
348            )
349        # job is "completed" (likely killed) but never started running
350        elif completed:
351            table.add_row(
352                "",
353                Text(
354                    f"was queued for {format_duration_wdhhmmss(completed - submitted)}",
355                    style=CFG.presenter.notes_style,
356                ),
357            )
358            table.add_row(
359                f"{Presenter._translate_state_to_completed_msg(state, exit_code).title()} at:",
360                Text(f"{completed}"),
361            )
362
363        return table
364
365    def _create_job_status_table(
366        self,
367        state: RealState,
368        comment: str | None = None,
369        estimated: tuple[datetime, str] | None = None,
370    ) -> Table:
371        """
372        Create a table summarizing the current job status.
373
374        Args:
375            state (RealState): The current real state of the job.
376
377        Returns:
378            Table: A Rich table with job state and details.
379        """
380        (message, details) = self._get_state_messages(
381            state,
382            self._informer.info.start_time or self._informer.info.submission_time,
383            self._informer.info.completion_time or datetime.now(),
384        )
385
386        table = Table(show_header=False, box=None, padding=(0, 1))
387        table.add_column(justify="right", style=CFG.presenter.key_style)
388        table.add_column(justify="left", style=CFG.presenter.value_style)
389
390        table.add_row("Job state:", Text(message, style=f"{state.color} bold"))
391        if details.strip():
392            table.add_row("", Text(details))
393
394        if estimated:
395            table.add_row(
396                "",
397                Text(
398                    f"Planned start within {format_duration_wdhhmmss(estimated[0] - datetime.now())} on '{estimated[1]}'",
399                    style=CFG.presenter.notes_style,
400                ),
401            )
402
403        # comment is typically only useful if the estimated start time is not defined
404        if not estimated and comment:
405            table.add_row("", Text(comment, style=CFG.presenter.notes_style))
406
407        return table
408
409    def _create_job_steps_table(self, steps: Sequence[BatchJobInterface]) -> Table:
410        """
411        Create a formatted Rich table displaying job step information.
412
413        Steps without a valid start time are skipped. The resulting table is intended
414        to be used within full-info job panels.
415
416        Args:
417            steps (Sequence[BatchJobInterface]): A list of batch-system step objects belonging to the job.
418
419        Returns:
420            Table: A Rich table containing the formatted step information.
421        """
422        table = Table(show_header=True, box=None, padding=(0, 1))
423
424        table.add_column("Step", justify="center", style=CFG.presenter.key_style)
425        table.add_column(
426            "State", justify="center", style=CFG.presenter.value_style, overflow="fold"
427        )
428        table.add_column(
429            "Start", justify="center", style=CFG.presenter.value_style, overflow="fold"
430        )
431        table.add_column(
432            "End", justify="center", style=CFG.presenter.value_style, overflow="fold"
433        )
434        table.add_column(
435            "Duration",
436            justify="center",
437            style=CFG.presenter.value_style,
438            overflow="fold",
439        )
440        for step in steps:
441            state = step.get_state()
442            start = step.get_start_time()
443            end = step.get_completion_time()
444
445            if not start:
446                continue
447
448            table.add_row(
449                f"{step.get_step_id()}",
450                Text(state.to_code(), style=state.color),
451                start.strftime(CFG.date_formats.standard),
452                end.strftime(CFG.date_formats.standard) if end else "",
453                format_duration_wdhhmmss((end or datetime.now()) - start),
454            )
455
456        return table
457
458    def _create_job_steps_block(self) -> Group:
459        """
460        Create a Rich block containing the job-steps section of the full info panel.
461
462        This block includes a section heading ("STEPS") and the table of job steps
463        created by `_create_job_steps_table()`. The block is only shown when the job
464        contains more than one step; for single-step jobs, an empty block is returned.
465
466        Returns:
467            Group: A Rich group representing the job-steps section, or an empty group
468            if no multi-step information should be displayed.
469        """
470
471        job: BatchJobInterface = self._informer.get_batch_info()
472        steps = job.get_steps()
473
474        # only show the job steps if there is more than 1 of them
475        if len(steps) > 1:
476            return Group(
477                Text(""),
478                Rule(
479                    title=Text(
480                        "STEPS", style=CFG.presenter.full_info_panel.title_style
481                    ),
482                    style=CFG.presenter.full_info_panel.rule_style,
483                ),
484                Text(""),
485                Padding(self._create_job_steps_table(steps), (0, 2)),
486            )
487
488        return Group()
489
490    def _get_state_messages(
491        self, state: RealState, start_time: datetime, end_time: datetime
492    ) -> tuple[str, str]:
493        """
494        Map a RealState to human-readable messages.
495
496        Args:
497            state (RealState): The current job state.
498            start_time (datetime): Start time of the relevant state period.
499            end_time (datetime): End time of the relevant state period.
500
501        Returns:
502            tuple[str, str]: A tuple containing:
503                - A short status message (e.g., "Job is running").
504                - Additional details, such as elapsed time or error info.
505        """
506        match state:
507            case RealState.QUEUED:
508                return (
509                    "Job is queued",
510                    f"In queue for {format_duration_wdhhmmss(end_time - start_time)}",
511                )
512            case RealState.HELD:
513                return (
514                    "Job is held",
515                    f"In queue for {format_duration_wdhhmmss(end_time - start_time)}",
516                )
517            case RealState.SUSPENDED:
518                return ("Job is suspended", "")
519            case RealState.WAITING:
520                return (
521                    "Job is waiting",
522                    f"In queue for {format_duration_wdhhmmss(end_time - start_time)}",
523                )
524            case RealState.RUNNING:
525                if not (all_nodes := self._informer.info.all_nodes) or not (
526                    main_node := self._informer.info.main_node
527                ):
528                    nodes = "unknown node(s)"
529                elif len(all_nodes) == 1:
530                    nodes = f"'{main_node}'"
531                else:
532                    nodes = f"'{main_node}' and {len(all_nodes) - 1} other node{'s' if len(all_nodes) > 2 else ''}"
533                return (
534                    "Job is running",
535                    f"Running for {format_duration_wdhhmmss(end_time - start_time)} on {nodes}",
536                )
537            case RealState.BOOTING:
538                return (
539                    "Job is booting",
540                    f"Preparing working directory on '{self._informer.get_main_node()}'",
541                )
542            case RealState.KILLED:
543                return (
544                    "Job has been killed",
545                    f"Killed at {end_time.strftime(CFG.date_formats.standard)}",
546                )
547            case RealState.FAILED:
548                return (
549                    "Job has failed",
550                    f"Failed at {end_time.strftime(CFG.date_formats.standard)} [exit code: {self._informer.info.job_exit_code}]",
551                )
552            case RealState.FINISHED:
553                return (
554                    "Job has finished",
555                    f"Completed at {end_time.strftime(CFG.date_formats.standard)}",
556                )
557            case RealState.EXITING:
558                exit_code = self._informer.info.job_exit_code
559                if exit_code is None:
560                    # no logged exit code -> job was killed
561                    msg = "Job is being killed"
562                elif exit_code == 0:
563                    msg = "Job is finishing successfully"
564                else:
565                    msg = f"Job is failing [exit code: {exit_code}]"
566
567                return (
568                    "Job is exiting",
569                    msg,
570                )
571            case RealState.IN_AN_INCONSISTENT_STATE:
572                return (
573                    "Job is in an inconsistent state",
574                    "The batch system and qq disagree on the status of the job",
575                )
576            case RealState.UNKNOWN:
577                return (
578                    "Job is in an unknown state",
579                    "Job is in a state that qq does not recognize",
580                )
581
582        return (
583            "Job is in an unknown state",
584            "Job is in a state that qq does not recognize",
585        )
586
587    def _get_comment_and_estimated(
588        self, state: RealState
589    ) -> tuple[str | None, tuple[datetime, str] | None]:
590        """
591        Retrieve the job comment and estimated start information
592        if the job is queued, held, waiting or suspended.
593
594        For jobs in other states, return (None, None).
595
596        Args:
597            state (RealState): The current job state.
598
599        Returns:
600            tuple[str | None, tuple[datetime, str] | None]:
601                A tuple containing:
602                - The job comment as a string, or None if unavailable.
603                - A tuple with the estimated start time (datetime) and execution node (str),
604                    or None if unavailable or not applicable.
605        """
606        if state in {
607            RealState.QUEUED,
608            RealState.HELD,
609            RealState.WAITING,
610            RealState.SUSPENDED,
611        }:
612            comment = self._informer.get_comment()
613            estimated = self._informer.get_estimated()
614            return comment, estimated
615
616        return None, None
617
618    @staticmethod
619    def _translate_state_to_completed_msg(
620        state: RealState, exit_code: None | int
621    ) -> str:
622        """
623        Translates a RealState and optional exit code into a human-readable completion message.
624
625        Returns:
626            str: A string representing the final status of the job/process.
627        """
628        if state == RealState.FINISHED or (
629            state == RealState.EXITING and exit_code == 0
630        ):
631            return "finished"
632
633        if state == RealState.KILLED or (
634            state == RealState.EXITING and exit_code is None
635        ):
636            return "killed"
637
638        if state == RealState.FAILED or (state == RealState.EXITING and exit_code != 0):
639            return "failed"
640
641        return "completed"  # default option; should not happen

Presents information about a qq job.

Presenter(informer: Informer)
    def __init__(self, informer: Informer):
        """
        Initialize the presenter with an Informer.

        Args:
            informer (Informer): The informer object that provides
                access to qq job metadata and runtime details.
        """
        # stored privately; the panel and table builders read the job data from it
        self._informer = informer

Initialize the presenter with an Informer.

Arguments:
  • informer (Informer): The informer object that provides access to qq job metadata and runtime details.
def create_job_status_panel(self, console: rich.console.Console | None = None) -> rich.console.Group:
41    def create_job_status_panel(self, console: Console | None = None) -> Group:
42        """
43        Create a standalone status panel for the job.
44
45        Args:
46            console (Console | None): Optional Rich console.
47                If not provided, a new Console is created.
48
49        Returns:
50            Group: A Rich Group containing the status panel.
51        """
52        console = console or Console()
53
54        panel = Panel(
55            self._create_job_status_table(self._informer.get_real_state()),
56            title=Text(
57                f"JOB: {self._informer.info.job_id}",
58                style=CFG.presenter.job_status_panel.title_style,
59                justify="center",
60            ),
61            border_style=CFG.presenter.job_status_panel.border_style,
62            padding=(1, 2),
63            width=get_panel_width(
64                console,
65                3,
66                CFG.presenter.job_status_panel.min_width,
67                CFG.presenter.job_status_panel.max_width,
68            ),
69        )
70
71        return Group(Text(""), panel, Text(""))

Create a standalone status panel for the job.

Arguments:
  • console (Console | None): Optional Rich console. If not provided, a new Console is created.
Returns:

Group: A Rich Group containing the status panel.

def create_full_info_panel(self, console: rich.console.Console | None = None) -> rich.console.Group:
 73    def create_full_info_panel(self, console: Console | None = None) -> Group:
 74        """
 75        Create a full job information panel.
 76
 77        Args:
 78            console (Console | None): Optional Rich console.
 79                If not provided, a new Console is created.
 80
 81        Returns:
 82            Group: A Rich Group containing the full job info panel.
 83        """
 84
 85        console = console or Console()
 86
 87        state = self._informer.get_real_state()
 88        comment, estimated = self._get_comment_and_estimated(state)
 89
 90        content = Group(
 91            Padding(self._create_basic_info_table(), (0, 2)),
 92            Text(""),
 93            Rule(
 94                title=Text(
 95                    "RESOURCES", style=CFG.presenter.full_info_panel.title_style
 96                ),
 97                style=CFG.presenter.full_info_panel.rule_style,
 98            ),
 99            Text(""),
100            Padding(
101                Align.center(self._create_resources_table(console.size.width)),
102                (0, 2),
103            ),
104            Text(""),
105            Rule(
106                title=Text("HISTORY", style=CFG.presenter.full_info_panel.title_style),
107                style=CFG.presenter.full_info_panel.rule_style,
108            ),
109            Text(""),
110            Padding(
111                self._create_job_history_table(
112                    state, self._informer.info.job_exit_code
113                ),
114                (0, 2),
115            ),
116            self._create_job_steps_block(),
117            Text(""),
118            Rule(
119                title=Text("STATE", style=CFG.presenter.full_info_panel.title_style),
120                style=CFG.presenter.full_info_panel.rule_style,
121            ),
122            Text(""),
123            Padding(self._create_job_status_table(state, comment, estimated), (0, 2)),
124        )
125
126        # combine all sections
127        full_panel = Panel(
128            content,
129            title=Text(
130                f"JOB: {self._informer.info.job_id}",
131                style=CFG.presenter.full_info_panel.title_style,
132                justify="center",
133            ),
134            border_style=CFG.presenter.full_info_panel.border_style,
135            # no horizontal padding so Rule reaches borders
136            padding=(1, 0),
137            width=get_panel_width(
138                console,
139                3,
140                CFG.presenter.full_info_panel.min_width,
141                CFG.presenter.full_info_panel.max_width,
142            ),
143        )
144
145        return Group(Text(""), full_panel, Text(""))

Create a full job information panel.

Arguments:
  • console (Console | None): Optional Rich console. If not provided, a new Console is created.
Returns:

Group: A Rich Group containing the full job info panel.

def get_brief_info(self, print_dir: bool) -> rich.text.Text:
147    def get_brief_info(self, print_dir: bool) -> Text:
148        """
149        Return a concise, colorized summary of the job's current state.
150
151        Returns:
152            Text: A Rich `Text` object containing brief information about the job.
153        """
154        state = self._informer.get_real_state()
155        job_id = Text(
156            self._informer.info.job_id,
157            style=CFG.presenter.brief_info.job_id_color,
158        )
159        state_text = Text(str(state), style=state.color)
160
161        if print_dir:
162            dir_path = Text(
163                str(
164                    self._informer.info.input_dir.resolve().relative_to(
165                        Path.cwd(), walk_up=True
166                    )
167                ),
168                style=CFG.presenter.brief_info.dir_path_color,
169            )
170            full_text = job_id + "   [" + dir_path + "]   " + state_text
171        else:
172            full_text = job_id + "   " + state_text
173
174        if loop_info := self._informer.info.loop_info:
175            full_text += (
176                Text("   [")
177                + Text(
178                    f"{loop_info.current}/{loop_info.end}",
179                    style=CFG.presenter.brief_info.loop_info_color,
180                )
181                + "]"
182            )
183
184        return full_text

Return a concise, colorized summary of the job's current state.

Returns:

Text: A Rich Text object containing brief information about the job.

@deprecated('This function has been deprecated, use get_brief_info instead.')
def get_short_info(self, print_dir: bool) -> rich.text.Text:
186    @deprecated("This function has been deprecated, use get_brief_info instead.")
187    def get_short_info(self, print_dir: bool) -> Text:
188        """
189        Return a concise, colorized summary of the job's current state.
190
191        This function is deprecated, use `get_brief_info` instead.
192
193        Returns:
194            Text: A Rich `Text` object containing brief information about the job.
195        """
196        return self.get_brief_info(print_dir)

Return a concise, colorized summary of the job's current state.

This function is deprecated, use get_brief_info instead.

Returns:

Text: A Rich Text object containing brief information about the job.