qq_lib.sync

File-synchronization utilities for retrieving data from a running or failed qq job.

This module defines the Syncer class, an extension of Navigator that handles copying files from a job's remote working directory back to the job's input directory. It performs safety checks based on the job's real state, ensuring that synchronization is attempted only when a working directory actually exists.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5File-synchronization utilities for retrieving data from a running or failed qq job.
 6
 7This module defines the `Syncer` class, an extension of `Navigator` that handles
 8copying files from a job's remote working directory back to the job's input
 9directory. It performs safety checks based on the job's real state, ensuring
10that synchronization is attempted only when a working directory actually exists.
11"""
12
13from .syncer import Syncer
14
15__all__ = [
16    "Syncer",
17]
class Syncer(qq_lib.core.navigator.Navigator):
class Syncer(Navigator):
    """
    Fetch files of a job from its working directory into its input directory.

    Builds on `Navigator`: `ensure_suitable` decides whether fetching makes
    sense for the job at all, `sync` delegates the actual transfer to the
    job's batch system.
    """

    def ensure_suitable(self):
        """
        Check that there is a working directory worth synchronizing from.

        The conditions are tested in a fixed order and the first one that
        holds determines the reported reason.

        Raises:
            QQNotSuitableError: If the working directory is the input directory,
                the job has already been synchronized, the job was killed and
                has no destination, or the job is still queued.
        """
        # (condition, reason) pairs; order matters - the first match is reported
        blockers = (
            (
                self._work_dir_is_input_dir,
                "Working directory of the job is the input directory of the job: implicitly synchronized.",
            ),
            (
                self._is_synchronized,
                "Job has been completed and was synchronized: working directory no longer exists.",
            ),
            (
                # a killed job is only a problem when it left no working directory behind
                lambda: self._is_killed() and not self.has_destination(),
                "Job has been killed and no working directory is available.",
            ),
            (
                # queued jobs have not been given a working directory yet
                self._is_queued,
                "Job is queued or booting: nothing to sync.",
            ),
        )

        for is_blocked, reason in blockers:
            if is_blocked():
                raise QQNotSuitableError(reason)

    def sync(self, files: list[str] | None = None) -> None:
        """
        Copy files from the job's working directory into its input directory.

        Args:
            files (list[str] | None): Names of the files to fetch, taken relative
                to the working directory. When `None` or empty, everything is
                fetched through the batch system's `sync_with_exclusions`.

        Raises:
            QQError: If `has_destination()` reports that the host or the working
                directory of the job is not known.
        """
        if not self.has_destination():
            raise QQError(
                "Host ('main_node') or working directory ('work_dir') are not defined."
            )

        # narrowing for the type checker only:
        # self.has_destination() above already guarantees both values are set
        assert self._work_dir and self._main_node

        source = self._work_dir
        host = self._main_node

        if not files:
            logger.info(
                "Fetching all files from job's working directory to input directory."
            )
            self._batch_system.sync_with_exclusions(
                source, self._informer.info.input_dir, host, None
            )
            return

        plural = "s" if len(files) > 1 else ""
        listing = " ".join(files)
        logger.info(
            f"Fetching file{plural} '{listing}' from job's working directory to input directory."
        )
        self._batch_system.sync_selected(
            source,
            self._informer.info.input_dir,
            host,
            None,
            [source / name for name in files],
        )

Handle synchronization of job files between a remote working directory (on a compute node) and the local input directory.

def ensure_suitable(self):
18    def ensure_suitable(self):
19        """
20        Verify that the job is in a state where files can be fetched from its working directory.
21
22        Raises:
23            QQNotSuitableError: If the working directory is not expected to exist
24                or if the working directory is the input directory.
25        """
26        if self._work_dir_is_input_dir():
27            raise QQNotSuitableError(
28                "Working directory of the job is the input directory of the job: implicitly synchronized."
29            )
30
31        if self._is_synchronized():
32            raise QQNotSuitableError(
33                "Job has been completed and was synchronized: working directory no longer exists."
34            )
35
36        # killed jobs may not have working directory
37        if self._is_killed() and not self.has_destination():
38            raise QQNotSuitableError(
39                "Job has been killed and no working directory is available."
40            )
41
42        # queued jobs do not have working directory
43        if self._is_queued():
44            raise QQNotSuitableError("Job is queued or booting: nothing to sync.")

Verify that the job is in a state where files can be fetched from its working directory.

Raises:
  • QQNotSuitableError: If the working directory is not expected to exist or if the working directory is the input directory.
def sync(self, files: list[str] | None = None) -> None:
46    def sync(self, files: list[str] | None = None) -> None:
47        """
48        Synchronize files from the remote working directory to the local input directory.
49
50        Args:
51            files (list[str] | None): Optional list of specific filenames to fetch.
52                If omitted, all files are synchronized except those excluded by the batch system.
53
54        Behavior:
55            - If `files` is provided, only those specific files are copied.
56            - If omitted, the entire working directory is synchronized.
57
58        Raises:
59            QQError: If the job's destination (host or working directory) cannot be determined.
60        """
61        if not self.has_destination():
62            raise QQError(
63                "Host ('main_node') or working directory ('work_dir') are not defined."
64            )
65
66        # hint for type checker
67        # work_dir and main_node must be set - we check that in self.hasDestination
68        assert self._work_dir and self._main_node
69
70        if files:
71            logger.info(
72                f"Fetching file{'s' if len(files) > 1 else ''} '{' '.join(files)}' from job's working directory to input directory."
73            )
74            self._batch_system.sync_selected(
75                self._work_dir,
76                self._informer.info.input_dir,
77                self._main_node,
78                None,
79                [self._work_dir / x for x in files],
80            )
81        else:
82            logger.info(
83                "Fetching all files from job's working directory to input directory."
84            )
85            self._batch_system.sync_with_exclusions(
86                self._work_dir, self._informer.info.input_dir, self._main_node, None
87            )

Synchronize files from the remote working directory to the local input directory.

Arguments:
  • files (list[str] | None): Optional list of specific filenames to fetch. If omitted, all files are synchronized except those excluded by the batch system.
Behavior:
  • If files is provided, only those specific files are copied.
  • If omitted, the entire working directory is synchronized, except for files excluded by the batch system.
Raises:
  • QQError: If the job's destination (host or working directory) cannot be determined.