Merge pull request #2 from lx071/master

verilog_simlite
This commit is contained in:
Mosk0ng 2022-05-09 12:53:41 +08:00 committed by GitHub
commit 7bd1f7600b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 3419 additions and 37 deletions

View File

@ -2,6 +2,7 @@ from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
class Add(BlackBox):
io = IO(
in1=Input(U.w(32)),
@ -14,11 +15,11 @@ class Add(BlackBox):
def fn(a, b):
return a + b
addpysvmodule(Add, fn)
compile_and_binding_all()
class Top(Module):
io = IO(
a=Input(U.w(32)),
@ -31,14 +32,16 @@ class Top(Module):
add.io.in2 <<= io.b
io.c <<= add.io.out
import time
def test():
cfg = DpiConfig()
#Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
time1 = time.time()
inputs = [[2000, 230032]]*40000000
inputs = [[2000, 230032]]*40
for i in range(2):
s.start_task(f"task_{i}", inputs)
time2 = time.time()
@ -48,5 +51,6 @@ def test():
print("time = " + str(time3 - time2))
s.close()
if __name__ == '__main__':
test()

39
example/half_adder.py Normal file
View File

@ -0,0 +1,39 @@
from pyhcl import *
from pyhcl.simulator import Simlite
class HalfAdder(Module):
io = IO(
a=Input(U.w(1)),
b=Input(U.w(1)),
s=Output(U.w(1)),
cout=Output(U.w(1))
)
# 填充完整输出端口s以及cout的逻辑例如
io.s <<= io.a ^ io.b
io.cout <<= io.a & io.b
def main():
Emitter.dumpVerilog(Emitter.dump(Emitter.emit(HalfAdder()), "HalfAdder.fir"))
# cfg = DpiConfig()
s = Simlite(HalfAdder())
# s.start()
# s.step([20, 20])
# s.step([15, 10])
# s.step([1000, 1])
# s.step([999, 201])
# s.close()
# 生成Verilog文件
if __name__ == '__main__':
Emitter.dumpVerilog(Emitter.dump(Emitter.emit(HalfAdder()), "HalfAdder.fir"))
# cfg = DpiConfig()
s = Simlite(HalfAdder())
s.start()
s.step([20, 20])
s.step([15, 10])
s.step([1000, 1])
s.step([999, 201])
s.close()

View File

@ -1,6 +1,8 @@
from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
from queue import Queue
class Add(BlackBox):
io = IO(
@ -14,9 +16,9 @@ class Add(BlackBox):
def fn(a, b):
return a + b
addpysvmodule(Add, fn)
compile_and_binding_all()
addpysvmodule(Add, fn) # 黑盒与函数 # 转换得到.sv/bbox/Add.svSV里调用python函数
compile_and_binding_all() # 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
class Top(Module):
@ -34,13 +36,32 @@ class Top(Module):
from random import randint
if __name__ == '__main__':
def main():
cfg = DpiConfig()
#Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg)
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
s.step([20, 20])
print("time: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([15, 10])
print("time: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1000, 1])
print("time: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([999, 201])
# tasks = []
# tasks.append([11, 12])
# tasks.append([12, 13])
# tasks.append([13, 14])
# s.start_task('Top', tasks)
# s.start()
# s.step([15, 10])
# s.step([1000, 1])
# s.step([999, 201])
s.close()
if __name__ == '__main__':
main()

46
example/simlite_2.py Normal file
View File

@ -0,0 +1,46 @@
from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
from queue import Queue
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
io.c <<= io.a + io.b
from random import randint
def main():
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), debug=True)
s.start()
s.step([20, 20])
print("time: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([15, 10])
print("time: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1000, 1])
print("time: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([999, 201])
# tasks = []
# tasks.append([11, 12])
# tasks.append([12, 13])
# tasks.append([13, 14])
# s.start_task('Top', tasks)
# s.start()
# s.step([15, 10])
# s.step([1000, 1])
# s.step([999, 201])
s.close()
if __name__ == '__main__':
main()

38
injector/reader.py Normal file
View File

@ -0,0 +1,38 @@
# 读波形
from injector.vcd_reader import VcdReader
def read_wave(wavefile, replay_block, inputs_only, excluded_sigs):
# get file postfix check if supported # 得到文件后缀查看是否支持
wave_type = wavefile.split('.')[-1] # 得到文件后缀fsdb/vcd
supported_wave_formats = ['vcd', 'fsdb']
if wave_type not in supported_wave_formats: # 不属于fsdb和vcd文件报错
raise ValueError("Wavefile type: ", wave_type, " is currently not supported. Supported formats are: ", supported_wave_formats)
data = None
if wave_type == 'vcd': # vcd文件
# from waveRead.vcd_reader import VcdReader
data = VcdReader(replay_block, wavefile, excluded_sigs, inputs_only)
elif wave_type == 'fsdb': # fsdb文件
pass
# from waveRead.fsdb_reader import FsdbReader
# data = FsdbReader(replay_block, wavefile, excluded_sigs, inputs_only)
return data
if __name__ == '__main__':
replay_block = []
wavefile = "../simulation/wave.vcd"
# wavefile = "./mcdt.vcd"
excluded_sigs = []
inputs_only = False
data = read_wave(wavefile, replay_block, inputs_only, excluded_sigs) # VcdReader对象
print(data.sig_name_2_vcd_name)
print(data.vcd_name_2_sig_name)
print(data.signal_values)

58
injector/reader_base.py Normal file
View File

@ -0,0 +1,58 @@
class ReaderBase:
# 属性excluded_sigs、inputs_only、replay_blocks、signal_values、signal_changes
def __init__(self, replay_blocks, wave_file, excluded_sigs, inputs_only):
self.excluded_sigs = excluded_sigs if excluded_sigs != None else [] # 信号集合
self.inputs_only = inputs_only if inputs_only != None else True
if type(replay_blocks) != list:
replay_blocks = [replay_blocks]
self.replay_blocks = replay_blocks
# 从波形中读取数据,得到字典{'信号':[(时间,'值'),...],...}
self.signal_values = self.extract_values_from_wave(self.replay_blocks, self.excluded_sigs, inputs_only)
# 从signal_values中提取所有信号发生变化的时间得到列表
self.signal_changes = self.extract_events(self.signal_values)
self.signal_values_i = {}
self.signal_values_o = {}
self.countIO()
def countIO(self):
for key in self.signal_values:
if key.endswith('_i'):
self.signal_values_i[key] = self.signal_values[key]
else:
self.signal_values_o[key] = self.signal_values[key]
if self.inputs_only is True:
self.signal_values = self.signal_values_i
# 从波形中读取数据,得到字典{'信号':[(时间,'值'),...],...}
def extract_values_from_wave(self, replay_blocks, excluded_sigs, inputs_only):
pass
# 从signal_values中提取所有信号发生变化的时间得到列表
def extract_events(self, signal_values):
all_changes = []
for sig_name, sig_values in signal_values.items():
all_changes.extend(sig_values)
# 保留所有信号变化的时间
change_times = sorted(list(set([change[0] for change in all_changes])))
return change_times
# 得到仿真时刻sim_time后的一个变化时刻若无则返回None
def get_next_event(self, sim_time):
next_time = next((change_time for change_time in self.signal_changes if change_time > sim_time), None)
return next_time
# 返回字典--存放仿真时刻sim_time时各信号量的值
def get_values_at(self, sim_time):
current_values = {}
for sig_name, sig_values in self.signal_values.items():
# [::-1]表示倒序, next()返回迭代器的下一个项目
# 返回字典--存放仿真时刻sim_time时各信号量的值
current_values[sig_name] = next((sig_value[1] for sig_value in sig_values[::-1] if sig_value[0] <= sim_time), None)
return current_values

143
injector/simlite_v1.py Normal file
View File

@ -0,0 +1,143 @@
from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
from queue import Queue
import asyncio
class Add(BlackBox):
io = IO(
in1=Input(U.w(32)),
in2=Input(U.w(32)),
out=Output(U.w(32))
)
@sv(a=DataType.UInt, b=DataType.UInt, return_type=Reference(x=DataType.UInt))
def fn(a, b):
return a + b
addpysvmodule(Add, fn) # 黑盒与函数 # 转换得到.sv/bbox/Add.svSV里调用python函数
compile_and_binding_all() # 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
add = Add()
add.io.in1 <<= io.a
add.io.in2 <<= io.b
io.c <<= add.io.out
from random import randint
class driver:
def __init__(self, name, s: Simlite, time_period):
self.name = name
self.req_mb = Queue()
self.simlite = s
self.time_period = time_period
async def run(self):
for i in range(3):
input_data = [15 + i, 10 + i]
self.simlite.step(input_data)
print("%s drivered data %s" % (self.name, input_data))
await asyncio.sleep(self.time_period)
class monitor:
def __init__(self, name, s: Simlite, time_period):
self.name = name
self.input_mb = Queue()
self.output_mb = Queue()
self.simlite = s
self.cnt = self.simlite.cnt
self.time_period = time_period
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.simlite.cnt == self.cnt + 1:
self.cnt = self.simlite.cnt
input_data = self.simlite.inputs_values
output_data = self.simlite.results
print("%s monitored input data %s" % (self.name, input_data))
print("%s monitored output data %s" % (self.name, output_data))
self.input_mb.put(input_data)
self.output_mb.put(output_data)
await asyncio.sleep(self.time_period)
class checker:
def __init__(self, name, time_period):
self.name = name
self.error_count = 0
self.cmp_count = 0
self.in_mb = Queue()
self.out_mb = Queue()
self.time_period = time_period
async def run(self):
await self.do_compare()
async def do_compare(self):
while True:
while self.out_mb.empty() or self.in_mb.empty():
await asyncio.sleep(self.time_period)
outputs = self.out_mb.get()
inputs = self.in_mb.get()
# print(inputs, outputs)
result = sum(inputs)
if result == outputs[0]:
print("succeed:output data %d is equal with desired data %d" % (outputs[0], result))
else:
print("failed:output data %d is not equal with desired data %d" % (outputs[0], result))
self.cmp_count = self.cmp_count + 1
# await asyncio.sleep(self.time_period)
async def func(s, time_period):
driver1 = driver("driver", s, time_period)
monitor1 = monitor("monitor", s, time_period)
checker1 = checker("checker", time_period)
monitor1.input_mb = checker1.in_mb
monitor1.output_mb = checker1.out_mb
driver_task = asyncio.create_task(driver1.run())
monitor_task = asyncio.create_task(monitor1.run())
checker_task = asyncio.create_task(checker1.run())
await driver_task
# await monitor_task
await asyncio.sleep(1)
def main():
cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
time_period = 0.1
asyncio.run(func(s, time_period))
s.close()
if __name__ == '__main__':
cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
s.step([20, 20])
s.step([15, 10])
s.step([1000, 1])
s.step([999, 201])
s.close()

171
injector/simlite_v2.py Normal file
View File

@ -0,0 +1,171 @@
from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
from queue import Queue
import asyncio
class Add(BlackBox):
io = IO(
in1=Input(U.w(32)),
in2=Input(U.w(32)),
out=Output(U.w(32))
)
@sv(a=DataType.UInt, b=DataType.UInt, return_type=Reference(x=DataType.UInt))
def fn(a, b):
return a + b
addpysvmodule(Add, fn) # 黑盒与函数 # 转换得到.sv/bbox/Add.svSV里调用python函数
compile_and_binding_all() # 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
add = Add()
add.io.in1 <<= io.a
add.io.in2 <<= io.b
io.c <<= add.io.out
from random import randint
class in_intf:
def __init__(self):
self.input_data = []
class out_intf:
def __init__(self):
self.output_data = []
class driver:
def __init__(self, name, s: Simlite, time_period):
self.name = name
self.req_mb = Queue()
self.simlite = s
self.time_period = time_period
async def run(self):
for i in range(3):
input_data = [15 + i, 10 + i]
self.simlite.step(input_data)
print("%s drivered data %s" % (self.name, input_data))
await asyncio.sleep(self.time_period)
class in_monitor:
def __init__(self, name, s: Simlite, time_period):
self.name = name
self.input_mb = Queue()
self.simlite = s
self.cnt = self.simlite.cnt
self.time_period = time_period
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.simlite.cnt == self.cnt + 1:
self.cnt = self.simlite.cnt
input_data = self.simlite.inputs_values
print("%s monitored input data %s" % (self.name, input_data))
self.input_mb.put(input_data)
await asyncio.sleep(self.time_period)
class out_monitor:
def __init__(self, name, s: Simlite, time_period):
self.name = name
self.output_mb = Queue()
self.simlite = s
self.cnt = self.simlite.cnt
self.time_period = time_period
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.simlite.cnt == self.cnt + 1:
self.cnt = self.simlite.cnt
output_data = self.simlite.results
print("%s monitored output data %s" % (self.name, output_data))
self.output_mb.put(output_data)
await asyncio.sleep(self.time_period)
class checker:
def __init__(self, name, time_period):
self.name = name
self.error_count = 0
self.cmp_count = 0
self.in_mb = Queue()
self.out_mb = Queue()
self.time_period = time_period
async def run(self):
await self.do_compare()
async def do_compare(self):
while True:
while self.out_mb.empty() or self.in_mb.empty():
await asyncio.sleep(self.time_period)
outputs = self.out_mb.get()
inputs = self.in_mb.get()
# print(inputs, outputs)
result = sum(inputs)
if result == outputs[0]:
print("succeed:output data %d is equal with desired data %d" % (outputs[0], result))
else:
print("failed:output data %d is not equal with desired data %d" % (outputs[0], result))
self.cmp_count = self.cmp_count + 1
# await asyncio.sleep(self.time_period)
async def func(s, time_period):
driver1 = driver("driver", s, time_period)
i_monitor = in_monitor("in_monitor", s, time_period)
o_monitor = out_monitor("out_monitor", s, time_period)
checker1 = checker("checker", time_period)
i_monitor.input_mb = checker1.in_mb
o_monitor.output_mb = checker1.out_mb
driver_task = asyncio.create_task(driver1.run())
i_monitor_task = asyncio.create_task(i_monitor.run())
o_monitor_task = asyncio.create_task(o_monitor.run())
checker_task = asyncio.create_task(checker1.run())
await driver_task
# await monitor_task
await asyncio.sleep(1)
def main():
cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
time_period = 0.1
asyncio.run(func(s, time_period))
s.close()
if __name__ == '__main__':
cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
s.step([20, 20])
s.step([15, 10])
s.step([1000, 1])
s.step([999, 201])
s.close()

196
injector/simlite_v3.py Normal file
View File

@ -0,0 +1,196 @@
from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
from queue import Queue
import asyncio
class Add(BlackBox):
io = IO(
in1=Input(U.w(32)),
in2=Input(U.w(32)),
out=Output(U.w(32))
)
@sv(a=DataType.UInt, b=DataType.UInt, return_type=Reference(x=DataType.UInt))
def fn(a, b):
return a + b
addpysvmodule(Add, fn) # 黑盒与函数 # 转换得到.sv/bbox/Add.svSV里调用python函数
compile_and_binding_all() # 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
add = Add()
add.io.in1 <<= io.a
add.io.in2 <<= io.b
io.c <<= add.io.out
from random import randint
class in_intf:
def __init__(self):
self.input_data = []
class out_intf:
def __init__(self):
self.output_data = []
class driver:
def __init__(self, name, s: Simlite, time_period):
self.name = name
self.req_mb = Queue()
self.simlite = s
self.time_period = time_period
async def run(self):
for i in range(3):
input_data = [15 + i, 10 + i]
self.simlite.step(input_data)
print("%s drivered data %s" % (self.name, input_data))
await asyncio.sleep(self.time_period)
def set_interface(self, intf):
self.intf = intf
class in_monitor:
def __init__(self, name, time_period):
self.name = name
self.input_mb = Queue()
self.time_period = time_period
self.input_data = 0
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.intf.input_data != self.input_data:
self.input_data = self.intf.input_data
print("%s monitored input data %s" % (self.name, self.input_data))
self.input_mb.put(self.input_data)
await asyncio.sleep(self.time_period)
def set_interface(self, intf):
self.intf = intf
class out_monitor:
def __init__(self, name, time_period):
self.name = name
self.output_mb = Queue()
self.time_period = time_period
self.output_data = 0
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.intf.output_data != self.output_data:
self.output_data = self.intf.output_data
print("%s monitored output data %s" % (self.name, self.output_data))
self.output_mb.put(self.output_data)
await asyncio.sleep(self.time_period)
def set_interface(self, intf):
self.intf = intf
class checker:
def __init__(self, name, time_period):
self.name = name
self.error_count = 0
self.cmp_count = 0
self.in_mb = Queue()
self.out_mb = Queue()
self.time_period = time_period
async def run(self):
await self.do_compare()
async def do_compare(self):
while True:
while self.out_mb.empty() or self.in_mb.empty():
await asyncio.sleep(self.time_period)
outputs = self.out_mb.get()
inputs = self.in_mb.get()
# print(inputs, outputs)
result = sum(inputs)
if result == outputs[0]:
print("succeed:output data %d is equal with desired data %d" % (outputs[0], result))
else:
print("failed:output data %d is not equal with desired data %d" % (outputs[0], result))
self.cmp_count = self.cmp_count + 1
# await asyncio.sleep(self.time_period)
async def inject_values(time_period, in_intf, out_intf):
for i in range(3):
input_data = [15 + i, 10 + i]
output_data = [sum(input_data)]
in_intf.input_data = input_data
out_intf.output_data = output_data
print("inject input_data %s and output_data %s" % (input_data, output_data))
await asyncio.sleep(time_period)
pass
async def func(time_period):
# driver1 = driver("driver", s, time_period)
i_monitor = in_monitor("in_monitor", time_period)
o_monitor = out_monitor("out_monitor", time_period)
checker1 = checker("checker", time_period)
i_monitor.input_mb = checker1.in_mb
o_monitor.output_mb = checker1.out_mb
i_intf = in_intf()
o_intf = out_intf()
i_monitor.set_interface(i_intf)
o_monitor.set_interface(o_intf)
# driver_task = asyncio.create_task(driver1.run())
i_monitor_task = asyncio.create_task(i_monitor.run())
o_monitor_task = asyncio.create_task(o_monitor.run())
checker_task = asyncio.create_task(checker1.run())
# await driver_task
# await monitor_task
inject_task = asyncio.create_task(inject_values(time_period, i_intf, o_intf))
await inject_task
await asyncio.sleep(1)
# 手动注入
def main():
# cfg = DpiConfig()
# # Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
# s = Simlite(Top(), dpiconfig=cfg, debug=True)
# s.start()
time_period = 0.1
asyncio.run(func(time_period))
# s.close()
if __name__ == '__main__':
cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
s.step([20, 20])
s.step([15, 10])
s.step([1000, 1])
s.step([999, 201])
s.close()

236
injector/simlite_v4.py Normal file
View File

@ -0,0 +1,236 @@
# from pyhcl import *
# from pysv import sv, DataType, Reference
# from pyhcl.simulator import Simlite, DpiConfig
from queue import Queue
import asyncio
from injector.reader import read_wave
# class Add(BlackBox):
# io = IO(
# in1=Input(U.w(32)),
# in2=Input(U.w(32)),
# out=Output(U.w(32))
# )
#
#
# @sv(a=DataType.UInt, b=DataType.UInt, return_type=Reference(x=DataType.UInt))
# def fn(a, b):
# return a + b
#
#
# addpysvmodule(Add, fn) # 黑盒与函数 # 转换得到.sv/bbox/Add.svSV里调用python函数
# compile_and_binding_all() # 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
#
#
# class Top(Module):
# io = IO(
# a=Input(U.w(32)),
# b=Input(U.w(32)),
# c=Output(U.w(32))
# )
#
# add = Add()
# add.io.in1 <<= io.a
# add.io.in2 <<= io.b
# io.c <<= add.io.out
from random import randint
class in_intf:
def __init__(self):
self.input_data = []
class out_intf:
def __init__(self):
self.output_data = []
# class driver:
# def __init__(self, name, s: Simlite, time_period):
# self.name = name
# self.req_mb = Queue()
# self.simlite = s
# self.time_period = time_period
#
# async def run(self):
# for i in range(3):
# input_data = [15 + i, 10 + i]
# self.simlite.step(input_data)
# print("%s drivered data %s" % (self.name, input_data))
# await asyncio.sleep(self.time_period)
#
# def set_interface(self, intf):
# self.intf = intf
class in_monitor:
def __init__(self, name, time_period):
self.name = name
self.input_mb = Queue()
self.time_period = time_period
self.input_data = 0
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.intf.input_data != self.input_data:
self.input_data = self.intf.input_data
print("%s monitored input data %s" % (self.name, self.input_data))
self.input_mb.put(self.input_data)
await asyncio.sleep(self.time_period)
def set_interface(self, intf):
self.intf = intf
class out_monitor:
def __init__(self, name, time_period):
self.name = name
self.output_mb = Queue()
self.time_period = time_period
self.output_data = 0
async def run(self):
await asyncio.sleep(self.time_period/2)
while True:
if self.intf.output_data != self.output_data:
self.output_data = self.intf.output_data
print("%s monitored output data %s" % (self.name, self.output_data))
self.output_mb.put(self.output_data)
await asyncio.sleep(self.time_period)
def set_interface(self, intf):
self.intf = intf
class checker:
def __init__(self, name, time_period):
self.name = name
self.error_count = 0
self.cmp_count = 0
self.in_mb = Queue()
self.out_mb = Queue()
self.time_period = time_period
async def run(self):
await self.do_compare()
async def do_compare(self):
while True:
while self.out_mb.empty() or self.in_mb.empty():
await asyncio.sleep(self.time_period)
outputs = self.out_mb.get()
inputs = self.in_mb.get()
# print(inputs, outputs)
result = sum(inputs)
if result == outputs[0]:
print("succeed:output data %d is equal with desired data %d" % (outputs[0], result))
else:
print("failed:output data %d is not equal with desired data %d" % (outputs[0], result))
self.cmp_count = self.cmp_count + 1
# await asyncio.sleep(self.time_period)
async def inject_values(time_period, in_intf, out_intf, wavefile):
data = waveRead(wavefile)
sim_time = 0
while True:
print()
# values为字典{'信号':'值',...}--存放仿真时刻sim_time时各信号量的值
values = data.get_values_at(sim_time)
# print(values)
# 注入当前仿真时刻的信号与值
# injector.inject_values(values)
input_data = [values['TOP.io_a'], values['TOP.io_b']]
input_data = [int(k, base=2) for k in input_data]
output_data = [values['TOP.io_c']]
output_data = [int(k, base=2) for k in output_data]
in_intf.input_data = input_data
out_intf.output_data = output_data
print("inject input_data %s and output_data %s" % (input_data, output_data))
await asyncio.sleep(time_period)
previous_time = sim_time
# 得到仿真时刻sim_time后的一个变化时刻若无则返回None
sim_time = data.get_next_event(sim_time)
if sim_time is None:
break
# await Timer(sim_time - previous_time)
# for i in range(3):
# input_data = [15 + i, 10 + i]
# output_data = [sum(input_data)]
# in_intf.input_data = input_data
# out_intf.output_data = output_data
# print("inject input_data %s and output_data %s" % (input_data, output_data))
# await asyncio.sleep(time_period)
pass
def waveRead(wavefile):
replay_block = []
excluded_sigs = []
inputs_only = False
data = read_wave(wavefile, replay_block, inputs_only, excluded_sigs) # VcdReader对象
print(data.signal_values)
return data
async def func(time_period):
# driver1 = driver("driver", s, time_period)
i_monitor = in_monitor("in_monitor", time_period)
o_monitor = out_monitor("out_monitor", time_period)
checker1 = checker("checker", time_period)
i_monitor.input_mb = checker1.in_mb
o_monitor.output_mb = checker1.out_mb
i_intf = in_intf()
o_intf = out_intf()
i_monitor.set_interface(i_intf)
o_monitor.set_interface(o_intf)
# driver_task = asyncio.create_task(driver1.run())
i_monitor_task = asyncio.create_task(i_monitor.run())
o_monitor_task = asyncio.create_task(o_monitor.run())
checker_task = asyncio.create_task(checker1.run())
# await driver_task
# await monitor_task
wavefile = "simulation/wave.vcd"
inject_task = asyncio.create_task(inject_values(time_period, i_intf, o_intf, wavefile))
await inject_task
await asyncio.sleep(1)
def main():
# cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
# s = Simlite(Top(), dpiconfig=cfg, debug=True)
# s.start()
time_period = 0.1
asyncio.run(func(time_period))
# s.close()
if __name__ == '__main__':
# cfg = DpiConfig()
# # Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Top.fir"))
# s = Simlite(Top(), dpiconfig=cfg, debug=True)
# s.start()
# s.step([20, 20])
# s.step([15, 10])
# s.step([1000, 1])
# s.step([999, 201])
# s.close()
# wavefile = "../simulation/wave.vcd"
# waveRead(wavefile)
main()

199
injector/vcd_reader.py Normal file
View File

@ -0,0 +1,199 @@
import os
import sys
import re
# wip: better structured vcd reader
from injector.reader_base import ReaderBase
class VcdReader(ReaderBase):
# 属性vcd_path、excluded_sigs、replay_blocks、
# 属性scope_begin_match、scope_end_match、definitions_end_match、signal_match、simple_sig_match、vector_sig_match、dumpvars_match、new_time_match、sig_value_match
# 父类属性excluded_sigs、inputs_only、replay_blocks、signal_values、signal_changes
def __init__(self, replay_blocks, wave_file, excluded_sigs, inputs_only):
# 波形文件名xxx.vcd
self.vcd_path = wave_file
# 排除的信号
self.excluded_sigs = excluded_sigs
# 重新运行的模块
self.replay_blocks = replay_blocks
# r raw string
# ^ 匹配字符串的开头
# \s 匹配任意空白字符
# re* 匹配0个或多个的表达式
# [^...] 不在[]中的字符
# . 匹配任意字符,除了换行符
# 正则表达式
# 匹配scope头 # 例:$scope module tb_top $end
self.scope_begin_match = r"^\$scope\s*([^ ]+)\s*([^ ]+)\s*\$end"
# 匹配scope尾 # 例:$upscope $end
self.scope_end_match = r"\$upscope\s*\$end"
# 匹配信号定义尾 # 例:$enddefinitions $end
self.definitions_end_match = r"\$enddefinitions.*"
# 匹配信号 # 例:$var reg 1 ! clk $end
self.signal_match = r"\$var.*\$end"
# 匹配简单信号 # 例:! clk $end
self.simple_sig_match = r"([^ ]+)\s*([a-zA-Z0-9_]+)\s*\$end"
# 匹配信号数组 # 例:' ctr [10:0] $end
self.vector_sig_match = r"([^ ]+)\s*([a-zA-Z0-9_]+)\s*(\[[0-9]+\:[0-9]+\]*)\s*\$end"
# 匹配dumpvars头 # 例:$dumpvars
self.dumpvars_match = r"^\$dumpvars.*"
# 匹配时间(#数字) # 例:#10
self.new_time_match = r"^#([0-9]+)"
# 匹配信号值 # 例b1 ' 例1&
self.sig_value_match = r"^[b]{0,1}([zx10]+)[ ]{0,1}(.*)"
super().__init__(replay_blocks, wave_file, excluded_sigs, inputs_only)
# 从波形文件中提取
def extract_values_from_wave(self, replay_blocks, excluded_sigs, inputs_only):
# 得到信号值 字典{'信号':[(时间,'值'),...],...}
self.sigs_values = {}
# 从vcd波形文件中提取xxx.vcd
self.extract_vcd_file(self.vcd_path)
return self.sigs_values
# 从vcd波形文件中提取数据xxx.vcd
def extract_vcd_file(self, valid_path):
with open(valid_path, "r") as vcd_file:
# 将信号名与vcd文件中字符名一一对应
self.sig_name_2_vcd_name = {} # 信号名转vcd名
self.vcd_name_2_sig_name = {} # vcd名转信号名
# 从vcd波形文件中提取前面scope部分
self.extract_scopes(vcd_file)
# print(self.sig_name_2_vcd_name)
# 从vcd波形文件中提取信号值部分
self.extract_sig_values(vcd_file)
# 从当前scope(范围)中提取数据
def extract_scopes(self, vcd_file, current_scope=""):
while vcd_file:
vcd_line = vcd_file.readline().strip(' ') # 读取一行
if vcd_line == "": # 注:如果是空行,为'\n'
return
# re.match(pattern, string, flags=0)
# re.match 方法从字符串的起始位置匹配一个模式,匹配成功,返回一个匹配的对象,否则返回 None
# re.search(pattern, string, flags=0)
# re.search 扫描整个字符串并返回第一个成功的匹配,否则返回 None
# group() 同group0就是匹配正则表达式整体结果
# group(1) 列出第一个括号匹配部分group(2) 列出第二个括号匹配部分group(3) 列出第三个括号匹配部分。
# 匹配scope的头
scope_begin = re.match(self.scope_begin_match, vcd_line)
# 匹配scope的尾
scope_end = re.match(self.scope_end_match, vcd_line)
# 匹配信号
sig = re.match(self.signal_match, vcd_line)
# 匹配简单信号
simple_sig = re.search(self.simple_sig_match, vcd_line)
# 匹配信号数组
vector_sig = re.search(self.vector_sig_match, vcd_line)
# 匹配信号定义的结束
definitions_end = re.search(self.definitions_end_match, vcd_line)
# 信号定义的结束
if definitions_end:
if current_scope != "": # 顶层
raise ValueError("Definitions terminated in the middle of scope: ", current_scope)
return
# scope的头
if scope_begin:
new_scope_name = scope_begin.group(2) # 模块名
prefix = current_scope + "." if current_scope != "" else ""
# print("going into scope: ", prefix + new_scope_name)
# 从scope模块中提取数据递归
self.extract_scopes(vcd_file, prefix + new_scope_name) # 从新模块中提取数据
# scope的尾
if scope_end:
# print("going out of scope: ", current_scope)
return # 当前scope模块结束
# 当前模块不用重新运行
# if current_scope not in self.replay_blocks:
# # just read the lines # 只是读行一直到scope_end
# continue
# 简单信号
if sig and simple_sig:
self.update_sig_tables(current_scope, simple_sig) # 更新信号表
# 信号数组
if sig and vector_sig:
self.update_sig_tables(current_scope, vector_sig) # 更新信号表
# 不是信号和scope
if (not (vector_sig or simple_sig or scope_begin or scope_end)):
pass
#print("Unsupported line: ", vcd_line)
# 更新信号表
def update_sig_tables(self, current_scope, signal_match):
vcd_name = signal_match.group(1) # vcd中信号代表字符名
sig_name = signal_match.group(2) # 信号名
full_sig_name = current_scope + "." + sig_name # 模块.信号名
# print(full_sig_name)
if full_sig_name in self.excluded_sigs: # 该信号在排除范围内
return
# 将信号名与vcd文件中字符名一一对应
self.sig_name_2_vcd_name[full_sig_name] = vcd_name
if vcd_name not in self.vcd_name_2_sig_name:
self.vcd_name_2_sig_name[vcd_name] = []
self.vcd_name_2_sig_name[vcd_name].append(full_sig_name)
# 从vcd波形文件中提取信号值部分
def extract_sig_values(self, vcd_file):
for sig_name in self.sig_name_2_vcd_name.keys(): # 遍历信号名
self.sigs_values[sig_name] = [] # 信号值字典 键:信号名 值:列表(时间,信号值)
# dumpvars_found = False
while vcd_file:
vcd_line = vcd_file.readline().strip(' ') # 读取一行
if vcd_line == "": # 注:如果是空行,为'\n'
return
elif vcd_line == "\n":
continue
# dumpvars = re.match(self.dumpvars_match, vcd_line) # 匹配头
new_time = re.match(self.new_time_match, vcd_line) # 匹配时间
sig_value = re.match(self.sig_value_match, vcd_line) # 匹配信号值
# if dumpvars:
# dumpvars_found = True # 找到dumpvars头,开始匹配信号值
# current_time = 0 # 时间初始为0
if new_time:
current_time = new_time.group(1) # 时间
if current_time == '123018744': # 时间
pass
if sig_value:
signal_value = sig_value.group(1) # 信号值([zx10]+
signal_vcd_name = sig_value.group(2) # 信号对应的vcd名
if signal_vcd_name in self.vcd_name_2_sig_name:
# vcd名转信号名
signal_name_list = self.vcd_name_2_sig_name[signal_vcd_name]
for signal_name in signal_name_list:
# 信号值字典 键:信号名 值:列表(时间,信号值)
self.sigs_values[signal_name].append((int(current_time), signal_value))
else:
# we currently get here for expended vector signals (i.e. [x[3]])
continue

107
myTests/test.py Normal file
View File

@ -0,0 +1,107 @@
from typing import Type, Union, Dict
class IO:
_ios: Dict[str, int]
def __init__(self, **kwargs):
self._ios = kwargs
class Add:
io = IO(
in1=3,
in2=4,
out=5
)
import re
# 解析FIRRTL代码, 返回输入端口名列表 和 输出端口名列表
def firrtl_parse(firrtl_path):
circuit_begin_match = r"circuit\s*([a-zA-Z0-9_]+)"
module_begin_match = r"module\s*([a-zA-Z0-9_]+)"
input_port_match = r"input\s*([a-zA-Z0-9_]+)"
output_port_match = r"output\s*([a-zA-Z0-9_]+)"
input_ports_name = []
output_ports_name = []
top_module_name = '0'
current_module_name = '1'
with open(firrtl_path, "r") as firrtl_file:
while firrtl_file:
firrtl_line = firrtl_file.readline().strip(' ') # 读取一行
# print(firrtl_line)
if firrtl_line == "": # 注:如果是空行,为'\n'
break
circuit_begin = re.search(circuit_begin_match, firrtl_line)
module_begin = re.search(module_begin_match, firrtl_line)
if circuit_begin:
top_module_name = circuit_begin.group(1)
# print(top_module_name)
if module_begin:
current_module_name = module_begin.group(1)
# print(current_module_name)
if current_module_name == top_module_name:
input_port = re.search(input_port_match, firrtl_line)
output_port = re.search(output_port_match, firrtl_line)
if input_port:
input_ports_name.append(input_port.group(1))
if output_port:
output_ports_name.append(output_port.group(1))
print(top_module_name)
print(input_ports_name)
print(output_ports_name)
return top_module_name, input_ports_name, output_ports_name
# 解析verilog代码, 返回输入端口名列表 和 输出端口名列表
def verilog_parse(dut_path, top_module_name):
dut_name = top_module_name.split('.')[0] # 模块名
top_module_path = dut_path + top_module_name
# print(top_module_path)
module_begin_match = r"module\s*([a-zA-Z0-9_]+)"
input_port_match = r"input\s*(reg|wire)*\s*(\[[0-9]+\:[0-9]+\]*)*\s*([a-zA-Z0-9_]+)"
output_port_match = r"output\s*(reg|wire)*\s*(\[[0-9]+\:[0-9]+\]*)*\s*([a-zA-Z0-9_]+)"
input_ports_name = []
output_ports_name = []
with open(top_module_path, "r") as verilog_file:
while verilog_file:
verilog_line = verilog_file.readline().strip(' ') # 读取一行
# print(verilog_line)
if verilog_line == "": # 注:如果是空行,为'\n'
break
module_begin = re.search(module_begin_match, verilog_line)
if module_begin:
current_module_name = module_begin.group(1)
# print(current_module_name)
if current_module_name == dut_name:
input_port = re.search(input_port_match, verilog_line)
output_port = re.search(output_port_match, verilog_line)
if input_port:
input_ports_name.append(input_port.group(3))
if output_port:
output_ports_name.append(output_port.group(3))
# print(dut_name)
# print(input_ports_name)
# print(output_ports_name)
return input_ports_name, output_ports_name
if __name__ == '__main__':
# verilog_parse('./tmp/dut/', 'Top.v')
verilog_parse('../simulation/', 'M.v')
# firrtl_parse('./tmp/firrtl/M.fir')

116
myTests/test_async.py Normal file
View File

@ -0,0 +1,116 @@
# Eventloop 是asyncio应用的核心,把一些异步函数注册到这个事件循环上,事件循环会循环执行这些函数,
# 当执行到某个函数时如果它正在等待I/O返回如它正在进行网络请求或者sleep操作事件循环会暂停它的执行去执行其他的函数
# 当某个函数完成I/O后会恢复下次循环到它的时候继续执行。
# 因此,这些异步函数可以协同(Cooperative)运行:这就是事件循环的目标。
# await只能用在协程函数中, 所以想要用await关键字就还需要定义一个协程函数, 但最终的执行还是需要放到一个事件循环中进行
import asyncio
import time
async def testa(x):
print("in myTests a")
await asyncio.sleep(3)
print("Resuming a")
return x
async def testb(x):
print("in myTests b")
await asyncio.sleep(1)
print('Resuming b')
return x
# 先执行了testa函数然后再执行了testb函数是串行的依次执行的
async def main_1():
start = time.time()
resulta = await testa(1)
resultb = await testb(2)
print("myTests a result is %d" % resulta)
print("myTests b result is %d" % resultb)
print("use %s time" % (time.time()-start))
# testa和testb是同步在运行,最后将每个协程函数的结果返回,
# 注意这里是gather()函数里的每一个协程函数都执行完了它才结果结果是一个列表列表里的值顺序和放到gather函数里的协程的顺序是一致的
async def main_2():
start = time.time()
resulta, resultb = await asyncio.gather(testa(1), testb(2))
print("myTests a result is %d" % resulta)
print("myTests b result is %d" % resultb)
print("use %s time" % (time.time() - start))
# 使用Task任务对象
# 使用asyncio.ensure_future(testa(1)) 返回一个task对象此时task进入pending状态并没有执行
# taska.done()返回False,表示它还没有结束
# 当调用await taska 时表示开始执行该协程当执行结束以后taska.done() 返回True
# 这时可以调用taska.result() 得到函数的返回值如果协程还没有结束就调用result()方法则会抛异常
async def main_3():
start = time.time()
taska = asyncio.ensure_future(testa(1))
taskb = asyncio.ensure_future(testb(2))
print(taska)
print(taskb)
print(taska.done(), taskb.done())
await taskb
await taska
print(taska.done(), taskb.done())
print(taskb.result())
print(taska.result())
print("use %s time" % (time.time() - start))
# 创建task对象除了使用asyncio.ensure_future()方法还可以使用loop.create_task() 方法
async def main_4():
start = time.time()
taska = loop.create_task(testa(1))
taskb = loop.create_task(testb(2))
print(taska)
print(taskb)
print(taska.done(), taskb.done())
await taskb
await taska
print(taska.done(), taskb.done())
print(taskb.result())
print(taska.result())
print("use %s time" % (time.time() - start))
# asyncio.wait() 返回一个tuple对象
async def main_5():
start = time.time()
done, pending = await asyncio.wait([testa(1), testb(2)])
print(list(done))
print(list(pending))
print(list(done)[0].result())
print("use %s time" % (time.time() - start))
# gather是需要所有任务都执行结束如果某一个协程函数崩溃了则会抛异常都不会有结果。
# wait可以定义函数返回的时机可以是FIRST_COMPLETED(第一个结束的), FIRST_EXCEPTION(第一个出现异常的), ALL_COMPLETED(全部执行完,默认的)
async def main_6():
start = time.time()
done, pending = await asyncio.wait([testa(1), testb(2)], return_when=asyncio.tasks.FIRST_EXCEPTION)
print(list(done))
print(list(pending))
print(list(done)[1].result())
print("use %s time" % (time.time() - start))
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main_1())
# loop.run_until_complete(main_2())
# loop.run_until_complete(main_3())
# loop.run_until_complete(main_4())
# loop.run_until_complete(main_5())
# loop.run_until_complete(main_6())
loop.close()

97
myTests/test_async_2.py Normal file
View File

@ -0,0 +1,97 @@
import asyncio
import time
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
def show_perf(func):
print('*'*20)
start = time.perf_counter()
asyncio.run(func())
print(f'{func.__name__} Cost: {time.perf_counter() - start}')
# 串行的执行
async def s1():
await a()
await b()
async def c1():
await asyncio.gather(a(),b())
async def c2():
await asyncio.wait([a(), b()])
# asyncio.create_task相当于把协程封装成Task
async def c3():
task1 = asyncio.create_task(a())
task2 = asyncio.create_task(b())
print("XXX")
await task2
print("YYY")
# await task2
# await asyncio.sleep(3)
print("ZZZ")
# print("XXX")
# await task2
# print("YYY")
# # await task1
# print("ZZZ")
async def c4():
task = asyncio.create_task(b())
print("XXX")
await a()
print("YYY")
# await task
print("ZZZ")
# print("XXX")
# await task
# print("YYY")
# await a()
# print("ZZZ")
# 直接await task不会对并发有帮助
async def s2():
await asyncio.create_task(a())
await asyncio.create_task(b())
async def c5():
task = asyncio.ensure_future(b())
print("XXX")
await a()
print("YYY")
# await task
print("ZZZ")
async def c6():
loop = asyncio.get_event_loop()
task = loop.create_task(b())
print("XXX")
await a()
print("YYY")
await task
print("ZZZ")
if __name__ == '__main__':
show_perf(c3)

85
myTests/test_firrtl.py Normal file
View File

@ -0,0 +1,85 @@
from pyhcl import *
from pyhcl.util.firrtltools import addfirrtlmodule
from pyhcl.simulator import Simlite
import random
class Add(BlackBox):
io = IO(
in1=Input(U.w(32)),
in2=Input(U.w(32)),
out=Output(U.w(32)),
)
fd = open(f"myTests/tmp/firrtl/Add.fir", "r")
firrtl_code = "".join(fd.readlines())
# print(firrtl_code)
addfirrtlmodule(Add, firrtl_code)
class M(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32)),
)
bbox = Add()
bbox.io.in1 <<= io.a
bbox.io.in2 <<= io.b
io.c <<= bbox.io.out
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([20, 20])
tasks.append([15, 10])
tasks.append([1000, 1])
tasks.append([999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s):
ifn = f"../myTests/tmp/Top_inputs"
ofn = f"../myTests/tmp/Top_outputs"
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
s = Simlite(M())
# test_step(s)
# test_task(s)
test_file(s)
s.close()

57
myTests/test_firrtl_2.py Normal file
View File

@ -0,0 +1,57 @@
from pyhcl.simulator.simlite_firrtl import Simlite
import random
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([0, 0, 20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([0, 0, 15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([0, 0, 1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([0, 0, 999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([0, 0, 20, 20])
tasks.append([0, 0, 15, 10])
tasks.append([0, 0, 1000, 1])
tasks.append([0, 0, 999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 0 0" + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s):
ifn = f"../myTests/tmp/Top_inputs"
ofn = f"../myTests/tmp/Top_outputs"
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
firrtl_path = 'myTests/tmp/firrtl/M.fir'
s = Simlite(firrtl_path)
test_step(s)
# test_task(s)
# test_file(s)
s.close()

72
myTests/test_simlite.py Normal file
View File

@ -0,0 +1,72 @@
from pyhcl import *
from pyhcl.simulator import Simlite
import random
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
io.c <<= io.a + io.b
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([20, 20])
tasks.append([15, 10])
tasks.append([1000, 1])
tasks.append([999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s):
ifn = f"../myTests/tmp/Top_inputs"
ofn = f"../myTests/tmp/Top_outputs"
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Add.fir"))
s = Simlite(Top(), debug=True)
# test_step(s)
# test_task(s)
test_file(s)
s.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,75 @@
from pyhcl import *
from pyhcl.simulator import Simlite
import random
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
io.c <<= io.a + io.b
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([20, 20])
tasks.append([15, 10])
tasks.append([1000, 1])
tasks.append([999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s, num):
ifn = f"../myTests/tmp/Top_inputs" + str(num)
ofn = f"../myTests/tmp/Top_outputs" + str(num)
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Add.fir"))
s1 = Simlite(Top(), debug=True)
s2 = Simlite(s1)
# test_step(s)
# test_task(s)
test_file(s1, 1)
test_file(s2, 2)
s1.close()
s2.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,101 @@
from pyhcl import *
from pysv import sv, DataType, Reference
from pyhcl.simulator import Simlite, DpiConfig
import random
class Add(BlackBox):
io = IO(
in1=Input(U.w(32)),
in2=Input(U.w(32)),
out=Output(U.w(32))
)
@sv(a=DataType.UInt, b=DataType.UInt, return_type=Reference(x=DataType.UInt))
def fn(a, b):
return a + b
addpysvmodule(Add, fn) # 黑盒与函数 # 转换得到.sv/bbox/Add.svSV里调用python函数
compile_and_binding_all() # 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
class Top(Module):
io = IO(
a=Input(U.w(32)),
b=Input(U.w(32)),
c=Output(U.w(32))
)
add = Add()
add.io.in1 <<= io.a
add.io.in2 <<= io.b
io.c <<= add.io.out
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([20, 20])
tasks.append([15, 10])
tasks.append([1000, 1])
tasks.append([999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s):
ifn = f"../myTests/tmp/Top_inputs"
ofn = f"../myTests/tmp/Top_outputs"
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
cfg = DpiConfig()
s = Simlite(Top(), dpiconfig=cfg, debug=True)
# test_step(s)
# test_task(s)
test_file(s)
s.close()
if __name__ == '__main__':
cfg = DpiConfig()
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Add.fir"))
s = Simlite(Top(), dpiconfig=cfg, debug=True)
s.start()
s.step([20, 20])
s.step([15, 10])
s.step([1000, 1])
s.step([999, 201])
s.close()

62
myTests/test_verilog.py Normal file
View File

@ -0,0 +1,62 @@
from pyhcl.simulator.simlite_verilog import Simlite
import random
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([0, 0, 20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1, 0, 15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([0, 0, 1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1, 0, 999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([0, 0, 20, 20])
tasks.append([1, 0, 15, 10])
tasks.append([0, 0, 1000, 1])
tasks.append([1, 0, 999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 0 0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s):
ifn = f"../myTests/tmp/Top_inputs"
ofn = f"../myTests/tmp/Top_outputs"
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Add.fir"))
top_module_name = 'Top.v'
dut_path = 'myTests/tmp/dut/'
s = Simlite(top_module_name, dut_path, debug=True)
# test_step(s)
# test_task(s)
test_file(s)
s.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,62 @@
from pyhcl.simulator.simlite_verilog import Simlite
import random
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([0, 0, 20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1, 0, 15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([0, 0, 1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1, 0, 999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([0, 0, 20, 20])
tasks.append([1, 0, 15, 10])
tasks.append([0, 0, 1000, 1])
tasks.append([1, 0, 999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 0 0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s, num):
ifn = f"../myTests/tmp/Top_inputs" + str(num)
ofn = f"../myTests/tmp/Top_outputs" + str(num)
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
top_module_name = 'Top.v'
dut_path = 'myTests/tmp/dut/'
s1 = Simlite(top_module_name, dut_path, debug=True)
s2 = Simlite(module=s1)
test_file(s1, 1)
test_file(s2, 2)
s1.close()
s2.close()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,76 @@
from pyhcl.util.pysvtools import compile_and_binding_all_func
from pyhcl.simulator.simlite_verilog import Simlite, DpiConfig
from pysv import sv, DataType, Reference
import random
@sv(a=DataType.UInt, b=DataType.UInt, return_type=Reference(x=DataType.UInt))
def fn(a, b):
return a + b
# 传入函数列表
# 编译得到共享库 到.build文件夹下, 生成 SV binding文件 .sv/pkg/pysv_pkg.sv
compile_and_binding_all_func([fn])
# 每次给输入端口赋值, 跑一个时间单位
def test_step(s):
s.start()
s.step([0, 0, 20, 20])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1, 0, 15, 10])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([0, 0, 1000, 1])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.step([1, 0, 999, 201])
print("cnt: %d\t\tresult:%s" % (s.cnt, s.getRes()))
s.stop()
def test_task(s):
tasks = []
tasks.append([0, 0, 20, 20])
tasks.append([1, 0, 15, 10])
tasks.append([0, 0, 1000, 1])
tasks.append([1, 0, 999, 201])
s.start_task('Top', tasks)
def randomInput(ifn):
fd = open(ifn, "w")
instr = ""
for i in range(100):
instr += "0 0 0 " + str(random.randint(1, 2000)) + ' ' + str(random.randint(1, 2000)) + "\n"
instr = instr + "-1\n"
fd.write(instr)
fd.close()
def test_file(s):
ifn = f"../myTests/tmp/Top_inputs"
ofn = f"../myTests/tmp/Top_outputs"
randomInput(ifn)
s.start(mode="task", ofn=ofn, ifn=ifn)
pass
def main():
# Emitter.dumpVerilog(Emitter.dump(Emitter.emit(Top()), "Add.fir"))
top_module_name = 'Top.v'
dut_path = 'myTests/tmp/dut_pysv/'
# 创建 DpiConfig对象传入调用Python函数的SV文件名列表
cfg = DpiConfig(['Add.sv'])
s = Simlite(top_module_name, dut_path, dpiconfig=cfg, debug=True)
# test_step(s)
# test_task(s)
test_file(s)
s.close()
if __name__ == '__main__':
main()

10
myTests/tmp/Top.v Normal file
View File

@ -0,0 +1,10 @@
module Top(
input clock,
input reset,
input [31:0] io_a,
input [31:0] io_b,
output [31:0] io_c
);
wire [32:0] _GEN_0 = io_a + io_b;
assign io_c = _GEN_0[31:0];
endmodule

101
myTests/tmp/Top_inputs Normal file
View File

@ -0,0 +1,101 @@
0 0 0 1502 468
0 0 0 45 1030
0 0 0 701 1944
0 0 0 1604 1348
0 0 0 1060 862
0 0 0 1235 959
0 0 0 634 1986
0 0 0 538 1387
0 0 0 952 833
0 0 0 1285 1190
0 0 0 706 740
0 0 0 1426 1018
0 0 0 238 131
0 0 0 623 992
0 0 0 1954 1555
0 0 0 1718 1349
0 0 0 1966 1035
0 0 0 575 1403
0 0 0 1828 331
0 0 0 350 882
0 0 0 1142 1056
0 0 0 1990 1750
0 0 0 1378 365
0 0 0 1890 913
0 0 0 521 873
0 0 0 1105 1016
0 0 0 157 458
0 0 0 1500 99
0 0 0 515 739
0 0 0 1700 1195
0 0 0 1951 1003
0 0 0 734 421
0 0 0 519 876
0 0 0 424 1078
0 0 0 1081 1193
0 0 0 41 1798
0 0 0 419 1676
0 0 0 1650 534
0 0 0 135 1170
0 0 0 403 1752
0 0 0 640 1249
0 0 0 916 72
0 0 0 715 1405
0 0 0 949 1178
0 0 0 45 1099
0 0 0 1875 414
0 0 0 430 648
0 0 0 1847 1628
0 0 0 436 110
0 0 0 889 377
0 0 0 1119 1690
0 0 0 1750 824
0 0 0 265 542
0 0 0 859 1574
0 0 0 1746 1043
0 0 0 326 472
0 0 0 303 671
0 0 0 1990 38
0 0 0 1330 1859
0 0 0 1843 979
0 0 0 1363 1911
0 0 0 213 1712
0 0 0 1744 1847
0 0 0 585 1606
0 0 0 879 866
0 0 0 234 379
0 0 0 553 430
0 0 0 589 369
0 0 0 1827 1745
0 0 0 792 1238
0 0 0 694 1859
0 0 0 659 1582
0 0 0 1570 1503
0 0 0 1454 649
0 0 0 375 1315
0 0 0 308 1675
0 0 0 1254 619
0 0 0 295 274
0 0 0 809 944
0 0 0 207 585
0 0 0 1125 133
0 0 0 608 520
0 0 0 1524 91
0 0 0 1067 684
0 0 0 1298 456
0 0 0 279 1632
0 0 0 281 1078
0 0 0 1670 312
0 0 0 744 987
0 0 0 80 1103
0 0 0 7 66
0 0 0 1653 1094
0 0 0 1658 498
0 0 0 1478 884
0 0 0 1675 1776
0 0 0 1329 1984
0 0 0 597 1638
0 0 0 1097 519
0 0 0 1499 1922
0 0 0 1769 1706
-1

105
myTests/tmp/Top_outputs Normal file
View File

@ -0,0 +1,105 @@
internalsDump:
Version: Verilator 4.028 2020-02-06
Argv: ./obj_dir/VTop
scopesDump:
1970
1075
2645
2952
1922
2194
2620
1925
1785
2475
1446
2444
369
1615
3509
3067
3001
1978
2159
1232
2198
3740
1743
2803
1394
2121
615
1599
1254
2895
2954
1155
1395
1502
2274
1839
2095
2184
1305
2155
1889
988
2120
2127
1144
2289
1078
3475
546
1266
2809
2574
807
2433
2789
798
974
2028
3189
2822
3274
1925
3591
2191
1745
613
983
958
3572
2030
2553
2241
3073
2103
1690
1983
1873
569
1753
792
1258
1128
1615
1751
1754
1911
1359
1982
1731
1183
73
2747
2156
2362
3451
3313
2235
1616
3421
3475

10
myTests/tmp/dut/Add.v Normal file
View File

@ -0,0 +1,10 @@
module Add(
input [31:0] in1 ,
input [31:0] in2 ,
output [31:0] out
);
assign out = in1 + in2 ;
endmodule

20
myTests/tmp/dut/Top.v Normal file
View File

@ -0,0 +1,20 @@
module Top(
input clock,
input reset,
input [31:0] io_a,
input [31:0] io_b,
output [31:0] io_c
);
wire [31:0] add_in1;
wire [31:0] add_in2;
wire [31:0] add_out;
Add add (
.in1(add_in1),
.in2(add_in2),
.out(add_out)
);
assign io_c = add_out;
assign add_in1 = io_a;
assign add_in2 = io_b;
endmodule

View File

@ -0,0 +1,23 @@
// Pysv
module Add(
input [31:0] in1 ,
input [31:0] in2 ,
output [31:0] out
);
wire [31:0] __tmp_in1 ;
wire [31:0] __tmp_in2 ;
reg [31:0] __tmp_out ;
assign __tmp_in1 = in1 ;
assign __tmp_in2 = in2 ;
import pysv::* ;
always begin
fn(__tmp_in1, __tmp_in2, __tmp_out) ;
end
assign out = __tmp_out ;
endmodule

View File

@ -0,0 +1,19 @@
module Top(
input clock,
input reset,
input [31:0] io_a,
input [31:0] io_b,
output [31:0] io_c
);
wire [31:0] add_in1;
wire [31:0] add_in2;
wire [31:0] add_out;
Add add (
.in1(add_in1),
.in2(add_in2),
.out(add_out)
);
assign io_c = add_out;
assign add_in1 = io_a;
assign add_in2 = io_b;
endmodule

View File

@ -0,0 +1,5 @@
module Add :
input in1 : UInt<32>
input in2 : UInt<32>
output out : UInt<32>
out <= add(in1, in2)

19
myTests/tmp/firrtl/M.fir Normal file
View File

@ -0,0 +1,19 @@
circuit M :
module Add :
input in1 : UInt<32>
input in2 : UInt<32>
output out : UInt<32>
out <= add(in1, in2)
module M :
input clock : Clock
input reset : UInt<1>
output io : {flip a : UInt<32>, flip b : UInt<32>, c : UInt<32>}
inst bbox of Add
bbox.in1 <= io.a
bbox.in2 <= io.b
io.c <= bbox.out

View File

@ -10,14 +10,17 @@ from pyhcl.util.firrtltools import replacewithfirmod
class Emitter:
# 传入模块对象返回str---firrtl代码
@staticmethod
def emit(m: Module, toverilog=False) -> str:
circuit = Emitter.elaborate(m)
# 将Circuit对象转化为str
if(toverilog):
return circuit.verilog_serialize()
else:
return circuit.serialize()
return circuit.serialize() # firrtl代码
# 传入模块对象返回Circuit对象
@staticmethod
def elaborate(m: Module) -> low_ir.Circuit:
ec: EmitterContext = EmitterContext(m, {}, Counter())
@ -27,6 +30,7 @@ class Emitter:
DynamicContext.clearScope()
return circuit
# 传入firrtl代码和文件名将firrtl代码写入文件中并返回文件路径
@staticmethod
def dump(s, filename) -> str:
if not os.path.exists('.fir'):
@ -38,6 +42,7 @@ class Emitter:
return f
# 传入firrtl文件路径执行firrtl命令将firrtl代码编译为verilog代码
@staticmethod
def dumpVerilog(filename):
os.system('firrtl -i %s -o %s -X verilog' % (filename, filename))

View File

@ -126,6 +126,7 @@ class Handler:
def __hash__(self):
return hash(self.sig)
class DpiConfig(object):
def __init__(self, pkg_sv_path=".sv/pkg/pysv_pkg.sv", bbox_sv_dir=".sv/bbox/", lib_path=".build/libpysv.so"):
self.sv = pkg_sv_path

View File

@ -7,6 +7,7 @@ from ..simulator import DpiConfig
class Simlite(object):
# init for fork method
# 根据传入的Simlite对象实例深度复制得到新的Simlite对象实例
def __fork_init(self, other):
import copy
self.low_module = other.low_module
@ -22,6 +23,8 @@ class Simlite(object):
# recover the status base on self.steps
self.steps = []
self.debug = False
# 开始仿真
self.start()
for inputs in other.steps:
self.step(inputs)
@ -31,90 +34,157 @@ class Simlite(object):
self.name = other.name + "_" + str(other.fork_cnt)
other.fork_cnt += 1
# 传入moduleModule子类对象 和 harness代码
# dpiconfig对象
# self.sv = pkg_sv_path # .sv/pkg/pysv_pkg.sv
# self.lib = lib_path # .build/libpysv.so
# self.bdir = bbox_sv_dir # .sv/bbox/
# self.bname = " ".join(os.listdir(self.bdir)) # .sv/bbox/文件夹包含的文件或文件夹的名字的列表
def __init__(self, module, harness_code=None, dpiconfig: DpiConfig = None, debug=False, name="sim0"):
self.raw_in = None
self.efn = None
self.ofn = None
self.ifn = None
# module为Simlite对象实例
if isinstance(module, Simlite):
self.__fork_init(module)
else:
# self.low_module为Circuit对象
self.low_module = Emitter.elaborate(module)
self.dpiconfig = dpiconfig
# 模块名
module_name = self.low_module.main
# ports = next(m.typ for m in low_module.modules if m.name == module_name)
# 模块端口--字典 _ios: Dict[str, Union[Input, Output]]
ports = module.io.value._ios
# 输入端口名列表
self.inputs = []
self.inputs_values = []
# 输出端口名列表
self.outputs = []
# 结果列表
self.results = []
# 计数器
self.cnt = 0
# 步骤列表
self.steps = []
# 仿真名
self.name = name
self.debug = debug
self.fork_cnt = 0
# k为键v为值
for k, v in ports.items():
if (type(v) == Input):
self.inputs.append(k)
self.inputs.append(k) # 输入端口名
elif (type(v) == Output):
self.outputs.append(k)
self.outputs.append(k) # 输出端口名
self.dut_name = module_name
self.dut_name = module_name # 模块名
# 通过harness代码开始仿真
if harness_code:
self.compile(harness_code)
else:
# 传入module_name和ports生成harness代码然后仿真
self.compile(self.codegen(module_name, ports))
def close(self):
os.system("cd .. && rm -r .sv .fir .build 2>/dev/null")
def stop(self):
instr = '-1'.encode(encoding="utf-8") + b'\n'
self.p.stdin.write(instr)
self.p.stdin.flush()
def close(self):
# 2表示标准错误stderr, >表示重定向 /dev/mull表示空设备
# 2>/dev/nul,将标准错误重定向到空设备里,即不输出错误信息
# 追加 2>/dev/null 在命令末尾,表示:把错误输出到 “黑洞”
os.system("cd .. && rm -r .sv .fir .build 2>/dev/null")
# self.p.kill()
# pass
# 传入harness代码调用firrtl命令得到verilog代码调用verilater最终得到V{dut_name}
def compile(self, harness_code):
print("\n\n---------------------verilator build info--------------------------\n")
dpiconfig = self.dpiconfig
# 在当前目录创建simulation文件夹
try:
os.mkdir("simulation")
except FileExistsError:
pass
# 在simulation文件夹下创建dut_name-harness.cpp写入harness代码
with open(f"./simulation/{self.dut_name}-harness.cpp", "w+") as f:
f.write(harness_code)
# 在simulation文件夹下创建dut_name-harness.fir写入firrtl代码
with open(f"./simulation/{self.dut_name}.fir", "w+") as f:
f.write(self.low_module.serialize())
# 调用firrtl命令传入firrtl代码得到verilog代码
# firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog
# print(f"firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog")
os.system(
f"firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog")
vfn = "{}.v".format(self.dut_name)
hfn = "{}-harness.cpp".format(self.dut_name)
mfn = "V{}.mk".format(self.dut_name)
efn = "V{}".format(self.dut_name)
vfn = "{}.v".format(self.dut_name) # {dut_name}.v
hfn = "{}-harness.cpp".format(self.dut_name) # {dut_name}-harness.cpp
mfn = "V{}.mk".format(self.dut_name) # V{dut_name}.mk
efn = "V{}".format(self.dut_name) # V{dut_name}
# dpi
if self.dpiconfig:
pysv_pkg = "{}_pysv_pkg.sv".format(self.dut_name)
pysv_lib = "libpysv_{}.so".format(self.dut_name)
# dpiconfig对象
# self.sv = pkg_sv_path # .sv/pkg/pysv_pkg.sv
# self.lib = lib_path # .build/libpysv.so
# self.bdir = bbox_sv_dir # .sv/bbox/
# self.bname = " ".join(os.listdir(self.bdir)) # .sv/bbox/文件夹包含的文件或文件夹的名字的列表
pysv_pkg = "{}_pysv_pkg.sv".format(self.dut_name) # {dut_name}_pysv_pkg.sv
pysv_lib = "libpysv_{}.so".format(self.dut_name) # libpysv_{dut_name}.so
# cp .sv/pkg/pysv_pkg.sv ./simulation/{dut_name}_pysv_pkg.sv # 由各python函数生成得到的SV binding文件
os.system("cp {} ./simulation/{}_pysv_pkg.sv".format(dpiconfig.sv, self.dut_name))
# cp .build/libpysv.so ./simulation/libpysv_{dut_name}.so # 由各python函数编译得到的共享库
os.system("cp {} ./simulation/libpysv_{}.so".format(dpiconfig.lib, self.dut_name))
# cp .sv/bbox/ ./simulation/ # 使用了python函数的SV文件使用pysv
os.system("cp {}* ./simulation/".format(dpiconfig.bdir))
# 转换目录到./simulation文件夹下
os.chdir("./simulation")
# Using verilator backend
# --cc Create C++ output
# --trace Enable waveform creation
# --exe Link to create executable
# --prefix <topname> Name of top level class
# --top-module <topname> Name of top level input module
# .so为 与 Verilog 代码链接的可选对象或库文件
# In the verilator command, include the shared library and the generated binding file
# verilator --cc --trace --exe --prefix VTop --top-module Top Top_pysv_pkg.sv {bbx} Top.v libpysv_Top.so Top-harness.cpp
# verilator --cc --trace --exe --prefix VTop --top-module Top Top_pysv_pkg.sv Add.sv Top.v libpysv_Top.so Top-harness.cpp
os.system(
"verilator --cc --trace --exe --prefix {prefix} --top-module {top} {pkg} {bbx} {vfn} {lib} {hfn}" \
.format(top=self.dut_name, bbx=dpiconfig.bname, vfn=vfn, hfn=hfn, pkg=pysv_pkg, lib=pysv_lib,
prefix=efn))
# cp libpysv_Top.so ./obj_dir/
os.system("cp {} ./obj_dir/".format(pysv_lib))
else:
# 改变当前工作目录到指定的路径--simulation
os.chdir("./simulation")
# Using verilator backend
# 传入verilog代码和harness代码使用verilator进行仿真
# --cc Create C++ output
# --trace Enable waveform creation
# --exe Link to create executable
# verilator --cc {dut_name}.v --trace --exe {dut_name}-harness.cpp
os.system(
"verilator --cc {vfn} --trace --exe {hfn}".format(vfn=vfn, hfn=hfn))
# make -j -C ./obj_dir -f V{dut_name}.mk V{dut_name}
os.system(
"make -j -C ./obj_dir -f {mfn} {efn}".format(mfn=mfn, efn=efn))
@ -125,69 +195,116 @@ class Simlite(object):
else:
os.system("./obj_dir/{}&".format(efn))
"""
self.efn = efn
self.efn = efn # V{dut_name}
# 开启仿真默认模式为ia
def start(self, mode="ia", ofn=None, ifn=None):
env = None
if self.dpiconfig:
env = {"LD_LIBRARY_PATH": "."}
# 确保 libpysv.so共享库 在 LD_LIBRARY_PATH 中-- pysv要求的
env = {"LD_LIBRARY_PATH": "."} # 环境变量
if mode == "ia":
args = [f"./obj_dir/{self.efn}"]
args = [f"./obj_dir/{self.efn}"] # ./obj_dir/VAdder
# 创建子进程执行./obj_dir/VAdder
self.p = subprocess.Popen(args, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.dropinfo()
elif mode == "task":
args = [f"./obj_dir/{self.efn}"]
args = [f"./obj_dir/{self.efn}"] # ./obj_dir/VAdder
infile = open(ifn, "r")
outfile = open(ofn, "a")
self.p = subprocess.Popen(args, env=env, shell=True, stdin=infile, stdout=outfile)
outfile = open(ofn, "w")
# subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。
# Popen 是 subprocess的核心子进程的创建和管理都靠它处理
# argsshell命令可以是字符串或者序列类型list元组
# env用于指定子进程的环境变量。如果 env = None子进程的环境变量将从父进程中继承
# shell如果该参数为 True将通过操作系统的 shell 执行指定的命令
# stdin、stdout 和 stderr子进程的标准输入、输出和错误句柄
# 创建子进程执行./obj_dir/VAdder
# self.p = subprocess.Popen(args, env=env, shell=True, stdin=subprocess.PIPE, stdout=outfile)
self.p = subprocess.Popen(args, env=env, stdin=infile, stdout=outfile)
# 读掉stdout缓冲区里的5行info
def dropinfo(self):
for i in range(5):
self.p.stdout.readline()
if self.debug:
print("\n\n--------------------------sim result---------------------------")
# 传入整个任务tsk开始仿真task模式 name为模块名tsk为二维列表包含每一次的输入端口列表值
def start_task(self, name, tsk):
try:
os.mkdir("tmp")
except FileExistsError:
pass
# input file
ifn = f"/tmp/{name}_inputs"
ofn = f"/tmp/{name}_outputs"
ifn = f"./tmp/{name}_inputs"
ofn = f"./tmp/{name}_outputs"
fd = open(ifn, "a")
instr = ""
for inputs in tsk:
# 一次输入端口值列表
inputs = [str(x) for x in inputs]
# 输入端口值--str
self.raw_in = " ".join(inputs)
self.raw_in = "0 " + self.raw_in
self.raw_in = "0 " + self.raw_in # 0表示状态值状态值小于0会退出
instr += self.raw_in + "\n"
# 所有输入端口值,每一次以\n分隔
instr += "-1\n"
# 将tsk整个任务包含每一次的输入端口列表值传入ifn输入文件
fd.write(instr)
fd.close()
# 传入输入文件和输出文件,开始仿真
self.start("task", ofn, ifn)
# 传入输入端口值列表
def step(self, inputs):
# 每一步存入steps列表
self.steps.append(inputs)
self.inputs_values = inputs
# 输入端口值列表
inputs = [str(x) for x in inputs]
# 输入端口值--str
self.raw_in = " ".join(inputs)
self.raw_in = "0 " + self.raw_in
instr = self.raw_in.encode(encoding="utf-8") + b'\n'
# 将输入端口值--str写入子进程的标准输入stdin缓冲区--传入inputs数组
self.p.stdin.write(instr)
# 刷新stdin缓冲区, 即将缓冲区中的数据立刻写入
self.p.stdin.flush()
# 读取子进程的标准输出stdout里的内容--即outputs数组的值
line = self.p.stdout.readline()
# 刷新stdout缓冲区
self.p.stdout.flush()
# 从outputs数组中读取的结果--输出端口值
self.raw_res = str(line, encoding="utf-8").strip()
strs = self.raw_res.split(" ")
# 结果列表
res = [int(x) for x in strs]
self.results = res
# debug模式
if self.debug:
self.pprint()
# 计数器值加一
self.cnt += 1
return res
# 输出仿真相关值--输入端口值、输出端口值
def pprint(self):
print("")
# 仿真名 计数器值 IN 输入端口值
# 仿真名 计数器值 OUT 输出端口值
print(f"[{self.name}\t\t{self.cnt}]IN : {self.raw_in}")
print(f"[{self.name}\t\t{self.cnt}]OUT : {self.raw_res}")
def getRes(self):
return self.results
# 传入module_name和ports生成harness代码
def codegen(self, name, ports):
tempfile = """#include "V{modname}.h"
#include "verilated.h"
@ -197,12 +314,14 @@ class Simlite(object):
#include <cstdio>
vluint64_t main_time = 0; // See comments in first example
const vluint64_t sim_time = 1024;
double sc_time_stamp() {{ return main_time; }}
std::vector<unsigned long long> inputs, outputs;
#define INN {innum}
#define OUTN {outnum}
int status = 0;
void ioinit(){{
setvbuf(stdout,0,_IONBF, 0);
@ -214,10 +333,9 @@ void ioinit(){{
}}
void input_handler(){{
int status = 0;
std::cin>>status;
if(status<0)
exit(0);
if(status==-1)
return;
for(int i = 0; i < INN; i++){{
std::cin>>inputs[i];
}}
@ -242,11 +360,12 @@ int main(int argc, char** argv, char** env) {{
Verilated::traceEverOn(true);
VerilatedVcdC* tfp = new VerilatedVcdC;
top->trace(tfp, 0);
top->trace(tfp, 99);
tfp->open("wave.vcd");
while (!Verilated::gotFinish()) {{
input_handler();
if(status==-1) break;
//get inputs
{inputs_init}
top->eval();
@ -264,20 +383,22 @@ int main(int argc, char** argv, char** env) {{
outputs_log=self.handle_outputs())
return tempfile
# 处理input端口得到对应的harness代码 # 用inputs数组对输入端口进行赋值
def handle_inputs(self):
res = ""
taps = " "
i = 0
for n in self.inputs:
res += taps + f"top->io_{n} = inputs[{i}];\n"
res += taps + f"top->io_{n} = inputs[{i}];\n" # 对输入端口进行赋值
i += 1
return res
# 处理output端口得到对应的harness代码 # 对输出端口进行取值放入outputs数组
def handle_outputs(self):
res = ""
taps = " "
i = 0
for n in self.outputs:
res += taps + f"outputs[{i}] = top->io_{n};\n"
res += taps + f"outputs[{i}] = top->io_{n};\n" # 对输出端口进行取值
i += 1
return res

View File

@ -0,0 +1,433 @@
import os
import subprocess
from pyhcl import *
from ..simulator import DpiConfig
import re
class Simlite(object):
# init for fork method
# 根据传入的Simlite对象实例深度复制得到新的Simlite对象实例
def __fork_init(self, other):
import copy
self.low_module = other.low_module
self.dpiconfig = other.dpiconfig
if (hasattr(other, "efn")):
self.efn = other.efn
self.inputs = copy.deepcopy(other.inputs)
self.outputs = copy.deepcopy(other.outputs)
self.results = copy.deepcopy(other.outputs)
self.cnt = copy.deepcopy(other.cnt)
self.dut_name = copy.deepcopy(other.dut_name)
# recover the status base on self.steps
self.steps = []
self.debug = False
# 开始仿真
self.start()
for inputs in other.steps:
self.step(inputs)
assert (self.steps == other.steps)
self.debug = other.debug
self.name = other.name + "_" + str(other.fork_cnt)
other.fork_cnt += 1
# 传入moduleModule子类对象 和 harness代码
# dpiconfig对象
# self.sv = pkg_sv_path # .sv/pkg/pysv_pkg.sv
# self.lib = lib_path # .build/libpysv.so
# self.bdir = bbox_sv_dir # .sv/bbox/
# self.bname = " ".join(os.listdir(self.bdir)) # .sv/bbox/文件夹包含的文件或文件夹的名字的列表
def __init__(self, firrtl_path, module=None, harness_code=None, dpiconfig: DpiConfig = None, debug=False, name="sim0"):
self.raw_in = None
self.efn = None
self.ofn = None
self.ifn = None
# module为Simlite对象实例
if isinstance(module, Simlite):
self.__fork_init(module)
else:
# self.low_module为Circuit对象
fd = open(firrtl_path, "r")
self.firrtl_code = "".join(fd.readlines())
self.dpiconfig = dpiconfig
# 输入端口名列表
self.inputs = []
self.inputs_values = []
# 输出端口名列表
self.outputs = []
# 结果列表
self.results = []
# 计数器
self.cnt = 0
# 步骤列表
self.steps = []
# 仿真名
self.name = name
self.debug = debug
self.fork_cnt = 0
self.dut_name, self.inputs, self.outputs = self.firrtl_parse(firrtl_path)
# 通过harness代码开始仿真
if harness_code:
self.compile(harness_code)
else:
# 传入module_name和ports生成harness代码然后仿真
self.compile(self.codegen(self.dut_name))
# 解析FIRRTL代码, 返回输入端口名列表 和 输出端口名列表
def firrtl_parse(self, firrtl_path):
circuit_begin_match = r"circuit\s*([a-zA-Z0-9_]+)"
module_begin_match = r"module\s*([a-zA-Z0-9_]+)"
input_port_match = r"input\s*([a-zA-Z0-9_]+)"
output_port_match = r"output\s*([a-zA-Z0-9_]+)"
input_ports_name = []
output_ports_name = []
top_module_name = '0'
current_module_name = '1'
with open(firrtl_path, "r") as firrtl_file:
while firrtl_file:
firrtl_line = firrtl_file.readline().strip(' ') # 读取一行
# print(firrtl_line)
if firrtl_line == "": # 注:如果是空行,为'\n'
break
circuit_begin = re.search(circuit_begin_match, firrtl_line)
module_begin = re.search(module_begin_match, firrtl_line)
if circuit_begin:
top_module_name = circuit_begin.group(1)
# print(top_module_name)
if module_begin:
current_module_name = module_begin.group(1)
# print(current_module_name)
if current_module_name == top_module_name:
input_port = re.search(input_port_match, firrtl_line)
output_port = re.search(output_port_match, firrtl_line)
if input_port:
input_ports_name.append(input_port.group(1))
if output_port:
output_ports_name.append(output_port.group(1))
print(top_module_name)
print(input_ports_name)
print(output_ports_name)
return top_module_name, input_ports_name, output_ports_name
def stop(self):
instr = '-1'.encode(encoding="utf-8") + b'\n'
self.p.stdin.write(instr)
self.p.stdin.flush()
def close(self):
# 2表示标准错误stderr, >表示重定向 /dev/mull表示空设备
# 2>/dev/nul,将标准错误重定向到空设备里,即不输出错误信息
# 追加 2>/dev/null 在命令末尾,表示:把错误输出到 “黑洞”
os.system("cd .. && rm -r .sv .fir .build 2>/dev/null")
# self.p.kill()
# pass
# 传入harness代码调用firrtl命令得到verilog代码调用verilater最终得到V{dut_name}
def compile(self, harness_code):
print("\n\n---------------------verilator build info--------------------------\n")
dpiconfig = self.dpiconfig
# 在当前目录创建simulation文件夹
try:
os.mkdir("simulation")
except FileExistsError:
pass
# 在simulation文件夹下创建dut_name-harness.cpp写入harness代码
with open(f"./simulation/{self.dut_name}-harness.cpp", "w+") as f:
f.write(harness_code)
# 在simulation文件夹下创建dut_name-harness.fir写入firrtl代码
with open(f"./simulation/{self.dut_name}.fir", "w+") as f:
f.write(self.firrtl_code)
# 调用firrtl命令传入firrtl代码得到verilog代码
# firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog
# print(f"firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog")
os.system(
f"firrtl -i ./simulation/{self.dut_name}.fir -o ./simulation/{self.dut_name}.v -X verilog")
vfn = "{}.v".format(self.dut_name) # {dut_name}.v
hfn = "{}-harness.cpp".format(self.dut_name) # {dut_name}-harness.cpp
mfn = "V{}.mk".format(self.dut_name) # V{dut_name}.mk
efn = "V{}".format(self.dut_name) # V{dut_name}
# dpi
if self.dpiconfig:
# dpiconfig对象
# self.sv = pkg_sv_path # .sv/pkg/pysv_pkg.sv
# self.lib = lib_path # .build/libpysv.so
# self.bdir = bbox_sv_dir # .sv/bbox/
# self.bname = " ".join(os.listdir(self.bdir)) # .sv/bbox/文件夹包含的文件或文件夹的名字的列表
pysv_pkg = "{}_pysv_pkg.sv".format(self.dut_name) # {dut_name}_pysv_pkg.sv
pysv_lib = "libpysv_{}.so".format(self.dut_name) # libpysv_{dut_name}.so
# cp .sv/pkg/pysv_pkg.sv ./simulation/{dut_name}_pysv_pkg.sv # 由各python函数生成得到的SV binding文件
os.system("cp {} ./simulation/{}_pysv_pkg.sv".format(dpiconfig.sv, self.dut_name))
# cp .build/libpysv.so ./simulation/libpysv_{dut_name}.so # 由各python函数编译得到的共享库
os.system("cp {} ./simulation/libpysv_{}.so".format(dpiconfig.lib, self.dut_name))
# cp .sv/bbox/ ./simulation/ # 使用了python函数的SV文件使用pysv
os.system("cp {}* ./simulation/".format(dpiconfig.bdir))
# 转换目录到./simulation文件夹下
os.chdir("./simulation")
# Using verilator backend
# --cc Create C++ output
# --trace Enable waveform creation
# --exe Link to create executable
# --prefix <topname> Name of top level class
# --top-module <topname> Name of top level input module
# .so为 与 Verilog 代码链接的可选对象或库文件
# In the verilator command, include the shared library and the generated binding file
# verilator --cc --trace --exe --prefix VTop --top-module Top Top_pysv_pkg.sv {bbx} Top.v libpysv_Top.so Top-harness.cpp
# verilator --cc --trace --exe --prefix VTop --top-module Top Top_pysv_pkg.sv Add.sv Top.v libpysv_Top.so Top-harness.cpp
os.system(
"verilator --cc --trace --exe --prefix {prefix} --top-module {top} {pkg} {bbx} {vfn} {lib} {hfn}" \
.format(top=self.dut_name, bbx=dpiconfig.bname, vfn=vfn, hfn=hfn, pkg=pysv_pkg, lib=pysv_lib,
prefix=efn))
# cp libpysv_Top.so ./obj_dir/
os.system("cp {} ./obj_dir/".format(pysv_lib))
else:
# 改变当前工作目录到指定的路径--simulation
os.chdir("./simulation")
# Using verilator backend
# 传入verilog代码和harness代码使用verilator进行仿真
# --cc Create C++ output
# --trace Enable waveform creation
# --exe Link to create executable
# verilator --cc {dut_name}.v --trace --exe {dut_name}-harness.cpp
os.system(
"verilator --cc {vfn} --trace --exe {hfn}".format(vfn=vfn, hfn=hfn))
# make -j -C ./obj_dir -f V{dut_name}.mk V{dut_name}
os.system(
"make -j -C ./obj_dir -f {mfn} {efn}".format(mfn=mfn, efn=efn))
# Run simulation backend program
"""
if dpiconfig:
os.system("LD_LIBRARY_PATH=. ./obj_dir/{}&".format(efn))
else:
os.system("./obj_dir/{}&".format(efn))
"""
self.efn = efn # V{dut_name}
# 开启仿真默认模式为ia
def start(self, mode="ia", ofn=None, ifn=None):
env = None
if self.dpiconfig:
# 确保 libpysv.so共享库 在 LD_LIBRARY_PATH 中-- pysv要求的
env = {"LD_LIBRARY_PATH": "."} # 环境变量
if mode == "ia":
args = [f"./obj_dir/{self.efn}"] # ./obj_dir/VAdder
# 创建子进程执行./obj_dir/VAdder
self.p = subprocess.Popen(args, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.dropinfo()
elif mode == "task":
args = [f"./obj_dir/{self.efn}"] # ./obj_dir/VAdder
infile = open(ifn, "r")
outfile = open(ofn, "w")
# subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。
# Popen 是 subprocess的核心子进程的创建和管理都靠它处理
# argsshell命令可以是字符串或者序列类型list元组
# env用于指定子进程的环境变量。如果 env = None子进程的环境变量将从父进程中继承
# shell如果该参数为 True将通过操作系统的 shell 执行指定的命令
# stdin、stdout 和 stderr子进程的标准输入、输出和错误句柄
# 创建子进程执行./obj_dir/VAdder
# self.p = subprocess.Popen(args, env=env, shell=True, stdin=subprocess.PIPE, stdout=outfile)
self.p = subprocess.Popen(args, env=env, stdin=infile, stdout=outfile)
# 读掉stdout缓冲区里的5行info
def dropinfo(self):
for i in range(5):
self.p.stdout.readline()
if self.debug:
print("\n\n--------------------------sim result---------------------------")
# 传入整个任务tsk开始仿真task模式 name为模块名tsk为二维列表包含每一次的输入端口列表值
def start_task(self, name, tsk):
try:
os.mkdir("tmp")
except FileExistsError:
pass
# input file
ifn = f"./tmp/{name}_inputs"
ofn = f"./tmp/{name}_outputs"
fd = open(ifn, "a")
instr = ""
for inputs in tsk:
# 一次输入端口值列表
inputs = [str(x) for x in inputs]
# 输入端口值--str
self.raw_in = " ".join(inputs)
self.raw_in = "0 " + self.raw_in # 0表示状态值状态值小于0会退出
instr += self.raw_in + "\n"
# 所有输入端口值,每一次以\n分隔
instr += "-1\n"
# 将tsk整个任务包含每一次的输入端口列表值传入ifn输入文件
fd.write(instr)
fd.close()
# 传入输入文件和输出文件,开始仿真
self.start("task", ofn, ifn)
# 传入输入端口值列表
def step(self, inputs):
# 每一步存入steps列表
self.steps.append(inputs)
self.inputs_values = inputs
# 输入端口值列表
inputs = [str(x) for x in inputs]
# 输入端口值--str
self.raw_in = " ".join(inputs)
self.raw_in = "0 " + self.raw_in
instr = self.raw_in.encode(encoding="utf-8") + b'\n'
# 将输入端口值--str写入子进程的标准输入stdin缓冲区--传入inputs数组
self.p.stdin.write(instr)
# 刷新stdin缓冲区, 即将缓冲区中的数据立刻写入
self.p.stdin.flush()
# 读取子进程的标准输出stdout里的内容--即outputs数组的值
line = self.p.stdout.readline()
# 刷新stdout缓冲区
self.p.stdout.flush()
# 从outputs数组中读取的结果--输出端口值
self.raw_res = str(line, encoding="utf-8").strip()
strs = self.raw_res.split(" ")
# 结果列表
res = [int(x) for x in strs]
self.results = res
# debug模式
if self.debug:
self.pprint()
# 计数器值加一
self.cnt += 1
return res
# 输出仿真相关值--输入端口值、输出端口值
def pprint(self):
print("")
# 仿真名 计数器值 IN 输入端口值
# 仿真名 计数器值 OUT 输出端口值
print(f"[{self.name}\t\t{self.cnt}]IN : {self.raw_in}")
print(f"[{self.name}\t\t{self.cnt}]OUT : {self.raw_res}")
def getRes(self):
return self.results
# 传入module_name和ports生成harness代码
def codegen(self, name):
tempfile = """#include "V{modname}.h"
#include "verilated.h"
#include "verilated_vcd_c.h"
#include <vector>
#include <iostream>
#include <cstdio>
vluint64_t main_time = 0; // See comments in first example
const vluint64_t sim_time = 1024;
double sc_time_stamp() {{ return main_time; }}
std::vector<unsigned long long> inputs, outputs;
#define INN {innum}
#define OUTN {outnum}
int status = 0;
void ioinit(){{
setvbuf(stdout,0,_IONBF, 0);
setvbuf(stdin,0,_IONBF, 0);
setvbuf(stderr,0,_IONBF, 0);
inputs.resize(INN);
outputs.resize(OUTN);
return;
}}
void input_handler(){{
std::cin>>status;
if(status==-1)
return;
for(int i = 0; i < INN; i++){{
std::cin>>inputs[i];
}}
return;
}}
void output_handler(){{
for(int i = 0; i < OUTN; i++){{
std::cout<<outputs[i]<<" ";
}}
std::cout<<std::endl;
return;
}}
int main(int argc, char** argv, char** env) {{
Verilated::commandArgs(argc, argv);
ioinit();
V{modname}* top = new V{modname};
Verilated::internalsDump(); // See scopes to help debug
Verilated::traceEverOn(true);
VerilatedVcdC* tfp = new VerilatedVcdC;
top->trace(tfp, 99);
tfp->open("wave.vcd");
while (!Verilated::gotFinish()) {{
input_handler();
if(status==-1) break;
//get inputs
{inputs_init}
top->eval();
tfp->dump(main_time);
{outputs_log}
//get output
output_handler();
main_time++;
}}
top->final();
tfp->close();
delete top;
return 0;
}}""".format(modname=name, innum=len(self.inputs), outnum=len(self.outputs), inputs_init=self.handle_inputs(),
outputs_log=self.handle_outputs())
return tempfile
# 处理input端口得到对应的harness代码 # 用inputs数组对输入端口进行赋值
def handle_inputs(self):
res = ""
taps = " "
i = 0
for n in self.inputs:
res += taps + f"top->{n} = inputs[{i}];\n" # 对输入端口进行赋值
i += 1
return res
# 处理output端口得到对应的harness代码 # 对输出端口进行取值放入outputs数组
def handle_outputs(self):
res = ""
taps = " "
i = 0
for n in self.outputs:
res += taps + f"outputs[{i}] = top->{n};\n" # 对输出端口进行取值
i += 1
return res

View File

@ -0,0 +1,440 @@
import os
import subprocess
import re
class DpiConfig(object):
def __init__(self, pysv_li, pkg_sv_path=".sv/pkg/pysv_pkg.sv", lib_path=".build/libpysv.so"):
self.sv = pkg_sv_path
self.lib = lib_path
self.bname = " ".join(pysv_li)
...
class Simlite(object):
# init for fork method
# 根据传入的Simlite对象实例深度复制得到新的Simlite对象实例
def __fork_init(self, other):
import copy
self.top_module_name = other.top_module_name
self.dut_path = other.dut_path
self.dpiconfig = other.dpiconfig
if hasattr(other, "efn"):
self.efn = other.efn
self.inputs = copy.deepcopy(other.inputs)
self.outputs = copy.deepcopy(other.outputs)
self.results = copy.deepcopy(other.outputs)
self.cnt = copy.deepcopy(other.cnt)
self.dut_name = copy.deepcopy(other.dut_name)
# recover the status base on self.steps
self.steps = []
self.debug = False
# 开始仿真
self.start()
for inputs in other.steps:
self.step(inputs)
assert (self.steps == other.steps)
self.debug = other.debug
self.name = other.name + "_" + str(other.fork_cnt)
other.fork_cnt += 1
# dpiconfig对象
# self.sv = pkg_sv_path # .sv/pkg/pysv_pkg.sv
# self.lib = lib_path # .build/libpysv.so
# self.bname = " ".join(pysv_li) # pysv_li存放调用了python函数的SV列表 (Add.sv)
def __init__(self, top_module_name='', dut_path='', harness_code=None, dpiconfig: DpiConfig = None, debug=False, name="sim0", module=None):
self.raw_in = None
self.efn = None
self.ofn = None
self.ifn = None
self.dut_path = dut_path
self.top_module_name = top_module_name
# module为Simlite对象实例
if isinstance(module, Simlite):
self.__fork_init(module)
else:
self.dpiconfig = dpiconfig
# 输入端口名列表
self.inputs = []
self.inputs_values = []
# 输出端口名列表
self.outputs = []
# 结果列表
self.results = []
# 计数器
self.cnt = 0
# 步骤列表
self.steps = []
# 仿真名
self.name = name
self.debug = debug
self.fork_cnt = 0
self.inputs, self.outputs = self.verilog_parse(dut_path, top_module_name)
# dut_name: Top top_module_name: Top.v
self.dut_name = top_module_name.split('.')[0] # 模块名
# 通过harness代码开始仿真
if harness_code:
self.compile(harness_code)
else:
# 传入module_name和ports生成harness代码然后仿真
self.compile(self.codegen(self.dut_name))
# 解析verilog代码, 返回输入端口名列表 和 输出端口名列表
def verilog_parse(self, dut_path, top_module_name):
dut_name = top_module_name.split('.')[0] # 模块名
top_module_path = dut_path + top_module_name
# print(top_module_path)
module_begin_match = r"module\s*([a-zA-Z0-9_]+)"
# 匹配输入端口 input clock, input [31:0] io_a
input_port_match = r"input\s*(reg|wire)*\s*(\[[0-9]+\:[0-9]+\]*)*\s*([a-zA-Z0-9_]+)"
# 匹配输出端口 output [31:0] io_c
output_port_match = r"output\s*(reg|wire)*\s*(\[[0-9]+\:[0-9]+\]*)*\s*([a-zA-Z0-9_]+)"
current_module_name = ''
input_ports_name = []
output_ports_name = []
with open(top_module_path, "r") as verilog_file:
while verilog_file:
verilog_line = verilog_file.readline().strip(' ') # 读取一行
# print(verilog_line)
if verilog_line == "": # 注:如果是空行,为'\n'
break
module_begin = re.search(module_begin_match, verilog_line)
if module_begin:
current_module_name = module_begin.group(1)
# print(current_module_name)
if current_module_name == dut_name:
input_port = re.search(input_port_match, verilog_line)
output_port = re.search(output_port_match, verilog_line)
if input_port:
# 输入端口名列表
input_ports_name.append(input_port.group(3))
if output_port:
# 输出端口名列表
output_ports_name.append(output_port.group(3))
# print(dut_name)
# print(input_ports_name)
# print(output_ports_name)
return input_ports_name, output_ports_name
# --timescale <timescale> Sets default timescale
# --timescale <timeunit>/<timeprecision>
def setTimeScale(self, timescale):
pass
def stop(self):
instr = '-1'.encode(encoding="utf-8") + b'\n'
self.p.stdin.write(instr)
self.p.stdin.flush()
def close(self):
# 2表示标准错误stderr, >表示重定向 /dev/mull表示空设备
# 2>/dev/nul,将标准错误重定向到空设备里,即不输出错误信息
# 追加 2>/dev/null 在命令末尾,表示:把错误输出到 “黑洞”
os.system("cd .. && rm -r .sv .fir .build 2>/dev/null")
# self.p.kill()
# pass
# 传入harness代码调用firrtl命令得到verilog代码调用verilater最终得到V{dut_name}
def compile(self, harness_code):
print("\n\n---------------------verilator build info--------------------------\n")
dpiconfig = self.dpiconfig
# 在当前目录创建simulation文件夹
try:
os.mkdir("simulation")
except FileExistsError:
pass
# 把所有dut文件复制到simulation文件夹下
os.system("cp {}* ./simulation/".format(self.dut_path))
# 在simulation文件夹下创建dut_name-harness.cpp写入harness代码
with open(f"./simulation/{self.dut_name}-harness.cpp", "w+") as f:
f.write(harness_code)
# vfn = "{}.v".format(self.dut_name) # {dut_name}.v
vfn = self.top_module_name
hfn = "{}-harness.cpp".format(self.dut_name) # {dut_name}-harness.cpp
mfn = "V{}.mk".format(self.dut_name) # V{dut_name}.mk
efn = "V{}".format(self.dut_name) # V{dut_name}
# dpi
if self.dpiconfig:
# dpiconfig对象
# self.sv = pkg_sv_path # .sv/pkg/pysv_pkg.sv
# self.lib = lib_path # .build/libpysv.so
# self.bname = " ".join(pysv_li) # pysv_li存放调用了python函数的SV列表 (Add.sv)
pysv_pkg = "{}_pysv_pkg.sv".format(self.dut_name) # {dut_name}_pysv_pkg.sv
pysv_lib = "libpysv_{}.so".format(self.dut_name) # libpysv_{dut_name}.so
# cp .sv/pkg/pysv_pkg.sv ./simulation/{dut_name}_pysv_pkg.sv # 由各python函数生成得到的SV binding文件
os.system("cp {} ./simulation/{}_pysv_pkg.sv".format(dpiconfig.sv, self.dut_name))
# cp .build/libpysv.so ./simulation/libpysv_{dut_name}.so # 由各python函数编译得到的共享库
os.system("cp {} ./simulation/libpysv_{}.so".format(dpiconfig.lib, self.dut_name))
# 转换目录到./simulation文件夹下
os.chdir("./simulation")
# Using verilator backend
# --cc Create C++ output
# --trace Enable waveform creation
# --exe Link to create executable
# --prefix <topname> Name of top level class
# --top-module <topname> Name of top level input module
# .so为 与 Verilog 代码链接的可选对象或库文件
# In the verilator command, include the shared library and the generated binding file
# verilator --cc --trace --exe --prefix VTop --top-module Top Top_pysv_pkg.sv {bbx} libpysv_Top.so Top-harness.cpp
# verilator --cc --trace --exe --prefix VTop --top-module Top Top_pysv_pkg.sv Add.sv Top.v libpysv_Top.so Top-harness.cpp
print(
"verilator --cc --trace --exe --prefix {prefix} --top-module {top} {pkg} {bbx} {vfn} {lib} {hfn}" \
.format(top=self.dut_name, bbx=dpiconfig.bname, vfn=vfn, hfn=hfn, pkg=pysv_pkg, lib=pysv_lib,
prefix=efn))
os.system(
"verilator --cc --trace --exe --prefix {prefix} --top-module {top} {pkg} {bbx} {vfn} {lib} {hfn}" \
.format(top=self.dut_name, bbx=dpiconfig.bname, vfn=vfn, hfn=hfn, pkg=pysv_pkg, lib=pysv_lib,
prefix=efn))
# cp libpysv_Top.so ./obj_dir/
os.system("cp {} ./obj_dir/".format(pysv_lib))
else:
# 改变当前工作目录到指定的路径--simulation
os.chdir("./simulation")
# Using verilator backend
# 传入verilog代码和harness代码使用verilator进行仿真
# --cc Create C++ output
# --trace Enable waveform creation
# --exe Link to create executable
# verilator --cc {dut_name}.v --trace --exe {dut_name}-harness.cpp
print("verilator --cc {vfn} --trace --exe {hfn}".format(vfn=vfn, hfn=hfn))
os.system(
"verilator --cc {vfn} --trace --exe {hfn}".format(vfn=vfn, hfn=hfn))
# make -j -C ./obj_dir -f V{dut_name}.mk V{dut_name}
os.system(
"make -j -C ./obj_dir -f {mfn} {efn}".format(mfn=mfn, efn=efn))
# Run simulation backend program
"""
if dpiconfig:
os.system("LD_LIBRARY_PATH=. ./obj_dir/{}&".format(efn))
else:
os.system("./obj_dir/{}&".format(efn))
"""
self.efn = efn # V{dut_name}
# 开启仿真默认模式为ia
def start(self, mode="ia", ofn=None, ifn=None):
env = None
if self.dpiconfig:
# 确保 libpysv.so共享库 在 LD_LIBRARY_PATH 中-- pysv要求的
env = {"LD_LIBRARY_PATH": "."} # 环境变量
if mode == "ia":
args = [f"./obj_dir/{self.efn}"] # ./obj_dir/V{dut_name}
# 创建子进程执行./obj_dir/{dut_name}
self.p = subprocess.Popen(args, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.dropinfo()
elif mode == "task":
args = [f"./obj_dir/{self.efn}"] # ./obj_dir/V{dut_name}
infile = open(ifn, "r")
outfile = open(ofn, "w")
# subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。
# Popen 是 subprocess的核心子进程的创建和管理都靠它处理
# argsshell命令可以是字符串或者序列类型list元组
# env用于指定子进程的环境变量。如果 env = None子进程的环境变量将从父进程中继承
# shell如果该参数为 True将通过操作系统的 shell 执行指定的命令
# stdin、stdout 和 stderr子进程的标准输入、输出和错误句柄
# 创建子进程执行./obj_dir/VAdder
# self.p = subprocess.Popen(args, env=env, shell=True, stdin=subprocess.PIPE, stdout=outfile)
self.p = subprocess.Popen(args, env=env, stdin=infile, stdout=outfile)
# 读掉stdout缓冲区里的5行info
def dropinfo(self):
for i in range(5):
self.p.stdout.readline()
if self.debug:
print("\n\n--------------------------sim result---------------------------")
# 传入整个任务tsk开始仿真task模式 name为模块名tsk为二维列表包含每一次的输入端口列表值
def start_task(self, name, tsk):
try:
os.mkdir("tmp")
except FileExistsError:
pass
# input file
ifn = f"./tmp/{name}_inputs"
ofn = f"./tmp/{name}_outputs"
fd = open(ifn, "a")
instr = ""
for inputs in tsk:
# 一次输入端口值列表
inputs = [str(x) for x in inputs]
# 输入端口值--str
self.raw_in = " ".join(inputs)
self.raw_in = "0 " + self.raw_in # 0表示状态值状态值小于0会退出
instr += self.raw_in + "\n"
# 所有输入端口值,每一次以\n分隔
instr += "-1\n"
# 将tsk整个任务包含每一次的输入端口列表值传入ifn输入文件
fd.write(instr)
fd.close()
# 传入输入文件和输出文件,开始仿真
self.start("task", ofn, ifn)
# 传入输入端口值列表
def step(self, inputs):
# 每一步存入steps列表
self.steps.append(inputs)
self.inputs_values = inputs
# 输入端口值列表
inputs = [str(x) for x in inputs]
# 输入端口值--str
self.raw_in = " ".join(inputs)
self.raw_in = "0 " + self.raw_in
instr = self.raw_in.encode(encoding="utf-8") + b'\n'
# 将输入端口值--str写入子进程的标准输入stdin缓冲区--传入inputs数组
self.p.stdin.write(instr)
# 刷新stdin缓冲区, 即将缓冲区中的数据立刻写入
self.p.stdin.flush()
# 读取子进程的标准输出stdout里的内容--即outputs数组的值
line = self.p.stdout.readline()
# 刷新stdout缓冲区
self.p.stdout.flush()
# 从outputs数组中读取的结果--输出端口值
self.raw_res = str(line, encoding="utf-8").strip()
strs = self.raw_res.split(" ")
# 结果列表
res = [int(x) for x in strs]
self.results = res
# debug模式
if self.debug:
self.pprint()
# 计数器值加一
self.cnt += 1
return res
# 输出仿真相关值--输入端口值、输出端口值
def pprint(self):
print("")
# 仿真名 计数器值 IN 输入端口值
# 仿真名 计数器值 OUT 输出端口值
print(f"[{self.name}\t\t{self.cnt}]IN : {self.raw_in}")
print(f"[{self.name}\t\t{self.cnt}]OUT : {self.raw_res}")
def getRes(self):
return self.results
# 传入module_name和ports生成harness代码
def codegen(self, name):
tempfile = """#include "V{modname}.h"
#include "verilated.h"
#include "verilated_vcd_c.h"
#include <vector>
#include <iostream>
#include <cstdio>
vluint64_t main_time = 0; // See comments in first example
const vluint64_t sim_time = 1024;
double sc_time_stamp() {{ return main_time; }}
std::vector<unsigned long long> inputs, outputs;
#define INN {innum}
#define OUTN {outnum}
int status = 0;
void ioinit(){{
setvbuf(stdout,0,_IONBF, 0);
setvbuf(stdin,0,_IONBF, 0);
setvbuf(stderr,0,_IONBF, 0);
inputs.resize(INN);
outputs.resize(OUTN);
return;
}}
void input_handler(){{
std::cin>>status;
if(status==-1)
return;
for(int i = 0; i < INN; i++){{
std::cin>>inputs[i];
}}
return;
}}
void output_handler(){{
for(int i = 0; i < OUTN; i++){{
std::cout<<outputs[i]<<" ";
}}
std::cout<<std::endl;
return;
}}
int main(int argc, char** argv, char** env) {{
Verilated::commandArgs(argc, argv);
ioinit();
V{modname}* top = new V{modname};
Verilated::internalsDump(); // See scopes to help debug
Verilated::traceEverOn(true);
VerilatedVcdC* tfp = new VerilatedVcdC;
top->trace(tfp, 99);
tfp->open("wave.vcd");
while (!Verilated::gotFinish()) {{
input_handler();
if(status==-1) break;
//get inputs
{inputs_init}
top->eval();
tfp->dump(main_time);
{outputs_log}
//get output
output_handler();
main_time++;
}}
top->final();
tfp->close();
delete top;
return 0;
}}""".format(modname=name, innum=len(self.inputs), outnum=len(self.outputs), inputs_init=self.handle_inputs(),
outputs_log=self.handle_outputs())
return tempfile
# 处理input端口得到对应的harness代码 # 用inputs数组对输入端口进行赋值
def handle_inputs(self):
res = ""
taps = " "
i = 0
for n in self.inputs:
res += taps + f"top->{n} = inputs[{i}];\n" # 对输入端口进行赋值
i += 1
return res
# 处理output端口得到对应的harness代码 # 对输出端口进行取值放入outputs数组
def handle_outputs(self):
res = ""
taps = " "
i = 0
for n in self.outputs:
res += taps + f"outputs[{i}] = top->{n};\n" # 对输出端口进行取值
i += 1
return res

View File

@ -220,4 +220,13 @@ def compile_and_binding_all():
# compile the a shared_lib into build folder
lib_path = compile_lib(funclist, cwd=".build")
# generate SV binding
generate_sv_binding(funclist, filename=".sv/pkg/pysv_pkg.sv")
def compile_and_binding_all_func(funclist):
print("\n\n-----------------------pysv build info---------------------------\n")
# funclist = bboxs_list.values()
# compile the a shared_lib into build folder
lib_path = compile_lib(funclist, cwd=".build")
# generate SV binding
generate_sv_binding(funclist, filename=".sv/pkg/pysv_pkg.sv")