qq_lib.sync
File-synchronization utilities for retrieving data from a running or failed qq job.
This module defines the Syncer class, an extension of Navigator that handles
copying files from a job's remote working directory back to the job's input
directory. It performs safety checks based on the job's real state, ensuring
that synchronization is attempted only when a working directory actually exists.
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4""" 5File-synchronization utilities for retrieving data from a running or failed qq job. 6 7This module defines the `Syncer` class, an extension of `Navigator` that handles 8copying files from a job's remote working directory back to the job's input 9directory. It performs safety checks based on the job's real state, ensuring 10that synchronization is attempted only when a working directory actually exists. 11""" 12 13from .syncer import Syncer 14 15__all__ = [ 16 "Syncer", 17]
12class Syncer(Navigator): 13 """ 14 Handle synchronization of job files between a remote working directory 15 (on a compute node) and the local input directory. 16 """ 17 18 def ensure_suitable(self): 19 """ 20 Verify that the job is in a state where files can be fetched from its working directory. 21 22 Raises: 23 QQNotSuitableError: If the working directory is not expected to exist 24 or if the working directory is the input directory. 25 """ 26 if self._work_dir_is_input_dir(): 27 raise QQNotSuitableError( 28 "Working directory of the job is the input directory of the job: implicitly synchronized." 29 ) 30 31 if self._is_synchronized(): 32 raise QQNotSuitableError( 33 "Job has been completed and was synchronized: working directory no longer exists." 34 ) 35 36 # killed jobs may not have working directory 37 if self._is_killed() and not self.has_destination(): 38 raise QQNotSuitableError( 39 "Job has been killed and no working directory is available." 40 ) 41 42 # queued jobs do not have working directory 43 if self._is_queued(): 44 raise QQNotSuitableError("Job is queued or booting: nothing to sync.") 45 46 def sync(self, files: list[str] | None = None) -> None: 47 """ 48 Synchronize files from the remote working directory to the local input directory. 49 50 Args: 51 files (list[str] | None): Optional list of specific filenames to fetch. 52 If omitted, all files are synchronized except those excluded by the batch system. 53 54 Behavior: 55 - If `files` is provided, only those specific files are copied. 56 - If omitted, the entire working directory is synchronized. 57 58 Raises: 59 QQError: If the job's destination (host or working directory) cannot be determined. 60 """ 61 if not self.has_destination(): 62 raise QQError( 63 "Host ('main_node') or working directory ('work_dir') are not defined." 64 ) 65 66 # hint for type checker 67 # work_dir and main_node must be set - we check that in self.hasDestination 68 assert self._work_dir and self._main_node 69 70 if files: 71 logger.info( 72 f"Fetching file{'s' if len(files) > 1 else ''} '{' '.join(files)}' from job's working directory to input directory." 73 ) 74 self._batch_system.sync_selected( 75 self._work_dir, 76 self._informer.info.input_dir, 77 self._main_node, 78 None, 79 [self._work_dir / x for x in files], 80 ) 81 else: 82 logger.info( 83 "Fetching all files from job's working directory to input directory." 84 ) 85 self._batch_system.sync_with_exclusions( 86 self._work_dir, self._informer.info.input_dir, self._main_node, None 87 )
Handle synchronization of job files between a remote working directory (on a compute node) and the local input directory.
def
ensure_suitable(self):
18 def ensure_suitable(self): 19 """ 20 Verify that the job is in a state where files can be fetched from its working directory. 21 22 Raises: 23 QQNotSuitableError: If the working directory is not expected to exist 24 or if the working directory is the input directory. 25 """ 26 if self._work_dir_is_input_dir(): 27 raise QQNotSuitableError( 28 "Working directory of the job is the input directory of the job: implicitly synchronized." 29 ) 30 31 if self._is_synchronized(): 32 raise QQNotSuitableError( 33 "Job has been completed and was synchronized: working directory no longer exists." 34 ) 35 36 # killed jobs may not have working directory 37 if self._is_killed() and not self.has_destination(): 38 raise QQNotSuitableError( 39 "Job has been killed and no working directory is available." 40 ) 41 42 # queued jobs do not have working directory 43 if self._is_queued(): 44 raise QQNotSuitableError("Job is queued or booting: nothing to sync.")
Verify that the job is in a state where files can be fetched from its working directory.
Raises:
- QQNotSuitableError: If the working directory is not expected to exist or if the working directory is the input directory.
def
sync(self, files: list[str] | None = None) -> None:
46 def sync(self, files: list[str] | None = None) -> None: 47 """ 48 Synchronize files from the remote working directory to the local input directory. 49 50 Args: 51 files (list[str] | None): Optional list of specific filenames to fetch. 52 If omitted, all files are synchronized except those excluded by the batch system. 53 54 Behavior: 55 - If `files` is provided, only those specific files are copied. 56 - If omitted, the entire working directory is synchronized. 57 58 Raises: 59 QQError: If the job's destination (host or working directory) cannot be determined. 60 """ 61 if not self.has_destination(): 62 raise QQError( 63 "Host ('main_node') or working directory ('work_dir') are not defined." 64 ) 65 66 # hint for type checker 67 # work_dir and main_node must be set - we check that in self.hasDestination 68 assert self._work_dir and self._main_node 69 70 if files: 71 logger.info( 72 f"Fetching file{'s' if len(files) > 1 else ''} '{' '.join(files)}' from job's working directory to input directory." 73 ) 74 self._batch_system.sync_selected( 75 self._work_dir, 76 self._informer.info.input_dir, 77 self._main_node, 78 None, 79 [self._work_dir / x for x in files], 80 ) 81 else: 82 logger.info( 83 "Fetching all files from job's working directory to input directory." 84 ) 85 self._batch_system.sync_with_exclusions( 86 self._work_dir, self._informer.info.input_dir, self._main_node, None 87 )
Synchronize files from the remote working directory to the local input directory.
Arguments:
- files (list[str] | None): Optional list of specific filenames to fetch. If omitted, all files are synchronized except those excluded by the batch system.
Behavior:
- If
filesis provided, only those specific files are copied.- If omitted, the entire working directory is synchronized.
Raises:
- QQError: If the job's destination (host or working directory) cannot be determined.