// Queue implementation for the rt kernel: fixed-size element slots with a
// per-slot state machine, gated by push/pop semaphores.
#include <rt/queue.h>
|
|
|
|
#include <rt/atomic.h>
|
|
#include <rt/log.h>
|
|
|
|
#include <limits.h>
|
|
#include <string.h>
|
|
|
|
// An empty slot ready to be pushed.
|
|
#define SLOT_EMPTY 0x00U
|
|
|
|
// An empty slot that has been claimed by a pusher.
|
|
#define SLOT_PUSH 0x05U
|
|
|
|
// A full slot that has been claimed by a popper/peeker.
|
|
#define SLOT_POP 0x0AU
|
|
|
|
// An empty slot that has been skipped by a popper.
|
|
#define SLOT_SKIPPED 0x0CU
|
|
|
|
// A full slot ready to be popped.
|
|
#define SLOT_FULL 0x0FU
|
|
|
|
#define SLOT_STATE_MASK 0x0FU
|
|
|
|
#define SLOT_GEN_INCREMENT (1U << RT_QUEUE_STATE_BITS)
|
|
#define SLOT_GEN_MASK (UCHAR_MAX & ~SLOT_STATE_MASK)
|
|
|
|
#define Q_GEN_INCREMENT ((size_t)1 << RT_QUEUE_INDEX_BITS)
|
|
#define Q_INDEX_MASK (Q_GEN_INCREMENT - (size_t)1)
|
|
#define Q_GEN_MASK (~Q_INDEX_MASK)
|
|
#define Q_SGEN_SHIFT (RT_QUEUE_INDEX_BITS - RT_QUEUE_STATE_BITS)
|
|
|
|
static inline unsigned char state(unsigned char slot)
|
|
{
|
|
return slot & SLOT_STATE_MASK;
|
|
}
|
|
|
|
static inline unsigned char sgen(unsigned char slot)
|
|
{
|
|
return slot & SLOT_GEN_MASK;
|
|
}
|
|
|
|
static inline unsigned char sgen_next(unsigned char slot)
|
|
{
|
|
return (unsigned char)(sgen(slot) + SLOT_GEN_INCREMENT);
|
|
}
|
|
|
|
static inline size_t qgen(size_t q)
|
|
{
|
|
return q & Q_GEN_MASK;
|
|
}
|
|
|
|
static inline unsigned char qsgen(size_t q)
|
|
{
|
|
return (unsigned char)(qgen(q) >> Q_SGEN_SHIFT);
|
|
}
|
|
|
|
static inline size_t qindex(size_t q)
|
|
{
|
|
return q & Q_INDEX_MASK;
|
|
}
|
|
|
|
static size_t next(size_t q, size_t num_elems)
|
|
{
|
|
q += 1;
|
|
if (qindex(q) == num_elems)
|
|
{
|
|
return qgen(q + Q_GEN_INCREMENT);
|
|
}
|
|
return q;
|
|
}
|
|
|
|
/* Store one element into the queue. The caller must already hold a push_sem
 * token, which guarantees at least one pushable (empty) slot exists.
 * On completion, posts pop_sem so a popper/peeker may run. */
