Resurrect pselect MainLoop implementation

Summary:
It turns out that even though ppoll is available on all the android
devices we support, it does not seem to be working properly on all of
them -- MainLoop just does a busy loop with ppoll returning EINTR and
not making any progress.

This brings back the pselect implementation and makes it available on
android. I could not do any cmake checks for this as the ppoll symbol is
actually avaiable -- it just does not work.

Reviewers: beanz, eugene

Subscribers: srhines, lldb-commits

Differential Revision: https://reviews.llvm.org/D32600

llvm-svn: 301636
This commit is contained in:
Pavel Labath 2017-04-28 10:26:06 +00:00
parent 45c423bcdc
commit 836ad03fa5
2 changed files with 208 additions and 93 deletions

View File

@ -93,6 +93,7 @@ private:
#endif
bool was_blocked : 1;
};
class RunImpl;
llvm::DenseMap<IOObject::WaitableHandle, Callback> m_read_fds;
llvm::DenseMap<int, SignalInfo> m_signals;

View File

@ -32,6 +32,10 @@
#define POLL poll
#endif
#ifdef __ANDROID__
#define FORCE_PSELECT
#endif
#if SIGNAL_POLLING_UNSUPPORTED
#ifdef LLVM_ON_WIN32
typedef int sigset_t;
@ -59,6 +63,189 @@ static void SignalHandler(int signo, siginfo_t *info, void *) {
g_signal_flags[signo] = 1;
}
class MainLoop::RunImpl {
public:
// TODO: Use llvm::Expected<T>
static std::unique_ptr<RunImpl> Create(MainLoop &loop, Error &error);
~RunImpl();
Error Poll();
template <typename F> void ForEachReadFD(F &&f);
template <typename F> void ForEachSignal(F &&f);
private:
MainLoop &loop;
#if HAVE_SYS_EVENT_H
int queue_id;
std::vector<struct kevent> in_events;
struct kevent out_events[4];
int num_events = -1;
RunImpl(MainLoop &loop, int queue_id) : loop(loop), queue_id(queue_id) {
in_events.reserve(loop.m_read_fds.size() + loop.m_signals.size());
}
#else
std::vector<int> signals;
#ifdef FORCE_PSELECT
fd_set read_fd_set;
#else
std::vector<struct pollfd> read_fds;
#endif
RunImpl(MainLoop &loop) : loop(loop) {
signals.reserve(loop.m_signals.size());
}
sigset_t get_sigmask();
#endif
};
#if HAVE_SYS_EVENT_H
MainLoop::RunImpl::~RunImpl() {
int r = close(queue_id);
assert(r == 0);
(void)r;
}
std::unique_ptr<MainLoop::RunImpl> MainLoop::RunImpl::Create(MainLoop &loop, Error &error)
{
error.Clear();
int queue_id = kqueue();
if(queue_id < 0) {
error = Error(errno, eErrorTypePOSIX);
return nullptr;
}
return std::unique_ptr<RunImpl>(new RunImpl(loop, queue_id));
}
Error MainLoop::RunImpl::Poll() {
in_events.resize(loop.m_read_fds.size() + loop.m_signals.size());
unsigned i = 0;
for (auto &fd : loop.m_read_fds)
EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
for (const auto &sig : loop.m_signals)
EV_SET(&in_events[i++], sig.first, EVFILT_SIGNAL, EV_ADD, 0, 0, 0);
num_events = kevent(queue_id, in_events.data(), in_events.size(), out_events,
llvm::array_lengthof(out_events), nullptr);
if (num_events < 0)
return Error("kevent() failed with error %d\n", num_events);
return Error();
}
template <typename F> void MainLoop::RunImpl::ForEachReadFD(F &&f) {
assert(num_events >= 0);
for (int i = 0; i < num_events; ++i) {
f(out_events[i].ident);
if (loop.m_terminate_request)
return;
}
}
template <typename F> void MainLoop::RunImpl::ForEachSignal(F && f) {}
#else
MainLoop::RunImpl::~RunImpl() {}
std::unique_ptr<MainLoop::RunImpl> MainLoop::RunImpl::Create(MainLoop &loop, Error &error)
{
error.Clear();
return std::unique_ptr<RunImpl>(new RunImpl(loop));
}
sigset_t MainLoop::RunImpl::get_sigmask() {
#if SIGNAL_POLLING_UNSUPPORTED
return 0;
#else
sigset_t sigmask;
int ret = pthread_sigmask(SIG_SETMASK, nullptr, &sigmask);
assert(ret == 0);
(void) ret;
for (const auto &sig : loop.m_signals) {
signals.push_back(sig.first);
sigdelset(&sigmask, sig.first);
}
return sigmask;
#endif
}
#ifdef FORCE_PSELECT
Error MainLoop::RunImpl::Poll() {
signals.clear();
FD_ZERO(&read_fd_set);
int nfds = 0;
for (const auto &fd : loop.m_read_fds) {
FD_SET(fd.first, &read_fd_set);
nfds = std::max(nfds, fd.first + 1);
}
sigset_t sigmask = get_sigmask();
if (pselect(nfds, &read_fd_set, nullptr, nullptr, nullptr, &sigmask) == -1 &&
errno != EINTR)
return Error(errno, eErrorTypePOSIX);
return Error();
}
template <typename F> void MainLoop::RunImpl::ForEachReadFD(F &&f) {
for (const auto &fd : loop.m_read_fds) {
if(!FD_ISSET(fd.first, &read_fd_set))
continue;
f(fd.first);
if (loop.m_terminate_request)
return;
}
}
#else
Error MainLoop::RunImpl::Poll() {
signals.clear();
read_fds.clear();
sigset_t sigmask = get_sigmask();
for (const auto &fd : loop.m_read_fds) {
struct pollfd pfd;
pfd.fd = fd.first;
pfd.events = POLLIN;
pfd.revents = 0;
read_fds.push_back(pfd);
}
if (ppoll(read_fds.data(), read_fds.size(), nullptr, &sigmask) == -1 &&
errno != EINTR)
return Error(errno, eErrorTypePOSIX);
return Error();
}
template <typename F> void MainLoop::RunImpl::ForEachReadFD(F &&f) {
for (const auto &fd : read_fds) {
if ((fd.revents & POLLIN) == 0)
continue;
f(fd.fd);
if (loop.m_terminate_request)
return;
}
}
#endif
template <typename F> void MainLoop::RunImpl::ForEachSignal(F &&f) {
for (int sig : signals) {
if (g_signal_flags[sig] == 0)
continue; // No signal
g_signal_flags[sig] = 0;
f(sig);
if (loop.m_terminate_request)
return;
}
}
#endif
MainLoop::~MainLoop() {
assert(m_read_fds.size() == 0);
assert(m_signals.size() == 0);
@ -161,108 +348,35 @@ void MainLoop::UnregisterSignal(int signo) {
}
Error MainLoop::Run() {
std::vector<int> signals;
m_terminate_request = false;
signals.reserve(m_signals.size());
#if HAVE_SYS_EVENT_H
int queue_id = kqueue();
if (queue_id < 0)
Error("kqueue failed with error %d\n", queue_id);
std::vector<struct kevent> events;
events.reserve(m_read_fds.size() + m_signals.size());
#else
sigset_t sigmask;
std::vector<struct pollfd> read_fds;
read_fds.reserve(m_read_fds.size());
#endif
Error error;
auto impl = RunImpl::Create(*this, error);
if (!impl)
return error;
// run until termination or until we run out of things to listen to
while (!m_terminate_request && (!m_read_fds.empty() || !m_signals.empty())) {
// To avoid problems with callbacks changing the things we're supposed to
// listen to, we
// will store the *real* list of events separately.
signals.clear();
#if HAVE_SYS_EVENT_H
events.resize(m_read_fds.size() + m_signals.size());
int i = 0;
for (auto &fd: m_read_fds) {
EV_SET(&events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0);
}
for (const auto &sig : m_signals) {
signals.push_back(sig.first);
EV_SET(&events[i++], sig.first, EVFILT_SIGNAL, EV_ADD, 0, 0, 0);
}
struct kevent event_list[4];
int num_events =
kevent(queue_id, events.data(), events.size(), event_list, 4, NULL);
if (num_events < 0)
return Error("kevent() failed with error %d\n", num_events);
#else
read_fds.clear();
#if !SIGNAL_POLLING_UNSUPPORTED
if (int ret = pthread_sigmask(SIG_SETMASK, nullptr, &sigmask))
return Error("pthread_sigmask failed with error %d\n", ret);
for (const auto &sig : m_signals) {
signals.push_back(sig.first);
sigdelset(&sigmask, sig.first);
}
#endif
for (const auto &fd : m_read_fds) {
struct pollfd pfd;
pfd.fd = fd.first;
pfd.events = POLLIN;
pfd.revents = 0;
read_fds.push_back(pfd);
}
if (ppoll(read_fds.data(), read_fds.size(), nullptr, &sigmask) == -1 &&
errno != EINTR)
return Error(errno, eErrorTypePOSIX);
#endif
for (int sig : signals) {
if (g_signal_flags[sig] == 0)
continue; // No signal
g_signal_flags[sig] = 0;
error = impl->Poll();
if (error.Fail())
return error;
impl->ForEachSignal([&](int sig) {
auto it = m_signals.find(sig);
if (it == m_signals.end())
continue; // Signal must have gotten unregistered in the meantime
if (it != m_signals.end())
it->second.callback(*this); // Do the work
});
if (m_terminate_request)
return Error();
it->second.callback(*this); // Do the work
if (m_terminate_request)
return Error();
}
#if HAVE_SYS_EVENT_H
for (int i = 0; i < num_events; ++i) {
auto it = m_read_fds.find(event_list[i].ident);
#else
for (auto fd : read_fds) {
if ((fd.revents & POLLIN) == 0)
continue;
auto it = m_read_fds.find(fd.fd);
#endif
if (it == m_read_fds.end())
continue; // File descriptor must have gotten unregistered in the
// meantime
it->second(*this); // Do the work
if (m_terminate_request)
return Error();
}
impl->ForEachReadFD([&](int fd) {
auto it = m_read_fds.find(fd);
if (it != m_read_fds.end())
it->second(*this); // Do the work
});
if (m_terminate_request)
return Error();
}
return Error();
}