qq_lib.properties.loop
Loop-job metadata and cycle-tracking utilities.
This module defines LoopInfo, a dataclass describing the iteration
parameters of a qq loop job: its cycle range, archive location, archive
naming format, and the current cycle as inferred from existing archived
files.
# Released under MIT License.
# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab

"""
Loop-job metadata and cycle-tracking utilities.

This module defines `LoopInfo`, a dataclass describing the iteration
parameters of a qq loop job: its cycle range, archive location, archive
naming format, and the current cycle as inferred from existing archived
files.
"""

import re
from dataclasses import dataclass
from pathlib import Path
from typing import Self

from qq_lib.archive.archiver import Archiver
from qq_lib.core.config import CFG
from qq_lib.core.error import QQError
from qq_lib.core.logger import get_logger
from qq_lib.core.logical_paths import logical_resolve
from qq_lib.properties.transfer_mode import TransferMode

logger = get_logger(__name__)


@dataclass(init=False)
class LoopInfo:
    """
    Dataclass containing information about a qq loop job.

    The constructor is written by hand (`init=False`) so that the cycle
    range and the archive location are validated when an instance is created.
    """

    # first cycle number of the loop
    start: int
    # last cycle number of the loop; `current` may not exceed it
    end: int
    # archive directory, stored after `logical_resolve`
    archive: Path
    # file naming pattern used for archived files
    archive_format: str
    # cycle number the job is currently at
    current: int
    # transfer modes deciding when files are archived
    archive_mode: list[TransferMode]

    def __init__(
        self,
        start: int,
        end: int | None,
        archive: Path,
        archive_format: str,
        current: int | None = None,
        input_dir: Path | None = None,
        archive_mode: list[TransferMode] | None = None,
    ):
        """
        Initialize loop job information with validation checks.

        Args:
            start (int): The starting cycle number. Must not be negative.
            end (int | None): The ending cycle number. Must be provided and >= `start`.
            archive (Path): Path to the archive directory.
            archive_format (str): File naming pattern used for archived files.
            current (int | None): The current cycle number. If `None`, it is
                inferred by `determine_cycle_from_archive()`.
            input_dir (Path | None): The job submission directory. Used to validate archive.
                If `None`, no validation is performed.
            archive_mode (list[TransferMode] | None): When should the files be archived?
                If `None` or empty, the modes are parsed from
                `CFG.transfer_files_options.default_archive_mode`.

        Raises:
            QQError: If `end` is not provided, if `start` is negative,
                if `start > end`, if `current > end`, or if the archive
                resolves to the same path as `input_dir`.
        """
        # compare against None explicitly: 0 is a legitimate cycle number
        if end is None:
            raise QQError("Attribute 'loop-end' is undefined.")

        self.archive = logical_resolve(archive)
        if input_dir and self.archive == logical_resolve(input_dir):
            raise QQError("Input directory cannot be used as the loop job's archive.")

        self.archive_format = archive_format
        self.archive_mode = archive_mode or TransferMode.multi_from_str(
            CFG.transfer_files_options.default_archive_mode
        )

        self.start = start
        self.end = end
        # an explicitly supplied cycle (including 0) must be honoured;
        # the archive is only scanned when no cycle was given at all
        self.current = (
            current if current is not None else self.determine_cycle_from_archive()
        )

        if self.start < 0:
            raise QQError(f"Attribute 'loop-start' ({self.start}) cannot be negative.")

        if self.start > self.end:
            raise QQError(
                f"Attribute 'loop-start' ({self.start}) cannot be higher than 'loop-end' ({self.end})."
            )

        if self.current > self.end:
            raise QQError(
                f"Current cycle number ({self.current}) cannot be higher than 'loop-end' ({self.end})."
            )

    @classmethod
    def from_dict(cls, data: dict[str, object]) -> Self:
        """
        Reconstruct a LoopInfo instance from a dictionary produced by to_dict.

        Args:
            data (dict[str, object]): A dictionary as returned by `to_dict()`.
                A missing 'archive_mode' key is treated as `["success"]`.

        Returns:
            LoopInfo: A new instance with fields populated from the dictionary.

        Raises:
            QQError: If a field is missing or has an unexpected type.
        """
        start = data.get("start")
        if not isinstance(start, int):
            raise QQError(f"Field 'start' must be an int, got {type(start).__name__}.")

        end = data.get("end")
        if not isinstance(end, int):
            raise QQError(f"Field 'end' must be an int, got {type(end).__name__}.")

        archive = data.get("archive")
        if not isinstance(archive, str):
            raise QQError(
                f"Field 'archive' must be a str, got {type(archive).__name__}."
            )

        archive_format = data.get("archive_format")
        if not isinstance(archive_format, str):
            raise QQError(
                f"Field 'archive_format' must be a str, got {type(archive_format).__name__}."
            )

        current = data.get("current")
        if not isinstance(current, int):
            raise QQError(
                f"Field 'current' must be an int, got {type(current).__name__}."
            )

        mode_names = data.get("archive_mode", ["success"])
        if not (
            isinstance(mode_names, list)
            and all(isinstance(name, str) for name in mode_names)
        ):
            raise QQError("Field 'archive_mode' must be a list of strings.")

        return cls(
            start=start,
            end=end,
            archive=Path(archive),
            archive_format=archive_format,
            current=current,
            archive_mode=[TransferMode.from_str(name) for name in mode_names],  # ty: ignore[invalid-argument-type]
        )

    def to_dict(self) -> dict[str, object]:
        """Return all fields as a dict, with the archive path and modes as strings."""
        fields: dict[str, object] = {
            "start": self.start,
            "end": self.end,
            "archive": str(self.archive),
            "archive_format": self.archive_format,
            "current": self.current,
        }
        fields["archive_mode"] = [mode.to_str() for mode in self.archive_mode]
        return fields

    def determine_cycle_from_archive(self) -> int:
        """
        Determine the current cycle number based on files in the archive directory.

        Returns:
            int: The detected maximum cycle number, or `self.start` if no valid cycle
                can be inferred.

        Notes:
            - Only the first sequence of digits found in the stem is considered.
        """
        archive_dir = self.archive

        # no archive directory: fall back to the starting cycle number
        if not archive_dir.is_dir():
            logger.debug(
                f"Archive '{self.archive}' does not exist. Setting cycle number to start ({self.start})."
            )
            return self.start

        stem_pattern = Archiver._prepare_regex_pattern(self.archive_format)
        logger.debug(f"Stem pattern: {stem_pattern}.")

        first_number = re.compile(r"\d+")
        # seeded with start so the result never drops below it
        candidates = [self.start]
        for entry in archive_dir.iterdir():
            stem = entry.stem
            if stem_pattern.search(stem) and (found := first_number.search(stem)):
                candidates.append(int(found.group(0)))

        return max(candidates)
