Source code for bag.math.interpolate

# SPDX-License-Identifier: BSD-3-Clause AND Apache-2.0
# Copyright 2018 Regents of the University of California
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

# Copyright 2019 Blue Cheetah Analog Design Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This module defines various interpolation classes.
"""

from __future__ import annotations
from typing import List, Tuple, Union, Sequence, Optional

import numpy as np
import scipy.interpolate as interp
import scipy.ndimage.interpolation as imag_interp

from ..math.dfun import DiffFunction

__author__ = 'erichang'
__all__ = ['interpolate_grid', 'LinearInterpolator']


def _scales_to_points(scale_list: List[Tuple[float, float]], values: np.ndarray,
                      delta: float = 1e-4) -> Tuple[List[np.ndarray], List[float]]:
    """convert scale_list to list of point values and finite difference deltas."""

    ndim = len(values.shape)
    # error checking
    if ndim == 1:
        raise ValueError('This class only works for dimension >= 2.')
    elif ndim != len(scale_list):
        raise ValueError('input and output dimension mismatch.')

    points = []
    delta_list = []
    for idx in range(ndim):
        num_pts = values.shape[idx]  # type: int
        if num_pts < 2:
            raise ValueError('Every dimension must have at least 2 points.')
        offset, scale = scale_list[idx]
        points.append(np.linspace(offset, (num_pts - 1) * scale + offset, num_pts))
        delta_list.append(scale * delta)

    return points, delta_list


[docs]def interpolate_grid(scale_list: List[Tuple[float, float]], values: np.ndarray, method: str = 'spline', extrapolate: bool = False, delta: float = 1e-4, num_extrapolate: int = 3) -> DiffFunction: """Interpolates multidimensional data on a regular grid. returns an Interpolator for the given dataset. Parameters ---------- scale_list : List[Tuple[float, float]] a list of (offset, spacing). values : np.ndarray The output data in N dimensions. The length in each dimension must be at least 2. method : str The interpolation method. Either 'linear', or 'spline'. Defaults to 'spline'. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. delta : float the finite difference step size. Finite difference is only used for linear interpolation and spline interpolation on 3D data or greater. Defaults to 1e-4 of the grid spacing. num_extrapolate: int If spline interpolation is selected on 3D data or greater, we linearly extrapolate the given data by this many points to fix behavior near input boundaries. Returns ------- fun : DiffFunction the interpolator function. """ ndim = len(values.shape) if method == 'linear': points, delta_list = _scales_to_points(scale_list, values, delta) return LinearInterpolator(points, values, delta_list, extrapolate=extrapolate) elif ndim == 1: return Interpolator1D(scale_list, values, method=method, extrapolate=extrapolate) elif method == 'spline': if ndim == 2: return Spline2D(scale_list, values, extrapolate=extrapolate) else: return MapCoordinateSpline(scale_list, values, delta=delta, extrapolate=extrapolate, num_extrapolate=num_extrapolate) else: raise ValueError('Unsupported interpolation method: %s' % method)
def interpolate_unstructured(points: List[np.ndarray], values: np.ndarray, method: str = 'spline', extrapolate: bool = False) -> DiffFunction: """Interpolates multidimensional data on an unstructured data/irregular grid. returns an Interpolator for the given dataset. Parameters ---------- points : List[np.ndarray] a list of points of each dimension. values : np.ndarray The output data in N dimensions. The length in each dimension must be at least 2. method : str The interpolation method. Either 'linear', or 'spline'. Defaults to 'spline'. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. Returns ------- fun : DiffFunction the interpolator function. """ ndim = len(values.shape) if method == 'linear': return LinearInterpolatorUnstructured(points, values, None, extrapolate=extrapolate) elif ndim == 1: return Interpolator1DUnstructured(points, values, method=method, extrapolate=extrapolate) elif method == 'spline': raise ValueError("Unsupported spline interpolation for unstructured multi-dimensional data") else: raise ValueError('Unsupported interpolation method: %s' % method)
[docs]class LinearInterpolator(DiffFunction): """A linear interpolator on a regular grid for arbitrary dimensions. This class is backed by scipy.interpolate.RegularGridInterpolator. Derivatives are calculated using finite difference. Parameters ---------- points : Sequence[np.ndarray] list of points of each dimension. values : np.ndarray The output data in N dimensions. delta_list : List[float] list of finite difference step size for each axis. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. """ def __init__(self, points: Sequence[np.ndarray], values: np.ndarray, delta_list: List[float], extrapolate: bool = False) -> None: input_range = [(pvec[0], pvec[-1]) for pvec in points] DiffFunction.__init__(self, input_range, delta_list=delta_list) self._points = points self._extrapolate = extrapolate self.fun = interp.RegularGridInterpolator(points, values, method='linear', bounds_error=not extrapolate, fill_value=None)
[docs] def get_input_points(self, idx: int) -> np.ndarray: """Returns the input points for the given dimension.""" return self._points[idx]
[docs] def __call__(self, xi: np.ndarray) -> np.ndarray: """Interpolate at the given coordinate. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) Returns ------- val : numpy.ndarray The interpolated values at the given coordinates. """ ans = self.fun(xi) if ans.size == 1: return ans[0] return ans
[docs] def integrate(self, xstart: float, xstop: float, axis: int = -1, logx: bool = False, logy: bool = False, raw: bool = False) -> Union[LinearInterpolator, np.ndarray]: """Integrate away the given axis. if logx/logy is True, that means this LinearInterpolator is actually used to do linear interpolation on the logarithm of the actual data. This method will returns the integral of the actual data. Parameters ---------- xstart : float the X start value. xstop : float the X stop value. axis : int the axis of integration. If unspecified, this will be the last axis. logx : bool True if the values on the given axis are actually the logarithm of the real values. logy : bool True if the Y values are actually the logarithm of the real values. raw : bool True to return the raw data points instead of a LinearInterpolator object. Returns ------- result : Union[LinearInterpolator, np.ndarray] float if this interpolator has only 1 dimension, otherwise a new LinearInterpolator is returned. """ if self.delta_list is None: raise ValueError("Finite differences must be enabled") if logx != logy: raise ValueError('Currently only works for linear or log-log relationship.') ndim = self.ndim if axis < 0: axis = ndim - 1 if axis < 0 or axis >= ndim: raise IndexError('index out of range.') if len(self._points) < ndim: raise ValueError("len(self._points) != ndim") def calculate_integ_x() -> np.ndarray: # find data points between xstart and xstop vec_inner = self._points[axis] start_idx, stop_idx = np.searchsorted(vec_inner, [xstart, xstop]) cur_len = stop_idx - start_idx if vec_inner[start_idx] > xstart: cur_len += 1 istart = 1 else: istart = 0 if vec_inner[stop_idx - 1] < xstop: cur_len += 1 istop = cur_len - 1 else: istop = cur_len integ_x_inner = np.empty(cur_len) integ_x_inner[istart:istop] = vec_inner[start_idx:stop_idx] if istart != 0: integ_x_inner[0] = xstart if istop != cur_len: integ_x_inner[cur_len - 1] = xstop return integ_x_inner # get all input sample points we need to integrate. plist = [] integ_x = calculate_integ_x() # type: np.ndarray new_points = [] new_deltas = [] for axis_idx, vec in enumerate(self._points): if axis == axis_idx: plist.append(integ_x) else: plist.append(vec) new_points.append(vec) new_deltas.append(self.delta_list[axis_idx]) fun_arg = np.stack(np.meshgrid(*plist, indexing='ij'), axis=-1) values = self.fun(fun_arg) if logx: if axis != ndim - 1: # transpose values so that broadcasting/slicing is easier new_order = [idx for idx in range(ndim) if idx != axis] new_order.append(axis) values = np.transpose(values, axes=new_order) # integrate given that log-log plot is piece-wise linear ly1 = values[..., :-1] ly2 = values[..., 1:] lx1 = np.broadcast_to(integ_x[:-1], ly1.shape) lx2 = np.broadcast_to(integ_x[1:], ly1.shape) m = (ly2 - ly1) / (lx2 - lx1) x1 = np.exp(lx1) y1 = np.exp(ly1) log_idx = np.abs(m + 1) < 1e-6 log_idxb = np.invert(log_idx) area = np.empty(m.shape) area[log_idx] = (y1[log_idx] / np.power(x1[log_idx], m[log_idx]) * (lx2[log_idx] - lx1[log_idx])) mp1 = m[log_idxb] + 1 x2 = np.exp(lx2[log_idxb]) x1 = x1[log_idxb] area[log_idxb] = y1[log_idxb] / mp1 * (np.power(x2 / x1, m[log_idxb]) * x2 - x1) new_values = np.sum(area, axis=-1) else: # just use trapezoid integration # noinspection PyTypeChecker new_values = np.trapz(values, x=integ_x, axis=axis) if not raw and new_points: return LinearInterpolator(new_points, new_values, new_deltas, extrapolate=self._extrapolate) else: return new_values
class LinearInterpolatorUnstructured(DiffFunction): """A linear interpolator on an unstructured dataset/irregular grid for arbitrary dimensions. This class is backed by scipy.interpolate.LinearNDInterpolator. Derivatives are calculated using finite difference. Parameters ---------- points : Sequence[np.ndarray] list of points of each dimension. values : np.ndarray The output data in N dimensions. delta_list : List[float] list of finite difference step size for each axis. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. rescale : bool True to rescale points to unit cube before performing interpolation. Defaults to False Refer to scipy.interpolate.LinearNDInterpolator for details. """ def __init__(self, points: Sequence[np.ndarray], values: np.ndarray, delta_list: Optional[List[float]], extrapolate: bool = False, rescale: bool = False): input_range = [(pvec[0], pvec[-1]) for pvec in points] super().__init__(input_range, delta_list=delta_list) self._points = points self._extrapolate = extrapolate # TODO: implement extrapolate self.fun = interp.LinearNDInterpolator(points, values, rescale=rescale) def __call__(self, xi: np.ndarray) -> np.ndarray: """Interpolate at the given coordinate. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) Returns ------- val : numpy.ndarray The interpolated values at the given coordinates. """ ans = self.fun(xi) if ans.size == 1: return ans[0] return ans class Interpolator1D(DiffFunction): """An interpolator on a regular grid for 1 dimensional data. This class is backed by scipy.interpolate.InterpolatedUnivariateSpline. Parameters ---------- scale_list : List[Tuple[float, float]] a list of (offset, spacing) for each input dimension. values : numpy.ndarray The output data. Must be 1 dimension. method : str extrapolation method. Either 'linear' or 'spline'. Defaults to spline. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. """ def __init__(self, scale_list: List[Tuple[float, float]], values: np.ndarray, method: str = 'spline', extrapolate: bool = False): # error checking if len(values.shape) != 1: raise ValueError('This class only works for 1D data.') elif len(scale_list) != 1: raise ValueError('input and output dimension mismatch.') if method == 'linear' or values.shape[0] <= 3: k = 1 elif method == 'spline': k = 3 else: raise ValueError('Unsupported interpolation method: %s' % method) offset, scale = scale_list[0] num_pts = values.shape[0] points = np.linspace(offset, (num_pts - 1) * scale + offset, num_pts) DiffFunction.__init__(self, [(points[0], points[-1])], delta_list=None) ext = 0 if extrapolate else 2 self.fun = interp.InterpolatedUnivariateSpline(points, values, k=k, ext=ext) def __call__(self, xi: np.ndarray) -> np.ndarray: """Interpolate at the given coordinate. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) Returns ------- val : numpy.ndarray The interpolated values at the given coordinates. """ ans = self.fun(xi) if ans.size == 1: return ans.item() return ans def deriv(self, xi: np.ndarray, idx: int) -> np.ndarray: """Calculate the derivative of the spline along the given index. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) idx : int The index to calculate the derivative on. Returns ------- val : numpy.ndarray The derivatives at the given coordinates. """ if idx != 0: raise ValueError('Invalid derivative index: %d' % idx) ans = self.fun(xi, 1) if ans.size == 1: return ans[0] return ans class Interpolator1DUnstructured(DiffFunction): """An interpolator on an unstructured dataset/irregular grid for 1 dimensional data. This class is backed by scipy.interpolate.InterpolatedUnivariateSpline. Parameters ---------- points : Sequence[np.ndarray] list of points of each dimension. Must have length 1. values : numpy.ndarray The output data. Must be 1 dimension. method : str extrapolation method. Either 'linear' or 'spline'. Defaults to spline. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. """ def __init__(self, points: Sequence[np.ndarray], values: np.ndarray, method: str = 'spline', extrapolate: bool = False): if len(values.shape) != 1: raise ValueError('This class only works for 1D data.') elif len(points) != 1: raise ValueError('input and output dimension mismatch.') if method == 'linear' or values.shape[0] <= 3: k = 1 elif method == 'spline': k = 3 else: raise ValueError('Unsupported interpolation method: %s' % method) points = points[0] super().__init__([(points[0], points[-1])], delta_list=None) ext = 0 if extrapolate else 2 self.fun = interp.InterpolatedUnivariateSpline(points, values, k=k, ext=ext) def __call__(self, xi: np.ndarray) -> np.ndarray: """Interpolate at the given coordinate. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) Returns ------- val : numpy.ndarray The interpolated values at the given coordinates. """ ans = self.fun(xi) if ans.size == 1: return ans.item() return ans def deriv(self, xi: np.ndarray, idx: int) -> np.ndarray: """Calculate the derivative of the spline along the given index. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) idx : int The index to calculate the derivative on. Returns ------- val : numpy.ndarray The derivatives at the given coordinates. """ if idx != 0: raise ValueError('Invalid derivative index: %d' % idx) ans = self.fun(xi, 1) if ans.size == 1: return ans[0] return ans class Spline2D(DiffFunction): """A spline interpolator on a regular grid for 2D data. This class is backed by scipy.interpolate.RectBivariateSpline. Parameters ---------- scale_list : List[Tuple[float, float]] a list of (offset, spacing) for each input dimension. values : numpy.ndarray The output data. Must be 2D. extrapolate : bool True to extrapolate data output of given bounds. Defaults to False. """ def __init__(self, scale_list: List[Tuple[float, float]], values: np.ndarray, extrapolate: bool = False): # error checking if len(values.shape) != 2: raise ValueError('This class only works for 2D data.') elif len(scale_list) != 2: raise ValueError('input and output dimension mismatch.') nx, ny = values.shape offset, scale = scale_list[0] x = np.linspace(offset, (nx - 1) * scale + offset, nx) offset, scale = scale_list[1] y = np.linspace(offset, (ny - 1) * scale + offset, ny) self._min = x[0], y[0] self._max = x[-1], y[-1] DiffFunction.__init__(self, [(x[0], x[-1]), (y[0], y[-1])], delta_list=None) self.fun = interp.RectBivariateSpline(x, y, values) self._extrapolate = extrapolate def _get_xy(self, xi: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Get X and Y array from given coordinates.""" xi = np.asarray(xi, dtype=float) if xi.shape[-1] != 2: raise ValueError("The requested sample points xi have dimension %d, " "but this interpolator has dimension 2" % (xi.shape[-1])) # check input within bounds. x = xi[..., 0] y = xi[..., 1] if not self._extrapolate and not np.all((self._min[0] <= x) & (x <= self._max[0]) & (self._min[1] <= y) & (y <= self._max[1])): raise ValueError('some inputs are out of bounds.') return x, y def __call__(self, xi: np.ndarray) -> np.ndarray: """Interpolate at the given coordinates. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) Returns ------- val : numpy.ndarray The interpolated values at the given coordinates. """ x, y = self._get_xy(xi) return self.fun(x, y, grid=False) def deriv(self, xi: np.ndarray, idx: int) -> np.ndarray: """Calculate the derivative of the spline along the given index. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) idx : int The index to calculate the derivative on. Returns ------- val : numpy.ndarray The derivatives at the given coordinates. """ if idx < 0 or idx > 1: raise ValueError('Invalid derivative index: %d' % idx) x, y = self._get_xy(xi) if idx == 0: return self.fun(x, y, dx=1, grid=False) else: return self.fun(x, y, dy=1, grid=False) class MapCoordinateSpline(DiffFunction): """A spline interpolator on a regular grid for multidimensional data. The spline interpolation is done using map_coordinate method in the scipy.ndimage.interpolation package. The derivative is done using finite difference. if extrapolate is True, we use linear interpolation for values outside of bounds. Note: By default, map_coordinate uses the nearest value for all points outside the boundary. This will cause undesired interpolation behavior near boundary points. To solve this, we linearly extrapolates the given data for a fixed number of points. Parameters ---------- scale_list : List[Tuple[float, float]] a list of (offset, spacing) for each input dimension. values : numpy.ndarray The output data. extrapolate : bool True to linearly extrapolate outside of bounds. num_extrapolate : int number of points to extrapolate in each dimension in each direction. delta : float the finite difference step size. Defaults to 1e-4 (relative to a spacing of 1). """ def __init__(self, scale_list: List[Tuple[float, float]], values: np.ndarray, extrapolate: bool = False, num_extrapolate: int = 3, delta: float = 1e-4): shape = values.shape ndim = len(shape) # error checking if ndim < 3: raise ValueError('Data must have 3 or more dimensions.') elif ndim != len(scale_list): raise ValueError('input and output dimension mismatch.') self._scale_list = scale_list self._max = [n - 1 + num_extrapolate for n in shape] self._extrapolate = extrapolate self._ext = num_extrapolate # linearly extrapolate given values ext_points = [np.arange(num_extrapolate, n + num_extrapolate) for n in shape] points, delta_list = _scales_to_points(scale_list, values, delta) input_ranges = [(pvec[0], pvec[-1]) for pvec in points] self._extfun = LinearInterpolator(ext_points, values, [delta] * ndim, extrapolate=True) xi_ext = np.stack(np.meshgrid(*(np.arange(0, n + 2 * num_extrapolate) for n in shape), indexing='ij', copy=False), axis=-1) values_ext = self._extfun(xi_ext) self._filt_values = imag_interp.spline_filter(values_ext) DiffFunction.__init__(self, input_ranges, delta_list=delta_list) def _normalize_inputs(self, xi: np.ndarray) -> np.ndarray: """Normalize the inputs.""" xi = np.asarray(xi, dtype=float) if xi.shape[-1] != self.ndim: raise ValueError("The requested sample points xi have dimension %d, " "but this interpolator has dimension %d" % (xi.shape[-1], self.ndim)) xi = np.atleast_2d(xi.copy()) for idx, (offset, scale) in enumerate(self._scale_list): xi[..., idx] -= offset xi[..., idx] /= scale # take extension input account. xi += self._ext return xi def __call__(self, xi: np.ndarray) -> np.ndarray: """Interpolate at the given coordinate. Parameters ---------- xi : numpy.ndarray The coordinates to evaluate, with shape (..., ndim) Returns ------- val : numpy.ndarray The interpolated values at the given coordinates. """ ext = self._ext ndim = self.ndim xi = self._normalize_inputs(xi) ans_shape = xi.shape[:-1] xi = xi.reshape(-1, ndim) ext_idx_vec = False for idx in range(self.ndim): ext_idx_vec = ext_idx_vec | (xi[:, idx] < ext) | (xi[:, idx] > self._max[idx]) int_idx_vec = ~ext_idx_vec xi_ext = xi[ext_idx_vec, :] xi_int = xi[int_idx_vec, :] ans = np.empty(xi.shape[0]) ans[int_idx_vec] = imag_interp.map_coordinates(self._filt_values, xi_int.T, mode='nearest', prefilter=False) if xi_ext.size > 0: if not self._extrapolate: raise ValueError('some inputs are out of bounds.') ans[ext_idx_vec] = self._extfun(xi_ext) if ans.size == 1: return ans[0] return ans.reshape(ans_shape)