qio: Fix race with multiple blockers
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 8 Feb 2017 21:17:54 +0000 (16:17 -0500)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 9 Feb 2017 17:31:00 +0000 (12:31 -0500)
It was possible for multiple readers (or writers) to block on an empty (or
full) queue, but for one of them to never wake up.  This was broken in
commit 9ec41115cb83 ("qio: Clean up locking").

The qlock that used to be there made it so that there was only one
*blocking* reader (or writer) at a time.  That made the usage of Qstarve
(and Qflow) safe as a signal for the other writer (or reader) to wake any
sleepers.  However, with more concurrency due to the lack of the qlock,
that flag isn't sufficient anymore.  You could have two threads both set
Qstarve, then try to sleep.  A waker could come in, see the flag, wake one
of them, but not the other.  Then the other's rendez condition fails (e.g.
the queue is still empty), but Qstarve isn't set.

The right fix for this is to not use Qflow and Qstarve - just check the
rendez if we transitioned an edge.  After all, that's the rendez condition
(e.g., is_empty), not 'is Qflow still set.'  After doing so, it actually
cleans up things a little, since we're firing taps based on the edge
condition too.  Perhaps the old Qstarve/Qflow byte check saved us the
hassle of checking the rendez if we didn't know someone was there, but that
overhead isn't worth the qlock-all-blocking-readers policy.  Yes, the qlock
only applied to *blocking* threads.

Note that kicks may fire more frequently now, since they fire on edge
transitions instead of edge transitions that also had a sleeper.  If this
breaks anything, that thing needs to be changed to handle spurious kicks.
It'd be a bad design otherwise.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/include/ns.h
kern/src/ns/qio.c

index 3269523..a3256cf 100644 (file)
@@ -631,13 +631,11 @@ struct cmdtab {
 
 /* queue state bits, all can be set in qopen (Qstarve is always set) */
 enum {
-       Qstarve = (1 << 0),                     /* consumer starved */
-       Qmsg = (1 << 1),        /* message stream */
-       Qclosed = (1 << 2),     /* queue has been closed/hungup */
-       Qflow = (1 << 3),       /* producer flow controlled */
-       Qcoalesce = (1 << 4),   /* coalesce empty packets on read */
-       Qkick = (1 << 5),       /* always call the kick routine after qwrite */
-       Qdropoverflow = (1 << 6),       /* writes that would block will be dropped */
+       Qmsg                    = (1 << 1),     /* message stream */
+       Qclosed                 = (1 << 2),     /* queue has been closed/hungup */
+       Qcoalesce               = (1 << 3),     /* coalesce empty packets on read */
+       Qkick                   = (1 << 4),     /* always call the kick routine after qwrite */
+       Qdropoverflow   = (1 << 5),     /* writes that would block will be dropped */
 };
 
 #define DEVDOTDOT -1
index 0eff769..354f3f4 100644 (file)
@@ -734,7 +734,6 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
        struct block *ret, *ret_last, *first;
        size_t blen;
        bool was_unwritable = FALSE;
-       int dowakeup = 0;
 
        if (qio_flags & QIO_CAN_ERR_SLEEP) {
                if (!qwait_and_ilock(q, qio_flags)) {
@@ -751,6 +750,11 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                        return QBR_FAIL;
                }
        }
+       /* We need to check before adjusting q->len.  We're checking the writer's
+        * sleep condition / tap condition.  When set, we *might* be making an edge
+        * transition (from unwritable to writable), which needs to wake and fire
+        * taps.  But, our read might not drain the queue below q->lim.  We'll check
+        * again later to see if we should really wake them.  */
        was_unwritable = !qwritable(q);
        blen = BLEN(first);
        if ((q->state & Qcoalesce) && (blen == 0)) {
@@ -807,29 +811,16 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                len -= blen;
        }
 out_ok:
-       /*
-        *  if writer flow controlled, restart
-        *
-        *  This used to be
-        *  q->len < q->limit/2
-        *  but it slows down tcp too much for certain write sizes.
-        *  I really don't understand it completely.  It may be
-        *  due to the queue draining so fast that the transmission
-        *  stalls waiting for the app to produce more data.  - presotto
-        */
-       if ((q->state & Qflow) && q->len < q->limit) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       }
+       /* Don't wake them up or fire tap if we didn't drain enough. */
+       if (!qwritable(q))
+               was_unwritable = FALSE;
        spin_unlock_irqsave(&q->lock);
-       /* wakeup flow controlled writers */
-       if (dowakeup) {
+       if (was_unwritable) {
                if (q->kick && !(qio_flags & QIO_DONT_KICK))
                        q->kick(q->arg);
                rendez_wakeup(&q->wr);
-       }
-       if (was_unwritable)
                qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
        *real_ret = ret;
        return QBR_OK;
 }
@@ -1183,7 +1174,6 @@ struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
-       q->state |= Qstarve;
        q->eof = 0;
 
        return q;
@@ -1234,15 +1224,23 @@ static bool qwait_and_ilock(struct queue *q, int qio_flags)
                        }
                        return FALSE;
                }
-               /* We set Qstarve regardless of whether we are non-blocking or not.
-                * Qstarve tracks the edge detection of the queue being empty. */
-               q->state |= Qstarve;
                if (qio_flags & QIO_NON_BLOCK) {
                        spin_unlock_irqsave(&q->lock);
                        error(EAGAIN, "queue empty");
                }
                spin_unlock_irqsave(&q->lock);
