qq_lib.core.command_runner

  1# Released under MIT License.
  2# Copyright (c) 2025-2026 Ladislav Bartos and Robert Vacha Lab
  3
  4
  5import getpass
  6import logging
  7import sys
  8import threading
  9from collections.abc import Callable
 10from concurrent.futures import ThreadPoolExecutor
 11from pathlib import Path
 12from typing import Any, NoReturn, Self
 13
 14from qq_lib.batch.interface import BatchInterface, BatchJobInterface
 15from qq_lib.core.common import get_info_files, translate_server
 16from qq_lib.core.config import CFG
 17from qq_lib.core.error import QQError
 18from qq_lib.info import Informer
 19
 20
 21class CommandRunner:
 22    """
 23    Runs a job operation against one or more qq jobs.
 24
 25    Resolves and prepares informers in parallel using a thread pool, then
 26    executes the callback serially on the main thread in the original order,
 27    starting as soon as the next-in-order preparation completes.
 28
 29    All exceptions are caught internally and converted to `sys.exit` calls.
 30    Specific exception types can be handled gracefully via `on_exception`.
 31
 32    Attributes:
 33        n_jobs (int): The total number of jobs to run.
 34        encountered_errors (dict[int, Exception]): A dictionary mapping
 35            job indices to exceptions encountered during preparation or execution.
 36    """
 37
 38    def __init__(
 39        self,
 40        job_ids: tuple[str, ...],
 41        directories: tuple[Path, ...],
 42        all: bool,
 43        server: str | None,
 44        callback: Callable,
 45        logger: logging.Logger,
 46        *args: Any,
 47        n_threads: int = 1,
 48        **kwargs: Any,
 49    ):
 50        """
 51        Initialize a CommandRunner.
 52
 53        Args:
 54            job_ids (tuple[str, ...]): List of batch job IDs to operate on.
 55            directories (tuple[Path, ...]): List of directories to search for qq jobs in.
 56                If `job_ids` and `directories` are both empty and `all` is `False`, the current directory is searched for jobs.
 57            all (bool): Collect all qq jobs for the given user on the given server.
 58            server (str | None): The batch server to collect qq jobs from. If `None`, the local batch server is used.
 59                The server can be specified as a shortcut.
 60            callback (Callable): The operation to perform on each resolved Informer.
 61                The informer is passed as the first argument, followed by `*args` and `**kwargs`.
 62            logger (logging.Logger): Logger instance used for error and critical messages.
 63                Should be the module-level logger of the calling CLI module.
 64            *args (Any): Additional positional arguments forwarded to `callback`.
 65            n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial).
 66            **kwargs (Any): Additional keyword arguments forwarded to `callback`.
 67        """
 68        self._job_ids = job_ids
 69        self._directories = directories
 70        self._all = all
 71        self._user = getpass.getuser()
 72        self._server = translate_server(server) if server else None
 73        if not self._job_ids and not self._directories and not all:
 74            self._directories = [Path.cwd()]
 75
 76        self._callback = callback
 77        self._args = args
 78        self._kwargs = kwargs
 79
 80        self._batch_system = BatchInterface.from_env_var_or_guess()
 81        self._logger = logger
 82        self._exception_handlers: dict[type[Exception], Callable] = {}
 83        self._n_threads = n_threads
 84
 85        self.n_jobs = 0
 86        self.current_iteration = 0
 87        self.encountered_errors: dict[int, Exception] = {}
 88
 89    def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self:
 90        """
 91        Register an exception handler for a specific exception type.
 92
 93        Registered handlers are invoked when the callback or the preparation
 94        step raises the given exception type. Unregistered exception types
 95        propagate up and cause the process to exit.
 96
 97        Args:
 98            exc_type (type[Exception]): The exception type to handle.
 99            handler (Callable): Function to call when `exc_type` is raised.
100                Must accept two arguments: the exception instance and
101                a reference to this `CommandRunner`.
102
103        Returns:
104            Self for chaining.
105        """
106        self._exception_handlers[exc_type] = handler
107        return self
108
109    def run(self) -> NoReturn:
110        """
111        Resolve all jobs, execute the callback for each, and exit the process.
112
113        Resolves informers from job IDs or info files in the target directory,
114        prepares them in parallel, and executes the registered callback for each
115        job in the original order. Registered exception handlers are invoked for
116        known error types; all other exceptions cause the process to exit.
117
118        This method never returns. It always terminates with `sys.exit`:
119            - Exit code 0 on success.
120            - Exit code `CFG.exit_codes.default` on `QQError`.
121            - Exit code `CFG.exit_codes.unexpected_error` on any other exception.
122        """
123        try:
124            targets = self._build_targets()
125            self._run_pipeline(targets)
126            sys.exit(0)
127        except QQError as e:
128            self._logger.error(e)
129            sys.exit(CFG.exit_codes.default)
130        except Exception as e:
131            self._logger.critical(e, exc_info=True, stack_info=True)
132            sys.exit(CFG.exit_codes.unexpected_error)
133
134    def _build_targets(self) -> list[Callable[[], Informer]]:
135        """
136        Build a list of callables that each resolve and prepare one Informer.
137
138        If job IDs were provided, each target resolves via `Informer.from_job_id`.
139        Otherwise, the specified (or current) directory is searched for qq info files
140        and each target resolves via `Informer.from_file`.
141
142        Each target then also reloads the informer's batch info.
143
144        Returns:
145            list[Callable[[], Informer]]: One callable per job.
146
147        Raises:
148            QQError: If no job IDs were given and no info files were found in the current directory.
149        """
150        if self._server and not self._all:
151            self._logger.warning(
152                "Server is only used with --all. Ignoring server option."
153            )
154            self._server = None
155
156        targets: list[Callable[[], Informer]] = []
157
158        batch_jobs = []
159        if self._job_ids:
160            batch_jobs.extend(
161                self._batch_system.get_batch_jobs_from_ids(list(self._job_ids))
162            )
163        if self._all:
164            batch_jobs.extend(
165                self._batch_system.get_unfinished_batch_jobs(self._user, self._server)
166            )
167
168        if batch_jobs:
169            targets.extend(self._build_targets_from_batch_jobs(batch_jobs))
170
171        if self._directories:
172            targets.extend(self._build_targets_from_files(list(self._directories)))
173
174        if not targets:
175            if not self._job_ids and not self._all:
176                raise QQError("No qq job info file found.")
177            raise QQError("No jobs found.")
178
179        return targets
180
181    def _build_targets_from_batch_jobs(
182        self,
183        jobs: list[BatchJobInterface],
184    ) -> list[Callable[[], Informer]]:
185        """
186        Build preparation targets from a list of batch jobs.
187
188        Args:
189            jobs (list[BatchJobInterface]): The batch jobs to resolve.
190
191        Returns:
192            list[Callable[[], Informer]]: Per-job preparation callables.
193        """
194        return [lambda bj=batch_job: Informer.from_batch_job(bj) for batch_job in jobs]
195
196    def _build_targets_from_files(
197        self,
198        directories: list[Path],
199    ) -> list[Callable[[], Informer]]:
200        """
201        Build preparation targets from qq info files across multiple directories.
202
203        Directories are searched in order. Each target loads an `Informer`
204        from a file and queries the batch system for its info individually.
205
206        Args:
207            directories (list[Path]): Directories to search for info files.
208
209        Returns:
210            list[Callable[[], Informer]]: Per-job preparation callables.
211        """
212
213        def _resolve_and_prepare(path: Path) -> Informer:
214            informer = Informer.from_file(path)
215            informer.load_batch_info()
216            return informer
217
218        info_files: list[Path] = []
219        for directory in directories:
220            info_files.extend(get_info_files(directory))
221
222        return [lambda f=info_file: _resolve_and_prepare(f) for info_file in info_files]
223
224    def _run_pipeline(self, targets: list[Callable[[], Informer]]) -> None:
225        """
226        Run the prepare-then-execute pipeline.
227
228        Submits all targets to a thread pool for parallel preparation.
229        The main thread waits on a condition variable and executes each
230        job's callback as soon as it is the next in order, ensuring output
231        and side effects follow the original job order.
232
233        Failed preparations and executions are passed to `_handle_error`.
234
235        Args:
236            targets: List of callables that each resolve and prepare one Informer.
237        """
238        self.n_jobs = len(targets)
239        results: list[Informer | Exception | None] = [None] * self.n_jobs
240        lock = threading.Lock()
241        ready = threading.Condition(lock)
242
243        def prepare(index: int, target: Callable[[], Informer]) -> None:
244            """
245            Execute a single target and store its result or exception.
246
247            Notifies the main thread upon completion so it can check whether
248            the next-in-order result is available.
249
250            Args:
251                index (int): The position of this target in the original order.
252                target (Callable[[], Informer]): The callable to execute.
253            """
254            try:
255                result: Informer | Exception = target()
256            except Exception as e:
257                result = e
258
259            with ready:
260                results[index] = result
261                ready.notify()
262
263        with ThreadPoolExecutor(max_workers=self._n_threads) as executor:
264            # submit all preparation tasks to the thread pool
265            for i, target in enumerate(targets):
266                executor.submit(prepare, i, target)
267
268            # process results in order on the main thread
269            next_index = 0
270            with ready:
271                while next_index < self.n_jobs:
272                    # block the main thread until the next result is available
273                    # later jobs may finish first, but we wait for the one we need
274                    while results[next_index] is None:
275                        ready.wait()
276
277                    self.current_iteration += 1
278                    result = results[next_index]
279                    next_index += 1
280
281                    # handle error or execute the callback
282                    if isinstance(result, Exception):
283                        self._handle_error(result)
284                    elif isinstance(result, Informer):
285                        self._execute(result)
286                    else:
287                        raise ValueError(
288                            f"Unexpected result type: {type(result)}. This is a bug, please report it."
289                        )
290
291    def _execute(self, informer: Informer) -> None:
292        """
293        Run the callback on a prepared informer.
294
295        Args:
296            informer (Informer): A resolved and prepared Informer.
297        """
298        try:
299            self._callback(informer, *self._args, **self._kwargs)
300        except tuple(self._exception_handlers.keys()) as e:
301            self._handle_error(e)
302
303    def _handle_error(self, error: Exception) -> None:
304        """
305        Handle an exception using registered handlers, or re-raise.
306
307        Args:
308            error (Exception): The exception to handle.
309
310        Raises:
311            Exception: If no handler is registered for the exception type.
312        """
313        self.encountered_errors[self.current_iteration] = error
314        handler = self._exception_handlers.get(type(error))
315        if handler:
316            handler(error, self)
317        else:
318            raise error
class CommandRunner:
 22class CommandRunner:
 23    """
 24    Runs a job operation against one or more qq jobs.
 25
 26    Resolves and prepares informers in parallel using a thread pool, then
 27    executes the callback serially on the main thread in the original order,
 28    starting as soon as the next-in-order preparation completes.
 29
 30    All exceptions are caught internally and converted to `sys.exit` calls.
 31    Specific exception types can be handled gracefully via `on_exception`.
 32
 33    Attributes:
 34        n_jobs (int): The total number of jobs to run.
 35        encountered_errors (dict[int, Exception]): A dictionary mapping
 36            job indices to exceptions encountered during preparation or execution.
 37    """
 38
 39    def __init__(
 40        self,
 41        job_ids: tuple[str, ...],
 42        directories: tuple[Path, ...],
 43        all: bool,
 44        server: str | None,
 45        callback: Callable,
 46        logger: logging.Logger,
 47        *args: Any,
 48        n_threads: int = 1,
 49        **kwargs: Any,
 50    ):
 51        """
 52        Initialize a CommandRunner.
 53
 54        Args:
 55            job_ids (tuple[str, ...]): List of batch job IDs to operate on.
 56            directories (tuple[Path, ...]): List of directories to search for qq jobs in.
 57                If `job_ids` and `directories` are both empty and `all` is `False`, the current directory is searched for jobs.
 58            all (bool): Collect all qq jobs for the given user on the given server.
 59            server (str | None): The batch server to collect qq jobs from. If `None`, the local batch server is used.
 60                The server can be specified as a shortcut.
 61            callback (Callable): The operation to perform on each resolved Informer.
 62                The informer is passed as the first argument, followed by `*args` and `**kwargs`.
 63            logger (logging.Logger): Logger instance used for error and critical messages.
 64                Should be the module-level logger of the calling CLI module.
 65            *args (Any): Additional positional arguments forwarded to `callback`.
 66            n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial).
 67            **kwargs (Any): Additional keyword arguments forwarded to `callback`.
 68        """
 69        self._job_ids = job_ids
 70        self._directories = directories
 71        self._all = all
 72        self._user = getpass.getuser()
 73        self._server = translate_server(server) if server else None
 74        if not self._job_ids and not self._directories and not all:
 75            self._directories = [Path.cwd()]
 76
 77        self._callback = callback
 78        self._args = args
 79        self._kwargs = kwargs
 80
 81        self._batch_system = BatchInterface.from_env_var_or_guess()
 82        self._logger = logger
 83        self._exception_handlers: dict[type[Exception], Callable] = {}
 84        self._n_threads = n_threads
 85
 86        self.n_jobs = 0
 87        self.current_iteration = 0
 88        self.encountered_errors: dict[int, Exception] = {}
 89
 90    def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self:
 91        """
 92        Register an exception handler for a specific exception type.
 93
 94        Registered handlers are invoked when the callback or the preparation
 95        step raises the given exception type. Unregistered exception types
 96        propagate up and cause the process to exit.
 97
 98        Args:
 99            exc_type (type[Exception]): The exception type to handle.
100            handler (Callable): Function to call when `exc_type` is raised.
101                Must accept two arguments: the exception instance and
102                a reference to this `CommandRunner`.
103
104        Returns:
105            Self for chaining.
106        """
107        self._exception_handlers[exc_type] = handler
108        return self
109
110    def run(self) -> NoReturn:
111        """
112        Resolve all jobs, execute the callback for each, and exit the process.
113
114        Resolves informers from job IDs or info files in the target directory,
115        prepares them in parallel, and executes the registered callback for each
116        job in the original order. Registered exception handlers are invoked for
117        known error types; all other exceptions cause the process to exit.
118
119        This method never returns. It always terminates with `sys.exit`:
120            - Exit code 0 on success.
121            - Exit code `CFG.exit_codes.default` on `QQError`.
122            - Exit code `CFG.exit_codes.unexpected_error` on any other exception.
123        """
124        try:
125            targets = self._build_targets()
126            self._run_pipeline(targets)
127            sys.exit(0)
128        except QQError as e:
129            self._logger.error(e)
130            sys.exit(CFG.exit_codes.default)
131        except Exception as e:
132            self._logger.critical(e, exc_info=True, stack_info=True)
133            sys.exit(CFG.exit_codes.unexpected_error)
134
135    def _build_targets(self) -> list[Callable[[], Informer]]:
136        """
137        Build a list of callables that each resolve and prepare one Informer.
138
139        If job IDs were provided, each target resolves via `Informer.from_job_id`.
140        Otherwise, the specified (or current) directory is searched for qq info files
141        and each target resolves via `Informer.from_file`.
142
143        Each target then also reloads the informer's batch info.
144
145        Returns:
146            list[Callable[[], Informer]]: One callable per job.
147
148        Raises:
149            QQError: If no job IDs were given and no info files were found in the current directory.
150        """
151        if self._server and not self._all:
152            self._logger.warning(
153                "Server is only used with --all. Ignoring server option."
154            )
155            self._server = None
156
157        targets: list[Callable[[], Informer]] = []
158
159        batch_jobs = []
160        if self._job_ids:
161            batch_jobs.extend(
162                self._batch_system.get_batch_jobs_from_ids(list(self._job_ids))
163            )
164        if self._all:
165            batch_jobs.extend(
166                self._batch_system.get_unfinished_batch_jobs(self._user, self._server)
167            )
168
169        if batch_jobs:
170            targets.extend(self._build_targets_from_batch_jobs(batch_jobs))
171
172        if self._directories:
173            targets.extend(self._build_targets_from_files(list(self._directories)))
174
175        if not targets:
176            if not self._job_ids and not self._all:
177                raise QQError("No qq job info file found.")
178            raise QQError("No jobs found.")
179
180        return targets
181
182    def _build_targets_from_batch_jobs(
183        self,
184        jobs: list[BatchJobInterface],
185    ) -> list[Callable[[], Informer]]:
186        """
187        Build preparation targets from a list of batch jobs.
188
189        Args:
190            jobs (list[BatchJobInterface]): The batch jobs to resolve.
191
192        Returns:
193            list[Callable[[], Informer]]: Per-job preparation callables.
194        """
195        return [lambda bj=batch_job: Informer.from_batch_job(bj) for batch_job in jobs]
196
197    def _build_targets_from_files(
198        self,
199        directories: list[Path],
200    ) -> list[Callable[[], Informer]]:
201        """
202        Build preparation targets from qq info files across multiple directories.
203
204        Directories are searched in order. Each target loads an `Informer`
205        from a file and queries the batch system for its info individually.
206
207        Args:
208            directories (list[Path]): Directories to search for info files.
209
210        Returns:
211            list[Callable[[], Informer]]: Per-job preparation callables.
212        """
213
214        def _resolve_and_prepare(path: Path) -> Informer:
215            informer = Informer.from_file(path)
216            informer.load_batch_info()
217            return informer
218
219        info_files: list[Path] = []
220        for directory in directories:
221            info_files.extend(get_info_files(directory))
222
223        return [lambda f=info_file: _resolve_and_prepare(f) for info_file in info_files]
224
225    def _run_pipeline(self, targets: list[Callable[[], Informer]]) -> None:
226        """
227        Run the prepare-then-execute pipeline.
228
229        Submits all targets to a thread pool for parallel preparation.
230        The main thread waits on a condition variable and executes each
231        job's callback as soon as it is the next in order, ensuring output
232        and side effects follow the original job order.
233
234        Failed preparations and executions are passed to `_handle_error`.
235
236        Args:
237            targets: List of callables that each resolve and prepare one Informer.
238        """
239        self.n_jobs = len(targets)
240        results: list[Informer | Exception | None] = [None] * self.n_jobs
241        lock = threading.Lock()
242        ready = threading.Condition(lock)
243
244        def prepare(index: int, target: Callable[[], Informer]) -> None:
245            """
246            Execute a single target and store its result or exception.
247
248            Notifies the main thread upon completion so it can check whether
249            the next-in-order result is available.
250
251            Args:
252                index (int): The position of this target in the original order.
253                target (Callable[[], Informer]): The callable to execute.
254            """
255            try:
256                result: Informer | Exception = target()
257            except Exception as e:
258                result = e
259
260            with ready:
261                results[index] = result
262                ready.notify()
263
264        with ThreadPoolExecutor(max_workers=self._n_threads) as executor:
265            # submit all preparation tasks to the thread pool
266            for i, target in enumerate(targets):
267                executor.submit(prepare, i, target)
268
269            # process results in order on the main thread
270            next_index = 0
271            with ready:
272                while next_index < self.n_jobs:
273                    # block the main thread until the next result is available
274                    # later jobs may finish first, but we wait for the one we need
275                    while results[next_index] is None:
276                        ready.wait()
277
278                    self.current_iteration += 1
279                    result = results[next_index]
280                    next_index += 1
281
282                    # handle error or execute the callback
283                    if isinstance(result, Exception):
284                        self._handle_error(result)
285                    elif isinstance(result, Informer):
286                        self._execute(result)
287                    else:
288                        raise ValueError(
289                            f"Unexpected result type: {type(result)}. This is a bug, please report it."
290                        )
291
292    def _execute(self, informer: Informer) -> None:
293        """
294        Run the callback on a prepared informer.
295
296        Args:
297            informer (Informer): A resolved and prepared Informer.
298        """
299        try:
300            self._callback(informer, *self._args, **self._kwargs)
301        except tuple(self._exception_handlers.keys()) as e:
302            self._handle_error(e)
303
304    def _handle_error(self, error: Exception) -> None:
305        """
306        Handle an exception using registered handlers, or re-raise.
307
308        Args:
309            error (Exception): The exception to handle.
310
311        Raises:
312            Exception: If no handler is registered for the exception type.
313        """
314        self.encountered_errors[self.current_iteration] = error
315        handler = self._exception_handlers.get(type(error))
316        if handler:
317            handler(error, self)
318        else:
319            raise error

