[ESI] Convert cosim integration test "lit format" to tool (#445)

* [ESI] Convert cosim integration test to tool

* Point to the lib when launching cosim

* Fix copy-paste error

* Switch to using new DPI capability of circt-rtl-sim
This commit is contained in:
John Demme 2021-01-13 15:00:27 -08:00 committed by GitHub
parent 3aa99e113a
commit 34b497f9b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 270 additions and 260 deletions

View File

@ -3,6 +3,7 @@ set(CIRCT_INTEGRATION_TEST_DEPENDS
circt-opt
circt-translate
circt-rtl-sim
esi-cosim-runner
firtool)
# If ESI Cosim is available to build then enable its tests.

View File

@ -1,247 +0,0 @@
import lit.Test as Test
import lit.formats
import os
import re
import signal
import socket
import subprocess
import sys
import time
class CosimTest(lit.formats.FileBasedTest):
"""A lit test format (adapter) for running cosimulation-based tests."""
def execute(self, test, litConfig):
# All cosim tests require esi-cosim to have been built.
if 'esi-cosim' not in test.config.available_features:
return Test.Result(Test.UNSUPPORTED, "ESI-Cosim feature not present")
# This format is specialized to Verilator at the moment.
if 'verilator' not in test.config.available_features:
return Test.Result(Test.UNSUPPORTED, "Verilator not present")
# Parse, compile, and run the test.
testRun = CosimTestRunner(test, litConfig)
parsed = testRun.parse()
if parsed.code != Test.PASS:
return parsed
compiled = testRun.compile()
if compiled.code != Test.PASS:
return compiled
return testRun.run()
class CosimTestRunner:
"""The main class responsible for running a cosim test. We use a separate
class to allow for per-test mutable state variables."""
def __init__(self, test, litConfig):
self.test = test
self.litConfig = litConfig
self.file = test.getSourcePath()
self.srcdir = os.path.dirname(self.file)
self.execdir = test.getExecPath()
self.runs = list()
self.sources = list()
self.cppSources = list()
self.top = "top"
def parse(self):
"""Parse a test file. Look for comments we recognize anywhere in the
file."""
fileReader = open(self.file, "r")
for line in fileReader:
# TOP is the top module.
if m := re.match(r"^//\s*TOP:(.*)$", line):
self.top = m.group(1)
# FILES are the additional RTL files (if any). If specified, must
# include the current file. These files are either absolute or
# relative to the current file.
if m := re.match(r"^//\s*FILES:(.*)$", line):
self.sources.extend(m.group(1).split())
# C++ driver files to feed to Verilator. Defaults to the simple
# driver.cpp. Same path rules as FILES.
if m := re.match(r"^//\s*CPP:(.*)$", line):
self.cppSources.extend(m.group(1).split())
# Run this Python line.
if m := re.match(r"^//\s*PY:(.*)$", line):
self.runs.append(m.group(1).strip())
fileReader.close()
return Test.Result(Test.PASS, "")
def compile(self):
"""Compile the simulation with Verilator. Let Verilator do the whole
thing and produce a binary which 'just works'. This is sufficient for
simple-ish C++ drivers."""
# Assemble a list of sources (RTL and C++), applying the defaults and
# path rules.
cfg = self.test.config
if len(self.sources) == 0:
sources = [self.file]
else:
sources = [(src if os.path.isabs(src) else os.path.join(
self.srcdir, src)) for src in self.sources]
if len(self.cppSources) == 0:
cppSources = [os.path.join(cfg.circt_tools_dir, "driver.cpp")]
else:
cppSources = [(src if os.path.isabs(src) else os.path.join(
self.srcdir, src)) for src in self.cppSources]
# Include the cosim DPI SystemVerilog files.
cosimInclude = os.path.join(
cfg.circt_include_dir, "circt", "Dialect", "ESI", "cosim")
sources.insert(0, os.path.join(cosimInclude, "Cosim_DpiPkg.sv"))
sources.insert(1, os.path.join(cosimInclude, "Cosim_Endpoint.sv"))
# Format the list of sources.
sources = " ".join(sources + cppSources)
os.makedirs(self.execdir, exist_ok=True)
# Run verilator to produce an executable. Requires a working Verilator
# install in the PATH.
vrun = subprocess.run(
f"{cfg.verilator_path} --cc --top-module {self.top} --build --exe {sources} {cfg.esi_cosim_path} -LDFLAGS '-Wl,-rpath={cfg.circt_shlib_dir}'".split(),
capture_output=True,
text=True,
cwd=self.execdir)
output = vrun.stdout + "\n----- STDERR ------\n" + vrun.stderr
return Test.Result(Test.PASS if vrun.returncode == 0 else Test.FAIL, output)
def run(self):
"""Run the test by creating a Python script, starting the simulation,
running the Python script, then stopping the simulation.
Not perfect since we don't know when the cosim RPC server in the
simulation has started accepting connections."""
# Find available port.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 0))
port = sock.getsockname()[1]
sock.close()
with open(os.path.join(self.execdir, "script.py"), "w") as script:
# Include a bunch of config variables at the beginning of the
# script for use by the test code.
cfg = self.test.config
allDicts = list(self.test.config.__dict__.items()) + \
list(self.litConfig.__dict__.items())
vars = dict([(key, value)
for (key, value) in allDicts if isinstance(value, str)])
vars["execdir"] = self.execdir
vars["srcdir"] = self.srcdir
vars["srcfile"] = self.file
# 'rpcSchemaPath' points to the CapnProto schema for RPC and is the
# one that nearly all scripts are going to need.
vars["rpcschemapath"] = os.path.join(
cfg.circt_include_dir, "circt", "Dialect", "ESI", "cosim",
"CosimDpi.capnp")
script.writelines(f"{name} = \"{value}\"\n" for (
name, value) in vars.items())
script.write("\n\n")
# Add the test files directory and this files directory to the
# pythonpath.
script.write(f"import os\n")
script.write(f"import sys\n")
script.write(
f"sys.path.append(\"{os.path.dirname(self.file)}\")\n")
script.write(
f"sys.path.append(\"{os.path.dirname(__file__)}\")\n")
script.write("\n\n")
script.write(
"simhostport = f'{os.uname()[1]}:" + str(port) + "'\n")
# Run the lines specified in the test file.
script.writelines(
f"{x}\n" for x in self.runs)
timedout = False
try:
# Run the simulation.
simStdout = open(os.path.join(self.execdir, "sim_stdout.log"), "w")
simStderr = open(os.path.join(self.execdir, "sim_stderr.log"), "w")
simEnv = os.environ.copy()
simEnv["COSIM_PORT"] = str(port)
simProc = subprocess.Popen(
[f"{self.execdir}/obj_dir/V{self.top}"],
stdout=simStdout, stderr=simStderr, cwd=self.execdir,
env=simEnv)
# Wait a set amount of time for the simulation to start accepting
# RPC connections.
# TODO: Check if the server is up by polling.
time.sleep(0.05)
# Run the test script.
testStdout = open(os.path.join(
self.execdir, "test_stdout.log"), "w")
testStderr = open(os.path.join(
self.execdir, "test_stderr.log"), "w")
timeout = None
if self.litConfig.maxIndividualTestTime > 0:
timeout = self.litConfig.maxIndividualTestTime
# Pycapnp complains if the PWD environment var doesn't match the
# actual CWD.
testEnv = os.environ.copy()
testEnv["PWD"] = self.execdir
testProc = subprocess.run([sys.executable, "-u", "script.py"],
stdout=testStdout, stderr=testStderr,
timeout=timeout, cwd=self.execdir,
env=testEnv)
except subprocess.TimeoutExpired:
timedout = True
finally:
# Make sure to stop the simulation no matter what.
if simProc:
simProc.send_signal(signal.SIGINT)
# Allow the simulation time to flush its outputs.
try:
simProc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
simProc.kill()
simStderr.close()
simStdout.close()
testStdout.close()
testStderr.close()
# Read the output log files and return the proper result.
err, logs = self.readLogs()
if timedout:
result = Test.TIMEOUT
else:
logs += f"---- Test process exit code: {testProc.returncode}\n"
result = Test.PASS if testProc.returncode == 0 and not err else Test.FAIL
return Test.Result(result, logs)
def readLogs(self):
"""Read the log files from the simulation and the test script. Only
add the stderr logs if they contain something. Also return a flag
indicating that one of the stderr logs has content."""
foundErr = False
ret = "----- Simulation stdout -----\n"
with open(os.path.join(self.execdir, "sim_stdout.log")) as f:
ret += f.read()
with open(os.path.join(self.execdir, "sim_stderr.log")) as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Simulation stderr -----\n"
ret += stderr
foundErr = True
ret += "\n----- Test stdout -----\n"
with open(os.path.join(self.execdir, "test_stdout.log")) as f:
ret += f.read()
with open(os.path.join(self.execdir, "test_stderr.log")) as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Test stderr -----\n"
ret += stderr
foundErr = True
return (foundErr, ret)

View File

@ -1,12 +0,0 @@
import os
import site
try:
# If capnp isn't available, cosim tests aren't supported.
import capnp
site.addsitedir(os.path.dirname(__file__))
import formats
config.test_format = formats.CosimTest()
except ImportError:
config.unsupported = True

View File

@ -1,3 +1,5 @@
// REQUIRES: esi-cosim
// RUN: esi-cosim-runner.py %s
// PY: import loopback as test
// PY: rpc = test.LoopbackTester(rpcschemapath, simhostport)
// PY: rpc.test_list()

View File

@ -65,7 +65,8 @@ tools = [
'circt-opt',
'circt-translate',
'firtool',
'circt-rtl-sim.py'
'circt-rtl-sim.py',
'esi-cosim-runner.py'
]
# Enable yosys if it has been detected.

View File

@ -2,6 +2,7 @@
add_subdirectory(circt-opt)
add_subdirectory(circt-rtl-sim)
add_subdirectory(circt-translate)
add_subdirectory(esi)
add_subdirectory(handshake-runner)
add_subdirectory(firtool)
add_subdirectory(llhd-sim)

23
tools/esi/CMakeLists.txt Normal file
View File

@ -0,0 +1,23 @@
# ===- CMakeLists.txt - Simulation driver cmake ---------------*- cmake -*-===//
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# ===-----------------------------------------------------------------------===//
#
# Configure and copy a script to run ESI cosimulation tests.
#
# ===-----------------------------------------------------------------------===//
if (TARGET EsiCosimDpiServer)
list(APPEND CIRCT_INTEGRATION_TEST_DEPENDS EsiCosimDpiServer)
get_property(ESI_COSIM_LIB_DIR TARGET EsiCosimDpiServer PROPERTY LIBRARY_OUTPUT_DIRECTORY)
set(ESI_COSIM_PATH ${ESI_COSIM_LIB_DIR}/libEsiCosimDpiServer.so)
endif()
set(SOURCES esi-cosim-runner.py)
foreach(file IN ITEMS ${SOURCES})
configure_file(${file}.in ${CIRCT_TOOLS_DIR}/${file})
endforeach()
add_custom_target(esi-cosim-runner SOURCES ${SOURCES})

241
tools/esi/esi-cosim-runner.py.in Executable file
View File

@ -0,0 +1,241 @@
#!/usr/bin/env python3
# ===- circt-rtl-sim.py - CIRCT simulation driver -------------*- python -*-===//
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
# ===-----------------------------------------------------------------------===//
#
# Script to drive CIRCT cosimulation tests.
#
# ===-----------------------------------------------------------------------===//
import argparse
import os
import re
import signal
import socket
import subprocess
import sys
import time
ThisFileDir = os.path.dirname(__file__)
class CosimTestRunner:
"""The main class responsible for running a cosim test. We use a separate
class to allow for per-test mutable state variables."""
def __init__(self, testFile, addlArgs):
"""Parse a test file. Look for comments we recognize anywhere in the
file. Assemble a list of sources."""
self.args = addlArgs
self.file = testFile
self.runs = list()
self.srcdir = os.path.dirname(self.file)
self.sources = list()
self.top = "top"
if "@ESI_COSIM_PATH@" == "" or not os.path.exists("@ESI_COSIM_PATH@"):
raise Exception("The ESI cosimulation DPI library must be enabled " +
"to run cosim tests.")
self.simRunScript = os.path.join(
"@CIRCT_TOOLS_DIR@", "circt-rtl-sim.py")
fileReader = open(self.file, "r")
sources = []
for line in fileReader:
# Arguments to circt-rtl-sim, except for source files list
if m := re.match(r"^//\s*ARGS:(.*)$", line):
self.args.extend(m.group(1).split())
# SOURCES are the additional source files (if any). If specified,
# must include the current file. These files are either absolute or
# relative to the current file.
if m := re.match(r"^//\s*SOURCES:(.*)$", line):
sources.extend(m.group(1).split())
# Run this Python line.
if m := re.match(r"^//\s*PY:(.*)$", line):
self.runs.append(m.group(1).strip())
fileReader.close()
if len(sources) == 0:
self.sources = [self.file]
else:
self.sources = [(src if os.path.isabs(src) else os.path.join(
self.srcdir, src)) for src in self.sources]
# Include the cosim DPI SystemVerilog files.
cosimInclude = os.path.join(
"@CIRCT_MAIN_INCLUDE_DIR@", "circt", "Dialect", "ESI", "cosim")
self.sources.insert(0, os.path.join(cosimInclude, "Cosim_DpiPkg.sv"))
self.sources.insert(1, os.path.join(cosimInclude, "Cosim_Endpoint.sv"))
self.sources.append("@ESI_COSIM_PATH@")
def compile(self):
"""Compile with circt-rtl-sim.py"""
# Run the simulation compilation step. Requires a simulator to be
# installed and working.
# os.makedirs(self.execdir, exist_ok=True)
cmd = [self.simRunScript, "--no-run"] + self.args + self.sources
print("Running: " + " ".join(cmd))
vrun = subprocess.run(
cmd,
capture_output=True,
text=True)
# cwd=self.execdir)
output = vrun.stdout + "\n----- STDERR ------\n" + vrun.stderr
if vrun.returncode != 0:
print("====== Compilation failure:")
print(output)
return vrun.returncode
def run(self):
"""Run the test by creating a Python script, starting the simulation,
running the Python script, then stopping the simulation. Use
circt-rtl-sim.py to run the sim.
Not perfect since we don't know when the cosim RPC server in the
simulation has started accepting connections."""
# Find available port.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('', 0))
port = sock.getsockname()[1]
sock.close()
with open("script.py", "w") as script:
# Include a bunch of config variables at the beginning of the
# script for use by the test code.
vars = {
"srcdir": self.srcdir,
"srcfile": self.file,
# 'rpcSchemaPath' points to the CapnProto schema for RPC and is
# the one that nearly all scripts are going to need.
"rpcschemapath": os.path.join(
"@CIRCT_MAIN_INCLUDE_DIR@", "circt", "Dialect", "ESI",
"cosim", "CosimDpi.capnp")
}
script.writelines(f"{name} = \"{value}\"\n" for (
name, value) in vars.items())
script.write("\n\n")
# Add the test files directory and this files directory to the
# pythonpath.
script.write(f"import os\n")
script.write(f"import sys\n")
script.write(
f"sys.path.append(\"{os.path.dirname(self.file)}\")\n")
script.write(
f"sys.path.append(\"{os.path.dirname(__file__)}\")\n")
script.write("\n\n")
script.write(
"simhostport = f'{os.uname()[1]}:" + str(port) + "'\n")
# Run the lines specified in the test file.
script.writelines(
f"{x}\n" for x in self.runs)
try:
# Run the simulation.
simStdout = open("sim_stdout.log", "w")
simStderr = open("sim_stderr.log", "w")
simEnv = os.environ.copy()
simEnv["COSIM_PORT"] = str(port)
simProc = subprocess.Popen(
[self.simRunScript] + self.args + self.sources,
stdout=simStdout, stderr=simStderr, env=simEnv, preexec_fn=os.setsid)
simStderr.close()
simStdout.close()
# Wait a set amount of time for the simulation to start accepting
# RPC connections.
# TODO: Check if the server is up by polling.
time.sleep(0.25)
# Run the test script.
testStdout = open("test_stdout.log", "w")
testStderr = open("test_stderr.log", "w")
testProc = subprocess.run([sys.executable, "-u", "script.py"],
stdout=testStdout, stderr=testStderr)
testStdout.close()
testStderr.close()
finally:
# Make sure to stop the simulation no matter what.
if simProc:
os.killpg(os.getpgid(simProc.pid), signal.SIGINT)
# simProc.send_signal(signal.SIGINT)
# Allow the simulation time to flush its outputs.
try:
simProc.wait(timeout=1.0)
except subprocess.TimeoutExpired:
simProc.kill()
# Read the output log files and return the proper result.
err, logs = self.readLogs()
logs += f"---- Test process exit code: {testProc.returncode}\n"
passed = testProc.returncode == 0 and not err
if not passed:
print(logs)
return 0 if passed else 1
def readLogs(self):
"""Read the log files from the simulation and the test script. Only add
the stderr logs if they contain something. Also return a flag
indicating that one of the stderr logs has content."""
foundErr = False
ret = "----- Simulation stdout -----\n"
with open("sim_stdout.log") as f:
ret += f.read()
with open("sim_stderr.log") as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Simulation stderr -----\n"
ret += stderr
foundErr = True
ret += "\n----- Test stdout -----\n"
with open("test_stdout.log") as f:
ret += f.read()
with open("test_stderr.log") as f:
stderr = f.read()
if stderr != "":
ret += "\n----- Test stderr -----\n"
ret += stderr
foundErr = True
return (foundErr, ret)
def __main__(args):
argparser = argparse.ArgumentParser(
description="RTL cosimulation runner for ESI")
argparser.add_argument("source",
help="The source run spec file")
argparser.add_argument("addlArgs", nargs=argparse.REMAINDER,
help="Additional arguments to pass through to " +
"'circt-rtl-sim.py'")
if len(args) <= 1:
argparser.print_help()
return
args = argparser.parse_args(args[1:])
runner = CosimTestRunner(args.source, args.addlArgs)
rc = runner.compile()
if rc != 0:
return rc
return runner.run()
if __name__ == '__main__':
sys.exit(__main__(sys.argv))