Fix potential overflow error in CEQs (XCC)
[akaros.git] / user / parlib / ceq.c
index e7566ab..0af89a5 100644 (file)
@@ -26,7 +26,8 @@
 #include <stdlib.h>
 #include <stdio.h>
 
-void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz)
+void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
+              size_t ring_sz)
 {
        /* In case they already had an mbox initialized, cleanup whatever was there
         * so we don't leak memory.  They better not have asked for events before
@@ -122,9 +123,8 @@ static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
 bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
 {
        int32_t idx = get_ring_idx(ceq);
+
        if (idx == -1) {
-               if (!ceq->ring_overflowed)
-                       return FALSE;
                /* We didn't get anything via the ring, but if we're overflowed, then we
                 * need to look in the array directly.  Note that we only handle
                 * overflow when we failed to get something.  Eventually, we'll deal
@@ -132,37 +132,36 @@ bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
                 * are dealing with overflow, the kernel could be producing and using
                 * the ring, and we could have consumers consuming from the ring.
                 *
-                * Overall, we need to clear the overflow flag, make sure the list is
-                * empty, and turn the flag back on if it isn't.  That'll make sure
-                * overflow is set if there's a chance there is a message in the array
-                * that doesn't have an idx in the ring.
-                *
-                * However, if we do that, there's a time when overflow isn't set and
-                * the ring is empty.  A concurrent consumer could think that the ring
-                * is empty, when in fact it isn't.  That's bad, since we could miss a
-                * message (i.e. sleep when we have a message we needed).  So we'll need
-                * to deal with concurrent consumers, and whatever we do will also need
-                * to deal with concurrent conusmers who handle overflow too.  Easiest
-                * thing is to just lock.  If the lock is set, then that also means the
-                * mailbox isn't empty. */
-               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
-               /* Check again - someone may have handled it while we were waiting on
-                * the lock */
-               if (!ceq->ring_overflowed) {
-                       spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
+                * Overall, we need to clear the overflow flag, then check every event.
+                * If we find an event, we need to make sure the *next* consumer
+                * continues our recovery, hence the overflow_recovery field.  We could
+                * do the check for recovery immediately, but that adds complexity and
+                * there's no stated guarantee of CEQ message ordering (you don't have
+                * it with the ring, either, technically (consider a coalesce)).  So
+                * we're fine by having *a* consumer finish the recovery, but not
+                * necesarily the *next* consumer.  So long as no one thinks the CEQ is
+                * empty when there actually are old messages, then we're okay. */
+               if (!ceq->ring_overflowed && !ceq->overflow_recovery)
                        return FALSE;
+               spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
+               if (!ceq->overflow_recovery) {
+                       ceq->overflow_recovery = TRUE;
+                       wmb();  /* set recovery before clearing overflow */
+                       ceq->ring_overflowed = FALSE;
+                       ceq->last_recovered = 0;
+                       wrmb(); /* clear overflowed before reading event entries */
                }
-               ceq->ring_overflowed = FALSE;
-               wrmb(); /* clear overflowed before reading event entries */
-               for (int i = 0; i < ceq->nr_events; i++) {
+               for (int i = ceq->last_recovered; i < ceq->nr_events; i++) {
+                       /* Regardles of whether there's a msg here, we checked it. */
+                       ceq->last_recovered++;
                        if (extract_ceq_msg(ceq, i, msg)) {
                                /* We found something.  There might be more, but a future
                                 * consumer will have to deal with it, or verify there isn't. */
-                               ceq->ring_overflowed = TRUE;
                                spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
                                return TRUE;
                        }
                }
+               ceq->overflow_recovery = FALSE;
                /* made it to the end, looks like there was no overflow left.  there
                 * could be new ones added behind us (they'd be in the ring or overflow
                 * would be turned on again), but those message were added after we