qq_lib.archive

Utilities for archiving and retrieving job-related files.

This module provides the Archiver class, which coordinates the movement of files between the working directory and the job archive.

 1# Released under MIT License.
 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
 3
 4"""
 5Utilities for archiving and retrieving job-related files.
 6
 7This module provides the `Archiver` class, which coordinates the movement
 8of files between working directory and the job archive.
 9"""
10
11from .archiver import Archiver
12
13__all__ = [
14    "Archiver",
15]
class Archiver:
 20class Archiver:
 21    """
 22    Manages archiving and retrieval of job-related files.
 23    """
 24
 25    def __init__(
 26        self,
 27        archive: Path,
 28        archive_format: str,
 29        input_machine: str,
 30        input_dir: Path,
 31        batch_system: AnyBatchClass,
 32    ):
 33        """
 34        Initialize the Archiver.
 35
 36        Args:
 37            archive (Path): Absolute path to the job's archive directory.
 38            archive_format (str): Printf-style or regex pattern describing archived filenames.
 39            input_machine (str): The hostname from which the job was submitted.
 40            input_dir (Path): The directory from which the job was submitted.
 41            batch_system (AnyBatchClass): The batch system which manages the job.
 42        """
 43        self._batch_system = batch_system
 44        self._archive = archive
 45        self._archive_format = archive_format
 46        self._input_machine = input_machine
 47        self._input_dir = input_dir
 48
 49    def make_archive_dir(self) -> None:
 50        """
 51        Create the archive directory in the job's input directory if it does not already exist.
 52        """
 53        logger.debug(
 54            f"Attempting to create an archive '{self._archive}' on '{self._input_machine}'."
 55        )
 56        self._batch_system.make_remote_dir(self._input_machine, self._archive)
 57
 58    def from_archive(self, dir: Path, cycle: int | None = None) -> None:
 59        """
 60        Fetch files from the archive to job's working directory.
 61
 62        This method retrieves files from the archive that match the
 63        configured archive pattern. If a cycle number is provided, only
 64        files corresponding to that cycle (for printf-style patterns) are
 65        fetched. If no cycle is provided, all files matching the pattern
 66        in the archive are fetched.
 67
 68        Args:
 69            dir (Path): The directory where files will be copied to.
 70            cycle (int | None): The cycle number to filter files for.
 71                Only relevant for printf-style patterns. If `None`, all
 72                matching files are fetched. Defaults to `None`.
 73
 74        Raises:
 75            QQError: If file transfer fails.
 76        """
 77        if not (
 78            files := self.get_files_matching_pattern(
 79                self._archive, self._input_machine, self._archive_format, cycle, False
 80            )
 81        ):
 82            logger.debug("Nothing to fetch from archive.")
 83            return
 84
 85        logger.debug(f"Files to fetch from archive: {files}.")
 86
 87        Retryer(
 88            self._batch_system.sync_selected,
 89            self._archive,
 90            dir,
 91            self._input_machine,
 92            socket.getfqdn(),
 93            files,
 94            max_tries=CFG.archiver.retry_tries,
 95            wait_seconds=CFG.archiver.retry_wait,
 96        ).run()
 97
 98    def to_archive(self, dir: Path) -> None:
 99        """
100        Archive all files matching the archive format in the specified directory.
101
102        Copies all files matching the archive pattern from directory
103        `dir` to the archive directory. After successfully transferring
104        the files, they are removed from the working directory.
105
106        Args:
107            work_dir (Path): The directory containing files to archive.
108
109        Raises:
110            QQError: If file transfer or removal fails.
111        """
112        if not (
113            files := self.get_files_matching_pattern(
114                dir, None, self._archive_format, None, False
115            )
116        ):
117            logger.debug("Nothing to archive.")
118            return
119
120        logger.debug(f"Files to archive: {files}.")
121
122        Retryer(
123            self._batch_system.sync_selected,
124            dir,
125            self._archive,
126            socket.getfqdn(),
127            self._input_machine,
128            files,
129            max_tries=CFG.archiver.retry_tries,
130            wait_seconds=CFG.archiver.retry_wait,
131        ).run()
132
133        # remove the archived files
134        Retryer(
135            self._remove_files,
136            files,
137            max_tries=CFG.archiver.retry_tries,
138            wait_seconds=CFG.archiver.retry_wait,
139        ).run()
140
141    def archive_runtime_files(self, job_name: str, cycle: int) -> None:
142        """
143        Archive qq runtime files from a specific job located in the input directory.
144
145        The archived files are moved from the input directory to the archive directory.
146
147        Ensure that `job_name` does not contain special regex characters, or that any such
148        characters are properly escaped.
149
150        This function will archive all files whose names match `job_name`, regardless
151        of whether they have any qq-specific suffixes.
152
153        Args:
154            job_name (str): The name of the job.
155            cycle (int): Cycle number for which the files should be archived.
156
157        Raises:
158            QQError: If moving the runtime files fails.
159        """
160        if not (
161            files := self.get_files_matching_pattern(
162                self._input_dir,
163                self._input_machine,
164                # only use the stem of the job name, the extension will not be matched
165                job_name.split(".", maxsplit=1)[0],
166                # we do not need to use the cycle number here since the job_name should already be expanded
167                cycle=None,
168                include_qq_files=True,
169            )
170        ):
171            logger.debug("No qq runtime files to archive.")
172            return
173
174        # the files are renamed to conform the the archive format
175        moved_files = [
176            self._archive / f"{self._archive_format % cycle}{f.suffix}" for f in files
177        ]
178
179        logger.debug(f"qq runtime files to archive: {files}.")
180        logger.debug(f"qq runtime files after moving: {moved_files}.")
181
182        Retryer(
183            self._batch_system.move_remote_files,
184            self._input_machine,
185            files,
186            moved_files,
187            max_tries=CFG.archiver.retry_tries,
188            wait_seconds=CFG.archiver.retry_wait,
189        ).run()
190
191    def get_files_matching_pattern(
192        self,
193        directory: Path,
194        host: str | None,
195        pattern: str,
196        cycle: int | None = None,
197        include_qq_files: bool = False,
198    ) -> list[Path]:
199        """
200        Determine which files in a directory match a given pattern.
201
202        Args:
203            directory (Path): Directory to search for files.
204            host (str | None): Hostname if the directory is remote,
205                or None if it is available from the current machine.
206            pattern (str): A printf-style or regex pattern to match file stems.
207            cycle (int | None): Optional cycle number for printf-style patterns.
208                If provided, only files corresponding to that loop are returned.
209                If `None`, all matching files are returned. Defaults to `None`.
210            include_qq_files (bool): Whether to include qq runtime files. Defaults to False.
211
212        Returns:
213            list[Path]: A list of absolute (logical) paths to matching files.
214        """
215        if cycle and is_printf_pattern(pattern):
216            try:
217                # try inserting the loop number into the printf pattern
218                regex = re.compile(f"{pattern % cycle}")
219            except Exception:
220                logger.debug(
221                    f"Ignoring loop number since the provided pattern ('{pattern}') does not support it."
222                )
223                regex = Archiver._prepare_regex_pattern(pattern)
224        else:
225            logger.debug(
226                f"Loop number not specified or the provided pattern ('{pattern}') does not support it."
227            )
228            regex = Archiver._prepare_regex_pattern(pattern)
229
230        logger.debug(f"Regex for matching: {regex}.")
231
232        # the directory must exist
233        if host and host != socket.getfqdn():
234            # remote directory
235            available_files: list[Path] = Retryer(
236                self._batch_system.list_remote_dir,
237                host,
238                directory,
239                max_tries=CFG.archiver.retry_tries,
240                wait_seconds=CFG.archiver.retry_wait,
241            ).run()
242        else:
243            # local directory
244            available_files = list(directory.iterdir())
245
246        logger.debug(f"All available files: {available_files}.")
247        if include_qq_files:
248            # the stem of the file must contain the regex pattern
249            return [logical_resolve(f) for f in available_files if regex.search(f.stem)]
250        return [
251            logical_resolve(f)
252            for f in available_files
253            if regex.search(f.stem) and f.suffix not in CFG.suffixes.all_suffixes
254        ]
255
256    def create_init_file(self, cycle: int) -> None:
257        """
258        Create an empty init file for the given cycle.
259        Used as a fallback when no valid archive file is produced, ensuring
260        the next iteration of the loop job can proceed normally.
261        Args:
262            cycle (int): The index of the next cycle of the loop job.
263        """
264        Path(f"{self._archive_format % cycle}.init").touch()
265
266    @staticmethod
267    def _prepare_regex_pattern(pattern: str) -> re.Pattern[str]:
268        """
269        Convert a printf-style pattern or regex string into a compiled regex.
270
271        Args:
272            pattern (str): The pattern to convert.
273
274        Returns:
275            re.Pattern[str]: Compiled regex pattern that can be used for matching.
276        """
277        if is_printf_pattern(pattern):
278            pattern = printf_to_regex(pattern)
279
280        return re.compile(pattern)
281
282    @staticmethod
283    def _remove_files(files: Iterable[Path]) -> None:
284        """
285        Remove a list of files from the filesystem.
286
287        Args:
288            files (Iterable[Path]): Files to delete.
289
290        Raises:
291            OSError: If file removal fails for any file.
292        """
293        for file in files:
294            file.unlink()

