bag.concurrent.core

This module define utility classes for performing concurrent operations.

Module Contents

Classes

Semaphore

A modified asyncio Semaphore class that gets the running loop dynamically.

SubProcessManager

A class that provides methods to run multiple subprocesses in parallel using asyncio.

Functions

batch_async_task(→ List[Any])

Execute a list of coroutines or futures concurrently.

Attributes

ProcInfo

FlowInfo

bag.concurrent.core.ProcInfo[source]
bag.concurrent.core.FlowInfo[source]
bag.concurrent.core.batch_async_task(coro_list: Iterable[Awaitable[Any]]) List[Any][source]

Execute a list of coroutines or futures concurrently.

User may press Ctrl-C to cancel all given tasks.

Parameters:

coro_list (Iterable[Awaitable[Any]]) – a list of coroutines or futures to run concurrently.

Returns:

results – a list of return values or raised exceptions of given tasks.

Return type:

Optional[Tuple[Any]]

class bag.concurrent.core.Semaphore(value: int = 1)[source]

A modified asyncio Semaphore class that gets the running loop dynamically.

async __aenter__() None[source]
async __aexit__(exc_type, exc, tb)[source]
_wake_up_next()[source]
locked()[source]
async acquire()[source]
release()[source]
class bag.concurrent.core.SubProcessManager(max_workers: int = 0, cancel_timeout: float = 10.0, **kwargs: Any)[source]

A class that provides methods to run multiple subprocesses in parallel using asyncio.

Parameters:
  • max_workers (Optional[int]) – number of maximum allowed subprocesses. If None, defaults to system CPU count.

  • cancel_timeout (float) – Number of seconds to wait for a process to terminate once SIGTERM or SIGKILL is issued. Defaults to 10 seconds.

  • **kwargs (Any) – Optional keyword arguments.

async _kill_subprocess(proc: Optional[asyncio.subprocess.Process]) None[source]

Helper method; send SIGTERM/SIGKILL to a subprocess.

This method first sends SIGTERM to the subprocess. If the process hasn’t terminated after a given timeout, it sends SIGKILL.

Parameter

procOptional[Process]

the process to attempt to terminate. If None, this method does nothing.

async async_new_subprocess(args: Union[str, Sequence[str]], log: str, env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None, **kwargs: Any) Optional[int][source]

A coroutine which starts a subprocess.

If this coroutine is cancelled, it will shut down the subprocess gracefully using SIGTERM/SIGKILL, then raise CancelledError.

Parameters:
  • args (Union[str, Sequence[str]]) – command to run, as string or sequence of strings.

  • log (str) – the log file name.

  • env (Optional[Dict[str, str]]) – an optional dictionary of environment variables. None to inherit from parent.

  • cwd (Optional[str]) – the working directory. None to inherit from parent.

Returns:

retcode – the return code of the subprocess.

Return type:

Optional[int]

async async_new_subprocess_flow(proc_info_list: Sequence[FlowInfo], **kwargs: Any) Any[source]

A coroutine which runs a series of subprocesses.

If this coroutine is cancelled, it will shut down the current subprocess gracefully using SIGTERM/SIGKILL, then raise CancelledError.

Parameters:

proc_info_list (Sequence[FlowInfo]) –

a list of processes to execute in series. Each element is a tuple of:

argsUnion[str, Sequence[str]]

command to run, as string or list of string arguments.

logstr

log file name.

envOptional[Dict[str, str]]

environment variable dictionary. None to inherit from parent.

cwdOptional[str]

working directory path. None to inherit from parent.

vfunSequence[Callable[[Optional[int], str], Any]]

a function to validate if it is ok to execute the next process. The output of the last function is returned. The first argument is the return code, the second argument is the log file name.

Returns:

result – the return value of the last validate function. None if validate function returns False.

Return type:

Any

batch_subprocess(proc_info_list: Sequence[ProcInfo]) Optional[Sequence[Union[int, Exception]]][source]

Run all given subprocesses in parallel.

Parameters:

proc_info_list (Sequence[ProcInfo]) –

a list of process information. Each element is a tuple of:

argsUnion[str, Sequence[str]]

command to run, as string or list of string arguments.

logstr

log file name.

envOptional[Dict[str, str]]

environment variable dictionary. None to inherit from parent.

cwdOptional[str]

working directory path. None to inherit from parent.

Returns:

results – if user cancelled the subprocesses, None is returned. Otherwise, a list of subprocess return codes or exceptions are returned.

Return type:

Optional[Sequence[Union[int, Exception]]]

batch_subprocess_flow(proc_info_list: Sequence[Sequence[FlowInfo]]) Optional[Sequence[Union[int, Exception]]][source]

Run all given subprocesses flow in parallel.

Parameters:

proc_info_list (Sequence[Sequence[FlowInfo]) –

a list of process flow information. Each element is a sequence of tuples of:

argsUnion[str, Sequence[str]]

command to run, as string or list of string arguments.

logstr

log file name.

envOptional[Dict[str, str]]

environment variable dictionary. None to inherit from parent.

cwdOptional[str]

working directory path. None to inherit from parent.

vfunSequence[Callable[[Optional[int], str], Any]]

a function to validate if it is ok to execute the next process. The output of the last function is returned. The first argument is the return code, the second argument is the log file name.

Returns:

results – if user cancelled the subprocess flows, None is returned. Otherwise, a list of flow return values or exceptions are returned.

Return type:

Optional[Sequence[Any]]