lib: introduce the interface to observables

Since ABI 23, the core provides the new observable element, which
enables the observer design pattern. Any EVL thread is in and of
itself an observable which can be monitored for events too.

As a by-product, the poll interface can now be given a user-defined
opaque data when subscribing file descriptors to poll elements, which
the core passes back on return to evl_poll().

Signed-off-by: Philippe Gerum <rpm@xenomai.org>
This commit is contained in:
Philippe Gerum 2020-04-24 19:25:19 +02:00
parent 197b2d4d41
commit f27fc99387
34 changed files with 1451 additions and 49 deletions

View File

@ -38,6 +38,7 @@ evl_open_mutex
evl_open_event
evl_open_flags
evl_open_sem
The observable API.
== VARIATION(S)

View File

@ -16,7 +16,8 @@ int evl_new_poll(void)
return epoll_create1(EPOLL_CLOEXEC);
}
int evl_add_pollfd(int efd, int newfd, unsigned int events)
int evl_add_pollfd(int efd, int newfd, unsigned int events,
union evl_value pollval)
{
struct epoll_event ev;
int ret;
@ -45,7 +46,8 @@ int evl_del_pollfd(int efd, int delfd)
return 0;
}
int evl_mod_pollfd(int efd, int modfd, unsigned int events)
int evl_mod_pollfd(int efd, int modfd, unsigned int events,
union evl_value pollval)
{
struct epoll_event ev;
int ret;

View File

@ -14,6 +14,7 @@
struct evl_poll_event {
__u32 fd;
__u32 events;
union evl_value pollval;
};
#ifdef __cplusplus
@ -23,12 +24,14 @@ extern "C" {
int evl_new_poll(void);
int evl_add_pollfd(int efd, int newfd,
unsigned int events);
unsigned int events,
union evl_value pollval);
int evl_del_pollfd(int efd, int delfd);
int evl_mod_pollfd(int efd, int modfd,
unsigned int events);
unsigned int events,
union evl_value pollval);
int evl_timedpoll(int efd, struct evl_poll_event *pollset,
int nrset, struct timespec *timeout);

View File

@ -7,6 +7,8 @@
#ifndef _EVL_ESHI_UAPI_H
#define _EVL_ESHI_UAPI_H
#include <stdint.h>
#define EVL_CLOCK_MONOTONIC_DEV "monotonic"
#define EVL_CLOCK_REALTIME_DEV "realtime"
#define EVL_CLOCK_DEV "clock"
@ -15,5 +17,14 @@
#define EVL_PROXY_DEV "proxy"
#define EVL_THREAD_DEV "thread"
#define EVL_XBUF_DEV "xbuf"
#define EVL_OBSERVABLE_DEV "observable"
union evl_value {
int32_t val;
int64_t lval;
void *ptr;
};
#define evl_nil ((union evl_value){ .lval = 0 })
#endif /* _EVL_ESHI_UAPI_H */

View File

@ -20,9 +20,9 @@
#include <evl/poll.h>
#include <evl/proxy.h>
#define __EVL__ 15 /* API version */
#define __EVL__ 16 /* API version */
#define EVL_ABI_PREREQ 21
#define EVL_ABI_PREREQ 23
struct evl_version {
int api_level; /* libevl.so: __EVL__ */

43
include/evl/observable.h Normal file
View File

@ -0,0 +1,43 @@
/*
* SPDX-License-Identifier: MIT
*
* Copyright (C) 2020 Philippe Gerum <rpm@xenomai.org>
*/
#ifndef _EVL_OBSERVABLE_H
#define _EVL_OBSERVABLE_H
#include <time.h>
#include <stdint.h>
#include <evl/syscall.h>
#include <uapi/evl/observable.h>
#include <uapi/evl/factory.h>
struct evl_notification {
uint32_t tag;
uint32_t serial;
int32_t issuer;
union evl_value event;
struct timespec date;
};
#define evl_new_observable(__fmt, __args...) \
evl_create_observable(EVL_CLONE_PRIVATE, __fmt, ##__args)
#ifdef __cplusplus
extern "C" {
#endif
int evl_create_observable(int flags, const char *fmt, ...);
int evl_update_observable(int ofd, const struct evl_notice *ntc,
int nr);
int evl_read_observable(int ofd, struct evl_notification *nf,
int nr);
#ifdef __cplusplus
}
#endif
#endif /* _EVL_OBSERVABLE_H */

View File

@ -19,12 +19,14 @@ extern "C" {
int evl_new_poll(void);
int evl_add_pollfd(int efd, int newfd,
unsigned int events);
unsigned int events,
union evl_value pollval);
int evl_del_pollfd(int efd, int delfd);
int evl_mod_pollfd(int efd, int modfd,
unsigned int events);
unsigned int events,
union evl_value pollval);
int evl_timedpoll(int efd, struct evl_poll_event *pollset,
int nrset, const struct timespec *timeout);

View File

@ -76,6 +76,12 @@ int evl_set_thread_mode(int efd, int mask,
int evl_clear_thread_mode(int efd, int mask,
int *oldmask);
int evl_subscribe(int ofd,
unsigned int backlog_count,
int flags);
int evl_unsubscribe(int ofd);
#ifdef __cplusplus
}
#endif

View File

@ -48,8 +48,7 @@ static int init_event_vargs(struct evl_event *evt,
attrs.protocol = EVL_EVENT_GATED;
attrs.clockfd = clockfd;
attrs.initval = 0;
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs,
flags & EVL_CLONE_MASK, &eids);
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids);
free(name);
if (efd < 0)
return efd;

View File

@ -49,8 +49,7 @@ int evl_create_flags(struct evl_flags *flg, int clockfd,
attrs.protocol = EVL_EVENT_MASK;
attrs.clockfd = clockfd;
attrs.initval = initval;
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs,
flags & EVL_CLONE_MASK, &eids);
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids);
free(name);
if (efd < 0)
return efd;

View File

