qq_lib.core.common
General utility functions for the qq library.
This module provides helpers for working with qq job files, time durations, YAML I/O, string normalization, user prompts, path manipulation, and job-name construction.
# Released under MIT License.
# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab

"""
General utility functions for the qq library.

This module provides helpers for working with qq job files, time durations,
YAML I/O, string normalization, user prompts, path manipulation, and job-name construction.
"""

import re
from datetime import timedelta
from functools import lru_cache
from pathlib import Path

import readchar
import yaml
from rich.console import Console
from rich.live import Live
from rich.text import Text

from qq_lib.core.logical_paths import logical_resolve

from .config import CFG
from .error import QQError
from .logger import get_logger

logger = get_logger(__name__)


@lru_cache(maxsize=1)
def load_yaml_dumper() -> type[yaml.Dumper]:
    """Return the fastest available YAML dumper (CDumper if possible)."""
    try:
        from yaml import CDumper as Dumper

        logger.debug("Loaded YAML CDumper.")
    except ImportError:
        from yaml import Dumper

        logger.debug("Loaded default YAML dumper.")
    return Dumper


@lru_cache(maxsize=1)
def load_yaml_loader() -> type[yaml.SafeLoader]:
    """Return the fastest available safe YAML loader (CSafeLoader if possible)."""
    try:
        from yaml import (
            CSafeLoader as SafeLoader,
        )

        # name the class that is actually imported (the safe C loader)
        logger.debug("Loaded YAML CSafeLoader.")
    except ImportError:
        from yaml import SafeLoader

        logger.debug("Loaded default YAML loader.")

    return SafeLoader


def get_files_with_suffix(directory: Path, suffix: str) -> list[Path]:
    """
    Retrieve all files in a directory that have the specified file suffix.

    Args:
        directory (Path): The directory to search in.
        suffix (str): The file suffix to match (including the dot, e.g., '.txt').

    Returns:
        list[Path]: A list of Path objects representing files with the given suffix.
    """
    files = []
    for file in directory.iterdir():
        if file.is_file() and file.suffix == suffix:
            files.append(file)

    return files


def get_runtime_files(directory: Path) -> list[Path]:
    """
    Retrieve all qq runtime files in a directory.

    Args:
        directory (Path): The directory to search in.

    Returns:
        list[Path]: A list of Path objects representing qq runtime files.
    """
    files = []
    for suffix in CFG.suffixes.all_suffixes:
        files.extend(get_files_with_suffix(directory, suffix))

    return files


def get_info_file(directory: Path) -> Path:
    """
    Locate the qq job info file in a directory.

    This function searches for files with suffix `CFG.suffixes.qq_info` in the
    provided directory. It raises an error if none or multiple info files are found.

    Args:
        directory (Path): The directory to search in.

    Returns:
        Path: The Path object of the detected qq job info file.

    Raises:
        QQError: If no info file is found or multiple info files are detected.
    """
    info_files = get_info_files(directory)
    if not info_files:
        raise QQError("No qq job info file found.")
    if len(info_files) > 1:
        raise QQError("Multiple qq job info files found.")

    return info_files[0]


def get_info_files(directory: Path) -> list[Path]:
    """
    Retrieve all qq job info files in a directory.

    This function searches for files with suffix `CFG.suffixes.qq_info` in the
    provided directory. The files are sorted by their last modification time
    (with the newest modified file being last in the list).

    Args:
        directory (Path): The directory to search in.

    Returns:
        list[Path]: A list of Path objects representing the detected qq job info files.
    """
    info_files = get_files_with_suffix(directory, CFG.suffixes.qq_info)
    logger.debug(f"Detected the following qq info files: {info_files}.")

    return sorted(info_files, key=lambda f: f.stat().st_mtime)


def get_info_file_from_job_id(job_id: str) -> Path:
    """
    Get path to the qq info file corresponding to a job with the given ID.
    The batch system to use is obtained from the environment variable or guessed.

    Args:
        job_id (str): The ID of the job for which to retrieve the info file.

    Returns:
        Path: Absolute path to the qq job info file.

    Raises:
        QQError: If the batch system could not be guessed,
            the job does not exist or is not a qq job.
    """

    from qq_lib.batch.interface import BatchInterface, BatchJobInterface

    BatchSystem = BatchInterface.from_env_var_or_guess()
    job_info: BatchJobInterface = BatchSystem.get_batch_job(job_id)

    if job_info.is_empty():
        raise QQError(f"Job '{job_id}' does not exist.")

    if not (path := job_info.get_info_file()):
        raise QQError(f"Job '{job_id}' is not a valid qq job.")

    return path


def get_info_files_from_job_id_or_dir(job_id: str | None) -> list[Path]:
    """
    Retrieve qq job info files based on a job ID or from the current directory.

    Args:
        job_id (str | None): The ID of the qq job to retrieve the info file for.
            If None, the function searches for qq job info files in the current directory.

    Returns:
        list[Path]: A list containing the qq job info file(s). If a job ID is provided,
            the list contains a single Path. If not, it contains all detected info files
            in the current directory.

    Raises:
        QQError: If the info file corresponding to the given job ID does not exist
            or is not reachable, or if no qq job info file is found in the current
            directory when no job ID is provided.
    """
    if job_id:
        info_file = get_info_file_from_job_id(job_id)
        # check that the detected info file exists and we have permissions to read it
        try:
            missing = not info_file.is_file()
        except PermissionError:
            missing = True

        if missing:
            raise QQError(
                f"Info file for job '{job_id}' does not exist or is not reachable."
            )

        return [info_file]

    # get info files from the directory
    info_files = get_info_files(Path())
    if not info_files:
        raise QQError("No qq job info file found.")

    return info_files


def yes_or_no_prompt(prompt: str) -> bool:
    """
    Display an interactive yes/no prompt to the user and return the selection.

    The prompt highlights the pressed key ('y' in green for yes, 'N' in red for no)
    and defaults to 'No' if the user presses any key other than 'y'.

    Args:
        prompt (str): The text to display as the question.

    Returns:
        bool: True if the user selects 'yes' (presses 'y'), False otherwise.
    """
    prompt = f" {prompt} "
    text = (
        Text("PROMPT", style="magenta")
        + Text(prompt, style="default")
        + Text("[y/N]", style="bold default")
    )

    with Live(text, refresh_per_second=1) as live:
        key = readchar.readkey().lower()

        # highlight the pressed key
        if key == "y":
            choice = (
                Text("[", style="bold default")
                + Text("y", style="bold green")
                + Text("/N]", style="bold default")
            )
        else:
            choice = (
                Text("[y/", style="bold default")
                + Text("N", style="bold red")
                + Text("]", style="bold default")
            )

        live.update(
            Text("PROMPT", style="magenta") + Text(prompt, style="default") + choice
        )

    return key == "y"


