qq_lib.core.command_runner
1# Released under MIT License. 2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab 3 4 5import getpass 6import logging 7import sys 8import threading 9from collections.abc import Callable 10from concurrent.futures import ThreadPoolExecutor 11from pathlib import Path 12from typing import Any, NoReturn, Self 13 14from qq_lib.batch.interface import BatchInterface, BatchJobInterface 15from qq_lib.core.common import get_info_files, translate_server 16from qq_lib.core.config import CFG 17from qq_lib.core.error import QQError 18from qq_lib.info import Informer 19 20 21class CommandRunner: 22 """ 23 Runs a job operation against one or more qq jobs. 24 25 Resolves and prepares informers in parallel using a thread pool, then 26 executes the callback serially on the main thread in the original order, 27 starting as soon as the next-in-order preparation completes. 28 29 All exceptions are caught internally and converted to `sys.exit` calls. 30 Specific exception types can be handled gracefully via `on_exception`. 31 32 Attributes: 33 n_jobs (int): The total number of jobs to run. 34 encountered_errors (dict[int, Exception]): A dictionary mapping 35 job indices to exceptions encountered during preparation or execution. 36 """ 37 38 def __init__( 39 self, 40 job_ids: tuple[str, ...], 41 directories: tuple[Path, ...], 42 all: bool, 43 server: str | None, 44 callback: Callable, 45 logger: logging.Logger, 46 *args: Any, 47 n_threads: int = 1, 48 **kwargs: Any, 49 ): 50 """ 51 Initialize a CommandRunner. 52 53 Args: 54 job_ids (tuple[str, ...]): List of batch job IDs to operate on. 55 directories (tuple[Path, ...]): List of directories to search for qq jobs in. 56 If `job_ids` and `directories` are both empty and `all` is `False`, the current directory is searched for jobs. 57 all (bool): Collect all qq jobs for the given user on the given server. 58 server (str | None): The batch server to collect qq jobs from. If `None`, the local batch server is used. 59 The server can be specified as a shortcut. 60 callback (Callable): The operation to perform on each resolved Informer. 61 The informer is passed as the first argument, followed by `*args` and `**kwargs`. 62 logger (logging.Logger): Logger instance used for error and critical messages. 63 Should be the module-level logger of the calling CLI module. 64 *args (Any): Additional positional arguments forwarded to `callback`. 65 n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial). 66 **kwargs (Any): Additional keyword arguments forwarded to `callback`. 67 """ 68 self._job_ids = job_ids 69 self._directories = directories 70 self._all = all 71 self._user = getpass.getuser() 72 self._server = translate_server(server) if server else None 73 if not self._job_ids and not self._directories and not all: 74 self._directories = [Path.cwd()] 75 76 self._callback = callback 77 self._args = args 78 self._kwargs = kwargs 79 80 self._batch_system = BatchInterface.from_env_var_or_guess() 81 self._logger = logger 82 self._exception_handlers: dict[type[Exception], Callable] = {} 83 self._n_threads = n_threads 84 85 self.n_jobs = 0 86 self.current_iteration = 0 87 self.encountered_errors: dict[int, Exception] = {} 88 89 def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self: 90 """ 91 Register an exception handler for a specific exception type. 92 93 Registered handlers are invoked when the callback or the preparation 94 step raises the given exception type. Unregistered exception types 95 propagate up and cause the process to exit. 96 97 Args: 98 exc_type (type[Exception]): The exception type to handle. 99 handler (Callable): Function to call when `exc_type` is raised. 100 Must accept two arguments: the exception instance and 101 a reference to this `CommandRunner`. 102 103 Returns: 104 Self for chaining. 105 """ 106 self._exception_handlers[exc_type] = handler 107 return self 108 109 def run(self) -> NoReturn: 110 """ 111 Resolve all jobs, execute the callback for each, and exit the process. 112 113 Resolves informers from job IDs or info files in the target directory, 114 prepares them in parallel, and executes the registered callback for each 115 job in the original order. Registered exception handlers are invoked for 116 known error types; all other exceptions cause the process to exit. 117 118 This method never returns. It always terminates with `sys.exit`: 119 - Exit code 0 on success. 120 - Exit code `CFG.exit_codes.default` on `QQError`. 121 - Exit code `CFG.exit_codes.unexpected_error` on any other exception. 122 """ 123 try: 124 targets = self._build_targets() 125 self._run_pipeline(targets) 126 sys.exit(0) 127 except QQError as e: 128 self._logger.error(e) 129 sys.exit(CFG.exit_codes.default) 130 except Exception as e: 131 self._logger.critical(e, exc_info=True, stack_info=True) 132 sys.exit(CFG.exit_codes.unexpected_error) 133 134 def _build_targets(self) -> list[Callable[[], Informer]]: 135 """ 136 Build a list of callables that each resolve and prepare one Informer. 137 138 If job IDs were provided, each target resolves via `Informer.from_job_id`. 139 Otherwise, the specified (or current) directory is searched for qq info files 140 and each target resolves via `Informer.from_file`. 141 142 Each target then also reloads the informer's batch info. 143 144 Returns: 145 list[Callable[[], Informer]]: One callable per job. 146 147 Raises: 148 QQError: If no job IDs were given and no info files were found in the current directory. 149 """ 150 if self._server and not self._all: 151 self._logger.warning( 152 "Server is only used with --all. Ignoring server option." 153 ) 154 self._server = None 155 156 targets: list[Callable[[], Informer]] = [] 157 158 batch_jobs = [] 159 if self._job_ids: 160 batch_jobs.extend( 161 self._batch_system.get_batch_jobs_from_ids(list(self._job_ids)) 162 ) 163 if self._all: 164 batch_jobs.extend( 165 self._batch_system.get_unfinished_batch_jobs(self._user, self._server) 166 ) 167 168 if batch_jobs: 169 targets.extend(self._build_targets_from_batch_jobs(batch_jobs)) 170 171 if self._directories: 172 targets.extend(self._build_targets_from_files(list(self._directories))) 173 174 if not targets: 175 if not self._job_ids and not self._all: 176 raise QQError("No qq job info file found.") 177 raise QQError("No jobs found.") 178 179 return targets 180 181 def _build_targets_from_batch_jobs( 182 self, 183 jobs: list[BatchJobInterface], 184 ) -> list[Callable[[], Informer]]: 185 """ 186 Build preparation targets from a list of batch jobs. 187 188 Args: 189 jobs (list[BatchJobInterface]): The batch jobs to resolve. 190 191 Returns: 192 list[Callable[[], Informer]]: Per-job preparation callables. 193 """ 194 return [lambda bj=batch_job: Informer.from_batch_job(bj) for batch_job in jobs] 195 196 def _build_targets_from_files( 197 self, 198 directories: list[Path], 199 ) -> list[Callable[[], Informer]]: 200 """ 201 Build preparation targets from qq info files across multiple directories. 202 203 Directories are searched in order. Each target loads an `Informer` 204 from a file and queries the batch system for its info individually. 205 206 Args: 207 directories (list[Path]): Directories to search for info files. 208 209 Returns: 210 list[Callable[[], Informer]]: Per-job preparation callables. 211 """ 212 213 def _resolve_and_prepare(path: Path) -> Informer: 214 informer = Informer.from_file(path) 215 informer.load_batch_info() 216 return informer 217 218 info_files: list[Path] = [] 219 for directory in directories: 220 info_files.extend(get_info_files(directory)) 221 222 return [lambda f=info_file: _resolve_and_prepare(f) for info_file in info_files] 223 224 def _run_pipeline(self, targets: list[Callable[[], Informer]]) -> None: 225 """ 226 Run the prepare-then-execute pipeline. 227 228 Submits all targets to a thread pool for parallel preparation. 229 The main thread waits on a condition variable and executes each 230 job's callback as soon as it is the next in order, ensuring output 231 and side effects follow the original job order. 232 233 Failed preparations and executions are passed to `_handle_error`. 234 235 Args: 236 targets: List of callables that each resolve and prepare one Informer. 237 """ 238 self.n_jobs = len(targets) 239 results: list[Informer | Exception | None] = [None] * self.n_jobs 240 lock = threading.Lock() 241 ready = threading.Condition(lock) 242 243 def prepare(index: int, target: Callable[[], Informer]) -> None: 244 """ 245 Execute a single target and store its result or exception. 246 247 Notifies the main thread upon completion so it can check whether 248 the next-in-order result is available. 249 250 Args: 251 index (int): The position of this target in the original order. 252 target (Callable[[], Informer]): The callable to execute. 253 """ 254 try: 255 result: Informer | Exception = target() 256 except Exception as e: 257 result = e 258 259 with ready: 260 results[index] = result 261 ready.notify() 262 263 with ThreadPoolExecutor(max_workers=self._n_threads) as executor: 264 # submit all preparation tasks to the thread pool 265 for i, target in enumerate(targets): 266 executor.submit(prepare, i, target) 267 268 # process results in order on the main thread 269 next_index = 0 270 with ready: 271 while next_index < self.n_jobs: 272 # block the main thread until the next result is available 273 # later jobs may finish first, but we wait for the one we need 274 while results[next_index] is None: 275 ready.wait() 276 277 self.current_iteration += 1 278 result = results[next_index] 279 next_index += 1 280 281 # handle error or execute the callback 282 if isinstance(result, Exception): 283 self._handle_error(result) 284 elif isinstance(result, Informer): 285 self._execute(result) 286 else: 287 raise ValueError( 288 f"Unexpected result type: {type(result)}. This is a bug, please report it." 289 ) 290 291 def _execute(self, informer: Informer) -> None: 292 """ 293 Run the callback on a prepared informer. 294 295 Args: 296 informer (Informer): A resolved and prepared Informer. 297 """ 298 try: 299 self._callback(informer, *self._args, **self._kwargs) 300 except tuple(self._exception_handlers.keys()) as e: 301 self._handle_error(e) 302 303 def _handle_error(self, error: Exception) -> None: 304 """ 305 Handle an exception using registered handlers, or re-raise. 306 307 Args: 308 error (Exception): The exception to handle. 309 310 Raises: 311 Exception: If no handler is registered for the exception type. 312 """ 313 self.encountered_errors[self.current_iteration] = error 314 handler = self._exception_handlers.get(type(error)) 315 if handler: 316 handler(error, self) 317 else: 318 raise error
22class CommandRunner: 23 """ 24 Runs a job operation against one or more qq jobs. 25 26 Resolves and prepares informers in parallel using a thread pool, then 27 executes the callback serially on the main thread in the original order, 28 starting as soon as the next-in-order preparation completes. 29 30 All exceptions are caught internally and converted to `sys.exit` calls. 31 Specific exception types can be handled gracefully via `on_exception`. 32 33 Attributes: 34 n_jobs (int): The total number of jobs to run. 35 encountered_errors (dict[int, Exception]): A dictionary mapping 36 job indices to exceptions encountered during preparation or execution. 37 """ 38 39 def __init__( 40 self, 41 job_ids: tuple[str, ...], 42 directories: tuple[Path, ...], 43 all: bool, 44 server: str | None, 45 callback: Callable, 46 logger: logging.Logger, 47 *args: Any, 48 n_threads: int = 1, 49 **kwargs: Any, 50 ): 51 """ 52 Initialize a CommandRunner. 53 54 Args: 55 job_ids (tuple[str, ...]): List of batch job IDs to operate on. 56 directories (tuple[Path, ...]): List of directories to search for qq jobs in. 57 If `job_ids` and `directories` are both empty and `all` is `False`, the current directory is searched for jobs. 58 all (bool): Collect all qq jobs for the given user on the given server. 59 server (str | None): The batch server to collect qq jobs from. If `None`, the local batch server is used. 60 The server can be specified as a shortcut. 61 callback (Callable): The operation to perform on each resolved Informer. 62 The informer is passed as the first argument, followed by `*args` and `**kwargs`. 63 logger (logging.Logger): Logger instance used for error and critical messages. 64 Should be the module-level logger of the calling CLI module. 65 *args (Any): Additional positional arguments forwarded to `callback`. 66 n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial). 67 **kwargs (Any): Additional keyword arguments forwarded to `callback`. 68 """ 69 self._job_ids = job_ids 70 self._directories = directories 71 self._all = all 72 self._user = getpass.getuser() 73 self._server = translate_server(server) if server else None 74 if not self._job_ids and not self._directories and not all: 75 self._directories = [Path.cwd()] 76 77 self._callback = callback 78 self._args = args 79 self._kwargs = kwargs 80 81 self._batch_system = BatchInterface.from_env_var_or_guess() 82 self._logger = logger 83 self._exception_handlers: dict[type[Exception], Callable] = {} 84 self._n_threads = n_threads 85 86 self.n_jobs = 0 87 self.current_iteration = 0 88 self.encountered_errors: dict[int, Exception] = {} 89 90 def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self: 91 """ 92 Register an exception handler for a specific exception type. 93 94 Registered handlers are invoked when the callback or the preparation 95 step raises the given exception type. Unregistered exception types 96 propagate up and cause the process to exit. 97 98 Args: 99 exc_type (type[Exception]): The exception type to handle. 100 handler (Callable): Function to call when `exc_type` is raised. 101 Must accept two arguments: the exception instance and 102 a reference to this `CommandRunner`. 103 104 Returns: 105 Self for chaining. 106 """ 107 self._exception_handlers[exc_type] = handler 108 return self 109 110 def run(self) -> NoReturn: 111 """ 112 Resolve all jobs, execute the callback for each, and exit the process. 113 114 Resolves informers from job IDs or info files in the target directory, 115 prepares them in parallel, and executes the registered callback for each 116 job in the original order. Registered exception handlers are invoked for 117 known error types; all other exceptions cause the process to exit. 118 119 This method never returns. It always terminates with `sys.exit`: 120 - Exit code 0 on success. 121 - Exit code `CFG.exit_codes.default` on `QQError`. 122 - Exit code `CFG.exit_codes.unexpected_error` on any other exception. 123 """ 124 try: 125 targets = self._build_targets() 126 self._run_pipeline(targets) 127 sys.exit(0) 128 except QQError as e: 129 self._logger.error(e) 130 sys.exit(CFG.exit_codes.default) 131 except Exception as e: 132 self._logger.critical(e, exc_info=True, stack_info=True) 133 sys.exit(CFG.exit_codes.unexpected_error) 134 135 def _build_targets(self) -> list[Callable[[], Informer]]: 136 """ 137 Build a list of callables that each resolve and prepare one Informer. 138 139 If job IDs were provided, each target resolves via `Informer.from_job_id`. 140 Otherwise, the specified (or current) directory is searched for qq info files 141 and each target resolves via `Informer.from_file`. 142 143 Each target then also reloads the informer's batch info. 144 145 Returns: 146 list[Callable[[], Informer]]: One callable per job. 147 148 Raises: 149 QQError: If no job IDs were given and no info files were found in the current directory. 150 """ 151 if self._server and not self._all: 152 self._logger.warning( 153 "Server is only used with --all. Ignoring server option." 154 ) 155 self._server = None 156 157 targets: list[Callable[[], Informer]] = [] 158 159 batch_jobs = [] 160 if self._job_ids: 161 batch_jobs.extend( 162 self._batch_system.get_batch_jobs_from_ids(list(self._job_ids)) 163 ) 164 if self._all: 165 batch_jobs.extend( 166 self._batch_system.get_unfinished_batch_jobs(self._user, self._server) 167 ) 168 169 if batch_jobs: 170 targets.extend(self._build_targets_from_batch_jobs(batch_jobs)) 171 172 if self._directories: 173 targets.extend(self._build_targets_from_files(list(self._directories))) 174 175 if not targets: 176 if not self._job_ids and not self._all: 177 raise QQError("No qq job info file found.") 178 raise QQError("No jobs found.") 179 180 return targets 181 182 def _build_targets_from_batch_jobs( 183 self, 184 jobs: list[BatchJobInterface], 185 ) -> list[Callable[[], Informer]]: 186 """ 187 Build preparation targets from a list of batch jobs. 188 189 Args: 190 jobs (list[BatchJobInterface]): The batch jobs to resolve. 191 192 Returns: 193 list[Callable[[], Informer]]: Per-job preparation callables. 194 """ 195 return [lambda bj=batch_job: Informer.from_batch_job(bj) for batch_job in jobs] 196 197 def _build_targets_from_files( 198 self, 199 directories: list[Path], 200 ) -> list[Callable[[], Informer]]: 201 """ 202 Build preparation targets from qq info files across multiple directories. 203 204 Directories are searched in order. Each target loads an `Informer` 205 from a file and queries the batch system for its info individually. 206 207 Args: 208 directories (list[Path]): Directories to search for info files. 209 210 Returns: 211 list[Callable[[], Informer]]: Per-job preparation callables. 212 """ 213 214 def _resolve_and_prepare(path: Path) -> Informer: 215 informer = Informer.from_file(path) 216 informer.load_batch_info() 217 return informer 218 219 info_files: list[Path] = [] 220 for directory in directories: 221 info_files.extend(get_info_files(directory)) 222 223 return [lambda f=info_file: _resolve_and_prepare(f) for info_file in info_files] 224 225 def _run_pipeline(self, targets: list[Callable[[], Informer]]) -> None: 226 """ 227 Run the prepare-then-execute pipeline. 228 229 Submits all targets to a thread pool for parallel preparation. 230 The main thread waits on a condition variable and executes each 231 job's callback as soon as it is the next in order, ensuring output 232 and side effects follow the original job order. 233 234 Failed preparations and executions are passed to `_handle_error`. 235 236 Args: 237 targets: List of callables that each resolve and prepare one Informer. 238 """ 239 self.n_jobs = len(targets) 240 results: list[Informer | Exception | None] = [None] * self.n_jobs 241 lock = threading.Lock() 242 ready = threading.Condition(lock) 243 244 def prepare(index: int, target: Callable[[], Informer]) -> None: 245 """ 246 Execute a single target and store its result or exception. 247 248 Notifies the main thread upon completion so it can check whether 249 the next-in-order result is available. 250 251 Args: 252 index (int): The position of this target in the original order. 253 target (Callable[[], Informer]): The callable to execute. 254 """ 255 try: 256 result: Informer | Exception = target() 257 except Exception as e: 258 result = e 259 260 with ready: 261 results[index] = result 262 ready.notify() 263 264 with ThreadPoolExecutor(max_workers=self._n_threads) as executor: 265 # submit all preparation tasks to the thread pool 266 for i, target in enumerate(targets): 267 executor.submit(prepare, i, target) 268 269 # process results in order on the main thread 270 next_index = 0 271 with ready: 272 while next_index < self.n_jobs: 273 # block the main thread until the next result is available 274 # later jobs may finish first, but we wait for the one we need 275 while results[next_index] is None: 276 ready.wait() 277 278 self.current_iteration += 1 279 result = results[next_index] 280 next_index += 1 281 282 # handle error or execute the callback 283 if isinstance(result, Exception): 284 self._handle_error(result) 285 elif isinstance(result, Informer): 286 self._execute(result) 287 else: 288 raise ValueError( 289 f"Unexpected result type: {type(result)}. This is a bug, please report it." 290 ) 291 292 def _execute(self, informer: Informer) -> None: 293 """ 294 Run the callback on a prepared informer. 295 296 Args: 297 informer (Informer): A resolved and prepared Informer. 298 """ 299 try: 300 self._callback(informer, *self._args, **self._kwargs) 301 except tuple(self._exception_handlers.keys()) as e: 302 self._handle_error(e) 303 304 def _handle_error(self, error: Exception) -> None: 305 """ 306 Handle an exception using registered handlers, or re-raise. 307 308 Args: 309 error (Exception): The exception to handle. 310 311 Raises: 312 Exception: If no handler is registered for the exception type. 313 """ 314 self.encountered_errors[self.current_iteration] = error 315 handler = self._exception_handlers.get(type(error)) 316 if handler: 317 handler(error, self) 318 else: 319 raise error
Runs a job operation against one or more qq jobs.
Resolves and prepares informers in parallel using a thread pool, then executes the callback serially on the main thread in the original order, starting as soon as the next-in-order preparation completes.
All exceptions are caught internally and converted to sys.exit calls.
Specific exception types can be handled gracefully via on_exception.
Attributes:
- n_jobs (int): The total number of jobs to run.
- encountered_errors (dict[int, Exception]): A dictionary mapping job indices to exceptions encountered during preparation or execution.
39 def __init__( 40 self, 41 job_ids: tuple[str, ...], 42 directories: tuple[Path, ...], 43 all: bool, 44 server: str | None, 45 callback: Callable, 46 logger: logging.Logger, 47 *args: Any, 48 n_threads: int = 1, 49 **kwargs: Any, 50 ): 51 """ 52 Initialize a CommandRunner. 53 54 Args: 55 job_ids (tuple[str, ...]): List of batch job IDs to operate on. 56 directories (tuple[Path, ...]): List of directories to search for qq jobs in. 57 If `job_ids` and `directories` are both empty and `all` is `False`, the current directory is searched for jobs. 58 all (bool): Collect all qq jobs for the given user on the given server. 59 server (str | None): The batch server to collect qq jobs from. If `None`, the local batch server is used. 60 The server can be specified as a shortcut. 61 callback (Callable): The operation to perform on each resolved Informer. 62 The informer is passed as the first argument, followed by `*args` and `**kwargs`. 63 logger (logging.Logger): Logger instance used for error and critical messages. 64 Should be the module-level logger of the calling CLI module. 65 *args (Any): Additional positional arguments forwarded to `callback`. 66 n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial). 67 **kwargs (Any): Additional keyword arguments forwarded to `callback`. 68 """ 69 self._job_ids = job_ids 70 self._directories = directories 71 self._all = all 72 self._user = getpass.getuser() 73 self._server = translate_server(server) if server else None 74 if not self._job_ids and not self._directories and not all: 75 self._directories = [Path.cwd()] 76 77 self._callback = callback 78 self._args = args 79 self._kwargs = kwargs 80 81 self._batch_system = BatchInterface.from_env_var_or_guess() 82 self._logger = logger 83 self._exception_handlers: dict[type[Exception], Callable] = {} 84 self._n_threads = n_threads 85 86 self.n_jobs = 0 87 self.current_iteration = 0 88 self.encountered_errors: dict[int, Exception] = {}
Initialize a CommandRunner.
Arguments:
- job_ids (tuple[str, ...]): List of batch job IDs to operate on.
- directories (tuple[Path, ...]): List of directories to search for qq jobs in.
If
job_idsanddirectoriesare both empty andallisFalse, the current directory is searched for jobs. - all (bool): Collect all qq jobs for the given user on the given server.
- server (str | None): The batch server to collect qq jobs from. If
None, the local batch server is used. The server can be specified as a shortcut. - callback (Callable): The operation to perform on each resolved Informer.
The informer is passed as the first argument, followed by
*argsand**kwargs. - logger (logging.Logger): Logger instance used for error and critical messages. Should be the module-level logger of the calling CLI module.
- *args (Any): Additional positional arguments forwarded to
callback. - n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial).
- **kwargs (Any): Additional keyword arguments forwarded to
callback.
90 def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self: 91 """ 92 Register an exception handler for a specific exception type. 93 94 Registered handlers are invoked when the callback or the preparation 95 step raises the given exception type. Unregistered exception types 96 propagate up and cause the process to exit. 97 98 Args: 99 exc_type (type[Exception]): The exception type to handle. 100 handler (Callable): Function to call when `exc_type` is raised. 101 Must accept two arguments: the exception instance and 102 a reference to this `CommandRunner`. 103 104 Returns: 105 Self for chaining. 106 """ 107 self._exception_handlers[exc_type] = handler 108 return self
Register an exception handler for a specific exception type.
Registered handlers are invoked when the callback or the preparation step raises the given exception type. Unregistered exception types propagate up and cause the process to exit.
Arguments:
- exc_type (type[Exception]): The exception type to handle.
- handler (Callable): Function to call when
exc_typeis raised. Must accept two arguments: the exception instance and a reference to thisCommandRunner.
Returns:
Self for chaining.
110 def run(self) -> NoReturn: 111 """ 112 Resolve all jobs, execute the callback for each, and exit the process. 113 114 Resolves informers from job IDs or info files in the target directory, 115 prepares them in parallel, and executes the registered callback for each 116 job in the original order. Registered exception handlers are invoked for 117 known error types; all other exceptions cause the process to exit. 118 119 This method never returns. It always terminates with `sys.exit`: 120 - Exit code 0 on success. 121 - Exit code `CFG.exit_codes.default` on `QQError`. 122 - Exit code `CFG.exit_codes.unexpected_error` on any other exception. 123 """ 124 try: 125 targets = self._build_targets() 126 self._run_pipeline(targets) 127 sys.exit(0) 128 except QQError as e: 129 self._logger.error(e) 130 sys.exit(CFG.exit_codes.default) 131 except Exception as e: 132 self._logger.critical(e, exc_info=True, stack_info=True) 133 sys.exit(CFG.exit_codes.unexpected_error)
Resolve all jobs, execute the callback for each, and exit the process.
Resolves informers from job IDs or info files in the target directory, prepares them in parallel, and executes the registered callback for each job in the original order. Registered exception handlers are invoked for known error types; all other exceptions cause the process to exit.
This method never returns. It always terminates with sys.exit:
- Exit code 0 on success.
- Exit code CFG.exit_codes.default on QQError.
- Exit code CFG.exit_codes.unexpected_error on any other exception.