qq_lib.archive
Utilities for archiving and retrieving job-related files.
This module provides the Archiver class, which coordinates the movement
of files between the working directory and the job archive.
# Released under MIT License.
# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab

"""
Archiving helpers for job-related files.

Exposes the `Archiver` class, which moves files back and forth between
a job's working directory and its archive.
"""

from .archiver import Archiver

__all__ = ["Archiver"]
class Archiver:
    """
    Manages archiving and retrieval of job-related files.
    """

    def __init__(
        self,
        archive: Path,
        archive_format: str,
        input_machine: str,
        input_dir: Path,
        batch_system: AnyBatchClass,
    ):
        """
        Initialize the Archiver.

        Args:
            archive (Path): Absolute path to the job's archive directory.
            archive_format (str): Printf-style or regex pattern describing archived filenames.
            input_machine (str): The hostname from which the job was submitted.
            input_dir (Path): The directory from which the job was submitted.
            batch_system (AnyBatchClass): The batch system which manages the job.
        """
        self._batch_system = batch_system
        self._archive = archive
        self._archive_format = archive_format
        self._input_machine = input_machine
        self._input_dir = input_dir

    def make_archive_dir(self) -> None:
        """
        Create the archive directory on the input machine if it does not already exist.

        The directory is created through the batch system's `make_remote_dir`.
        """
        logger.debug(
            f"Attempting to create an archive '{self._archive}' on '{self._input_machine}'."
        )
        self._batch_system.make_remote_dir(self._input_machine, self._archive)

    def from_archive(self, dir: Path, cycle: int | None = None) -> None:
        """
        Fetch files from the archive to the job's working directory.

        This method retrieves files from the archive that match the
        configured archive pattern. If a cycle number is provided, only
        files corresponding to that cycle (for printf-style patterns) are
        fetched. If no cycle is provided, all files matching the pattern
        in the archive are fetched. qq runtime files are never fetched.

        Args:
            dir (Path): The directory where files will be copied to.
            cycle (int | None): The cycle number to filter files for.
                Only relevant for printf-style patterns. If `None`, all
                matching files are fetched. Defaults to `None`.

        Raises:
            QQError: If file transfer fails.
        """
        if not (
            files := self.get_files_matching_pattern(
                self._archive, self._input_machine, self._archive_format, cycle, False
            )
        ):
            logger.debug("Nothing to fetch from archive.")
            return

        logger.debug(f"Files to fetch from archive: {files}.")

        # copy from the archive on the input machine to `dir` on this machine
        Retryer(
            self._batch_system.sync_selected,
            self._archive,
            dir,
            self._input_machine,
            socket.getfqdn(),
            files,
            max_tries=CFG.archiver.retry_tries,
            wait_seconds=CFG.archiver.retry_wait,
        ).run()

    def to_archive(self, dir: Path) -> None:
        """
        Archive all files matching the archive format in the specified directory.

        Copies all files matching the archive pattern from directory
        `dir` (on the current machine) to the archive directory on the
        input machine. After successfully transferring the files, they
        are removed from `dir`. qq runtime files are never archived here.

        Args:
            dir (Path): The directory containing files to archive.

        Raises:
            QQError: If file transfer or removal fails.
        """
        if not (
            files := self.get_files_matching_pattern(
                dir, None, self._archive_format, None, False
            )
        ):
            logger.debug("Nothing to archive.")
            return

        logger.debug(f"Files to archive: {files}.")

        # copy from `dir` on this machine to the archive on the input machine
        Retryer(
            self._batch_system.sync_selected,
            dir,
            self._archive,
            socket.getfqdn(),
            self._input_machine,
            files,
            max_tries=CFG.archiver.retry_tries,
            wait_seconds=CFG.archiver.retry_wait,
        ).run()

        # remove the archived files; `_remove_files` tolerates files that were
        # already deleted by an earlier (partially failed) attempt
        Retryer(
            self._remove_files,
            files,
            max_tries=CFG.archiver.retry_tries,
            wait_seconds=CFG.archiver.retry_wait,
        ).run()

    def archive_runtime_files(self, job_name: str, cycle: int) -> None:
        """
        Archive qq runtime files from a specific job located in the input directory.

        The archived files are moved from the input directory to the archive directory
        and renamed to `archive_format % cycle`, keeping each file's original suffix.

        Ensure that `job_name` does not contain special regex characters, or that any such
        characters are properly escaped.

        This function will archive all files whose names match `job_name`, regardless
        of whether they have any qq-specific suffixes.

        Args:
            job_name (str): The name of the job. Only the part before the first `.`
                is used for matching.
            cycle (int): Cycle number for which the files should be archived.

        Raises:
            QQError: If moving the runtime files fails.
        """
        if not (
            files := self.get_files_matching_pattern(
                self._input_dir,
                self._input_machine,
                # only use the stem of the job name, the extension will not be matched
                job_name.split(".", maxsplit=1)[0],
                # we do not need to use the cycle number here since the job_name should already be expanded
                cycle=None,
                include_qq_files=True,
            )
        ):
            logger.debug("No qq runtime files to archive.")
            return

        # the files are renamed to conform to the archive format; only the suffix is kept.
        # NOTE(review): this requires a printf-style archive format (`%` on a plain regex
        # raises TypeError), and two matched files sharing a suffix map to the same target.
        archived_stem = self._archive_format % cycle
        moved_files = [self._archive / f"{archived_stem}{f.suffix}" for f in files]

        logger.debug(f"qq runtime files to archive: {files}.")
        logger.debug(f"qq runtime files after moving: {moved_files}.")

        Retryer(
            self._batch_system.move_remote_files,
            self._input_machine,
            files,
            moved_files,
            max_tries=CFG.archiver.retry_tries,
            wait_seconds=CFG.archiver.retry_wait,
        ).run()

    def get_files_matching_pattern(
        self,
        directory: Path,
        host: str | None,
        pattern: str,
        cycle: int | None = None,
        include_qq_files: bool = False,
    ) -> list[Path]:
        """
        Determine which files in a directory match a given pattern.

        A file matches when the compiled pattern is found anywhere in its stem
        (`re.search`); the match is not anchored.

        Args:
            directory (Path): Directory to search for files. Must exist.
            host (str | None): Hostname if the directory is remote,
                or None if it is available from the current machine.
            pattern (str): A printf-style or regex pattern to match file stems.
            cycle (int | None): Optional cycle number for printf-style patterns.
                If provided (including 0), only files corresponding to that cycle
                are returned. If `None`, all matching files are returned.
                Defaults to `None`.
            include_qq_files (bool): Whether to include qq runtime files, i.e. files
                whose suffix is in `CFG.suffixes.all_suffixes`. Defaults to False.

        Returns:
            list[Path]: A list of absolute (logical) paths to matching files.
        """
        # `is not None` instead of truthiness: cycle 0 is a real cycle number and
        # must not silently fall back to matching the files of every cycle
        if cycle is not None and is_printf_pattern(pattern):
            try:
                # insert the cycle number into the printf pattern; the expanded
                # string is then compiled directly as a regex
                regex = re.compile(pattern % cycle)
            except Exception:
                logger.debug(
                    f"Ignoring loop number since the provided pattern ('{pattern}') does not support it."
                )
                regex = Archiver._prepare_regex_pattern(pattern)
        else:
            logger.debug(
                f"Loop number not specified or the provided pattern ('{pattern}') does not support it."
            )
            regex = Archiver._prepare_regex_pattern(pattern)

        logger.debug(f"Regex for matching: {regex}.")

        # the directory must exist
        if host and host != socket.getfqdn():
            # remote directory
            available_files: list[Path] = Retryer(
                self._batch_system.list_remote_dir,
                host,
                directory,
                max_tries=CFG.archiver.retry_tries,
                wait_seconds=CFG.archiver.retry_wait,
            ).run()
        else:
            # local directory (no host given, or the host is this machine)
            available_files = list(directory.iterdir())

        logger.debug(f"All available files: {available_files}.")

        # the stem of the file must contain the regex pattern
        matching = [f for f in available_files if regex.search(f.stem)]
        if not include_qq_files:
            # qq runtime files are recognized by their suffix
            matching = [
                f for f in matching if f.suffix not in CFG.suffixes.all_suffixes
            ]
        return [logical_resolve(f) for f in matching]

    def create_init_file(self, cycle: int) -> None:
        """
        Create an empty init file for the given cycle.

        Used as a fallback when no valid archive file is produced, ensuring
        the next iteration of the loop job can proceed normally. The file is
        named `archive_format % cycle` + `.init` and, unless the format yields
        an absolute path, is created relative to the current working directory.

        Args:
            cycle (int): The index of the next cycle of the loop job.
        """
        Path(f"{self._archive_format % cycle}.init").touch()

    @staticmethod
    def _prepare_regex_pattern(pattern: str) -> re.Pattern[str]:
        """
        Convert a printf-style pattern or regex string into a compiled regex.

        Args:
            pattern (str): The pattern to convert.

        Returns:
            re.Pattern[str]: Compiled regex pattern that can be used for matching.
        """
        if is_printf_pattern(pattern):
            pattern = printf_to_regex(pattern)

        return re.compile(pattern)

    @staticmethod
    def _remove_files(files: Iterable[Path]) -> None:
        """
        Remove a list of files from the filesystem.

        Files that no longer exist are skipped. This keeps the operation
        idempotent, so it can safely be re-run by `Retryer` after an attempt
        that failed part-way through.

        Args:
            files (Iterable[Path]): Files to delete.

        Raises:
            OSError: If file removal fails for any file (other than it being missing).
        """
        for file in files:
            file.unlink(missing_ok=True)