@ -28,6 +28,21 @@ static void lart_once(void)
pthread_once(&lart_once, do_lart_once);
}
static int flip_fd_flags(int efd, int cmd, int flags)
{
int ret;
ret = fcntl(efd, cmd == F_SETFD ? F_GETFD : F_GETFL, 0);
if (ret < 0)
return -errno;
ret = fcntl(efd, cmd, ret | flags);
if (ret)
return -errno;
return 0;
}
/*
* Creating an EVL element is done in the following steps:
*
@ -51,6 +66,12 @@ int create_evl_element(const char *type, const char *name,
char *fdevname, *edevname = NULL;
struct evl_clone_req clone;
int ffd, efd, ret;
bool nonblock;
nonblock = !!(clone_flags & EVL_CLONE_NONBLOCK);
/* Strip off user-only bits. */
clone_flags &= EVL_CLONE_MASK;
clone_flags &= ~EVL_CLONE_NONBLOCK;
ret = asprintf(&fdevname, "/dev/evl/%s/clone", type);
if (ret < 0)
@ -101,16 +122,14 @@ int create_evl_element(const char *type, const char *name,
efd = clone.efd;
}
ret = fcntl(efd, F_GETFD, 0);
if (ret < 0) {
ret = -errno;
ret = flip_fd_flags(efd, F_SETFD, O_CLOEXEC);
if (ret)
goto out_element;
}
ret = fcntl(efd, F_SETFD, ret | O_CLOEXEC);
if (ret) {
ret = -errno;
goto out_element;
if (nonblock) {
ret = flip_fd_flags(efd, F_SETFL, O_NONBLOCK);
if (ret)
goto out_element;
}
if (eids)

View File

@ -62,8 +62,7 @@ static int init_mutex_vargs(struct evl_mutex *mutex,
attrs.protocol = protocol;
attrs.clockfd = clockfd;
attrs.initval = ceiling;
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs,
flags & EVL_CLONE_MASK, &eids);
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids);
free(name);
if (efd < 0)
return efd;

114
lib/observable.c Normal file
View File

@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: MIT
*
* Copyright (C) 2020 Philippe Gerum <rpm@xenomai.org>
*/
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <evl/compiler.h>
#include <evl/thread.h>
#include <evl/observable.h>
#include <evl/syscall.h>
#include "internal.h"
int evl_create_observable(int flags, const char *fmt, ...)
{
int ret, efd;
va_list ap;
char *name;
va_start(ap, fmt);
ret = vasprintf(&name, fmt, ap);
va_end(ap);
if (ret < 0)
return -ENOMEM;
efd = create_evl_element(EVL_OBSERVABLE_DEV, name, NULL, flags, NULL);
free(name);
return efd;
}
static bool wants_oob_io(void)
{
/*
* Only non-EVL threads or members of the SCHED_WEAK class
* should call in-band.
*/
if (evl_current == EVL_NO_HANDLE)
return false;
return !(evl_get_current_mode() & T_WEAK);
}
int evl_update_observable(int ofd, const struct evl_notice *ntc, int nr)
{
ssize_t ret;
if (!wants_oob_io())
ret = write(ofd, ntc, nr * sizeof(*ntc));
else
ret = oob_write(ofd, ntc, nr * sizeof(*ntc));
if (ret < 0)
return errno == EAGAIN ? 0 : -errno;
return ret / sizeof(*ntc);
}
static ssize_t do_read(int ofd, struct evl_notification *nf, int nr,
ssize_t (*readfn)(int ofd, void *buf, size_t count))
{
struct __evl_notification _nf __maybe_unused;
ssize_t ret, _ret __maybe_unused;
/*
* This mess is exclusively intended not to expose the
* __evl_timespec type embedded into the __evl_notification
* descriptor to users. Legacy 32bit systems with
* Y0238-unsafe C libraries have to pay a price for this, by
* reading every notification one after another instead of
* pulling a bulk - this stupidly trivial way seems acceptable
* for those platforms. For all others, struct __evl_timespec
* used in kernel space and timespec in userland have the same
* memory layout, so we may read the notifications in one gulp
* directly into the user buffer.
*/
#if __WORDSIZE == 64 || __TIMESIZE == 64
ret = readfn(ofd, nf, nr * sizeof(*nf));
#else
ret = 0;
while (nr-- > 0) {
_ret = readfn(ofd, &_nf, sizeof(_nf));
if (_ret <= 0)
return ret ?: _ret;
nf->tag = _nf.tag;
nf->serial = _nf.serial;
nf->issuer = _nf.issuer;
nf->event = _nf.event;
nf->date.tv_sec = (long)_nf.date.tv_sec;
nf->date.tv_nsec = _nf.date.tv_nsec;
ret += sizeof(*nf);
nf++;
}
#endif
return ret;
}
int evl_read_observable(int ofd, struct evl_notification *nf, int nr)
{
ssize_t ret;
if (!wants_oob_io())
ret = do_read(ofd, nf, nr, read);
else
ret = do_read(ofd, nf, nr, oob_read);
if (ret < 0)
return -errno;
return ret / sizeof(*nf);
}

View File

