qq_lib.properties.loop

Loop-job metadata and cycle-tracking utilities.

This module defines LoopInfo, a dataclass describing the iteration parameters of a qq loop job: its cycle range, archive location, archive naming format, and the current cycle as inferred from existing archived files.

  1# Released under MIT License.
  2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
  3
  4"""
  5Loop-job metadata and cycle-tracking utilities.
  6
  7This module defines `LoopInfo`, a dataclass describing the iteration
  8parameters of a qq loop job: its cycle range, archive location, archive
  9naming format, and the current cycle as inferred from existing archived
 10files.
 11"""
 12
 13import re
 14from dataclasses import dataclass
 15from pathlib import Path
 16from typing import Self
 17
 18from qq_lib.archive.archiver import Archiver
 19from qq_lib.core.config import CFG
 20from qq_lib.core.error import QQError
 21from qq_lib.core.logger import get_logger
 22from qq_lib.core.logical_paths import logical_resolve
 23from qq_lib.properties.transfer_mode import TransferMode
 24
 25logger = get_logger(__name__)
 26
 27
 28@dataclass(init=False)
 29class LoopInfo:
 30    """
 31    Dataclass containing information about a qq loop job.
 32    """
 33
 34    start: int
 35    end: int
 36    archive: Path
 37    archive_format: str
 38    current: int
 39    archive_mode: list[TransferMode]
 40
 41    def __init__(
 42        self,
 43        start: int,
 44        end: int | None,
 45        archive: Path,
 46        archive_format: str,
 47        current: int | None = None,
 48        input_dir: Path | None = None,
 49        archive_mode: list[TransferMode] | None = None,
 50    ):
 51        """
 52        Initialize loop job information with validation checks.
 53
 54        Args:
 55            start (int): The starting cycle number.
 56            end (int | None): The ending cycle number. Must be provided and >= `start`.
 57            archive (Path): Path to the archive directory.
 58            input_dir (Path | None): The job submission directory. Used to validate archive.
 59                If `None`, no validation is performed.
 60            archive_format (str): File naming pattern used for archived files.
 61            current (int | None): The current cycle number. Defaults to `start`
 62                if not provided.
 63            archive_mode (list[TransferMode] | None): When should the files be archived?
 64                Defaults to [Success()], meaning that archival should only be performed for
 65                successfully completed jobs.
 66
 67        Raises:
 68            QQError: If `end` is not provided, if `start > end`, if `current > end`,
 69                or if the archive path is invalid.
 70        """
 71        if not end:
 72            raise QQError("Attribute 'loop-end' is undefined.")
 73
 74        self.archive = logical_resolve(archive)
 75        if input_dir and self.archive == logical_resolve(input_dir):
 76            raise QQError("Input directory cannot be used as the loop job's archive.")
 77
 78        self.archive_format = archive_format
 79        self.archive_mode = archive_mode or TransferMode.multi_from_str(
 80            CFG.transfer_files_options.default_archive_mode
 81        )
 82
 83        self.start = start
 84        self.end = end
 85        self.current = current or self.determine_cycle_from_archive()
 86
 87        if self.start < 0:
 88            raise QQError(f"Attribute 'loop-start' ({self.start}) cannot be negative.")
 89
 90        if self.start > self.end:
 91            raise QQError(
 92                f"Attribute 'loop-start' ({self.start}) cannot be higher than 'loop-end' ({self.end})."
 93            )
 94
 95        if self.current > self.end:
 96            raise QQError(
 97                f"Current cycle number ({self.current}) cannot be higher than 'loop-end' ({self.end})."
 98            )
 99