Manages archiving and retrieval of job-related files.
def __init__(
    self,
    archive: Path,
    archive_format: str,
    input_machine: str,
    input_dir: Path,
    batch_system: AnyBatchClass,
):
    """
    Store the archive configuration used by later transfers.

    Args:
        archive (Path): Absolute path of the job's archive directory.
        archive_format (str): Printf-style or regex pattern that archived file names follow.
        input_machine (str): Hostname the job was submitted from.
        input_dir (Path): Directory the job was submitted from.
        batch_system (AnyBatchClass): Batch system that manages the job.
    """
    # where archived files live and how they are named
    self._archive = archive
    self._archive_format = archive_format
    # origin of the job submission
    self._input_machine = input_machine
    self._input_dir = input_dir
    # performs all remote operations
    self._batch_system = batch_system
Initialize the Archiver.
Arguments:
- archive (Path): Absolute path to the job's archive directory.
- archive_format (str): Printf-style or regex pattern describing archived filenames.
- input_machine (str): The hostname from which the job was submitted.
- input_dir (Path): The directory from which the job was submitted.
- batch_system (AnyBatchClass): The batch system which manages the job.
def make_archive_dir(self) -> None:
    """
    Ensure the archive directory exists on the input machine.

    Creation is delegated to the batch system's `make_remote_dir`, which
    receives the input machine and the archive path.
    """
    archive_path, host = self._archive, self._input_machine
    logger.debug(f"Attempting to create an archive '{archive_path}' on '{host}'.")
    self._batch_system.make_remote_dir(host, archive_path)