def format_duration(td: timedelta) -> str:
    """
    Convert a timedelta into a human-readable string showing only relevant units.

    The output string includes weeks, days, hours, minutes, and seconds, but omits
    units that are zero. Fractions of a second are truncated. Negative durations
    are formatted by magnitude and prefixed with '-'.

    Args:
        td (timedelta): The duration to format.

    Returns:
        str: A formatted string representing the duration, e.g., '1d 2h 3m 4s'.
    """
    total_seconds = int(td.total_seconds())

    # split the magnitude and re-attach the sign afterwards; floor-based divmod
    # on a negative total would yield misleading positive components
    sign = "-" if total_seconds < 0 else ""
    magnitude = abs(total_seconds)

    days_total, remainder = divmod(magnitude, 86400)
    weeks, days = divmod(days_total, 7)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)

    parts = []
    if weeks > 0:
        parts.append(f"{weeks}w")
    if days > 0:
        parts.append(f"{days}d")
    if hours > 0:
        parts.append(f"{hours}h")
    if minutes > 0:
        parts.append(f"{minutes}m")
    if seconds > 0 or magnitude == 0:
        parts.append(f"{seconds}s")

    return sign + " ".join(parts)


def format_duration_wdhhmmss(td: timedelta) -> str:
    """
    Format a timedelta into a human-readable string: Xw Yd HH:MM:SS.

    Weeks and days are included only if non-zero.
    Hours, minutes, and seconds are always displayed with zero-padding.
    Fractions of a second are truncated. Negative durations are formatted
    by magnitude and prefixed with '-'.

    Examples:
        0:00:45              -> "00:00:45"
        1 day, 2:03:04       -> "1d 02:03:04"
        10 days, 5:06:07     -> "1w 3d 05:06:07"

    Args:
        td (timedelta): The duration to format.

    Returns:
        str: Formatted string in "Xw Yd HH:MM:SS" format.
    """
    total_seconds = int(td.total_seconds())

    # split the magnitude and re-attach the sign afterwards; floor-based divmod
    # on a negative total would yield misleading positive components
    sign = "-" if total_seconds < 0 else ""
    magnitude = abs(total_seconds)

    weeks, remainder = divmod(magnitude, 7 * 24 * 3600)
    days, remainder = divmod(remainder, 24 * 3600)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)

    parts = []
    if weeks > 0:
        parts.append(f"{weeks}w")
    if days > 0:
        parts.append(f"{days}d")

    parts.append(f"{hours:02}:{minutes:02}:{seconds:02}")

    return sign + " ".join(parts)


def hhmmss_to_duration(timestr: str) -> timedelta:
    """
    Convert a time string in HH:MM:SS (or HHH:MM:SS) format to a timedelta object.

    Examples:
        "0:00:00"      -> 0 seconds
        "1:23:45"      -> 1 hour, 23 minutes, 45 seconds
        "100:00:00"    -> 100 hours

    Args:
        timestr (str): Input string in HH:MM:SS format.

    Returns:
        timedelta: The corresponding duration.

    Raises:
        QQError: If the input string is not in a valid HH:MM:SS format or contains
            invalid numeric values.
    """
    pattern = re.compile(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$")
    match = pattern.fullmatch(timestr)
    if not match:
        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")

    hours, minutes, seconds = map(int, match.groups())

    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


def dhhmmss_to_duration(timestr: str) -> timedelta:
    """
    Convert a time string in optional D-HH:MM:SS (or HH:MM:SS / HHH:MM:SS) format to a timedelta object.

    Examples:
        "0:00:00"        -> 0 seconds
        "1:23:45"        -> 1 hour, 23 minutes, 45 seconds
        "100:00:00"      -> 100 hours
        "2-12:34:56"     -> 2 days, 12 hours, 34 minutes, 56 seconds

    Args:
        timestr (str): Input string in optional D-HH:MM:SS format.

    Returns:
        timedelta: The corresponding duration.

    Raises:
        QQError: If the input string is not in a valid format or contains invalid numeric values.
    """
    pattern = re.compile(r"^\s*(?:(\d+)-)?(\d+):([0-5]?\d):([0-5]?\d)\s*$")
    match = pattern.fullmatch(timestr)
    if not match:
        raise QQError(f"Invalid D-HH:MM:SS time string '{timestr}'.")

    days_str, hours_str, minutes_str, seconds_str = match.groups()
    days = int(days_str) if days_str else 0
    hours = int(hours_str)
    minutes = int(minutes_str)
    seconds = int(seconds_str)

    return timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds)


def normalize(s: str) -> str:
    """
    Normalize a string for consistent comparison.

    The string is converted to lowercase and all hyphens and underscores are removed.

    Args:
        s (str): The input string to normalize.

    Returns:
        str: The normalized string.
    """
    return s.lower().replace("-", "").replace("_", "")


def equals_normalized(a: str, b: str) -> bool:
    """
    Compare two strings for equality, ignoring case, hyphens, and underscores.

    Args:
        a (str): First string to compare.
        b (str): Second string to compare.

    Returns:
        bool: True if the normalized strings are equal, False otherwise.
    """

    return normalize(a) == normalize(b)


def convert_absolute_to_relative(files: list[Path], target: Path) -> list[Path]:
    """
    Convert a list of absolute paths into paths relative to a target directory.

    Each file in `files` must be located inside `target` or one of its
    subdirectories. If any file is outside `target`, a `QQError` is raised.

    This function works even for remote files or paths to non-existent files.

    Args:
        files (list[Path]): A list of absolute file paths to convert.
        target (Path): The target directory against which paths are made relative.

    Returns:
        list[Path]: A list of paths relative to `target`.

    Raises:
        QQError: If any file in `files` is not located within `target`.
    """
    relative = []
    target_parts = target.parts

    for file in files:
        file_parts = file.parts

        # file must start with the target path
        if file_parts[: len(target_parts)] != target_parts:
            raise QQError(f"Item '{file}' is not in target directory '{target}'.")

        # create a relative path
        rel_path = Path(*file_parts[len(target_parts) :])
        relative.append(rel_path)

    logger.debug(f"Converted paths: {relative}.")
    return relative