@ -18,7 +18,8 @@ int evl_new_poll(void)
return create_evl_file(EVL_POLL_DEV);
}
static int update_pollset(int efd, int op, int fd, unsigned int events)
static int update_pollset(int efd, int op, int fd, unsigned int events,
union evl_value pollval)
{
struct evl_poll_ctlreq creq;
int ret;
@ -26,24 +27,27 @@ static int update_pollset(int efd, int op, int fd, unsigned int events)
creq.action = op;
creq.fd = fd;
creq.events = events;
creq.pollval = pollval;
ret = oob_ioctl(efd, EVL_POLIOC_CTL, &creq);
return ret ? -errno : 0;
}
int evl_add_pollfd(int efd, int fd, unsigned int events)
int evl_add_pollfd(int efd, int fd, unsigned int events,
union evl_value pollval)
{
return update_pollset(efd, EVL_POLL_CTLADD, fd, events);
return update_pollset(efd, EVL_POLL_CTLADD, fd, events, pollval);
}
int evl_del_pollfd(int efd, int fd)
{
return update_pollset(efd, EVL_POLL_CTLDEL, fd, 0);
return update_pollset(efd, EVL_POLL_CTLDEL, fd, 0, evl_nil);
}
int evl_mod_pollfd(int efd, int fd, unsigned int events)
int evl_mod_pollfd(int efd, int fd,
unsigned int events, union evl_value pollval)
{
return update_pollset(efd, EVL_POLL_CTLMOD, fd, events);
return update_pollset(efd, EVL_POLL_CTLMOD, fd, events, pollval);
}
static int do_poll(int efd, struct evl_poll_event *pollset,

View File

@ -49,8 +49,7 @@ int evl_create_proxy(int targetfd, size_t bufsz, size_t granularity,
attrs.fd = targetfd;
attrs.bufsz = bufsz;
attrs.granularity = granularity;
efd = create_evl_element(EVL_PROXY_DEV, name, &attrs,
flags & EVL_CLONE_MASK, NULL);
efd = create_evl_element(EVL_PROXY_DEV, name, &attrs, flags, NULL);
free(name);
return efd;

View File

@ -48,8 +48,7 @@ int evl_create_sem(struct evl_sem *sem, int clockfd,
attrs.protocol = EVL_EVENT_COUNT;
attrs.clockfd = clockfd;
attrs.initval = initval;
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs,
flags & EVL_CLONE_MASK, &eids);
efd = create_evl_element(EVL_MONITOR_DEV, name, &attrs, flags, &eids);
free(name);
if (efd < 0)
return efd;

View File

@ -19,6 +19,7 @@
#include <linux/types.h>
#include <uapi/evl/factory.h>
#include <uapi/evl/control.h>
#include <uapi/evl/observable.h>
#include "internal.h"
__thread __attribute__ ((tls_model (EVL_TLS_MODEL)))
@ -81,8 +82,7 @@ int evl_attach_thread(int flags, const char *fmt, ...)
if (ret < 0)
return -ENOMEM;
efd = create_evl_element(EVL_THREAD_DEV, name, NULL,
flags & EVL_CLONE_MASK, &eids);
efd = create_evl_element(EVL_THREAD_DEV, name, NULL, flags, &eids);
free(name);
if (efd < 0)
return efd;
@ -234,3 +234,28 @@ int evl_clear_thread_mode(int efd, int mask, int *oldmask)
{
return do_thread_mode(efd, EVL_THRIOC_CLEAR_MODE, mask, oldmask);
}
int evl_subscribe(int efd, unsigned int backlog_count, int flags)
{
struct evl_subscription sub;
int ret;
sub.backlog_count = backlog_count;
sub.flags = flags;
ret = ioctl(efd, EVL_OBSIOC_SUBSCRIBE, &sub);
if (ret && errno == ENOTTY)
return -EPERM;
return ret ? -errno : 0;
}
int evl_unsubscribe(int efd)
{
int ret;
ret = ioctl(efd, EVL_OBSIOC_UNSUBSCRIBE);
if (ret && errno == ENOTTY)
return -EPERM;
return ret ? -errno : 0;
}

View File

@ -26,8 +26,7 @@ int evl_create_xbuf(size_t i_bufsz, size_t o_bufsz,
attrs.i_bufsz = i_bufsz;
attrs.o_bufsz = o_bufsz;
efd = create_evl_element(EVL_XBUF_DEV, name, &attrs,
flags & EVL_CLONE_MASK, NULL);
efd = create_evl_element(EVL_XBUF_DEV, name, &attrs, flags, NULL);
free(name);
return efd;

View File

@ -0,0 +1,15 @@
/*
* SPDX-License-Identifier: MIT
*
* COMPILE-TESTING ONLY.
*/
#include <evl/observable.h>
int main(int argc, char *argv[])
{
evl_new_observable("test");
evl_create_observable(EVL_CLONE_PRIVATE, "test");
return 0;
}

View File

@ -14,9 +14,9 @@ int main(int argc, char *argv[])
int efd;
efd = evl_new_poll();
evl_add_pollfd(efd, 1, POLLIN);
evl_add_pollfd(efd, 1, POLLIN, evl_nil);
evl_del_pollfd(efd, 1);
evl_mod_pollfd(efd, 1, POLLOUT);
evl_mod_pollfd(efd, 1, POLLOUT, evl_nil);
evl_read_clock(EVL_CLOCK_MONOTONIC, &timeout);
evl_poll(efd, &pollset, 1);
evl_timedpoll(efd, &pollset, 1, &timeout);

View File

@ -24,6 +24,8 @@ int main(int argc, char *argv[])
evl_demote_thread(tfd);
evl_set_thread_mode(tfd, 0, NULL);
evl_clear_thread_mode(tfd, 0, NULL);
evl_subscribe(tfd, 1024, 0);
evl_unsubscribe(tfd);
return 0;
}

205
tests/observable-inband.c Normal file
View File

@ -0,0 +1,205 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sched.h>
#include <pthread.h>
#include <errno.h>
#include <error.h>
#include <getopt.h>
#include <evl/thread.h>
#include <evl/sched.h>
#include <evl/clock.h>
#include <evl/observable.h>
#include <evl/sem.h>
#include "helpers.h"
#define MAX_THREADS 8
#define BACKLOG_DEPTH 4096
#define TERMINATOR 0xa5a5a5a5a5a5a5a5ULL
#define LOW_PRIO 1
static int nrthreads = 1;
static bool verbose;
static int observable_fd;
static struct evl_sem ready;
static struct evl_notice next_states[] = {
[0] = {
.tag = EVL_NOTICE_USER,
.event = {
.lval = 0xa1a2a3a4a5bbccddULL,
},
},
[1] = {
.tag = EVL_NOTICE_USER + 1,
.event = {
.lval = 0xb1b2b3b4b5ffeeddULL,
},
},
[2] = {
.tag = EVL_NOTICE_USER + 2,
.event = {
.lval = 0xc1c2c3c4c5ccaabbULL,
}
},
[3] = {
.tag = EVL_NOTICE_USER + 3,
.event = {
.lval = 0xffffffffeeeeeeeeULL,
}
},
};
#define NR_UPDATES (sizeof(next_states) / sizeof(next_states[0]))
#define do_trace(__fmt, __args...) \
do { \
if (verbose) \
evl_printf(__fmt "\n", ##__args); \
} while (0)
static void usage(void)
{
fprintf(stderr, "usage: observable-inband [options]:\n");
fprintf(stderr, "-n --num-threads number of observer threads\n");
fprintf(stderr, "-l --message-loops number of message loops\n");
fprintf(stderr, "-v --verbose turn on verbosity\n");
}
#define short_optlist "vn:l:"
static const struct option options[] = {
{
.name = "num-threads",
.has_arg = required_argument,
.val = 'n',
},
{
.name = "message-loops",
.has_arg = required_argument,
.val = 'l',
},
{
.name = "verbose",
.has_arg = no_argument,
.val = 'v',
},
{ /* Sentinel */ }
};
static void *observer_thread(void *arg)
{
int serial = (int)(long)arg, n = 0;
struct evl_notification nf;
ssize_t ret;
do_trace("in-band observer #%d started", serial);
__Fcall_assert(ret, evl_read_observable(observable_fd, &nf, 1));
__Texpr_assert(ret == -ENXIO);
/*
* Don't attach, we want to make sure that a plain thread can
* read an observable.
*/
__Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0));
evl_put_sem(&ready);
for (;;) {
ret = evl_read_observable(observable_fd, &nf, 1);
do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx",
serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec,
nf.tag, nf.event.lval);
if (nf.event.lval == TERMINATOR)
break;
__Texpr_assert(next_states[n].tag == nf.tag);
__Texpr_assert(next_states[n].event.lval == nf.event.lval);
n = (n + 1) % (sizeof(next_states) / sizeof(next_states[0]));
}
do_trace("in-band observer #%d done", serial);
return NULL;
}
int main(int argc, char *argv[])
{
int tfd, ofd, n, c, loops = 1000, throttle;
struct evl_notice terminator;
pthread_t tid[MAX_THREADS];
ssize_t ret;
for (;;) {
c = getopt_long(argc, argv, short_optlist, options, NULL);
if (c == EOF)
break;
switch (c) {
case 0:
break;
case 'v':
verbose = true;
break;
case 'n':
nrthreads = atoi(optarg);
break;
case 'l':
loops = atoi(optarg);
break;
case '?':
default:
usage();
return 1;
}
}
if (optind < argc) {
usage();
return 1;
}
if (nrthreads <= 0)
nrthreads = 3;
if (nrthreads > MAX_THREADS)
error(1, EINVAL, "max %d threads", MAX_THREADS);
__Tcall_assert(tfd, evl_attach_self("observable-inband:%d", getpid()));
__Tcall_assert(ret, evl_new_sem(&ready, "observable-inband-ready:%d", getpid()));
__Tcall_assert(ofd, evl_new_observable("observable:%d", getpid()));
observable_fd = ofd;
for (n = 0; n < nrthreads; n++) {
new_thread(tid + n, SCHED_OTHER, 0, observer_thread, (void *)(long)n);
__Tcall_assert(ret, evl_get_sem(&ready));
}
throttle = BACKLOG_DEPTH / (sizeof(next_states) / sizeof(next_states[0]) / 2);
for (n = 0; loops == 0 || n < loops; n++) {
__Tcall_errno_assert(ret,
evl_update_observable(observable_fd, next_states, NR_UPDATES));
if (!(n % throttle))
__Tcall_assert(ret, usleep(10000));
}
terminator.tag = EVL_NOTICE_USER;
terminator.event.lval = TERMINATOR;
__Tcall_errno_assert(ret, evl_update_observable(observable_fd, &terminator, 1));
for (n = 0; n < nrthreads; n++)
pthread_join(tid[n], NULL);
__Tcall_assert(ret, close(observable_fd));
return 0;
}

