Add the CEQ mbox: Coalescing Event Queues (XCC)
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 19 Aug 2015 13:49:57 +0000 (09:49 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 28 Sep 2015 19:14:00 +0000 (15:14 -0400)
Coalescing Event Queues encapsulate the essence of epoll and kqueue in a
shared memory event mailbox: a dense array of stick status bits.

This commit adds the headers, producer, and consumer side of CEQs.

Reinstall your kernel headers.

kern/include/ceq.h [new file with mode: 0644]
kern/include/ros/ceq.h [new file with mode: 0644]
kern/src/Kbuild
kern/src/ceq.c [new file with mode: 0644]
user/parlib/ceq.c [new file with mode: 0644]
user/parlib/include/ceq.h [new file with mode: 0644]

diff --git a/kern/include/ceq.h b/kern/include/ceq.h
new file mode 100644 (file)
index 0000000..b210854
--- /dev/null
@@ -0,0 +1,18 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
+ * memory: a dense array of sticky status bits.
+ *
+ * Kernel side (producer) */
+
+#ifndef ROS_KERN_CEQ_H
+#define ROS_KERN_CEQ_H
+
+#include <ros/ceq.h>
+#include <process.h>
+
+void send_ceq_msg(struct ceq *ceq, struct proc *p, struct event_msg *msg);
+
+#endif /* ROS_KERN_CEQ_H */
diff --git a/kern/include/ros/ceq.h b/kern/include/ros/ceq.h
new file mode 100644 (file)
index 0000000..2634c8d
--- /dev/null
@@ -0,0 +1,81 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
+ * memory: a dense array of sticky status bits.
+ *
+ * Like all event mailboxes, CEQs are multi-producer, multi-consumer, with the
+ * producer not trusting the consumer.
+ *
+ * The producer sends ceq_events with a certain ID (the ev_type).  Events of a
+ * particular ID are coalesced in one spot, such that N events can occur and the
+ * consumer only receives them as one event.  These are the "sticky status
+ * bits".  For example, setting flags in an int coalesce (you can set the same
+ * flag repeatedly, and it's still set), and an atomic counter coalesces small
+ * increments into a larger value.  The nature of the operation (atomic_or,
+ * atomic_add) depends on a field in the CEQ.  There is also a data blob field,
+ * last-write-wins.  There aren't a lot of guarantees associated with it, but
+ * it's still useful for some apps.
+ *
+ * Internally, a CEQ maintains an array of ceq_event structs for every possible
+ * ID, so the maximum ID should be known at ceq creation time.  These structs
+ * coalesce the events.  To keep the consumer from needing to scan the array for
+ * activity, there is a separate ring buffer that contains the indices of
+ * ceq_events with activity.  This is the "dense array."
+ *
+ * The ring buffer is actually an optimization.  If anything goes wrong, we can
+ * tell the consumer to scan the entire array.  Likewise, spurious entries in
+ * the ring are safe; the consumer just does an extra check.
+ *
+ * In general, every time we have an event, we make sure there's a pointer in
+ * the ring.  That's the purposed of 'idx_posted' - whether or not we think our
+ * index is posted in the ring. */
+
+#ifndef ROS_INC_CEQ_H
+#define ROS_INC_CEQ_H
+
+#include <ros/atomic.h>
+#include <ros/ring_buffer.h>
+
+#define CEQ_OR                                 1
+#define CEQ_ADD                                        2
+
+struct ceq_event {
+       atomic_t                                        coalesce;               /* ev_arg2 */
+       uint64_t                                        blob_data;              /* ev_arg3 */
+       bool                                            idx_posted;             /* for syncing with consumer */
+       uint64_t                                        user_data;              /* for apps, ignored by CEQ */
+};
+
+/* The events array and the ring buffer are provided by the consumer.
+ *
+ * Ring values are -1 for "unconsumed" and an index into *events otherwise.
+ *
+ * Similar to BCQs, the ring buffer must be a power of two and is managed with
+ * three index variables:
+ *
+ * prod_idx:     the next slot to be produced
+ * cons_pvt_idx: the next slot a consumer can claim
+ * cons_pub_idx: the last slot (farthest left / oldest) that hasn't been
+ *               consumed/made ready to be produced by the producer (it is
+ *               what the consumer produces).
+ *
+ * The ring is has no new entries that need consumed when prod == pvt.   The
+ * number of entries filled is prod - pub.  The number of available entries
+ * (nr_empty) is the size - (prod - pub). */
+
+struct ceq {
+       struct ceq_event                        *events;                /* consumer pointer */
+       size_t                                          nr_events;
+       int32_t                                         *ring;                  /* consumer pointer */
+       uint32_t                                        ring_sz;                /* size (power of 2) */
+       uint8_t                                         operation;              /* e.g. CEQ_OR */
+       bool                                            ring_overflowed;
+       atomic_t                                        prod_idx;               /* next prod slot to fill */
+       atomic_t                                        cons_pub_idx;   /* how far has been consumed */
+       atomic_t                                        cons_pvt_idx;   /* next cons slot to get */
+       uint32_t                                        u_lock[2];              /* user space lock */
+};
+
+#endif /* ROS_INC_CEQ_H */
index 51b93eb..ffdefcd 100644 (file)
@@ -4,6 +4,7 @@ obj-y                                           += arsc.o
 obj-y                                          += atomic.o
 obj-y                                          += bitmap.o
 obj-y                                          += blockdev.o
+obj-y                                          += ceq.o
 obj-y                                          += colored_caches.o
 obj-y                                          += console.o
 obj-y                                          += ctype.o
diff --git a/kern/src/ceq.c b/kern/src/ceq.c
new file mode 100644 (file)
index 0000000..1ae6156
--- /dev/null
@@ -0,0 +1,118 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
+ * memory: a dense array of sticky status bits.
+ *
+ * Kernel side (producer)
+ *
+ * All of the printks are just us helping the user debug their CEQs. */
+
+#include <ceq.h>
+#include <process.h>
+#include <stdio.h>
+#include <umem.h>
+
+static void error_addr(struct ceq *ceq, struct proc *p, void *addr)
+{
+       printk("[kernel] Invalid ceq (%p) bad addr %p for proc %d\n", ceq,
+              addr, p->pid);
+}
+
+void send_ceq_msg(struct ceq *ceq, struct proc *p, struct event_msg *msg)
+{
+       struct ceq_event *ceq_ev;
+       int32_t *ring_slot;
+       unsigned long my_slot;
+       int loops = 0;
+       #define NR_RING_TRIES 10
+
+       /* should have been checked by the kernel func that called us */
+       assert(is_user_rwaddr(ceq, sizeof(struct ceq)));
+       if (msg->ev_type >= ceq->nr_events) {
+               printk("[kernel] CEQ %p too small.  Wanted %d, had %d\n", ceq,
+                      msg->ev_type, ceq->nr_events);
+               return;
+       }
+       /* ACCESS_ONCE, prevent the compiler from rereading ceq->events later, and
+        * possibly getting a new, illegal version after our check */
+       ceq_ev = &(ACCESS_ONCE(ceq->events))[msg->ev_type];
+       if (!is_user_rwaddr(ceq_ev, sizeof(struct ceq_event))) {
+               error_addr(ceq, p, ceq);
+               return;
+       }
+       /* ideally, we'd like the blob to be posted after the coal, so that the
+        * 'reason' for the blob is present when the blob is.  but we can't
+        * guarantee that.  after we write the coal, the cons could consume that.
+        * then the next time it looks at us, it could just see the blob - so
+        * there's no good way to keep them together.  the user will just have to
+        * deal with it.  in that case, we might as well do it first, to utilize the
+        * atomic ops's memory barrier. */
+       ceq_ev->blob_data = (uint64_t)msg->ev_arg3;
+       switch (ceq->operation) {
+               case (CEQ_OR):
+                       atomic_or(&ceq_ev->coalesce, msg->ev_arg2);
+                       break;
+               case (CEQ_ADD):
+                       atomic_add(&ceq_ev->coalesce, msg->ev_arg2);
+                       break;
+               default:
+                       printk("[kernel] CEQ %p invalid op%d\n", ceq, ceq->operation);
+                       return;
+       }
+       /* write before checking if we need to post (covered by the atomic) */
+       if (ceq_ev->idx_posted) {
+               /* our entry was updated and posted was still set: we know the consumer
+                * will still check it, so we can safely leave.  If we ever have exit
+                * codes or something from send_*_msg, then we can tell the kernel to
+                * not bother with INDIRS/IPIs/etc.  This is unnecessary now since
+                * INDIRs are throttled */
+               return;
+       }
+       /* at this point, we need to make sure the cons looks at our entry.  it may
+        * have already done so while we were mucking around, but 'poking' them to
+        * look again can't hurt */
+       ceq_ev->idx_posted = TRUE;
+       /* idx_posted write happens before the writes posting it.  the following
+        * atomic provides the cpu mb() */
+       cmb();
+       /* I considered checking the buffer for full-ness or the ceq overflow here.
+        * Those would be reads, which would require a wrmb() right above for every
+        * ring post, all for something we check for later anyways and for something
+        * that should be rare.  In return, when we are overflowed, which should be
+        * rare if the user sizes their ring buffer appropriately, we go through a
+        * little more hassle below. */
+       /* I tried doing this with fetch_and_add to avoid the while loop and picking
+        * a number of times to try.  The trick is that you need to back out, and
+        * could have multiple producers working on the same slot.  Although the
+        * overflow makes it okay for the producers idxes to be clobbered, it's not
+        * okay to have two producers on the same slot, since there'd only be one
+        * consumer.  Theoretically, you could have a producer delayed a long time
+        * that just clobbers an index at some point in the future, or leaves an
+        * index in the non-init state (-1).  It's a mess. */
+       do {
+               cmb();  /* reread the indices */
+               my_slot = atomic_read(&ceq->prod_idx);
+               if (__ring_full(ceq->ring_sz, my_slot,
+                               atomic_read(&ceq->cons_pub_idx))) {
+                       ceq->ring_overflowed = TRUE;
+                       return;
+               }
+               if (loops++ == NR_RING_TRIES) {
+                       ceq->ring_overflowed = TRUE;
+                       return;
+               }
+       } while (!atomic_cas(&ceq->prod_idx, my_slot, my_slot + 1));
+       /* ring_slot is a user pointer, calculated by ring, my_slot, and sz */
+       ring_slot = &(ACCESS_ONCE(ceq->ring))[my_slot & (ceq->ring_sz - 1)];
+       if (!is_user_rwaddr(ring_slot, sizeof(int32_t))) {
+               /* This is a serious user error.  We're just bailing out, and any
+                * consumers might be spinning waiting on us to produce.  Probably not
+                * though, since the ring slot is bad memory. */
+               error_addr(ceq, p, ring_slot);
+               return;
+       }
+       /* At this point, we have a valid slot */
+       *ring_slot = msg->ev_type;
+}
diff --git a/user/parlib/ceq.c b/user/parlib/ceq.c
new file mode 100644 (file)
index 0000000..069d9c5
--- /dev/null
@@ -0,0 +1,202 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
+ * memory: a dense array of sticky status bits.
+ *
+ * User side (consumer).
+ *
+ * When initializing, the nr_events is the maximum count of events you are
+ * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.
+ *
+ * The ring_sz is a rough guess of the number of concurrent events.  It's not a
+ * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
+ * will probably scribble over your memory.  If you pick a value that is too
+ * small, then the ring may overflow, triggering an O(n) scan of the events
+ * array.  You could make it the nearest power of 2 >= nr_events, for reasonable
+ * behavior at the expense of memory.  It'll be very rare for the ring to have
+ * more entries than the array has events. */
+
+#include <parlib/ceq.h>
+#include <parlib/arch/atomic.h>
+#include <parlib/vcore.h>
+#include <parlib/rassert.h>
+#include <parlib/spinlock.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <assert.h>
+
+void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz)
+{
+       /* In case they already had an mbox initialized, cleanup whatever was there
+        * so we don't leak memory.  They better not have asked for events before
+        * doing this init call... */
+       ceq_cleanup(ceq);
+       ceq->events = malloc(sizeof(struct ceq_event) * nr_events);
+       memset(ceq->events, 0, sizeof(struct ceq_event) * nr_events);
+       ceq->nr_events = nr_events;
+       assert(IS_PWR2(ring_sz));
+       ceq->ring = malloc(sizeof(int32_t) * ring_sz);
+       memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
+       ceq->ring_sz = ring_sz;
+       ceq->operation = op;
+       ceq->ring_overflowed = FALSE;
+       atomic_init(&ceq->prod_idx, 0);
+       atomic_init(&ceq->cons_pub_idx, 0);
+       atomic_init(&ceq->cons_pvt_idx, 0);
+       static_assert(sizeof(struct spin_pdr_lock) <= sizeof(ceq->u_lock));
+       spin_pdr_init((struct spin_pdr_lock*)&ceq->u_lock);
+}
+
+/* Helper, returns an index into the events array from the ceq ring.  -1 if the
+ * ring was empty when we looked (could be filled right after we looked).  This
+ * is the same algorithm used with BCQs, but with a magic value (-1) instead of
+ * a bool to track whether or not the slot is ready for consumption. */
+static int32_t get_ring_idx(struct ceq *ceq)
+{
+       long pvt_idx, prod_idx;
+       int32_t ret;
+       do {
+               prod_idx = atomic_read(&ceq->prod_idx);
+               pvt_idx = atomic_read(&ceq->cons_pvt_idx);
+               if (__ring_empty(prod_idx, pvt_idx))
+                       return -1;
+       } while (!atomic_cas(&ceq->cons_pvt_idx, pvt_idx, pvt_idx + 1));
+       /* We claimed our slot, which is pvt_idx.  The new cons_pvt_idx is advanced
+        * by 1 for the next consumer.  Now we need to wait on the kernel to fill
+        * the value: */
+       while ((ret = ceq->ring[pvt_idx & (ceq->ring_sz - 1)]) == -1)
+               cpu_relax();
+       /* Set the value back to -1 for the next time the slot is used */
+       ceq->ring[pvt_idx & (ceq->ring_sz - 1)] = -1;
+       /* We now have our entry.  We need to make sure the pub_idx is updated.  All
+        * consumers are doing this.  We can just wait on all of them to update the
+        * cons_pub to our location, then we update it to the next.
+        *
+        * We're waiting on other vcores, but we don't know which one(s). */
+       while (atomic_read(&ceq->cons_pub_idx) != pvt_idx)
+               cpu_relax_vc(vcore_id());       /* wait on all of them */
+       /* This is the only time we update cons_pub.  We also know no one else is
+        * updating it at this moment; the while loop acts as a lock, such that
+        * no one gets to this point until pub == their pvt_idx, all of which are
+        * unique. */
+       /* No rwmb needed, it's the same variable (con_pub) */
+       atomic_set(&ceq->cons_pub_idx, pvt_idx + 1);
+       return ret;
+}
+
+/* Helper, extracts a message from a ceq[idx], returning TRUE if there was a
+ * message.  Note that there might have been nothing in the message (coal == 0).
+ * still, that counts; it's more about idx_posted.  A concurrent reader could
+ * have swapped out the coal contents (imagine two consumers, each gets past the
+ * idx_posted check).  If having an "empty" coal is a problem, then higher level
+ * software can ask for another event.
+ *
+ * Implied in all of that is that idx_posted is also racy.  The consumer blindly
+ * sets it to false.  So long as it extracts coal after doing so, we're fine. */
+static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
+{
+       struct ceq_event *ceq_ev = &ceq->events[idx];
+       if (!ceq_ev->idx_posted)
+               return FALSE;
+       /* Once we clear this flag, any new coalesces will trigger another ring
+        * event, so we don't need to worry about missing anything.  It is possible
+        * that this CEQ event will get those new coalesces as part of this message,
+        * and future messages will have nothing.  That's fine. */
+       ceq_ev->idx_posted = FALSE;
+       cmb();  /* order the read after the flag write.  swap provides cpu_mb */
+       /* We extract the existing coals and reset the collection to 0; now the
+        * collected events are in our msg. */
+       msg->ev_arg2 = atomic_swap(&ceq_ev->coalesce, 0);
+       /* if the user wants access to user_data, they can peak in the event array
+        * via ceq->events[msg->ev_type].user_data. */
+       msg->ev_type = idx;
+       msg->ev_arg3 = (void*)ceq_ev->blob_data;
+       ceq_ev->blob_data = 0;  /* racy, but there are no blob guarantees */
+       return TRUE;
+}
+
+/* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
+ * the ceq appears empty, it will return FALSE.  Messages may have arrived after
+ * we started getting that we do not receive. */
+bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
+{
+       int32_t idx = get_ring_idx(ceq);
+       if (idx == -1) {
+               if (!ceq->ring_overflowed)
+                       return FALSE;
+               /* We didn't get anything via the ring, but if we're overflowed, then we
+                * need to look in the array directly.  Note that we only handle
+                * overflow when we failed to get something.  Eventually, we'll deal
+                * with overflow (which should be very rare).  Also note that while we
+                * are dealing with overflow, the kernel could be producing and using
+                * the ring, and we could have consumers consuming from the ring.
+                *
+                * Overall, we need to clear the overflow flag, make sure the list is
+                * empty, and turn the flag back on if it isn't.  That'll make sure
+                * overflow is set if there's a chance there is a message in the array
+                * that doesn't have an idx in the ring.
+                *
+                * However, if we do that, there's a time when overflow isn't set and
+                * the ring is empty.  A concurrent consumer could think that the ring
+                * is empty, when in fact it isn't.  That's bad, since we could miss a
+                * message (i.e. sleep when we have a message we needed).  So we'll need
+                * to deal with concurrent consumers, and whatever we do will also need
+                * to deal with concurrent conusmers who handle overflow too.  Easiest
+                * thing is to just lock.  If the lock is set, then that also means the
+                * mailbox isn't empty. */
+               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
+               /* Check again - someone may have handled it while we were waiting on
+                * the lock */
+               if (!ceq->ring_overflowed) {
+                       spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+                       return FALSE;
+               }
+               ceq->ring_overflowed = FALSE;
+               wrmb(); /* clear overflowed before reading event entries */
+               for (int i = 0; i < ceq->nr_events; i++) {
+                       if (extract_ceq_msg(ceq, i, msg)) {
+                               /* We found something.  There might be more, but a future
+                                * consumer will have to deal with it, or verify there isn't. */
+                               ceq->ring_overflowed = TRUE;
+                               spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+                               return TRUE;
+                       }
+               }
+               /* made it to the end, looks like there was no overflow left.  there
+                * could be new ones added behind us (they'd be in the ring or overflow
+                * would be turned on again), but those message were added after we
+                * started consuming, and therefore not our obligation to extract. */
+               spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+               return FALSE;
+       }
+       if (!extract_ceq_msg(ceq, idx, msg))
+               return FALSE;
+       return TRUE;
+}
+
+/* pvt_idx is the next slot that a new consumer will try to consume.  when
+ * pvt_idx != pub_idx, pub_idx is lagging, and it represents consumptions in
+ * progress. */
+static bool __ceq_ring_is_empty(struct ceq *ceq)
+{
+       return __ring_empty(atomic_read(&ceq->prod_idx),
+                           atomic_read(&ceq->cons_pvt_idx));
+}
+
+bool ceq_is_empty(struct ceq *ceq)
+{
+       if (!__ceq_ring_is_empty(ceq) ||
+           ceq->ring_overflowed ||
+           spin_pdr_locked((struct spin_pdr_lock*)&ceq->u_lock)) {
+               return FALSE;
+       }
+       return TRUE;
+}
+
+void ceq_cleanup(struct ceq *ceq)
+{
+       free(ceq->events);
+       free(ceq->ring);
+}
diff --git a/user/parlib/include/ceq.h b/user/parlib/include/ceq.h
new file mode 100644 (file)
index 0000000..4860b4d
--- /dev/null
@@ -0,0 +1,42 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
+ * memory: a dense array of sticky status bits.
+ *
+ * User side (consumer).
+ *
+ * When initializing, the nr_events is the maximum count of events you are
+ * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.
+ *
+ * The ring_sz is a rough guess of the number of concurrent events.  It's not a
+ * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
+ * will probably scribble over your memory.  If you pick a value that is too
+ * small, then the ring may overflow, triggering an O(n) scan of the events
+ * array.  You could make it == nr_events, for reasonable behavior at the
+ * expense of memory. */
+
+#ifndef PARLIB_CEQ_H
+#define PARLIB_CEQ_H
+
+#include <ros/ceq.h>
+#include <ros/event.h>
+
+__BEGIN_DECLS
+
+/* If you get a non-raw event queue (with mbox, initialized by event code), then
+ * you'll get a CEQ with 128 events and 128 ring slots with the OR operation.
+ * It's actually doable to have the user grow the CEQ, but we don't have support
+ * for that yet, so just pick a size in advance.  If you're using a CEQ, you'll
+ * probably want to do it yourself. */
+#define CEQ_DEFAULT_SZ 128
+
+void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz);
+bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg);
+bool ceq_is_empty(struct ceq *ceq);
+void ceq_cleanup(struct ceq *ceq);
+
+__END_DECLS
+
+#endif /* PARLIB_CEQ_H */