Manages archiving and retrieval of job-related files.

Archiver( archive: pathlib._local.Path, archive_format: str, input_machine: str, input_dir: pathlib._local.Path, batch_system: AnyBatchClass)
25    def __init__(
26        self,
27        archive: Path,
28        archive_format: str,
29        input_machine: str,
30        input_dir: Path,
31        batch_system: AnyBatchClass,
32    ):
33        """
34        Initialize the Archiver.
35
36        Args:
37            archive (Path): Absolute path to the job's archive directory.
38            archive_format (str): Printf-style or regex pattern describing archived filenames.
39            input_machine (str): The hostname from which the job was submitted.
40            input_dir (Path): The directory from which the job was submitted.
41            batch_system (AnyBatchClass): The batch system which manages the job.
42        """
43        self._batch_system = batch_system
44        self._archive = archive
45        self._archive_format = archive_format
46        self._input_machine = input_machine
47        self._input_dir = input_dir

Initialize the Archiver.

Arguments:
  • archive (Path): Absolute path to the job's archive directory.
  • archive_format (str): Printf-style or regex pattern describing archived filenames.
  • input_machine (str): The hostname from which the job was submitted.
  • input_dir (Path): The directory from which the job was submitted.
  • batch_system (AnyBatchClass): The batch system which manages the job.
