Source code for bag.data.core
# SPDX-License-Identifier: BSD-3-Clause AND Apache-2.0
# Copyright 2018 Regents of the University of California
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Copyright 2019 Blue Cheetah Analog Design Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module defines core data post-processing classes.
"""
import numpy as np
import scipy.interpolate as interp
import scipy.cluster.vq as svq
import scipy.optimize as sciopt
[docs]class Waveform(object):
"""A (usually transient) waveform.
This class provides interpolation and other convenience functions.
Parameters
----------
xvec : np.multiarray.ndarray
the X vector.
yvec : np.multiarray.ndarray
the Y vector.
xtol : float
the X value tolerance.
order : int
the interpolation order. 1 for nearest, 2 for linear, 3 for spline.
ext : int or str
interpolation extension mode. See documentation for InterpolatedUnivariateSpline.
"""
def __init__(self, xvec, yvec, xtol, order=3, ext=3):
self._xvec = xvec
self._yvec = yvec
self._xtol = xtol
self._order = order
self._ext = ext
self._fun = interp.InterpolatedUnivariateSpline(xvec, yvec, k=order, ext=ext)
@property
@property
@property
[docs] def order(self):
"""the interpolation order. 1 for nearest, 2 for linear, 3 for spline."""
return self._order
@property
@property
[docs] def ext(self):
"""interpolation extension mode. See documentation for InterpolatedUnivariateSpline."""
return self._ext
[docs] def __call__(self, *arg, **kwargs):
"""Evaluate the waveform at the given points."""
return self._fun(*arg, **kwargs)
[docs] def get_xrange(self):
"""Returns the X vector range.
Returns
-------
xmin : float
minimum X value.
xmax : float
maximum X value.
"""
return self.xvec[0], self.xvec[-1]
[docs] def shift_by(self, xshift):
"""Returns a shifted version of this waveform.
Parameters
----------
xshift : float
the amount to shift by.
Returns
-------
wvfm : bag.data.core.Waveform
a reference to this instance, or a copy if copy is True.
"""
return Waveform(self.xvec + xshift, self.yvec, self.xtol, order=self.order, ext=self.ext)
[docs] def get_all_crossings(self, threshold, start=None, stop=None, edge='both'):
"""Returns all X values at which this waveform crosses the given threshold.
Parameters
----------
threshold : float
the threshold value.
start : float or None
if given, search for crossings starting at this X value.
stop : float or None
if given, search only for crossings before this X value.
edge : string
crossing type. Valid values are 'rising', 'falling', or 'both'.
Returns
-------
xval_list : list[float]
all X values at which crossing occurs.
"""
# determine start and stop indices
sidx = 0 if start is None else np.searchsorted(self.xvec, [start])[0]
if stop is None:
eidx = len(self.xvec)
else:
eidx = np.searchsorted(self.xvec, [stop])[0]
if eidx < len(self.xvec) and abs(self.xvec[eidx] - stop) < self.xtol:
eidx += 1
# quantize waveform values, then detect edge.
bool_vec = self.yvec[sidx:eidx] >= threshold # type: np.ndarray
qvec = bool_vec.astype(int)
dvec = np.diff(qvec)
# eliminate unwanted edge types.
if edge == 'rising':
dvec = np.maximum(dvec, 0)
elif edge == 'falling':
dvec = np.minimum(dvec, 0)
# get crossing indices
idx_list = dvec.nonzero()[0]
# convert indices to X value using brentq interpolation.
def crossing_fun(x):
return self._fun(x) - threshold
xval_list = []
for idx in idx_list:
t0, t1 = self.xvec[sidx + idx], self.xvec[sidx + idx + 1]
try:
tcross = sciopt.brentq(crossing_fun, t0, t1, xtol=self.xtol)
except ValueError:
# no solution, this happens only if we have numerical error
# around the threshold. In this case just pick the endpoint
# closest to threshold.
va = crossing_fun(t0)
vb = crossing_fun(t1)
tcross = t0 if abs(va) < abs(vb) else t1
xval_list.append(tcross)
return xval_list
[docs] def get_crossing(self, threshold, start=None, stop=None, n=1, edge='both'):
"""Returns the X value at which this waveform crosses the given threshold.
Parameters
----------
threshold : float
the threshold value.
start : float or None
if given, search for the crossing starting at this X value.'
stop : float or None
if given, search only for crossings before this X value.
n : int
returns the nth crossing.
edge : str
crossing type. Valid values are 'rising', 'falling', or 'both'.
Returns
-------
xval : float or None
the X value at which the crossing occurs. None if no crossings are detected.
"""
xval_list = self.get_all_crossings(threshold, start=start, stop=stop, edge=edge)
if len(xval_list) < n:
return None
return xval_list[n - 1]
[docs] def to_arrays(self, xmin=None, xmax=None):
"""Returns the X and Y arrays representing this waveform.
Parameters
----------
xmin : float or None
If given, will start from this value.
xmax : float or None
If given, will end at this value.
Returns
-------
xvec : np.multiarray.ndarray
the X array
yvec : np.multiarray.ndarray
the Y array
"""
sidx = 0 if xmin is None else np.searchsorted(self.xvec, [xmin])[0]
eidx = len(self.xvec) if xmax is None else np.searchsorted(self.xvec, [xmax])[0]
if eidx < len(self.xvec) and self.xvec[eidx] == xmax:
eidx += 1
xtemp = self.xvec[sidx:eidx]
if xmin is not None and (len(xtemp) == 0 or xtemp[0] != xmin):
np.insert(xtemp, 0, [xmin])
if xmax is not None and (len(xtemp) == 0 or xtemp[-1] != xmax):
np.append(xtemp, [xmax])
return xtemp, self(xtemp)
[docs] def get_eye_specs(self, tbit, tsample, thres=0.0, nlev=2):
"""Compute the eye diagram spec of this waveform.
This algorithm uses the following steps.
1. set t_off to 0
2. sample the waveform at tbit interval, starting at t0 + t_off.
3. sort the sampled values, get gap between adjacent values.
4. record G, the length of the gap covering thres.
5. increment t_off by tsample, go to step 2 and repeat until
t_off >= tbit.
6. find t_off with maximum G. This is the eye center.
7. at the eye center, compute eye height and eye opening using kmeans
clustering algorithm.
8. return result.
Parameters
----------
tbit : float
eye period.
tsample : float
the resolution to sample the eye. Used to find optimal
time shift and maximum eye opening.
thres : float
the eye vertical threshold.
nlev : int
number of expected levels. 2 for NRZ, 4 for PAM4.
Returns
-------
result : dict
A dictionary from specification to value.
"""
tstart, tend = self.get_xrange()
toff_vec = np.arange(0, tbit, tsample)
best_idx = 0
best_gap = 0.0
best_values = None
mid_lev = nlev // 2
for idx, t_off in enumerate(toff_vec):
# noinspection PyTypeChecker
values = self(np.arange(tstart + t_off, tend, tbit))
values.sort()
up_idx = np.searchsorted(values, [thres])[0]
if up_idx == 0 or up_idx == len(values):
continue
cur_gap = values[up_idx] - values[up_idx - 1]
if cur_gap > best_gap:
best_idx = idx
best_gap = cur_gap
best_values = values
if best_values is None:
raise ValueError("waveform never cross threshold=%.4g" % thres)
vstd = np.std(best_values)
vtemp = best_values / vstd
tmp_arr = np.linspace(vtemp[0], vtemp[-1], nlev) # type: np.ndarray
clusters = svq.kmeans(vtemp, tmp_arr)[0]
# clusters = svq.kmeans(vtemp, 4, iter=50)[0]
clusters *= vstd
clusters.sort()
vcenter = (clusters[mid_lev] + clusters[mid_lev - 1]) / 2.0
# compute eye opening/margin
openings = []
tr_widths = []
last_val = best_values[0]
bot_val = last_val
cur_cidx = 0
for cur_val in best_values:
cur_cluster = clusters[cur_cidx]
next_cluster = clusters[cur_cidx + 1]
if abs(cur_val - cur_cluster) > abs(cur_val - next_cluster):
openings.append(cur_val - last_val)
tr_widths.append(last_val - bot_val)
cur_cidx += 1
if cur_cidx == len(clusters) - 1:
tr_widths.append(best_values[-1] - cur_val)
break
bot_val = cur_val
last_val = cur_val
return {'center': (float(toff_vec[best_idx]), vcenter),
'levels': clusters,
'heights': clusters[1:] - clusters[:-1],
'openings': np.array(openings),
'trace_widths': np.array(tr_widths)
}
[docs] def _add_xy(self, other):
if not isinstance(other, Waveform):
raise ValueError("Trying to add non-Waveform object.")
xnew = np.concatenate((self.xvec, other.xvec))
xnew = np.unique(np.around(xnew / self.xtol)) * self.xtol
# noinspection PyTypeChecker
y1 = self(xnew)
y2 = other(xnew)
return xnew, y1 + y2
[docs] def __add__(self, other):
if np.isscalar(other):
return Waveform(np.array(self.xvec), self.yvec + other, self.xtol, order=self.order,
ext=self.ext)
elif isinstance(other, Waveform):
new_order = max(self.order, other.order)
xvec, yvec = self._add_xy(other)
return Waveform(xvec, yvec, self.xtol, order=new_order, ext=self.ext)
else:
raise Exception('type %s not supported' % type(other))
[docs] def __neg__(self):
return Waveform(np.array(self.xvec), -self.yvec, self.xtol, order=self.order, ext=self.ext)
[docs] def __mul__(self, scale):
if not np.isscalar(scale):
raise ValueError("Can only multiply by scalar.")
return Waveform(np.array(self.xvec), scale * self.yvec, self.xtol, order=self.order,
ext=self.ext)