[ESI][Cosim][NFC] Refactor cosim to divorce capnp from DPI (#7045)

Move generic functionality into Capnp library. This divorces it from the
DPI server, allowing other things to use it. Additionally, divorce
server from general functionality to make adding a client thread easier.
This commit is contained in:
John Demme 2024-05-15 18:18:24 -07:00 committed by GitHub
parent 6e028d64bd
commit b64af242f2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 92 additions and 51 deletions

View File

@ -18,6 +18,10 @@ add_library(EsiCosimCapnp SHARED
${COSIM_CAPNP_HDRS}
${COSIM_CAPNP_SRCS}
${COSIM_SCHEMA_HDR}
lib/CapnpThreads.cpp
lib/Endpoint.cpp
lib/Server.cpp
)
target_include_directories(EsiCosimCapnp PUBLIC ${CAPNPC_OUTPUT_DIR})
target_include_directories(EsiCosimCapnp PUBLIC ${CAPNP_INCLUDE_DIRS})

View File

@ -6,8 +6,6 @@
add_library(EsiCosimDpiServer SHARED
DpiEntryPoints.cpp
Server.cpp
Endpoint.cpp
)
set_target_properties(EsiCosimDpiServer
PROPERTIES

View File

@ -16,8 +16,8 @@
//
//===----------------------------------------------------------------------===//
#include "cosim/Server.h"
#include "cosim/dpi.h"
#include "cosim/CapnpThreads.h"
#include "dpi.h"
#include <algorithm>
#include <cassert>

View File

@ -1,4 +1,4 @@
//===- Server.h - ESI cosim RPC servers -------------------------*- C++ -*-===//
//===- CapnpThreads.h - ESI cosim RPC ---------------------------*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
@ -20,24 +20,46 @@
#include "cosim/LowLevel.h"
#include <thread>
namespace kj {
class WaitScope;
} // namespace kj
namespace esi {
namespace cosim {
/// Since Capnp is not thread-safe, client and server must be run in their own
/// threads and communicate with the outside world through thread safe channels.
class CapnpCosimThread {
public:
EndpointRegistry endpoints;
LowLevel lowLevelBridge;
CapnpCosimThread();
~CapnpCosimThread();
/// Stop the thread. This is a blocking call -- it will not return until the
/// capnp thread has stopped.
void stop();
protected:
/// Start capnp polling loop. Does not return until stop() is called. Must be
/// called in the same thread the RPC server/client was created.
void loop(kj::WaitScope &waitScope);
using Lock = std::lock_guard<std::mutex>;
std::thread *myThread;
volatile bool stopSig;
std::mutex m;
};
/// The main RpcServer. Does not implement any capnp RPC interfaces but contains
/// the capnp main RPC server. We run the capnp server in its own thread to be
/// more responsive to network traffic and so as to not slow down the
/// simulation.
class RpcServer {
class RpcServer : public CapnpCosimThread {
public:
EndpointRegistry endpoints;
LowLevel lowLevelBridge;
RpcServer();
~RpcServer();
/// Start and stop the server thread.
void run(uint16_t port);
void stop();
void setManifest(unsigned int esiVersion,
const std::vector<uint8_t> &manifest) {
@ -46,15 +68,9 @@ public:
}
private:
using Lock = std::lock_guard<std::mutex>;
/// The thread's main loop function. Exits on shutdown.
void mainLoop(uint16_t port);
std::thread *mainThread;
volatile bool stopSig;
std::mutex m;
unsigned int esiVersion = -1;
std::vector<uint8_t> compressedManifest;
};

View File

@ -0,0 +1,51 @@
//===- CapnpThreads.cpp - Cosim RPC common code -----------------*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
#include "cosim/CapnpThreads.h"
#include "CosimDpi.capnp.h"
#include <capnp/ez-rpc.h>
#include <thread>
#ifdef _WIN32
#include <io.h>
#else
#include <unistd.h>
#endif
using namespace capnp;
using namespace esi::cosim;
CapnpCosimThread::CapnpCosimThread() : myThread(nullptr), stopSig(false) {}
CapnpCosimThread::~CapnpCosimThread() { stop(); }
void CapnpCosimThread::loop(kj::WaitScope &waitScope) {
// OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The
// problem is that I can't figure out how read the stop signal from libkj
// asyncrony land.
//
// IIRC the main libkj wait loop uses `select()` (or something similar on
// Windows) on its FDs. As a result, any code which checks the stop variable
// doesn't run until there is some I/O. Probably the right way is to set up a
// pipe to deliver a shutdown signal.
//
// TODO: Figure out how to do this properly, if possible.
while (!stopSig) {
waitScope.poll();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
/// Signal the RPC server thread to stop. Wait for it to exit.
void CapnpCosimThread::stop() {
Lock g(m);
if (myThread == nullptr) {
fprintf(stderr, "CapnpCosimThread not Run()\n");
} else if (!stopSig) {
stopSig = true;
myThread->join();
}
}

View File

@ -17,8 +17,8 @@
//
//===----------------------------------------------------------------------===//
#include "cosim/Server.h"
#include "CosimDpi.capnp.h"
#include "cosim/CapnpThreads.h"
#include <capnp/ez-rpc.h>
#include <thread>
#ifdef _WIN32
@ -242,9 +242,6 @@ kj::Promise<void> CosimServer::openLowLevel(OpenLowLevelContext ctxt) {
/// ----- RpcServer definitions.
RpcServer::RpcServer() : mainThread(nullptr), stopSig(false) {}
RpcServer::~RpcServer() { stop(); }
/// Write the port number to a file. Necessary when we allow 'EzRpcServer' to
/// select its own port. We can't use stdout/stderr because the flushing
/// semantics are undefined (as in `flush()` doesn't work on all simulators).
@ -268,40 +265,15 @@ void RpcServer::mainLoop(uint16_t port) {
}
writePort(port);
printf("[COSIM] Listening on port: %u\n", (unsigned int)port);
// OK, this is uber hacky, but it unblocks me and isn't _too_ inefficient. The
// problem is that I can't figure out how read the stop signal from libkj
// asyncrony land.
//
// IIRC the main libkj wait loop uses `select()` (or something similar on
// Windows) on its FDs. As a result, any code which checks the stop variable
// doesn't run until there is some I/O. Probably the right way is to set up a
// pipe to deliver a shutdown signal.
//
// TODO: Figure out how to do this properly, if possible.
while (!stopSig) {
waitScope.poll();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
loop(waitScope);
}
/// Start the server if not already started.
void RpcServer::run(uint16_t port) {
Lock g(m);
if (mainThread == nullptr) {
mainThread = new std::thread(&RpcServer::mainLoop, this, port);
if (myThread == nullptr) {
myThread = new std::thread(&RpcServer::mainLoop, this, port);
} else {
fprintf(stderr, "Warning: cannot Run() RPC server more than once!");
}
}
/// Signal the RPC server thread to stop. Wait for it to exit.
void RpcServer::stop() {
Lock g(m);
if (mainThread == nullptr) {
fprintf(stderr, "RpcServer not Run()\n");
} else if (!stopSig) {
stopSig = true;
mainThread->join();
}
}