Create the archive directory in the job's input directory if it does not already exist.
def from_archive(self, dir: Path, cycle: int | None = None) -> None:
    """
    Copy archived files into the job's working directory.

    Only archive entries matching the configured archive format are copied
    (qq runtime files are excluded). When `cycle` is given and the format is
    printf-style, the selection is narrowed to that cycle; otherwise every
    matching file is copied.

    Args:
        dir (Path): Destination directory for the copied files.
        cycle (int | None): Cycle number used to narrow the selection.
            Only meaningful for printf-style formats. Defaults to `None`.

    Raises:
        QQError: If the transfer fails.
    """
    selected = self.get_files_matching_pattern(
        self._archive, self._input_machine, self._archive_format, cycle, False
    )
    if not selected:
        logger.debug("Nothing to fetch from archive.")
        return

    logger.debug(f"Files to fetch from archive: {selected}.")

    # archive on the input machine -> `dir` on this machine
    local_host = socket.getfqdn()
    transfer = Retryer(
        self._batch_system.sync_selected,
        self._archive,
        dir,
        self._input_machine,
        local_host,
        selected,
        max_tries=CFG.archiver.retry_tries,
        wait_seconds=CFG.archiver.retry_wait,
    )
    transfer.run()
Fetch files from the archive to the job's working directory.
This method retrieves files from the archive that match the configured archive pattern. If a cycle number is provided, only files corresponding to that cycle (for printf-style patterns) are fetched. If no cycle is provided, all files matching the pattern in the archive are fetched.
Arguments:
- dir (Path): The directory where files will be copied to.
- cycle (int | None): The cycle number to filter files for.
Only relevant for printf-style patterns. If `None`, all matching files are fetched. Defaults to `None`.
Raises:
- QQError: If file transfer fails.
def to_archive(self, dir: Path) -> None:
    """
    Archive all files matching the archive format in the specified directory.

    Copies all files matching the archive pattern from directory
    `dir` (on the current machine) to the archive directory on the
    input machine. After successfully transferring the files, they
    are removed from `dir`. qq runtime files are never archived here.

    Args:
        dir (Path): The directory containing files to archive.

    Raises:
        QQError: If file transfer or removal fails.
    """
    if not (
        files := self.get_files_matching_pattern(
            dir, None, self._archive_format, None, False
        )
    ):
        logger.debug("Nothing to archive.")
        return

    logger.debug(f"Files to archive: {files}.")

    # copy from `dir` on this machine to the archive on the input machine
    Retryer(
        self._batch_system.sync_selected,
        dir,
        self._archive,
        socket.getfqdn(),
        self._input_machine,
        files,
        max_tries=CFG.archiver.retry_tries,
        wait_seconds=CFG.archiver.retry_wait,
    ).run()

    # remove the archived files
    # NOTE(review): a retry after a partial failure re-removes files that are
    # already gone, so `_remove_files` must tolerate missing files.
    Retryer(
        self._remove_files,
        files,
        max_tries=CFG.archiver.retry_tries,
        wait_seconds=CFG.archiver.retry_wait,
    ).run()