logger =
<Logger qq_lib.properties.loop (INFO)>
@dataclass(init=False)
class
LoopInfo:
@dataclass(init=False)
class LoopInfo:
    """
    Dataclass containing information about a qq loop job.

    The constructor is written by hand (`init=False`) so that the cycle
    range and the archive location are validated when an instance is created.
    """

    # first cycle number of the loop
    start: int
    # last cycle number of the loop; `current` may not exceed it
    end: int
    # archive directory, stored after `logical_resolve`
    archive: Path
    # file naming pattern used for archived files
    archive_format: str
    # cycle number the job is currently at
    current: int
    # transfer modes deciding when files are archived
    archive_mode: list[TransferMode]

    def __init__(
        self,
        start: int,
        end: int | None,
        archive: Path,
        archive_format: str,
        current: int | None = None,
        input_dir: Path | None = None,
        archive_mode: list[TransferMode] | None = None,
    ):
        """
        Initialize loop job information with validation checks.

        Args:
            start (int): The starting cycle number. Must not be negative.
            end (int | None): The ending cycle number. Must be provided and >= `start`.
            archive (Path): Path to the archive directory.
            archive_format (str): File naming pattern used for archived files.
            current (int | None): The current cycle number. If `None`, it is
                inferred by `determine_cycle_from_archive()`.
            input_dir (Path | None): The job submission directory. Used to validate archive.
                If `None`, no validation is performed.
            archive_mode (list[TransferMode] | None): When should the files be archived?
                If `None` or empty, the modes are parsed from
                `CFG.transfer_files_options.default_archive_mode`.

        Raises:
            QQError: If `end` is not provided, if `start` is negative,
                if `start > end`, if `current > end`, or if the archive
                resolves to the same path as `input_dir`.
        """
        # compare against None explicitly: 0 is a legitimate cycle number
        if end is None:
            raise QQError("Attribute 'loop-end' is undefined.")

        self.archive = logical_resolve(archive)
        if input_dir and self.archive == logical_resolve(input_dir):
            raise QQError("Input directory cannot be used as the loop job's archive.")

        self.archive_format = archive_format
        self.archive_mode = archive_mode or TransferMode.multi_from_str(
            CFG.transfer_files_options.default_archive_mode
        )

        self.start = start
        self.end = end
        # an explicitly supplied cycle (including 0) must be honoured;
        # the archive is only scanned when no cycle was given at all
        self.current = (
            current if current is not None else self.determine_cycle_from_archive()
        )

        if self.start < 0:
            raise QQError(f"Attribute 'loop-start' ({self.start}) cannot be negative.")

        if self.start > self.end:
            raise QQError(
                f"Attribute 'loop-start' ({self.start}) cannot be higher than 'loop-end' ({self.end})."
            )

        if self.current > self.end:
            raise QQError(
                f"Current cycle number ({self.current}) cannot be higher than 'loop-end' ({self.end})."
            )

    @classmethod
    def from_dict(cls, data: dict[str, object]) -> Self:
        """
        Reconstruct a LoopInfo instance from a dictionary produced by to_dict.

        Args:
            data (dict[str, object]): A dictionary as returned by `to_dict()`.
                A missing 'archive_mode' key is treated as `["success"]`.

        Returns:
            LoopInfo: A new instance with fields populated from the dictionary.

        Raises:
            QQError: If a field is missing or has an unexpected type.
        """
        start = data.get("start")
        if not isinstance(start, int):
            raise QQError(f"Field 'start' must be an int, got {type(start).__name__}.")

        end = data.get("end")
        if not isinstance(end, int):
            raise QQError(f"Field 'end' must be an int, got {type(end).__name__}.")

        archive = data.get("archive")
        if not isinstance(archive, str):
            raise QQError(
                f"Field 'archive' must be a str, got {type(archive).__name__}."
            )

        archive_format = data.get("archive_format")
        if not isinstance(archive_format, str):
            raise QQError(
                f"Field 'archive_format' must be a str, got {type(archive_format).__name__}."
            )

        current = data.get("current")
        if not isinstance(current, int):
            raise QQError(
                f"Field 'current' must be an int, got {type(current).__name__}."
            )

        mode_names = data.get("archive_mode", ["success"])
        if not (
            isinstance(mode_names, list)
            and all(isinstance(name, str) for name in mode_names)
        ):
            raise QQError("Field 'archive_mode' must be a list of strings.")

        return cls(
            start=start,
            end=end,
            archive=Path(archive),
            archive_format=archive_format,
            current=current,
            archive_mode=[TransferMode.from_str(name) for name in mode_names],  # ty: ignore[invalid-argument-type]
        )

    def to_dict(self) -> dict[str, object]:
        """Return all fields as a dict, with the archive path and modes as strings."""
        fields: dict[str, object] = {
            "start": self.start,
            "end": self.end,
            "archive": str(self.archive),
            "archive_format": self.archive_format,
            "current": self.current,
        }
        fields["archive_mode"] = [mode.to_str() for mode in self.archive_mode]
        return fields

    def determine_cycle_from_archive(self) -> int:
        """
        Determine the current cycle number based on files in the archive directory.

        Returns:
            int: The detected maximum cycle number, or `self.start` if no valid cycle
                can be inferred.

        Notes:
            - Only the first sequence of digits found in the stem is considered.
        """
        archive_dir = self.archive

        # no archive directory: fall back to the starting cycle number
        if not archive_dir.is_dir():
            logger.debug(
                f"Archive '{self.archive}' does not exist. Setting cycle number to start ({self.start})."
            )
            return self.start

        stem_pattern = Archiver._prepare_regex_pattern(self.archive_format)
        logger.debug(f"Stem pattern: {stem_pattern}.")

        first_number = re.compile(r"\d+")
        # seeded with start so the result never drops below it
        candidates = [self.start]
        for entry in archive_dir.iterdir():
            stem = entry.stem
            if stem_pattern.search(stem) and (found := first_number.search(stem)):
                candidates.append(int(found.group(0)))

        return max(candidates)
