qq_lib.core.navigator

Navigation utilities for qq job directories.

This module defines the Navigator class, an extension of Operator that locates a job's working directory and execution host. It provides helpers for determining job destination, checking whether the current process is already in the working directory, and inspecting job state in the context of directory navigation.

  1# Released under MIT License.
  2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
  3
  4"""
  5Navigation utilities for qq job directories.
  6
  7This module defines the `Navigator` class, an extension of `Operator` that
  8locates a job's working directory and execution host. It provides helpers for
  9determining job destination, checking whether the current process is already in
 10the working directory, and inspecting job state in the context of directory navigation.
 11"""
 12
 13import socket
 14from pathlib import Path
 15from typing import Self
 16
 17from qq_lib.core.logger import get_logger
 18from qq_lib.core.logical_paths import logical_resolve
 19from qq_lib.info.informer import Informer
 20from qq_lib.properties.states import RealState
 21
 22from .operator import Operator
 23
 24logger = get_logger(__name__)
 25
 26
 27class Navigator(Operator):
 28    """
 29    Base class for performing operations with job's working directory.
 30
 31    Attributes:
 32        _informer (Informer): The underlying informer object that provides job details.
 33        _info_file (Path): The path to the qq info file associated with this job.
 34        _input_machine (str | None): Hostname of the machine on which the qq info file is stored.
 35        _batch_system (str): The batch system type as reported by the informer.
 36        _state (RealState): The current real state of the qq job.
 37        _work_dir (Path | None): Path to the job's working directory. None if it does not exist.
 38        _main_node (str | None): Main node on which the job is running. None if main node is not known.
 39    """
 40
 41    def __init__(self, info_file: Path, host: str | None = None):
 42        """
 43        Initialize a Navigator instance from a qq info file.
 44
 45        Args:
 46            info_file (Path): Path to the qq info file describing the job.
 47            host (str | None, optional): Optional hostname of a machine from
 48                which to load job information. Defaults to None meaning 'current machine'.
 49        """
 50        super().__init__(info_file, host)
 51        self._set_destination()
 52
 53    @classmethod
 54    def from_informer(cls, informer: Informer) -> Self:
 55        """
 56        Initialize a Navigator instance from an Informer.
 57
 58        Path to info file is set based on the information in the Informer, even if it does not exist.
 59
 60        Args:
 61            informer (Informer): Initialized informer instance containing information about the job.
 62
 63        Returns:
 64            Navigator: Initialized Navigator.
 65        """
 66        navigator = super().from_informer(informer)
 67        navigator._set_destination()
 68
 69        return navigator
 70
 71    def update(self):
 72        super().update()
 73        self._set_destination()
 74
 75    def has_destination(self) -> bool:
 76        """
 77        Check that the job has an assigned host and working directory.
 78
 79        Returns:
 80            bool: True if the job has both a host and a working directory,
 81            False otherwise.
 82        """
 83        return self._work_dir is not None and self._main_node is not None
 84
 85    def get_main_node(self) -> str | None:
 86        """
 87        Get the hostname of the main node where the job is running.
 88
 89        Returns:
 90            str | None: Hostname of the main node or None if undefined.
 91        """
 92        return self._main_node
 93
 94    def get_work_dir(self) -> Path | None:
 95        """
 96        Get the absolute path to the working directory of the job.
 97
 98        Returns:
 99            Path | None: Absolute path to the working directory or None if undefined.
100        """
101        return self._work_dir
102
103    def _set_destination(self) -> None:
104        """
105        Get the job's host and working directory from the wrapped informer.
106
107        Updates:
108            - _main_node: hostname of the main node where the job runs
109            - _work_dir: absolute path to the working directory
110
111        Raises:
112            QQError: If main_node or work_dir are not defined in the informer.
113        """
114        destination = self._informer.get_destination()
115        logger.debug(f"Destination: {destination}")
116
117        if destination:
118            (self._main_node, self._work_dir) = destination
119        else:
120            self._main_node = None
121            self._work_dir = None
122
123    def _is_in_work_dir(self) -> bool:
124        """
125        Check if the current process is already in the job's working directory.
126
127        Returns:
128            bool: True if the current directory matches the job's work_dir and:
129              a) either an input_dir was used to run the job, or
130              b) local hostname matches the job's main node
131        """
132        # note that we cannot just compare directory paths, since
133        # the same directory path may point to different directories
134        # on the current machine and on the execution node
135        # we also need to check that
136        #   a) job was running in shared storage or
137        #   b) we are on the same machine
138        return (
139            self._work_dir is not None
140            and logical_resolve(self._work_dir) == logical_resolve(Path())
141            and (
142                not self._informer.uses_scratch() or self._main_node == socket.getfqdn()
143            )
144        )
145
146    def _is_synchronized(self) -> bool:
147        """
148        Check whether the job has been synchronized.
149
150        Ignores the actual existence/non-existence of the working directory.
151        """
152        # if exit code is not defined, then the job was never synchronized
153        # (it was either never run or it was killed)
154        if (exit_code := self._informer.info.job_exit_code) is None:
155            return False
156
157        return any(
158            mode.should_transfer(exit_code)
159            for mode in self._informer.info.transfer_mode
160        )
161
162    def _is_queued(self) -> bool:
163        """Check if the job is queued, booting, held, or waiting."""
164        return self._state in {
165            RealState.QUEUED,
166            RealState.BOOTING,
167            RealState.HELD,
168            RealState.WAITING,
169        }
170
171    def _is_killed(self) -> bool:
172        """Check if the job has been or is being killed."""
173        return self._state == RealState.KILLED or (
174            self._state == RealState.EXITING
175            and self._informer.info.job_exit_code is None
176        )
177
178    def _is_finished(self) -> bool:
179        """Check if the job has finished succesfully."""
180        return self._state == RealState.FINISHED
181
182    def _is_failed(self) -> bool:
183        """Check if the job has failed."""
184        return self._state == RealState.FAILED
185
186    def _is_unknown_inconsistent(self) -> bool:
187        """Check if the job is in an unknown or inconsistent state."""
188        return self._state in {RealState.UNKNOWN, RealState.IN_AN_INCONSISTENT_STATE}
189
190    def _is_exiting_successfully(self) -> bool:
191        """
192        Check whether the job is currently successfully exiting.
193        """
194        return (
195            self._state == RealState.EXITING and self._informer.info.job_exit_code == 0
196        )
197
198    def _is_suspended(self) -> bool:
199        """Check if the job is currently suspended."""
200        return self._state == RealState.SUSPENDED
201
202    def _is_running(self) -> bool:
203        """Check if the job is running."""
204        return self._state == RealState.RUNNING
205
206    def _work_dir_is_input_dir(self) -> bool:
207        """Check whether the working directory of the job is the input directory of the job."""
208        # note that we cannot just compare directory paths, since
209        # the same directory path may point to different directories
210        # on the input machine and on the execution node
211        # we also need to check that
212        #   a) job was running in shared storage or
213        #   b) the job was running on the input machine
214        return (
215            self._work_dir is not None
216            and logical_resolve(self._work_dir)
217            == logical_resolve(self._informer.info.input_dir)
218            and (
219                not self._informer.uses_scratch()
220                or self._main_node == self._input_machine
221            )
222        )
logger = <Logger qq_lib.core.navigator (INFO)>