parlib: have 2LS libraries #include parlib/stdio.h
[akaros.git] / user / parlib / ceq.c
index bdd933c..3ea5cf7 100644 (file)
@@ -8,33 +8,41 @@
  * User side (consumer).
  *
  * When initializing, the nr_events is the maximum count of events you are
- * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.
+ * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.  If you
+ * use a large number, don't worry about memory; the memory is reserved but only
+ * allocated on demand (i.e. mmap without MAP_POPULATE).
  *
  * The ring_sz is a rough guess of the number of concurrent events.  It's not a
  * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
  * will probably scribble over your memory.  If you pick a value that is too
  * small, then the ring may overflow, triggering an O(n) scan of the events
- * array.  You could make it the nearest power of 2 >= nr_events, for reasonable
- * behavior at the expense of memory.  It'll be very rare for the ring to have
- * more entries than the array has events. */
+ * array (where n is the largest event ID ever seen).  You could make it the
+ * nearest power of 2 >= nr_expected_events, for reasonable behavior at the
+ * expense of memory.  It'll be very rare for the ring to have more entries than
+ * the array has events. */
 
 #include <parlib/ceq.h>
 #include <parlib/arch/atomic.h>
 #include <parlib/vcore.h>
 #include <parlib/assert.h>
 #include <parlib/spinlock.h>
+#include <parlib/stdio.h>
 #include <stdlib.h>
-#include <stdio.h>
+#include <sys/mman.h>
 
-void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz)
+void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
+              size_t ring_sz)
 {
-       /* In case they already had an mbox initialized, cleanup whatever was there
-        * so we don't leak memory.  They better not have asked for events before
-        * doing this init call... */
+       /* In case they already had an mbox initialized, cleanup whatever was
+        * there so we don't leak memory.  They better not have asked for events
+        * before doing this init call... */
        ceq_cleanup(ceq);
-       ceq->events = malloc(sizeof(struct ceq_event) * nr_events);
-       memset(ceq->events, 0, sizeof(struct ceq_event) * nr_events);
+       ceq->events = mmap(NULL, sizeof(struct ceq_event) * nr_events,
+                          PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS,
+                          -1, 0);
+       parlib_assert_perror(ceq->events != MAP_FAILED);
        ceq->nr_events = nr_events;
+       atomic_init(&ceq->max_event_ever, 0);
        assert(IS_PWR2(ring_sz));
        ceq->ring = malloc(sizeof(int32_t) * ring_sz);
        memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
@@ -44,7 +52,8 @@ void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz)
        atomic_init(&ceq->prod_idx, 0);
        atomic_init(&ceq->cons_pub_idx, 0);
        atomic_init(&ceq->cons_pvt_idx, 0);
-       static_assert(sizeof(struct spin_pdr_lock) <= sizeof(ceq->u_lock));
+       parlib_static_assert(sizeof(struct spin_pdr_lock) <=
+                            sizeof(ceq->u_lock));
        spin_pdr_init((struct spin_pdr_lock*)&ceq->u_lock);
 }
 