def make_archive_dir(self) -> None:
49    def make_archive_dir(self) -> None:
50        """
51        Create the archive directory in the job's input directory if it does not already exist.
52        """
53        logger.debug(
54            f"Attempting to create an archive '{self._archive}' on '{self._input_machine}'."
55        )
56        self._batch_system.make_remote_dir(self._input_machine, self._archive)

Create the archive directory in the job's input directory if it does not already exist.

def from_archive(self, dir: pathlib._local.Path, cycle: int | None = None) -> None:
58    def from_archive(self, dir: Path, cycle: int | None = None) -> None:
59        """
60        Fetch files from the archive to job's working directory.
61
62        This method retrieves files from the archive that match the
63        configured archive pattern. If a cycle number is provided, only
64        files corresponding to that cycle (for printf-style patterns) are
65        fetched. If no cycle is provided, all files matching the pattern
66        in the archive are fetched.
67
68        Args:
69            dir (Path): The directory where files will be copied to.
70            cycle (int | None): The cycle number to filter files for.
71                Only relevant for printf-style patterns. If `None`, all
72                matching files are fetched. Defaults to `None`.
73
74        Raises:
75            QQError: If file transfer fails.
76        """
77        if not (
78            files := self.get_files_matching_pattern(
79                self._archive, self._input_machine, self._archive_format, cycle, False
80            )
81        ):
82            logger.debug("Nothing to fetch from archive.")
83            return
84
85        logger.debug(f"Files to fetch from archive: {files}.")
86
87        Retryer(
88            self._batch_system.sync_selected,
89            self._archive,
90            dir,
91            self._input_machine,
92            socket.getfqdn(),
93            files,
94            max_tries=CFG.archiver.retry_tries,
95            wait_seconds=CFG.archiver.retry_wait,
96        ).run()