-               /* may throw an error() */
+               /* As with the producer side, we check for a condition while holding the
+                * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
+                * check again" pattern, but we do it conditionally.  Both sides agree
+                * synchronously to do it, and those decisions are made while holding
+                * q->lock.  I think this is OK.
+                *
+                * The invariant is that no reader sleeps when the queue has data.
+                * While holding the rendez lock, if we see there's no data, we'll
+                * sleep.  Since we saw there was no data, the next writer will see (or
+                * already saw) no data, and then the writer decides to rendez_wake,
+                * which will grab the rendez lock.  If the writer already did that,
+                * then we'll see notempty when we do our check-again. */
                rendez_sleep(&q->rr, notempty, q);
        }
 }
@@ -1441,7 +1439,7 @@ static int qnotfull(void *a)
 {
        struct queue *q = a;
 
-       return q->len < q->limit || (q->state & Qclosed);
+       return qwritable(q) || (q->state & Qclosed);
 }
 
 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
@@ -1472,8 +1470,7 @@ static size_t enqueue_blist(struct queue *q, struct block *b)
 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
 {
        ssize_t ret;
-       bool dowakeup = FALSE;
-       bool was_empty;
+       bool was_unreadable;
 
        if (q->bypass) {
                ret = blocklen(b);
@@ -1481,7 +1478,7 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
                return ret;
        }
        spin_lock_irqsave(&q->lock);
-       was_empty = q->len == 0;
+       was_unreadable = q->len == 0;
        if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
                freeblist(b);
@@ -1509,20 +1506,20 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
        }
        ret = enqueue_blist(q, b);
        QDEBUG checkb(b, "__qbwrite");
-       /* make sure other end gets awakened */
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = TRUE;
-       }
        spin_unlock_irqsave(&q->lock);
        /* TODO: not sure if the usage of a kick is mutually exclusive with a
         * wakeup, meaning that actual users either want a kick or have qreaders. */
-       if (q->kick && (dowakeup || (q->state & Qkick)))
+       if (q->kick && (was_unreadable || (q->state & Qkick)))
                q->kick(q->arg);
-       if (dowakeup)
+       if (was_unreadable) {
+               /* Unlike the read side, there's no double-check to make sure the queue
+                * transitioned across an edge.  We know we added something, so that's
+                * enough.  We wake if the queue was empty.  Both sides are the same, in
+                * that the condition for which we do the rendez_wakeup() is the same as
+                * the condition done for the rendez_sleep(). */
                rendez_wakeup(&q->rr);
-       if (was_empty)
                qwake_cb(q, FDTAP_FILT_READABLE);
+       }
        /*
         *  flow control, wait for queue to get below the limit
         *  before allowing the process to continue and queue
@@ -1537,16 +1534,27 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
         */
        if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
            !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
-               /* This is a racy peek at the q status.  If we accidentally block, we
-                * set Qflow, so someone should wake us.  If we accidentally don't
-                * block, we just returned to the user and let them slip a block past
-                * flow control. */
-               while (!qnotfull(q)) {
-                       spin_lock_irqsave(&q->lock);
-                       q->state |= Qflow;
-                       spin_unlock_irqsave(&q->lock);
+               /* This is a racy peek at the q status.  If we accidentally block, our
+                * rendez will return.  The rendez's peak (qnotfull) is also racy w.r.t.
+                * the q's spinlock (that lock protects writes, but not reads).
+                *
+                * Here's the deal: when holding the rendez lock, if we see the sleep
+                * condition, the consumer will wake us.  The condition will only ever
+                * be changed by the next qbread() (consumer, changes q->len).  That
+                * code will do a rendez wake, which will spin on the rendez lock,
+                * meaning it won't procede until we either see the new state (and
+                * return) or put ourselves on the rendez, and wake up.
+                *
+                * The pattern is one side writes mem, then signals.  Our side checks
+                * the signal, then reads the mem.  The goal is to not miss seeing the
+                * signal AND missing the memory write.  In this specific case, the
+                * signal is actually synchronous (the rendez lock) and not basic shared
+                * memory.
+                *
+                * Oh, and we spin in case we woke early and someone else filled the
+                * queue, mesa-style. */
+               while (!qnotfull(q))
                        rendez_sleep(&q->wr, qnotfull, q);
-               }
        }
        return ret;
 }
@@ -1676,7 +1684,7 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve | Qdropoverflow);
+       q->state &= ~Qdropoverflow;
        q->err[0] = 0;
        bfirst = q->bfirst;
        q->bfirst = 0;
@@ -1731,7 +1739,6 @@ void qreopen(struct queue *q)
 {
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
-       q->state |= Qstarve;
        q->eof = 0;
        q->limit = q->inilim;
        q->wake_cb = 0;
@@ -1749,6 +1756,13 @@ int qlen(struct queue *q)
 
 /*
  * return space remaining before flow control
+ *
+ *  This used to be
+ *  q->len < q->limit/2
+ *  but it slows down tcp too much for certain write sizes.
+ *  I really don't understand it completely.  It may be
+ *  due to the queue draining so fast that the transmission
+ *  stalls waiting for the app to produce more data.  - presotto
  */
 int qwindow(struct queue *q)
 {