@@ -56,30 +65,31 @@ static int32_t get_ring_idx(struct ceq *ceq)
 {
        long pvt_idx, prod_idx;
        int32_t ret;
+
        do {
                prod_idx = atomic_read(&ceq->prod_idx);
                pvt_idx = atomic_read(&ceq->cons_pvt_idx);
                if (__ring_empty(prod_idx, pvt_idx))
                        return -1;
        } while (!atomic_cas(&ceq->cons_pvt_idx, pvt_idx, pvt_idx + 1));
-       /* We claimed our slot, which is pvt_idx.  The new cons_pvt_idx is advanced
-        * by 1 for the next consumer.  Now we need to wait on the kernel to fill
-        * the value: */
+       /* We claimed our slot, which is pvt_idx.  The new cons_pvt_idx is
+        * advanced by 1 for the next consumer.  Now we need to wait on the
+        * kernel to fill the value: */
        while ((ret = ceq->ring[pvt_idx & (ceq->ring_sz - 1)]) == -1)
                cpu_relax();
        /* Set the value back to -1 for the next time the slot is used */
        ceq->ring[pvt_idx & (ceq->ring_sz - 1)] = -1;
-       /* We now have our entry.  We need to make sure the pub_idx is updated.  All
-        * consumers are doing this.  We can just wait on all of them to update the
-        * cons_pub to our location, then we update it to the next.
+       /* We now have our entry.  We need to make sure the pub_idx is updated.
+        * All consumers are doing this.  We can just wait on all of them to
+        * update the cons_pub to our location, then we update it to the next.
         *
         * We're waiting on other vcores, but we don't know which one(s). */
        while (atomic_read(&ceq->cons_pub_idx) != pvt_idx)
-               cpu_relax_vc(vcore_id());       /* wait on all of them */
-       /* This is the only time we update cons_pub.  We also know no one else is
-        * updating it at this moment; the while loop acts as a lock, such that
-        * no one gets to this point until pub == their pvt_idx, all of which are
-        * unique. */
+               cpu_relax_any();
+       /* This is the only time we update cons_pub.  We also know no one else
+        * is updating it at this moment; the while loop acts as a lock, such
+        * that no one gets to this point until pub == their pvt_idx, all of
+        * which are unique. */
        /* No rwmb needed, it's the same variable (con_pub) */
        atomic_set(&ceq->cons_pub_idx, pvt_idx + 1);
        return ret;
@@ -97,19 +107,20 @@ static int32_t get_ring_idx(struct ceq *ceq)
 static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
 {
        struct ceq_event *ceq_ev = &ceq->events[idx];
+
        if (!ceq_ev->idx_posted)
                return FALSE;
        /* Once we clear this flag, any new coalesces will trigger another ring
-        * event, so we don't need to worry about missing anything.  It is possible
-        * that this CEQ event will get those new coalesces as part of this message,
-        * and future messages will have nothing.  That's fine. */
+        * event, so we don't need to worry about missing anything.  It is
+        * possible that this CEQ event will get those new coalesces as part of
+        * this message, and future messages will have nothing.  That's fine. */
        ceq_ev->idx_posted = FALSE;
        cmb();  /* order the read after the flag write.  swap provides cpu_mb */
        /* We extract the existing coals and reset the collection to 0; now the
         * collected events are in our msg. */
        msg->ev_arg2 = atomic_swap(&ceq_ev->coalesce, 0);
-       /* if the user wants access to user_data, they can peak in the event array
-        * via ceq->events[msg->ev_type].user_data. */
+       /* if the user wants access to user_data, they can peak in the event
+        * array via ceq->events[msg->ev_type].user_data. */
        msg->ev_type = idx;
        msg->ev_arg3 = (void*)ceq_ev->blob_data;
        ceq_ev->blob_data = 0;  /* racy, but there are no blob guarantees */
@@ -122,51 +133,59 @@ static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
 bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
 {
        int32_t idx = get_ring_idx(ceq);
+
        if (idx == -1) {
-               if (!ceq->ring_overflowed)
-                       return FALSE;
-               /* We didn't get anything via the ring, but if we're overflowed, then we
-                * need to look in the array directly.  Note that we only handle
-                * overflow when we failed to get something.  Eventually, we'll deal
-                * with overflow (which should be very rare).  Also note that while we
-                * are dealing with overflow, the kernel could be producing and using
-                * the ring, and we could have consumers consuming from the ring.
+               /* We didn't get anything via the ring, but if we're overflowed,
+                * then we need to look in the array directly.  Note that we
+                * only handle overflow when we failed to get something.
+                * Eventually, we'll deal with overflow (which should be very
+                * rare).  Also note that while we are dealing with overflow,
+                * the kernel could be producing and using the ring, and we
+                * could have consumers consuming from the ring.
                 *
-                * Overall, we need to clear the overflow flag, make sure the list is
-                * empty, and turn the flag back on if it isn't.  That'll make sure
-                * overflow is set if there's a chance there is a message in the array
-                * that doesn't have an idx in the ring.
-                *
-                * However, if we do that, there's a time when overflow isn't set and
-                * the ring is empty.  A concurrent consumer could think that the ring
-                * is empty, when in fact it isn't.  That's bad, since we could miss a
-                * message (i.e. sleep when we have a message we needed).  So we'll need
-                * to deal with concurrent consumers, and whatever we do will also need
-                * to deal with concurrent conusmers who handle overflow too.  Easiest
-                * thing is to just lock.  If the lock is set, then that also means the
-                * mailbox isn't empty. */
-               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
-               /* Check again - someone may have handled it while we were waiting on
-                * the lock */
-               if (!ceq->ring_overflowed) {
-                       spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+                * Overall, we need to clear the overflow flag, then check every
+                * event.  If we find an event, we need to make sure the *next*
+                * consumer continues our recovery, hence the overflow_recovery
+                * field.  We could do the check for recovery immediately, but
+                * that adds complexity and there's no stated guarantee of CEQ
+                * message ordering (you don't have it with the ring, either,
+                * technically (consider a coalesce)).  So we're fine by having
+                * *a* consumer finish the recovery, but not necesarily the
+                * *next* consumer.  So long as no one thinks the CEQ is empty
+                * when there actually are old messages, then we're okay. */
+               if (!ceq->ring_overflowed && !ceq->overflow_recovery)
                        return FALSE;
+               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
+               if (!ceq->overflow_recovery) {
+                       ceq->overflow_recovery = TRUE;
+                       /* set recovery before clearing overflow */
+                       wmb();
+                       ceq->ring_overflowed = FALSE;
+                       ceq->last_recovered = 0;
+                       /* clear overflowed before reading event entries */
+                       wrmb();
                }
-               ceq->ring_overflowed = FALSE;
-               wrmb(); /* clear overflowed before reading event entries */
-               for (int i = 0; i < ceq->nr_events; i++) {
+               for (int i = ceq->last_recovered;
+                    i <= atomic_read(&ceq->max_event_ever);
+                    i++) {
+                       /* Regardles of whether there's a msg here, we checked
+                        * it. */
+                       ceq->last_recovered++;
                        if (extract_ceq_msg(ceq, i, msg)) {
-                               /* We found something.  There might be more, but a future
-                                * consumer will have to deal with it, or verify there isn't. */
-                               ceq->ring_overflowed = TRUE;
-                               spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+                               /* We found something.  There might be more, but
+                                * a future consumer will have to deal with it,
+                                * or verify there isn't. */
+                               spin_pdr_unlock(
+                                       (struct spin_pdr_lock*)&ceq->u_lock);
                                return TRUE;
                        }
                }
-               /* made it to the end, looks like there was no overflow left.  there
-                * could be new ones added behind us (they'd be in the ring or overflow
-                * would be turned on again), but those message were added after we
-                * started consuming, and therefore not our obligation to extract. */
+               ceq->overflow_recovery = FALSE;
+               /* made it to the end, looks like there was no overflow left.
+                * there could be new ones added behind us (they'd be in the
+                * ring or overflow would be turned on again), but those message
+                * were added after we started consuming, and therefore not our
+                * obligation to extract. */
                spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
                return FALSE;
        }
@@ -196,6 +215,6 @@ bool ceq_is_empty(struct ceq *ceq)
 
 void ceq_cleanup(struct ceq *ceq)
 {
-       free(ceq->events);
+       munmap(ceq->events, sizeof(struct ceq_event) * ceq->nr_events);
        free(ceq->ring);
 }