# SPDX-License-Identifier: BSD-3-Clause AND Apache-2.0
# Copyright 2018 Regents of the University of California
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Copyright 2019 Blue Cheetah Analog Design Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module provides functions to help you run external processes.
"""
import os
import sys
from .common import bag_encoding, bag_codec_error
from .file import write_file
import multiprocessing
# noinspection PyCompatibility
import concurrent.futures
# Select the subprocess implementation.  The rest of this module relies on
# the ``timeout`` argument of ``wait()``/``check_output()`` and on
# ``TimeoutExpired``, which Python 2 only gets through ``subprocess32``.
if sys.version_info[0] < 3:
    # use subprocess32 for timeout feature.
    if os.name != 'posix':
        raise Exception('bag.io.process module current only works for POSIX systems.')
    # noinspection PyUnresolvedReferences,PyPackageRequirements
    import subprocess32 as subprocess
else:
    import subprocess
def run_proc_with_quit(proc_id, quit_dict, args, logfile=None, append=False, env=None, cwd=None):
    """Run a subprocess while polling a shared dictionary for a quit request.

    The subprocess is polled every 0.05 seconds.  Once ``proc_id`` shows up
    as a key of ``quit_dict``, the subprocess is asked to stop with
    ``Popen.terminate()``; the value stored under ``proc_id`` is then used as
    the wait timeout.  If the subprocess is still alive after that timeout,
    ``Popen.kill()`` is called and the subprocess is waited on one more time.

    Parameters
    ----------
    proc_id : string
        the key to look for in ``quit_dict``.
    quit_dict : dict[string, float]
        dictionary from process ID to the cancellation timeout in seconds.
        This function only reads it; it never adds or removes entries.
    args : string or list[string]
        the command to run.  Passed directly to ``subprocess.Popen``.
    logfile : string or None
        file that receives stdout and stderr of the subprocess.  If None,
        ``os.devnull`` is used.
    append : bool
        True to append to ``logfile`` instead of overwriting it.
    env : dict[string, string] or None
        environment variables of the subprocess, passed to ``subprocess.Popen``.
    cwd : string or None
        working directory of the subprocess, passed to ``subprocess.Popen``.

    Returns
    -------
    retcode : int or None
        the return code of the subprocess.  None if ``proc_id`` was already
        in ``quit_dict`` before launch (the subprocess is never started), or
        if the subprocess was still alive after both terminate and kill.
    """
    if logfile is None:
        logfile = os.devnull

    mode = 'ab' if append else 'wb'
    with open(logfile, mode) as logf:
        if proc_id in quit_dict:
            # cancelled before we even launched; do not start the subprocess.
            return None
        proc = subprocess.Popen(args, stdout=logf, stderr=subprocess.STDOUT,
                                env=env, cwd=cwd)
        retcode = None
        num_kill = 0
        timeout = 0.05
        while retcode is None and num_kill <= 2:
            try:
                retcode = proc.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                # Single guarded lookup instead of "check, then index": the
                # dictionary is shared with other threads, so the entry may
                # disappear between the two steps.
                try:
                    cancel_timeout = quit_dict[proc_id]
                except KeyError:
                    # no quit request; keep polling.
                    continue

                if num_kill == 0:
                    # first notice of the quit request: ask nicely, and from
                    # now on wait with the requested cancellation timeout.
                    proc.terminate()
                    timeout = cancel_timeout
                elif num_kill == 1:
                    # terminate was not enough.
                    proc.kill()
                # on the third timeout this pushes num_kill past the loop
                # bound, so we give up waiting.
                num_kill += 1

        return proc.returncode
def run_and_wait(args, timeout=None, logfile=None, append=False,
                 env=None, cwd=None):
    """Run a command in a subprocess, then wait for it to finish.

    Parameters
    ----------
    args : string or list[string]
        the command to run.  Should be either a command string or a list
        of command string and its arguments as strings.  A list is preferred;
        see Python subprocess documentation.
    timeout : float or None
        the amount of time to wait for the command to finish, in seconds.
        If None, waits indefinitely.
    logfile : string or None
        If given, stdout and stderr will be written to this file.
    append : bool
        True to append to the logfile.  Defaults to False.
    env : dict[string, any]
        If not None, environment variables of the subprocess will be set
        according to this dictionary instead of inheriting from current
        process.
    cwd : string or None
        The current working directory of the subprocess.

    Returns
    -------
    output : string
        the standard output and standard error from the command.

    Raises
    ------
    subprocess.CalledProcessError
        if any error occurred in the subprocess.
    subprocess.TimeoutExpired
        if ``timeout`` is given and the command does not finish in time.
    """
    # stderr is merged into stdout so the caller gets one combined stream.
    output = subprocess.check_output(args, stderr=subprocess.STDOUT,
                                     timeout=timeout, env=env, cwd=cwd)
    output = output.decode(encoding=bag_encoding, errors=bag_codec_error)

    # NOTE: the log file is only written when the command succeeds; if
    # check_output() raises, we never get here.
    if logfile is not None:
        write_file(logfile, output, append=append)

    return output
class ProcessManager(object):
    """A class that manages subprocesses.

    This class is for starting processes that you do not need to wait on,
    and allows you to query for their status or terminate/kill them if needed.

    Tasks are executed by a thread pool.  Each task is tracked by its Future
    object under a unique process ID, and a cancellation request is signalled
    to a running task by storing a timeout under its ID in a shared
    dictionary.

    Parameters
    ----------
    max_workers : int or None
        number of maximum allowed subprocesses.  If None, defaults to system
        CPU count.
    cancel_timeout : float or None
        Number of seconds to wait for a process to terminate once SIGTERM or
        SIGKILL is issued.  Defaults to 10 seconds.
    """

    def __init__(self, max_workers=None, cancel_timeout=10.0):
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        if cancel_timeout is None:
            cancel_timeout = 10.0

        self._exec = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self._cancel_timeout = cancel_timeout
        # process ID -> Future of the task.
        self._future_dict = {}
        # process ID -> cancellation timeout; presence of a key is the quit request.
        self._quit_dict = {}

    def close(self, timeout=10.0):
        """Cancel all processes.

        Parameters
        ----------
        timeout : float
            time to wait in seconds for each process to terminate.
        """
        # Iterate over a snapshot of the IDs: cancel() removes entries from
        # the dictionary, and mutating a dictionary while iterating over it
        # raises RuntimeError.
        for proc_id in list(self._future_dict):
            self.cancel(proc_id, timeout=timeout)
        self._exec.shutdown()
        self._quit_dict.clear()
        self._future_dict.clear()

    def _new_proc_id(self, basename):
        """Returns a process ID based on ``basename`` that is not currently in use."""
        base = basename or 'proc'
        proc_id = base
        cur_idx = 1
        while proc_id in self._future_dict:
            # always suffix the base name, so IDs read base, base_1, base_2, ...
            proc_id = '%s_%d' % (base, cur_idx)
            cur_idx += 1
        return proc_id

    def new_thread(self, fun, basename=None, callback=None):
        """Put a new custom task in queue.

        Execute the given function in a thread asynchronously.  The given function
        must take two arguments.  The first argument is a unique string that represents
        this task, and the second argument is a dictionary.  The dictionary will
        map the unique string to a timeout (in seconds) if this task is being cancelled.
        The function should periodically check the dictionary and terminate gracefully.
        Before the function returns, it should also delete the unique string from the
        dictionary if it exists.

        Parameters
        ----------
        fun : callable
            the function to execute in a thread, as described above.
        basename : string or None
            If given, this will be used as the basis for generating the unique
            process ID.
        callback : callable
            If given, this function will automatically be executed when the
            process finished.  This function should take a single argument,
            which is a Future object that returns the return code of the
            process.

        Returns
        -------
        proc_id : string
            a unique string representing this process.  Can be used later
            to query process status or cancel process.
        """
        proc_id = self._new_proc_id(basename)

        future = self._exec.submit(fun, proc_id, self._quit_dict)
        if callback is not None:
            future.add_done_callback(callback)

        self._future_dict[proc_id] = future
        return proc_id

    def new_process(self, args, basename=None, logfile=None, append=False,
                    env=None, cwd=None, callback=None):
        """Put a new process in queue.

        When the process is done, its return code will be returned.

        Parameters
        ----------
        args : string or list[string]
            the command to run as a string or list of string arguments.  See
            Python subprocess documentation.  list of string format is preferred.
        basename : string or None
            If given, this will be used as the basis for generating the unique
            process ID.
        logfile : string or None
            If given, stdout and stderr will be written to this file.  Otherwise,
            they will be redirected to `os.devnull`.
        append : bool
            True to append to ``logfile`` instead of overwriting it.
        env : dict[string, string] or None
            If given, environment variables of the process will be set according
            to this dictionary.
        cwd : string or None
            current working directory of the process.
        callback : callable
            If given, this function will automatically be executed when the
            process finished.  This function should take a single argument,
            which is a Future object that returns the return code of the
            process.

        Returns
        -------
        proc_id : string
            a unique string representing this process.  Can be used later
            to query process status or cancel process.
        """
        proc_id = self._new_proc_id(basename)

        future = self._exec.submit(self._start_cmd, args, proc_id,
                                   logfile=logfile, append=append, env=env, cwd=cwd)
        if callback is not None:
            future.add_done_callback(callback)

        self._future_dict[proc_id] = future
        return proc_id

    @staticmethod
    def _get_output(future, timeout=None):
        """Get output from future.  Return None when exception.

        A ``concurrent.futures.TimeoutError`` from waiting on the future is
        not caught here; the caller has to handle it.
        """
        try:
            if future.exception(timeout=timeout) is None:
                return future.result()
            else:
                return None
        except concurrent.futures.CancelledError:
            return None

    def cancel(self, proc_id, timeout=None):
        """Cancel the given process.

        If the process hasn't started, this method prevents it from starting.
        Otherwise, the task is informed that it should quit, and this method
        waits up to ``4 * timeout`` seconds for it to finish.  Tasks started
        with :meth:`new_process` react by sending SIGTERM to the process,
        then SIGKILL if it is still alive after ``timeout`` seconds.  If the
        task still has not finished when the wait is over, a warning is
        printed and None is returned.

        In all cases the process ID is forgotten by this manager afterwards.

        Parameters
        ----------
        proc_id : string
            the process ID to cancel.
        timeout : float or None
            number of seconds to wait for cancellation.  If None, use default
            timeout.

        Returns
        -------
        output :
            output of the thread if it successfully terminates.
            Otherwise, return None.
        """
        if timeout is None:
            timeout = self._cancel_timeout

        future = self._future_dict.get(proc_id, None)
        if future is None:
            return None
        if future.done():
            # process already done, return status.
            del self._future_dict[proc_id]
            return self._get_output(future)
        if future.cancel():
            # we cancelled process before it made into the thread pool.
            del self._future_dict[proc_id]
            return None

        # inform thread it should try to quit.
        self._quit_dict[proc_id] = timeout
        try:
            output = self._get_output(future, timeout=4 * timeout)
        except concurrent.futures.TimeoutError:
            # shouldn't get here, but we did.  The quit request is left in
            # place so the worker can still see it.
            print("*WARNING* worker thread refuse to die...")
            output = None
        else:
            # The worker has finished.  If it finished without noticing the
            # request, the entry would linger and make a later task that
            # reuses this ID quit immediately, so drop it here.
            self._quit_dict.pop(proc_id, None)

        self._future_dict.pop(proc_id, None)
        return output

    def done(self, proc_id):
        """Returns True if the given process finished or is cancelled successfully.

        Parameters
        ----------
        proc_id : string
            the process ID.

        Returns
        -------
        done : bool
            True if the process is cancelled or completed.
        """
        return self._future_dict[proc_id].done()

    def wait(self, proc_id, timeout=None, cancel_timeout=None):
        """Wait for the given process to finish, then return its return code.

        If ``timeout`` is None, waits indefinitely.  Otherwise, if after
        ``timeout`` seconds the process is still running, a
        :class:`concurrent.futures.TimeoutError` will be raised.
        However, it is safe to catch this error and call wait again.

        If Ctrl-C is pressed before process finish or before timeout
        is reached, the process will be cancelled.

        Parameters
        ----------
        proc_id : string
            the process ID.
        timeout : float or None
            number of seconds to wait.  If None, waits indefinitely.
        cancel_timeout : float or None
            number of seconds to wait for process cancellation.  If None,
            use default timeout.

        Returns
        -------
        output :
            output of the thread if it successfully terminates.  Otherwise return None.

        Raises
        ------
        concurrent.futures.TimeoutError
            if the process is still running after ``timeout`` seconds.
        """
        if cancel_timeout is None:
            cancel_timeout = self._cancel_timeout

        future = self._future_dict[proc_id]
        try:
            output = future.result(timeout=timeout)
            # remove future from dictionary.
            del self._future_dict[proc_id]
            return output
        except KeyboardInterrupt:
            # cancel the process
            print('KeyboardInterrupt received, cancelling %s...' % proc_id)
            return self.cancel(proc_id, timeout=cancel_timeout)

    def _start_cmd(self, args, proc_id, logfile=None, append=False, env=None, cwd=None):
        """The function that actually starts the subprocess.  Executed by thread."""
        try:
            return run_proc_with_quit(proc_id, self._quit_dict, args, logfile=logfile,
                                      append=append, env=env, cwd=cwd)
        finally:
            # Clear any quit request for this ID, also when launching the
            # subprocess raised, so it cannot affect a later task.
            self._quit_dict.pop(proc_id, None)