# BSD 3-Clause License
#
# Copyright (c) 2018, Regents of the University of California
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import re
from typing import Mapping, Any, Dict, Tuple, Union, Optional, Sequence, List
from pathlib import Path
from libpsf import PSFDataSet
import numpy as np
from pybag.enum import DesignOutput
from .data import AnalysisData, SimData, _check_is_md, combine_ana_sim_envs
from .nutbin import parse_sweep_info
# The following psf analysis types are mapped to different analysis type names for SimData
# Keys are the full file-name suffix (everything after the first '.') of a PSS result file;
# values are the analysis type names stored in the parsed result dictionary.
_ANA_TYPE_MAP: Mapping[str, str] = {
    'td.pss': 'pss_td',
    'fd.pss': 'pss_fd',
    'tran.pss': 'pss_tran',
}
class LibPSFParser:
    """Parse a folder of PSF result files with ``libpsf`` and expose them as ``SimData``.

    Parameters
    ----------
    raw_path : Path
        directory containing the result files.  When ``num_proc > 1`` the files are read from
        the sub-directories ``1``, ``2``, ..., ``num_proc`` of this directory instead.
    rtol : float
        relative tolerance used when comparing sweep values.
    atol : float
        absolute tolerance used when comparing sweep values.
    num_proc : int
        number of result sub-directories to read; 1 means ``raw_path`` itself holds the files.
    """

    def __init__(self, raw_path: Path, rtol: float, atol: float, num_proc: int = 1) -> None:
        self._cwd_path = raw_path.parent
        self._num_proc = num_proc
        lp_data = self.parse_raw_folder(raw_path)
        self._sim_data = self.convert_to_sim_data(lp_data, rtol, atol)

    @property
    def sim_data(self) -> SimData:
        """SimData: the simulation data built from the parsed folder."""
        return self._sim_data

    def parse_raw_folder(self, raw_path: Path) -> Mapping[str, Any]:
        """Parse every result file under ``raw_path``.

        Returns
        -------
        ana_dict : Mapping[str, Any]
            nested dictionary ``ana_dict[ana_type][sim_env]`` as filled in by ``populate_dict()``.
        """
        ana_dict = {}
        if self._num_proc == 1:
            self._parse_raw_folder_helper(raw_path, ana_dict)
        else:
            # multi-processing mode: results live in sub-directories named 1 .. num_proc
            for pidx in range(self._num_proc):
                self._parse_raw_folder_helper(raw_path / f'{pidx + 1}', ana_dict)
        return ana_dict

    def _parse_raw_folder_helper(self, raw_path: Path, ana_dict: Dict[str, Any]) -> None:
        """Parse all result files directly inside ``raw_path`` and merge them into ``ana_dict``."""
        # keep track of which (ana_type, sim_env)'s outer sweep info has been handled in this directory
        at_se_check: List[Tuple[str, str]] = []
        for fname in raw_path.iterdir():
            # some files have multiple suffixes, so split name at first '.' to get entire suffix
            suf = fname.name.split('.', 1)[-1]
            if suf in ('logFile', 'sweep', 'pac'):
                # these files are not parsed as signal data
                continue
            info = self.get_info_from_fname(fname.name)
            data, inner_sweep = self.parse_raw_file(fname)
            info['inner_sweep'] = inner_sweep
            self.populate_dict(ana_dict, info, raw_path, data, at_se_check)

    @staticmethod
    def parse_raw_file(fname: Path) -> Tuple[Dict[str, Union[np.ndarray, float]], str]:
        """Read one PSF file.

        Returns
        -------
        data : Dict[str, Union[np.ndarray, float]]
            signal name to value; also contains the inner sweep values keyed by the sweep name.
        inner_sweep : str
            name of the sweep variable stored in this file, or '' if the file is not swept.

        Raises
        ------
        NotImplementedError
            if the file reports a number of sweep parameters other than one.
        """
        ds = PSFDataSet(str(fname))
        data = {}

        # get inner sweep, if any
        if ds.is_swept():
            swp_vars = ds.get_sweep_param_names()
            if len(swp_vars) != 1:
                # Only innermost sweep is in this file.
                raise NotImplementedError(f'Expected exactly one sweep parameter, got {swp_vars}')
            inner_sweep = swp_vars[0]
            data[inner_sweep] = ds.get_sweep_values()
        else:
            inner_sweep = ''

        # get signals
        for sig_name in ds.get_signal_names():
            sig = ds.get_signal(sig_name)
            if isinstance(sig, float):
                data[sig_name] = sig
            elif isinstance(sig[0], dict):
                # PNoise: separate out the noise components
                pn_dict = {}
                for _sig in sig:
                    for bkey, val in _sig.items():
                        pn_dict.setdefault(f'{sig_name}.{bkey.decode("ascii")}', []).append(val)
                for key, val in pn_dict.items():
                    data[key] = np.array(val)
            else:
                data[sig_name] = sig
        return data, inner_sweep

    @staticmethod
    def get_info_from_fname(fname: str) -> Dict[str, Any]:
        """Extract analysis information encoded in a result file name.

        The part before the first '.' must contain ``<ana_type>__<sim_env>``, where ``sim_env``
        is two alphanumeric words joined by '_'.  Outer sweep points appear as ``swpNN-MMM``.

        Returns
        -------
        info : Dict[str, Any]
            dictionary with keys ``ana_name``, ``sim_env``, ``ana_type``, ``swp_info``
            (list of ``swpNN`` tags), ``swp_key`` (the ``MMM`` indices joined by '_', '' if
            there is no outer sweep) and ``harmonic`` (int for PAC, otherwise None).

        Raises
        ------
        ValueError
            if the file name has no '.' or does not contain ``<ana_type>__<sim_env>``.
        NotImplementedError
            if a PSS file has a suffix that is not in ``_ANA_TYPE_MAP``.
        """
        # get ana_name from fname
        ana_name, suf = fname.split('.', 1)

        # get ana_type and sim_env from ana_name
        ana_type_fmt = '[a-zA-Z]+'
        sim_env_fmt = '[a-zA-Z0-9]+_[a-zA-Z0-9]+'
        m = re.search(f'({ana_type_fmt})__({sim_env_fmt})', ana_name)
        if m is None:
            raise ValueError(f'Cannot find "<ana_type>__<sim_env>" in file name {fname!r}')
        ana_type = m.group(1)

        # For PSS, edit ana_type based on inner sweep
        if ana_type == 'pss':
            ana_type = _ANA_TYPE_MAP.get(suf)
            if ana_type is None:
                raise NotImplementedError(f'Unrecognized suf = {suf}')

        # For PAC, the harmonic number is the first component of the suffix
        harmonic = int(suf.split('.')[0]) if ana_type == 'pac' else None

        # get outer sweep information from ana_name, if any
        m_swp = re.findall('swp[0-9]{2}', ana_name)
        key_list = [val[-3:] for val in re.findall('swp[0-9]{2}-[0-9]{3}', ana_name)]

        return dict(
            ana_name=ana_name,
            sim_env=m.group(2),
            ana_type=ana_type,
            swp_info=m_swp,
            swp_key='_'.join(key_list),
            harmonic=harmonic,
        )

    def populate_dict(self, ana_dict: Dict[str, Any], info: Mapping[str, Any], raw_path: Path,
                      data: Dict[str, Union[np.ndarray, float]], at_se_check: List[Tuple[str, str]]) -> None:
        """Store the data of one parsed file in ``ana_dict[ana_type][sim_env]``.

        Parameters
        ----------
        ana_dict : Dict[str, Any]
            the result dictionary, modified in place.
        info : Mapping[str, Any]
            output of ``get_info_from_fname()`` with the additional key ``inner_sweep``.
        raw_path : Path
            directory the file was read from; in multi-processing mode its name must be an integer.
        data : Dict[str, Union[np.ndarray, float]]
            the signal data of the file.
        at_se_check : List[Tuple[str, str]]
            (ana_type, sim_env) pairs whose outer sweep info of ``raw_path`` is already recorded;
            modified in place.
        """
        ana_type: str = info['ana_type']
        sim_env: str = info['sim_env']
        swp_key: str = info['swp_key']
        swp_info: Sequence[str] = info['swp_info']
        harmonic: Optional[int] = info['harmonic']
        inner_sweep: str = info['inner_sweep']

        env_map = ana_dict.setdefault(ana_type, {})
        if sim_env not in env_map:
            # get outer sweep, if any
            swp_vars, swp_data = parse_sweep_info(swp_info, raw_path, ana_type, sim_env, offset=16)
            env_map[sim_env] = {
                'inner_sweep': inner_sweep,
                # compare by value; identity comparison against a str literal is not reliable
                'outer_sweep': swp_key != '',
                'harmonics': harmonic is not None,
                'swp_vars': swp_vars,
                'swp_data': swp_data,
            }
            if swp_key or (harmonic is not None):
                env_map[sim_env]['data'] = {}
            # the sweep info of this directory has just been recorded; never merge it a second time
            at_se_check.append((ana_type, sim_env))
        elif self._num_proc > 1 and int(raw_path.name) > 1 and (ana_type, sim_env) not in at_se_check:
            # multiprocessing mode, need to parse sweep file once in every distributed directory for
            # each (ana_type, sim_env) pair
            swp_vars, swp_data = parse_sweep_info(swp_info, raw_path, ana_type, sim_env, offset=16)
            stored_swp = env_map[sim_env]['swp_data']
            for _key, _val in swp_data.items():
                stored_swp[_key] = np.concatenate((stored_swp[_key], _val))
            at_se_check.append((ana_type, sim_env))

        entry = env_map[sim_env]
        # PAC harmonics are handled separately from parametric sweep
        if swp_key:
            if harmonic is not None:
                entry['data'].setdefault(swp_key, {})[harmonic] = data
            else:
                entry['data'][swp_key] = data
        elif harmonic is not None:
            entry['data'][harmonic] = data
        else:
            entry['data'] = data

    def convert_to_sim_data(self, lp_data, rtol: float, atol: float) -> SimData:
        """Convert the dictionary returned by ``parse_raw_folder()`` to a ``SimData`` object.

        NOTE(review): the ``sim_envs`` list passed to ``SimData`` is the one of the last analysis
        in ``lp_data``; this presumably relies on all analyses sharing the same sim_envs.
        """
        ana_dict = {}
        sim_envs = []
        for ana_type, sim_env_dict in lp_data.items():
            sim_envs = sorted(sim_env_dict.keys())
            sub_ana_dict = {sim_env: self.convert_to_analysis_data(lp_dict, rtol, atol)
                            for sim_env, lp_dict in sim_env_dict.items()}
            ana_dict[ana_type] = combine_ana_sim_envs(sub_ana_dict, sim_envs)
        return SimData(sim_envs, ana_dict, DesignOutput.SPECTRE)

    def convert_to_analysis_data(self, lp_dict: Mapping[str, Any], rtol: float, atol: float) -> AnalysisData:
        """Convert one ``ana_dict[ana_type][sim_env]`` entry to an ``AnalysisData`` object.

        ``lp_dict`` is not modified.

        Raises
        ------
        NotImplementedError
            if ``_check_is_md()`` reports that the sweep points are not multi-dimensional.
        """
        data = {}

        # get sweep information
        inner_sweep: str = lp_dict['inner_sweep']
        outer_sweep: bool = lp_dict['outer_sweep']
        harmonics: bool = lp_dict['harmonics']
        # work on a copy: sweep names are appended below and must not leak back into lp_dict
        swp_vars = list(lp_dict['swp_vars'])
        swp_data = lp_dict['swp_data']
        lp_data = lp_dict['data']

        if outer_sweep:
            swp_keys = sorted(lp_data.keys())
            swp_combos = []
            for key in swp_keys:
                # each '_'-separated field of the key indexes the values of the matching sweep variable
                swp_combos.append([swp_data[swp_vars[_vridx]][int(_vlidx)]
                                   for _vridx, _vlidx in enumerate(key.split('_'))])
            num_swp = len(swp_combos[0])
            if harmonics:
                swp_vars.append('harmonic')
                harm_swp = sorted(lp_data[swp_keys[0]].keys())
                # harmonic becomes the last (fastest varying) sweep variable
                swp_combos = [swp_combo + [_harm] for swp_combo in swp_combos for _harm in harm_swp]
                num_swp += 1
            else:
                harm_swp = []
            swp_len = len(swp_combos)
            combo_arr = np.array(swp_combos)
            swp_combo_list = [combo_arr[:, i] for i in range(num_swp)]
        else:
            swp_keys = []
            if harmonics:
                swp_vars = ['harmonic']
                harm_swp = sorted(lp_data.keys())
                swp_len = len(harm_swp)
                swp_combo_list = [np.array(harm_swp)]
            else:
                harm_swp = []
                swp_len = 0
                swp_combo_list = []

        swp_shape, swp_vals = _check_is_md(1, swp_combo_list, rtol, atol, None)  # single corner per set
        is_md = swp_shape is not None
        if not is_md:
            raise NotImplementedError("Parametric sweeps must be formatted multi-dimensionally")
        data.update({var: swp_vals[i] for i, var in enumerate(swp_vars)})

        # parse each signal
        if swp_len == 0:  # no outer sweep
            for sig_name, sig_y in lp_data.items():
                if isinstance(sig_y, float):
                    data_shape = swp_shape
                else:
                    data_shape = (*swp_shape, sig_y.shape[-1])
                _new_sig = sig_name.replace('/', '.')
                data[_new_sig] = sig_y if _new_sig == inner_sweep else np.reshape(sig_y, data_shape)
        else:  # combine outer sweeps
            # signal names are read from the first sweep point / first harmonic that exists
            if outer_sweep:
                first_point = lp_data[swp_keys[0]]
                sig_names = first_point[harm_swp[0]].keys() if harmonics else first_point.keys()
            elif harmonics:
                sig_names = lp_data[harm_swp[0]].keys()
            else:
                raise NotImplementedError('Not possible.')

            for sig_name in sig_names:
                if outer_sweep:
                    if harmonics:
                        # same order as swp_combos: sweep point outermost, harmonic innermost
                        yvecs = [lp_data[key0][key1][sig_name] for key0 in swp_keys for key1 in harm_swp]
                    else:
                        yvecs = [lp_data[key][sig_name] for key in swp_keys]
                else:
                    yvecs = [lp_data[key][sig_name] for key in harm_swp]

                if isinstance(yvecs[0], float):
                    sub_dims = ()  # not used
                    max_dim = 0  # not used
                    is_same_len = True
                    data_shape = swp_shape
                else:
                    sub_dims = tuple(yvec.shape[0] for yvec in yvecs)
                    max_dim = max(sub_dims)
                    is_same_len = all(dim == sub_dims[0] for dim in sub_dims)
                    data_shape = (*swp_shape, max_dim)

                _new_sig = sig_name.replace('/', '.')
                if not is_same_len:
                    # pad shorter vectors with NaN so that they can be stacked
                    yvecs_padded = [np.pad(yvec, (0, max_dim - dim), constant_values=np.nan)
                                    for yvec, dim in zip(yvecs, sub_dims)]
                    data[_new_sig] = np.reshape(np.stack(yvecs_padded), data_shape)
                elif _new_sig == inner_sweep and all(np.allclose(yvecs[0], _yvec, rtol=rtol, atol=atol)
                                                     for _yvec in yvecs[1:]):
                    # identical inner sweep values at every outer point: store them only once
                    data[_new_sig] = yvecs[0]
                else:
                    # regular signal, or inner sweep of same length but unequal sweep values
                    data[_new_sig] = np.reshape(np.stack(yvecs), data_shape)

        if inner_sweep:
            swp_vars.append(inner_sweep)
        return AnalysisData(['corner'] + swp_vars, data, is_md)