Archive all files matching the archive format in the specified directory.
Copies all files matching the archive pattern from directory
`dir` to the archive directory. After successfully transferring
the files, they are removed from the working directory.
Arguments:
- dir (Path): The directory containing files to archive.
Raises:
- QQError: If file transfer or removal fails.
def archive_runtime_files(self, job_name: str, cycle: int) -> None:
    """
    Archive qq runtime files from a specific job located in the input directory.

    The archived files are moved from the input directory to the archive directory
    and renamed to `archive_format % cycle`, keeping each file's original suffix.

    Ensure that `job_name` does not contain special regex characters, or that any such
    characters are properly escaped.

    This function will archive all files whose names match `job_name`, regardless
    of whether they have any qq-specific suffixes.

    Args:
        job_name (str): The name of the job. Only the part before the first `.`
            is used for matching.
        cycle (int): Cycle number for which the files should be archived.

    Raises:
        QQError: If moving the runtime files fails.
    """
    if not (
        files := self.get_files_matching_pattern(
            self._input_dir,
            self._input_machine,
            # only use the stem of the job name, the extension will not be matched
            job_name.split(".", maxsplit=1)[0],
            # we do not need to use the cycle number here since the job_name should already be expanded
            cycle=None,
            include_qq_files=True,
        )
    ):
        logger.debug("No qq runtime files to archive.")
        return

    # the files are renamed to conform to the archive format; only the suffix is kept.
    # NOTE(review): this requires a printf-style archive format (`%` on a plain regex
    # raises TypeError), and two matched files sharing a suffix map to the same target.
    archived_stem = self._archive_format % cycle
    moved_files = [self._archive / f"{archived_stem}{f.suffix}" for f in files]

    logger.debug(f"qq runtime files to archive: {files}.")
    logger.debug(f"qq runtime files after moving: {moved_files}.")

    Retryer(
        self._batch_system.move_remote_files,
        self._input_machine,
        files,
        moved_files,
        max_tries=CFG.archiver.retry_tries,
        wait_seconds=CFG.archiver.retry_wait,
    ).run()
