/*
 * SPDX-License-Identifier: MIT
 *
 * Copyright (C) 2019 Philippe Gerum <rpm@xenomai.org>
 *
 * The tube: a lightweight, lockless multi-reader/multi-writer FIFO
 * with a base-offset addressing variant which can work over a memory
 * segment shared between processes (*_rel form).
 */

#ifndef _EVL_TUBE_H
#define _EVL_TUBE_H

#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <stdlib.h>
#include <evl/compiler.h>
#include <evl/atomic.h>

/*
 * The tricky one: pulling a canister from the tube. The noticeable
 * part is how we deal with the end-of-queue situation, temporarily
 * snatching the final element - which is not usable for carrying
 * payload and cannot be returned to the caller. We have two
 * requirements:
 *
 * - first is that a queue must contain at least one (final) element,
 * so that tube_push_item() can always store a new message there,
 * adding a free canister at the end of the same queue to compensate
 * for the consumed slot. Therefore, consuming the final element with
 * a NULL ->next pointer is not allowed, which means that observing
 * __desc->head->next == NULL denotes an empty queue.
 *
 * - second is to not interpret or dereference the value of __next_dq
 * until the CAS advancing the head pointer has taken place, so that
 * we don't consume such information which might have been updated in
 * parallel by a concurrent thread which won the CAS race on advancing
 * the same head pointer (i.e. __desc->head = __head_dq->next). We
 * solve this by allowing the head pointer to go NULL briefly,
 * reverting this move only after the CAS has succeeded, which
 * guarantees us that nobody else was granted the same item
 * (__head_dq). Another concurrent pull might fetch a NULL head in the
 * meantime, which also denotes an empty queue (like seeing
 * __head_dq->next == NULL).
 *
 * The only drawback of this scheme is as follows:
 *
 * A: tube_pull() sets head = NULL
 *                                 B: tube_push() stores to final element
 *                                 B: tube_pull() observes empty queue (head == NULL)
 * A: tube_pull() restores head = &final
 *
 * IOW, when a puller reaches the non-removable final element of a
 * queue, a concurrent pusher might still observe an empty queue after
 * a successful push, until the puller eventually restores the head
 * pointer, revealing the new message. Although not perfect, that
 * seems acceptable from the standpoint of the pusher knowing that
 * concurrent pullers are present.
 */
#define tube_pull(__desc)						\
	({								\
		typeof((__desc)->head) __next_dq, __head_dq;		\
		do {							\
			__head_dq = atomic_load(&(__desc)->head);	\
			if (__head_dq == NULL)				\
				break;					\
			__next_dq = atomic_load(&(__head_dq)->next);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->head, __head_dq, __next_dq)); \
		if (__head_dq && __next_dq == NULL) {			\
			atomic_store(&(__desc)->head, __head_dq);	\
			__head_dq = NULL;				\
		}							\
		__head_dq;						\
	})

/*
 * The push operation moves the tail to a free canister, which serves
 * as the new final element, returning the former tail element which
 * will convey the payload (CAS ensures that multiple callers cannot
 * get the same element).
 *
 * FIXME: we still have a queue connectivity issue there if preempted
 * indefinitely between the prep and finish ops. Revisit.
 */
#define tube_push_prepare(__desc, __free)				\
	({								\
		typeof((__desc)->tail) __old_qp;			\
		atomic_store(&(__free)->next, NULL);			\
		smp_mb();						\
		do {							\
			__old_qp = atomic_load(&(__desc)->tail);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->tail, __old_qp, __free));	\
		__old_qp;						\
	})

/*
 * We may directly compete with tube_push_prepare() on the tail
 * pointer, and indirectly with tube_pull() which may be walking the
 * canister list through the ->next link. A NULL link denotes the end
 * canister, tube_pull() cannot walk past that one, and never dequeues
 * it. Since __new was the previous end canister referred to by the
 * tail pointer, __new->next is NULL until sending is completed.
 */
#define tube_push_finish(__desc, __new, __free)				\
	do {								\
		smp_mb();						\
		atomic_store(&(__new)->next, __free);			\
	} while (0)

#define tube_push(__desc, __free)					\
	do {								\
		typeof((__desc)->tail) __new_q;				\
		compiler_barrier();					\
		__new_q = tube_push_prepare(__desc, __free);		\
		tube_push_finish(__desc, __new_q, __free);		\
	} while (0)

#define tube_push_item(__desc, __item, __free)				\
	do {								\
		typeof((__desc)->tail) __new_qi;			\
		__new_qi = tube_push_prepare(__desc, __free);		\
		__new_qi->payload = (__item);				\
		tube_push_finish(__desc, __new_qi, __free);		\
	} while (0)

#define DECLARE_EVL_CANISTER(__can_struct, __payload)			\
	struct __can_struct {						\
		struct __can_struct *next;				\
		typeof(__payload) payload;				\
	}
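
/*
 * Illustration only (msg_canister is a hypothetical name used by the
 * example comments below, not part of this API): writing
 * DECLARE_EVL_CANISTER(msg_canister, int); declares
 *
 *	struct msg_canister {
 *		struct msg_canister *next;
 *		int payload;
 *	};
 */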

#define DECLARE_CANISTER_QUEUE(__can_struct)				\
	struct {							\
		struct __can_struct *head;				\
		struct __can_struct *tail;				\
		struct __can_struct first;				\
	}

#define CANISTER_QUEUE_INITIALIZER(__name)				\
	{								\
		.head = &(__name).first,				\
		.tail = &(__name).first,				\
		.first = {						\
			.next = NULL,					\
		},							\
	}

#define DECLARE_EVL_TUBE(__name, __can_struct)				\
	struct __name {							\
		DECLARE_CANISTER_QUEUE(__can_struct) pending;		\
		DECLARE_CANISTER_QUEUE(__can_struct) free;		\
		long max_items;						\
	}

#define TUBE_INITIALIZER(__name)					\
	{								\
		.pending = CANISTER_QUEUE_INITIALIZER((__name).pending), \
		.free = CANISTER_QUEUE_INITIALIZER((__name).free),	\
		.max_items = 0,						\
	}

#define evl_get_tube_size(__name, __count)				\
	({								\
		struct __name *__ptr;					\
		sizeof(struct __name) +					\
			sizeof(__ptr->free.first) * __count;		\
	})

#define evl_init_tube(__tube, __freevec, __count)			\
	do {								\
		typeof((__tube)->pending.tail) __i;			\
		int __n;						\
		*(__tube) = (typeof(*(__tube)))				\
			TUBE_INITIALIZER(*(__tube));			\
		for (__n = 0, __i = (typeof(__i))(__freevec);		\
		     __n < (__count); __n++, __i++)			\
			tube_push(&(__tube)->free, __i);		\
		(__tube)->max_items = (__count);			\
	} while (0)
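
/*
 * Illustrative setup sketch, continuing the hypothetical msg_canister
 * example above (msg_tube and the counts are sample names/values, not
 * part of this API). evl_init_tube() resets the descriptor and feeds
 * the caller-provided canisters onto the free queue:
 *
 *	DECLARE_EVL_CANISTER(msg_canister, int);
 *	DECLARE_EVL_TUBE(msg_tube, msg_canister);
 *
 *	static struct msg_tube tube;
 *	static struct msg_canister canisters[16];
 *
 *	evl_init_tube(&tube, canisters, 16);
 *
 * Alternatively, a single heap block sized with evl_get_tube_size()
 * can hold both the descriptor and the canister array:
 *
 *	struct msg_tube *tp = malloc(evl_get_tube_size(msg_tube, 16));
 *	evl_init_tube(tp, (struct msg_canister *)(tp + 1), 16);
 */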

#define evl_send_tube(__tube, __item)					\
	({								\
		bool __ret = false;					\
		typeof((__tube)->pending.tail) __new;			\
		__new = tube_pull(&(__tube)->free);			\
		if (__new) {						\
			tube_push_item(&(__tube)->pending,		\
				__item, __new);				\
			__ret = true;					\
		}							\
		__ret;							\
	})

#define evl_receive_tube(__tube, __item)				\
	({								\
		bool __ret = false;					\
		typeof((__tube)->pending.tail) __next;			\
		__next = tube_pull(&(__tube)->pending);			\
		if (__next) {						\
			__item = __next->payload;			\
			tube_push(&(__tube)->free, __next);		\
			__ret = true;					\
		}							\
		__ret;							\
	})
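
/*
 * Illustrative use of the send/receive helpers, still based on the
 * hypothetical msg_tube example above. Any number of threads may send
 * and receive concurrently; sending returns false when no free
 * canister is left, receiving returns false when the tube is empty:
 *
 *	int out = 42, in;
 *
 *	if (!evl_send_tube(&tube, out))
 *		;	// all canisters are in flight
 *
 *	if (evl_receive_tube(&tube, in))
 *		;	// in == 42, canister recycled to the free queue
 */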

/*
 * Position-independent variant.
 */
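
/*
 * In this variant, queue links are stored as byte offsets from the
 * tube base address instead of raw pointers, so processes mapping the
 * same memory segment at different addresses can share a tube. A zero
 * offset plays the role of NULL, which is why the descriptors below
 * never place a canister at offset zero. The __memoff()/__memptr()
 * helpers come from <evl/compiler.h>; as an assumption for
 * illustration only, they behave along the lines of:
 *
 *	__memoff(base, addr)  ->  (uintptr_t)((char *)(addr) - (char *)(base))
 *	__memptr(base, off)   ->  (void *)((char *)(base) + (off))
 */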

#define tube_push_prepare_rel(__base, __desc, __free)			\
	({								\
		typeof((__desc)->tail) __old_qp;			\
		uintptr_t __off_qp = __memoff(__base, __free);		\
		atomic_store(&(__free)->next, 0);			\
		smp_mb();						\
		do {							\
			__old_qp = atomic_load(&(__desc)->tail);	\
		} while (__sync_val_compare_and_swap(			\
				&(__desc)->tail,			\
				__old_qp, __off_qp) != __old_qp);	\
		__memptr(__base, __old_qp);				\
	})

/* See tube_pull() for details. */
#define tube_pull_rel(__base, __desc)					\
	({								\
		typeof((__desc)->head) __next_dq, __head_dq;		\
		typeof((__desc)->first[0]) *__head_dqptr;		\
		do {							\
			__head_dq = atomic_load(&(__desc)->head);	\
			if (__head_dq == 0) {				\
				__head_dqptr = NULL;			\
				break;					\
			}						\
			__head_dqptr = (typeof(__head_dqptr))		\
				__memptr(__base, __head_dq);		\
			__next_dq = atomic_load(&(__head_dqptr)->next);	\
		} while (!__sync_bool_compare_and_swap(			\
				&(__desc)->head, __head_dq, __next_dq)); \
		if (__head_dq && __next_dq == 0) {			\
			atomic_store(&(__desc)->head, __head_dq);	\
			__head_dqptr = NULL;				\
		}							\
		__head_dqptr;						\
	})

#define tube_push_finish_rel(__base, __desc, __new, __free)		\
	do {								\
		smp_mb();						\
		atomic_store(&(__new)->next, __memoff(__base, __free));	\
	} while (0)

#define tube_push_rel(__base, __desc, __free)				\
	do {								\
		typeof((__desc)->first[0]) *__new_q;			\
		compiler_barrier();					\
		__new_q = (typeof(__new_q))				\
			tube_push_prepare_rel(__base, __desc, __free);	\
		tube_push_finish_rel(__base, __desc, __new_q, __free);	\
	} while (0)

#define tube_push_item_rel(__base, __desc, __item, __free)		\
	do {								\
		typeof((__desc)->first[0]) *__new_qi;			\
		__new_qi = (typeof(__new_qi))				\
			tube_push_prepare_rel(__base, __desc, __free);	\
		__new_qi->payload = (__item);				\
		tube_push_finish_rel(__base, __desc, __new_qi, __free);	\
	} while (0)

#define DECLARE_EVL_CANISTER_REL(__can_struct, __payload)		\
	struct __can_struct {						\
		uintptr_t next;						\
		typeof(__payload) payload;				\
	}

#define DECLARE_CANISTER_QUEUE_REL(__can_struct)			\
	struct {							\
		uintptr_t head;						\
		uintptr_t tail;						\
		/* Must NOT be first: offset zero stands for NULL. */	\
		struct __can_struct first[1];				\
	}

#define DECLARE_EVL_TUBE_REL(__name, __can_struct, __payload)		\
	struct __name {							\
		DECLARE_CANISTER_QUEUE_REL(__can_struct) pending;	\
		DECLARE_CANISTER_QUEUE_REL(__can_struct) free;		\
		long max_items;						\
	}

#define canq_offsetof(__baseoff, __can_struct, __member)		\
	({								\
		DECLARE_CANISTER_QUEUE_REL(__can_struct) __tmp;		\
		(void)(__tmp); /* Pedantic indirection for C++ */	\
		offsetof(typeof(__tmp), __member) + __baseoff;		\
	})

#define CANISTER_QUEUE_INITIALIZER_REL(__baseoff, __can_struct)		\
	{								\
		.head = canq_offsetof(__baseoff, __can_struct, first),	\
		.tail = canq_offsetof(__baseoff, __can_struct, first),	\
		.first = {						\
			[0] = {						\
				.next = 0,				\
			},						\
		},							\
	}

#define TUBE_INITIALIZER_REL(__name, __can_struct)			\
	{								\
		.pending = CANISTER_QUEUE_INITIALIZER_REL(__memoff(&(__name), \
				&(__name).pending), __can_struct),	\
		.free = CANISTER_QUEUE_INITIALIZER_REL(__memoff(&(__name), \
				&(__name).free), __can_struct),		\
		.max_items = 0,						\
	}

#define evl_init_tube_rel(__name, __can_struct, __mem, __size)		\
	({								\
		struct __name *__tube = (typeof(__tube))(__mem);	\
		typeof(__tube->free.first[0]) *__i, *__iend;		\
		*__tube = (typeof(*__tube))				\
			TUBE_INITIALIZER_REL(*__tube, __can_struct);	\
		__iend = (typeof(__iend))((char *)__mem + __size);	\
		for (__i = (typeof(__i))(__tube + 1); __i < __iend; __i++) { \
			tube_push_rel(__tube, &(__tube)->free, __i);	\
		}							\
		__tube->max_items = __i - &__tube->free.first[1];	\
		__tube;							\
	})

#define evl_get_tube_size_rel(__name, __count)				\
	evl_get_tube_size(__name, __count)

#define evl_send_tube_rel(__tube, __item)				\
	({								\
		bool __ret = false;					\
		typeof((__tube)->pending.first[0]) *__new;		\
		__new = (typeof(__new))					\
			tube_pull_rel(__tube, &(__tube)->free);		\
		if (__new) {						\
			tube_push_item_rel(__tube,			\
				&(__tube)->pending,			\
				__item, __new);				\
			__ret = true;					\
		}							\
		__ret;							\
	})

#define evl_receive_tube_rel(__tube, __item)				\
	({								\
		bool __ret = false;					\
		typeof((__tube)->pending.first[0]) *__next;		\
		__next = tube_pull_rel(__tube, &(__tube)->pending);	\
		if (__next) {						\
			__item = __next->payload;			\
			tube_push_rel(__tube, &(__tube)->free, __next);	\
			__ret = true;					\
		}							\
		__ret;							\
	})
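
/*
 * Illustrative cross-process sketch (the relmsg_* names and the
 * shared-memory setup are hypothetical, not part of this API; assumes
 * <sys/mman.h> and <fcntl.h>). Each process works on its own mapping
 * of the segment, which may live at a different address:
 *
 *	DECLARE_EVL_CANISTER_REL(relmsg_canister, int);
 *	DECLARE_EVL_TUBE_REL(relmsg_tube, relmsg_canister, int);
 *
 *	size_t size = evl_get_tube_size_rel(relmsg_tube, 16);
 *	int fd = shm_open("/relmsg", O_CREAT | O_RDWR, 0600);
 *	ftruncate(fd, size);
 *	void *mem = mmap(NULL, size, PROT_READ | PROT_WRITE,
 *			 MAP_SHARED, fd, 0);
 *
 *	// Creator side: lay out the tube and the free canisters.
 *	struct relmsg_tube *tube =
 *		evl_init_tube_rel(relmsg_tube, relmsg_canister, mem, size);
 *	evl_send_tube_rel(tube, 42);
 *
 *	// Any process mapping the same segment can receive.
 *	int val;
 *	if (evl_receive_tube_rel((struct relmsg_tube *)mem, val))
 *		;	// val == 42
 */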

#endif /* _EVL_TUBE_H */