Fetch files from the archive to the job's working directory.

This method retrieves files from the archive that match the configured archive pattern. If a cycle number is provided, only files corresponding to that cycle (for printf-style patterns) are fetched. If no cycle is provided, all files matching the pattern in the archive are fetched.

Arguments:
  • dir (Path): The directory where files will be copied to.
  • cycle (int | None): The cycle number to filter files for. Only relevant for printf-style patterns. If None, all matching files are fetched. Defaults to None.
Raises:
  • QQError: If file transfer fails.
def to_archive(self, dir: pathlib._local.Path) -> None:
 98    def to_archive(self, dir: Path) -> None:
 99        """
100        Archive all files matching the archive format in the specified directory.
101
102        Copies all files matching the archive pattern from directory
103        `dir` to the archive directory. After successfully transferring
104        the files, they are removed from the working directory.
105
106        Args:
107            work_dir (Path): The directory containing files to archive.
108
109        Raises:
110            QQError: If file transfer or removal fails.
111        """
112        if not (
113            files := self.get_files_matching_pattern(
114                dir, None, self._archive_format, None, False
115            )
116        ):
117            logger.debug("Nothing to archive.")
118            return
119
120        logger.debug(f"Files to archive: {files}.")
121
122        Retryer(
123            self._batch_system.sync_selected,
124            dir,
125            self._archive,
126            socket.getfqdn(),
127            self._input_machine,
128            files,
129            max_tries=CFG.archiver.retry_tries,
130            wait_seconds=CFG.archiver.retry_wait,
131        ).run()
132
133        # remove the archived files
134        Retryer(
135            self._remove_files,
136            files,
137            max_tries=CFG.archiver.retry_tries,
138            wait_seconds=CFG.archiver.retry_wait,
139        ).run()

Archive all files matching the archive format in the specified directory.

Copies all files matching the archive pattern from directory dir to the archive directory. After successfully transferring the files, they are removed from the working directory.

Arguments:
  • dir (Path): The directory containing files to archive.
Raises:
  • QQError: If file transfer or removal fails.
def archive_runtime_files(self, job_name: str, cycle: int) -> None:
141    def archive_runtime_files(self, job_name: str, cycle: int) -> None:
142        """
143        Archive qq runtime files from a specific job located in the input directory.
144
145        The archived files are moved from the input directory to the archive directory.
146
147        Ensure that `job_name` does not contain special regex characters, or that any such
148        characters are properly escaped.
149
150        This function will archive all files whose names match `job_name`, regardless
151        of whether they have any qq-specific suffixes.
152
153        Args:
154            job_name (str): The name of the job.
155            cycle (int): Cycle number for which the files should be archived.
156
157        Raises:
158            QQError: If moving the runtime files fails.
159        """
160        if not (
161            files := self.get_files_matching_pattern(
162                self._input_dir,
163                self._input_machine,
164                # only use the stem of the job name, the extension will not be matched
165                job_name.split(".", maxsplit=1)[0],
166                # we do not need to use the cycle number here since the job_name should already be expanded
167                cycle=None,
168                include_qq_files=True,
169            )
170        ):
171            logger.debug("No qq runtime files to archive.")
172            return
173
174        # the files are renamed to conform the the archive format
175        moved_files = [
176            self._archive / f"{self._archive_format % cycle}{f.suffix}" for f in files
177        ]
178
179        logger.debug(f"qq runtime files to archive: {files}.")
180        logger.debug(f"qq runtime files after moving: {moved_files}.")
181
182        Retryer(
183            self._batch_system.move_remote_files,
184            self._input_machine,
185            files,
186            moved_files,
187            max_tries=CFG.archiver.retry_tries,
188            wait_seconds=CFG.archiver.retry_wait,
189        ).run()

Archive qq runtime files from a specific job located in the input directory.

