qq_lib.core.navigator
Navigation utilities for qq job directories.
This module defines the Navigator class, an extension of Operator that
locates a job's working directory and execution host. It provides helpers for
determining job destination, checking whether the current process is already in
the working directory, and inspecting job state in the context of directory navigation.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5Navigation utilities for qq job directories. 6 7This module defines the `Navigator` class, an extension of `Operator` that 8locates a job's working directory and execution host. It provides helpers for 9determining job destination, checking whether the current process is already in 10the working directory, and inspecting job state in the context of directory navigation. 11""" 12 13import socket 14from pathlib import Path 15from typing import Self 16 17from qq_lib.core.logger import get_logger 18from qq_lib.core.logical_paths import logical_resolve 19from qq_lib.info.informer import Informer 20from qq_lib.properties.states import RealState 21 22from .operator import Operator 23 24logger = get_logger(__name__) 25 26 27class Navigator(Operator): 28 """ 29 Base class for performing operations with job's working directory. 30 31 Attributes: 32 _informer (Informer): The underlying informer object that provides job details. 33 _info_file (Path): The path to the qq info file associated with this job. 34 _input_machine (str | None): Hostname of the machine on which the qq info file is stored. 35 _batch_system (str): The batch system type as reported by the informer. 36 _state (RealState): The current real state of the qq job. 37 _work_dir (Path | None): Path to the job's working directory. None if it does not exist. 38 _main_node (str | None): Main node on which the job is running. None if main node is not known. 39 """ 40 41 def __init__(self, info_file: Path, host: str | None = None): 42 """ 43 Initialize a Navigator instance from a qq info file. 44 45 Args: 46 info_file (Path): Path to the qq info file describing the job. 47 host (str | None, optional): Optional hostname of a machine from 48 which to load job information. Defaults to None meaning 'current machine'. 49 """ 50 super().__init__(info_file, host) 51 self._set_destination() 52 53 @classmethod 54 def from_informer(cls, informer: Informer) -> Self: 55 """ 56 Initialize a Navigator instance from an Informer. 57 58 Path to info file is set based on the information in the Informer, even if it does not exist. 59 60 Args: 61 informer (Informer): Initialized informer instance containing information about the job. 62 63 Returns: 64 Navigator: Initialized Navigator. 65 """ 66 navigator = super().from_informer(informer) 67 navigator._set_destination() 68 69 return navigator 70 71 def update(self): 72 super().update() 73 self._set_destination() 74 75 def has_destination(self) -> bool: 76 """ 77 Check that the job has an assigned host and working directory. 78 79 Returns: 80 bool: True if the job has both a host and a working directory, 81 False otherwise. 82 """ 83 return self._work_dir is not None and self._main_node is not None 84 85 def get_main_node(self) -> str | None: 86 """ 87 Get the hostname of the main node where the job is running. 88 89 Returns: 90 str | None: Hostname of the main node or None if undefined. 91 """ 92 return self._main_node 93 94 def get_work_dir(self) -> Path | None: 95 """ 96 Get the absolute path to the working directory of the job. 97 98 Returns: 99 Path | None: Absolute path to the working directory or None if undefined. 100 """ 101 return self._work_dir 102 103 def _set_destination(self) -> None: 104 """ 105 Get the job's host and working directory from the wrapped informer. 106 107 Updates: 108 - _main_node: hostname of the main node where the job runs 109 - _work_dir: absolute path to the working directory 110 111 Raises: 112 QQError: If main_node or work_dir are not defined in the informer. 113 """ 114 destination = self._informer.get_destination() 115 logger.debug(f"Destination: {destination}") 116 117 if destination: 118 (self._main_node, self._work_dir) = destination 119 else: 120 self._main_node = None 121 self._work_dir = None 122 123 def _is_in_work_dir(self) -> bool: 124 """ 125 Check if the current process is already in the job's working directory. 126 127 Returns: 128 bool: True if the current directory matches the job's work_dir and: 129 a) either an input_dir was used to run the job, or 130 b) local hostname matches the job's main node 131 """ 132 # note that we cannot just compare directory paths, since 133 # the same directory path may point to different directories 134 # on the current machine and on the execution node 135 # we also need to check that 136 # a) job was running in shared storage or 137 # b) we are on the same machine 138 return ( 139 self._work_dir is not None 140 and logical_resolve(self._work_dir) == logical_resolve(Path()) 141 and ( 142 not self._informer.uses_scratch() or self._main_node == socket.getfqdn() 143 ) 144 ) 145 146 def _is_synchronized(self) -> bool: 147 """ 148 Check whether the job has been synchronized. 149 150 Ignores the actual existence/non-existence of the working directory. 151 """ 152 # if exit code is not defined, then the job was never synchronized 153 # (it was either never run or it was killed) 154 if (exit_code := self._informer.info.job_exit_code) is None: 155 return False 156 157 return any( 158 mode.should_transfer(exit_code) 159 for mode in self._informer.info.transfer_mode 160 ) 161 162 def _is_queued(self) -> bool: 163 """Check if the job is queued, booting, held, or waiting.""" 164 return self._state in { 165 RealState.QUEUED, 166 RealState.BOOTING, 167 RealState.HELD, 168 RealState.WAITING, 169 } 170 171 def _is_killed(self) -> bool: 172 """Check if the job has been or is being killed.""" 173 return self._state == RealState.KILLED or ( 174 self._state == RealState.EXITING 175 and self._informer.info.job_exit_code is None 176 ) 177 178 def _is_finished(self) -> bool: 179 """Check if the job has finished succesfully.""" 180 return self._state == RealState.FINISHED 181 182 def _is_failed(self) -> bool: 183 """Check if the job has failed.""" 184 return self._state == RealState.FAILED 185 186 def _is_unknown_inconsistent(self) -> bool: 187 """Check if the job is in an unknown or inconsistent state.""" 188 return self._state in {RealState.UNKNOWN, RealState.IN_AN_INCONSISTENT_STATE} 189 190 def _is_exiting_successfully(self) -> bool: 191 """ 192 Check whether the job is currently successfully exiting. 193 """ 194 return ( 195 self._state == RealState.EXITING and self._informer.info.job_exit_code == 0 196 ) 197 198 def _is_suspended(self) -> bool: 199 """Check if the job is currently suspended.""" 200 return self._state == RealState.SUSPENDED 201 202 def _is_running(self) -> bool: 203 """Check if the job is running.""" 204 return self._state == RealState.RUNNING 205 206 def _work_dir_is_input_dir(self) -> bool: 207 """Check whether the working directory of the job is the input directory of the job.""" 208 # note that we cannot just compare directory paths, since 209 # the same directory path may point to different directories 210 # on the input machine and on the execution node 211 # we also need to check that 212 # a) job was running in shared storage or 213 # b) the job was running on the input machine 214 return ( 215 self._work_dir is not None 216 and logical_resolve(self._work_dir) 217 == logical_resolve(self._informer.info.input_dir) 218 and ( 219 not self._informer.uses_scratch() 220 or self._main_node == self._input_machine 221 ) 222 )
logger =
<Logger qq_lib.core.navigator (INFO)>