100    @classmethod
101    def from_dict(cls, data: dict[str, object]) -> Self:
102        """
103        Reconstruct a LoopInfo instance from a dictionary produced by to_dict.
104
105        Args:
106            data (dict[str, object]): A dictionary as returned by `to_dict()`.
107
108        Returns:
109            LoopInfo: A new instance with fields populated from the dictionary.
110        """
111        start = data.get("start")
112        end = data.get("end")
113        archive = data.get("archive")
114        archive_format = data.get("archive_format")
115        current = data.get("current")
116        archive_mode = data.get("archive_mode", ["success"])
117
118        if not isinstance(start, int):
119            raise QQError(f"Field 'start' must be an int, got {type(start).__name__}.")
120        if not isinstance(end, int):
121            raise QQError(f"Field 'end' must be an int, got {type(end).__name__}.")
122        if not isinstance(archive, str):
123            raise QQError(
124                f"Field 'archive' must be a str, got {type(archive).__name__}."
125            )
126        if not isinstance(archive_format, str):
127            raise QQError(
128                f"Field 'archive_format' must be a str, got {type(archive_format).__name__}."
129            )
130        if not isinstance(current, int):
131            raise QQError(
132                f"Field 'current' must be an int, got {type(current).__name__}."
133            )
134        if not isinstance(archive_mode, list) or not all(
135            isinstance(m, str) for m in archive_mode
136        ):
137            raise QQError("Field 'archive_mode' must be a list of strings.")
138
139        return cls(
140            start=start,
141            end=end,
142            archive=Path(archive),
143            archive_format=archive_format,
144            current=current,
145            archive_mode=[TransferMode.from_str(mode) for mode in archive_mode],  # ty: ignore[invalid-argument-type]
146        )
147
148    def to_dict(self) -> dict[str, object]:
149        """Return all fields as a dict."""
150        return {
151            "start": self.start,
152            "end": self.end,
153            "archive": str(self.archive),
154            "archive_format": self.archive_format,
155            "current": self.current,
156            "archive_mode": [mode.to_str() for mode in self.archive_mode],
157        }
158
159    def determine_cycle_from_archive(self) -> int:
160        """
161        Determine the current cycle number based on files in the archive directory.
162
163        Returns:
164            int: The detected maximum cycle number, or `self.start` if no valid cycle
165                can be inferred.
166
167        Notes:
168            - Only the first sequence of digits found in the stem is considered.
169        """
170
171        # if the directory does not exist, use the starting cycle number
172        if not self.archive.is_dir():
173            logger.debug(
174                f"Archive '{self.archive}' does not exist. Setting cycle number to start ({self.start})."
175            )
176            return self.start
177
178        stem_pattern = Archiver._prepare_regex_pattern(self.archive_format)
179        logger.debug(f"Stem pattern: {stem_pattern}.")
180
181        # use start as default
182        max_number = self.start
183        for f in self.archive.iterdir():
184            if not stem_pattern.search(f.stem):
185                continue
186
187            match = re.search(r"\d+", f.stem)
188            if match:
189                number = int(match.group(0))
190                max_number = max(max_number, number)
191
192        return max_number
logger = <Logger qq_lib.properties.loop (INFO)>
@dataclass(init=False)
class LoopInfo:
 29@dataclass(init=False)
 30class LoopInfo:
 31    """
 32    Dataclass containing information about a qq loop job.
 33    """
 34
 35    start: int
 36    end: int
 37    archive: Path
 38    archive_format: str
 39    current: int
 40    archive_mode: list[TransferMode]
 41
 42    def __init__(
 43        self,
 44        start: int,
 45        end: int | None,
 46        archive: Path,
 47        archive_format: str,
 48        current: int | None = None,
 49        input_dir: Path | None = None,
 50        archive_mode: list[TransferMode] | None = None,
 51    ):
 52        """
 53        Initialize loop job information with validation checks.
 54
 55        Args:
 56            start (int): The starting cycle number.
 57            end (int | None): The ending cycle number. Must be provided and >= `start`.
 58            archive (Path): Path to the archive directory.
 59            input_dir (Path | None): The job submission directory. Used to validate archive.
 60                If `None`, no validation is performed.
 61            archive_format (str): File naming pattern used for archived files.
 62            current (int | None): The current cycle number. Defaults to `start`
 63                if not provided.
 64            archive_mode (list[TransferMode] | None): When should the files be archived?
 65                Defaults to [Success()], meaning that archival should only be performed for
 66                successfully completed jobs.
 67
 68        Raises:
 69            QQError: If `end` is not provided, if `start > end`, if `current > end`,
 70                or if the archive path is invalid.
 71        """
 72        if not end:
 73            raise QQError("Attribute 'loop-end' is undefined.")
 74
 75        self.archive = logical_resolve(archive)
 76        if input_dir and self.archive == logical_resolve(input_dir):
 77            raise QQError("Input directory cannot be used as the loop job's archive.")
 78
 79        self.archive_format = archive_format
 80        self.archive_mode = archive_mode or TransferMode.multi_from_str(
 81            CFG.transfer_files_options.default_archive_mode
 82        )
 83
 84        self.start = start
 85        self.end = end
 86        self.current = current or self.determine_cycle_from_archive()
 87
 88        if self.start < 0:
 89            raise QQError(f"Attribute 'loop-start' ({self.start}) cannot be negative.")
 90
 91        if self.start > self.end:
 92            raise QQError(
 93                f"Attribute 'loop-start' ({self.start}) cannot be higher than 'loop-end' ({self.end})."
 94            )
 95
 96        if self.current > self.end:
 97            raise QQError(
 98                f"Current cycle number ({self.current}) cannot be higher than 'loop-end' ({self.end})."
 99            )
