qio: Clean up locking
authorBarret Rhoden <brho@cs.berkeley.edu>
Fri, 25 Mar 2016 15:26:25 +0000 (11:26 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 31 Mar 2016 20:53:42 +0000 (16:53 -0400)
Locking was a bit of a mess, including qlocks and spinlocks.

For the most part, the ordering was qlock->spinlock, but most everything
was protected by the spinlock and the qlock didn't do much.

There are a few issues:

The rlock/wlock qlocks seem mostly to serialize rendez sleepers.  Plan 9
needed this; we do not.  There might be a 'thundering herd' wakeup effect
if you have a large number of sleepers (wake up just to see the condition,
vs sleeping on the qlock that doesn't get released until at least one
thread woke up).  If that's an issue, we can fix it later; maybe in rendez
or at least closer to the rendez.  The qlocks might also be 'protecting'
the spinlock, but it don't see much value in that.

The mess with 'should_free' can be ripped out too - we just free the block
in a couple cases, and now we're explicit about it.  That was nasty.

qwait() was doing weird things with locks too.  It internally unlocks for
you.  Surprise!  Instead I'll just have it lock for you when it returns -
no mystery.

The rlock might have been protecting things related to bl2mem, where we
briefly let go of the spinlock.  The qlock might have been preserving some
invariant (who knows?!).  I decided to just hold the spinlock across
bl2mem.  I avoided doing that in the past since that function could PF.
But we're busted regardless of whether we had a qlock or a spinlock; either
would deadlock if the PF handler tries to use the device to resolve the
fault.

Plus we have a host of memcpys and memmoves that touch user memory in qio.
We need to fix PFs overall - this patch should have made things no worse in
that regard (and hopefully much clearer).

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/src/ns/qio.c

index c61c4b8..e995ca0 100644 (file)
@@ -82,9 +82,7 @@ struct queue {
        void (*bypass) (void *, struct block *);        /* bypass queue altogether */
        void *arg;                                      /* argument to kick */
 
-       qlock_t rlock;                          /* mutex for reading processes */
        struct rendez rr;                       /* process waiting to read */
-       qlock_t wlock;                          /* mutex for writing processes */
        struct rendez wr;                       /* process waiting to write */
        qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
        void *wake_data;
@@ -1229,8 +1227,6 @@ struct block *qcopy(struct queue *q, int len, uint32_t offset)
 static void qinit_common(struct queue *q)
 {
        spinlock_init_irqsave(&q->lock);
-       qlock_init(&q->rlock);
-       qlock_init(&q->wlock);
        rendez_init(&q->rr);
        rendez_init(&q->wr);
 }
@@ -1282,18 +1278,15 @@ static int notempty(void *a)
        return (q->state & Qclosed) || q->bfirst != 0;
 }
 
-/* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
- * wait, FALSE on Qclose (without error)
- *
- * Called with q ilocked.  May error out, back through the caller, with
- * the irqsave lock unlocked.  */
-static bool qwait(struct queue *q)
+/* Block, waiting for the queue to be non-empty or closed.  Returns with
+ * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
+ * was naturally closed.  Throws an error o/w. */
+static bool qwait_and_ilock(struct queue *q)
 {
-       /* wait for data */
-       for (;;) {
+       while (1) {
+               spin_lock_irqsave(&q->lock);
                if (q->bfirst != NULL)
-                       break;
-
+                       return TRUE;
                if (q->state & Qclosed) {
                        if (++q->eof > 3) {
                                spin_unlock_irqsave(&q->lock);
@@ -1315,9 +1308,7 @@ static bool qwait(struct queue *q)
                spin_unlock_irqsave(&q->lock);
                /* may throw an error() */
                rendez_sleep(&q->rr, notempty, q);
-               spin_lock_irqsave(&q->lock);
        }
-       return TRUE;
 }
 
 /*
@@ -1501,22 +1492,12 @@ static void qwakeup_iunlock(struct queue *q)
  */
 struct block *qbread(struct queue *q, int len)
 {
-       ERRSTACK(1);
        struct block *b, *nb;
        int n;
 
-       qlock(&q->rlock);
-       if (waserror()) {
-               qunlock(&q->rlock);
-               nexterror();
-       }
-
-       spin_lock_irqsave(&q->lock);
-       if (!qwait(q)) {
+       if (!qwait_and_ilock(q)) {
                /* queue closed */
                spin_unlock_irqsave(&q->lock);
-               qunlock(&q->rlock);
-               poperror();
                return NULL;
        }
 
@@ -1541,8 +1522,6 @@ struct block *qbread(struct queue *q, int len)
        /* restart producer */
        qwakeup_iunlock(q);
 
-       poperror();
-       qunlock(&q->rlock);
        return nb;
 }
 
@@ -1552,23 +1531,13 @@ struct block *qbread(struct queue *q, int len)
  */
 long qread(struct queue *q, void *vp, int len)
 {
-       ERRSTACK(1);
        struct block *b, *first, **l;
        int m, n;
 
-       qlock(&q->rlock);
-       if (waserror()) {
-               qunlock(&q->rlock);
-               nexterror();
-       }
-
-       spin_lock_irqsave(&q->lock);
 again:
-       if (!qwait(q)) {
+       if (!qwait_and_ilock(q)) {
                /* queue closed */
                spin_unlock_irqsave(&q->lock);
-               qunlock(&q->rlock);
-               poperror();
                return 0;
        }
 
@@ -1580,6 +1549,7 @@ again:
                b = q->bfirst;
                if (BLEN(b) <= 0) {
                        freeb(qremove(q));
+                       spin_unlock_irqsave(&q->lock);
                        goto again;
                }
 
@@ -1606,12 +1576,7 @@ again:
                first = qremove(q);
                n = BLEN(first);
        }
-
-       /* copy to user space outside of the ilock */
-       spin_unlock_irqsave(&q->lock);
        b = bl2mem(vp, first, len);
-       spin_lock_irqsave(&q->lock);
-
        /* take care of any left over partial block */
        if (b != NULL) {
                n -= BLEN(b);
@@ -1624,8 +1589,6 @@ again:
        /* restart producer */
        qwakeup_iunlock(q);
 
-       poperror();
-       qunlock(&q->rlock);
        return n;
 }
 
@@ -1643,9 +1606,7 @@ uint32_t dropcnt;
  */
 long qbwrite(struct queue *q, struct block *b)
 {
-       ERRSTACK(1);
        int n, dowakeup;
-       volatile bool should_free_b = TRUE;
        bool was_empty;
 
        n = BLEN(b);
@@ -1656,13 +1617,6 @@ long qbwrite(struct queue *q, struct block *b)
        }
 
        dowakeup = 0;
-       qlock(&q->wlock);
-       if (waserror()) {
-               if (b != NULL && should_free_b)
-                       freeb(b);
-               qunlock(&q->wlock);
-               nexterror();
-       }
 
        spin_lock_irqsave(&q->lock);
        was_empty = q->len == 0;
@@ -1670,6 +1624,7 @@ long qbwrite(struct queue *q, struct block *b)
        /* give up if the queue is closed */
        if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
+               freeb(b);
                if (q->err[0])
                        error(EFAIL, q->err);
                else
@@ -1683,8 +1638,6 @@ long qbwrite(struct queue *q, struct block *b)
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        dropcnt += n;
-                       qunlock(&q->wlock);
-                       poperror();
                        return n;
                }
                if (q->state & Qnonblock) {
@@ -1695,7 +1648,6 @@ long qbwrite(struct queue *q, struct block *b)
        }
 
        /* queue the block */
-       should_free_b = FALSE;
        if (q->bfirst)
                q->blast->next = b;
        else
@@ -1746,8 +1698,6 @@ long qbwrite(struct queue *q, struct block *b)
                rendez_sleep(&q->wr, qnotfull, q);
        }
 
-       qunlock(&q->wlock);
-       poperror();
        return n;
 }