Dataclass containing information about a qq loop job.
LoopInfo( start: int, end: int | None, archive: pathlib._local.Path, archive_format: str, current: int | None = None, input_dir: pathlib._local.Path | None = None, archive_mode: list[qq_lib.properties.transfer_mode.TransferMode] | None = None)
def __init__(
    self,
    start: int,
    end: int | None,
    archive: Path,
    archive_format: str,
    current: int | None = None,
    input_dir: Path | None = None,
    archive_mode: list[TransferMode] | None = None,
):
    """
    Initialize loop job information with validation checks.

    Args:
        start (int): The starting cycle number. Must not be negative.
        end (int | None): The ending cycle number. Must be provided and >= `start`.
        archive (Path): Path to the archive directory.
        archive_format (str): File naming pattern used for archived files.
        current (int | None): The current cycle number. If `None`, it is
            inferred by `determine_cycle_from_archive()`.
        input_dir (Path | None): The job submission directory. Used to validate archive.
            If `None`, no validation is performed.
        archive_mode (list[TransferMode] | None): When should the files be archived?
            If `None` or empty, the modes are parsed from
            `CFG.transfer_files_options.default_archive_mode`.

    Raises:
        QQError: If `end` is not provided, if `start` is negative,
            if `start > end`, if `current > end`, or if the archive
            resolves to the same path as `input_dir`.
    """
    # compare against None explicitly: 0 is a legitimate cycle number
    if end is None:
        raise QQError("Attribute 'loop-end' is undefined.")

    self.archive = logical_resolve(archive)
    if input_dir and self.archive == logical_resolve(input_dir):
        raise QQError("Input directory cannot be used as the loop job's archive.")

    self.archive_format = archive_format
    self.archive_mode = archive_mode or TransferMode.multi_from_str(
        CFG.transfer_files_options.default_archive_mode
    )

    self.start = start
    self.end = end
    # an explicitly supplied cycle (including 0) must be honoured;
    # the archive is only scanned when no cycle was given at all
    self.current = (
        current if current is not None else self.determine_cycle_from_archive()
    )

    if self.start < 0:
        raise QQError(f"Attribute 'loop-start' ({self.start}) cannot be negative.")

    if self.start > self.end:
        raise QQError(
            f"Attribute 'loop-start' ({self.start}) cannot be higher than 'loop-end' ({self.end})."
        )

    if self.current > self.end:
        raise QQError(
            f"Current cycle number ({self.current}) cannot be higher than 'loop-end' ({self.end})."
        )
