# SPDX-License-Identifier: Apache-2.0
# Copyright 2019 Blue Cheetah Analog Design Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This module implements bag's interface with spectre simulator.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Dict, Any, Sequence, Optional, List, Tuple, Union, Mapping, Set
import re
import shutil
import time
from pathlib import Path
from pybag.enum import DesignOutput
from pybag.core import get_cdba_name_bits
from ..math import float_to_si_string
from ..io.file import read_yaml, open_file, is_valid_file, read_file
from ..io.string import wrap_string
from ..util.immutable import ImmutableList
from .data import (
MDSweepInfo, SimData, SetSweepInfo, SweepLinear, SweepLog, SweepList, SimNetlistInfo,
SweepSpec, MonteCarlo, AnalysisInfo, AnalysisAC, AnalysisSP, AnalysisNoise, AnalysisTran,
AnalysisSweep1D, AnalysisPSS, AnalysisPNoise
)
from .base import SimProcessManager, get_corner_temp
from .hdf5 import load_sim_data_hdf5, save_sim_data_hdf5
from .nutbin import NutBinParser
from .libpsf_simdata import LibPSFParser
# The use of pysrr to parse simulation data requires an external Python package.
# Since this import is only required when pysrr is used, it is treated as an optional import
try:
from .srr import srr_to_sim_data
except ModuleNotFoundError as e:
[docs] srr_to_sim_data = None # assume pysrr is not needed, error later if assumption is false
srr_import_err = e
else:
srr_import_err = None
if TYPE_CHECKING:
from .data import SweepInfo
[docs]reserve_params = {'freq', 'time'}
[docs]class SpectreInterface(SimProcessManager):
"""This class handles interaction with Spectre simulators.
Parameters
----------
tmp_dir : str
temporary file directory for SimAccess.
sim_config : Dict[str, Any]
the simulation configuration dictionary. Contains the following options:
env_file : str
the yaml path for PVT corners.
use_pysrr : bool
True to use pysrr. Defaults to False.
compress : bool
True to compress simulation data when saving to HDF5 file. Defaults to True.
rtol: float
relative tolerance for checking if 2 simulation values are the same. Defaults to 1e-8.
atol: float
absolute tolerance for checking if 2 simulation values are the same. Defaults to 1e-22.
kwargs : Dict[str, Any]
additional spectre simulation arguments. Contains the following options:
command : str
the command to launch simulator. Defaults to spectre.
env : Optional[Dict[str, str]]
an optional dictionary of environment variables. None to inherit from parent. Defaults to None.
run_64 : bool
True to run in 64-bit mode. Defaults to True.
format : str
the output raw data file format. Defaults to psfxl.
psfversion : str
the version of psfxl to use. If not specified, defaults to the simulator's default psfversion.
options : List[str]
the command line simulator options. Defaults to an empty list.
"""
def __init__(self, tmp_dir: str, sim_config: Dict[str, Any]) -> None:
SimProcessManager.__init__(self, tmp_dir, sim_config)
self._model_setup: Dict[str, List[Tuple[str, str]]] = read_yaml(sim_config['env_file'])
self._use_pysrr: bool = sim_config.get('use_pysrr', False)
self._sim_kwargs: Dict[str, Any] = sim_config.get('kwargs', {})
self._out_fmt: str = self._sim_kwargs.get('format', 'psfxl')
self._psf_version: str = self._sim_kwargs.get('psfversion', '')
if self._out_fmt != 'psfxl': # clear psf_version since it's only used for psfxl
self._psf_version = ''
@property
[docs] def netlist_type(self) -> DesignOutput:
return DesignOutput.SPECTRE
[docs] def create_netlist(self, output_path: Path, sch_netlist: Path, info: SimNetlistInfo,
precision: int = 6) -> None:
output_path_str = str(output_path.resolve())
sch_netlist_path_str = str(sch_netlist.resolve())
if ('<' in output_path_str or '>' in output_path_str or
'<' in sch_netlist_path_str or '>' in sch_netlist_path_str):
raise ValueError('spectre does not support directory names with angle brackets.')
sim_envs = info.sim_envs
analyses = info.analyses
params = info.params
env_params = info.env_params
swp_info = info.swp_info
monte_carlo = info.monte_carlo
sim_options = info.options
init_voltages = info.init_voltages
if monte_carlo is not None and \
(isinstance(swp_info, SetSweepInfo) or (isinstance(swp_info, MDSweepInfo) and swp_info.params)
or len(sim_envs) > 1):
raise NotImplementedError('Monte Carlo simulation not implemented for parameter sweep '
'and/or process sweep')
with open_file(sch_netlist, 'r') as f:
lines = [l.rstrip() for l in f]
# write simulator options
if self._psf_version and 'psfversion' not in sim_options:
sim_options = sim_options.copy(append=dict(psfversion=self._psf_version))
if sim_options:
sim_opt_list = ['simulatorOptions', 'options']
for opt, val in sim_options.items():
sim_opt_list.append(f'{opt}={val}')
sim_opt_str = wrap_string(sim_opt_list)
lines.append(sim_opt_str)
# write parameters
param_fmt = 'parameters {}={}'
param_set = reserve_params.copy()
for par, val in swp_info.default_items():
if par not in param_set:
lines.append(param_fmt.format(par, _format_val(val, precision)))
param_set.add(par)
for par, val_list in env_params.items():
if par in param_set:
raise ValueError('Cannot set a sweep parameter as environment parameter.')
lines.append(param_fmt.format(par, _format_val(val_list[0], precision)))
param_set.add(par)
for par, val in params.items():
if par not in param_set:
lines.append(param_fmt.format(par, _format_val(val, precision)))
param_set.add(par)
for ana in analyses:
par = ana.param
if par and par not in param_set:
lines.append(param_fmt.format(par, _format_val(ana.param_start, precision)))
param_set.add(par)
lines.append('')
if isinstance(swp_info, SetSweepInfo):
# write paramset declaration if needed
_write_param_set(lines, swp_info.params, swp_info.values, precision)
lines.append('')
if init_voltages:
# write initial conditions
ic_line = 'ic'
for key, val in init_voltages.items():
key = get_cdba_name_bits(key, DesignOutput.SPECTRE)
if len(key) > 1:
raise ValueError("Separate initial bus into multiple values")
key = key[0]
ic_line += f' {key}={_format_val(val, precision)}'
lines.append(ic_line)
lines.append('')
has_ic = True
else:
has_ic = False
# write statements for each simulation environment
# write default model statements
for idx, sim_env in enumerate(sim_envs):
corner, temp = get_corner_temp(sim_env)
if idx != 0:
# start altergroup statement
lines.append(f'{sim_env} altergroup {{')
_write_sim_env(lines, self._model_setup[corner], temp)
if idx != 0:
# write environment parameters for second sim_env and on
for par, val_list in env_params.items():
lines.append(param_fmt.format(par, val_list[idx]))
# close altergroup statement
lines.append('}')
lines.append('')
# write sweep statements
num_brackets = _write_sweep_start(lines, swp_info, idx, precision)
# write Monte Carlo statements if present
if isinstance(monte_carlo, MonteCarlo):
num_brackets += _write_monte_carlo(lines, monte_carlo)
if num_brackets > 0:
lines.append('')
# write analyses
save_outputs = set()
jitter_event = []
for ana in analyses:
jitter_event = _write_analysis(lines, sim_env, ana, precision, has_ic)
lines.append('')
for output in ana.save_outputs:
try:
save_outputs.update(get_cdba_name_bits(output, DesignOutput.SPECTRE))
except ValueError:
save_outputs.update([output])
# close sweep statements
for _ in range(num_brackets):
lines.append('}')
if num_brackets > 0:
lines.append('')
# jitterevent is not an analysis and has to be written outside sweep analysis (message from Spectre)
lines += jitter_event
lines.append('')
# write save statements
_write_save_statements(lines, save_outputs)
with open_file(output_path, 'w') as f:
f.write('\n'.join(lines))
f.write('\n')
[docs] def get_sim_file(self, dir_path: Path, sim_tag: str) -> Path:
return dir_path / f'{sim_tag}.hdf5'
[docs] def load_sim_data(self, dir_path: Path, sim_tag: str) -> SimData:
hdf5_path = self.get_sim_file(dir_path, sim_tag)
import time
print('Reading HDF5')
start = time.time()
ans = load_sim_data_hdf5(hdf5_path)
stop = time.time()
print(f'HDF5 read took {stop - start:.4g} seconds.')
return ans
[docs] async def async_run_simulation(self, netlist: Path, sim_tag: str) -> None:
netlist = netlist.resolve()
if not netlist.is_file():
raise FileNotFoundError(f'netlist {netlist} is not a file.')
sim_kwargs: Dict[str, Any] = self._sim_kwargs
compress: bool = self.config.get('compress', True)
rtol: float = self.config.get('rtol', 1e-8)
atol: float = self.config.get('atol', 1e-22)
cmd_str: str = sim_kwargs.get('command', 'spectre')
env: Optional[Dict[str, str]] = sim_kwargs.get('env', None)
run_64: bool = sim_kwargs.get('run_64', True)
options = sim_kwargs.get('options', [])
sim_cmd = [cmd_str, '-cols', '100', '-colslog', '100',
'-format', self._out_fmt, '-raw', f'{sim_tag}.raw']
if run_64:
sim_cmd.append('-64')
for opt in options:
sim_cmd.append(opt)
sim_cmd.append(str(netlist))
cwd_path = netlist.parent.resolve()
log_path = cwd_path / 'spectre_output.log'
raw_path: Path = cwd_path / f'{sim_tag}.raw'
hdf5_path: Path = cwd_path / f'{sim_tag}.hdf5'
# delete previous .raw and .hdf5
for fname in cwd_path.iterdir():
if fname.name.startswith(raw_path.name) or fname.suffix == '.hdf5':
try:
if fname.is_dir():
shutil.rmtree(str(fname))
elif fname.is_file():
fname.unlink()
except FileNotFoundError: # Ignore errors from race conditions
pass
ret_code = await self.manager.async_new_subprocess(sim_cmd, str(log_path),
env=env, cwd=str(cwd_path))
if ret_code is None or ret_code != 0:
raise ValueError(f'Spectre simulation ended with error. See log file: {log_path}')
# Check if raw_path is created. Give some slack for IO latency
iter_cnt = 0
while not ((self._out_fmt.startswith('psf') and raw_path.is_dir()) or
(self._out_fmt.startswith('nut') and raw_path.is_file())):
if iter_cnt > 120:
raise ValueError(f'Spectre simulation ended with error. See log file: {log_path}')
time.sleep(1)
iter_cnt += 1
if not is_valid_file(log_path, 'spectre completes with', 120, 1):
raise ValueError(f'Spectre simulation ended with error. See log file: {log_path}')
log_contents = read_file(log_path)
if 'spectre completes with 0 errors' not in log_contents:
raise ValueError(f'Spectre simulation ended with error. See log file: {log_path}')
num_proc = 1
if self._out_fmt.startswith('psf'):
# check if Monte Carlo sim
for fname in raw_path.iterdir():
if str(fname).endswith('Distributed'):
analysis_info: Path = fname / 'Analysis.info'
with open_file(analysis_info, 'r') as f:
line = f.readline()
num_proc = int(re.search(r'(.*) (\d*)\n', line).group(2))
raw_sep: Path = raw_path / f'{num_proc}'
for fname_sep in raw_sep.iterdir():
if str(fname_sep).endswith('.mapping'):
# Monte Carlo sim in multiprocessing mode
mapping_lines = []
for i in range(num_proc):
with open_file(raw_path / f'{i + 1}' / fname_sep.name, 'r') as fr:
for line_in in fr:
mapping_lines.append(line_in)
await self._format_monte_carlo(mapping_lines, cwd_path, compress, rtol,
atol, hdf5_path)
return
elif str(fname).endswith('.mapping'):
# Monte Carlo sim in single processing mode
mapping_lines = open_file(fname, 'r').readlines()
await self._format_monte_carlo(mapping_lines, cwd_path, compress, rtol, atol,
hdf5_path)
return
# convert to HDF5
if self._out_fmt == 'psfxl':
log_path = cwd_path / 'srr_to_hdf5.log'
await self._srr_to_hdf5(compress, rtol, atol, raw_path, hdf5_path, log_path, cwd_path)
elif self._out_fmt == 'psfbin':
lpp = LibPSFParser(raw_path, rtol, atol, num_proc)
save_sim_data_hdf5(lpp.sim_data, hdf5_path, compress)
# post-process HDF5 to convert to MD array
_process_hdf5(hdf5_path, rtol, atol)
elif self._out_fmt == 'nutbin':
nbp_mc = False
for fname in cwd_path.iterdir():
if str(fname).endswith('.mapping'):
nbp_mc = True
break
nbp = NutBinParser(raw_path, rtol, atol, nbp_mc)
save_sim_data_hdf5(nbp.sim_data, hdf5_path, compress)
# post-process HDF5 to convert to MD array
_process_hdf5(hdf5_path, rtol, atol)
[docs] async def _srr_to_hdf5(self, compress: bool, rtol: float, atol: float, raw_path: Path,
hdf5_path: Path, log_path: Path, cwd_path: Path) -> None:
comp_str = '1' if compress else '0'
rtol_str = f'{rtol:.4g}'
atol_str = f'{atol:.4g}'
if self._use_pysrr:
if srr_to_sim_data is None: # re-raise error from loading pysrr
raise srr_import_err
sim_data = srr_to_sim_data(raw_path, rtol, atol)
save_sim_data_hdf5(sim_data, hdf5_path, compress)
else:
sim_cmd = ['srr_to_hdf5', str(raw_path), str(hdf5_path), comp_str, rtol_str, atol_str]
ret_code = await self.manager.async_new_subprocess(sim_cmd, str(log_path),
cwd=str(cwd_path))
if ret_code is None or ret_code != 0:
raise ValueError(f'srr_to_hdf5 ended with error. See log file: {log_path}')
if not is_valid_file(hdf5_path, None, 120, 1):
raise ValueError(f'srr_to_hdf5 ended with error. See log file: {log_path}')
# post-process HDF5 to convert to MD array
_process_hdf5(hdf5_path, rtol, atol)
[docs]def _write_sim_env(lines: List[str], models: List[Tuple[str, str]], temp: int) -> None:
for fname, section in models:
if section:
lines.append(f'include "{fname}" section={section}')
else:
lines.append(f'include "{fname}"')
lines.append(f'tempOption options temp={temp}')
[docs]def _write_param_set(lines: List[str], params: Sequence[str],
values: Sequence[ImmutableList[float]], precision: int) -> None:
# get list of lists of strings to print, and compute column widths
data = [params]
col_widths = [len(par) for par in params]
for combo in values:
str_list = []
for idx, val in enumerate(combo):
cur_str = _format_val(val, precision)
col_widths[idx] = max(col_widths[idx], len(cur_str))
str_list.append(cur_str)
data.append(str_list)
# write the columns
lines.append('swp_data paramset {')
for row in data:
lines.append(' '.join(val.ljust(width) for val, width in zip(row, col_widths)))
lines.append('}')
[docs]def _get_sweep_str(par: str, swp_spec: Optional[SweepSpec], precision: int) -> str:
if not par or swp_spec is None:
return ''
if isinstance(swp_spec, SweepList):
val_list = swp_spec.values
# abstol check
num_small = 0
for val in val_list:
if abs(val) < 3.0e-16:
num_small += 1
if num_small > 1:
raise ValueError('sweep values are below spectre abstol, try to find a work around')
tmp = ' '.join((_format_val(val, precision) for val in val_list))
val_str = f'values=[{tmp}]'
elif isinstance(swp_spec, SweepLinear):
# spectre: stop is inclusive, lin = number of points excluding the last point
val_str = f'start={swp_spec.start} stop={swp_spec.stop_inc} lin={swp_spec.num - 1}'
elif isinstance(swp_spec, SweepLog):
# spectre: stop is inclusive, log = number of points excluding the last point
val_str = f'start={swp_spec.start} stop={swp_spec.stop_inc} log={swp_spec.num - 1}'
else:
raise ValueError('Unknown sweep specification.')
if par in reserve_params:
return val_str
else:
return f'param={par} {val_str}'
[docs]def _get_options_str(options: Mapping[str, str]) -> str:
return ' '.join((f'{key}={val}' for key, val in options.items()))
[docs]def _write_sweep_start(lines: List[str], swp_info: SweepInfo, swp_idx: int, precision: int) -> int:
if isinstance(swp_info, MDSweepInfo):
for dim_idx, (par, swp_spec) in enumerate(swp_info.params):
statement = _get_sweep_str(par, swp_spec, precision)
lines.append(f'swp{swp_idx}{dim_idx} sweep {statement} {{')
return swp_info.ndim
else:
lines.append(f'swp{swp_idx} sweep paramset=swp_data {{')
return 1
[docs]def _write_monte_carlo(lines: List[str], mc: MonteCarlo) -> int:
cur_line = f'__{mc.name}__ montecarlo numruns={mc.numruns} seed={mc.seed}'
options_dict = dict(savefamilyplots='yes', appendsd='yes', savedatainseparatedir='yes',
donominal='yes', variations='all')
options_dict.update(mc.options)
opt_str = _get_options_str(options_dict)
if opt_str:
cur_line += ' '
cur_line += opt_str
cur_line += ' {'
lines.append(cur_line)
return 1
[docs]def _write_analysis(lines: List[str], sim_env: str, ana: AnalysisInfo, precision: int,
has_ic: bool) -> List[str]:
cur_line = f'__{ana.name}__{sim_env}__'
if hasattr(ana, 'p_port') and ana.p_port:
cur_line += f' {ana.p_port}'
if hasattr(ana, 'n_port') and ana.n_port:
cur_line += f' {ana.n_port}'
cur_line += f' {ana.name}'
if isinstance(ana, AnalysisTran):
cur_line += (f' start={_format_val(ana.start, precision)}'
f' stop={_format_val(ana.stop, precision)}')
if isinstance(ana.out_start, str) or ana.out_start > 0:
val_str = _format_val(ana.out_start, precision)
cur_line += f' outputstart={val_str} strobestart={val_str}'
if ana.strobe != 0:
cur_line += f' strobeperiod={_format_val(ana.strobe)}'
if has_ic:
cur_line += ' ic=node'
elif isinstance(ana, AnalysisSweep1D):
par = ana.param
sweep_str = _get_sweep_str(par, ana.sweep, precision)
cur_line += ' '
cur_line += sweep_str
if isinstance(ana, AnalysisAC) and par != 'freq':
cur_line += f' freq={float_to_si_string(ana.freq, precision)}'
if isinstance(ana, AnalysisSP):
cur_line += f' ports=[{" ".join(ana.ports)}] paramtype={ana.param_type.name.lower()}'
elif isinstance(ana, AnalysisNoise):
if ana.out_probe:
cur_line += f' oprobe={ana.out_probe}'
elif hasattr(ana, 'measurement') and ana.measurement:
meas_list = [f'pm{idx}' for idx in range(len(ana.measurement))]
cur_line += f' measurement=[{" ".join(meas_list)}]'
elif not (hasattr(ana, 'p_port') and ana.p_port):
raise ValueError('Either specify out_probe, or specify p_port and n_port, or specify measurement.')
if ana.in_probe:
cur_line += f' iprobe={ana.in_probe}'
elif isinstance(ana, AnalysisPSS):
if ana.period == 0.0 and ana.fund == 0.0 and ana.autofund is False:
raise ValueError('For PSS simulation, either specify period or fund, '
'or set autofund = True')
if ana.period > 0.0:
cur_line += f' period={ana.period}'
if ana.fund > 0.0:
cur_line += f' fund={ana.fund}'
if ana.autofund:
cur_line += f' autofund=yes'
if ana.strobe != 0:
cur_line += f' strobeperiod={_format_val(ana.strobe)}'
else:
raise ValueError('Unknown analysis specification.')
opt_str = _get_options_str(ana.options)
if opt_str:
cur_line += ' '
cur_line += opt_str
if ana.save_outputs:
cur_line += ' save=selected'
lines.append(cur_line)
jitter_event = []
if isinstance(ana, AnalysisPNoise):
if ana.measurement:
for idx, event in enumerate(ana.measurement):
cur_line = f'pm{idx} jitterevent trigger=[{event.trig_p} {event.trig_n}] ' \
f'triggerthresh={event.triggerthresh} triggernum={event.triggernum} ' \
f'triggerdir={event.triggerdir} target=[{event.targ_p} {event.targ_n}]'
jitter_event.append(cur_line)
return jitter_event
[docs]def _write_save_statements(lines: List[str], save_outputs: Set[str]):
for save_out in sorted(save_outputs):
lines.append(f'save {save_out}')
lines.append('')
[docs]def _process_hdf5(path: Path, rtol: float, atol: float) -> None:
proc = 'process'
sim_data = load_sim_data_hdf5(path)
modified = False
for grp in sim_data.group_list:
sim_data.open_group(grp)
if proc in sim_data.sweep_params:
modified |= sim_data.remove_sweep(proc, rtol=rtol, atol=atol)
if modified:
save_sim_data_hdf5(sim_data, path)