100
101    @classmethod
102    def from_dict(cls, data: dict[str, object]) -> Self:
103        """
104        Reconstruct a LoopInfo instance from a dictionary produced by to_dict.
105
106        Args:
107            data (dict[str, object]): A dictionary as returned by `to_dict()`.
108
109        Returns:
110            LoopInfo: A new instance with fields populated from the dictionary.
111        """
112        start = data.get("start")
113        end = data.get("end")
114        archive = data.get("archive")
115        archive_format = data.get("archive_format")
116        current = data.get("current")
117        archive_mode = data.get("archive_mode", ["success"])
118
119        if not isinstance(start, int):
120            raise QQError(f"Field 'start' must be an int, got {type(start).__name__}.")
121        if not isinstance(end, int):
122            raise QQError(f"Field 'end' must be an int, got {type(end).__name__}.")
123        if not isinstance(archive, str):
124            raise QQError(
125                f"Field 'archive' must be a str, got {type(archive).__name__}."
126            )
127        if not isinstance(archive_format, str):
128            raise QQError(
129                f"Field 'archive_format' must be a str, got {type(archive_format).__name__}."
130            )
131        if not isinstance(current, int):
132            raise QQError(
133                f"Field 'current' must be an int, got {type(current).__name__}."
134            )
135        if not isinstance(archive_mode, list) or not all(
136            isinstance(m, str) for m in archive_mode
137        ):
138            raise QQError("Field 'archive_mode' must be a list of strings.")
139
140        return cls(
141            start=start,
142            end=end,
143            archive=Path(archive),
144            archive_format=archive_format,
145            current=current,
146            archive_mode=[TransferMode.from_str(mode) for mode in archive_mode],  # ty: ignore[invalid-argument-type]
147        )
148
149    def to_dict(self) -> dict[str, object]:
150        """Return all fields as a dict."""
151        return {
152            "start": self.start,
153            "end": self.end,
154            "archive": str(self.archive),
155            "archive_format": self.archive_format,
156            "current": self.current,
157            "archive_mode": [mode.to_str() for mode in self.archive_mode],
158        }
159
160    def determine_cycle_from_archive(self) -> int:
161        """
162        Determine the current cycle number based on files in the archive directory.
163
164        Returns:
165            int: The detected maximum cycle number, or `self.start` if no valid cycle
166                can be inferred.
167
168        Notes:
169            - Only the first sequence of digits found in the stem is considered.
170        """
171
172        # if the directory does not exist, use the starting cycle number
173        if not self.archive.is_dir():
174            logger.debug(
175                f"Archive '{self.archive}' does not exist. Setting cycle number to start ({self.start})."
176            )
177            return self.start
178
179        stem_pattern = Archiver._prepare_regex_pattern(self.archive_format)
180        logger.debug(f"Stem pattern: {stem_pattern}.")
181
182        # use start as default
183        max_number = self.start
184        for f in self.archive.iterdir():
185            if not stem_pattern.search(f.stem):
186                continue
187
188            match = re.search(r"\d+", f.stem)
189            if match:
190                number = int(match.group(0))
191                max_number = max(max_number, number)
192
193        return max_number

