# PyHCL/pyhcl/simulator/sim.py (377 lines, 10 KiB, Python)

# Copyright (c) 2019 scutdig
# Licensed under the MIT license.
import logging
import mmap
import os
# Backend config
import time
from inspect import getmodule
from pyhcl import *
# Length in bytes passed to mmap for each of the in/sig/out channel files.
psize = 4096
# Size in bytes of one word exchanged over a channel.
uint64_t_size = 8
# Module-level counters used while assigning indices to input/output ports.
input_signum = output_signum = 0

# Signals: codes written to / read from the signal channel.
(WAIT, DOUT, DIN, TERM, STEP, START, RESET) = (0, 1, 2, 3, 4, 5, 6)
def read_data(mm):
    """Read one word from ``mm`` at its current position and rewind it.

    ``uint64_t_size`` bytes are consumed and decoded as a little-endian
    unsigned integer; afterwards the position of ``mm`` is reset to 0.

    :param mm: object exposing ``read(n)`` and ``seek(pos)`` (an mmap here).
    :return: the decoded integer.
    """
    raw_word = mm.read(uint64_t_size)
    decoded = int.from_bytes(raw_word, byteorder="little")
    mm.seek(0)
    return decoded
def write_data(mm, value):
    """Write ``value`` as one word at the current position of ``mm`` and rewind it.

    ``value`` is converted with ``int()`` and encoded as ``uint64_t_size``
    little-endian unsigned bytes; afterwards the position of ``mm`` is reset
    to 0.

    :param mm: object exposing ``write(bytes)`` and ``seek(pos)`` (an mmap here).
    :param value: anything accepted by ``int()``.
    """
    encoded = int(value).to_bytes(uint64_t_size, byteorder="little")
    mm.write(encoded)
    mm.seek(0)
def search_io(io, input_sig_map, output_sig_map):
    """Walk the public attributes of ``io`` and index every Input/Output found.

    Attributes whose name starts with an underscore are skipped. A ``Bundle``
    attribute is searched recursively; an ``Input`` is stored in
    ``input_sig_map`` and an ``Output`` in ``output_sig_map``, each mapped to
    the current value of the matching module-level counter, which is then
    incremented.

    :param io: object whose ``__dict__`` is inspected.
    :param input_sig_map: dict filled with ``Input -> index`` entries.
    :param output_sig_map: dict filled with ``Output -> index`` entries.
    """
    global input_signum, output_signum

    for attr_name, member in io.__dict__.items():
        # A single leading underscore also covers the double-underscore case.
        if attr_name.startswith("_"):
            continue

        if isinstance(member, Bundle):
            search_io(member, input_sig_map, output_sig_map)
        elif isinstance(member, Input):
            input_sig_map[member] = input_signum
            input_signum = input_signum + 1
        elif isinstance(member, Output):
            output_sig_map[member] = output_signum
            output_signum = output_signum + 1
def select_datawrapper(width):
    """Return the name of the C++ data wrapper class for a port of ``width`` bits.

    Widths 1-8 map to ``CDataWrapper``, 9-16 to ``SDataWrapper``, 17-32 to
    ``IDataWrapper`` and 33-64 to ``QDataWrapper``.

    :param width: port width in bits.
    :return: the wrapper class name as a string.
    :raises ValueError: if ``width`` is outside 1..64. Previously the function
        fell through and returned ``None``, which callers would format into
        the generated harness as the literal text ``None``.
    """
    if width < 1 or width > 64:
        raise ValueError(
            "Unsupported port width {}: expected a width between 1 and 64".format(width))
    if width <= 8:
        return "CDataWrapper"
    if width <= 16:
        return "SDataWrapper"
    if width <= 32:
        return "IDataWrapper"
    return "QDataWrapper"
def push_data(input_sig_map, output_sig_map):
    """Render the C++ lines that register every port with the harness.

    For each key of ``input_sig_map`` and then each key of ``output_sig_map``
    (in iteration order) one ``push_back`` statement is produced, wrapping
    the DUT member named by ``key.sig[0]`` in the wrapper class chosen by
    ``select_datawrapper`` for ``key.sig[1]``.

    :param input_sig_map: mapping whose keys carry a ``sig`` of ``(name, width)``.
    :param output_sig_map: mapping whose keys carry a ``sig`` of ``(name, width)``.
    :return: all generated statements concatenated into one string.
    :raises ValueError: if any port has a width that is not positive.
    """
    statement = "\t\tthis->sim_datas.{bucket}.push_back(new {datawrapper}(&(dut->{name})));\n"
    rendered = []

    # Inputs are emitted before outputs; within each group map order is kept.
    for bucket, sig_map in (("inputs", input_sig_map), ("outputs", output_sig_map)):
        for port in sig_map:
            port_name, width = port.sig
            if width <= 0:
                raise ValueError("Simulate IO ports must specified width")
            wrapper_name = select_datawrapper(width)
            rendered.append(statement.format(
                bucket=bucket, datawrapper=wrapper_name, name=port_name))

    return "".join(rendered)
