diff --git a/eshi/README.md b/eshi/README.md index 59d4c7e..66360d3 100644 --- a/eshi/README.md +++ b/eshi/README.md @@ -38,6 +38,7 @@ evl_open_mutex evl_open_event evl_open_flags evl_open_sem +The observable API. == VARIATION(S) diff --git a/eshi/poll.c b/eshi/poll.c index 1538be5..82c98c6 100644 --- a/eshi/poll.c +++ b/eshi/poll.c @@ -16,7 +16,8 @@ int evl_new_poll(void) return epoll_create1(EPOLL_CLOEXEC); } -int evl_add_pollfd(int efd, int newfd, unsigned int events) +int evl_add_pollfd(int efd, int newfd, unsigned int events, + union evl_value pollval) { struct epoll_event ev; int ret; @@ -45,7 +46,8 @@ int evl_del_pollfd(int efd, int delfd) return 0; } -int evl_mod_pollfd(int efd, int modfd, unsigned int events) +int evl_mod_pollfd(int efd, int modfd, unsigned int events, + union evl_value pollval) { struct epoll_event ev; int ret; diff --git a/include/eshi/evl/poll.h b/include/eshi/evl/poll.h index 81b683a..99ef8f0 100644 --- a/include/eshi/evl/poll.h +++ b/include/eshi/evl/poll.h @@ -14,6 +14,7 @@ struct evl_poll_event { __u32 fd; __u32 events; + union evl_value pollval; }; #ifdef __cplusplus @@ -23,12 +24,14 @@ extern "C" { int evl_new_poll(void); int evl_add_pollfd(int efd, int newfd, - unsigned int events); + unsigned int events, + union evl_value pollval); int evl_del_pollfd(int efd, int delfd); int evl_mod_pollfd(int efd, int modfd, - unsigned int events); + unsigned int events, + union evl_value pollval); int evl_timedpoll(int efd, struct evl_poll_event *pollset, int nrset, struct timespec *timeout); diff --git a/include/eshi/evl/uapi.h b/include/eshi/evl/uapi.h index ee4080a..e758724 100644 --- a/include/eshi/evl/uapi.h +++ b/include/eshi/evl/uapi.h @@ -7,6 +7,8 @@ #ifndef _EVL_ESHI_UAPI_H #define _EVL_ESHI_UAPI_H +#include + #define EVL_CLOCK_MONOTONIC_DEV "monotonic" #define EVL_CLOCK_REALTIME_DEV "realtime" #define EVL_CLOCK_DEV "clock" @@ -15,5 +17,14 @@ #define EVL_PROXY_DEV "proxy" #define EVL_THREAD_DEV "thread" #define EVL_XBUF_DEV "xbuf" +#define EVL_OBSERVABLE_DEV "observable" + +union evl_value { + int32_t val; + int64_t lval; + void *ptr; +}; + +#define evl_nil ((union evl_value){ .lval = 0 }) #endif /* _EVL_ESHI_UAPI_H */ diff --git a/include/evl/evl.h b/include/evl/evl.h index 511307f..b7ae6a1 100644 --- a/include/evl/evl.h +++ b/include/evl/evl.h @@ -20,9 +20,9 @@ #include #include -#define __EVL__ 15 /* API version */ +#define __EVL__ 16 /* API version */ -#define EVL_ABI_PREREQ 21 +#define EVL_ABI_PREREQ 23 struct evl_version { int api_level; /* libevl.so: __EVL__ */ diff --git a/include/evl/observable.h b/include/evl/observable.h new file mode 100644 index 0000000..d7cbbe4 --- /dev/null +++ b/include/evl/observable.h @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: MIT + * + * Copyright (C) 2020 Philippe Gerum + */ + +#ifndef _EVL_OBSERVABLE_H +#define _EVL_OBSERVABLE_H + +#include +#include +#include +#include +#include + +struct evl_notification { + uint32_t tag; + uint32_t serial; + int32_t issuer; + union evl_value event; + struct timespec date; +}; + +#define evl_new_observable(__fmt, __args...) \ + evl_create_observable(EVL_CLONE_PRIVATE, __fmt, ##__args) + +#ifdef __cplusplus +extern "C" { +#endif + +int evl_create_observable(int flags, const char *fmt, ...); + +int evl_update_observable(int ofd, const struct evl_notice *ntc, + int nr); + +int evl_read_observable(int ofd, struct evl_notification *nf, + int nr); + +#ifdef __cplusplus +} +#endif + +#endif /* _EVL_OBSERVABLE_H */ diff --git a/include/evl/poll.h b/include/evl/poll.h index 9566819..ca2ecdc 100644 --- a/include/evl/poll.h +++ b/include/evl/poll.h @@ -19,12 +19,14 @@ extern "C" { int evl_new_poll(void); int evl_add_pollfd(int efd, int newfd, - unsigned int events); + unsigned int events, + union evl_value pollval); int evl_del_pollfd(int efd, int delfd); int evl_mod_pollfd(int efd, int modfd, - unsigned int events); + unsigned int events, + union evl_value pollval); int evl_timedpoll(int efd, struct evl_poll_event *pollset, int nrset, const struct timespec *timeout); diff --git a/include/evl/thread.h b/include/evl/thread.h index 98c0bbb..ac9ff39 100644 --- a/include/evl/thread.h +++ b/include/evl/thread.h @@ -76,6 +76,12 @@ int evl_set_thread_mode(int efd, int mask, int evl_clear_thread_mode(int efd, int mask, int *oldmask); +int evl_subscribe(int ofd, + unsigned int backlog_count, + int flags); + +int evl_unsubscribe(int ofd); + #ifdef __cplusplus } #endif diff --git a/lib/event.c b/lib/event.c index 39315c7..3b76a21 100644 --- a/lib/event.c +++ b/lib/event.c @@ -48,8 +48,7 @@ static int init_event_vargs(struct evl_event *evt, attrs.protocol = EVL_EVENT_GATED; attrs.clockfd = clockfd; attrs.initval = 0; - efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, - flags & EVL_CLONE_MASK, &eids); + efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids); free(name); if (efd < 0) return efd; diff --git a/lib/flags.c b/lib/flags.c index ee7eb5c..92e03c9 100644 --- a/lib/flags.c +++ b/lib/flags.c @@ -49,8 +49,7 @@ int evl_create_flags(struct evl_flags *flg, int clockfd, attrs.protocol = EVL_EVENT_MASK; attrs.clockfd = clockfd; attrs.initval = initval; - efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, - flags & EVL_CLONE_MASK, &eids); + efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids); free(name); if (efd < 0) return efd; diff --git a/lib/internal.c b/lib/internal.c index 1c77361..6736fe4 100644 --- a/lib/internal.c +++ b/lib/internal.c @@ -28,6 +28,21 @@ static void lart_once(void) pthread_once(&lart_once, do_lart_once); } +static int flip_fd_flags(int efd, int cmd, int flags) +{ + int ret; + + ret = fcntl(efd, cmd == F_SETFD ? F_GETFD : F_GETFL, 0); + if (ret < 0) + return -errno; + + ret = fcntl(efd, cmd, ret | flags); + if (ret) + return -errno; + + return 0; +} + /* * Creating an EVL element is done in the following steps: * @@ -51,6 +66,12 @@ int create_evl_element(const char *type, const char *name, char *fdevname, *edevname = NULL; struct evl_clone_req clone; int ffd, efd, ret; + bool nonblock; + + nonblock = !!(clone_flags & EVL_CLONE_NONBLOCK); + /* Strip off user-only bits. */ + clone_flags &= EVL_CLONE_MASK; + clone_flags &= ~EVL_CLONE_NONBLOCK; ret = asprintf(&fdevname, "/dev/evl/%s/clone", type); if (ret < 0) @@ -101,16 +122,14 @@ int create_evl_element(const char *type, const char *name, efd = clone.efd; } - ret = fcntl(efd, F_GETFD, 0); - if (ret < 0) { - ret = -errno; + ret = flip_fd_flags(efd, F_SETFD, O_CLOEXEC); + if (ret) goto out_element; - } - ret = fcntl(efd, F_SETFD, ret | O_CLOEXEC); - if (ret) { - ret = -errno; - goto out_element; + if (nonblock) { + ret = flip_fd_flags(efd, F_SETFL, O_NONBLOCK); + if (ret) + goto out_element; } if (eids) diff --git a/lib/mutex.c b/lib/mutex.c index 54feb61..9189a4c 100644 --- a/lib/mutex.c +++ b/lib/mutex.c @@ -62,8 +62,7 @@ static int init_mutex_vargs(struct evl_mutex *mutex, attrs.protocol = protocol; attrs.clockfd = clockfd; attrs.initval = ceiling; - efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, - flags & EVL_CLONE_MASK, &eids); + efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids); free(name); if (efd < 0) return efd; diff --git a/lib/observable.c b/lib/observable.c new file mode 100644 index 0000000..4d17bbc --- /dev/null +++ b/lib/observable.c @@ -0,0 +1,114 @@ +/* + * SPDX-License-Identifier: MIT + * + * Copyright (C) 2020 Philippe Gerum + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "internal.h" + +int evl_create_observable(int flags, const char *fmt, ...) +{ + int ret, efd; + va_list ap; + char *name; + + va_start(ap, fmt); + ret = vasprintf(&name, fmt, ap); + va_end(ap); + if (ret < 0) + return -ENOMEM; + + efd = create_evl_element(EVL_OBSERVABLE_DEV, name, NULL, flags, NULL); + free(name); + + return efd; +} + +static bool wants_oob_io(void) +{ + /* + * Only non-EVL threads or members of the SCHED_WEAK class + * should call in-band. + */ + if (evl_current == EVL_NO_HANDLE) + return false; + + return !(evl_get_current_mode() & T_WEAK); +} + +int evl_update_observable(int ofd, const struct evl_notice *ntc, int nr) +{ + ssize_t ret; + + if (!wants_oob_io()) + ret = write(ofd, ntc, nr * sizeof(*ntc)); + else + ret = oob_write(ofd, ntc, nr * sizeof(*ntc)); + + if (ret < 0) + return errno == EAGAIN ? 0 : -errno; + + return ret / sizeof(*ntc); +} + +static ssize_t do_read(int ofd, struct evl_notification *nf, int nr, + ssize_t (*readfn)(int ofd, void *buf, size_t count)) +{ + struct __evl_notification _nf __maybe_unused; + ssize_t ret, _ret __maybe_unused; + + /* + * This mess is exclusively intended not to expose the + * __evl_timespec type embedded into the __evl_notification + * descriptor to users. Legacy 32bit systems with + * Y0238-unsafe C libraries have to pay a price for this, by + * reading every notification one after another instead of + * pulling a bulk - this stupidly trivial way seems acceptable + * for those platforms. For all others, struct __evl_timespec + * used in kernel space and timespec in userland have the same + * memory layout, so we may read the notifications in one gulp + * directly into the user buffer. + */ +#if __WORDSIZE == 64 || __TIMESIZE == 64 + ret = readfn(ofd, nf, nr * sizeof(*nf)); +#else + ret = 0; + while (nr-- > 0) { + _ret = readfn(ofd, &_nf, sizeof(_nf)); + if (_ret <= 0) + return ret ?: _ret; + nf->tag = _nf.tag; + nf->serial = _nf.serial; + nf->issuer = _nf.issuer; + nf->event = _nf.event; + nf->date.tv_sec = (long)_nf.date.tv_sec; + nf->date.tv_nsec = _nf.date.tv_nsec; + ret += sizeof(*nf); + nf++; + } +#endif + + return ret; +} + +int evl_read_observable(int ofd, struct evl_notification *nf, int nr) +{ + ssize_t ret; + + if (!wants_oob_io()) + ret = do_read(ofd, nf, nr, read); + else + ret = do_read(ofd, nf, nr, oob_read); + if (ret < 0) + return -errno; + + return ret / sizeof(*nf); +} diff --git a/lib/poll.c b/lib/poll.c index 44e3042..a5d3201 100644 --- a/lib/poll.c +++ b/lib/poll.c @@ -18,7 +18,8 @@ int evl_new_poll(void) return create_evl_file(EVL_POLL_DEV); } -static int update_pollset(int efd, int op, int fd, unsigned int events) +static int update_pollset(int efd, int op, int fd, unsigned int events, + union evl_value pollval) { struct evl_poll_ctlreq creq; int ret; @@ -26,24 +27,27 @@ static int update_pollset(int efd, int op, int fd, unsigned int events) creq.action = op; creq.fd = fd; creq.events = events; + creq.pollval = pollval; ret = oob_ioctl(efd, EVL_POLIOC_CTL, &creq); return ret ? -errno : 0; } -int evl_add_pollfd(int efd, int fd, unsigned int events) +int evl_add_pollfd(int efd, int fd, unsigned int events, + union evl_value pollval) { - return update_pollset(efd, EVL_POLL_CTLADD, fd, events); + return update_pollset(efd, EVL_POLL_CTLADD, fd, events, pollval); } int evl_del_pollfd(int efd, int fd) { - return update_pollset(efd, EVL_POLL_CTLDEL, fd, 0); + return update_pollset(efd, EVL_POLL_CTLDEL, fd, 0, evl_nil); } -int evl_mod_pollfd(int efd, int fd, unsigned int events) +int evl_mod_pollfd(int efd, int fd, + unsigned int events, union evl_value pollval) { - return update_pollset(efd, EVL_POLL_CTLMOD, fd, events); + return update_pollset(efd, EVL_POLL_CTLMOD, fd, events, pollval); } static int do_poll(int efd, struct evl_poll_event *pollset, diff --git a/lib/proxy.c b/lib/proxy.c index d0210ab..6dbbdd9 100644 --- a/lib/proxy.c +++ b/lib/proxy.c @@ -49,8 +49,7 @@ int evl_create_proxy(int targetfd, size_t bufsz, size_t granularity, attrs.fd = targetfd; attrs.bufsz = bufsz; attrs.granularity = granularity; - efd = create_evl_element(EVL_PROXY_DEV, name, &attrs, - flags & EVL_CLONE_MASK, NULL); + efd = create_evl_element(EVL_PROXY_DEV, name, &attrs, flags, NULL); free(name); return efd; diff --git a/lib/sem.c b/lib/sem.c index ec3454c..39cc72c 100644 --- a/lib/sem.c +++ b/lib/sem.c @@ -48,8 +48,7 @@ int evl_create_sem(struct evl_sem *sem, int clockfd, attrs.protocol = EVL_EVENT_COUNT; attrs.clockfd = clockfd; attrs.initval = initval; - efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, - flags & EVL_CLONE_MASK, &eids); + efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids); free(name); if (efd < 0) return efd; diff --git a/lib/thread.c b/lib/thread.c index 56c841e..682f49c 100644 --- a/lib/thread.c +++ b/lib/thread.c @@ -19,6 +19,7 @@ #include #include #include +#include #include "internal.h" __thread __attribute__ ((tls_model (EVL_TLS_MODEL))) @@ -81,8 +82,7 @@ int evl_attach_thread(int flags, const char *fmt, ...) if (ret < 0) return -ENOMEM; - efd = create_evl_element(EVL_THREAD_DEV, name, NULL, - flags & EVL_CLONE_MASK, &eids); + efd = create_evl_element(EVL_THREAD_DEV, name, NULL, flags, &eids); free(name); if (efd < 0) return efd; @@ -234,3 +234,28 @@ int evl_clear_thread_mode(int efd, int mask, int *oldmask) { return do_thread_mode(efd, EVL_THRIOC_CLEAR_MODE, mask, oldmask); } + +int evl_subscribe(int efd, unsigned int backlog_count, int flags) +{ + struct evl_subscription sub; + int ret; + + sub.backlog_count = backlog_count; + sub.flags = flags; + ret = ioctl(efd, EVL_OBSIOC_SUBSCRIBE, &sub); + if (ret && errno == ENOTTY) + return -EPERM; + + return ret ? -errno : 0; +} + +int evl_unsubscribe(int efd) +{ + int ret; + + ret = ioctl(efd, EVL_OBSIOC_UNSUBSCRIBE); + if (ret && errno == ENOTTY) + return -EPERM; + + return ret ? -errno : 0; +} diff --git a/lib/xbuf.c b/lib/xbuf.c index 86f9a41..6fa393b 100644 --- a/lib/xbuf.c +++ b/lib/xbuf.c @@ -26,8 +26,7 @@ int evl_create_xbuf(size_t i_bufsz, size_t o_bufsz, attrs.i_bufsz = i_bufsz; attrs.o_bufsz = o_bufsz; - efd = create_evl_element(EVL_XBUF_DEV, name, &attrs, - flags & EVL_CLONE_MASK, NULL); + efd = create_evl_element(EVL_XBUF_DEV, name, &attrs, flags, NULL); free(name); return efd; diff --git a/tests/compile-tests/observable.cc b/tests/compile-tests/observable.cc new file mode 100644 index 0000000..b674ecc --- /dev/null +++ b/tests/compile-tests/observable.cc @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: MIT + * + * COMPILE-TESTING ONLY. + */ + +#include + +int main(int argc, char *argv[]) +{ + evl_new_observable("test"); + evl_create_observable(EVL_CLONE_PRIVATE, "test"); + + return 0; +} diff --git a/tests/compile-tests/poll.cc b/tests/compile-tests/poll.cc index 3d2869e..43efc7c 100644 --- a/tests/compile-tests/poll.cc +++ b/tests/compile-tests/poll.cc @@ -14,9 +14,9 @@ int main(int argc, char *argv[]) int efd; efd = evl_new_poll(); - evl_add_pollfd(efd, 1, POLLIN); + evl_add_pollfd(efd, 1, POLLIN, evl_nil); evl_del_pollfd(efd, 1); - evl_mod_pollfd(efd, 1, POLLOUT); + evl_mod_pollfd(efd, 1, POLLOUT, evl_nil); evl_read_clock(EVL_CLOCK_MONOTONIC, &timeout); evl_poll(efd, &pollset, 1); evl_timedpoll(efd, &pollset, 1, &timeout); diff --git a/tests/compile-tests/thread.cc b/tests/compile-tests/thread.cc index 0b48cd8..06ab4c1 100644 --- a/tests/compile-tests/thread.cc +++ b/tests/compile-tests/thread.cc @@ -24,6 +24,8 @@ int main(int argc, char *argv[]) evl_demote_thread(tfd); evl_set_thread_mode(tfd, 0, NULL); evl_clear_thread_mode(tfd, 0, NULL); + evl_subscribe(tfd, 1024, 0); + evl_unsubscribe(tfd); return 0; } diff --git a/tests/observable-inband.c b/tests/observable-inband.c new file mode 100644 index 0000000..63eb549 --- /dev/null +++ b/tests/observable-inband.c @@ -0,0 +1,205 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +#define MAX_THREADS 8 +#define BACKLOG_DEPTH 4096 +#define TERMINATOR 0xa5a5a5a5a5a5a5a5ULL + +#define LOW_PRIO 1 + +static int nrthreads = 1; + +static bool verbose; + +static int observable_fd; + +static struct evl_sem ready; + +static struct evl_notice next_states[] = { + [0] = { + .tag = EVL_NOTICE_USER, + .event = { + .lval = 0xa1a2a3a4a5bbccddULL, + }, + }, + [1] = { + .tag = EVL_NOTICE_USER + 1, + .event = { + .lval = 0xb1b2b3b4b5ffeeddULL, + }, + }, + [2] = { + .tag = EVL_NOTICE_USER + 2, + .event = { + .lval = 0xc1c2c3c4c5ccaabbULL, + } + }, + [3] = { + .tag = EVL_NOTICE_USER + 3, + .event = { + .lval = 0xffffffffeeeeeeeeULL, + } + }, +}; + +#define NR_UPDATES (sizeof(next_states) / sizeof(next_states[0])) + +#define do_trace(__fmt, __args...) \ + do { \ + if (verbose) \ + evl_printf(__fmt "\n", ##__args); \ + } while (0) + +static void usage(void) +{ + fprintf(stderr, "usage: observable-inband [options]:\n"); + fprintf(stderr, "-n --num-threads number of observer threads\n"); + fprintf(stderr, "-l --message-loops number of message loops\n"); + fprintf(stderr, "-v --verbose turn on verbosity\n"); +} + +#define short_optlist "vn:l:" + +static const struct option options[] = { + { + .name = "num-threads", + .has_arg = required_argument, + .val = 'n', + }, + { + .name = "message-loops", + .has_arg = required_argument, + .val = 'l', + }, + { + .name = "verbose", + .has_arg = no_argument, + .val = 'v', + }, + { /* Sentinel */ } +}; + +static void *observer_thread(void *arg) +{ + int serial = (int)(long)arg, n = 0; + struct evl_notification nf; + ssize_t ret; + + do_trace("in-band observer #%d started", serial); + + __Fcall_assert(ret, evl_read_observable(observable_fd, &nf, 1)); + __Texpr_assert(ret == -ENXIO); + + /* + * Don't attach, we want to make sure that a plain thread can + * read an observable. + */ + __Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0)); + + evl_put_sem(&ready); + + for (;;) { + ret = evl_read_observable(observable_fd, &nf, 1); + do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx", + serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec, + nf.tag, nf.event.lval); + if (nf.event.lval == TERMINATOR) + break; + __Texpr_assert(next_states[n].tag == nf.tag); + __Texpr_assert(next_states[n].event.lval == nf.event.lval); + n = (n + 1) % (sizeof(next_states) / sizeof(next_states[0])); + } + + do_trace("in-band observer #%d done", serial); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + int tfd, ofd, n, c, loops = 1000, throttle; + struct evl_notice terminator; + pthread_t tid[MAX_THREADS]; + ssize_t ret; + + for (;;) { + c = getopt_long(argc, argv, short_optlist, options, NULL); + if (c == EOF) + break; + + switch (c) { + case 0: + break; + case 'v': + verbose = true; + break; + case 'n': + nrthreads = atoi(optarg); + break; + case 'l': + loops = atoi(optarg); + break; + case '?': + default: + usage(); + return 1; + } + } + + if (optind < argc) { + usage(); + return 1; + } + + if (nrthreads <= 0) + nrthreads = 3; + if (nrthreads > MAX_THREADS) + error(1, EINVAL, "max %d threads", MAX_THREADS); + + __Tcall_assert(tfd, evl_attach_self("observable-inband:%d", getpid())); + __Tcall_assert(ret, evl_new_sem(&ready, "observable-inband-ready:%d", getpid())); + + __Tcall_assert(ofd, evl_new_observable("observable:%d", getpid())); + observable_fd = ofd; + + for (n = 0; n < nrthreads; n++) { + new_thread(tid + n, SCHED_OTHER, 0, observer_thread, (void *)(long)n); + __Tcall_assert(ret, evl_get_sem(&ready)); + } + + throttle = BACKLOG_DEPTH / (sizeof(next_states) / sizeof(next_states[0]) / 2); + for (n = 0; loops == 0 || n < loops; n++) { + __Tcall_errno_assert(ret, + evl_update_observable(observable_fd, next_states, NR_UPDATES)); + if (!(n % throttle)) + __Tcall_assert(ret, usleep(10000)); + } + + terminator.tag = EVL_NOTICE_USER; + terminator.event.lval = TERMINATOR; + __Tcall_errno_assert(ret, evl_update_observable(observable_fd, &terminator, 1)); + + for (n = 0; n < nrthreads; n++) + pthread_join(tid[n], NULL); + + __Tcall_assert(ret, close(observable_fd)); + + return 0; +} diff --git a/tests/observable-master.c b/tests/observable-master.c new file mode 100644 index 0000000..93007ba --- /dev/null +++ b/tests/observable-master.c @@ -0,0 +1,187 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +#define MAX_THREADS 5 +#define BACKLOG_DEPTH 1024 + +#define HIGH_PRIO 2 + +static bool verbose; + +static int observable_fd; + +static struct evl_sem ready; + +static struct evl_notice next_states[] = { + [0] = { + .tag = EVL_NOTICE_USER, + .event = { + .lval = 0xa1a2a3a4a5bbccddULL, + }, + }, + [1] = { + .tag = EVL_NOTICE_USER + 1, + .event = { + .lval = 0xb1b2b3b4b5ffeeddULL, + }, + }, + [2] = { + .tag = EVL_NOTICE_USER + 2, + .event = { + .lval = 0xc1c2c3c4c5ccaabbULL, + } + }, + [3] = { + .tag = EVL_NOTICE_USER + 3, + .event = { + .lval = 0xd1d2d3d4d5ffeeddULL, + } + }, + [4] = { + .tag = EVL_NOTICE_USER + 4, + .event = { + .lval = 0xe1e2e3e4e5010203ULL, + } + }, +}; + +#define do_trace(__fmt, __args...) \ + do { \ + if (verbose) \ + evl_printf(__fmt "\n", ##__args); \ + } while (0) + +static void usage(void) +{ + fprintf(stderr, "usage: observable-master [options]:\n"); + fprintf(stderr, "-l --message-loops number of message loops\n"); + fprintf(stderr, "-v --verbose turn on verbosity\n"); +} + +#define short_optlist "vl:" + +static const struct option options[] = { + { + .name = "message-loops", + .has_arg = required_argument, + .val = 'l', + }, + { + .name = "verbose", + .has_arg = no_argument, + .val = 'v', + }, + { /* Sentinel */ } +}; + +static void *worker_thread(void *arg) +{ + int serial = (int)(long)arg, tfd; + struct evl_notification nf; + ssize_t ret; + + do_trace("worker thread #%d started", serial); + + __Tcall_assert(tfd, evl_attach_self("oob-observer:%d.%d", + getpid(), serial)); + + __Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0)); + + evl_put_sem(&ready); + + /* + * Expect round-robin scheduling of workers if master mode. + */ + for (;;) { + ret = evl_read_observable(observable_fd, &nf, 1); + __Texpr_assert((ret == -EBADF) || ret == 1); + if (ret < 0) + break; + do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx", + serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec, + nf.tag, nf.event.lval); + __Texpr_assert(next_states[serial].tag == nf.tag); + __Texpr_assert(next_states[serial].event.lval == nf.event.lval); + } + + /* Already closed in main(), should fail. */ + __Fcall_assert(ret, evl_unsubscribe(observable_fd)); + __Texpr_assert(errno == EBADF); + + do_trace("worker thread #%d done", serial); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + int tfd, ofd, n, c, loops = 1000; + pthread_t tid[MAX_THREADS]; + ssize_t ret; + + for (;;) { + c = getopt_long(argc, argv, short_optlist, options, NULL); + if (c == EOF) + break; + + switch (c) { + case 0: + break; + case 'v': + verbose = true; + break; + case 'l': + loops = atoi(optarg); + break; + case '?': + default: + usage(); + return 1; + } + } + + if (optind < argc) { + usage(); + return 1; + } + + __Tcall_assert(tfd, evl_attach_self("observable-master:%d", getpid())); + __Tcall_assert(ret, evl_new_sem(&ready, "observable-master-ready:%d", getpid())); + + __Tcall_assert(ofd, evl_create_observable(EVL_CLONE_MASTER, + "observable:%d", getpid())); + observable_fd = ofd; + + for (n = 0; n < MAX_THREADS; n++) { + new_thread(tid + n, SCHED_FIFO, HIGH_PRIO, worker_thread, (void *)(long)n); + __Tcall_assert(ret, evl_get_sem(&ready)); + } + + for (n = 0; loops == 0 || n < loops; n++) + __Tcall_errno_assert(ret, evl_update_observable(observable_fd, + &next_states[n % MAX_THREADS], 1)); + + __Tcall_assert(ret, close(observable_fd)); + + for (n = 0; n < MAX_THREADS; n++) + pthread_join(tid[n], NULL); + + return 0; +} diff --git a/tests/observable-onchange.c b/tests/observable-onchange.c new file mode 100644 index 0000000..96c25a1 --- /dev/null +++ b/tests/observable-onchange.c @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +int main(int argc, char *argv[]) +{ + struct evl_notification nf; + struct evl_notice next; + int tfd, ofd; + ssize_t ret; + + __Tcall_assert(tfd, evl_attach_self("observable-onchange:%d", getpid())); + + __Tcall_assert(ofd, evl_new_observable("observable:%d", getpid())); + __Tcall_assert(ret, evl_subscribe(ofd, 16, EVL_NOTIFY_ONCHANGE)); + + next.tag = EVL_NOTICE_USER; + + /* Send 1, 1, 2 */ + next.event.lval = 1ULL; + __Tcall_assert(ret, evl_update_observable(ofd, &next, 1)); + __Texpr_assert(ret == 1); + next.event.lval = 1ULL; + __Tcall_assert(ret, evl_update_observable(ofd, &next, 1)); + __Texpr_assert(ret == 1); + next.event.lval = 2ULL; + __Tcall_assert(ret, evl_update_observable(ofd, &next, 1)); + __Texpr_assert(ret == 1); + + /* Receive 1, 2. */ + __Tcall_assert(ret, evl_read_observable(ofd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.lval == 1ULL); + __Tcall_assert(ret, evl_read_observable(ofd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.lval == 2ULL); + + /* There should be nothing more to be read. */ + __Tcall_errno_assert(ret, fcntl(ofd, F_SETFL, O_NONBLOCK)); + __Fcall_assert(ret, evl_read_observable(ofd, &nf, 1)); + __Texpr_assert(ret == -EAGAIN); + + return 0; +} diff --git a/tests/observable-oob.c b/tests/observable-oob.c new file mode 100644 index 0000000..818cc44 --- /dev/null +++ b/tests/observable-oob.c @@ -0,0 +1,210 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +#define MAX_THREADS 8 +#define BACKLOG_DEPTH 1024 + +#define LOW_PRIO 1 +#define HIGH_PRIO 2 + +static int nrthreads = 1; + +static bool verbose; + +static int observable_fd; + +static struct evl_sem ready; + +static struct evl_notice next_states[] = { + [0] = { + .tag = EVL_NOTICE_USER, + .event = { + .lval = 0xa1a2a3a4a5bbccddULL, + }, + }, + [1] = { + .tag = EVL_NOTICE_USER + 1, + .event = { + .lval = 0xb1b2b3b4b5ffeeddULL, + }, + }, + [2] = { + .tag = EVL_NOTICE_USER + 2, + .event = { + .lval = 0xc1c2c3c4c5ccaabbULL, + } + }, + [3] = { + .tag = EVL_NOTICE_USER + 3, + .event = { + .lval = 0xffffffffeeeeeeeeULL, + } + }, +}; + +#define NR_UPDATES (sizeof(next_states) / sizeof(next_states[0])) + +#define do_trace(__fmt, __args...) \ + do { \ + if (verbose) \ + evl_printf(__fmt "\n", ##__args); \ + } while (0) + +static void usage(void) +{ + fprintf(stderr, "usage: observable-oob [options]:\n"); + fprintf(stderr, "-n --num-threads number of observer threads\n"); + fprintf(stderr, "-l --message-loops number of message loops\n"); + fprintf(stderr, "-v --verbose turn on verbosity\n"); +} + +#define short_optlist "vn:l:" + +static const struct option options[] = { + { + .name = "num-threads", + .has_arg = required_argument, + .val = 'n', + }, + { + .name = "message-loops", + .has_arg = required_argument, + .val = 'l', + }, + { + .name = "verbose", + .has_arg = no_argument, + .val = 'v', + }, + { /* Sentinel */ } +}; + +static void *observer_thread(void *arg) +{ + int serial = (int)(long)arg, tfd, n = 0; + struct evl_sched_attrs attrs; + struct evl_notification nf; + ssize_t ret; + + do_trace("out-of-band observer #%d started", serial); + + __Tcall_assert(tfd, evl_attach_self("oob-observer:%d.%d", + getpid(), serial)); + attrs.sched_policy = SCHED_FIFO; + attrs.sched_priority = HIGH_PRIO; + __Tcall_assert(ret, evl_set_schedattr(tfd, &attrs)); + + __Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0)); + __Tcall_assert(ret, evl_unsubscribe(observable_fd)); + __Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0)); + + evl_put_sem(&ready); + + for (;;) { + ret = evl_read_observable(observable_fd, &nf, 1); + __Texpr_assert((ret == -EBADF) || ret == 1); + if (ret < 0) + break; + do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx", + serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec, + nf.tag, nf.event.lval); + __Texpr_assert(next_states[n].tag == nf.tag); + __Texpr_assert(next_states[n].event.lval == nf.event.lval); + n = (n + 1) % NR_UPDATES; + } + + /* Already closed in main(), should fail. */ + __Fcall_assert(ret, evl_unsubscribe(observable_fd)); + __Texpr_assert(errno == EBADF); + + do_trace("out-of-band observer #%d done", serial); + + return NULL; +} + +int main(int argc, char *argv[]) +{ + int tfd, ofd, n, c, loops = 1000, throttle; + pthread_t tid[MAX_THREADS]; + struct sched_param param; + ssize_t ret; + + for (;;) { + c = getopt_long(argc, argv, short_optlist, options, NULL); + if (c == EOF) + break; + + switch (c) { + case 0: + break; + case 'v': + verbose = true; + break; + case 'n': + nrthreads = atoi(optarg); + break; + case 'l': + loops = atoi(optarg); + break; + case '?': + default: + usage(); + return 1; + } + } + + if (optind < argc) { + usage(); + return 1; + } + + if (nrthreads <= 0) + nrthreads = 3; + if (nrthreads > MAX_THREADS) + error(1, EINVAL, "max %d threads", MAX_THREADS); + + param.sched_priority = LOW_PRIO; + __Tcall_assert(ret, pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m)); + __Tcall_assert(tfd, evl_attach_self("observable-oob:%d", getpid())); + __Tcall_assert(ret, evl_new_sem(&ready, "observable-oob-ready:%d", getpid())); + + __Tcall_assert(ofd, evl_new_observable("observable:%d", getpid())); + observable_fd = ofd; + + for (n = 0; n < nrthreads; n++) { + new_thread(tid + n, SCHED_FIFO, 1, observer_thread, (void *)(long)n); + __Tcall_assert(ret, evl_get_sem(&ready)); + } + + throttle = BACKLOG_DEPTH / NR_UPDATES / 2; + for (n = 0; loops == 0 || n < loops; n++) { + __Tcall_errno_assert(ret, + evl_update_observable(observable_fd, next_states, NR_UPDATES)); + if (!(n % throttle)) + __Tcall_assert(ret, evl_usleep(10000)); + } + + __Tcall_assert(ret, close(observable_fd)); + + for (n = 0; n < nrthreads; n++) + pthread_join(tid[n], NULL); + + return 0; +} diff --git a/tests/observable-race.c b/tests/observable-race.c new file mode 100644 index 0000000..7a23d73 --- /dev/null +++ b/tests/observable-race.c @@ -0,0 +1,224 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +#define MAX_THREADS 8 + +#define LOW_PRIO 1 +#define HIGH_PRIO 2 + +static int nrthreads = 1; + +static bool verbose; + +static bool send_oob; + +static int observable_fd; + +static struct evl_sem ready; + +static struct evl_notice next_states[] = { + [0] = { + .tag = EVL_NOTICE_USER, + .event = { + .lval = 0xa1a2a3a4a5bbccddULL, + }, + }, + [1] = { + .tag = EVL_NOTICE_USER + 1, + .event = { + .lval = 0xb1b2b3b4b5ffeeddULL, + }, + }, + [2] = { + .tag = EVL_NOTICE_USER + 2, + .event = { + .lval = 0xc1c2c3c4c5ccaabbULL, + } + }, +}; + +#define NR_UPDATES (sizeof(next_states) / sizeof(next_states[0])) + +#define do_trace(__fmt, __args...) \ + do { \ + if (verbose) \ + evl_printf(__fmt "\n", ##__args); \ + } while (0) + +static void usage(void) +{ + fprintf(stderr, "usage: observable-race [options]:\n"); + fprintf(stderr, "-S --send-oob run sender out-of-band\n"); + fprintf(stderr, "-n --num-threads number of observer threads\n"); + fprintf(stderr, "-l --message-loops number of message loops\n"); + fprintf(stderr, "-v --verbose turn on verbosity\n"); +} + +#define short_optlist "vn:Sl:" + +static const struct option options[] = { + { + .name = "num-threads", + .has_arg = required_argument, + .val = 'n', + }, + { + .name = "message-loops", + .has_arg = required_argument, + .val = 'l', + }, + { + .name = "send-oob", + .has_arg = no_argument, + .val = 'S', + }, + { + .name = "verbose", + .has_arg = no_argument, + .val = 'v', + }, + { /* Sentinel */ } +}; + +static void *observer_thread(void *arg) +{ + int serial = (int)(long)arg, tfd, n; + struct evl_sched_attrs attrs; + struct evl_notification nf; + ssize_t ret; + + do_trace("observer #%d started", serial); + + __Tcall_assert(tfd, evl_attach_self("race-observer:%d.%d", + getpid(), serial)); + attrs.sched_policy = SCHED_FIFO; + attrs.sched_priority = HIGH_PRIO; + __Tcall_assert(ret, evl_set_schedattr(tfd, &attrs)); + + __Tcall_assert(ret, evl_subscribe(observable_fd, 1024, 0)); + + evl_put_sem(&ready); + + for (n = 0; ; n++) { + __Tcall_assert(ret, evl_read_observable(observable_fd, &nf, 1)); + __Texpr_assert(ret == 1); + do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx", + serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec, + nf.tag, nf.event.lval); + if (!(n % 100)) { + __Tcall_assert(ret, evl_unsubscribe(observable_fd)); + do_trace("[%d] round=%d UNSUBSCRIBED", serial, n); + __Tcall_assert(ret, evl_subscribe(observable_fd, 16, 0)); + do_trace("[%d] round=%d SUBSCRIBED", serial, n); + } + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + int tfd, ofd, pfd, n, c, loops = 100000, ret; + struct evl_poll_event pollset; + pthread_t tid[MAX_THREADS]; + struct sched_param param; + + for (;;) { + c = getopt_long(argc, argv, short_optlist, options, NULL); + if (c == EOF) + break; + + switch (c) { + case 0: + break; + case 'S': + send_oob = true; + break; + case 'v': + verbose = true; + break; + case 'n': + nrthreads = atoi(optarg); + break; + case 'l': + loops = atoi(optarg); + break; + case '?': + default: + usage(); + return 1; + } + } + + if (optind < argc) { + usage(); + return 1; + } + + if (nrthreads <= 0) + nrthreads = 3; + if (nrthreads > MAX_THREADS) + error(1, EINVAL, "max %d threads", MAX_THREADS); + + if (send_oob) { + param.sched_priority = LOW_PRIO; + __Tcall_assert(ret, pthread_setschedparam(pthread_self(), SCHED_FIFO, ¶m)); + do_trace("sender runs out-of-band"); + } + __Tcall_assert(tfd, evl_attach_self("observable-race:%d", getpid())); + __Tcall_assert(ret, evl_new_sem(&ready, "observable-race-ready:%d", getpid())); + + __Tcall_assert(ofd, evl_new_observable("observable:%d", getpid())); + observable_fd = ofd; + + __Tcall_assert(pfd, evl_new_poll()); + __Tcall_assert(ret, evl_add_pollfd(pfd, ofd, POLLOUT, evl_nil)); + + for (n = 0; n < nrthreads; n++) { + new_thread(tid + n, SCHED_FIFO, 1, observer_thread, (void *)(long)n); + __Tcall_assert(ret, evl_get_sem(&ready)); + } + + for (n = 0; loops == 0 || n < loops; n++) { + ret = evl_update_observable(observable_fd, next_states, NR_UPDATES); + do_trace("round=%d wrote %d notices (errno=%d)", + n, ret, ret < 0 ? errno : 0); + __Texpr_assert(ret >= 0); + if (ret == 0) { + /* + * Throttle output on contention. Writability + * means that at least one observer has buffer + * space to receive at least one message. + */ + __Tcall_assert(ret, evl_poll(pfd, &pollset, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(pollset.events == POLLOUT); + __Texpr_assert(pollset.fd == ofd); + } + } + + for (n = 0; n < nrthreads; n++) { + pthread_cancel(tid[n]); + pthread_join(tid[n], NULL); + } + + return 0; +} diff --git a/tests/observable-thread.c b/tests/observable-thread.c new file mode 100644 index 0000000..f0af4dc --- /dev/null +++ b/tests/observable-thread.c @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +int main(int argc, char *argv[]) +{ + struct evl_notification nf; + struct evl_notice next; + ssize_t ret; + int tfd; + + __Tcall_assert(tfd, evl_attach_self("observable-thread:%d", getpid())); + + /* Subscribe to myself, should fail since not observable. */ + __Fcall_assert(ret, evl_subscribe(tfd, 16, 0)); + __Texpr_assert(ret == -EPERM); + + __Tcall_assert(ret, evl_detach_self()); + __Tcall_assert(tfd, evl_attach_thread(EVL_CLONE_OBSERVABLE, + "observable-thread:%d", getpid())); + /* Should work this time. */ + __Tcall_assert(ret, evl_subscribe(tfd, 16, 0)); + + next.tag = EVL_NOTICE_USER; + + /* Send 1, 2, 3. */ + next.event.lval = 1ULL; + __Tcall_errno_assert(ret, evl_update_observable(tfd, &next, 1)); + __Texpr_assert(ret == 1); + next.event.lval = 2ULL; + __Tcall_errno_assert(ret, evl_update_observable(tfd, &next, 1)); + __Texpr_assert(ret == 1); + next.event.lval = 3ULL; + __Tcall_errno_assert(ret, evl_update_observable(tfd, &next, 1)); + __Texpr_assert(ret == 1); + + /* Try sending a wrong tag, should fail. */ + next.tag = EVL_NOTICE_USER - 1; + next.event.lval = 4ULL; + __Fcall_assert(ret, evl_update_observable(tfd, &next, 1)); + __Texpr_assert(ret == -EINVAL); + + /* Receive the initial sequence. */ + __Tcall_assert(ret, evl_read_observable(tfd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.lval == 1ULL); + __Tcall_assert(ret, evl_read_observable(tfd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.lval == 2ULL); + __Tcall_assert(ret, evl_read_observable(tfd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.lval == 3ULL); + + __Tcall_errno_assert(ret, fcntl(tfd, F_SETFL, O_NONBLOCK)); + __Fcall_assert(ret, evl_read_observable(tfd, &nf, 1)); + __Texpr_assert(ret == -EAGAIN); + + return 0; +} diff --git a/tests/poll-close.c b/tests/poll-close.c index 5af4f59..468912a 100644 --- a/tests/poll-close.c +++ b/tests/poll-close.c @@ -40,7 +40,7 @@ int main(int argc, char *argv[]) __Tcall_assert(xfd, evl_new_xbuf(1024, name)); __Tcall_assert(pfd, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN, evl_nil)); new_thread(&poller, SCHED_FIFO, 1, polling_thread, NULL); diff --git a/tests/poll-flags.c b/tests/poll-flags.c index d271883..c5e10ab 100644 --- a/tests/poll-flags.c +++ b/tests/poll-flags.c @@ -71,9 +71,9 @@ int main(int argc, char *argv[]) __Tcall_assert(ffd, evl_new_flags(&flags, name)); __Tcall_assert(pollfd_in, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pollfd_in, ffd, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pollfd_in, ffd, POLLIN, evl_nil)); __Tcall_assert(pollfd_out, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pollfd_out, ffd, POLLOUT)); + __Tcall_assert(ret, evl_add_pollfd(pollfd_out, ffd, POLLOUT, evl_nil)); for (n = 0; n < NR_RECEIVERS; n++) { c[n].serial = n; diff --git a/tests/poll-nested.c b/tests/poll-nested.c index c92c509..fcf422c 100644 --- a/tests/poll-nested.c +++ b/tests/poll-nested.c @@ -27,22 +27,22 @@ int main(int argc, char *argv[]) * nesting, only cycles. */ __Tcall_assert(pfd1, evl_new_poll()); - __Fcall_assert(ret, evl_add_pollfd(pfd1, pfd1, POLLIN)); + __Fcall_assert(ret, evl_add_pollfd(pfd1, pfd1, POLLIN, evl_nil)); __Texpr_assert(ret == -ELOOP); __Tcall_assert(pfd2, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pfd1, pfd2, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pfd1, pfd2, POLLIN, evl_nil)); __Tcall_assert(pfd3, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pfd2, pfd3, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pfd2, pfd3, POLLIN, evl_nil)); __Tcall_assert(pfd4, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pfd3, pfd4, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pfd3, pfd4, POLLIN, evl_nil)); __Tcall_assert(pfd5, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pfd4, pfd5, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pfd4, pfd5, POLLIN, evl_nil)); - __Fcall_assert(ret, evl_add_pollfd(pfd5, pfd1, POLLIN)); + __Fcall_assert(ret, evl_add_pollfd(pfd5, pfd1, POLLIN, evl_nil)); __Texpr_assert(ret == -ELOOP); close(pfd5); diff --git a/tests/poll-observable-inband.c b/tests/poll-observable-inband.c new file mode 100644 index 0000000..18eee97 --- /dev/null +++ b/tests/poll-observable-inband.c @@ -0,0 +1,105 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +#define NR_RECEIVERS 1 + +#define LOW_PRIO 1 + +#define WRITE_COUNT 1024 + +static int pollfd_in, ofd; + +struct test_context { + int serial; +}; + +static struct evl_sem ready; + +static void *observable_poller(void *arg) +{ + struct test_context *p = arg; + struct evl_notification nf; + struct epoll_event evs; + int ret, tfd, n; + + __Tcall_assert(tfd, evl_attach_self("observable-poller:%d.%d", + getpid(), p->serial)); + + __Tcall_assert(ret, evl_subscribe(ofd, WRITE_COUNT, 0)); + + evl_put_sem(&ready); + + for (n = 0; n < WRITE_COUNT; n++) { + __Tcall_assert(ret, epoll_wait(pollfd_in, &evs, 1, -1)); + __Texpr_assert(ret == 1); + __Texpr_assert(evs.events == POLLIN); + __Texpr_assert(evs.data.fd == ofd); + __Tcall_assert(ret, evl_read_observable(ofd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.val == 0xa5a5a5a5); + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct test_context c[NR_RECEIVERS]; + pthread_t pollers[NR_RECEIVERS]; + struct sched_param param; + struct evl_notice next; + struct epoll_event ev; + void *status = NULL; + int tfd, ret, n; + + param.sched_priority = LOW_PRIO; + __Texpr_assert(pthread_setschedparam(pthread_self(), + SCHED_FIFO, ¶m) == 0); + + /* EVL inherits the inband scheduling params upon attachment. */ + __Tcall_assert(tfd, evl_attach_self("poll-observable-inband:%d", getpid())); + + __Tcall_assert(ofd, evl_new_observable("observable:%d", getpid())); + __Tcall_assert(ret, evl_new_sem(&ready, "poll-observable-inband-ready:%d", getpid())); + + __Tcall_assert(pollfd_in, epoll_create1(0)); + ev.events = POLLIN; + ev.data.fd = ofd; + __Tcall_errno_assert(ret, epoll_ctl(pollfd_in, EPOLL_CTL_ADD, ofd, &ev)); + + for (n = 0; n < NR_RECEIVERS; n++) { + c[n].serial = n; + new_thread(pollers + n, SCHED_OTHER, 0, observable_poller, c + n); + __Tcall_assert(ret, evl_get_sem(&ready)); + } + + for (n = 0; n < WRITE_COUNT; n++) { + next.tag = EVL_NOTICE_USER; + next.event.val = 0xa5a5a5a5; + __Tcall_assert(ret, evl_update_observable(ofd, &next, 1)); + __Texpr_assert(ret == 1); + } + + for (n = 0; n < NR_RECEIVERS; n++) { + __Texpr_assert(pthread_join(pollers[n], &status) == 0); + __Texpr_assert(status == NULL); + } + + return 0; +} diff --git a/tests/poll-observable-oob.c b/tests/poll-observable-oob.c new file mode 100644 index 0000000..d5193c5 --- /dev/null +++ b/tests/poll-observable-oob.c @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: MIT + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "helpers.h" + +#define NR_RECEIVERS 1 + +#define LOW_PRIO 1 +#define HIGH_PRIO 2 + +#define WRITE_COUNT 1024 + +static int pollfd_in, ofd; + +struct test_context { + int serial; +}; + +static struct evl_sem ready; + +static void *observable_poller(void *arg) +{ + struct evl_poll_event pollset; + struct test_context *p = arg; + struct evl_notification nf; + int ret, tfd, n; + + __Tcall_assert(tfd, evl_attach_self("observable-poller:%d.%d", + getpid(), p->serial)); + + __Tcall_assert(ret, evl_subscribe(ofd, WRITE_COUNT, 0)); + + evl_put_sem(&ready); + + for (n = 0; n < WRITE_COUNT; n++) { + __Tcall_assert(ret, evl_poll(pollfd_in, &pollset, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(pollset.events == POLLIN); + __Texpr_assert(pollset.fd == ofd); + __Tcall_assert(ret, evl_read_observable(ofd, &nf, 1)); + __Texpr_assert(ret == 1); + __Texpr_assert(nf.tag == EVL_NOTICE_USER); + __Texpr_assert(nf.event.val == 0xa5a5a5a5); + } + + return NULL; +} + +int main(int argc, char *argv[]) +{ + struct test_context c[NR_RECEIVERS]; + pthread_t pollers[NR_RECEIVERS]; + struct sched_param param; + struct evl_notice next; + void *status = NULL; + int tfd, ret, n; + + param.sched_priority = LOW_PRIO; + __Texpr_assert(pthread_setschedparam(pthread_self(), + SCHED_FIFO, ¶m) == 0); + + /* EVL inherits the inband scheduling params upon attachment. */ + __Tcall_assert(tfd, evl_attach_self("poll-observable-oob:%d", getpid())); + + __Tcall_assert(ofd, evl_new_observable("observable:%d", getpid())); + __Tcall_assert(ret, evl_new_sem(&ready, "poll-observable-oob-ready:%d", getpid())); + + __Tcall_assert(pollfd_in, evl_new_poll()); + __Tcall_assert(ret, evl_add_pollfd(pollfd_in, ofd, POLLIN, evl_nil)); + + for (n = 0; n < NR_RECEIVERS; n++) { + c[n].serial = n; + new_thread(pollers + n, SCHED_FIFO, HIGH_PRIO, observable_poller, c + n); + __Tcall_assert(ret, evl_get_sem(&ready)); + } + + for (n = 0; n < WRITE_COUNT; n++) { + next.tag = EVL_NOTICE_USER; + next.event.val = 0xa5a5a5a5; + __Tcall_assert(ret, evl_update_observable(ofd, &next, 1)); + __Texpr_assert(ret == 1); + } + + for (n = 0; n < NR_RECEIVERS; n++) { + __Texpr_assert(pthread_join(pollers[n], &status) == 0); + __Texpr_assert(status == NULL); + } + + return 0; +} diff --git a/tests/poll-sem.c b/tests/poll-sem.c index 0399b00..15cb6d2 100644 --- a/tests/poll-sem.c +++ b/tests/poll-sem.c @@ -69,7 +69,7 @@ int main(int argc, char *argv[]) __Tcall_assert(sfd, evl_new_sem(&sem, name)); __Tcall_assert(pollfd_in, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pollfd_in, sfd, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pollfd_in, sfd, POLLIN, evl_nil)); for (n = 0; n < NR_RECEIVERS; n++) { c[n].serial = n; diff --git a/tests/poll-read.c b/tests/poll-xbuf.c similarity index 96% rename from tests/poll-read.c rename to tests/poll-xbuf.c index 3f376bc..420a150 100644 --- a/tests/poll-read.c +++ b/tests/poll-xbuf.c @@ -84,7 +84,7 @@ int main(int argc, char *argv[]) new_thread(&writer, SCHED_OTHER, 0, writer_thread, path); __Tcall_assert(pfd, evl_new_poll()); - __Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN)); + __Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN, evl_nil)); for (n = 0; n < sizeof(msg) / sizeof(msg[0]) - 1; n++) { __Tcall_assert(ret, evl_poll(pfd, &pollset, 1));