static void push(struct rt_queue *queue, const void *elem)
{
    for (;;)
    {
        size_t enq = rt_atomic_load(&queue->enq, RT_ATOMIC_RELAXED);
        size_t last_enq = enq;
        rt_atomic_uchar *slot;
        unsigned char s;
        /* Scan for a slot that is empty in the current generation. The
         * enqueue index is a hint, not authoritative, so re-read it each
         * iteration and only step forward manually while it is unchanged. */
        for (;;)
        {
            slot = &queue->slots[qindex(enq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            rt_logf("push: slot %zu\n", qindex(enq));
            if ((state(s) == SLOT_EMPTY) && (sgen(s) == qsgen(enq)))
            {
                break;
            }
            const size_t new_enq =
                rt_atomic_load(&queue->enq, RT_ATOMIC_RELAXED);
            if (new_enq != last_enq)
            {
                /* Another pusher advanced the index; restart from there. */
                enq = new_enq;
                last_enq = new_enq;
            }
            else
            {
                enq = next(enq, queue->num_elems);
            }
        }

        /* Attempt to claim the empty slot for pushing. On failure, s is
         * reloaded and the outer loop restarts the scan. */
        const unsigned char push_s = sgen(s) | SLOT_PUSH;
        if (rt_atomic_compare_exchange(slot, &s, push_s, RT_ATOMIC_RELAXED,
                                       RT_ATOMIC_RELAXED))
        {
            rt_logf("push: slot %zu claimed...\n", qindex(enq));
            /* Publish the advanced enqueue hint before copying the data. */
            rt_atomic_store(&queue->enq, next(enq, queue->num_elems),
                            RT_ATOMIC_RELAXED);

            unsigned char *const p = queue->data;
            memcpy(&p[queue->elem_size * qindex(enq)], elem, queue->elem_size);

            /* Release the written element by marking the slot full. The
             * release ordering pairs with the acquire in pop/peek. */
            s = push_s;
            if (rt_atomic_compare_exchange(slot, &s, sgen(s) | SLOT_FULL,
                                           RT_ATOMIC_RELEASE,
                                           RT_ATOMIC_RELAXED))
            {
                break;
            }

            rt_logf("push: slot %zu skipped...\n", qindex(enq));
            /* If our slot has been skipped by a reader, then restore it
             * back to empty and keep looking. */
            while (!rt_atomic_compare_exchange_weak(slot, &s,
                                                    sgen(s) | SLOT_EMPTY,
                                                    RT_ATOMIC_RELEASE,
                                                    RT_ATOMIC_RELAXED))
            {
            }
        }
    }
    /* An element is now available; allow one popper/peeker to proceed. */
    rt_sem_post(&queue->pop_sem);
}
/* Remove one element from the queue into *elem. The caller must already hold
 * a pop_sem token, which guarantees at least one poppable (full) slot exists.
 * On completion, posts push_sem so a pusher may reuse the freed slot. */
static void pop(struct rt_queue *queue, void *elem)
{
    for (;;)
    {
        size_t deq = rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
        size_t last_deq = deq;
        rt_atomic_uchar *slot;
        unsigned char s;
        /* Scan for a full slot in the current generation, skipping over
         * in-progress pushes so a stalled pusher cannot block us. */
        for (;;)
        {
            slot = &queue->slots[qindex(deq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            rt_logf("pop: slot %zu\n", qindex(deq));
            if (sgen(s) == qsgen(deq))
            {
                if ((state(s) == SLOT_PUSH) || (state(s) == SLOT_SKIPPED))
                {
                    /* The skipped marker carries the NEXT generation so the
                     * pusher can detect it was skipped when completing. */
                    const unsigned char skipped_slot =
                        sgen_next(s) | SLOT_SKIPPED;
                    /* If we encounter an in-progress push, attempt to skip it.
                     * The push may have completed in the mean time, in which
                     * case we can attempt to pop from the slot. If not, there
                     * will be full slots to pop after this slot, because
                     * the level allowing poppers to run is only incremented
                     * after each push is complete. */
                    if (rt_atomic_compare_exchange(slot, &s, skipped_slot,
                                                   RT_ATOMIC_RELAXED,
                                                   RT_ATOMIC_RELAXED))
                    {
                        rt_logf("pop: slot %zu skipped...\n", qindex(deq));
                    }
                }
                /* NOTE: s may have been reloaded by the failed CAS above,
                 * so this re-check also covers a push that just completed. */
                if ((state(s) == SLOT_FULL) || (state(s) == SLOT_POP))
                {
                    break;
                }
            }
            const size_t new_deq =
                rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
            if (new_deq != last_deq)
            {
                /* Another popper advanced the index; restart from there. */
                deq = new_deq;
                last_deq = new_deq;
            }
            else
            {
                deq = next(deq, queue->num_elems);
            }
        }

        /* Claim the full slot. The acquire ordering pairs with the release
         * in push that marked the slot full. */
        unsigned char pop_s = sgen(s) | SLOT_POP;
        if (rt_atomic_compare_exchange(slot, &s, pop_s, RT_ATOMIC_ACQUIRE,
                                       RT_ATOMIC_RELAXED))
        {
            rt_logf("pop: slot %zu claimed...\n", qindex(deq));

            const unsigned char *const p = queue->data;
            memcpy(elem, &p[queue->elem_size * qindex(deq)], queue->elem_size);

            /* Retire the slot into the next generation as empty. If this
             * CAS fails, a concurrent peeker interfered; retry the scan. */
            const unsigned char empty_s = sgen_next(s) | SLOT_EMPTY;
            if (rt_atomic_compare_exchange(slot, &pop_s, empty_s,
                                           RT_ATOMIC_RELAXED,
                                           RT_ATOMIC_RELAXED))
            {
                rt_atomic_store(&queue->deq, next(deq, queue->num_elems),
                                RT_ATOMIC_RELAXED);
                break;
            }
        }
    }
    /* A slot was freed; allow one pusher to proceed. */
    rt_sem_post(&queue->push_sem);
}
/* Copy the front element into *elem without removing it. The caller must
 * already hold a pop_sem token, which guarantees a readable (full) slot. */
static void peek(struct rt_queue *queue, void *elem)
{
    /* Similar to pop, but don't update the dequeue index and set the slot back
     * to SLOT_FULL after reading it so another popper/peeker can read it. */
    for (;;)
    {
        size_t deq = rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
        size_t last_deq = deq;
        rt_atomic_uchar *slot;
        unsigned char s;
        /* Scan for a full slot in the current generation. Unlike pop,
         * in-progress pushes are not skipped, only passed over. */
        for (;;)
        {
            slot = &queue->slots[qindex(deq)];
            s = rt_atomic_load(slot, RT_ATOMIC_RELAXED);
            rt_logf("peek: slot %zu\n", qindex(deq));
            if (sgen(s) == qsgen(deq))
            {
                if ((state(s) == SLOT_FULL) || (state(s) == SLOT_POP))
                {
                    break;
                }
            }
            const size_t new_deq =
                rt_atomic_load(&queue->deq, RT_ATOMIC_RELAXED);
            if (new_deq != last_deq)
            {
                /* A popper advanced the index; restart from there. */
                deq = new_deq;
                last_deq = new_deq;
            }
            else
            {
                deq = next(deq, queue->num_elems);
            }
        }

        /* Claim the slot for reading. The acquire ordering pairs with the
         * release in push that marked the slot full. */
        unsigned char pop_s = sgen(s) | SLOT_POP;
        if (rt_atomic_compare_exchange(slot, &s, pop_s, RT_ATOMIC_ACQUIRE,
                                       RT_ATOMIC_RELAXED))
        {
            rt_logf("peek: slot %zu claimed...\n", qindex(deq));

            const unsigned char *const p = queue->data;
            memcpy(elem, &p[queue->elem_size * qindex(deq)], queue->elem_size);

            /* Restore the slot to full (next generation) so the element
             * remains readable/poppable. On failure, retry the whole scan. */
            const unsigned char full_s = sgen_next(s) | SLOT_FULL;
            if (rt_atomic_compare_exchange(slot, &pop_s, full_s,
                                           RT_ATOMIC_RELAXED,
                                           RT_ATOMIC_RELAXED))
            {
                break;
            }
        }
    }
    // After peeking, another popper/peeker may run.
    rt_sem_post(&queue->pop_sem);
}
void rt_queue_push(struct rt_queue *queue, const void *elem)
|
|
{
|
|
rt_sem_wait(&queue->push_sem);
|
|
push(queue, elem);
|
|
}
|
|
|
|
void rt_queue_pop(struct rt_queue *queue, void *elem)
|
|
{
|
|
rt_sem_wait(&queue->pop_sem);
|
|
pop(queue, elem);
|
|
}
|
|
|
|
void rt_queue_peek(struct rt_queue *queue, void *elem)
|
|
{
|
|
rt_sem_wait(&queue->pop_sem);
|
|
peek(queue, elem);
|
|
}
|
|
|
|
bool rt_queue_trypush(struct rt_queue *queue, const void *elem)
|
|
{
|
|
if (!rt_sem_trywait(&queue->push_sem))
|
|
{
|
|
return false;
|
|
}
|
|
push(queue, elem);
|
|
return true;
|
|
}
|
|
|
|
bool rt_queue_trypop(struct rt_queue *queue, void *elem)
|
|
{
|
|
if (!rt_sem_trywait(&queue->pop_sem))
|
|
{
|
|
return false;
|
|
}
|
|
pop(queue, elem);
|
|
return true;
|
|
}
|
|
|
|
bool rt_queue_trypeek(struct rt_queue *queue, void *elem)
|
|
{
|
|
if (!rt_sem_trywait(&queue->pop_sem))
|
|
{
|
|
return false;
|
|
}
|
|
peek(queue, elem);
|
|
return true;
|
|
}
|
|
|
|
bool rt_queue_timedpush(struct rt_queue *queue, const void *elem,
|
|
unsigned long ticks)
|
|
{
|
|
if (!rt_sem_timedwait(&queue->push_sem, ticks))
|
|
{
|
|
return false;
|
|
}
|
|
push(queue, elem);
|
|
return true;
|
|
}
|
|
|
|
bool rt_queue_timedpop(struct rt_queue *queue, void *elem, unsigned long ticks)
|
|
{
|
|
if (!rt_sem_timedwait(&queue->pop_sem, ticks))
|
|
{
|
|
return false;
|
|
}
|
|
pop(queue, elem);
|
|
return true;
|
|
}
|
|
|
|
bool rt_queue_timedpeek(struct rt_queue *queue, void *elem, unsigned long ticks)
|
|
{
|
|
if (!rt_sem_timedwait(&queue->pop_sem, ticks))
|
|
{
|
|
return false;
|
|
}
|
|
peek(queue, elem);
|
|
return true;
|
|
}
|