Archive qq runtime files from a specific job located in the input directory.
The archived files are moved from the input directory to the archive directory.
Ensure that job_name does not contain special regex characters, or that any such
characters are properly escaped.
This function will archive all files whose names match job_name, regardless
of whether they have any qq-specific suffixes.
Arguments:
- job_name (str): The name of the job.
- cycle (int): Cycle number for which the files should be archived.
Raises:
- QQError: If moving the runtime files fails.
def get_files_matching_pattern(
    self,
    directory: Path,
    host: str | None,
    pattern: str,
    cycle: int | None = None,
    include_qq_files: bool = False,
) -> list[Path]:
    """
    Determine which files in a directory match a given pattern.

    A file matches when the compiled pattern is found anywhere in its stem
    (`re.search`); the match is not anchored.

    Args:
        directory (Path): Directory to search for files. Must exist.
        host (str | None): Hostname if the directory is remote,
            or None if it is available from the current machine.
        pattern (str): A printf-style or regex pattern to match file stems.
        cycle (int | None): Optional cycle number for printf-style patterns.
            If provided (including 0), only files corresponding to that cycle
            are returned. If `None`, all matching files are returned.
            Defaults to `None`.
        include_qq_files (bool): Whether to include qq runtime files, i.e. files
            whose suffix is in `CFG.suffixes.all_suffixes`. Defaults to False.

    Returns:
        list[Path]: A list of absolute (logical) paths to matching files.
    """
    # `is not None` instead of truthiness: cycle 0 is a real cycle number and
    # must not silently fall back to matching the files of every cycle
    if cycle is not None and is_printf_pattern(pattern):
        try:
            # insert the cycle number into the printf pattern; the expanded
            # string is then compiled directly as a regex
            regex = re.compile(pattern % cycle)
        except Exception:
            logger.debug(
                f"Ignoring loop number since the provided pattern ('{pattern}') does not support it."
            )
            regex = Archiver._prepare_regex_pattern(pattern)
    else:
        logger.debug(
            f"Loop number not specified or the provided pattern ('{pattern}') does not support it."
        )
        regex = Archiver._prepare_regex_pattern(pattern)

    logger.debug(f"Regex for matching: {regex}.")

    # the directory must exist
    if host and host != socket.getfqdn():
        # remote directory
        available_files: list[Path] = Retryer(
            self._batch_system.list_remote_dir,
            host,
            directory,
            max_tries=CFG.archiver.retry_tries,
            wait_seconds=CFG.archiver.retry_wait,
        ).run()
    else:
        # local directory (no host given, or the host is this machine)
        available_files = list(directory.iterdir())

    logger.debug(f"All available files: {available_files}.")

    # the stem of the file must contain the regex pattern
    matching = [f for f in available_files if regex.search(f.stem)]
    if not include_qq_files:
        # qq runtime files are recognized by their suffix
        matching = [f for f in matching if f.suffix not in CFG.suffixes.all_suffixes]
    return [logical_resolve(f) for f in matching]
Determine which files in a directory match a given pattern.
Arguments:
- directory (Path): Directory to search for files.
- host (str | None): Hostname if the directory is remote, or None if it is available from the current machine.
- pattern (str): A printf-style or regex pattern to match file stems.
- cycle (int | None): Optional cycle number for printf-style patterns.
If provided, only files corresponding to that loop are returned.
If `None`, all matching files are returned. Defaults to `None`.
- include_qq_files (bool): Whether to include qq runtime files. Defaults to False.
Returns:
list[Path]: A list of absolute (logical) paths to matching files.
def create_init_file(self, cycle: int) -> None:
    """
    Touch an empty `.init` file for the given cycle.

    Serves as a fallback when no valid archive file was produced, so the next
    iteration of the loop job can still start. The file name is
    `archive_format % cycle` followed by `.init`; unless the format yields an
    absolute path, it is created relative to the current working directory.

    Args:
        cycle (int): Index of the next cycle of the loop job.
    """
    init_name = f"{self._archive_format % cycle}.init"
    Path(init_name).touch()
Create an empty init file for the given cycle. Used as a fallback when no valid archive file is produced, ensuring the next iteration of the loop job can proceed normally.
Arguments:
- cycle (int): The index of the next cycle of the loop job.