Runs a job operation against one or more qq jobs.

Resolves and prepares informers in parallel using a thread pool, then executes the callback serially on the main thread in the original order, starting as soon as the next-in-order preparation completes.

All exceptions are caught internally and converted to sys.exit calls. Specific exception types can be handled gracefully via on_exception.

Attributes:
  • n_jobs (int): The total number of jobs to run.
  • encountered_errors (dict[int, Exception]): A dictionary mapping job indices to exceptions encountered during preparation or execution.
CommandRunner( job_ids: tuple[str, ...], directories: tuple[pathlib._local.Path, ...], all: bool, server: str | None, callback: Callable, logger: logging.Logger, *args: Any, n_threads: int = 1, **kwargs: Any)
39    def __init__(
40        self,
41        job_ids: tuple[str, ...],
42        directories: tuple[Path, ...],
43        all: bool,
44        server: str | None,
45        callback: Callable,
46        logger: logging.Logger,
47        *args: Any,
48        n_threads: int = 1,
49        **kwargs: Any,
50    ):
51        """
52        Initialize a CommandRunner.
53
54        Args:
55            job_ids (tuple[str, ...]): List of batch job IDs to operate on.
56            directories (tuple[Path, ...]): List of directories to search for qq jobs in.
57                If `job_ids` and `directories` are both empty and `all` is `False`, the current directory is searched for jobs.
58            all (bool): Collect all qq jobs for the given user on the given server.
59            server (str | None): The batch server to collect qq jobs from. If `None`, the local batch server is used.
60                The server can be specified as a shortcut.
61            callback (Callable): The operation to perform on each resolved Informer.
62                The informer is passed as the first argument, followed by `*args` and `**kwargs`.
63            logger (logging.Logger): Logger instance used for error and critical messages.
64                Should be the module-level logger of the calling CLI module.
65            *args (Any): Additional positional arguments forwarded to `callback`.
66            n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial).
67            **kwargs (Any): Additional keyword arguments forwarded to `callback`.
68        """
69        self._job_ids = job_ids
70        self._directories = directories
71        self._all = all
72        self._user = getpass.getuser()
73        self._server = translate_server(server) if server else None
74        if not self._job_ids and not self._directories and not all:
75            self._directories = [Path.cwd()]
76
77        self._callback = callback
78        self._args = args
79        self._kwargs = kwargs
80
81        self._batch_system = BatchInterface.from_env_var_or_guess()
82        self._logger = logger
83        self._exception_handlers: dict[type[Exception], Callable] = {}
84        self._n_threads = n_threads
85
86        self.n_jobs = 0
87        self.current_iteration = 0
88        self.encountered_errors: dict[int, Exception] = {}