187
tests/observable-master.c Normal file
View File

@ -0,0 +1,187 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sched.h>
#include <pthread.h>
#include <errno.h>
#include <error.h>
#include <getopt.h>
#include <evl/thread.h>
#include <evl/sched.h>
#include <evl/clock.h>
#include <evl/observable.h>
#include <evl/sem.h>
#include "helpers.h"
#define MAX_THREADS 5
#define BACKLOG_DEPTH 1024
#define HIGH_PRIO 2
static bool verbose;
static int observable_fd;
static struct evl_sem ready;
static struct evl_notice next_states[] = {
[0] = {
.tag = EVL_NOTICE_USER,
.event = {
.lval = 0xa1a2a3a4a5bbccddULL,
},
},
[1] = {
.tag = EVL_NOTICE_USER + 1,
.event = {
.lval = 0xb1b2b3b4b5ffeeddULL,
},
},
[2] = {
.tag = EVL_NOTICE_USER + 2,
.event = {
.lval = 0xc1c2c3c4c5ccaabbULL,
}
},
[3] = {
.tag = EVL_NOTICE_USER + 3,
.event = {
.lval = 0xd1d2d3d4d5ffeeddULL,
}
},
[4] = {
.tag = EVL_NOTICE_USER + 4,
.event = {
.lval = 0xe1e2e3e4e5010203ULL,
}
},
};
#define do_trace(__fmt, __args...) \
do { \
if (verbose) \
evl_printf(__fmt "\n", ##__args); \
} while (0)
static void usage(void)
{
fprintf(stderr, "usage: observable-master [options]:\n");
fprintf(stderr, "-l --message-loops number of message loops\n");
fprintf(stderr, "-v --verbose turn on verbosity\n");
}
#define short_optlist "vl:"
static const struct option options[] = {
{
.name = "message-loops",
.has_arg = required_argument,
.val = 'l',
},
{
.name = "verbose",
.has_arg = no_argument,
.val = 'v',
},
{ /* Sentinel */ }
};
static void *worker_thread(void *arg)
{
int serial = (int)(long)arg, tfd;
struct evl_notification nf;
ssize_t ret;
do_trace("worker thread #%d started", serial);
__Tcall_assert(tfd, evl_attach_self("oob-observer:%d.%d",
getpid(), serial));
__Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0));
evl_put_sem(&ready);
/*
* Expect round-robin scheduling of workers if master mode.
*/
for (;;) {
ret = evl_read_observable(observable_fd, &nf, 1);
__Texpr_assert((ret == -EBADF) || ret == 1);
if (ret < 0)
break;
do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx",
serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec,
nf.tag, nf.event.lval);
__Texpr_assert(next_states[serial].tag == nf.tag);
__Texpr_assert(next_states[serial].event.lval == nf.event.lval);
}
/* Already closed in main(), should fail. */
__Fcall_assert(ret, evl_unsubscribe(observable_fd));
__Texpr_assert(errno == EBADF);
do_trace("worker thread #%d done", serial);
return NULL;
}
int main(int argc, char *argv[])
{
int tfd, ofd, n, c, loops = 1000;
pthread_t tid[MAX_THREADS];
ssize_t ret;
for (;;) {
c = getopt_long(argc, argv, short_optlist, options, NULL);
if (c == EOF)
break;
switch (c) {
case 0:
break;
case 'v':
verbose = true;
break;
case 'l':
loops = atoi(optarg);
break;
case '?':
default:
usage();
return 1;
}
}
if (optind < argc) {
usage();
return 1;
}
__Tcall_assert(tfd, evl_attach_self("observable-master:%d", getpid()));
__Tcall_assert(ret, evl_new_sem(&ready, "observable-master-ready:%d", getpid()));
__Tcall_assert(ofd, evl_create_observable(EVL_CLONE_MASTER,
"observable:%d", getpid()));
observable_fd = ofd;
for (n = 0; n < MAX_THREADS; n++) {
new_thread(tid + n, SCHED_FIFO, HIGH_PRIO, worker_thread, (void *)(long)n);
__Tcall_assert(ret, evl_get_sem(&ready));
}
for (n = 0; loops == 0 || n < loops; n++)
__Tcall_errno_assert(ret, evl_update_observable(observable_fd,
&next_states[n % MAX_THREADS], 1));
__Tcall_assert(ret, close(observable_fd));
for (n = 0; n < MAX_THREADS; n++)
pthread_join(tid[n], NULL);
return 0;
}