Initialize loop job information with validation checks.
Arguments:
- start (int): The starting cycle number.
- end (int | None): The ending cycle number. Must be provided and >= `start`.
- archive (Path): Path to the archive directory.
- input_dir (Path | None): The job submission directory. Used to validate archive. If `None`, no validation is performed.
- archive_format (str): File naming pattern used for archived files.
- current (int | None): The current cycle number. If not provided, it is inferred from the files in the archive (never lower than `start`).
- archive_mode (list[TransferMode] | None): When should the files be archived? Defaults to [Success()], meaning that archival should only be performed for successfully completed jobs.
Raises:
- QQError: If `end` is not provided, if `start` is negative, if `start > end`, if `current > end`, or if the archive path is invalid.
archive_mode: list[qq_lib.properties.transfer_mode.TransferMode]
@classmethod
def
from_dict(cls, data: dict[str, object]) -> Self:
@classmethod
def from_dict(cls, data: dict[str, object]) -> Self:
    """
    Reconstruct a LoopInfo instance from a dictionary produced by to_dict.

    Each field is fetched and type-checked in turn; the first field with an
    unexpected type aborts the reconstruction.

    Args:
        data (dict[str, object]): A dictionary as returned by `to_dict()`.
            A missing 'archive_mode' key is treated as `["success"]`.

    Returns:
        LoopInfo: A new instance with fields populated from the dictionary.

    Raises:
        QQError: If a field is missing or has an unexpected type.
    """
    start = data.get("start")
    if not isinstance(start, int):
        raise QQError(f"Field 'start' must be an int, got {type(start).__name__}.")

    end = data.get("end")
    if not isinstance(end, int):
        raise QQError(f"Field 'end' must be an int, got {type(end).__name__}.")

    archive = data.get("archive")
    if not isinstance(archive, str):
        raise QQError(
            f"Field 'archive' must be a str, got {type(archive).__name__}."
        )

    archive_format = data.get("archive_format")
    if not isinstance(archive_format, str):
        raise QQError(
            f"Field 'archive_format' must be a str, got {type(archive_format).__name__}."
        )

    current = data.get("current")
    if not isinstance(current, int):
        raise QQError(
            f"Field 'current' must be an int, got {type(current).__name__}."
        )

    mode_names = data.get("archive_mode", ["success"])
    if not (
        isinstance(mode_names, list)
        and all(isinstance(name, str) for name in mode_names)
    ):
        raise QQError("Field 'archive_mode' must be a list of strings.")

    return cls(
        start=start,
        end=end,
        archive=Path(archive),
        archive_format=archive_format,
        current=current,
        archive_mode=[TransferMode.from_str(name) for name in mode_names],  # ty: ignore[invalid-argument-type]
    )