Initialize a CommandRunner.

Arguments:
  • job_ids (tuple[str, ...]): List of batch job IDs to operate on.
  • directories (tuple[Path, ...]): List of directories to search for qq jobs in. If job_ids and directories are both empty and all is False, the current directory is searched for jobs.
  • all (bool): Collect all qq jobs for the given user on the given server.
  • server (str | None): The batch server to collect qq jobs from. If None, the local batch server is used. The server can be specified as a shortcut.
  • callback (Callable): The operation to perform on each resolved Informer. The informer is passed as the first argument, followed by *args and **kwargs.
  • logger (logging.Logger): Logger instance used for error and critical messages. Should be the module-level logger of the calling CLI module.
  • *args (Any): Additional positional arguments forwarded to callback.
  • n_threads (int): Number of threads for parallel informer resolution. Defaults to 1 (serial).
  • **kwargs (Any): Additional keyword arguments forwarded to callback.
n_jobs
current_iteration
encountered_errors: dict[int, Exception]
def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self:
 90    def on_exception(self, exc_type: type[Exception], handler: Callable) -> Self:
 91        """
 92        Register an exception handler for a specific exception type.
 93
 94        Registered handlers are invoked when the callback or the preparation
 95        step raises the given exception type. Unregistered exception types
 96        propagate up and cause the process to exit.
 97
 98        Args:
 99            exc_type (type[Exception]): The exception type to handle.
100            handler (Callable): Function to call when `exc_type` is raised.
101                Must accept two arguments: the exception instance and
102                a reference to this `CommandRunner`.
103
104        Returns:
105            Self for chaining.
106        """
107        self._exception_handlers[exc_type] = handler
108        return self