def ports_to_handler(ports, input_sig_map, output_sig_map):
    """Build the tree of ``Handler`` objects for a module's ports and index them.

    The returned handler has ``clock`` and ``reset`` children (both 1 bit
    wide, both registered as inputs) and an ``io`` child holding one
    ``Handler`` per field of the ``io`` port. A field whose ``flip`` equals
    ``Flip()`` is registered in ``input_sig_map``; every other field goes to
    ``output_sig_map``.

    Indices are taken from the current size of the target map instead of the
    module-level counters. Those counters are never reset, so a second call
    in the same process used to continue numbering where the first stopped,
    while ``push_data`` emits the harness registrations starting from the
    first key of each map. The counters are still updated at the end so they
    keep reflecting the size of the maps.
    NOTE(review): this assumes the backend uses the index as a position in
    the generated ``inputs``/``outputs`` vectors - confirm in simulator.h.

    :param ports: low-level port type exposing ``fields``; one field must be
        named ``io`` and expose ``typ.fields``.
    :param input_sig_map: dict filled with ``Handler -> index`` for inputs.
    :param output_sig_map: dict filled with ``Handler -> index`` for outputs.
    :return: the root ``Handler``.
    :raises StopIteration: if ``ports`` has no field named ``io``.
    """
    global input_signum, output_signum
    from pyhcl.ir.low_ir import Flip

    handler = Handler()

    clock = Handler()
    clock.sig = ('clock', 1)
    handler.clock = clock
    # The right-hand side is evaluated before the key is inserted, so this is
    # the number of inputs registered so far.
    input_sig_map[clock] = len(input_sig_map)

    reset = Handler()
    reset.sig = ('reset', 1)
    handler.reset = reset
    input_sig_map[reset] = len(input_sig_map)

    io = Handler()
    io.sig = ('io',)
    handler.io = io

    io_fields = next(f.typ.fields for f in ports.fields if f.name == 'io')
    for field in io_fields:
        port = Handler()
        port.sig = (f"io_{field.name}", field.typ.width.width)
        vars(io)[field.name] = port
        if field.flip == Flip():
            input_sig_map[port] = len(input_sig_map)
        else:
            output_sig_map[port] = len(output_sig_map)

    # Keep the legacy module-level counters consistent with the maps.
    input_signum = len(input_sig_map)
    output_signum = len(output_sig_map)
    return handler
class Handler:
    """Plain attribute container standing for a port or a group of ports.

    Instances are given a ``sig`` tuple after construction; the hash of an
    instance is the hash of that tuple. Equality is not overridden, so two
    distinct instances with the same ``sig`` remain distinct dictionary keys.
    """

    def __hash__(self):
        signature = self.sig
        return hash(signature)
class DpiConfig(object):
    """Locations of the DPI artefacts handed to the simulator build.

    Attributes:
        sv: path of the SystemVerilog package file.
        lib: path of the shared library.
        bdir: directory holding the black-box SystemVerilog files.
        bname: names of the entries of ``bdir`` joined by single spaces, in
            the order returned by ``os.listdir`` at construction time.
    """

    def __init__(self, pkg_sv_path=".sv/pkg/pysv_pkg.sv", bbox_sv_dir=".sv/bbox/", lib_path=".build/libpysv.so"):
        self.sv, self.lib, self.bdir = pkg_sv_path, lib_path, bbox_sv_dir
        # The directory is listed once, here; later changes are not picked up.
        bbox_entries = os.listdir(bbox_sv_dir)
        self.bname = " ".join(bbox_entries)