Dataclass containing information about a qq loop job.

LoopInfo( start: int, end: int | None, archive: pathlib._local.Path, archive_format: str, current: int | None = None, input_dir: pathlib._local.Path | None = None, archive_mode: list[qq_lib.properties.transfer_mode.TransferMode] | None = None)
42    def __init__(
43        self,
44        start: int,
45        end: int | None,
46        archive: Path,
47        archive_format: str,
48        current: int | None = None,
49        input_dir: Path | None = None,
50        archive_mode: list[TransferMode] | None = None,
51    ):
52        """
53        Initialize loop job information with validation checks.
54
55        Args:
56            start (int): The starting cycle number.
57            end (int | None): The ending cycle number. Must be provided and >= `start`.
58            archive (Path): Path to the archive directory.
59            input_dir (Path | None): The job submission directory. Used to validate archive.
60                If `None`, no validation is performed.
61            archive_format (str): File naming pattern used for archived files.
62            current (int | None): The current cycle number. Defaults to `start`
63                if not provided.
64            archive_mode (list[TransferMode] | None): When should the files be archived?
65                Defaults to [Success()], meaning that archival should only be performed for
66                successfully completed jobs.
67
68        Raises:
69            QQError: If `end` is not provided, if `start > end`, if `current > end`,
70                or if the archive path is invalid.
71        """
72        if not end:
73            raise QQError("Attribute 'loop-end' is undefined.")
74
75        self.archive = logical_resolve(archive)
76        if input_dir and self.archive == logical_resolve(input_dir):
77            raise QQError("Input directory cannot be used as the loop job's archive.")
78
79        self.archive_format = archive_format
80        self.archive_mode = archive_mode or TransferMode.multi_from_str(
81            CFG.transfer_files_options.default_archive_mode
82        )
83
84        self.start = start
85        self.end = end
86        self.current = current or self.determine_cycle_from_archive()
87
88        if self.start < 0:
89            raise QQError(f"Attribute 'loop-start' ({self.start}) cannot be negative.")
90
91        if self.start > self.end:
92            raise QQError(
93                f"Attribute 'loop-start' ({self.start}) cannot be higher than 'loop-end' ({self.end})."
94            )
95
96        if self.current > self.end:
97            raise QQError(
98                f"Current cycle number ({self.current}) cannot be higher than 'loop-end' ({self.end})."
99            )

Initialize loop job information with validation checks.

Arguments:
  • start (int): The starting cycle number.
  • end (int | None): The ending cycle number. Must be provided and >= start.
  • archive (Path): Path to the archive directory.
  • input_dir (Path | None): The job submission directory. Used to validate archive. If None, no validation is performed.
  • archive_format (str): File naming pattern used for archived files.
  • current (int | None): The current cycle number. If not provided, it is inferred from the files in the archive directory, falling back to start when no cycle can be inferred.
  • archive_mode (list[TransferMode] | None): When should the files be archived? Defaults to [Success()], meaning that archival should only be performed for successfully completed jobs.
Raises:
  • QQError: If end is not provided, if start is negative, if start > end, if current > end, or if the archive resolves to the same path as input_dir.