def wdhms_to_hhmmss(timestr: str) -> str:
    """
    Convert a time specification in the wdhms format into (H)HH:MM:SS.

    The accepted format is a sequence of one or more integer + unit tokens,
    where unit is one of:
        w = weeks, d = days, h = hours, m = minutes, s = seconds

    Tokens may be compact (e.g. "1w2d3h") or space-separated
    (e.g. "1w 2d 3h"). The function is case-insensitive.

    Examples:
        "1w2d3h4m5s"  -> "195:04:05"
        "90m"         -> "1:30:00"
        ""            -> "0:00:00"

    Args:
        timestr: Input duration string in wdhms format.

    Returns:
        Converted time as a string in (H)HH:MM:SS.

    Raises:
        QQError: If the string contains invalid characters or does not
            conform to the token pattern (excluding empty/whitespace,
            which is treated as zero).
    """
    # treat empty / whitespace-only as zero
    if timestr.strip() == "":
        return "0:00:00"

    # validation
    full_pattern = re.compile(r"^\s*(?:\d+\s*[wdhms]\s*)+$", re.IGNORECASE)
    if not full_pattern.fullmatch(timestr):
        raise QQError(f"Invalid time string '{timestr}'.")

    # extract tokens
    token_pattern = re.compile(r"(\d+)\s*([wdhms])", re.IGNORECASE)
    matches = token_pattern.findall(timestr)

    weeks = days = hours = minutes = seconds = 0

    for value_str, unit in matches:
        value = int(value_str)
        unit = unit.lower()
        if unit == "w":
            weeks += value
        elif unit == "d":
            days += value
        elif unit == "h":
            hours += value
        elif unit == "m":
            minutes += value
        elif unit == "s":
            seconds += value

    total_seconds = (
        weeks * 7 * 24 * 3600 + days * 24 * 3600 + hours * 3600 + minutes * 60 + seconds
    )

    h, remainder = divmod(total_seconds, 3600)
    m, s = divmod(remainder, 60)

    return f"{h}:{m:02}:{s:02}"