View File

@ -0,0 +1,56 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sched.h>
#include <pthread.h>
#include <errno.h>
#include <evl/thread.h>
#include <evl/observable.h>
#include "helpers.h"
int main(int argc, char *argv[])
{
struct evl_notification nf;
struct evl_notice next;
int tfd, ofd;
ssize_t ret;
__Tcall_assert(tfd, evl_attach_self("observable-onchange:%d", getpid()));
__Tcall_assert(ofd, evl_new_observable("observable:%d", getpid()));
__Tcall_assert(ret, evl_subscribe(ofd, 16, EVL_NOTIFY_ONCHANGE));
next.tag = EVL_NOTICE_USER;
/* Send 1, 1, 2 */
next.event.lval = 1ULL;
__Tcall_assert(ret, evl_update_observable(ofd, &next, 1));
__Texpr_assert(ret == 1);
next.event.lval = 1ULL;
__Tcall_assert(ret, evl_update_observable(ofd, &next, 1));
__Texpr_assert(ret == 1);
next.event.lval = 2ULL;
__Tcall_assert(ret, evl_update_observable(ofd, &next, 1));
__Texpr_assert(ret == 1);
/* Receive 1, 2. */
__Tcall_assert(ret, evl_read_observable(ofd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.lval == 1ULL);
__Tcall_assert(ret, evl_read_observable(ofd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.lval == 2ULL);
/* There should be nothing more to be read. */
__Tcall_errno_assert(ret, fcntl(ofd, F_SETFL, O_NONBLOCK));
__Fcall_assert(ret, evl_read_observable(ofd, &nf, 1));
__Texpr_assert(ret == -EAGAIN);
return 0;
}

210
tests/observable-oob.c Normal file
View File

@ -0,0 +1,210 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sched.h>
#include <pthread.h>
#include <errno.h>
#include <error.h>
#include <getopt.h>
#include <evl/thread.h>
#include <evl/sched.h>
#include <evl/clock.h>
#include <evl/observable.h>
#include <evl/sem.h>
#include "helpers.h"
#define MAX_THREADS 8
#define BACKLOG_DEPTH 1024
#define LOW_PRIO 1
#define HIGH_PRIO 2
static int nrthreads = 1;
static bool verbose;
static int observable_fd;
static struct evl_sem ready;
static struct evl_notice next_states[] = {
[0] = {
.tag = EVL_NOTICE_USER,
.event = {
.lval = 0xa1a2a3a4a5bbccddULL,
},
},
[1] = {
.tag = EVL_NOTICE_USER + 1,
.event = {
.lval = 0xb1b2b3b4b5ffeeddULL,
},
},
[2] = {
.tag = EVL_NOTICE_USER + 2,
.event = {
.lval = 0xc1c2c3c4c5ccaabbULL,
}
},
[3] = {
.tag = EVL_NOTICE_USER + 3,
.event = {
.lval = 0xffffffffeeeeeeeeULL,
}
},
};
#define NR_UPDATES (sizeof(next_states) / sizeof(next_states[0]))
#define do_trace(__fmt, __args...) \
do { \
if (verbose) \
evl_printf(__fmt "\n", ##__args); \
} while (0)
static void usage(void)
{
fprintf(stderr, "usage: observable-oob [options]:\n");
fprintf(stderr, "-n --num-threads number of observer threads\n");
fprintf(stderr, "-l --message-loops number of message loops\n");
fprintf(stderr, "-v --verbose turn on verbosity\n");
}
#define short_optlist "vn:l:"
static const struct option options[] = {
{
.name = "num-threads",
.has_arg = required_argument,
.val = 'n',
},
{
.name = "message-loops",
.has_arg = required_argument,
.val = 'l',
},
{
.name = "verbose",
.has_arg = no_argument,
.val = 'v',
},
{ /* Sentinel */ }
};
static void *observer_thread(void *arg)
{
int serial = (int)(long)arg, tfd, n = 0;
struct evl_sched_attrs attrs;
struct evl_notification nf;
ssize_t ret;
do_trace("out-of-band observer #%d started", serial);
__Tcall_assert(tfd, evl_attach_self("oob-observer:%d.%d",
getpid(), serial));
attrs.sched_policy = SCHED_FIFO;
attrs.sched_priority = HIGH_PRIO;
__Tcall_assert(ret, evl_set_schedattr(tfd, &attrs));
__Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0));
__Tcall_assert(ret, evl_unsubscribe(observable_fd));
__Tcall_assert(ret, evl_subscribe(observable_fd, BACKLOG_DEPTH, 0));
evl_put_sem(&ready);
for (;;) {
ret = evl_read_observable(observable_fd, &nf, 1);
__Texpr_assert((ret == -EBADF) || ret == 1);
if (ret < 0)
break;
do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx",
serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec,
nf.tag, nf.event.lval);
__Texpr_assert(next_states[n].tag == nf.tag);
__Texpr_assert(next_states[n].event.lval == nf.event.lval);
n = (n + 1) % NR_UPDATES;
}
/* Already closed in main(), should fail. */
__Fcall_assert(ret, evl_unsubscribe(observable_fd));
__Texpr_assert(errno == EBADF);
do_trace("out-of-band observer #%d done", serial);
return NULL;
}
int main(int argc, char *argv[])
{
int tfd, ofd, n, c, loops = 1000, throttle;
pthread_t tid[MAX_THREADS];
struct sched_param param;
ssize_t ret;
for (;;) {
c = getopt_long(argc, argv, short_optlist, options, NULL);
if (c == EOF)
break;
switch (c) {
case 0:
break;
case 'v':
verbose = true;
break;
case 'n':
nrthreads = atoi(optarg);
break;
case 'l':
loops = atoi(optarg);
break;
case '?':
default:
usage();
return 1;
}
}
if (optind < argc) {
usage();
return 1;
}
if (nrthreads <= 0)
nrthreads = 3;
if (nrthreads > MAX_THREADS)
error(1, EINVAL, "max %d threads", MAX_THREADS);
param.sched_priority = LOW_PRIO;
__Tcall_assert(ret, pthread_setschedparam(pthread_self(), SCHED_FIFO, &param));
__Tcall_assert(tfd, evl_attach_self("observable-oob:%d", getpid()));
__Tcall_assert(ret, evl_new_sem(&ready, "observable-oob-ready:%d", getpid()));
__Tcall_assert(ofd, evl_new_observable("observable:%d", getpid()));
observable_fd = ofd;
for (n = 0; n < nrthreads; n++) {
new_thread(tid + n, SCHED_FIFO, 1, observer_thread, (void *)(long)n);
__Tcall_assert(ret, evl_get_sem(&ready));
}
throttle = BACKLOG_DEPTH / NR_UPDATES / 2;
for (n = 0; loops == 0 || n < loops; n++) {
__Tcall_errno_assert(ret,
evl_update_observable(observable_fd, next_states, NR_UPDATES));
if (!(n % throttle))
__Tcall_assert(ret, evl_usleep(10000));
}
__Tcall_assert(ret, close(observable_fd));
for (n = 0; n < nrthreads; n++)
pthread_join(tid[n], NULL);
return 0;
}