The archived files are moved from the input directory to the archive directory.

Ensure that job_name does not contain special regex characters, or that any such characters are properly escaped.

This function will archive all files whose names match job_name, regardless of whether they have any qq-specific suffixes.

Arguments:
  • job_name (str): The name of the job.
  • cycle (int): Cycle number for which the files should be archived.
Raises:
  • QQError: If moving the runtime files fails.
def get_files_matching_pattern( self, directory: pathlib._local.Path, host: str | None, pattern: str, cycle: int | None = None, include_qq_files: bool = False) -> list[pathlib._local.Path]:
191    def get_files_matching_pattern(
192        self,
193        directory: Path,
194        host: str | None,
195        pattern: str,
196        cycle: int | None = None,
197        include_qq_files: bool = False,
198    ) -> list[Path]:
199        """
200        Determine which files in a directory match a given pattern.
201
202        Args:
203            directory (Path): Directory to search for files.
204            host (str | None): Hostname if the directory is remote,
205                or None if it is available from the current machine.
206            pattern (str): A printf-style or regex pattern to match file stems.
207            cycle (int | None): Optional cycle number for printf-style patterns.
208                If provided, only files corresponding to that loop are returned.
209                If `None`, all matching files are returned. Defaults to `None`.
210            include_qq_files (bool): Whether to include qq runtime files. Defaults to False.
211
212        Returns:
213            list[Path]: A list of absolute (logical) paths to matching files.
214        """
215        if cycle and is_printf_pattern(pattern):
216            try:
217                # try inserting the loop number into the printf pattern
218                regex = re.compile(f"{pattern % cycle}")
219            except Exception:
220                logger.debug(
221                    f"Ignoring loop number since the provided pattern ('{pattern}') does not support it."
222                )
223                regex = Archiver._prepare_regex_pattern(pattern)
224        else:
225            logger.debug(
226                f"Loop number not specified or the provided pattern ('{pattern}') does not support it."
227            )
228            regex = Archiver._prepare_regex_pattern(pattern)
229
230        logger.debug(f"Regex for matching: {regex}.")
231
232        # the directory must exist
233        if host and host != socket.getfqdn():
234            # remote directory
235            available_files: list[Path] = Retryer(
236                self._batch_system.list_remote_dir,
237                host,
238                directory,
239                max_tries=CFG.archiver.retry_tries,
240                wait_seconds=CFG.archiver.retry_wait,
241            ).run()
242        else:
243            # local directory
244            available_files = list(directory.iterdir())
245
246        logger.debug(f"All available files: {available_files}.")
247        if include_qq_files:
248            # the stem of the file must contain the regex pattern
249            return [logical_resolve(f) for f in available_files if regex.search(f.stem)]
250        return [
251            logical_resolve(f)
252            for f in available_files
253            if regex.search(f.stem) and f.suffix not in CFG.suffixes.all_suffixes
254        ]

Determine which files in a directory match a given pattern.

Arguments:
  • directory (Path): Directory to search for files.
  • host (str | None): Hostname if the directory is remote, or None if it is available from the current machine.
  • pattern (str): A printf-style or regex pattern to match file stems.
  • cycle (int | None): Optional cycle number for printf-style patterns. If provided, only files corresponding to that cycle are returned. If None, all matching files are returned. Defaults to None.
  • include_qq_files (bool): Whether to include qq runtime files. Defaults to False.
Returns:

list[Path]: A list of absolute (logical) paths to matching files.

def create_init_file(self, cycle: int) -> None:
256    def create_init_file(self, cycle: int) -> None:
257        """
258        Create an empty init file for the given cycle.
259        Used as a fallback when no valid archive file is produced, ensuring
260        the next iteration of the loop job can proceed normally.
261        Args:
262            cycle (int): The index of the next cycle of the loop job.
263        """
264        Path(f"{self._archive_format % cycle}.init").touch()

Create an empty init file for the given cycle. Used as a fallback when no valid archive file is produced, ensuring the next iteration of the loop job can proceed normally.

Arguments:
  • cycle (int): The index of the next cycle of the loop job.