qq_lib.info
Facilities for loading, interpreting, and presenting qq job information.
This module defines the Informer class, which loads qq job metadata,
combines information from info files and from the batch system, and provides
access to runtime details such as working nodes, submission/start/end times, and resources.
It also provides the Presenter class, which formats this information into
Rich-based status panels, full job-information views, and compact summaries used
throughout qq's CLI.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Facilities for loading, interpreting, and presenting qq job information. 6 7This module defines the `Informer` class, which loads qq job metadata, 8combines information from info files and from the batch system, and provides 9access to runtime details such as working nodes, submission/start/end times, and resources. 10 11It also provides the `Presenter` class, which formats this information into 12Rich-based status panels, full job-information views, and compact summaries used 13throughout qq's CLI. 14""" 15 16from .informer import Informer 17from .presenter import Presenter 18 19__all__ = ["Informer", "Presenter"]
23class Informer: 24 """ 25 Provides an interface to access and manipulate qq job information. 26 """ 27 28 def __init__(self, info: Info): 29 """ 30 Initialize the informer with job information. 31 32 Args: 33 info (Info): An Info object containing raw job data. 34 """ 35 self.info = info 36 self._batch_info: BatchJobInterface | None = None 37 38 @property 39 def batch_system(self) -> type[BatchInterface]: 40 """ 41 Return the batch system class used for this job. 42 43 Returns: 44 type[BatchInterface]: The batch system class used for this job. 45 """ 46 return self.info.batch_system 47 48 @classmethod 49 def from_file(cls, file: Path, host: str | None = None) -> Self: 50 """ 51 Create an Informer by loading job information from a file. 52 53 If 'host' is provided, the file is read from the remote host; otherwise, it is read locally. 54 55 Args: 56 file (Path): Path to a YAML file containing job information. 57 host (str | None): Optional remote host from which to read the file. 58 59 Returns: 60 Informer: An instance initialized with the loaded Info. 61 62 Raises: 63 QQError: If the file cannot be read, reached, or parsed correctly. 64 """ 65 return cls(Info.from_file(file, host)) 66 67 @classmethod 68 def from_job_id(cls, job_id: str) -> Self: 69 """ 70 Load an `Informer` from a job ID. 71 72 Retrieves batch metadata for the given job and loads its qq info file. 73 This is more efficient than resolving the info-file path separately, 74 as it also sets `_batch_info` without additional batch-system queries. 75 76 Args: 77 job_id (str): The job identifier. 78 79 Returns: 80 Informer: The loaded informer. 81 82 Raises: 83 QQError: If the job does not exist or is not a valid qq job. 84 QQJobMismatchError: If the info file does not correspond to the job's ID. 85 """ 86 BatchSystem = BatchInterface.from_env_var_or_guess() 87 batch_job: BatchJobInterface = BatchSystem.get_batch_job(job_id) 88 89 if batch_job.is_empty(): 90 raise QQError(f"Job '{job_id}' does not exist.") 91 92 return cls.from_batch_job(batch_job) 93 94 @classmethod 95 def from_batch_job(cls, batch_job: BatchJobInterface) -> Self: 96 """ 97 Load an `Informer` from batch job information. 98 99 Raises an exception if the job is not a qq job. 100 This is more efficient than resolving the info-file path separately, 101 as it also sets `_batch_info` without additional batch-system queries. 102 103 Args: 104 batch_job (BatchJobInterface): The job info provided by the batch system. 105 106 Returns: 107 Informer: The loaded informer. 108 109 Raises: 110 QQError: If the job is not a valid qq job (missing info file). 111 QQJobMismatchError: If the info file does not correspond to the job's ID. 112 """ 113 if not (path := batch_job.get_info_file()): 114 raise QQError(f"Job '{batch_job.get_id()}' is not a valid qq job.") 115 116 informer = cls.from_file(path) 117 118 # check that the loaded info file actually corresponds to the batch job's ID 119 if not informer.matches_job(batch_job.get_id()): 120 raise QQJobMismatchError( 121 f"Info file for job '{batch_job.get_id()}' does not exist or is not reachable." 122 ) 123 124 informer._batch_info = batch_job 125 return informer 126 127 @staticmethod 128 def set_batch_info_in_bulk(informers: list[Informer]) -> None: 129 """ 130 Set the batch info for a list of informers in bulk, 131 by querying the batch system as few times as possible. 132 133 If the corresponding batch job no longer exists, batch info is still set, 134 but the BatchJobInterface is empty. 135 136 Args: 137 informers (list[Informer]): The list of informers to set the batch info for. 138 """ 139 if not informers: 140 return 141 142 job_ids = [informer.info.job_id for informer in informers] 143 batch_jobs = informers[0].batch_system.get_batch_jobs_from_ids(job_ids) 144 for informer, batch_job in zip(informers, batch_jobs): 145 informer._batch_info = batch_job 146 147 def to_file(self, file: Path, host: str | None = None) -> None: 148 """ 149 Export the job information to a file. 150 151 If `host` is provided, the file is written to the remote host; otherwise, it is written locally. 152 153 Args: 154 file (Path): Path to the output YAML file. 155 host (str | None): Optional remote host where the file should be written. 156 157 Raises: 158 QQError: If the file cannot be created, reached, or written to. 159 """ 160 self.info.to_file(file, host) 161 162 def matches_job(self, job_id: str) -> bool: 163 """ 164 Determine whether this informer corresponds to the specified job ID. 165 166 Args: 167 job_id (str): The job ID to compare against (e.g., "12345" or "12345.cluster.domain"). 168 169 Returns: 170 bool: True if both job IDs refer to the same job (same numeric/job part), 171 False otherwise. 172 """ 173 return self.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0] 174 175 def set_running( 176 self, time: datetime, main_node: str, all_nodes: list[str], work_dir: Path 177 ) -> None: 178 """ 179 Mark the job as running and set associated metadata. 180 181 Args: 182 time (datetime): Job start time. 183 main_node (str): Main node assigned to the job. 184 work_dir (Path): Working directory used by the job. 185 """ 186 self.info.job_state = NaiveState.RUNNING 187 self.info.start_time = time 188 self.info.main_node = main_node 189 self.info.all_nodes = all_nodes 190 self.info.work_dir = work_dir 191 192 def set_finished(self, time: datetime) -> None: 193 """ 194 Mark the job as finished successfully. 195 196 Args: 197 time (datetime): Job completion time. 198 """ 199 self.info.job_state = NaiveState.FINISHED 200 self.info.completion_time = time 201 self.info.job_exit_code = 0 202 203 def set_failed(self, time: datetime, exit_code: int) -> None: 204 """ 205 Mark the job as failed. 206 207 Args: 208 time (datetime): Job completion (failure) time. 209 exit_code (int): Exit code of the failed job. 210 """ 211 self.info.job_state = NaiveState.FAILED 212 self.info.completion_time = time 213 self.info.job_exit_code = exit_code 214 215 def set_killed(self, time: datetime) -> None: 216 """ 217 Mark the job as killed. 218 219 Args: 220 time (datetime): Time when the job was killed. 221 """ 222 self.info.job_state = NaiveState.KILLED 223 self.info.completion_time = time 224 # no exit code is intentionally set 225 226 def uses_scratch(self) -> bool: 227 """ 228 Determine if the job uses a scratch directory. 229 230 Returns: 231 bool: True if a scratch is used, False if it is not. 232 """ 233 return self.info.resources.uses_scratch() 234 235 def get_destination(self) -> tuple[str, Path] | None: 236 """ 237 Retrieve the job's main node and working directory. 238 239 Returns: 240 tuple[str, Path] | None: A tuple of (main_node, work_dir) 241 if both are set, otherwise None. 242 """ 243 if (main_node := self.info.main_node) and (work_dir := self.info.work_dir): 244 return main_node, work_dir 245 return None 246 247 def load_batch_info(self) -> None: 248 """ 249 Load the batch job information from the batch system if it's not already loaded. 250 """ 251 if self._batch_info is None: 252 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 253 254 def set_batch_info(self, batch_info: BatchJobInterface) -> None: 255 """ 256 Set the batch job information. 257 """ 258 self._batch_info = batch_info 259 260 def get_batch_state(self) -> BatchState: 261 """ 262 Return the job's state as reported by the batch system. 263 264 Uses cached information if available; otherwise queries the batch system 265 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 266 267 Returns: 268 BatchState: The job's state according to the batch system. 269 """ 270 if not self._batch_info: 271 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 272 273 return self._batch_info.get_state() 274 275 def get_real_state(self) -> RealState: 276 """ 277 Get the job's real state by combining the internal state (`NaiveState`) 278 with the state reported by the batch system (`BatchState`). 279 280 Uses cached information if available; otherwise queries the batch system 281 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 282 283 Returns: 284 RealState: The job's real state obtained by combining information 285 from qq and the batch system. 286 """ 287 # shortcut: if the naive state is unknown, there is no need to check batch state 288 if self.info.job_state in { 289 NaiveState.UNKNOWN, 290 }: 291 logger.debug( 292 "Short-circuiting get_real_state: the batch state will not affect the result." 293 ) 294 return RealState.from_states(self.info.job_state, BatchState.UNKNOWN) 295 296 return RealState.from_states(self.info.job_state, self.get_batch_state()) 297 298 def get_comment(self) -> str | None: 299 """ 300 Return the job's comment as reported by the batch system. 301 302 Uses cached information if available; otherwise queries the batch system 303 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 304 305 Returns: 306 str | None: The job comment if available, otherwise None. 307 """ 308 if not self._batch_info: 309 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 310 311 return self._batch_info.get_comment() 312 313 def get_estimated(self) -> tuple[datetime, str] | None: 314 """ 315 Return the estimated start time and execution node for the job. 316 317 Uses cached information if available; otherwise queries the batch system 318 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 319 320 Returns: 321 tuple[datetime, str] | None: A tuple containing the estimated start time 322 (as a datetime) and the execution node (as a string), or None if the 323 information is not available. 324 """ 325 if not self._batch_info: 326 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 327 328 return self._batch_info.get_estimated() 329 330 def get_main_node(self) -> str | None: 331 """ 332 Return the main execution node for the job. 333 334 Note that this obtains the node information from the batch system itself! 335 336 Uses cached information if available; otherwise queries the batch system 337 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 338 339 Returns: 340 str | None: The hostname of the main execution node, or None if the 341 information is not available. 342 """ 343 if not self._batch_info: 344 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 345 346 return self._batch_info.get_main_node() 347 348 def get_nodes(self) -> list[str] | None: 349 """ 350 Retrieve the list of execution nodes on which the job is running. 351 352 Note that this obtains the node information from the batch system itself! 353 354 Uses cached information if available; otherwise queries the batch system 355 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 356 357 Returns: 358 list[str] | None: 359 A list of hostnames (or node identifiers) where the job is running, 360 or `None` if the job has not started or node information is unavailable. 361 """ 362 if not self._batch_info: 363 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 364 365 return self._batch_info.get_nodes() 366 367 def get_batch_info(self) -> BatchJobInterface: 368 """ 369 Return cached batch job information if available; otherwise fetch it from 370 the batch system, cache it, and return the result. 371 372 Returns: 373 BatchJobInterface: The information about the job from the batch system. 374 """ 375 if self._batch_info is None: 376 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 377 return self._batch_info 378 379 def get_info_file(self) -> Path: 380 """ 381 Get absolute path to the info file associated with this job. 382 383 Be aware that the info file does not have to exist. 384 385 Returns: 386 Path: Absolute path to the info file. 387 """ 388 return construct_info_file_path(self.info.input_dir, self.info.job_name) 389 390 def should_transfer_files(self, exit_code: int) -> bool: 391 """ 392 Determine whether files should be transferred from the working directory. 393 394 Checks the configured transfer mode to decide if files should be transferred 395 from the job's working directory to the input directory based on the job's 396 exit code. 397 398 Args: 399 exit_code: The exit code of the completed job. 400 401 Returns: 402 True if files should be transferred, False otherwise. 403 """ 404 return any(mode.should_transfer(exit_code) for mode in self.info.transfer_mode) 405 406 def should_archive_files(self, exit_code: int) -> bool: 407 """ 408 Determine whether files should be archived from the working directory to the archive directory. 409 410 Checks the configured archive mode to decide if files should be archived 411 from the job's working directory. This only applies to loop jobs. 412 413 Args: 414 exit_code: The exit code of the completed job iteration. 415 416 Returns: 417 True if files should be archived, False if the job is not a loop job or 418 the archive mode does not trigger on the given exit code. 419 """ 420 if (loop_info := self.info.loop_info) is None: 421 return False 422 423 return any(mode.should_transfer(exit_code) for mode in loop_info.archive_mode)
Provides an interface to access and manipulate qq job information.
28 def __init__(self, info: Info): 29 """ 30 Initialize the informer with job information. 31 32 Args: 33 info (Info): An Info object containing raw job data. 34 """ 35 self.info = info 36 self._batch_info: BatchJobInterface | None = None
Initialize the informer with job information.
Arguments:
- info (Info): An Info object containing raw job data.
38 @property 39 def batch_system(self) -> type[BatchInterface]: 40 """ 41 Return the batch system class used for this job. 42 43 Returns: 44 type[BatchInterface]: The batch system class used for this job. 45 """ 46 return self.info.batch_system
Return the batch system class used for this job.
Returns:
type[BatchInterface]: The batch system class used for this job.
48 @classmethod 49 def from_file(cls, file: Path, host: str | None = None) -> Self: 50 """ 51 Create an Informer by loading job information from a file. 52 53 If 'host' is provided, the file is read from the remote host; otherwise, it is read locally. 54 55 Args: 56 file (Path): Path to a YAML file containing job information. 57 host (str | None): Optional remote host from which to read the file. 58 59 Returns: 60 Informer: An instance initialized with the loaded Info. 61 62 Raises: 63 QQError: If the file cannot be read, reached, or parsed correctly. 64 """ 65 return cls(Info.from_file(file, host))
Create an Informer by loading job information from a file.
If 'host' is provided, the file is read from the remote host; otherwise, it is read locally.
Arguments:
- file (Path): Path to a YAML file containing job information.
- host (str | None): Optional remote host from which to read the file.
Returns:
Informer: An instance initialized with the loaded Info.
Raises:
- QQError: If the file cannot be read, reached, or parsed correctly.
67 @classmethod 68 def from_job_id(cls, job_id: str) -> Self: 69 """ 70 Load an `Informer` from a job ID. 71 72 Retrieves batch metadata for the given job and loads its qq info file. 73 This is more efficient than resolving the info-file path separately, 74 as it also sets `_batch_info` without additional batch-system queries. 75 76 Args: 77 job_id (str): The job identifier. 78 79 Returns: 80 Informer: The loaded informer. 81 82 Raises: 83 QQError: If the job does not exist or is not a valid qq job. 84 QQJobMismatchError: If the info file does not correspond to the job's ID. 85 """ 86 BatchSystem = BatchInterface.from_env_var_or_guess() 87 batch_job: BatchJobInterface = BatchSystem.get_batch_job(job_id) 88 89 if batch_job.is_empty(): 90 raise QQError(f"Job '{job_id}' does not exist.") 91 92 return cls.from_batch_job(batch_job)
Load an Informer from a job ID.
Retrieves batch metadata for the given job and loads its qq info file.
This is more efficient than resolving the info-file path separately,
as it also sets _batch_info without additional batch-system queries.
Arguments:
- job_id (str): The job identifier.
Returns:
Informer: The loaded informer.
Raises:
- QQError: If the job does not exist or is not a valid qq job.
- QQJobMismatchError: If the info file does not correspond to the job's ID.
94 @classmethod 95 def from_batch_job(cls, batch_job: BatchJobInterface) -> Self: 96 """ 97 Load an `Informer` from batch job information. 98 99 Raises an exception if the job is not a qq job. 100 This is more efficient than resolving the info-file path separately, 101 as it also sets `_batch_info` without additional batch-system queries. 102 103 Args: 104 batch_job (BatchJobInterface): The job info provided by the batch system. 105 106 Returns: 107 Informer: The loaded informer. 108 109 Raises: 110 QQError: If the job is not a valid qq job (missing info file). 111 QQJobMismatchError: If the info file does not correspond to the job's ID. 112 """ 113 if not (path := batch_job.get_info_file()): 114 raise QQError(f"Job '{batch_job.get_id()}' is not a valid qq job.") 115 116 informer = cls.from_file(path) 117 118 # check that the loaded info file actually corresponds to the batch job's ID 119 if not informer.matches_job(batch_job.get_id()): 120 raise QQJobMismatchError( 121 f"Info file for job '{batch_job.get_id()}' does not exist or is not reachable." 122 ) 123 124 informer._batch_info = batch_job 125 return informer
Load an Informer from batch job information.
Raises an exception if the job is not a qq job.
This is more efficient than resolving the info-file path separately,
as it also sets _batch_info without additional batch-system queries.
Arguments:
- batch_job (BatchJobInterface): The job info provided by the batch system.
Returns:
Informer: The loaded informer.
Raises:
- QQError: If the job is not a valid qq job (missing info file).
- QQJobMismatchError: If the info file does not correspond to the job's ID.
127 @staticmethod 128 def set_batch_info_in_bulk(informers: list[Informer]) -> None: 129 """ 130 Set the batch info for a list of informers in bulk, 131 by querying the batch system as few times as possible. 132 133 If the corresponding batch job no longer exists, batch info is still set, 134 but the BatchJobInterface is empty. 135 136 Args: 137 informers (list[Informer]): The list of informers to set the batch info for. 138 """ 139 if not informers: 140 return 141 142 job_ids = [informer.info.job_id for informer in informers] 143 batch_jobs = informers[0].batch_system.get_batch_jobs_from_ids(job_ids) 144 for informer, batch_job in zip(informers, batch_jobs): 145 informer._batch_info = batch_job
Set the batch info for a list of informers in bulk, by querying the batch system as few times as possible.
If the corresponding batch job no longer exists, batch info is still set, but the BatchJobInterface is empty.
Arguments:
- informers (list[Informer]): The list of informers to set the batch info for.
147 def to_file(self, file: Path, host: str | None = None) -> None: 148 """ 149 Export the job information to a file. 150 151 If `host` is provided, the file is written to the remote host; otherwise, it is written locally. 152 153 Args: 154 file (Path): Path to the output YAML file. 155 host (str | None): Optional remote host where the file should be written. 156 157 Raises: 158 QQError: If the file cannot be created, reached, or written to. 159 """ 160 self.info.to_file(file, host)
Export the job information to a file.
If host is provided, the file is written to the remote host; otherwise, it is written locally.
Arguments:
- file (Path): Path to the output YAML file.
- host (str | None): Optional remote host where the file should be written.
Raises:
- QQError: If the file cannot be created, reached, or written to.
162 def matches_job(self, job_id: str) -> bool: 163 """ 164 Determine whether this informer corresponds to the specified job ID. 165 166 Args: 167 job_id (str): The job ID to compare against (e.g., "12345" or "12345.cluster.domain"). 168 169 Returns: 170 bool: True if both job IDs refer to the same job (same numeric/job part), 171 False otherwise. 172 """ 173 return self.info.job_id.split(".", 1)[0] == job_id.split(".", 1)[0]
Determine whether this informer corresponds to the specified job ID.
Arguments:
- job_id (str): The job ID to compare against (e.g., "12345" or "12345.cluster.domain").
Returns:
bool: True if both job IDs refer to the same job (same numeric/job part), False otherwise.
175 def set_running( 176 self, time: datetime, main_node: str, all_nodes: list[str], work_dir: Path 177 ) -> None: 178 """ 179 Mark the job as running and set associated metadata. 180 181 Args: 182 time (datetime): Job start time. 183 main_node (str): Main node assigned to the job. 184 work_dir (Path): Working directory used by the job. 185 """ 186 self.info.job_state = NaiveState.RUNNING 187 self.info.start_time = time 188 self.info.main_node = main_node 189 self.info.all_nodes = all_nodes 190 self.info.work_dir = work_dir
Mark the job as running and set associated metadata.
Arguments:
- time (datetime): Job start time.
- main_node (str): Main node assigned to the job.
- work_dir (Path): Working directory used by the job.
192 def set_finished(self, time: datetime) -> None: 193 """ 194 Mark the job as finished successfully. 195 196 Args: 197 time (datetime): Job completion time. 198 """ 199 self.info.job_state = NaiveState.FINISHED 200 self.info.completion_time = time 201 self.info.job_exit_code = 0
Mark the job as finished successfully.
Arguments:
- time (datetime): Job completion time.
203 def set_failed(self, time: datetime, exit_code: int) -> None: 204 """ 205 Mark the job as failed. 206 207 Args: 208 time (datetime): Job completion (failure) time. 209 exit_code (int): Exit code of the failed job. 210 """ 211 self.info.job_state = NaiveState.FAILED 212 self.info.completion_time = time 213 self.info.job_exit_code = exit_code
Mark the job as failed.
Arguments:
- time (datetime): Job completion (failure) time.
- exit_code (int): Exit code of the failed job.
215 def set_killed(self, time: datetime) -> None: 216 """ 217 Mark the job as killed. 218 219 Args: 220 time (datetime): Time when the job was killed. 221 """ 222 self.info.job_state = NaiveState.KILLED 223 self.info.completion_time = time 224 # no exit code is intentionally set
Mark the job as killed.
Arguments:
- time (datetime): Time when the job was killed.
226 def uses_scratch(self) -> bool: 227 """ 228 Determine if the job uses a scratch directory. 229 230 Returns: 231 bool: True if a scratch is used, False if it is not. 232 """ 233 return self.info.resources.uses_scratch()
Determine if the job uses a scratch directory.
Returns:
bool: True if a scratch is used, False if it is not.
235 def get_destination(self) -> tuple[str, Path] | None: 236 """ 237 Retrieve the job's main node and working directory. 238 239 Returns: 240 tuple[str, Path] | None: A tuple of (main_node, work_dir) 241 if both are set, otherwise None. 242 """ 243 if (main_node := self.info.main_node) and (work_dir := self.info.work_dir): 244 return main_node, work_dir 245 return None
Retrieve the job's main node and working directory.
Returns:
tuple[str, Path] | None: A tuple of (main_node, work_dir) if both are set, otherwise None.
247 def load_batch_info(self) -> None: 248 """ 249 Load the batch job information from the batch system if it's not already loaded. 250 """ 251 if self._batch_info is None: 252 self._batch_info = self.batch_system.get_batch_job(self.info.job_id)
Load the batch job information from the batch system if it's not already loaded.
254 def set_batch_info(self, batch_info: BatchJobInterface) -> None: 255 """ 256 Set the batch job information. 257 """ 258 self._batch_info = batch_info
Set the batch job information.
260 def get_batch_state(self) -> BatchState: 261 """ 262 Return the job's state as reported by the batch system. 263 264 Uses cached information if available; otherwise queries the batch system 265 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 266 267 Returns: 268 BatchState: The job's state according to the batch system. 269 """ 270 if not self._batch_info: 271 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 272 273 return self._batch_info.get_state()
Return the job's state as reported by the batch system.
Uses cached information if available; otherwise queries the batch system
via batch_system.get_batch_job. This avoids unnecessary remote calls.
Returns:
BatchState: The job's state according to the batch system.
275 def get_real_state(self) -> RealState: 276 """ 277 Get the job's real state by combining the internal state (`NaiveState`) 278 with the state reported by the batch system (`BatchState`). 279 280 Uses cached information if available; otherwise queries the batch system 281 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 282 283 Returns: 284 RealState: The job's real state obtained by combining information 285 from qq and the batch system. 286 """ 287 # shortcut: if the naive state is unknown, there is no need to check batch state 288 if self.info.job_state in { 289 NaiveState.UNKNOWN, 290 }: 291 logger.debug( 292 "Short-circuiting get_real_state: the batch state will not affect the result." 293 ) 294 return RealState.from_states(self.info.job_state, BatchState.UNKNOWN) 295 296 return RealState.from_states(self.info.job_state, self.get_batch_state())
Get the job's real state by combining the internal state (NaiveState)
with the state reported by the batch system (BatchState).
Uses cached information if available; otherwise queries the batch system
via batch_system.get_batch_job. This avoids unnecessary remote calls.
Returns:
RealState: The job's real state obtained by combining information from qq and the batch system.
298 def get_comment(self) -> str | None: 299 """ 300 Return the job's comment as reported by the batch system. 301 302 Uses cached information if available; otherwise queries the batch system 303 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 304 305 Returns: 306 str | None: The job comment if available, otherwise None. 307 """ 308 if not self._batch_info: 309 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 310 311 return self._batch_info.get_comment()
Return the job's comment as reported by the batch system.
Uses cached information if available; otherwise queries the batch system
via batch_system.get_batch_job. This avoids unnecessary remote calls.
Returns:
str | None: The job comment if available, otherwise None.
313 def get_estimated(self) -> tuple[datetime, str] | None: 314 """ 315 Return the estimated start time and execution node for the job. 316 317 Uses cached information if available; otherwise queries the batch system 318 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 319 320 Returns: 321 tuple[datetime, str] | None: A tuple containing the estimated start time 322 (as a datetime) and the execution node (as a string), or None if the 323 information is not available. 324 """ 325 if not self._batch_info: 326 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 327 328 return self._batch_info.get_estimated()
Return the estimated start time and execution node for the job.
Uses cached information if available; otherwise queries the batch system
via batch_system.get_batch_job. This avoids unnecessary remote calls.
Returns:
tuple[datetime, str] | None: A tuple containing the estimated start time (as a datetime) and the execution node (as a string), or None if the information is not available.
330 def get_main_node(self) -> str | None: 331 """ 332 Return the main execution node for the job. 333 334 Note that this obtains the node information from the batch system itself! 335 336 Uses cached information if available; otherwise queries the batch system 337 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 338 339 Returns: 340 str | None: The hostname of the main execution node, or None if the 341 information is not available. 342 """ 343 if not self._batch_info: 344 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 345 346 return self._batch_info.get_main_node()
Return the main execution node for the job.
Note that this obtains the node information from the batch system itself!
Uses cached information if available; otherwise queries the batch system
via batch_system.get_batch_job. This avoids unnecessary remote calls.
Returns:
str | None: The hostname of the main execution node, or None if the information is not available.
348 def get_nodes(self) -> list[str] | None: 349 """ 350 Retrieve the list of execution nodes on which the job is running. 351 352 Note that this obtains the node information from the batch system itself! 353 354 Uses cached information if available; otherwise queries the batch system 355 via `batch_system.get_batch_job`. This avoids unnecessary remote calls. 356 357 Returns: 358 list[str] | None: 359 A list of hostnames (or node identifiers) where the job is running, 360 or `None` if the job has not started or node information is unavailable. 361 """ 362 if not self._batch_info: 363 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 364 365 return self._batch_info.get_nodes()
Retrieve the list of execution nodes on which the job is running.
Note that this obtains the node information from the batch system itself!
Uses cached information if available; otherwise queries the batch system
via batch_system.get_batch_job. This avoids unnecessary remote calls.
Returns:
list[str] | None: A list of hostnames (or node identifiers) where the job is running, or
Noneif the job has not started or node information is unavailable.
367 def get_batch_info(self) -> BatchJobInterface: 368 """ 369 Return cached batch job information if available; otherwise fetch it from 370 the batch system, cache it, and return the result. 371 372 Returns: 373 BatchJobInterface: The information about the job from the batch system. 374 """ 375 if self._batch_info is None: 376 self._batch_info = self.batch_system.get_batch_job(self.info.job_id) 377 return self._batch_info
Return cached batch job information if available; otherwise fetch it from the batch system, cache it, and return the result.
Returns:
BatchJobInterface: The information about the job from the batch system.
379 def get_info_file(self) -> Path: 380 """ 381 Get absolute path to the info file associated with this job. 382 383 Be aware that the info file does not have to exist. 384 385 Returns: 386 Path: Absolute path to the info file. 387 """ 388 return construct_info_file_path(self.info.input_dir, self.info.job_name)
Get absolute path to the info file associated with this job.
Be aware that the info file does not have to exist.
Returns:
Path: Absolute path to the info file.
390 def should_transfer_files(self, exit_code: int) -> bool: 391 """ 392 Determine whether files should be transferred from the working directory. 393 394 Checks the configured transfer mode to decide if files should be transferred 395 from the job's working directory to the input directory based on the job's 396 exit code. 397 398 Args: 399 exit_code: The exit code of the completed job. 400 401 Returns: 402 True if files should be transferred, False otherwise. 403 """ 404 return any(mode.should_transfer(exit_code) for mode in self.info.transfer_mode)
Determine whether files should be transferred from the working directory.
Checks the configured transfer mode to decide if files should be transferred from the job's working directory to the input directory based on the job's exit code.
Arguments:
- exit_code: The exit code of the completed job.
Returns:
True if files should be transferred, False otherwise.
406 def should_archive_files(self, exit_code: int) -> bool: 407 """ 408 Determine whether files should be archived from the working directory to the archive directory. 409 410 Checks the configured archive mode to decide if files should be archived 411 from the job's working directory. This only applies to loop jobs. 412 413 Args: 414 exit_code: The exit code of the completed job iteration. 415 416 Returns: 417 True if files should be archived, False if the job is not a loop job or 418 the archive mode does not trigger on the given exit code. 419 """ 420 if (loop_info := self.info.loop_info) is None: 421 return False 422 423 return any(mode.should_transfer(exit_code) for mode in loop_info.archive_mode)
Determine whether files should be archived from the working directory to the archive directory.
Checks the configured archive mode to decide if files should be archived from the job's working directory. This only applies to loop jobs.
Arguments:
- exit_code: The exit code of the completed job iteration.
Returns:
True if files should be archived, False if the job is not a loop job or the archive mode does not trigger on the given exit code.
26class Presenter: 27 """ 28 Presents information about a qq job. 29 """ 30 31 def __init__(self, informer: Informer): 32 """ 33 Initialize the presenter with an Informer. 34 35 Args: 36 informer (Informer): The informer object that provides 37 access to qq job metadata and runtime details. 38 """ 39 self._informer = informer 40 41 def create_job_status_panel(self, console: Console | None = None) -> Group: 42 """ 43 Create a standalone status panel for the job. 44 45 Args: 46 console (Console | None): Optional Rich console. 47 If not provided, a new Console is created. 48 49 Returns: 50 Group: A Rich Group containing the status panel. 51 """ 52 console = console or Console() 53 54 panel = Panel( 55 self._create_job_status_table(self._informer.get_real_state()), 56 title=Text( 57 f"JOB: {self._informer.info.job_id}", 58 style=CFG.presenter.job_status_panel.title_style, 59 justify="center", 60 ), 61 border_style=CFG.presenter.job_status_panel.border_style, 62 padding=(1, 2), 63 width=get_panel_width( 64 console, 65 3, 66 CFG.presenter.job_status_panel.min_width, 67 CFG.presenter.job_status_panel.max_width, 68 ), 69 ) 70 71 return Group(Text(""), panel, Text("")) 72 73 def create_full_info_panel(self, console: Console | None = None) -> Group: 74 """ 75 Create a full job information panel. 76 77 Args: 78 console (Console | None): Optional Rich console. 79 If not provided, a new Console is created. 80 81 Returns: 82 Group: A Rich Group containing the full job info panel. 83 """ 84 85 console = console or Console() 86 87 state = self._informer.get_real_state() 88 comment, estimated = self._get_comment_and_estimated(state) 89 90 content = Group( 91 Padding(self._create_basic_info_table(), (0, 2)), 92 Text(""), 93 Rule( 94 title=Text( 95 "RESOURCES", style=CFG.presenter.full_info_panel.title_style 96 ), 97 style=CFG.presenter.full_info_panel.rule_style, 98 ), 99 Text(""), 100 Padding( 101 Align.center(self._create_resources_table(console.size.width)), 102 (0, 2), 103 ), 104 Text(""), 105 Rule( 106 title=Text("HISTORY", style=CFG.presenter.full_info_panel.title_style), 107 style=CFG.presenter.full_info_panel.rule_style, 108 ), 109 Text(""), 110 Padding( 111 self._create_job_history_table( 112 state, self._informer.info.job_exit_code 113 ), 114 (0, 2), 115 ), 116 self._create_job_steps_block(), 117 Text(""), 118 Rule( 119 title=Text("STATE", style=CFG.presenter.full_info_panel.title_style), 120 style=CFG.presenter.full_info_panel.rule_style, 121 ), 122 Text(""), 123 Padding(self._create_job_status_table(state, comment, estimated), (0, 2)), 124 ) 125 126 # combine all sections 127 full_panel = Panel( 128 content, 129 title=Text( 130 f"JOB: {self._informer.info.job_id}", 131 style=CFG.presenter.full_info_panel.title_style, 132 justify="center", 133 ), 134 border_style=CFG.presenter.full_info_panel.border_style, 135 # no horizontal padding so Rule reaches borders 136 padding=(1, 0), 137 width=get_panel_width( 138 console, 139 3, 140 CFG.presenter.full_info_panel.min_width, 141 CFG.presenter.full_info_panel.max_width, 142 ), 143 ) 144 145 return Group(Text(""), full_panel, Text("")) 146 147 def get_brief_info(self, print_dir: bool) -> Text: 148 """ 149 Return a concise, colorized summary of the job's current state. 150 151 Returns: 152 Text: A Rich `Text` object containing brief information about the job. 153 """ 154 state = self._informer.get_real_state() 155 job_id = Text( 156 self._informer.info.job_id, 157 style=CFG.presenter.brief_info.job_id_color, 158 ) 159 state_text = Text(str(state), style=state.color) 160 161 if print_dir: 162 dir_path = Text( 163 str( 164 self._informer.info.input_dir.resolve().relative_to( 165 Path.cwd(), walk_up=True 166 ) 167 ), 168 style=CFG.presenter.brief_info.dir_path_color, 169 ) 170 full_text = job_id + " [" + dir_path + "] " + state_text 171 else: 172 full_text = job_id + " " + state_text 173 174 if loop_info := self._informer.info.loop_info: 175 full_text += ( 176 Text(" [") 177 + Text( 178 f"{loop_info.current}/{loop_info.end}", 179 style=CFG.presenter.brief_info.loop_info_color, 180 ) 181 + "]" 182 ) 183 184 return full_text 185 186 @deprecated("This function has been deprecated, use get_brief_info instead.") 187 def get_short_info(self, print_dir: bool) -> Text: 188 """ 189 Return a concise, colorized summary of the job's current state. 190 191 This function is deprecated, use `get_brief_info` instead. 192 193 Returns: 194 Text: A Rich `Text` object containing brief information about the job. 195 """ 196 return self.get_brief_info(print_dir) 197 198 def _create_basic_info_table(self) -> Table: 199 """ 200 Create a table with basic job information. 201 202 Returns: 203 Table: A Rich table with key-value pairs of basic job details. 204 """ 205 table = Table(show_header=False, box=None, padding=(0, 1)) 206 table.add_column(justify="right", style=CFG.presenter.key_style) 207 table.add_column( 208 justify="left", overflow="fold", style=CFG.presenter.value_style 209 ) 210 211 table.add_row("Job name:", Text(self._informer.info.job_name)) 212 213 loop_info = self._informer.info.loop_info 214 job_type_str = str(self._informer.info.job_type) 215 216 if loop_info: 217 content = f"{job_type_str} [{loop_info.current}/{loop_info.end}]" 218 else: 219 content = job_type_str 220 221 table.add_row("Job type:", Text(content)) 222 table.add_row("Submission queue:", Text(self._informer.info.queue)) 223 table.add_row("Input machine:", Text(self._informer.info.input_machine)) 224 table.add_row("Input directory:", Text(str(self._informer.info.input_dir))) 225 if self._informer.info.main_node and self._informer.info.all_nodes: 226 if len(self._informer.info.all_nodes) == 1: 227 table.add_row("Working node:", Text(self._informer.info.main_node)) 228 else: 229 table.add_row( 230 "Working nodes:", 231 Text(" + ".join(self._informer.info.all_nodes)), 232 ) 233 if self._informer.info.work_dir: 234 table.add_row( 235 "Working directory:", 236 Text(str(self._informer.info.work_dir)), 237 ) 238 239 return table 240 241 def _create_resources_table(self, term_width: int) -> Table: 242 """ 243 Create a table displaying job resource requirements. 244 245 Args: 246 term_width (int): Width of the current terminal, used 247 to size the spacer column. 248 249 Returns: 250 Table: A Rich table summarizing resource allocations. 251 """ 252 resources = self._informer.info.resources 253 table = Table(show_header=False, box=None, padding=(0, 1)) 254 255 table.add_column(justify="right", style=CFG.presenter.key_style, no_wrap=True) 256 table.add_column( 257 justify="left", 258 style=CFG.presenter.value_style, 259 no_wrap=False, 260 overflow="fold", 261 ) 262 # spacer column 263 table.add_column(justify="center", width=term_width // 30) 264 table.add_column(justify="right", style=CFG.presenter.key_style, no_wrap=True) 265 table.add_column( 266 justify="left", 267 style=CFG.presenter.value_style, 268 no_wrap=False, 269 overflow="fold", 270 ) 271 272 fields = vars(resources) 273 274 # filter out None values 275 items = [ 276 (k.replace("_", "-").lower(), str(v)) 277 for k, v in fields.items() 278 if v is not None and k != "props" 279 ] 280 281 # translate properties 282 if resources.props: 283 items.extend([(k, str(v)) for k, v in resources.props.items()]) 284 285 for i in range(0, len(items), 2): 286 row = items[i] 287 if i + 1 < len(items): 288 row2 = items[i + 1] 289 table.add_row( 290 row[0] + ":", 291 Text(row[1]), 292 "", 293 row2[0] + ":", 294 Text(row2[1]), 295 ) 296 else: 297 # only one item left 298 table.add_row(row[0] + ":", Text(row[1]), "", "", "") 299 300 return table 301 302 def _create_job_history_table( 303 self, state: RealState, exit_code: int | None 304 ) -> Table: 305 """ 306 Create a table summarizing the job timeline. 307 308 Args: 309 state (RealState): State of the job. 310 311 Returns: 312 Table: A Rich table showing the chronological job history. 313 """ 314 submitted = self._informer.info.submission_time 315 started = self._informer.info.start_time 316 completed = self._informer.info.completion_time 317 318 table = Table(show_header=False, box=None, padding=(0, 1)) 319 320 table.add_column(justify="right", style=CFG.presenter.key_style) 321 table.add_column( 322 justify="left", style=CFG.presenter.value_style, overflow="fold" 323 ) 324 325 table.add_row("Submitted at:", Text(f"{submitted}")) 326 # job started 327 if started: 328 table.add_row( 329 "", 330 Text( 331 f"was queued for {format_duration_wdhhmmss(started - submitted)}", 332 style=CFG.presenter.notes_style, 333 ), 334 ) 335 table.add_row("Started at:", Text(f"{started}")) 336 # job is completed (or was killed after start) 337 if started and completed: 338 table.add_row( 339 "", 340 Text( 341 f"was running for {format_duration_wdhhmmss(completed - started)}", 342 style=CFG.presenter.notes_style, 343 ), 344 ) 345 table.add_row( 346 f"{Presenter._translate_state_to_completed_msg(state, exit_code).title()} at:", 347 Text(f"{completed}"), 348 ) 349 # job is "completed" (likely killed) but never started running 350 elif completed: 351 table.add_row( 352 "", 353 Text( 354 f"was queued for {format_duration_wdhhmmss(completed - submitted)}", 355 style=CFG.presenter.notes_style, 356 ), 357 ) 358 table.add_row( 359 f"{Presenter._translate_state_to_completed_msg(state, exit_code).title()} at:", 360 Text(f"{completed}"), 361 ) 362 363 return table 364 365 def _create_job_status_table( 366 self, 367 state: RealState, 368 comment: str | None = None, 369 estimated: tuple[datetime, str] | None = None, 370 ) -> Table: 371 """ 372 Create a table summarizing the current job status. 373 374 Args: 375 state (RealState): The current real state of the job. 376 377 Returns: 378 Table: A Rich table with job state and details. 379 """ 380 (message, details) = self._get_state_messages( 381 state, 382 self._informer.info.start_time or self._informer.info.submission_time, 383 self._informer.info.completion_time or datetime.now(), 384 ) 385 386 table = Table(show_header=False, box=None, padding=(0, 1)) 387 table.add_column(justify="right", style=CFG.presenter.key_style) 388 table.add_column(justify="left", style=CFG.presenter.value_style) 389 390 table.add_row("Job state:", Text(message, style=f"{state.color} bold")) 391 if details.strip(): 392 table.add_row("", Text(details)) 393 394 if estimated: 395 table.add_row( 396 "", 397 Text( 398 f"Planned start within {format_duration_wdhhmmss(estimated[0] - datetime.now())} on '{estimated[1]}'", 399 style=CFG.presenter.notes_style, 400 ), 401 ) 402 403 # comment is typically only useful if the estimated start time is not defined 404 if not estimated and comment: 405 table.add_row("", Text(comment, style=CFG.presenter.notes_style)) 406 407 return table 408 409 def _create_job_steps_table(self, steps: Sequence[BatchJobInterface]) -> Table: 410 """ 411 Create a formatted Rich table displaying job step information. 412 413 Steps without a valid start time are skipped. The resulting table is intended 414 to be used within full-info job panels. 415 416 Args: 417 steps (Sequence[BatchJobInterface]): A list of batch-system step objects belonging to the job. 418 419 Returns: 420 Table: A Rich table containing the formatted step information. 421 """ 422 table = Table(show_header=True, box=None, padding=(0, 1)) 423 424 table.add_column("Step", justify="center", style=CFG.presenter.key_style) 425 table.add_column( 426 "State", justify="center", style=CFG.presenter.value_style, overflow="fold" 427 ) 428 table.add_column( 429 "Start", justify="center", style=CFG.presenter.value_style, overflow="fold" 430 ) 431 table.add_column( 432 "End", justify="center", style=CFG.presenter.value_style, overflow="fold" 433 ) 434 table.add_column( 435 "Duration", 436 justify="center", 437 style=CFG.presenter.value_style, 438 overflow="fold", 439 ) 440 for step in steps: 441 state = step.get_state() 442 start = step.get_start_time() 443 end = step.get_completion_time() 444 445 if not start: 446 continue 447 448 table.add_row( 449 f"{step.get_step_id()}", 450 Text(state.to_code(), style=state.color), 451 start.strftime(CFG.date_formats.standard), 452 end.strftime(CFG.date_formats.standard) if end else "", 453 format_duration_wdhhmmss((end or datetime.now()) - start), 454 ) 455 456 return table 457 458 def _create_job_steps_block(self) -> Group: 459 """ 460 Create a Rich block containing the job-steps section of the full info panel. 461 462 This block includes a section heading ("STEPS") and the table of job steps 463 created by `_create_job_steps_table()`. The block is only shown when the job 464 contains more than one step; for single-step jobs, an empty block is returned. 465 466 Returns: 467 Group: A Rich group representing the job-steps section, or an empty group 468 if no multi-step information should be displayed. 469 """ 470 471 job: BatchJobInterface = self._informer.get_batch_info() 472 steps = job.get_steps() 473 474 # only show the job steps if there is more than 1 of them 475 if len(steps) > 1: 476 return Group( 477 Text(""), 478 Rule( 479 title=Text( 480 "STEPS", style=CFG.presenter.full_info_panel.title_style 481 ), 482 style=CFG.presenter.full_info_panel.rule_style, 483 ), 484 Text(""), 485 Padding(self._create_job_steps_table(steps), (0, 2)), 486 ) 487 488 return Group() 489 490 def _get_state_messages( 491 self, state: RealState, start_time: datetime, end_time: datetime 492 ) -> tuple[str, str]: 493 """ 494 Map a RealState to human-readable messages. 495 496 Args: 497 state (RealState): The current job state. 498 start_time (datetime): Start time of the relevant state period. 499 end_time (datetime): End time of the relevant state period. 500 501 Returns: 502 tuple[str, str]: A tuple containing: 503 - A short status message (e.g., "Job is running"). 504 - Additional details, such as elapsed time or error info. 505 """ 506 match state: 507 case RealState.QUEUED: 508 return ( 509 "Job is queued", 510 f"In queue for {format_duration_wdhhmmss(end_time - start_time)}", 511 ) 512 case RealState.HELD: 513 return ( 514 "Job is held", 515 f"In queue for {format_duration_wdhhmmss(end_time - start_time)}", 516 ) 517 case RealState.SUSPENDED: 518 return ("Job is suspended", "") 519 case RealState.WAITING: 520 return ( 521 "Job is waiting", 522 f"In queue for {format_duration_wdhhmmss(end_time - start_time)}", 523 ) 524 case RealState.RUNNING: 525 if not (all_nodes := self._informer.info.all_nodes) or not ( 526 main_node := self._informer.info.main_node 527 ): 528 nodes = "unknown node(s)" 529 elif len(all_nodes) == 1: 530 nodes = f"'{main_node}'" 531 else: 532 nodes = f"'{main_node}' and {len(all_nodes) - 1} other node{'s' if len(all_nodes) > 2 else ''}" 533 return ( 534 "Job is running", 535 f"Running for {format_duration_wdhhmmss(end_time - start_time)} on {nodes}", 536 ) 537 case RealState.BOOTING: 538 return ( 539 "Job is booting", 540 f"Preparing working directory on '{self._informer.get_main_node()}'", 541 ) 542 case RealState.KILLED: 543 return ( 544 "Job has been killed", 545 f"Killed at {end_time.strftime(CFG.date_formats.standard)}", 546 ) 547 case RealState.FAILED: 548 return ( 549 "Job has failed", 550 f"Failed at {end_time.strftime(CFG.date_formats.standard)} [exit code: {self._informer.info.job_exit_code}]", 551 ) 552 case RealState.FINISHED: 553 return ( 554 "Job has finished", 555 f"Completed at {end_time.strftime(CFG.date_formats.standard)}", 556 ) 557 case RealState.EXITING: 558 exit_code = self._informer.info.job_exit_code 559 if exit_code is None: 560 # no logged exit code -> job was killed 561 msg = "Job is being killed" 562 elif exit_code == 0: 563 msg = "Job is finishing successfully" 564 else: 565 msg = f"Job is failing [exit code: {exit_code}]" 566 567 return ( 568 "Job is exiting", 569 msg, 570 ) 571 case RealState.IN_AN_INCONSISTENT_STATE: 572 return ( 573 "Job is in an inconsistent state", 574 "The batch system and qq disagree on the status of the job", 575 ) 576 case RealState.UNKNOWN: 577 return ( 578 "Job is in an unknown state", 579 "Job is in a state that qq does not recognize", 580 ) 581 582 return ( 583 "Job is in an unknown state", 584 "Job is in a state that qq does not recognize", 585 ) 586 587 def _get_comment_and_estimated( 588 self, state: RealState 589 ) -> tuple[str | None, tuple[datetime, str] | None]: 590 """ 591 Retrieve the job comment and estimated start information 592 if the job is queued, held, waiting or suspended. 593 594 For jobs in other states, return (None, None). 595 596 Args: 597 state (RealState): The current job state. 598 599 Returns: 600 tuple[str | None, tuple[datetime, str] | None]: 601 A tuple containing: 602 - The job comment as a string, or None if unavailable. 603 - A tuple with the estimated start time (datetime) and execution node (str), 604 or None if unavailable or not applicable. 605 """ 606 if state in { 607 RealState.QUEUED, 608 RealState.HELD, 609 RealState.WAITING, 610 RealState.SUSPENDED, 611 }: 612 comment = self._informer.get_comment() 613 estimated = self._informer.get_estimated() 614 return comment, estimated 615 616 return None, None 617 618 @staticmethod 619 def _translate_state_to_completed_msg( 620 state: RealState, exit_code: None | int 621 ) -> str: 622 """ 623 Translates a RealState and optional exit code into a human-readable completion message. 624 625 Returns: 626 str: A string representing the final status of the job/process. 627 """ 628 if state == RealState.FINISHED or ( 629 state == RealState.EXITING and exit_code == 0 630 ): 631 return "finished" 632 633 if state == RealState.KILLED or ( 634 state == RealState.EXITING and exit_code is None 635 ): 636 return "killed" 637 638 if state == RealState.FAILED or (state == RealState.EXITING and exit_code != 0): 639 return "failed" 640 641 return "completed" # default option; should not happen
Presents information about a qq job.
31 def __init__(self, informer: Informer): 32 """ 33 Initialize the presenter with an Informer. 34 35 Args: 36 informer (Informer): The informer object that provides 37 access to qq job metadata and runtime details. 38 """ 39 self._informer = informer
Initialize the presenter with an Informer.
Arguments:
- informer (Informer): The informer object that provides access to qq job metadata and runtime details.
41 def create_job_status_panel(self, console: Console | None = None) -> Group: 42 """ 43 Create a standalone status panel for the job. 44 45 Args: 46 console (Console | None): Optional Rich console. 47 If not provided, a new Console is created. 48 49 Returns: 50 Group: A Rich Group containing the status panel. 51 """ 52 console = console or Console() 53 54 panel = Panel( 55 self._create_job_status_table(self._informer.get_real_state()), 56 title=Text( 57 f"JOB: {self._informer.info.job_id}", 58 style=CFG.presenter.job_status_panel.title_style, 59 justify="center", 60 ), 61 border_style=CFG.presenter.job_status_panel.border_style, 62 padding=(1, 2), 63 width=get_panel_width( 64 console, 65 3, 66 CFG.presenter.job_status_panel.min_width, 67 CFG.presenter.job_status_panel.max_width, 68 ), 69 ) 70 71 return Group(Text(""), panel, Text(""))
Create a standalone status panel for the job.
Arguments:
- console (Console | None): Optional Rich console. If not provided, a new Console is created.
Returns:
Group: A Rich Group containing the status panel.
73 def create_full_info_panel(self, console: Console | None = None) -> Group: 74 """ 75 Create a full job information panel. 76 77 Args: 78 console (Console | None): Optional Rich console. 79 If not provided, a new Console is created. 80 81 Returns: 82 Group: A Rich Group containing the full job info panel. 83 """ 84 85 console = console or Console() 86 87 state = self._informer.get_real_state() 88 comment, estimated = self._get_comment_and_estimated(state) 89 90 content = Group( 91 Padding(self._create_basic_info_table(), (0, 2)), 92 Text(""), 93 Rule( 94 title=Text( 95 "RESOURCES", style=CFG.presenter.full_info_panel.title_style 96 ), 97 style=CFG.presenter.full_info_panel.rule_style, 98 ), 99 Text(""), 100 Padding( 101 Align.center(self._create_resources_table(console.size.width)), 102 (0, 2), 103 ), 104 Text(""), 105 Rule( 106 title=Text("HISTORY", style=CFG.presenter.full_info_panel.title_style), 107 style=CFG.presenter.full_info_panel.rule_style, 108 ), 109 Text(""), 110 Padding( 111 self._create_job_history_table( 112 state, self._informer.info.job_exit_code 113 ), 114 (0, 2), 115 ), 116 self._create_job_steps_block(), 117 Text(""), 118 Rule( 119 title=Text("STATE", style=CFG.presenter.full_info_panel.title_style), 120 style=CFG.presenter.full_info_panel.rule_style, 121 ), 122 Text(""), 123 Padding(self._create_job_status_table(state, comment, estimated), (0, 2)), 124 ) 125 126 # combine all sections 127 full_panel = Panel( 128 content, 129 title=Text( 130 f"JOB: {self._informer.info.job_id}", 131 style=CFG.presenter.full_info_panel.title_style, 132 justify="center", 133 ), 134 border_style=CFG.presenter.full_info_panel.border_style, 135 # no horizontal padding so Rule reaches borders 136 padding=(1, 0), 137 width=get_panel_width( 138 console, 139 3, 140 CFG.presenter.full_info_panel.min_width, 141 CFG.presenter.full_info_panel.max_width, 142 ), 143 ) 144 145 return Group(Text(""), full_panel, Text(""))
Create a full job information panel.
Arguments:
- console (Console | None): Optional Rich console. If not provided, a new Console is created.
Returns:
Group: A Rich Group containing the full job info panel.
147 def get_brief_info(self, print_dir: bool) -> Text: 148 """ 149 Return a concise, colorized summary of the job's current state. 150 151 Returns: 152 Text: A Rich `Text` object containing brief information about the job. 153 """ 154 state = self._informer.get_real_state() 155 job_id = Text( 156 self._informer.info.job_id, 157 style=CFG.presenter.brief_info.job_id_color, 158 ) 159 state_text = Text(str(state), style=state.color) 160 161 if print_dir: 162 dir_path = Text( 163 str( 164 self._informer.info.input_dir.resolve().relative_to( 165 Path.cwd(), walk_up=True 166 ) 167 ), 168 style=CFG.presenter.brief_info.dir_path_color, 169 ) 170 full_text = job_id + " [" + dir_path + "] " + state_text 171 else: 172 full_text = job_id + " " + state_text 173 174 if loop_info := self._informer.info.loop_info: 175 full_text += ( 176 Text(" [") 177 + Text( 178 f"{loop_info.current}/{loop_info.end}", 179 style=CFG.presenter.brief_info.loop_info_color, 180 ) 181 + "]" 182 ) 183 184 return full_text
Return a concise, colorized summary of the job's current state.
Returns:
Text: A Rich
Textobject containing brief information about the job.
186 @deprecated("This function has been deprecated, use get_brief_info instead.") 187 def get_short_info(self, print_dir: bool) -> Text: 188 """ 189 Return a concise, colorized summary of the job's current state. 190 191 This function is deprecated, use `get_brief_info` instead. 192 193 Returns: 194 Text: A Rich `Text` object containing brief information about the job. 195 """ 196 return self.get_brief_info(print_dir)
Return a concise, colorized summary of the job's current state.
This function is deprecated, use get_brief_info instead.
Returns:
Text: A Rich
Textobject containing brief information about the job.