224
tests/observable-race.c Normal file
View File

@ -0,0 +1,224 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sched.h>
#include <pthread.h>
#include <errno.h>
#include <error.h>
#include <getopt.h>
#include <evl/thread.h>
#include <evl/sched.h>
#include <evl/observable.h>
#include <evl/clock.h>
#include <evl/poll.h>
#include <evl/sem.h>
#include "helpers.h"
#define MAX_THREADS 8
#define LOW_PRIO 1
#define HIGH_PRIO 2
static int nrthreads = 1;
static bool verbose;
static bool send_oob;
static int observable_fd;
static struct evl_sem ready;
static struct evl_notice next_states[] = {
[0] = {
.tag = EVL_NOTICE_USER,
.event = {
.lval = 0xa1a2a3a4a5bbccddULL,
},
},
[1] = {
.tag = EVL_NOTICE_USER + 1,
.event = {
.lval = 0xb1b2b3b4b5ffeeddULL,
},
},
[2] = {
.tag = EVL_NOTICE_USER + 2,
.event = {
.lval = 0xc1c2c3c4c5ccaabbULL,
}
},
};
#define NR_UPDATES (sizeof(next_states) / sizeof(next_states[0]))
#define do_trace(__fmt, __args...) \
do { \
if (verbose) \
evl_printf(__fmt "\n", ##__args); \
} while (0)
static void usage(void)
{
fprintf(stderr, "usage: observable-race [options]:\n");
fprintf(stderr, "-S --send-oob run sender out-of-band\n");
fprintf(stderr, "-n --num-threads number of observer threads\n");
fprintf(stderr, "-l --message-loops number of message loops\n");
fprintf(stderr, "-v --verbose turn on verbosity\n");
}
#define short_optlist "vn:Sl:"
static const struct option options[] = {
{
.name = "num-threads",
.has_arg = required_argument,
.val = 'n',
},
{
.name = "message-loops",
.has_arg = required_argument,
.val = 'l',
},
{
.name = "send-oob",
.has_arg = no_argument,
.val = 'S',
},
{
.name = "verbose",
.has_arg = no_argument,
.val = 'v',
},
{ /* Sentinel */ }
};
static void *observer_thread(void *arg)
{
int serial = (int)(long)arg, tfd, n;
struct evl_sched_attrs attrs;
struct evl_notification nf;
ssize_t ret;
do_trace("observer #%d started", serial);
__Tcall_assert(tfd, evl_attach_self("race-observer:%d.%d",
getpid(), serial));
attrs.sched_policy = SCHED_FIFO;
attrs.sched_priority = HIGH_PRIO;
__Tcall_assert(ret, evl_set_schedattr(tfd, &attrs));
__Tcall_assert(ret, evl_subscribe(observable_fd, 1024, 0));
evl_put_sem(&ready);
for (n = 0; ; n++) {
__Tcall_assert(ret, evl_read_observable(observable_fd, &nf, 1));
__Texpr_assert(ret == 1);
do_trace("[%d] msg from pid=%d, at %ld.%ld, tag=%u, state=%llx",
serial, nf.issuer, nf.date.tv_sec, nf.date.tv_nsec,
nf.tag, nf.event.lval);
if (!(n % 100)) {
__Tcall_assert(ret, evl_unsubscribe(observable_fd));
do_trace("[%d] round=%d UNSUBSCRIBED", serial, n);
__Tcall_assert(ret, evl_subscribe(observable_fd, 16, 0));
do_trace("[%d] round=%d SUBSCRIBED", serial, n);
}
}
return NULL;
}
int main(int argc, char *argv[])
{
int tfd, ofd, pfd, n, c, loops = 100000, ret;
struct evl_poll_event pollset;
pthread_t tid[MAX_THREADS];
struct sched_param param;
for (;;) {
c = getopt_long(argc, argv, short_optlist, options, NULL);
if (c == EOF)
break;
switch (c) {
case 0:
break;
case 'S':
send_oob = true;
break;
case 'v':
verbose = true;
break;
case 'n':
nrthreads = atoi(optarg);
break;
case 'l':
loops = atoi(optarg);
break;
case '?':
default:
usage();
return 1;
}
}
if (optind < argc) {
usage();
return 1;
}
if (nrthreads <= 0)
nrthreads = 3;
if (nrthreads > MAX_THREADS)
error(1, EINVAL, "max %d threads", MAX_THREADS);
if (send_oob) {
param.sched_priority = LOW_PRIO;
__Tcall_assert(ret, pthread_setschedparam(pthread_self(), SCHED_FIFO, &param));
do_trace("sender runs out-of-band");
}
__Tcall_assert(tfd, evl_attach_self("observable-race:%d", getpid()));
__Tcall_assert(ret, evl_new_sem(&ready, "observable-race-ready:%d", getpid()));
__Tcall_assert(ofd, evl_new_observable("observable:%d", getpid()));
observable_fd = ofd;
__Tcall_assert(pfd, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd, ofd, POLLOUT, evl_nil));
for (n = 0; n < nrthreads; n++) {
new_thread(tid + n, SCHED_FIFO, 1, observer_thread, (void *)(long)n);
__Tcall_assert(ret, evl_get_sem(&ready));
}
for (n = 0; loops == 0 || n < loops; n++) {
ret = evl_update_observable(observable_fd, next_states, NR_UPDATES);
do_trace("round=%d wrote %d notices (errno=%d)",
n, ret, ret < 0 ? errno : 0);
__Texpr_assert(ret >= 0);
if (ret == 0) {
/*
* Throttle output on contention. Writability
* means that at least one observer has buffer
* space to receive at least one message.
*/
__Tcall_assert(ret, evl_poll(pfd, &pollset, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(pollset.events == POLLOUT);
__Texpr_assert(pollset.fd == ofd);
}
}
for (n = 0; n < nrthreads; n++) {
pthread_cancel(tid[n]);
pthread_join(tid[n], NULL);
}
return 0;
}

72
tests/observable-thread.c Normal file
View File

@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <sched.h>
#include <pthread.h>
#include <errno.h>
#include <evl/thread.h>
#include <evl/observable.h>
#include "helpers.h"
int main(int argc, char *argv[])
{
struct evl_notification nf;
struct evl_notice next;
ssize_t ret;
int tfd;
__Tcall_assert(tfd, evl_attach_self("observable-thread:%d", getpid()));
/* Subscribe to myself, should fail since not observable. */
__Fcall_assert(ret, evl_subscribe(tfd, 16, 0));
__Texpr_assert(ret == -EPERM);
__Tcall_assert(ret, evl_detach_self());
__Tcall_assert(tfd, evl_attach_thread(EVL_CLONE_OBSERVABLE,
"observable-thread:%d", getpid()));
/* Should work this time. */
__Tcall_assert(ret, evl_subscribe(tfd, 16, 0));
next.tag = EVL_NOTICE_USER;
/* Send 1, 2, 3. */
next.event.lval = 1ULL;
__Tcall_errno_assert(ret, evl_update_observable(tfd, &next, 1));
__Texpr_assert(ret == 1);
next.event.lval = 2ULL;
__Tcall_errno_assert(ret, evl_update_observable(tfd, &next, 1));
__Texpr_assert(ret == 1);
next.event.lval = 3ULL;
__Tcall_errno_assert(ret, evl_update_observable(tfd, &next, 1));
__Texpr_assert(ret == 1);
/* Try sending a wrong tag, should fail. */
next.tag = EVL_NOTICE_USER - 1;
next.event.lval = 4ULL;
__Fcall_assert(ret, evl_update_observable(tfd, &next, 1));
__Texpr_assert(ret == -EINVAL);
/* Receive the initial sequence. */
__Tcall_assert(ret, evl_read_observable(tfd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.lval == 1ULL);
__Tcall_assert(ret, evl_read_observable(tfd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.lval == 2ULL);
__Tcall_assert(ret, evl_read_observable(tfd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.lval == 3ULL);
__Tcall_errno_assert(ret, fcntl(tfd, F_SETFL, O_NONBLOCK));
__Fcall_assert(ret, evl_read_observable(tfd, &nf, 1));
__Texpr_assert(ret == -EAGAIN);
return 0;
}

View File

@ -40,7 +40,7 @@ int main(int argc, char *argv[])
__Tcall_assert(xfd, evl_new_xbuf(1024, name));
__Tcall_assert(pfd, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN, evl_nil));
new_thread(&poller, SCHED_FIFO, 1, polling_thread, NULL);

View File

@ -71,9 +71,9 @@ int main(int argc, char *argv[])
__Tcall_assert(ffd, evl_new_flags(&flags, name));
__Tcall_assert(pollfd_in, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pollfd_in, ffd, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pollfd_in, ffd, POLLIN, evl_nil));
__Tcall_assert(pollfd_out, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pollfd_out, ffd, POLLOUT));
__Tcall_assert(ret, evl_add_pollfd(pollfd_out, ffd, POLLOUT, evl_nil));
for (n = 0; n < NR_RECEIVERS; n++) {
c[n].serial = n;

View File

@ -27,22 +27,22 @@ int main(int argc, char *argv[])
* nesting, only cycles.
*/
__Tcall_assert(pfd1, evl_new_poll());
__Fcall_assert(ret, evl_add_pollfd(pfd1, pfd1, POLLIN));
__Fcall_assert(ret, evl_add_pollfd(pfd1, pfd1, POLLIN, evl_nil));
__Texpr_assert(ret == -ELOOP);
__Tcall_assert(pfd2, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd1, pfd2, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pfd1, pfd2, POLLIN, evl_nil));
__Tcall_assert(pfd3, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd2, pfd3, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pfd2, pfd3, POLLIN, evl_nil));
__Tcall_assert(pfd4, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd3, pfd4, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pfd3, pfd4, POLLIN, evl_nil));
__Tcall_assert(pfd5, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd4, pfd5, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pfd4, pfd5, POLLIN, evl_nil));
__Fcall_assert(ret, evl_add_pollfd(pfd5, pfd1, POLLIN));
__Fcall_assert(ret, evl_add_pollfd(pfd5, pfd1, POLLIN, evl_nil));
__Texpr_assert(ret == -ELOOP);
close(pfd5);