def hhmmss_to_wdhms(timestr: str) -> str:
    """
    Convert a time specification in (H)HH:MM:SS format into the compact wdhms format.

    The output format expresses the duration as a sequence of one or more integer + unit tokens:
        w = weeks, d = days, h = hours, m = minutes, s = seconds

    Units that are zero are omitted, except that "0s" is returned if the total duration is zero.

    Examples:
        "195:04:05" -> "1w1d3h4m5s"
        "1:30:00"   -> "1h30m"
        "0:00:00"   -> "0s"
        "49:00:00"  -> "2d1h"

    Args:
        timestr (str): Input time string in (H)HH:MM:SS format.

    Returns:
        str: Time duration converted into the compact wdhms format.

    Raises:
        QQError: If the input string is malformed or does not conform
            to the expected (H)HH:MM:SS pattern.
    """
    pattern = re.compile(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$")
    match = pattern.fullmatch(timestr)
    if not match:
        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")

    hours, minutes, seconds = map(int, match.groups())
    total_seconds = hours * 3600 + minutes * 60 + seconds

    if total_seconds == 0:
        return "0s"

    weeks, remainder = divmod(total_seconds, 7 * 24 * 3600)
    days, remainder = divmod(remainder, 24 * 3600)
    hours, remainder = divmod(remainder, 3600)
    minutes, seconds = divmod(remainder, 60)

    parts = []
    if weeks:
        parts.append(f"{weeks}w")
    if days:
        parts.append(f"{days}d")
    if hours:
        parts.append(f"{hours}h")
    if minutes:
        parts.append(f"{minutes}m")
    if seconds:
        parts.append(f"{seconds}s")

    return "".join(parts)


def printf_to_regex(pattern: str) -> str:
    """
    Convert a simple printf-style pattern to an equivalent regular expression pattern.

    Args:
        pattern (str): A printf-style pattern (e.g., "md%04d", "file%03d_part%02d").

    Returns:
        str: A string representing the equivalent regex pattern.
    """
    regex = re.escape(pattern)
    regex = re.sub(r"%0(\d+)d", r"\\d{\1}", regex)  # double backslash
    return re.sub(r"%d", r"\\d+", regex)


def is_printf_pattern(pattern: str) -> bool:
    """
    Detect whether a string pattern uses printf-style numeric placeholders.

    Args:
        pattern (str): The pattern string to check.

    Returns:
        bool: True if the pattern contains printf-style placeholders, False otherwise.
    """
    return bool(re.search(r"%0?\d*d", pattern))


def split_files_list(string: str | None) -> list[Path]:
    """
    Split a string containing multiple file paths into a list of relative Path objects.

    The string can contain file paths separated by colons (:), commas (,), or
    any whitespace characters (space, tab, newline). Leading, trailing, and
    repeated separators are ignored.

    Args:
        string (str | None): The string containing file paths. If None or empty,
            an empty list is returned.

    Returns:
        list[Path]: A list of Path objects corresponding to the individual
            relative file paths in the input string.
    """
    if not string:
        return []

    # re.split yields empty strings when the input starts or ends with a separator;
    # drop them so they do not turn into spurious Path('.') entries
    return [Path(f) for f in re.split(r"[:,\s]+", string) if f]


def to_snake_case(s: str) -> str:
    """
    Convert a string from PascalCase or kebab-case to snake_case.

    Args:
        s (str): Input string in PascalCase or kebab-case.

    Returns:
        str: Converted string in snake_case.
    """
    # replace hyphens with underscores
    s = s.replace("-", "_")

    # convert PascalCase to snake_case
    return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()


def get_panel_width(
    console: Console, factor: int, min_width: int | None, max_width: int | None
) -> int:
    """
    Calculate the width of a panel relative to the console width, constrained by
    optional minimum and maximum width values.

    Args:
        console (Console): A rich Console-like object that provides terminal size.
        factor (int): A divisor used to scale down the terminal width.
        min_width (int | None): The minimum allowable panel width. If None, no lower bound is applied.
        max_width (int | None): The maximum allowable panel width. If None, no upper bound is applied.

    Returns:
        int: The computed panel width after applying scaling and bounds.
    """

    term_width = console.size.width
    panel_width = term_width // factor
    if min_width is not None:
        panel_width = max(panel_width, min_width)
    if max_width is not None:
        panel_width = min(panel_width, max_width)

    return panel_width


def construct_loop_job_name(script_name: str, cycle: int) -> str:
    """
    Construct a job name for a loop job.

    Args:
        script_name (str): Filename of the submitted script.
        cycle (int): The current cycle of the loop job.

    Returns:
        str: The name of the loop job in the current cycle.
    """
    try:
        # if the script has an extension, put the cycle number BEFORE the extension
        stem, suffix = script_name.split(".", maxsplit=1)
        return f"{stem}{CFG.loop_jobs.pattern % cycle}.{suffix}"
    except ValueError:
        # if the script has no extension, add the cycle number after the full name
        return f"{script_name}{CFG.loop_jobs.pattern % cycle}"


def construct_info_file_path(input_dir: Path, job_name: str) -> Path:
    """
    Construct the absolute path to a job's qq info file.

    Args:
        input_dir (Path): The directory containing the job script.
        job_name (str): The name of the job.

    Returns:
        Path: The absolute path to the job's qq info file.
    """
    return logical_resolve((input_dir / job_name).with_suffix(CFG.suffixes.qq_info))


def available_work_dirs() -> str:
    """
    Return the supported work-directory types for the detected batch system.

    The batch system is determined using the `QQ_BATCH_SYSTEM` environment
    variable or by automatic detection. The supported work-directory types are
    returned as a comma-separated string formatted for display in help text.

    Returns:
        str: A comma-separated list of supported work directory types, each
            wrapped in quotes.
    """
    from qq_lib.batch.interface import BatchInterface

    try:
        BatchSystem = BatchInterface.from_env_var_or_guess()
        work_dirs = BatchSystem.get_supported_work_dir_types()
        return ", ".join([f"'{work_dir_type}'" for work_dir_type in work_dirs])
    except QQError:
        return "??? (no batch system detected)"


def available_job_types() -> str:
    """
    Return the supported job types.

    Returns:
        str: A comma-separated list of supported job types, each wrapped in quotes.
    """
    from qq_lib.properties.job_type import JobType

    return ", ".join([f"'{str(job_type)}'" for job_type in JobType])


def translate_server(raw: str) -> str:
    """
    Translate a batch server shortcut to its full name.
    If the shortcut is not recognized, the original value is returned unchanged.

    Args:
        raw (str): The batch server shortcut (or full name) to translate.

    Returns:
        str: Full name of the batch server.
    """
    return CFG.batch_servers_options.known_servers.get(raw, raw)


def default_resubmit_from_hosts() -> str:
    """
    Returns the default resubmission hosts as a string.

    Returns:
        str: A comma-separated list of the default resubmission hosts.
    """
    from qq_lib.batch.interface import BatchInterface

    try:
        return CFG.resubmitter.default_resubmit_hosts or ",".join(
            x.to_str()
            for x in BatchInterface.from_env_var_or_guess().get_default_resubmit_hosts()
        )
    # if no batch system is available
    except QQError:
        return "??? (no batch system detected)"
@lru_cache(maxsize=1)
def load_yaml_dumper() -> type[yaml.Dumper]:
    """
    Select the YAML dumper class, preferring the C implementation.

    `CDumper` is imported from `yaml` first; if that import raises ImportError,
    the plain `Dumper` is used instead. The result is cached by `lru_cache`,
    so the import attempt and the debug message happen only on the first call.

    Returns:
        type[yaml.Dumper]: The selected dumper class.
    """
    try:
        from yaml import CDumper
    except ImportError:
        # C extension unavailable: fall back to the default dumper
        from yaml import Dumper

        logger.debug("Loaded default YAML dumper.")
        return Dumper

    logger.debug("Loaded YAML CDumper.")
    return CDumper
Return the fastest available YAML dumper (CDumper if possible).
@lru_cache(maxsize=1)
def load_yaml_loader() -> type[yaml.SafeLoader]:
    """
    Return the fastest available safe YAML loader (CSafeLoader if possible).

    `CSafeLoader` is imported from `yaml` first; if that import raises
    ImportError, the plain `SafeLoader` is used instead. The result is cached
    by `lru_cache`, so the import attempt and the debug message happen only
    on the first call.

    Returns:
        type[yaml.SafeLoader]: The selected safe loader class.
    """
    try:
        from yaml import (
            CSafeLoader as SafeLoader,
        )

        # name the class that is actually imported (the safe C loader)
        logger.debug("Loaded YAML CSafeLoader.")
    except ImportError:
        from yaml import SafeLoader

        logger.debug("Loaded default YAML loader.")

    return SafeLoader
Return the fastest available safe YAML loader (CSafeLoader if possible).
def get_files_with_suffix(directory: Path, suffix: str) -> list[Path]:
    """
    Collect the regular files directly inside a directory whose suffix matches.

    Only the immediate entries of `directory` are inspected (no recursion),
    and directories are skipped even if their name ends with the suffix.

    Args:
        directory (Path): The directory to search in.
        suffix (str): The file suffix to match (including the dot, e.g., '.txt').

    Returns:
        list[Path]: Paths of the matching files, in the order yielded by
            `Path.iterdir()`.
    """
    return [
        entry
        for entry in directory.iterdir()
        if entry.is_file() and entry.suffix == suffix
    ]
Retrieve all files in a directory that have the specified file suffix.
Arguments:
- directory (Path): The directory to search in.
- suffix (str): The file suffix to match (including the dot, e.g., '.txt').
Returns:
list[Path]: A list of Path objects representing files with the given suffix.
def get_runtime_files(directory: Path) -> list[Path]:
    """
    Collect every qq runtime file located directly in a directory.

    A file counts as a runtime file when its suffix is one of
    `CFG.suffixes.all_suffixes`. Results are grouped by suffix, in the order
    the suffixes are listed in the configuration.

    Args:
        directory (Path): The directory to search in.

    Returns:
        list[Path]: A list of Path objects representing qq runtime files.
    """
    return [
        runtime_file
        for runtime_suffix in CFG.suffixes.all_suffixes
        for runtime_file in get_files_with_suffix(directory, runtime_suffix)
    ]
Retrieve all qq runtime files in a directory.
Arguments:
- directory (Path): The directory to search in.
Returns:
list[Path]: A list of Path objects representing qq runtime files.
def get_info_file(directory: Path) -> Path:
    """
    Find the single qq job info file in a directory.

    The directory is scanned for files with the suffix `CFG.suffixes.qq_info`.
    Exactly one such file must be present.

    Args:
        directory (Path): The directory to search in.

    Returns:
        Path: The Path object of the detected qq job info file.

    Raises:
        QQError: If no info file is found or multiple info files are detected.
    """
    candidates = get_info_files(directory)

    if not candidates:
        raise QQError("No qq job info file found.")

    if len(candidates) > 1:
        raise QQError("Multiple qq job info files found.")

    (only_file,) = candidates
    return only_file
Locate the qq job info file in a directory.
This function searches for files with suffix CFG.suffixes.qq_info in the
provided directory. It raises an error if none or multiple info files are found.
Arguments:
- directory (Path): The directory to search in.
Returns:
Path: The Path object of the detected qq job info file.
Raises:
- QQError: If no info file is found or multiple info files are detected.
def get_info_files(directory: Path) -> list[Path]:
    """
    List the qq job info files in a directory, oldest first.

    Files with the suffix `CFG.suffixes.qq_info` are collected from the
    provided directory and ordered by their last modification time, so the
    most recently modified file is the last element of the list.

    Args:
        directory (Path): The directory to search in.

    Returns:
        list[Path]: A list of Path objects representing the detected qq job info files.
    """

    def _modification_time(path: Path) -> float:
        return path.stat().st_mtime

    info_files = get_files_with_suffix(directory, CFG.suffixes.qq_info)
    logger.debug(f"Detected the following qq info files: {info_files}.")

    return sorted(info_files, key=_modification_time)
Retrieve all qq job info files in a directory.
This function searches for files with suffix CFG.suffixes.qq_info in the
provided directory. The files are sorted by their last modification time
(with the newest modified file being last in the list).
Arguments:
- directory (Path): The directory to search in.
Returns:
list[Path]: A list of Path objects representing the detected qq job info files.
def get_info_file_from_job_id(job_id: str) -> Path:
    """
    Look up the qq info file that belongs to the job with the given ID.

    The batch system is selected through
    `BatchInterface.from_env_var_or_guess()` and then queried for the job.

    Args:
        job_id (str): The ID of the job for which to retrieve the info file.

    Returns:
        Path: Absolute path to the qq job info file.

    Raises:
        QQError: If the batch system could not be guessed,
            the job does not exist or is not a qq job.
    """
    # imported here rather than at module level
    from qq_lib.batch.interface import BatchInterface, BatchJobInterface

    batch_system = BatchInterface.from_env_var_or_guess()
    job: BatchJobInterface = batch_system.get_batch_job(job_id)

    if job.is_empty():
        raise QQError(f"Job '{job_id}' does not exist.")

    # a job without an info file was not submitted through qq
    info_file = job.get_info_file()
    if not info_file:
        raise QQError(f"Job '{job_id}' is not a valid qq job.")

    return info_file
Get path to the qq info file corresponding to a job with the given ID. The batch system to use is obtained from the environment variable or guessed.
Arguments:
- job_id (str): The ID of the job for which to retrieve the info file.
Returns:
Path: Absolute path to the qq job info file.
Raises:
- QQError: If the batch system could not be guessed, the job does not exist or is not a qq job.
def get_info_files_from_job_id_or_dir(job_id: str | None) -> list[Path]:
    """
    Resolve qq job info files either from a job ID or from the current directory.

    Args:
        job_id (str | None): The ID of the qq job to retrieve the info file for.
            If None (or empty), the function searches for qq job info files in
            the current directory.

    Returns:
        list[Path]: A list containing the qq job info file(s). If a job ID is provided,
            the list contains a single Path. If not, it contains all detected info files
            in the current directory.

    Raises:
        QQError: If the info file corresponding to the given job ID does not exist
            or is not reachable, or if no qq job info file is found in the current
            directory when no job ID is provided.
    """
    # no job ID: fall back to the info files in the current directory
    if not job_id:
        found = get_info_files(Path())
        if not found:
            raise QQError("No qq job info file found.")
        return found

    info_file = get_info_file_from_job_id(job_id)

    # the file must exist and be readable with our permissions
    try:
        reachable = info_file.is_file()
    except PermissionError:
        reachable = False

    if not reachable:
        raise QQError(
            f"Info file for job '{job_id}' does not exist or is not reachable."
        )

    return [info_file]
Retrieve qq job info files based on a job ID or from the current directory.
Arguments:
- job_id (str | None): The ID of the qq job to retrieve the info file for. If None, the function searches for qq job info files in the current directory.
Returns:
list[Path]: A list containing the qq job info file(s). If a job ID is provided, the list contains a single Path. If not, it contains all detected info files in the current directory.
Raises:
- QQError: If the info file corresponding to the given job ID does not exist or is not reachable, or if no qq job info file is found in the current directory when no job ID is provided.
def yes_or_no_prompt(prompt: str) -> bool:
    """
    Ask the user an interactive yes/no question and report the answer.

    A single key press is read. After the key is pressed, the chosen option is
    highlighted ('y' in green for yes, 'N' in red for no). Any key other than
    'y' (case-insensitive) counts as 'No'.

    Args:
        prompt (str): The text to display as the question.

    Returns:
        bool: True if the user selects 'yes' (presses 'y'), False otherwise.
    """
    question = f" {prompt} "
    plain = "bold default"

    initial = (
        Text("PROMPT", style="magenta")
        + Text(question, style="default")
        + Text("[y/N]", style=plain)
    )

    with Live(initial, refresh_per_second=1) as live:
        accepted = readchar.readkey().lower() == "y"

        # redraw the options with the selected one coloured
        if accepted:
            options = (
                Text("[", style=plain)
                + Text("y", style="bold green")
                + Text("/N]", style=plain)
            )
        else:
            options = (
                Text("[y/", style=plain)
                + Text("N", style="bold red")
                + Text("]", style=plain)
            )

        live.update(
            Text("PROMPT", style="magenta") + Text(question, style="default") + options
        )

    return accepted
Display an interactive yes/no prompt to the user and return the selection.
The prompt highlights the pressed key ('y' in green for yes, 'N' in red for no) and defaults to 'No' if the user presses any key other than 'y'.
Arguments:
- prompt (str): The text to display as the question.
Returns:
bool: True if the user selects 'yes' (presses 'y'), False otherwise.
def format_duration(td: timedelta) -> str:
    """
    Render a timedelta as a compact, space-separated list of non-zero units.

    The units used are weeks (w), days (d), hours (h), minutes (m) and
    seconds (s). Fractions of a second are dropped. A zero duration is
    rendered as '0s'.

    Args:
        td (timedelta): The duration to format.

    Returns:
        str: The formatted duration, e.g., '1d 2h 3m 4s'.
    """
    whole_seconds = int(td.total_seconds())
    if whole_seconds == 0:
        return "0s"

    # peel units off from the smallest to the largest
    minutes_total, secs = divmod(whole_seconds, 60)
    hours_total, mins = divmod(minutes_total, 60)
    days_total, hrs = divmod(hours_total, 24)
    wks, dys = divmod(days_total, 7)

    units = ((wks, "w"), (dys, "d"), (hrs, "h"), (mins, "m"), (secs, "s"))
    return " ".join(f"{amount}{label}" for amount, label in units if amount > 0)
Convert a timedelta into a human-readable string showing only relevant units.
The output string includes weeks, days, hours, minutes, and seconds, but omits units that are zero.
Arguments:
- td (timedelta): The duration to format.
Returns:
str: A formatted string representing the duration, e.g., '1d 2h 3m 4s'.
def format_duration_wdhhmmss(td: timedelta) -> str:
    """
    Render a timedelta as "Xw Yd HH:MM:SS".

    The week and day parts appear only when they are positive. The clock part
    is always present, with each field zero-padded to two digits. Fractions
    of a second are dropped.

    Examples:
        0:00:45          -> "00:00:45"
        1 day, 2:03:04   -> "1d 02:03:04"
        10 days, 5:06:07 -> "1w 3d 05:06:07"

    Args:
        td (timedelta): The duration to format.

    Returns:
        str: The duration in "Xw Yd HH:MM:SS" form.
    """
    # peel units off from the smallest to the largest
    minutes_total, secs = divmod(int(td.total_seconds()), 60)
    hours_total, mins = divmod(minutes_total, 60)
    days_total, hrs = divmod(hours_total, 24)
    wks, dys = divmod(days_total, 7)

    pieces = [f"{count}{unit}" for count, unit in ((wks, "w"), (dys, "d")) if count > 0]
    pieces.append(f"{hrs:02}:{mins:02}:{secs:02}")
    return " ".join(pieces)
Format a timedelta into a human-readable string: Xw Yd HH:MM:SS.
Weeks and days are included only if non-zero. Hours, minutes, and seconds are always displayed with zero-padding.
Examples:
0:00:45 -> "00:00:45" 1 day, 2:03:04 -> "1d 02:03:04" 10 days, 5:06:07 -> "1w 3d 05:06:07"
Arguments:
- td (timedelta): The duration to format.
Returns:
str: Formatted string in "Xw Yd HH:MM:SS" format.
def hhmmss_to_duration(timestr: str) -> timedelta:
    """
    Parse an (H)HH:MM:SS time string into a timedelta.

    The hour field may have any number of digits; the minute and second
    fields must be in the range 0-59. Surrounding whitespace is ignored.

    Examples:
        "0:00:00"   -> 0 seconds
        "1:23:45"   -> 1 hour, 23 minutes, 45 seconds
        "100:00:00" -> 100 hours

    Args:
        timestr (str): Input string in HH:MM:SS format.

    Returns:
        timedelta: The corresponding duration.

    Raises:
        QQError: If the input string does not match the HH:MM:SS format.
    """
    parsed = re.fullmatch(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$", timestr)
    if parsed is None:
        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")

    hrs, mins, secs = (int(field) for field in parsed.groups())
    return timedelta(hours=hrs, minutes=mins, seconds=secs)
Convert a time string in HH:MM:SS (or HHH:MM:SS) format to a timedelta object.
Examples:
"0:00:00" -> 0 seconds "1:23:45" -> 1 hour, 23 minutes, 45 seconds "100:00:00" -> 100 hours
Arguments:
- timestr (str): Input string in HH:MM:SS format.
Returns:
timedelta: The corresponding duration.
Raises:
- QQError: If the input string is not in a valid HH:MM:SS format or contains invalid numeric values.
def dhhmmss_to_duration(timestr: str) -> timedelta:
    """
    Parse a time string with an optional day prefix (D-HH:MM:SS) into a timedelta.

    Plain HH:MM:SS / HHH:MM:SS strings are accepted as well. The minute and
    second fields must be in the range 0-59. Surrounding whitespace is ignored.

    Examples:
        "0:00:00"    -> 0 seconds
        "1:23:45"    -> 1 hour, 23 minutes, 45 seconds
        "100:00:00"  -> 100 hours
        "2-12:34:56" -> 2 days, 12 hours, 34 minutes, 56 seconds

    Args:
        timestr (str): Input string in optional D-HH:MM:SS format.

    Returns:
        timedelta: The corresponding duration.

    Raises:
        QQError: If the input string does not match the expected format.
    """
    parsed = re.fullmatch(r"^\s*(?:(\d+)-)?(\d+):([0-5]?\d):([0-5]?\d)\s*$", timestr)
    if parsed is None:
        raise QQError(f"Invalid D-HH:MM:SS time string '{timestr}'.")

    day_field, *clock_fields = parsed.groups()
    hrs, mins, secs = (int(field) for field in clock_fields)

    # the day group is None when the "D-" prefix is absent
    return timedelta(
        days=int(day_field) if day_field else 0,
        hours=hrs,
        minutes=mins,
        seconds=secs,
    )
Convert a time string in optional D-HH:MM:SS (or HH:MM:SS / HHH:MM:SS) format to a timedelta object.
Examples:
"0:00:00" -> 0 seconds "1:23:45" -> 1 hour, 23 minutes, 45 seconds "100:00:00" -> 100 hours "2-12:34:56" -> 2 days, 12 hours, 34 minutes, 56 seconds
Arguments:
- timestr (str): Input string in optional D-HH:MM:SS format.
Returns:
timedelta: The corresponding duration.
Raises:
- QQError: If the input string is not in a valid format or contains invalid numeric values.
def normalize(s: str) -> str:
    """
    Produce a canonical form of a string for loose comparison.

    The result is the lowercased input with every hyphen and underscore removed.

    Args:
        s (str): The input string to normalize.

    Returns:
        str: The normalized string.
    """
    # a single translate pass drops both separator characters
    return s.lower().translate(str.maketrans("", "", "-_"))
Normalize a string for consistent comparison.
The string is converted to lowercase and all hyphens and underscores are removed.
Arguments:
- s (str): The input string to normalize.
Returns:
str: The normalized string.
def equals_normalized(a: str, b: str) -> bool:
    """
    Check whether two strings are equal once both have been normalized.

    Normalization (see `normalize`) ignores case, hyphens, and underscores.

    Args:
        a (str): First string to compare.
        b (str): Second string to compare.

    Returns:
        bool: True if the normalized strings are equal, False otherwise.
    """
    left, right = normalize(a), normalize(b)
    return left == right
Compare two strings for equality, ignoring case, hyphens, and underscores.
Arguments:
- a (str): First string to compare.
- b (str): Second string to compare.
Returns:
bool: True if the normalized strings are equal, False otherwise.
def convert_absolute_to_relative(files: list[Path], target: Path) -> list[Path]:
    """
    Express each path in `files` relative to the `target` directory.

    The conversion compares path components only and never touches the
    filesystem, so it also works for remote or non-existent paths.

    Args:
        files (list[Path]): A list of absolute file paths to convert.
        target (Path): The target directory against which paths are made relative.

    Returns:
        list[Path]: The paths relative to `target`, in the order of `files`.

    Raises:
        QQError: If any file in `files` is not located within `target`.
    """
    root = target.parts
    depth = len(root)
    converted = []

    for candidate in files:
        pieces = candidate.parts

        # the leading components of the file must be exactly the target's components
        if pieces[:depth] == root:
            converted.append(Path(*pieces[depth:]))
            continue

        raise QQError(f"Item '{candidate}' is not in target directory '{target}'.")

    logger.debug(f"Converted paths: {converted}.")
    return converted
Convert a list of absolute paths into paths relative to a target directory.
Each file in files must be located inside target or one of its
subdirectories. If any file is outside target, a QQError is raised.
This function works even for remote files or paths to non-existent files.
Arguments:
- files (list[Path]): A list of absolute file paths to convert.
- target (Path): The target directory against which paths are made relative.
Returns:
list[Path]: A list of paths relative to `target`.
Raises:
- QQError: If any file in `files` is not located within `target`.
def wdhms_to_hhmmss(timestr: str) -> str:
    """
    Convert a wdhms time specification into an (H)HH:MM:SS string.

    The input is a sequence of one or more "<integer><unit>" tokens, where
    unit is one of:
        w = weeks, d = days, h = hours, m = minutes, s = seconds

    Tokens may be written back to back ("1w2d3h") or separated by whitespace
    ("1w 2d 3h"); units are case-insensitive. Repeated units are summed.

    Examples:
        "1w2d3h4m5s" -> "219:04:05"
        "90m"        -> "1:30:00"
        ""           -> "0:00:00"

    Args:
        timestr (str): Input duration string in wdhms format.

    Returns:
        str: The duration as (H)HH:MM:SS. Hours are not zero-padded.

    Raises:
        QQError: If the string is non-empty and does not consist solely of
            valid tokens.
    """
    # an empty or whitespace-only specification means zero
    if not timestr.strip():
        return "0:00:00"

    if re.fullmatch(r"^\s*(?:\d+\s*[wdhms]\s*)+$", timestr, re.IGNORECASE) is None:
        raise QQError(f"Invalid time string '{timestr}'.")

    seconds_per_unit = {
        "w": 7 * 24 * 3600,
        "d": 24 * 3600,
        "h": 3600,
        "m": 60,
        "s": 1,
    }

    # a unit that lowercases to none of the five letters contributes nothing
    total = sum(
        int(amount) * seconds_per_unit.get(unit.lower(), 0)
        for amount, unit in re.findall(r"(\d+)\s*([wdhms])", timestr, re.IGNORECASE)
    )

    hrs, leftover = divmod(total, 3600)
    mins, secs = divmod(leftover, 60)
    return f"{hrs}:{mins:02}:{secs:02}"
Convert a time specification in the wdhms format into (H)HH:MM:SS.
The accepted format is a sequence of one or more integer + unit tokens, where unit is one of: w = weeks, d = days, h = hours, m = minutes, s = seconds
Tokens may be compact (e.g. "1w2d3h") or space-separated (e.g. "1w 2d 3h"). The function is case-insensitive.
Examples:
"1w2d3h4m5s" -> "219:04:05" "90m" -> "1:30:00" "" -> "0:00:00"
Arguments:
- timestr: Input duration string in wdhms format.
Returns:
Converted time as a string in (H)HH:MM:SS.
Raises:
- QQError: If the string contains invalid characters or does not conform to the token pattern (excluding empty/whitespace, which is treated as zero).
def hhmmss_to_wdhms(timestr: str) -> str:
    """
    Convert an (H)HH:MM:SS time string into the compact wdhms format.

    The result is a run of "<integer><unit>" tokens with no separators, using
        w = weeks, d = days, h = hours, m = minutes, s = seconds

    Units whose value is zero are left out; a zero duration yields "0s".

    Examples:
        "195:04:05" -> "1w1d3h4m5s"
        "1:30:00"   -> "1h30m"
        "0:00:00"   -> "0s"
        "49:00:00"  -> "2d1h"

    Args:
        timestr (str): Input time string in (H)HH:MM:SS format.

    Returns:
        str: The duration in compact wdhms format.

    Raises:
        QQError: If the input string does not match the (H)HH:MM:SS pattern.
    """
    parsed = re.fullmatch(r"^\s*(\d+):([0-5]?\d):([0-5]?\d)\s*$", timestr)
    if parsed is None:
        raise QQError(f"Invalid HH:MM:SS time string '{timestr}'.")

    hrs, mins, secs = (int(field) for field in parsed.groups())
    remaining = hrs * 3600 + mins * 60 + secs
    if not remaining:
        return "0s"

    # consume the total from the largest unit down to seconds
    tokens = []
    for unit, size in (
        ("w", 7 * 24 * 3600),
        ("d", 24 * 3600),
        ("h", 3600),
        ("m", 60),
        ("s", 1),
    ):
        count, remaining = divmod(remaining, size)
        if count:
            tokens.append(f"{count}{unit}")

    return "".join(tokens)
Convert a time specification in (H)HH:MM:SS format into the compact wdhms format.
The output format expresses the duration as a sequence of one or more integer + unit tokens: w = weeks, d = days, h = hours, m = minutes, s = seconds
Units that are zero are omitted, except that "0s" is returned if the total duration is zero.
Examples:
"195:04:05" -> "1w1d3h4m5s" "1:30:00" -> "1h30m" "0:00:00" -> "0s" "49:00:00" -> "2d1h"
Arguments:
- timestr (str): Input time string in (H)HH:MM:SS format.
Returns:
str: Time duration converted into the compact wdhms format.
Raises:
- QQError: If the input string is malformed or does not conform to the expected (H)HH:MM:SS pattern.
def printf_to_regex(pattern: str) -> str:
    """
    Translate a simple printf-style pattern into a regular expression string.

    The pattern is regex-escaped first. Each zero-padded placeholder `%0Nd`
    then becomes `\\d{N}` (exactly N digits) and each plain `%d` becomes
    `\\d+`. Other placeholders are left as they are.

    Args:
        pattern (str): A printf-style pattern (e.g., "md%04d", "file%03d_part%02d").

    Returns:
        str: The equivalent regex pattern as a string.
    """
    escaped = re.escape(pattern)

    # fixed-width placeholders first, then the unbounded ones
    fixed_width_done = re.sub(r"%0(\d+)d", r"\\d{\1}", escaped)
    return re.sub(r"%d", r"\\d+", fixed_width_done)
Convert a simple printf-style pattern to an equivalent regular expression pattern.
Arguments:
- pattern (str): A printf-style pattern (e.g., "md%04d", "file%03d_part%02d").
Returns:
str: A string representing the equivalent regex pattern.
def is_printf_pattern(pattern: str) -> bool:
    """
    Tell whether a pattern contains a printf-style integer placeholder.

    A placeholder is `%d` with an optional width between the two characters
    (e.g., `%d`, `%4d`, `%04d`).

    Args:
        pattern (str): The pattern string to check.

    Returns:
        bool: True if at least one such placeholder is present, False otherwise.
    """
    placeholder = re.search(r"%0?\d*d", pattern)
    return placeholder is not None
Detect whether a string pattern uses printf-style numeric placeholders.
Arguments:
- pattern (str): The pattern string to check.
Returns:
bool: True if the pattern contains printf-style placeholders, False otherwise.
609def split_files_list(string: str | None) -> list[Path]: 610 """ 611 Split a string containing multiple file paths into a list of relative Path objects. 612 613 The string can contain file paths separated by colons (:), commas (,), or 614 any whitespace characters (space, tab, newline). 615 616 Args: 617 string (str | None): The string containing file paths. If None or empty, 618 an empty list is returned. 619 620 Returns: 621 list[Path]: A list of Path objects corresponding to the individual 622 relative file paths in the input string. 623 """ 624 if not string: 625 return [] 626 627 return [Path(f) for f in re.split(r"[:,\s]+", string)]
Split a string containing multiple file paths into a list of relative Path objects.
The string can contain file paths separated by colons (:), commas (,), or any whitespace characters (space, tab, newline).
Arguments:
- string (str | None): The string containing file paths. If None or empty, an empty list is returned.
Returns:
list[Path]: A list of Path objects corresponding to the individual relative file paths in the input string.
def to_snake_case(s: str) -> str:
    """
    Turn a PascalCase or kebab-case string into snake_case.

    Hyphens become underscores, an underscore is inserted before every
    uppercase ASCII letter that is not the first character, and the result
    is lowercased.

    Args:
        s (str): Input string in PascalCase or kebab-case.

    Returns:
        str: The string converted to snake_case.
    """
    underscored = s.replace("-", "_")

    # zero-width match: the position before each capital, except at the start
    with_word_breaks = re.sub(r"(?<!^)(?=[A-Z])", "_", underscored)
    return with_word_breaks.lower()
Convert a string from PascalCase or kebab-case to snake_case.
Arguments:
- s (str): Input string in PascalCase or kebab-case.
Returns:
str: Converted string in snake_case.
def get_panel_width(
    console: Console, factor: int, min_width: int | None, max_width: int | None
):
    """
    Compute a panel width as a fraction of the console width, clamped to
    optional lower and upper bounds.

    The lower bound is applied before the upper bound, so the upper bound
    wins if the two conflict.

    Args:
        console (Console): A rich Console-like object that provides terminal size.
        factor (int): Divisor applied to the terminal width (integer division).
        min_width (int | None): The minimum allowable panel width. If None, no lower bound is applied.
        max_width (int | None): The maximum allowable panel width. If None, no upper bound is applied.

    Returns:
        int: The computed panel width after applying scaling and bounds.
    """
    width = console.size.width // factor

    if min_width is not None and width < min_width:
        width = min_width
    if max_width is not None and width > max_width:
        width = max_width

    return width
Calculate the width of a panel relative to the console width, constrained by optional minimum and maximum width values.
Arguments:
- console (Console): A rich Console-like object that provides terminal size.
- factor (int): A divisor used to scale down the terminal width.
- min_width (int): The minimum allowable panel width. If None, no lower bound is applied.
- max_width (int): The maximum allowable panel width. If None, no upper bound is applied.
Returns:
int: The computed panel width after applying scaling and bounds.
def construct_loop_job_name(script_name: str, cycle: int) -> str:
    """
    Build the name of a loop job for the given cycle.

    The cycle number is formatted with the configured loop-job pattern
    (`CFG.loop_jobs.pattern`). It is inserted before the first dot of the
    script name, or appended at the end if the name contains no dot.

    Args:
        script_name (str): Filename of the submitted script.
        cycle (int): The current cycle of the loop job.

    Returns:
        str: The name of the loop job in the current cycle.
    """
    cycle_tag = CFG.loop_jobs.pattern % cycle

    # split at the FIRST dot so that everything after it is kept as the extension
    stem, dot, suffix = script_name.partition(".")
    if not dot:
        # no extension: the cycle tag goes after the full name
        return f"{script_name}{cycle_tag}"

    return f"{stem}{cycle_tag}.{suffix}"
Construct a job name for a loop job.
Arguments:
- script_name (str): Filename of the submitted script.
- cycle (int): The current cycle of the loop job.
Returns:
str: The name of the loop job in the current cycle.
def construct_info_file_path(input_dir: Path, job_name: str) -> Path:
    """
    Build the path to the qq info file of a job.

    The file is placed in `input_dir`, named after the job with its suffix
    set to the configured qq info suffix (`CFG.suffixes.qq_info`), and then
    passed through `logical_resolve`.

    Args:
        input_dir (Path): The directory containing the job script.
        job_name (str): The name of the job.

    Returns:
        Path: The resolved path to the job's qq info file.
    """
    info_file = (input_dir / job_name).with_suffix(CFG.suffixes.qq_info)
    return logical_resolve(info_file)
Construct the absolute path to a job's qq info file.
Arguments:
- input_dir (Path): The directory containing the job script.
- job_name (str): The name of the job.
Returns:
Path: The absolute path to the job's qq info file.
def available_work_dirs() -> str:
    """
    List the work-directory types supported by the detected batch system.

    The batch system is obtained from `BatchInterface.from_env_var_or_guess()`.
    The result is intended for display (e.g., in help text).

    Returns:
        str: A comma-separated list of supported work directory types, each
            wrapped in single quotes, or a placeholder message if a QQError
            is raised while detecting the batch system.
    """
    # imported at call time rather than at module level
    from qq_lib.batch.interface import BatchInterface

    try:
        batch_system = BatchInterface.from_env_var_or_guess()
        supported = batch_system.get_supported_work_dir_types()
        return ", ".join(f"'{kind}'" for kind in supported)
    except QQError:
        return "??? (no batch system detected)"
Return the supported work-directory types for the detected batch system.
The batch system is determined using the QQ_BATCH_SYSTEM environment
variable or by automatic detection. The supported work-directory types are
returned as a comma-separated string formatted for display in help text.
Returns:
str: A comma-separated list of supported work directory types, each wrapped in quotes.
def available_job_types() -> str:
    """
    List the supported job types.

    Returns:
        str: A comma-separated list of all `JobType` members, each converted
            to a string and wrapped in single quotes.
    """
    # imported at call time rather than at module level
    from qq_lib.properties.job_type import JobType

    quoted = (f"'{job_type!s}'" for job_type in JobType)
    return ", ".join(quoted)
Return the supported job types.
Returns:
str: A comma-separated list of supported job types, each wrapped in quotes.
def translate_server(raw: str) -> str:
    """
    Translate a batch server shortcut to its full name.

    The shortcut is looked up in the configured table of known servers
    (`CFG.batch_servers_options.known_servers`). An unrecognized value is
    returned unchanged.

    Args:
        raw (str): A batch server shortcut or full name.

    Returns:
        str: Full name of the batch server.
    """
    known_servers = CFG.batch_servers_options.known_servers
    return known_servers.get(raw, raw)
Translate a batch server shortcut to its full name. If the shortcut is not recognized, the original value is returned unchanged.
Returns:
str: Full name of the batch server.
def default_resubmit_from_hosts() -> str:
    """
    Return the default resubmission hosts as a string.

    A non-empty configured value (`CFG.resubmitter.default_resubmit_hosts`)
    takes precedence. Otherwise the defaults of the detected batch system
    are joined with commas.

    Returns:
        str: The default resubmission hosts, or a placeholder message if a
            QQError is raised (e.g., no batch system is available).
    """
    # imported at call time rather than at module level
    from qq_lib.batch.interface import BatchInterface

    try:
        configured = CFG.resubmitter.default_resubmit_hosts
        if configured:
            return configured

        batch_system = BatchInterface.from_env_var_or_guess()
        return ",".join(
            host.to_str() for host in batch_system.get_default_resubmit_hosts()
        )
    # if no batch system is available
    except QQError:
        return "??? (no batch system detected)"
Returns the default resubmission hosts as a string.
Returns:
str: A comma-separated list of the default resubmission hosts.