Fix potential overflow error in CEQs (XCC)
authorBarret Rhoden <brho@cs.berkeley.edu>
Tue, 4 Oct 2016 18:06:06 +0000 (14:06 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 6 Oct 2016 19:41:48 +0000 (15:41 -0400)
The issue was that a consumer that came in during overflow recovery could
see that there was no overflow and return FALSE, meaning the CEQ was empty,
even though there were older messages.

Consider, the kernel already posted two messages, set overflow, and the
ring is empty:

Thread 1                      Thread 2
--------                      --------
see empty ring                see empty ring
see overflow is on
grab lock
clear overflow
extract a message
                              sees overflow is off
                              returns FALSE
sets overflow
unlocks
returns TRUE

And there's still a message in the CEQ that thread 2 should have grabbed.

While doing this change, I also changed nr_events to an unsigned.  That was
my original intent (based on the usage in epoll), and making the change now
keeps this commit from changing the size of the CEQ, which keeps everyone
from having to rebuild every application.

Reinstall your kernel headers.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/include/ros/ceq.h
user/parlib/ceq.c
user/parlib/include/parlib/ceq.h

index e4bc535..fa23e7e 100644 (file)
@@ -67,11 +67,13 @@ struct ceq_event {
 
 struct ceq {
        struct ceq_event                        *events;                /* consumer pointer */
-       size_t                                          nr_events;
+       unsigned int                            nr_events;
+       unsigned int                            last_recovered;
        int32_t                                         *ring;                  /* consumer pointer */
        uint32_t                                        ring_sz;                /* size (power of 2) */
        uint8_t                                         operation;              /* e.g. CEQ_OR */
        bool                                            ring_overflowed;
+       bool                                            overflow_recovery;
        atomic_t                                        prod_idx;               /* next prod slot to fill */
        atomic_t                                        cons_pub_idx;   /* how far has been consumed */
        atomic_t                                        cons_pvt_idx;   /* next cons slot to get */
index e7566ab..0af89a5 100644 (file)
@@ -26,7 +26,8 @@
 #include <stdlib.h>
 #include <stdio.h>
 
-void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz)
+void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
+              size_t ring_sz)
 {
        /* In case they already had an mbox initialized, cleanup whatever was there
         * so we don't leak memory.  They better not have asked for events before
@@ -122,9 +123,8 @@ static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
 bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
 {
        int32_t idx = get_ring_idx(ceq);
+
        if (idx == -1) {
-               if (!ceq->ring_overflowed)
-                       return FALSE;
                /* We didn't get anything via the ring, but if we're overflowed, then we
                 * need to look in the array directly.  Note that we only handle
                 * overflow when we failed to get something.  Eventually, we'll deal
@@ -132,37 +132,36 @@ bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
                 * are dealing with overflow, the kernel could be producing and using
                 * the ring, and we could have consumers consuming from the ring.
                 *
-                * Overall, we need to clear the overflow flag, make sure the list is
-                * empty, and turn the flag back on if it isn't.  That'll make sure
-                * overflow is set if there's a chance there is a message in the array
-                * that doesn't have an idx in the ring.
-                *
-                * However, if we do that, there's a time when overflow isn't set and
-                * the ring is empty.  A concurrent consumer could think that the ring
-                * is empty, when in fact it isn't.  That's bad, since we could miss a
-                * message (i.e. sleep when we have a message we needed).  So we'll need
-                * to deal with concurrent consumers, and whatever we do will also need
-                * to deal with concurrent conusmers who handle overflow too.  Easiest
-                * thing is to just lock.  If the lock is set, then that also means the
-                * mailbox isn't empty. */
-               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
-               /* Check again - someone may have handled it while we were waiting on
-                * the lock */
-               if (!ceq->ring_overflowed) {
-                       spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+                * Overall, we need to clear the overflow flag, then check every event.
+                * If we find an event, we need to make sure the *next* consumer
+                * continues our recovery, hence the overflow_recovery field.  We could
+                * do the check for recovery immediately, but that adds complexity and
+                * there's no stated guarantee of CEQ message ordering (you don't have
+                * it with the ring, either, technically (consider a coalesce)).  So
+                * we're fine by having *a* consumer finish the recovery, but not
+                * necesarily the *next* consumer.  So long as no one thinks the CEQ is
+                * empty when there actually are old messages, then we're okay. */
+               if (!ceq->ring_overflowed && !ceq->overflow_recovery)
                        return FALSE;
+               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
+               if (!ceq->overflow_recovery) {
+                       ceq->overflow_recovery = TRUE;
+                       wmb();  /* set recovery before clearing overflow */
+                       ceq->ring_overflowed = FALSE;
+                       ceq->last_recovered = 0;
+                       wrmb(); /* clear overflowed before reading event entries */
                }
-               ceq->ring_overflowed = FALSE;
-               wrmb(); /* clear overflowed before reading event entries */
-               for (int i = 0; i < ceq->nr_events; i++) {
+               for (int i = ceq->last_recovered; i < ceq->nr_events; i++) {
+                       /* Regardles of whether there's a msg here, we checked it. */
+                       ceq->last_recovered++;
                        if (extract_ceq_msg(ceq, i, msg)) {
                                /* We found something.  There might be more, but a future
                                 * consumer will have to deal with it, or verify there isn't. */
-                               ceq->ring_overflowed = TRUE;
                                spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
                                return TRUE;
                        }
                }
+               ceq->overflow_recovery = FALSE;
                /* made it to the end, looks like there was no overflow left.  there
                 * could be new ones added behind us (they'd be in the ring or overflow
                 * would be turned on again), but those message were added after we
index 0789181..49a266a 100644 (file)
@@ -31,7 +31,8 @@ __BEGIN_DECLS
  * probably want to do it yourself. */
 #define CEQ_DEFAULT_SZ 128
 
-void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz);
+void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
+              size_t ring_sz);
 bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg);
 bool ceq_is_empty(struct ceq *ceq);
 void ceq_cleanup(struct ceq *ceq);