class Simulator(object):
    """Python front-end for a Verilator build of a PyHCL module.

    Construction elaborates the module, writes a C++ harness and the FIRRTL
    source under ``./simulation``, runs ``firrtl``, ``verilator`` and ``make``
    through ``os.system``, starts the resulting binary in the background and
    memory-maps ``./in.dat``, ``./sig.dat`` and ``./out.dat``. Commands are
    then exchanged with the backend by writing signal codes to the signal
    channel.

    Side effect: the process working directory is changed to ``./simulation``
    during construction and is not restored.

    Attributes:
        handler: tree of ``Handler`` objects; its leaves are the keys accepted
            by ``poke`` and ``peek``.
        input_sig_map / output_sig_map: ``Handler -> index`` for each direction.
        dut_name: name of the elaborated top module.
        step_count: number of ``step`` calls issued so far.
        mm_in / mm_sig / mm_out: the mapped channels (``None`` while closed).
    """

    # First part of the generated harness; the per-port registration lines
    # produced by push_data are inserted right after it.
    _HARNESS_HEAD = """#include \"V{name}.h\"
#include \"simulator.h\"
using namespace std;
class {name}_Simulator: public Simulator<DataWrapper*>
{{
private:
V{name}* dut;
VerilatedVcdC *tfp;
int psize;
public:
{name}_Simulator(V{name}* _dut): Simulator()
{{
this->dut = _dut;
this->psize = getpagesize();
}}
~{name}_Simulator()
{{
munmap(this->in, this->psize);
munmap(this->sig, this->psize);
munmap(this->out, this->psize);
close(this->in_fd);
close(this->out_fd);
close(this->sig_fd);
remove("./in.dat");
remove("./sig.dat");
remove("./out.dat");
}}
void init_tfp(VerilatedVcdC *_tfp)
{{
this->tfp = _tfp;
}}
void init_simdata()
{{
this->sim_datas.inputs.clear();
this->sim_datas.outputs.clear();
"""

    # Remainder of the harness. The original main() allocated VerilatedVcdC
    # twice in a row and leaked the first object; only one allocation is kept.
    _HARNESS_TAIL = """\t}}
virtual void step()
{{
this->dut->clock = 0;
this->dut->eval();
this->tfp->dump(this->main_time);
this->main_time++;
this->dut->clock = 1;
this->dut->eval();
this->tfp->dump(this->main_time);
this->main_time++;
}}
virtual void reset()
{{
this->dut->reset = 1;
step();
}}
virtual void start()
{{
this->dut->reset = 0;
step();
}}
}};
int main(int argc, char **argv)
{{
Verilated::commandArgs(argc, argv);
Verilated::traceEverOn(true);
V{name} *top = new V{name};
VerilatedVcdC *tfp = new VerilatedVcdC;
top->trace(tfp, 99);
tfp->open("{name}.vcd");
{name}_Simulator sim(top);
sim.init_simdata();
sim.init_tfp(tfp);
top->reset = 1;
while(!sim.isexit())
sim.tick();
delete tfp;
delete top;
exit(0);
}}
"""

    def __init__(self, module, dpiconfig: DpiConfig = None):
        """Elaborate ``module``, build its simulation backend and connect to it.

        :param module: PyHCL module passed to ``Emitter.elaborate``.
        :param dpiconfig: optional ``DpiConfig``; when given, its package,
            library and black-box files are copied next to the generated
            sources and added to the verilator command line.
        """
        low_module = Emitter.elaborate(module)
        module_name = low_module.main
        ports = next(m.typ for m in low_module.modules if m.name == module_name)

        # Channels are opened by init_channel once the backend is running.
        self.mm_in = None
        self.mm_sig = None
        self.mm_out = None

        self.input_sig_map = {}
        self.output_sig_map = {}
        self.dut_name = module_name
        self.step_count = 0
        self.handler = ports_to_handler(ports, self.input_sig_map,
                                        self.output_sig_map)

        self._emit_sources(low_module)
        self._build_and_launch(dpiconfig)

        # Give the background process time to create the channel files.
        time.sleep(1)
        self.init_channel()

    def _emit_sources(self, low_module):
        """Write the harness, FIRRTL and Verilog sources into ./simulation."""
        harness_code = "".join([
            self._HARNESS_HEAD.format(name=self.dut_name),
            push_data(self.input_sig_map, self.output_sig_map),
            self._HARNESS_TAIL.format(name=self.dut_name),
        ])

        os.makedirs("simulation", exist_ok=True)
        with open(f"./simulation/{self.dut_name}-harness.cpp", "w") as f:
            f.write(harness_code)
        with open(f"./simulation/{self.dut_name}.fir", "w") as f:
            f.write(low_module.serialize())
        os.system(
            f"firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog")

        # simulator.h ships in the src directory next to this module.
        src_file = os.path.join(
            os.path.dirname(getmodule(Simulator).__file__), "src", "simulator.h")
        os.system("cp {} ./simulation".format(src_file))

    def _build_and_launch(self, dpiconfig):
        """Run verilator and make inside ./simulation, then start the binary."""
        vfn = "{}.v".format(self.dut_name)
        hfn = "{}-harness.cpp".format(self.dut_name)
        mfn = "V{}.mk".format(self.dut_name)
        efn = "V{}".format(self.dut_name)

        if dpiconfig:
            pysv_pkg = "{}_pysv_pkg.sv".format(self.dut_name)
            pysv_lib = "libpysv_{}.so".format(self.dut_name)
            os.system("cp {} ./simulation/{}".format(dpiconfig.sv, pysv_pkg))
            os.system("cp {} ./simulation/{}".format(dpiconfig.lib, pysv_lib))
            os.system("cp {}* ./simulation/".format(dpiconfig.bdir))
            os.chdir("./simulation")
            # Using verilator backend
            os.system(
                "verilator --cc --trace --exe --prefix {prefix} --top-module {top} {pkg} {bbx} {vfn} {lib} {hfn}"
                .format(top=self.dut_name, bbx=dpiconfig.bname, vfn=vfn, hfn=hfn,
                        pkg=pysv_pkg, lib=pysv_lib, prefix=efn))
            os.system("cp {} ./obj_dir/".format(pysv_lib))
        else:
            os.chdir("./simulation")
            # Using verilator backend
            os.system(
                "verilator --cc {vfn} --trace --exe {hfn}".format(vfn=vfn, hfn=hfn))

        os.system(
            "make -j -C ./obj_dir -f {mfn} {efn}".format(mfn=mfn, efn=efn))

        # Run simulation backend program (in the background, hence the '&').
        if dpiconfig:
            os.system("LD_LIBRARY_PATH=. ./obj_dir/{}&".format(efn))
        else:
            os.system("./obj_dir/{}&".format(efn))

    @staticmethod
    def _map_channel(path):
        """Memory-map ``psize`` bytes of ``path`` read/write and return the mmap."""
        fd = os.open(path, os.O_RDWR)
        try:
            return mmap.mmap(fd, psize)
        finally:
            # The mapping stays valid after the descriptor is closed; leaving
            # it open leaked one descriptor per channel.
            os.close(fd)

    def init_channel(self):
        """Map ./in.dat, ./sig.dat and ./out.dat from the current directory."""
        self.mm_in = self._map_channel("./in.dat")
        self.mm_sig = self._map_channel("./sig.dat")
        self.mm_out = self._map_channel("./out.dat")

    def wait_signal(self):
        """Busy-wait until the signal channel reads ``WAIT``."""
        while read_data(self.mm_sig) != WAIT:
            pass

    def poke(self, port, value):
        """Send ``value`` to the input ``port``.

        The value is written to the first word of the input channel and the
        port index to the second word, then ``DIN`` is signalled.

        :param port: a key of ``input_sig_map``.
        :param value: integer to drive.
        :raises KeyError: if ``port`` is not a registered input.
        """
        signum = self.input_sig_map[port]
        self.wait_signal()
        write_data(self.mm_in, value)
        self.mm_in.seek(uint64_t_size)
        write_data(self.mm_in, signum)
        write_data(self.mm_sig, DIN)
        print("{}->".format(port.sig[0]), value)

    def peek(self, port):
        """Request the value of the output ``port`` and print it.

        The port index is written to the second word of the output channel,
        ``DOUT`` is signalled, and once the backend is back to ``WAIT`` the
        first word of the output channel is read and printed.

        :param port: a key of ``output_sig_map``.
        :return: the output channel mmap itself (positioned at 0), not the
            integer that was read.
        :raises KeyError: if ``port`` is not a registered output.
        """
        signum = self.output_sig_map[port]
        self.wait_signal()
        self.mm_out.seek(uint64_t_size)
        write_data(self.mm_out, signum)
        write_data(self.mm_sig, DOUT)
        self.wait_signal()
        observed = read_data(self.mm_out)
        print("{}->".format(port.sig[0]), observed)
        # NOTE(review): returning the mmap rather than `observed` looks
        # unintended; kept as-is so existing callers are not broken.
        return self.mm_out

    def step(self):
        """Signal ``STEP`` once the backend is idle and count the step."""
        self.wait_signal()
        write_data(self.mm_sig, STEP)
        print("step %d" % self.step_count)
        self.step_count += 1

    def term(self):
        """Signal ``TERM``, release the channels and delete their files."""
        self.wait_signal()
        write_data(self.mm_sig, TERM)
        time.sleep(1)

        # Unmap before deleting so the mappings are not leaked.
        for channel in (self.mm_in, self.mm_sig, self.mm_out):
            channel.close()
        self.mm_in = None
        self.mm_sig = None
        self.mm_out = None

        os.remove("./in.dat")
        os.remove("./out.dat")
        os.remove("./sig.dat")

    def start(self):
        """Write ``START`` to the signal channel (without waiting for ``WAIT``)."""
        write_data(self.mm_sig, START)

    def reset(self):
        """Write ``RESET`` to the signal channel (without waiting for ``WAIT``)."""
        write_data(self.mm_sig, RESET)