qq_lib.core.common

General utility functions for the qq library.

This module provides helpers for working with qq job files, time durations, YAML I/O, string normalization, user prompts, path manipulation, and job-name construction.

  1# Released under MIT License.
  2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
  3
  4"""
  5General utility functions for the qq library.
  6
  7This module provides helpers for working with qq job files, time durations,
  8YAML I/O, string normalization, user prompts, path manipulation, and job-name construction.
  9"""
 10
 11import re
 12from datetime import timedelta
 13from functools import lru_cache
 14from pathlib import Path
 15
 16import readchar
 17import yaml
 18from rich.console import Console
 19from rich.live import Live
 20from rich.text import Text
 21
 22from qq_lib.core.logical_paths import logical_resolve
 23
 24from .config import CFG
 25from .error import QQError
 26from .logger import get_logger
 27
 28logger = get_logger(__name__)
 29
 30
 31@lru_cache(maxsize=1)
 32def load_yaml_dumper() -> type[yaml.Dumper]:
 33    """Return the fastest available YAML dumper (CDumper if possible)."""
 34    try:
 35        from yaml import CDumper as Dumper
 36
 37        logger.debug("Loaded YAML CDumper.")
 38    except ImportError:
 39        from yaml import Dumper
 40
 41        logger.debug("Loaded default YAML dumper.")
 42    return Dumper
 43
 44
 45@lru_cache(maxsize=1)
 46def load_yaml_loader() -> type[yaml.SafeLoader]:
 47    """Return the fastest available safe YAML loader (CSafeLoader if possible)."""
 48    try:
 49        from yaml import (
 50            CSafeLoader as SafeLoader,
 51        )
 52
 53        logger.debug("Loaded YAML CLoader.")
 54    except ImportError:
 55        from yaml import SafeLoader
 56
 57        logger.debug("Loaded default YAML loader.")
 58
 59    return SafeLoader
 60
 61
 62def get_files_with_suffix(directory: Path, suffix: str) -> list[Path]:
 63    """
 64    Retrieve all files in a directory that have the specified file suffix.
 65
 66    Args:
 67        directory (Path): The directory to search in.
 68        suffix (str): The file suffix to match (including the dot, e.g., '.txt').
 69
 70    Returns:
 71        list[Path]: A list of Path objects representing files with the given suffix.
 72    """
 73    files = []
 74    for file in directory.iterdir():
 75        if file.is_file() and file.suffix == suffix:
 76            files.append(file)
 77
 78    return files
 79
 80
 81def get_runtime_files(directory: Path) -> list[Path]:
 82    """
 83    Retrieve all qq runtime files in a directory.
 84
 85    Args:
 86        directory (Path): The directory to search in.
 87
 88    Returns:
 89        list[Path]: A list of Path objects representing qq runtime files.
 90    """
 91    files = []
 92    for suffix in CFG.suffixes.all_suffixes:
 93        files.extend(get_files_with_suffix(directory, suffix))
 94
 95    return files
 96
 97
 98def get_info_file(directory: Path) -> Path:
 99    """
100    Locate the qq job info file in a directory.
101
102    This function searches for files with suffix `CFG.suffixes.qq_info` in the
103    provided directory. It raises an error if none or multiple info files are found.
104
105    Args:
106        directory (Path): The directory to search in.
107
108    Returns:
109        Path: The Path object of the detected qq job info file.
110
111    Raises:
112        QQError: If no info file is found or multiple info files are detected.
113    """
114    info_files = get_info_files(directory)
115    if len(info_files) == 0:
116        raise QQError("No qq job info file found.")
117    if len(info_files) > 1:
118        raise QQError("Multiple qq job info files found.")
119
120    return info_files[0]
121
122
123def get_info_files(directory: Path) -> list[Path]:
124    """
125    Retrieve all qq job info files in a directory.
126
127    This function searches for files with suffix `CFG.suffixes.qq_info` in the
128    provided directory. The files are sorted by their last modification time
129    (with the newest modified file being last in the list).
130
131    Args:
132        directory (Path): The directory to search in.
133
134    Returns:
135        list[Path]: A list of Path objects representing the detected qq job info files.
136    """
137    info_files = get_files_with_suffix(directory, CFG.suffixes.qq_info)
138    logger.debug(f"Detected the following qq info files: {info_files}.")
139
140    return sorted(info_files, key=lambda f: f.stat().st_mtime)
141
142
143def get_info_file_from_job_id(job_id: str) -> Path:
144    """
145    Get path to the qq info file corresponding to a job with the given ID.
146    The batch system to use is obtained from the environment variable or guessed.
147
148    Args:
149        job_id (str): The ID of the job for which to retrieve the info file.
150
151    Returns:
152        Path: Absolute path to the qq job info file.
153
154    Raises:
155        QQError: If the batch system could not be guessed,
156        the job does not exist or is not a qq job.
157    """
158
159    from qq_lib.batch.interface import BatchInterface, BatchJobInterface
160
161    BatchSystem = BatchInterface.from_env_var_or_guess()
162    job_info: BatchJobInterface = BatchSystem.get_batch_job(job_id)
163
164    if job_info.is_empty():
165        raise QQError(f"Job '{job_id}' does not exist.")
166
167    if not (path := job_info.get_info_file()):
168        raise QQError(f"Job '{job_id}' is not a valid qq job.")
169
170    return path
171
172
173def get_info_files_from_job_id_or_dir(job_id: str | None) -> list[Path]:
174    """
175    Retrieve qq job info files based on a job ID or from the current directory.
176
177    Args:
178        job_id (str | None): The ID of the qq job to retrieve the info file for.
179            If None, the function searches for qq job info files in the current directory.
180
181    Returns:
182        list[Path]: A list containing the qq job info file(s). If a job ID is provided,
183            the list contains a single Path. If not, it contains all detected info files
184            in the current directory.
185
186    Raises:
187        QQError: If the info file corresponding to the given job ID does not exist
188            or is not reachable, or if no qq job info file is found in the current
189            directory when no job ID is provided.
190    """
191    if job_id:
192        info_file = get_info_file_from_job_id(job_id)
193        # check that the detected info file exists and we have permissions to read it
194        try:
195            missing = not info_file.is_file()
196        except PermissionError:
197            missing = True
198
199        if missing:
200            raise QQError(
201                f"Info file for job '{job_id}' does not exist or is not reachable."
202            )
203
204        return [info_file]
205
206    # get info files from the directory
207    info_files = get_info_files(Path())
208    if not info_files:
209        raise QQError("No qq job info file found.")
210
211    return info_files
212
213
214def yes_or_no_prompt(prompt: str) -> bool:
215    """
216    Display an interactive yes/no prompt to the user and return the selection.
217
218    The prompt highlights the pressed key ('y' in green for yes, 'N' in red for no)
219    and defaults to 'No' if the user presses any key other than 'y'.
220
221    Args:
222        prompt (str): The text to display as the question.
223
224    Returns:
225        bool: True if the user selects 'yes' (presses 'y'), False otherwise.
226    """
227    prompt = f"   {prompt} "
228    text = (
229        Text("PROMPT", style="magenta")
230        + Text(prompt, style="default")
231        + Text("[y/N]", style="bold default")
232    )
233
234    with Live(text, refresh_per_second=1) as live:
235        key = readchar.readkey().lower()
236
237        # highlight the pressed key
238        if key == "y":
239            choice = (
240                Text("[", style="bold default")
241                + Text("y", style="bold green")
242                + Text("/N]", style="bold default")
243            )
244        else:
245            choice = (
246                Text("[y/", style="bold default")
247                + Text("N", style="bold red")
248                + Text("]", style="bold default")
249            )
250
251        live.update(
252            Text("PROMPT", style="magenta") + Text(prompt, style="default") + choice
253        )
254
255    return key == "y"
256
257
258def format_duration(td: timedelta) -> str:
259    """
260    Convert a timedelta into a human-readable string showing only relevant units.
261
262    The output string includes weeks, days, hours, minutes, and seconds, but omits
263    units that are zero.
264
265    Args:
266        td (timedelta): The duration to format.
267
268    Returns:
269        str: A formatted string representing the duration, e.g., '1d 2h 3m 4s'.
270    """
271    total_seconds = int(td.total_seconds())
272
273    days_total, remainder = divmod(total_seconds, 86400)
274    weeks, days = divmod(days_total, 7)
275    hours, remainder = divmod(remainder, 3600)
276    minutes, seconds = divmod(remainder, 60)
277
278    parts = []
279    if weeks > 0:
280        parts.append(f"{weeks}w")
281    if days > 0:
282        parts.append(f"{days}d")
283    if hours > 0:
284        parts.append(f"{hours}h")
285    if minutes > 0:
286        parts.append(f"{minutes}m")
287    if seconds > 0 or total_seconds == 0:
288        parts.append(f"{seconds}s")
289
290    return " ".join(parts)
291
292
293def format_duration_wdhhmmss(td: timedelta) -> str:
294    """
295    Format a timedelta into a human-readable string: Xw Yd HH:MM:SS.
296
297    Weeks and days are included only if non-zero.
298    Hours, minutes, and seconds are always displayed with zero-padding.
299
300    Examples:
301        0:00:45         -> "00:00:45"
302        1 day, 2:03:04  -> "1d 02:03:04"
303        10 days, 5:06:07 -> "1w 3d 05:06:07"
304
305    Args:
306        td (timedelta): The duration to format.
307
308    Returns:
309        str: Formatted string in "Xw Yd HH:MM:SS" format.
310    """
311    total_seconds = int(td.total_seconds())
312
313    weeks, remainder = divmod(total_seconds, 7 * 24 * 3600)
314    days, remainder = divmod(remainder, 24 * 3600)
315    hours, remainder = divmod(remainder, 3600)
316    minutes, seconds = divmod(remainder, 60)
317
318    parts = []
319    if weeks > 0:
320        parts.append(f"{weeks}w")
321    if days > 0:
322        parts.append(f"{days}d")
323
324    parts.append(f"{hours:02}:{minutes:02}:{seconds:02}")
325
326    return " ".join(parts)
327
328
329def hhmmss_to_duration(timestr: str) -> timedelta:
330    """
331    Convert a time string in HH:MM:SS (or HHH:MM:SS) format to a timedelta object.
332
333    Examples:
334        "0:00:00"   -> 0 seconds
335        "1:23:45"   -> 1 hour, 23 minutes, 45 seconds
336        "100:00:00" -> 100 hours
337
338    Args:
339        timestr (str): Input string in HH:MM:SS format.
340
341    Returns:
342        timedelta: The corresponding duration.
343
344    Raises:
345        QQError: If the input string is not in a valid HH:MM:SS format or contains
346                    invalid numeric values.
347    """
348    pattern = re.compile(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$")
349    match = pattern.fullmatch(timestr)
350    if not match:
351        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")
352
353    hours, minutes, seconds = map(int, match.groups())
354
355    return timedelta(hours=hours, minutes=minutes, seconds=seconds)
356
357
358def dhhmmss_to_duration(timestr: str) -> timedelta:
359    """
360    Convert a time string in optional D-HH:MM:SS (or HH:MM:SS / HHH:MM:SS) format to a timedelta object.
361
362    Examples:
363        "0:00:00"      -> 0 seconds
364        "1:23:45"      -> 1 hour, 23 minutes, 45 seconds
365        "100:00:00"    -> 100 hours
366        "2-12:34:56"   -> 2 days, 12 hours, 34 minutes, 56 seconds
367
368    Args:
369        timestr (str): Input string in optional D-HH:MM:SS format.
370
371    Returns:
372        timedelta: The corresponding duration.
373
374    Raises:
375        QQError: If the input string is not in a valid format or contains invalid numeric values.
376    """
377    pattern = re.compile(r"^\s*(?:(\d+)-)?(\d+):([0-5]?\d):([0-5]?\d)\s*$")
378    match = pattern.fullmatch(timestr)
379    if not match:
380        raise QQError(f"Invalid D-HH:MM:SS time string '{timestr}'.")
381
382    days_str, hours_str, minutes_str, seconds_str = match.groups()
383    days = int(days_str) if days_str else 0
384    hours = int(hours_str)
385    minutes = int(minutes_str)
386    seconds = int(seconds_str)
387
388    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)
389
390
391def normalize(s: str) -> str:
392    """
393    Normalize a string for consistent comparison.
394
395    The string is converted to lowercase and all hyphens and underscores are removed.
396
397    Args:
398        s (str): The input string to normalize.
399
400    Returns:
401        str: The normalized string.
402    """
403    return s.lower().replace("-", "").replace("_", "")
404
405
406def equals_normalized(a: str, b: str) -> bool:
407    """
408    Compare two strings for equality, ignoring case, hyphens, and underscores.
409
410    Args:
411        a (str): First string to compare.
412        b (str): Second string to compare.
413
414    Returns:
415        bool: True if the normalized strings are equal, False otherwise.
416    """
417
418    return normalize(a) == normalize(b)
419
420
421def convert_absolute_to_relative(files: list[Path], target: Path) -> list[Path]:
422    """
423    Convert a list of absolute paths into paths relative to a target directory.
424
425    Each file in `files` must be located inside `target` or one of its
426    subdirectories. If any file is outside `target`, a `QQError` is raised.
427
428    This function works even for remote files or paths to non-existent files.
429
430    Args:
431        files (list[Path]): A list of absolute file paths to convert.
432        target (Path): The target directory against which paths are made relative.
433
434    Returns:
435        list[Path]: A list of paths relative to `target`.
436
437    Raises:
438        QQError: If any file in `files` is not located within `target`.
439    """
440    relative = []
441    target_parts = target.parts
442
443    for file in files:
444        file_parts = file.parts
445
446        # file must starts with the target path
447        if file_parts[: len(target_parts)] != target_parts:
448            raise QQError(f"Item '{file}' is not in target directory '{target}'.")
449
450        # create a relative path
451        rel_path = Path(*file_parts[len(target_parts) :])
452        relative.append(rel_path)
453
454    logger.debug(f"Converted paths: {relative}.")
455    return relative
456
457
458def wdhms_to_hhmmss(timestr: str) -> str:
459    """
460    Convert a time specification in the wdhms format into (H)HH:MM:SS.
461
462    The accepted format is a sequence of one or more integer + unit tokens,
463    where unit is one of:
464      w = weeks, d = days, h = hours, m = minutes, s = seconds
465
466    Tokens may be compact (e.g. "1w2d3h") or space-separated
467    (e.g. "1w 2d 3h"). The function is case-insensitive.
468
469    Examples:
470      "1w2d3h4m5s" -> "195:04:05"
471      "90m"         -> "1:30:00"
472      ""            -> "0:00:00"
473
474    Args:
475        timestr: Input duration string in wdhms format.
476
477    Returns:
478        Converted time as a string in (H)HH:MM:SS.
479
480    Raises:
481        QQError: If the string contains invalid characters or does not
482                 conform to the token pattern (excluding empty/whitespace,
483                 which is treated as zero).
484    """
485    # treat empty / whitespace-only as zero
486    if timestr.strip() == "":
487        return "0:00:00"
488
489    # validation
490    full_pattern = re.compile(r"^\s*(?:\d+\s*[wdhms]\s*)+$", re.IGNORECASE)
491    if not full_pattern.fullmatch(timestr):
492        raise QQError(f"Invalid time string '{timestr}'.")
493
494    # extract tokens
495    token_pattern = re.compile(r"(\d+)\s*([wdhms])", re.IGNORECASE)
496    matches = token_pattern.findall(timestr)
497
498    weeks = days = hours = minutes = seconds = 0
499
500    for value_str, unit in matches:
501        value = int(value_str)
502        unit = unit.lower()
503        if unit == "w":
504            weeks += value
505        elif unit == "d":
506            days += value
507        elif unit == "h":
508            hours += value
509        elif unit == "m":
510            minutes += value
511        elif unit == "s":
512            seconds += value
513
514    total_seconds = (
515        weeks * 7 * 24 * 3600 + days * 24 * 3600 + hours * 3600 + minutes * 60 + seconds
516    )
517
518    h, remainder = divmod(total_seconds, 3600)
519    m, s = divmod(remainder, 60)
520
521    return f"{h}:{m:02}:{s:02}"
522
523
524def hhmmss_to_wdhms(timestr: str) -> str:
525    """
526    Convert a time specification in (H)HH:MM:SS format into the compact wdhms format.
527
528    The output format expresses the duration as a sequence of one or more integer + unit tokens:
529      w = weeks, d = days, h = hours, m = minutes, s = seconds
530
531    Units that are zero are omitted, except that "0s" is returned if the total duration is zero.
532
533    Examples:
534        "195:04:05" -> "1w2d3h4m5s"
535        "1:30:00"   -> "1h30m"
536        "0:00:00"   -> "0s"
537        "49:00:00"  -> "2d1h"
538
539    Args:
540        timestr (str): Input time string in (H)HH:MM:SS format.
541
542    Returns:
543        str: Time duration converted into the compact wdhms format.
544
545    Raises:
546        QQError: If the input string is malformed or does not conform
547                 to the expected (H)HH:MM:SS pattern.
548    """
549    pattern = re.compile(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$")
550    match = pattern.fullmatch(timestr)
551    if not match:
552        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")
553
554    hours, minutes, seconds = map(int, match.groups())
555    total_seconds = hours * 3600 + minutes * 60 + seconds
556
557    if total_seconds == 0:
558        return "0s"
559
560    weeks, remainder = divmod(total_seconds, 7 * 24 * 3600)
561    days, remainder = divmod(remainder, 24 * 3600)
562    hours, remainder = divmod(remainder, 3600)
563    minutes, seconds = divmod(remainder, 60)
564
565    parts = []
566    if weeks:
567        parts.append(f"{weeks}w")
568    if days:
569        parts.append(f"{days}d")
570    if hours:
571        parts.append(f"{hours}h")
572    if minutes:
573        parts.append(f"{minutes}m")
574    if seconds:
575        parts.append(f"{seconds}s")
576
577    return "".join(parts)
578
579
580def printf_to_regex(pattern: str) -> str:
581    """
582    Convert a simple printf-style pattern to an equivalent regular expression pattern.
583
584    Args:
585        pattern (str): A printf-style pattern (e.g., "md%04d", "file%03d_part%02d").
586
587    Returns:
588        str: A string representing the equivalent regex pattern.
589    """
590    regex = re.escape(pattern)
591    regex = re.sub(r"%0(\d+)d", r"\\d{\1}", regex)  # double backslash
592    return re.sub(r"%d", r"\\d+", regex)
593
594
595def is_printf_pattern(pattern: str) -> bool:
596    """
597    Detect whether a string pattern uses printf-style numeric placeholders.
598
599    Args:
600        pattern (str): The pattern string to check.
601
602    Returns:
603        bool: True if the pattern contains printf-style placeholders, False otherwise.
604    """
605    return bool(re.search(r"%0?\d*d", pattern))
606
607
608def split_files_list(string: str | None) -> list[Path]:
609    """
610    Split a string containing multiple file paths into a list of relative Path objects.
611
612    The string can contain file paths separated by colons (:), commas (,), or
613    any whitespace characters (space, tab, newline).
614
615    Args:
616        string (str | None): The string containing file paths. If None or empty,
617                             an empty list is returned.
618
619    Returns:
620        list[Path]: A list of Path objects corresponding to the individual
621                    relative file paths in the input string.
622    """
623    if not string:
624        return []
625
626    return [Path(f) for f in re.split(r"[:,\s]+", string)]
627
628
629def to_snake_case(s: str) -> str:
630    """
631    Convert a string from PascalCase or kebab-case to snake_case.
632
633    Args:
634        s (str): Input string in PascalCase or kebab-case.
635
636    Returns:
637        str: Converted string in snake_case.
638    """
639    # replace hyphens with underscores
640    s = s.replace("-", "_")
641
642    # convert PascalCase to snake_case
643    return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
644
645
646def get_panel_width(
647    console: Console, factor: int, min_width: int | None, max_width: int | None
648):
649    """
650    Calculate the width of a panel relative to the console width, constrained by
651    optional minimum and maximum width values.
652
653    Args:
654        console (Console): A rich Console-like object that provides terminal size.
655        factor (int): A divisor used to scale down the terminal width.
656        min_width (int): The minimum allowable panel width. If None, no lower bound is applied.
657        max_width (int): The maximum allowable panel width. If None, no upper bound is applied.
658
659    Returns:
660        int: The computed panel width after applying scaling and bounds.
661    """
662
663    term_width = console.size.width
664    panel_width = term_width // factor
665    if min_width is not None:
666        panel_width = max(panel_width, min_width)
667    if max_width is not None:
668        panel_width = min(panel_width, max_width)
669
670    return panel_width
671
672
673def construct_loop_job_name(script_name: str, cycle: int) -> str:
674    """
675    Construct a job name for a loop job.
676
677    Args:
678        script_name (str): Filename of the submitted script.
679        cycle (int): The current cycle of the loop job.
680
681    Returns:
682        str: The name of the loop job in the current cycle.
683    """
684    try:
685        # if the script has an extension, put the cycle number BEFORE the extension
686        stem, suffix = script_name.split(".", maxsplit=1)
687        return f"{stem}{CFG.loop_jobs.pattern % cycle}.{suffix}"
688    except ValueError:
689        # if the script has no extension, add the cycle number after the full name
690        return f"{script_name}{CFG.loop_jobs.pattern % cycle}"
691
692
693def construct_info_file_path(input_dir: Path, job_name: str) -> Path:
694    """
695    Construct the absolute path to a job's qq info file.
696
697    Args:
698        input_dir (Path): The directory containing the job script.
699        job_name (str): The name of the job.
700
701    Returns:
702        Path: The absolute path to the job's qq info file.
703    """
704    return logical_resolve((input_dir / job_name).with_suffix(CFG.suffixes.qq_info))
705
706
707def available_work_dirs() -> str:
708    """
709    Return the supported work-directory types for the detected batch system.
710
711    The batch system is determined using the `QQ_BATCH_SYSTEM` environment
712    variable or by automatic detection. The supported work-directory types are
713    returned as a comma-separated string formatted for display in help text.
714
715    Returns:
716        str: A comma-separated list of supported work directory types, each
717        wrapped in quotes.
718    """
719    from qq_lib.batch.interface import BatchInterface
720
721    try:
722        BatchSystem = BatchInterface.from_env_var_or_guess()
723        work_dirs = BatchSystem.get_supported_work_dir_types()
724        return ", ".join([f"'{work_dir_type}'" for work_dir_type in work_dirs])
725    except QQError:
726        return "??? (no batch system detected)"
727
728
729def available_job_types() -> str:
730    """
731    Return the supported job types.
732
733    Returns:
734        str: A comma-separated list of supported job types, each wrapped in quotes.
735    """
736    from qq_lib.properties.job_type import JobType
737
738    return ", ".join([f"'{str(job_type)}'" for job_type in JobType])
739
740
741def translate_server(raw: str) -> str:
742    """
743    Translate a batch server shortcut to its full name.
744    If the shortcut is not recognized, the original value is returned unchanged.
745
746    Returns:
747        str: Full name the the batch server.
748    """
749    return CFG.batch_servers_options.known_servers.get(raw, raw)
750
751
752def default_resubmit_from_hosts() -> str:
753    """
754    Returns the default resubmission hosts as a string.
755
756    Returns:
757            str: A comma-separated list of the default resubmission hosts.
758    """
759    from qq_lib.batch.interface import BatchInterface
760
761    try:
762        return CFG.resubmitter.default_resubmit_hosts or ",".join(
763            x.to_str()
764            for x in BatchInterface.from_env_var_or_guess().get_default_resubmit_hosts()
765        )
766    # if no batch system is available
767    except QQError:
768        return "??? (no batch system detected)"
logger = <Logger qq_lib.core.common (INFO)>
@lru_cache(maxsize=1)
def load_yaml_dumper() -> type[yaml.dumper.Dumper]:
32@lru_cache(maxsize=1)
33def load_yaml_dumper() -> type[yaml.Dumper]:
34    """Return the fastest available YAML dumper (CDumper if possible)."""
35    try:
36        from yaml import CDumper as Dumper
37
38        logger.debug("Loaded YAML CDumper.")
39    except ImportError:
40        from yaml import Dumper
41
42        logger.debug("Loaded default YAML dumper.")
43    return Dumper

Return the fastest available YAML dumper (CDumper if possible).

@lru_cache(maxsize=1)
def load_yaml_loader() -> type[yaml.loader.SafeLoader]:
46@lru_cache(maxsize=1)
47def load_yaml_loader() -> type[yaml.SafeLoader]:
48    """Return the fastest available safe YAML loader (CSafeLoader if possible)."""
49    try:
50        from yaml import (
51            CSafeLoader as SafeLoader,
52        )
53
54        logger.debug("Loaded YAML CLoader.")
55    except ImportError:
56        from yaml import SafeLoader
57
58        logger.debug("Loaded default YAML loader.")
59
60    return SafeLoader

Return the fastest available safe YAML loader (CSafeLoader if possible).

def get_files_with_suffix(directory: pathlib._local.Path, suffix: str) -> list[pathlib._local.Path]:
63def get_files_with_suffix(directory: Path, suffix: str) -> list[Path]:
64    """
65    Retrieve all files in a directory that have the specified file suffix.
66
67    Args:
68        directory (Path): The directory to search in.
69        suffix (str): The file suffix to match (including the dot, e.g., '.txt').
70
71    Returns:
72        list[Path]: A list of Path objects representing files with the given suffix.
73    """
74    files = []
75    for file in directory.iterdir():
76        if file.is_file() and file.suffix == suffix:
77            files.append(file)
78
79    return files

Retrieve all files in a directory that have the specified file suffix.

Arguments:
  • directory (Path): The directory to search in.
  • suffix (str): The file suffix to match (including the dot, e.g., '.txt').
Returns:

list[Path]: A list of Path objects representing files with the given suffix.

def get_runtime_files(directory: pathlib._local.Path) -> list[pathlib._local.Path]:
82def get_runtime_files(directory: Path) -> list[Path]:
83    """
84    Retrieve all qq runtime files in a directory.
85
86    Args:
87        directory (Path): The directory to search in.
88
89    Returns:
90        list[Path]: A list of Path objects representing qq runtime files.
91    """
92    files = []
93    for suffix in CFG.suffixes.all_suffixes:
94        files.extend(get_files_with_suffix(directory, suffix))
95
96    return files

Retrieve all qq runtime files in a directory.

Arguments:
  • directory (Path): The directory to search in.
Returns:

list[Path]: A list of Path objects representing qq runtime files.

def get_info_file(directory: pathlib._local.Path) -> pathlib._local.Path:
 99def get_info_file(directory: Path) -> Path:
100    """
101    Locate the qq job info file in a directory.
102
103    This function searches for files with suffix `CFG.suffixes.qq_info` in the
104    provided directory. It raises an error if none or multiple info files are found.
105
106    Args:
107        directory (Path): The directory to search in.
108
109    Returns:
110        Path: The Path object of the detected qq job info file.
111
112    Raises:
113        QQError: If no info file is found or multiple info files are detected.
114    """
115    info_files = get_info_files(directory)
116    if len(info_files) == 0:
117        raise QQError("No qq job info file found.")
118    if len(info_files) > 1:
119        raise QQError("Multiple qq job info files found.")
120
121    return info_files[0]

Locate the qq job info file in a directory.

This function searches for files with suffix CFG.suffixes.qq_info in the provided directory. It raises an error if none or multiple info files are found.

Arguments:
  • directory (Path): The directory to search in.
Returns:

Path: The Path object of the detected qq job info file.

Raises:
  • QQError: If no info file is found or multiple info files are detected.
def get_info_files(directory: pathlib._local.Path) -> list[pathlib._local.Path]:
124def get_info_files(directory: Path) -> list[Path]:
125    """
126    Retrieve all qq job info files in a directory.
127
128    This function searches for files with suffix `CFG.suffixes.qq_info` in the
129    provided directory. The files are sorted by their last modification time
130    (with the newest modified file being last in the list).
131
132    Args:
133        directory (Path): The directory to search in.
134
135    Returns:
136        list[Path]: A list of Path objects representing the detected qq job info files.
137    """
138    info_files = get_files_with_suffix(directory, CFG.suffixes.qq_info)
139    logger.debug(f"Detected the following qq info files: {info_files}.")
140
141    return sorted(info_files, key=lambda f: f.stat().st_mtime)

Retrieve all qq job info files in a directory.

This function searches for files with suffix CFG.suffixes.qq_info in the provided directory. The files are sorted by their last modification time (with the newest modified file being last in the list).

Arguments:
  • directory (Path): The directory to search in.
Returns:

list[Path]: A list of Path objects representing the detected qq job info files.

def get_info_file_from_job_id(job_id: str) -> pathlib._local.Path:
144def get_info_file_from_job_id(job_id: str) -> Path:
145    """
146    Get path to the qq info file corresponding to a job with the given ID.
147    The batch system to use is obtained from the environment variable or guessed.
148
149    Args:
150        job_id (str): The ID of the job for which to retrieve the info file.
151
152    Returns:
153        Path: Absolute path to the qq job info file.
154
155    Raises:
156        QQError: If the batch system could not be guessed,
157        the job does not exist or is not a qq job.
158    """
159
160    from qq_lib.batch.interface import BatchInterface, BatchJobInterface
161
162    BatchSystem = BatchInterface.from_env_var_or_guess()
163    job_info: BatchJobInterface = BatchSystem.get_batch_job(job_id)
164
165    if job_info.is_empty():
166        raise QQError(f"Job '{job_id}' does not exist.")
167
168    if not (path := job_info.get_info_file()):
169        raise QQError(f"Job '{job_id}' is not a valid qq job.")
170
171    return path

Get path to the qq info file corresponding to a job with the given ID. The batch system to use is obtained from the environment variable or guessed.

Arguments:
  • job_id (str): The ID of the job for which to retrieve the info file.
Returns:

Path: Absolute path to the qq job info file.

Raises:
  • QQError: If the batch system could not be guessed,
  • the job does not exist or is not a qq job.
def get_info_files_from_job_id_or_dir(job_id: str | None) -> list[pathlib._local.Path]:
174def get_info_files_from_job_id_or_dir(job_id: str | None) -> list[Path]:
175    """
176    Retrieve qq job info files based on a job ID or from the current directory.
177
178    Args:
179        job_id (str | None): The ID of the qq job to retrieve the info file for.
180            If None, the function searches for qq job info files in the current directory.
181
182    Returns:
183        list[Path]: A list containing the qq job info file(s). If a job ID is provided,
184            the list contains a single Path. If not, it contains all detected info files
185            in the current directory.
186
187    Raises:
188        QQError: If the info file corresponding to the given job ID does not exist
189            or is not reachable, or if no qq job info file is found in the current
190            directory when no job ID is provided.
191    """
192    if job_id:
193        info_file = get_info_file_from_job_id(job_id)
194        # check that the detected info file exists and we have permissions to read it
195        try:
196            missing = not info_file.is_file()
197        except PermissionError:
198            missing = True
199
200        if missing:
201            raise QQError(
202                f"Info file for job '{job_id}' does not exist or is not reachable."
203            )
204
205        return [info_file]
206
207    # get info files from the directory
208    info_files = get_info_files(Path())
209    if not info_files:
210        raise QQError("No qq job info file found.")
211
212    return info_files

Retrieve qq job info files based on a job ID or from the current directory.

Arguments:
  • job_id (str | None): The ID of the qq job to retrieve the info file for. If None, the function searches for qq job info files in the current directory.
Returns:

list[Path]: A list containing the qq job info file(s). If a job ID is provided, the list contains a single Path. If not, it contains all detected info files in the current directory.

Raises:
  • QQError: If the info file corresponding to the given job ID does not exist or is not reachable, or if no qq job info file is found in the current directory when no job ID is provided.
def yes_or_no_prompt(prompt: str) -> bool:
215def yes_or_no_prompt(prompt: str) -> bool:
216    """
217    Display an interactive yes/no prompt to the user and return the selection.
218
219    The prompt highlights the pressed key ('y' in green for yes, 'N' in red for no)
220    and defaults to 'No' if the user presses any key other than 'y'.
221
222    Args:
223        prompt (str): The text to display as the question.
224
225    Returns:
226        bool: True if the user selects 'yes' (presses 'y'), False otherwise.
227    """
228    prompt = f"   {prompt} "
229    text = (
230        Text("PROMPT", style="magenta")
231        + Text(prompt, style="default")
232        + Text("[y/N]", style="bold default")
233    )
234
235    with Live(text, refresh_per_second=1) as live:
236        key = readchar.readkey().lower()
237
238        # highlight the pressed key
239        if key == "y":
240            choice = (
241                Text("[", style="bold default")
242                + Text("y", style="bold green")
243                + Text("/N]", style="bold default")
244            )
245        else:
246            choice = (
247                Text("[y/", style="bold default")
248                + Text("N", style="bold red")
249                + Text("]", style="bold default")
250            )
251
252        live.update(
253            Text("PROMPT", style="magenta") + Text(prompt, style="default") + choice
254        )
255
256    return key == "y"

Display an interactive yes/no prompt to the user and return the selection.

The prompt highlights the pressed key ('y' in green for yes, 'N' in red for no) and defaults to 'No' if the user presses any key other than 'y'.

Arguments:
  • prompt (str): The text to display as the question.
Returns:

bool: True if the user selects 'yes' (presses 'y'), False otherwise.

def format_duration(td: datetime.timedelta) -> str:
259def format_duration(td: timedelta) -> str:
260    """
261    Convert a timedelta into a human-readable string showing only relevant units.
262
263    The output string includes weeks, days, hours, minutes, and seconds, but omits
264    units that are zero.
265
266    Args:
267        td (timedelta): The duration to format.
268
269    Returns:
270        str: A formatted string representing the duration, e.g., '1d 2h 3m 4s'.
271    """
272    total_seconds = int(td.total_seconds())
273
274    days_total, remainder = divmod(total_seconds, 86400)
275    weeks, days = divmod(days_total, 7)
276    hours, remainder = divmod(remainder, 3600)
277    minutes, seconds = divmod(remainder, 60)
278
279    parts = []
280    if weeks > 0:
281        parts.append(f"{weeks}w")
282    if days > 0:
283        parts.append(f"{days}d")
284    if hours > 0:
285        parts.append(f"{hours}h")
286    if minutes > 0:
287        parts.append(f"{minutes}m")
288    if seconds > 0 or total_seconds == 0:
289        parts.append(f"{seconds}s")
290
291    return " ".join(parts)

Convert a timedelta into a human-readable string showing only relevant units.

The output string includes weeks, days, hours, minutes, and seconds, but omits units that are zero.

Arguments:
  • td (timedelta): The duration to format.
Returns:

str: A formatted string representing the duration, e.g., '1d 2h 3m 4s'.

def format_duration_wdhhmmss(td: datetime.timedelta) -> str:
294def format_duration_wdhhmmss(td: timedelta) -> str:
295    """
296    Format a timedelta into a human-readable string: Xw Yd HH:MM:SS.
297
298    Weeks and days are included only if non-zero.
299    Hours, minutes, and seconds are always displayed with zero-padding.
300
301    Examples:
302        0:00:45         -> "00:00:45"
303        1 day, 2:03:04  -> "1d 02:03:04"
304        10 days, 5:06:07 -> "1w 3d 05:06:07"
305
306    Args:
307        td (timedelta): The duration to format.
308
309    Returns:
310        str: Formatted string in "Xw Yd HH:MM:SS" format.
311    """
312    total_seconds = int(td.total_seconds())
313
314    weeks, remainder = divmod(total_seconds, 7 * 24 * 3600)
315    days, remainder = divmod(remainder, 24 * 3600)
316    hours, remainder = divmod(remainder, 3600)
317    minutes, seconds = divmod(remainder, 60)
318
319    parts = []
320    if weeks > 0:
321        parts.append(f"{weeks}w")
322    if days > 0:
323        parts.append(f"{days}d")
324
325    parts.append(f"{hours:02}:{minutes:02}:{seconds:02}")
326
327    return " ".join(parts)

Format a timedelta into a human-readable string: Xw Yd HH:MM:SS.

Weeks and days are included only if non-zero. Hours, minutes, and seconds are always displayed with zero-padding.

Examples:

0:00:45 -> "00:00:45" 1 day, 2:03:04 -> "1d 02:03:04" 10 days, 5:06:07 -> "1w 3d 05:06:07"

Arguments:
  • td (timedelta): The duration to format.
Returns:

str: Formatted string in "Xw Yd HH:MM:SS" format.

def hhmmss_to_duration(timestr: str) -> datetime.timedelta:
330def hhmmss_to_duration(timestr: str) -> timedelta:
331    """
332    Convert a time string in HH:MM:SS (or HHH:MM:SS) format to a timedelta object.
333
334    Examples:
335        "0:00:00"   -> 0 seconds
336        "1:23:45"   -> 1 hour, 23 minutes, 45 seconds
337        "100:00:00" -> 100 hours
338
339    Args:
340        timestr (str): Input string in HH:MM:SS format.
341
342    Returns:
343        timedelta: The corresponding duration.
344
345    Raises:
346        QQError: If the input string is not in a valid HH:MM:SS format or contains
347                    invalid numeric values.
348    """
349    pattern = re.compile(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$")
350    match = pattern.fullmatch(timestr)
351    if not match:
352        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")
353
354    hours, minutes, seconds = map(int, match.groups())
355
356    return timedelta(hours=hours, minutes=minutes, seconds=seconds)

Convert a time string in HH:MM:SS (or HHH:MM:SS) format to a timedelta object.

Examples:

"0:00:00" -> 0 seconds "1:23:45" -> 1 hour, 23 minutes, 45 seconds "100:00:00" -> 100 hours

Arguments:
  • timestr (str): Input string in HH:MM:SS format.
Returns:

timedelta: The corresponding duration.

Raises:
  • QQError: If the input string is not in a valid HH:MM:SS format or contains invalid numeric values.
def dhhmmss_to_duration(timestr: str) -> datetime.timedelta:
359def dhhmmss_to_duration(timestr: str) -> timedelta:
360    """
361    Convert a time string in optional D-HH:MM:SS (or HH:MM:SS / HHH:MM:SS) format to a timedelta object.
362
363    Examples:
364        "0:00:00"      -> 0 seconds
365        "1:23:45"      -> 1 hour, 23 minutes, 45 seconds
366        "100:00:00"    -> 100 hours
367        "2-12:34:56"   -> 2 days, 12 hours, 34 minutes, 56 seconds
368
369    Args:
370        timestr (str): Input string in optional D-HH:MM:SS format.
371
372    Returns:
373        timedelta: The corresponding duration.
374
375    Raises:
376        QQError: If the input string is not in a valid format or contains invalid numeric values.
377    """
378    pattern = re.compile(r"^\s*(?:(\d+)-)?(\d+):([0-5]?\d):([0-5]?\d)\s*$")
379    match = pattern.fullmatch(timestr)
380    if not match:
381        raise QQError(f"Invalid D-HH:MM:SS time string '{timestr}'.")
382
383    days_str, hours_str, minutes_str, seconds_str = match.groups()
384    days = int(days_str) if days_str else 0
385    hours = int(hours_str)
386    minutes = int(minutes_str)
387    seconds = int(seconds_str)
388
389    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)

Convert a time string in optional D-HH:MM:SS (or HH:MM:SS / HHH:MM:SS) format to a timedelta object.

Examples:

"0:00:00" -> 0 seconds "1:23:45" -> 1 hour, 23 minutes, 45 seconds "100:00:00" -> 100 hours "2-12:34:56" -> 2 days, 12 hours, 34 minutes, 56 seconds

Arguments:
  • timestr (str): Input string in optional D-HH:MM:SS format.
Returns:

timedelta: The corresponding duration.

Raises:
  • QQError: If the input string is not in a valid format or contains invalid numeric values.
def normalize(s: str) -> str:
392def normalize(s: str) -> str:
393    """
394    Normalize a string for consistent comparison.
395
396    The string is converted to lowercase and all hyphens and underscores are removed.
397
398    Args:
399        s (str): The input string to normalize.
400
401    Returns:
402        str: The normalized string.
403    """
404    return s.lower().replace("-", "").replace("_", "")

Normalize a string for consistent comparison.

The string is converted to lowercase and all hyphens and underscores are removed.

Arguments:
  • s (str): The input string to normalize.
Returns:

str: The normalized string.

def equals_normalized(a: str, b: str) -> bool:
407def equals_normalized(a: str, b: str) -> bool:
408    """
409    Compare two strings for equality, ignoring case, hyphens, and underscores.
410
411    Args:
412        a (str): First string to compare.
413        b (str): Second string to compare.
414
415    Returns:
416        bool: True if the normalized strings are equal, False otherwise.
417    """
418
419    return normalize(a) == normalize(b)

Compare two strings for equality, ignoring case, hyphens, and underscores.

Arguments:
  • a (str): First string to compare.
  • b (str): Second string to compare.
Returns:

bool: True if the normalized strings are equal, False otherwise.

def convert_absolute_to_relative( files: list[pathlib._local.Path], target: pathlib._local.Path) -> list[pathlib._local.Path]:
422def convert_absolute_to_relative(files: list[Path], target: Path) -> list[Path]:
423    """
424    Convert a list of absolute paths into paths relative to a target directory.
425
426    Each file in `files` must be located inside `target` or one of its
427    subdirectories. If any file is outside `target`, a `QQError` is raised.
428
429    This function works even for remote files or paths to non-existent files.
430
431    Args:
432        files (list[Path]): A list of absolute file paths to convert.
433        target (Path): The target directory against which paths are made relative.
434
435    Returns:
436        list[Path]: A list of paths relative to `target`.
437
438    Raises:
439        QQError: If any file in `files` is not located within `target`.
440    """
441    relative = []
442    target_parts = target.parts
443
444    for file in files:
445        file_parts = file.parts
446
447        # file must starts with the target path
448        if file_parts[: len(target_parts)] != target_parts:
449            raise QQError(f"Item '{file}' is not in target directory '{target}'.")
450
451        # create a relative path
452        rel_path = Path(*file_parts[len(target_parts) :])
453        relative.append(rel_path)
454
455    logger.debug(f"Converted paths: {relative}.")
456    return relative

Convert a list of absolute paths into paths relative to a target directory.

Each file in files must be located inside target or one of its subdirectories. If any file is outside target, a QQError is raised.

This function works even for remote files or paths to non-existent files.

Arguments:
  • files (list[Path]): A list of absolute file paths to convert.
  • target (Path): The target directory against which paths are made relative.
Returns:

list[Path]: A list of paths relative to target.

Raises:
  • QQError: If any file in files is not located within target.
def wdhms_to_hhmmss(timestr: str) -> str:
459def wdhms_to_hhmmss(timestr: str) -> str:
460    """
461    Convert a time specification in the wdhms format into (H)HH:MM:SS.
462
463    The accepted format is a sequence of one or more integer + unit tokens,
464    where unit is one of:
465      w = weeks, d = days, h = hours, m = minutes, s = seconds
466
467    Tokens may be compact (e.g. "1w2d3h") or space-separated
468    (e.g. "1w 2d 3h"). The function is case-insensitive.
469
470    Examples:
471      "1w2d3h4m5s" -> "195:04:05"
472      "90m"         -> "1:30:00"
473      ""            -> "0:00:00"
474
475    Args:
476        timestr: Input duration string in wdhms format.
477
478    Returns:
479        Converted time as a string in (H)HH:MM:SS.
480
481    Raises:
482        QQError: If the string contains invalid characters or does not
483                 conform to the token pattern (excluding empty/whitespace,
484                 which is treated as zero).
485    """
486    # treat empty / whitespace-only as zero
487    if timestr.strip() == "":
488        return "0:00:00"
489
490    # validation
491    full_pattern = re.compile(r"^\s*(?:\d+\s*[wdhms]\s*)+$", re.IGNORECASE)
492    if not full_pattern.fullmatch(timestr):
493        raise QQError(f"Invalid time string '{timestr}'.")
494
495    # extract tokens
496    token_pattern = re.compile(r"(\d+)\s*([wdhms])", re.IGNORECASE)
497    matches = token_pattern.findall(timestr)
498
499    weeks = days = hours = minutes = seconds = 0
500
501    for value_str, unit in matches:
502        value = int(value_str)
503        unit = unit.lower()
504        if unit == "w":
505            weeks += value
506        elif unit == "d":
507            days += value
508        elif unit == "h":
509            hours += value
510        elif unit == "m":
511            minutes += value
512        elif unit == "s":
513            seconds += value
514
515    total_seconds = (
516        weeks * 7 * 24 * 3600 + days * 24 * 3600 + hours * 3600 + minutes * 60 + seconds
517    )
518
519    h, remainder = divmod(total_seconds, 3600)
520    m, s = divmod(remainder, 60)
521
522    return f"{h}:{m:02}:{s:02}"

Convert a time specification in the wdhms format into (H)HH:MM:SS.

The accepted format is a sequence of one or more integer + unit tokens, where unit is one of: w = weeks, d = days, h = hours, m = minutes, s = seconds

Tokens may be compact (e.g. "1w2d3h") or space-separated (e.g. "1w 2d 3h"). The function is case-insensitive.

Examples:

"1w2d3h4m5s" -> "195:04:05" "90m" -> "1:30:00" "" -> "0:00:00"

Arguments:
  • timestr: Input duration string in wdhms format.
Returns:

Converted time as a string in (H)HH:MM:SS.

Raises:
  • QQError: If the string contains invalid characters or does not conform to the token pattern (excluding empty/whitespace, which is treated as zero).
def hhmmss_to_wdhms(timestr: str) -> str:
525def hhmmss_to_wdhms(timestr: str) -> str:
526    """
527    Convert a time specification in (H)HH:MM:SS format into the compact wdhms format.
528
529    The output format expresses the duration as a sequence of one or more integer + unit tokens:
530      w = weeks, d = days, h = hours, m = minutes, s = seconds
531
532    Units that are zero are omitted, except that "0s" is returned if the total duration is zero.
533
534    Examples:
535        "195:04:05" -> "1w2d3h4m5s"
536        "1:30:00"   -> "1h30m"
537        "0:00:00"   -> "0s"
538        "49:00:00"  -> "2d1h"
539
540    Args:
541        timestr (str): Input time string in (H)HH:MM:SS format.
542
543    Returns:
544        str: Time duration converted into the compact wdhms format.
545
546    Raises:
547        QQError: If the input string is malformed or does not conform
548                 to the expected (H)HH:MM:SS pattern.
549    """
550    pattern = re.compile(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$")
551    match = pattern.fullmatch(timestr)
552    if not match:
553        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")
554
555    hours, minutes, seconds = map(int, match.groups())
556    total_seconds = hours * 3600 + minutes * 60 + seconds
557
558    if total_seconds == 0:
559        return "0s"
560
561    weeks, remainder = divmod(total_seconds, 7 * 24 * 3600)
562    days, remainder = divmod(remainder, 24 * 3600)
563    hours, remainder = divmod(remainder, 3600)
564    minutes, seconds = divmod(remainder, 60)
565
566    parts = []
567    if weeks:
568        parts.append(f"{weeks}w")
569    if days:
570        parts.append(f"{days}d")
571    if hours:
572        parts.append(f"{hours}h")
573    if minutes:
574        parts.append(f"{minutes}m")
575    if seconds:
576        parts.append(f"{seconds}s")
577
578    return "".join(parts)

Convert a time specification in (H)HH:MM:SS format into the compact wdhms format.

The output format expresses the duration as a sequence of one or more integer + unit tokens: w = weeks, d = days, h = hours, m = minutes, s = seconds

Units that are zero are omitted, except that "0s" is returned if the total duration is zero.

Examples:

"195:04:05" -> "1w2d3h4m5s" "1:30:00" -> "1h30m" "0:00:00" -> "0s" "49:00:00" -> "2d1h"

Arguments:
  • timestr (str): Input time string in (H)HH:MM:SS format.
Returns:

str: Time duration converted into the compact wdhms format.

Raises:
  • QQError: If the input string is malformed or does not conform to the expected (H)HH:MM:SS pattern.
def printf_to_regex(pattern: str) -> str:
581def printf_to_regex(pattern: str) -> str:
582    """
583    Convert a simple printf-style pattern to an equivalent regular expression pattern.
584
585    Args:
586        pattern (str): A printf-style pattern (e.g., "md%04d", "file%03d_part%02d").
587
588    Returns:
589        str: A string representing the equivalent regex pattern.
590    """
591    regex = re.escape(pattern)
592    regex = re.sub(r"%0(\d+)d", r"\\d{\1}", regex)  # double backslash
593    return re.sub(r"%d", r"\\d+", regex)

Convert a simple printf-style pattern to an equivalent regular expression pattern.

Arguments:
  • pattern (str): A printf-style pattern (e.g., "md%04d", "file%03d_part%02d").
Returns:

str: A string representing the equivalent regex pattern.

def is_printf_pattern(pattern: str) -> bool:
596def is_printf_pattern(pattern: str) -> bool:
597    """
598    Detect whether a string pattern uses printf-style numeric placeholders.
599
600    Args:
601        pattern (str): The pattern string to check.
602
603    Returns:
604        bool: True if the pattern contains printf-style placeholders, False otherwise.
605    """
606    return bool(re.search(r"%0?\d*d", pattern))

Detect whether a string pattern uses printf-style numeric placeholders.

Arguments:
  • pattern (str): The pattern string to check.
Returns:

bool: True if the pattern contains printf-style placeholders, False otherwise.

def split_files_list(string: str | None) -> list[pathlib._local.Path]:
609def split_files_list(string: str | None) -> list[Path]:
610    """
611    Split a string containing multiple file paths into a list of relative Path objects.
612
613    The string can contain file paths separated by colons (:), commas (,), or
614    any whitespace characters (space, tab, newline).
615
616    Args:
617        string (str | None): The string containing file paths. If None or empty,
618                             an empty list is returned.
619
620    Returns:
621        list[Path]: A list of Path objects corresponding to the individual
622                    relative file paths in the input string.
623    """
624    if not string:
625        return []
626
627    return [Path(f) for f in re.split(r"[:,\s]+", string)]

Split a string containing multiple file paths into a list of relative Path objects.

The string can contain file paths separated by colons (:), commas (,), or any whitespace characters (space, tab, newline).

Arguments:
  • string (str | None): The string containing file paths. If None or empty, an empty list is returned.
Returns:

list[Path]: A list of Path objects corresponding to the individual relative file paths in the input string.

def to_snake_case(s: str) -> str:
630def to_snake_case(s: str) -> str:
631    """
632    Convert a string from PascalCase or kebab-case to snake_case.
633
634    Args:
635        s (str): Input string in PascalCase or kebab-case.
636
637    Returns:
638        str: Converted string in snake_case.
639    """
640    # replace hyphens with underscores
641    s = s.replace("-", "_")
642
643    # convert PascalCase to snake_case
644    return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()

Convert a string from PascalCase or kebab-case to snake_case.

Arguments:
  • s (str): Input string in PascalCase or kebab-case.
Returns:

str: Converted string in snake_case.

def get_panel_width( console: rich.console.Console, factor: int, min_width: int | None, max_width: int | None):
647def get_panel_width(
648    console: Console, factor: int, min_width: int | None, max_width: int | None
649):
650    """
651    Calculate the width of a panel relative to the console width, constrained by
652    optional minimum and maximum width values.
653
654    Args:
655        console (Console): A rich Console-like object that provides terminal size.
656        factor (int): A divisor used to scale down the terminal width.
657        min_width (int): The minimum allowable panel width. If None, no lower bound is applied.
658        max_width (int): The maximum allowable panel width. If None, no upper bound is applied.
659
660    Returns:
661        int: The computed panel width after applying scaling and bounds.
662    """
663
664    term_width = console.size.width
665    panel_width = term_width // factor
666    if min_width is not None:
667        panel_width = max(panel_width, min_width)
668    if max_width is not None:
669        panel_width = min(panel_width, max_width)
670
671    return panel_width

Calculate the width of a panel relative to the console width, constrained by optional minimum and maximum width values.

Arguments:
  • console (Console): A rich Console-like object that provides terminal size.
  • factor (int): A divisor used to scale down the terminal width.
  • min_width (int): The minimum allowable panel width. If None, no lower bound is applied.
  • max_width (int): The maximum allowable panel width. If None, no upper bound is applied.
Returns:

int: The computed panel width after applying scaling and bounds.

def construct_loop_job_name(script_name: str, cycle: int) -> str:
674def construct_loop_job_name(script_name: str, cycle: int) -> str:
675    """
676    Construct a job name for a loop job.
677
678    Args:
679        script_name (str): Filename of the submitted script.
680        cycle (int): The current cycle of the loop job.
681
682    Returns:
683        str: The name of the loop job in the current cycle.
684    """
685    try:
686        # if the script has an extension, put the cycle number BEFORE the extension
687        stem, suffix = script_name.split(".", maxsplit=1)
688        return f"{stem}{CFG.loop_jobs.pattern % cycle}.{suffix}"
689    except ValueError:
690        # if the script has no extension, add the cycle number after the full name
691        return f"{script_name}{CFG.loop_jobs.pattern % cycle}"

Construct a job name for a loop job.

Arguments:
  • script_name (str): Filename of the submitted script.
  • cycle (int): The current cycle of the loop job.
Returns:

str: The name of the loop job in the current cycle.

def construct_info_file_path(input_dir: pathlib._local.Path, job_name: str) -> pathlib._local.Path:
694def construct_info_file_path(input_dir: Path, job_name: str) -> Path:
695    """
696    Construct the absolute path to a job's qq info file.
697
698    Args:
699        input_dir (Path): The directory containing the job script.
700        job_name (str): The name of the job.
701
702    Returns:
703        Path: The absolute path to the job's qq info file.
704    """
705    return logical_resolve((input_dir / job_name).with_suffix(CFG.suffixes.qq_info))

Construct the absolute path to a job's qq info file.

Arguments:
  • input_dir (Path): The directory containing the job script.
  • job_name (str): The name of the job.
Returns:

Path: The absolute path to the job's qq info file.

def available_work_dirs() -> str:
708def available_work_dirs() -> str:
709    """
710    Return the supported work-directory types for the detected batch system.
711
712    The batch system is determined using the `QQ_BATCH_SYSTEM` environment
713    variable or by automatic detection. The supported work-directory types are
714    returned as a comma-separated string formatted for display in help text.
715
716    Returns:
717        str: A comma-separated list of supported work directory types, each
718        wrapped in quotes.
719    """
720    from qq_lib.batch.interface import BatchInterface
721
722    try:
723        BatchSystem = BatchInterface.from_env_var_or_guess()
724        work_dirs = BatchSystem.get_supported_work_dir_types()
725        return ", ".join([f"'{work_dir_type}'" for work_dir_type in work_dirs])
726    except QQError:
727        return "??? (no batch system detected)"

Return the supported work-directory types for the detected batch system.

The batch system is determined using the QQ_BATCH_SYSTEM environment variable or by automatic detection. The supported work-directory types are returned as a comma-separated string formatted for display in help text.

Returns:

str: A comma-separated list of supported work directory types, each wrapped in quotes.

def available_job_types() -> str:
730def available_job_types() -> str:
731    """
732    Return the supported job types.
733
734    Returns:
735        str: A comma-separated list of supported job types, each wrapped in quotes.
736    """
737    from qq_lib.properties.job_type import JobType
738
739    return ", ".join([f"'{str(job_type)}'" for job_type in JobType])

Return the supported job types.

Returns:

str: A comma-separated list of supported job types, each wrapped in quotes.

def translate_server(raw: str) -> str:
742def translate_server(raw: str) -> str:
743    """
744    Translate a batch server shortcut to its full name.
745    If the shortcut is not recognized, the original value is returned unchanged.
746
747    Returns:
748        str: Full name the the batch server.
749    """
750    return CFG.batch_servers_options.known_servers.get(raw, raw)

Translate a batch server shortcut to its full name. If the shortcut is not recognized, the original value is returned unchanged.

Returns:

str: Full name the the batch server.

def default_resubmit_from_hosts() -> str:
753def default_resubmit_from_hosts() -> str:
754    """
755    Returns the default resubmission hosts as a string.
756
757    Returns:
758            str: A comma-separated list of the default resubmission hosts.
759    """
760    from qq_lib.batch.interface import BatchInterface
761
762    try:
763        return CFG.resubmitter.default_resubmit_hosts or ",".join(
764            x.to_str()
765            for x in BatchInterface.from_env_var_or_guess().get_default_resubmit_hosts()
766        )
767    # if no batch system is available
768    except QQError:
769        return "??? (no batch system detected)"

Returns the default resubmission hosts as a string.

Returns:

str: A comma-separated list of the default resubmission hosts.