View File

@ -0,0 +1,105 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <time.h>
#include <stdbool.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <evl/thread.h>
#include <evl/observable.h>
#include <evl/clock.h>
#include <evl/poll.h>
#include <evl/sem.h>
#include "helpers.h"
#define NR_RECEIVERS 1
#define LOW_PRIO 1
#define WRITE_COUNT 1024
static int pollfd_in, ofd;
struct test_context {
int serial;
};
static struct evl_sem ready;
static void *observable_poller(void *arg)
{
struct test_context *p = arg;
struct evl_notification nf;
struct epoll_event evs;
int ret, tfd, n;
__Tcall_assert(tfd, evl_attach_self("observable-poller:%d.%d",
getpid(), p->serial));
__Tcall_assert(ret, evl_subscribe(ofd, WRITE_COUNT, 0));
evl_put_sem(&ready);
for (n = 0; n < WRITE_COUNT; n++) {
__Tcall_assert(ret, epoll_wait(pollfd_in, &evs, 1, -1));
__Texpr_assert(ret == 1);
__Texpr_assert(evs.events == POLLIN);
__Texpr_assert(evs.data.fd == ofd);
__Tcall_assert(ret, evl_read_observable(ofd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.val == 0xa5a5a5a5);
}
return NULL;
}
int main(int argc, char *argv[])
{
struct test_context c[NR_RECEIVERS];
pthread_t pollers[NR_RECEIVERS];
struct sched_param param;
struct evl_notice next;
struct epoll_event ev;
void *status = NULL;
int tfd, ret, n;
param.sched_priority = LOW_PRIO;
__Texpr_assert(pthread_setschedparam(pthread_self(),
SCHED_FIFO, &param) == 0);
/* EVL inherits the inband scheduling params upon attachment. */
__Tcall_assert(tfd, evl_attach_self("poll-observable-inband:%d", getpid()));
__Tcall_assert(ofd, evl_new_observable("observable:%d", getpid()));
__Tcall_assert(ret, evl_new_sem(&ready, "poll-observable-inband-ready:%d", getpid()));
__Tcall_assert(pollfd_in, epoll_create1(0));
ev.events = POLLIN;
ev.data.fd = ofd;
__Tcall_errno_assert(ret, epoll_ctl(pollfd_in, EPOLL_CTL_ADD, ofd, &ev));
for (n = 0; n < NR_RECEIVERS; n++) {
c[n].serial = n;
new_thread(pollers + n, SCHED_OTHER, 0, observable_poller, c + n);
__Tcall_assert(ret, evl_get_sem(&ready));
}
for (n = 0; n < WRITE_COUNT; n++) {
next.tag = EVL_NOTICE_USER;
next.event.val = 0xa5a5a5a5;
__Tcall_assert(ret, evl_update_observable(ofd, &next, 1));
__Texpr_assert(ret == 1);
}
for (n = 0; n < NR_RECEIVERS; n++) {
__Texpr_assert(pthread_join(pollers[n], &status) == 0);
__Texpr_assert(status == NULL);
}
return 0;
}

102
tests/poll-observable-oob.c Normal file
View File

@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: MIT
*/
#include <sys/types.h>
#include <time.h>
#include <stdbool.h>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <evl/thread.h>
#include <evl/observable.h>
#include <evl/clock.h>
#include <evl/poll.h>
#include <evl/sem.h>
#include "helpers.h"
#define NR_RECEIVERS 1
#define LOW_PRIO 1
#define HIGH_PRIO 2
#define WRITE_COUNT 1024
static int pollfd_in, ofd;
struct test_context {
int serial;
};
static struct evl_sem ready;
static void *observable_poller(void *arg)
{
struct evl_poll_event pollset;
struct test_context *p = arg;
struct evl_notification nf;
int ret, tfd, n;
__Tcall_assert(tfd, evl_attach_self("observable-poller:%d.%d",
getpid(), p->serial));
__Tcall_assert(ret, evl_subscribe(ofd, WRITE_COUNT, 0));
evl_put_sem(&ready);
for (n = 0; n < WRITE_COUNT; n++) {
__Tcall_assert(ret, evl_poll(pollfd_in, &pollset, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(pollset.events == POLLIN);
__Texpr_assert(pollset.fd == ofd);
__Tcall_assert(ret, evl_read_observable(ofd, &nf, 1));
__Texpr_assert(ret == 1);
__Texpr_assert(nf.tag == EVL_NOTICE_USER);
__Texpr_assert(nf.event.val == 0xa5a5a5a5);
}
return NULL;
}
int main(int argc, char *argv[])
{
struct test_context c[NR_RECEIVERS];
pthread_t pollers[NR_RECEIVERS];
struct sched_param param;
struct evl_notice next;
void *status = NULL;
int tfd, ret, n;
param.sched_priority = LOW_PRIO;
__Texpr_assert(pthread_setschedparam(pthread_self(),
SCHED_FIFO, &param) == 0);
/* EVL inherits the inband scheduling params upon attachment. */
__Tcall_assert(tfd, evl_attach_self("poll-observable-oob:%d", getpid()));
__Tcall_assert(ofd, evl_new_observable("observable:%d", getpid()));
__Tcall_assert(ret, evl_new_sem(&ready, "poll-observable-oob-ready:%d", getpid()));
__Tcall_assert(pollfd_in, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pollfd_in, ofd, POLLIN, evl_nil));
for (n = 0; n < NR_RECEIVERS; n++) {
c[n].serial = n;
new_thread(pollers + n, SCHED_FIFO, HIGH_PRIO, observable_poller, c + n);
__Tcall_assert(ret, evl_get_sem(&ready));
}
for (n = 0; n < WRITE_COUNT; n++) {
next.tag = EVL_NOTICE_USER;
next.event.val = 0xa5a5a5a5;
__Tcall_assert(ret, evl_update_observable(ofd, &next, 1));
__Texpr_assert(ret == 1);
}
for (n = 0; n < NR_RECEIVERS; n++) {
__Texpr_assert(pthread_join(pollers[n], &status) == 0);
__Texpr_assert(status == NULL);
}
return 0;
}

View File

@ -69,7 +69,7 @@ int main(int argc, char *argv[])
__Tcall_assert(sfd, evl_new_sem(&sem, name));
__Tcall_assert(pollfd_in, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pollfd_in, sfd, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pollfd_in, sfd, POLLIN, evl_nil));
for (n = 0; n < NR_RECEIVERS; n++) {
c[n].serial = n;

View File

@ -84,7 +84,7 @@ int main(int argc, char *argv[])
new_thread(&writer, SCHED_OTHER, 0, writer_thread, path);
__Tcall_assert(pfd, evl_new_poll());
__Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN));
__Tcall_assert(ret, evl_add_pollfd(pfd, xfd, POLLIN, evl_nil));
for (n = 0; n < sizeof(msg) / sizeof(msg[0]) - 1; n++) {
__Tcall_assert(ret, evl_poll(pfd, &pollset, 1));