Register an exception handler for a specific exception type.

Registered handlers are invoked when the callback or the preparation step raises the given exception type. Unregistered exception types propagate up and cause the process to exit.

Arguments:
  • exc_type (type[Exception]): The exception type to handle.
  • handler (Callable): Function to call when exc_type is raised. Must accept two arguments: the exception instance and a reference to this CommandRunner.
Returns:

Self for chaining.

def run(self) -> NoReturn:
110    def run(self) -> NoReturn:
111        """
112        Resolve all jobs, execute the callback for each, and exit the process.
113
114        Resolves informers from job IDs or info files in the target directory,
115        prepares them in parallel, and executes the registered callback for each
116        job in the original order. Registered exception handlers are invoked for
117        known error types; all other exceptions cause the process to exit.
118
119        This method never returns. It always terminates with `sys.exit`:
120            - Exit code 0 on success.
121            - Exit code `CFG.exit_codes.default` on `QQError`.
122            - Exit code `CFG.exit_codes.unexpected_error` on any other exception.
123        """
124        try:
125            targets = self._build_targets()
126            self._run_pipeline(targets)
127            sys.exit(0)
128        except QQError as e:
129            self._logger.error(e)
130            sys.exit(CFG.exit_codes.default)
131        except Exception as e:
132            self._logger.critical(e, exc_info=True, stack_info=True)
133            sys.exit(CFG.exit_codes.unexpected_error)

Resolve all jobs, execute the callback for each, and exit the process.

Resolves informers from job IDs or info files in the target directory, prepares them in parallel, and executes the registered callback for each job in the original order. Registered exception handlers are invoked for known error types; all other exceptions cause the process to exit.

This method never returns. It always terminates with sys.exit:
  • Exit code 0 on success.
  • Exit code CFG.exit_codes.default on QQError.
  • Exit code CFG.exit_codes.unexpected_error on any other exception.