start: int
end: int
archive: pathlib._local.Path
archive_format: str
current: int
@classmethod
def from_dict(cls, data: dict[str, object]) -> Self:
101    @classmethod
102    def from_dict(cls, data: dict[str, object]) -> Self:
103        """
104        Reconstruct a LoopInfo instance from a dictionary produced by to_dict.
105
106        Args:
107            data (dict[str, object]): A dictionary as returned by `to_dict()`.
108
109        Returns:
110            LoopInfo: A new instance with fields populated from the dictionary.
111        """
112        start = data.get("start")
113        end = data.get("end")
114        archive = data.get("archive")
115        archive_format = data.get("archive_format")
116        current = data.get("current")
117        archive_mode = data.get("archive_mode", ["success"])
118
119        if not isinstance(start, int):
120            raise QQError(f"Field 'start' must be an int, got {type(start).__name__}.")
121        if not isinstance(end, int):
122            raise QQError(f"Field 'end' must be an int, got {type(end).__name__}.")
123        if not isinstance(archive, str):
124            raise QQError(
125                f"Field 'archive' must be a str, got {type(archive).__name__}."
126            )
127        if not isinstance(archive_format, str):
128            raise QQError(
129                f"Field 'archive_format' must be a str, got {type(archive_format).__name__}."
130            )
131        if not isinstance(current, int):
132            raise QQError(
133                f"Field 'current' must be an int, got {type(current).__name__}."
134            )
135        if not isinstance(archive_mode, list) or not all(
136            isinstance(m, str) for m in archive_mode
137        ):
138            raise QQError("Field 'archive_mode' must be a list of strings.")
139
140        return cls(
141            start=start,
142            end=end,
143            archive=Path(archive),
144            archive_format=archive_format,
145            current=current,
146            archive_mode=[TransferMode.from_str(mode) for mode in archive_mode],  # ty: ignore[invalid-argument-type]
147        )

Reconstruct a LoopInfo instance from a dictionary produced by to_dict.

Arguments:
  • data (dict[str, object]): A dictionary as returned by to_dict().
Returns:

LoopInfo: A new instance with fields populated from the dictionary.

def to_dict(self) -> dict[str, object]:
149    def to_dict(self) -> dict[str, object]:
150        """Return all fields as a dict."""
151        return {
152            "start": self.start,
153            "end": self.end,
154            "archive": str(self.archive),
155            "archive_format": self.archive_format,
156            "current": self.current,
157            "archive_mode": [mode.to_str() for mode in self.archive_mode],
158        }

Return all fields as a dict.

def determine_cycle_from_archive(self) -> int:
160    def determine_cycle_from_archive(self) -> int:
161        """
162        Determine the current cycle number based on files in the archive directory.
163
164        Returns:
165            int: The detected maximum cycle number, or `self.start` if no valid cycle
166                can be inferred.
167
168        Notes:
169            - Only the first sequence of digits found in the stem is considered.
170        """
171
172        # if the directory does not exist, use the starting cycle number
173        if not self.archive.is_dir():
174            logger.debug(
175                f"Archive '{self.archive}' does not exist. Setting cycle number to start ({self.start})."
176            )
177            return self.start
178
179        stem_pattern = Archiver._prepare_regex_pattern(self.archive_format)
180        logger.debug(f"Stem pattern: {stem_pattern}.")
181
182        # use start as default
183        max_number = self.start
184        for f in self.archive.iterdir():
185            if not stem_pattern.search(f.stem):
186                continue
187
188            match = re.search(r"\d+", f.stem)
189            if match:
190                number = int(match.group(0))
191                max_number = max(max_number, number)
192
193        return max_number

Determine the current cycle number based on files in the archive directory.

Returns:

int: The detected maximum cycle number, or self.start if no valid cycle can be inferred.

Notes:
  • Only the first sequence of digits found in the stem is considered.