Reconstruct a LoopInfo instance from a dictionary produced by to_dict.
Arguments:
- data (dict[str, object]): A dictionary as returned by
to_dict().
Returns:
LoopInfo: A new instance with fields populated from the dictionary.
def
to_dict(self) -> dict[str, object]:
def to_dict(self) -> dict[str, object]:
    """Return all fields as a dict, with the archive path and modes as strings."""
    fields: dict[str, object] = {
        "start": self.start,
        "end": self.end,
        "archive": str(self.archive),
        "archive_format": self.archive_format,
        "current": self.current,
    }
    # each mode is serialized through its own `to_str()`
    fields["archive_mode"] = [mode.to_str() for mode in self.archive_mode]
    return fields
Return all fields as a dict.
def
determine_cycle_from_archive(self) -> int:
def determine_cycle_from_archive(self) -> int:
    """
    Determine the current cycle number based on files in the archive directory.

    Entries whose stem matches the pattern built from `self.archive_format`
    contribute the first run of digits in their stem; the largest such
    number wins, but never a value below `self.start`.

    Returns:
        int: The detected maximum cycle number, or `self.start` if no valid cycle
            can be inferred.

    Notes:
        - Only the first sequence of digits found in the stem is considered.
    """
    archive_dir = self.archive

    # no archive directory: fall back to the starting cycle number
    if not archive_dir.is_dir():
        logger.debug(
            f"Archive '{self.archive}' does not exist. Setting cycle number to start ({self.start})."
        )
        return self.start

    stem_pattern = Archiver._prepare_regex_pattern(self.archive_format)
    logger.debug(f"Stem pattern: {stem_pattern}.")

    first_number = re.compile(r"\d+")
    # seeded with start so the result never drops below it
    candidates = [self.start]
    for entry in archive_dir.iterdir():
        stem = entry.stem
        if stem_pattern.search(stem) and (found := first_number.search(stem)):
            candidates.append(int(found.group(0)))

    return max(candidates)
Determine the current cycle number based on files in the archive directory.
Returns:
int: The detected maximum cycle number, or `self.start` if no valid cycle can be inferred.
Notes:
- Only the first sequence of digits found in the stem is considered.