qio: add a few block helpers
[akaros.git] / kern / src / ns / qio.c
index 28c2455..16caad3 100644 (file)
@@ -1,6 +1,31 @@
-// INFERNO
-#include <vfs.h>
-#include <kfs.h>
+/* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
+ * Portions Copyright © 1997-1999 Vita Nuova Limited
+ * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
+ *                                (www.vitanuova.com)
+ * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
+ *
+ * Modified for the Akaros operating system:
+ * Copyright (c) 2013-2014 The Regents of the University of California
+ * Copyright (c) 2013-2015 Google Inc.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE. */
+
 #include <slab.h>
 #include <kmalloc.h>
 #include <kref.h>
@@ -11,7 +36,7 @@
 #include <cpio.h>
 #include <pmap.h>
 #include <smp.h>
-#include <ip.h>
+#include <net/ip.h>
 
 #define PANIC_EXTRA(b)                                                 \
 {                                                                      \
@@ -44,21 +69,19 @@ struct queue {
        struct block *bfirst;           /* buffer */
        struct block *blast;
 
-       int len;                                        /* bytes allocated to queue */
-       int dlen;                                       /* data bytes in queue */
-       int limit;                                      /* max bytes in queue */
-       int inilim;                             /* initial limit */
+       int dlen;                       /* data bytes in queue */
+       int limit;                      /* max bytes in queue */
+       int inilim;                     /* initial limit */
        int state;
-       int eof;                                        /* number of eofs read by user */
+       int eof;                        /* number of eofs read by user */
+       size_t bytes_read;
 
        void (*kick) (void *);          /* restart output */
-       void (*bypass) (void *, struct block *);        /* bypass queue altogether */
-       void *arg;                                      /* argument to kick */
+       void (*bypass) (void *, struct block *); /* bypass queue altogether */
+       void *arg;                      /* argument to kick */
 
-       qlock_t rlock;                          /* mutex for reading processes */
-       struct rendez rr;                       /* process waiting to read */
-       qlock_t wlock;                          /* mutex for writing processes */
-       struct rendez wr;                       /* process waiting to write */
+       struct rendez rr;               /* process waiting to read */
+       struct rendez wr;               /* process waiting to write */
        qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
        void *wake_data;
 
@@ -67,10 +90,21 @@ struct queue {
 
 enum {
        Maxatomic = 64 * 1024,
+       QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
+       QIO_LIMIT = (1 << 1),           /* respect q->limit */
+       QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
+       QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
+       QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
+       QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
 };
 
 unsigned int qiomaxatomic = Maxatomic;
 
+static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
+static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
+                              int mem_flags);
+static bool qwait_and_ilock(struct queue *q, int qio_flags);
+
 /* Helper: fires a wake callback, sending 'filter' */
 static void qwake_cb(struct queue *q, int filter)
 {
@@ -81,7 +115,6 @@ static void qwake_cb(struct queue *q, int filter)
 void ixsummary(void)
 {
        debugging ^= 1;
-       iallocsummary();
        printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
                   padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
        printd("consume %lu, produce %lu, qcopy %lu\n",
@@ -89,45 +122,29 @@ void ixsummary(void)
 }
 
 /*
- *  free a list of blocks
- */
-void freeblist(struct block *b)
-{
-       struct block *next;
-
-       for (; b != 0; b = next) {
-               next = b->next;
-               b->next = 0;
-               freeb(b);
-       }
-}
-
-/*
  *  pad a block to the front (or the back if size is negative)
  */
 struct block *padblock(struct block *bp, int size)
 {
        int n;
        struct block *nbp;
-       uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
-       uint16_t checksum_start = bp->checksum_start;
-       uint16_t checksum_offset = bp->checksum_offset;
-       uint16_t mss = bp->mss;
 
        QDEBUG checkb(bp, "padblock 1");
        if (size >= 0) {
                if (bp->rp - bp->base >= size) {
-                       bp->checksum_start += size;
+                       bp->network_offset += size;
+                       bp->transport_offset += size;
                        bp->rp -= size;
                        return bp;
                }
 
                PANIC_EXTRA(bp);
                if (bp->next)
-                       panic("padblock %p", getcallerpc(&bp));
+                       panic("%s %p had a next", __func__, bp);
                n = BLEN(bp);
                padblockcnt++;
-               nbp = allocb(size + n);
+               nbp = block_alloc(size + n, MEM_WAIT);
+               block_copy_metadata(nbp, bp);
                nbp->rp += size;
                nbp->wp = nbp->rp;
                memmove(nbp->wp, bp->rp, n);
@@ -140,24 +157,19 @@ struct block *padblock(struct block *bp, int size)
                PANIC_EXTRA(bp);
 
                if (bp->next)
-                       panic("padblock %p", getcallerpc(&bp));
+                       panic("%s %p had a next", __func__, bp);
 
                if (bp->lim - bp->wp >= size)
                        return bp;
 
                n = BLEN(bp);
                padblockcnt++;
-               nbp = allocb(size + n);
+               nbp = block_alloc(size + n, MEM_WAIT);
+               block_copy_metadata(nbp, bp);
                memmove(nbp->wp, bp->rp, n);
                nbp->wp += n;
                freeb(bp);
        }
-       if (bcksum) {
-               nbp->flag |= bcksum;
-               nbp->checksum_start = checksum_start;
-               nbp->checksum_offset = checksum_offset;
-               nbp->mss = mss;
-       }
        QDEBUG checkb(nbp, "padblock 1");
        return nbp;
 }
@@ -206,7 +218,7 @@ struct block *concatblock(struct block *bp)
 
        /* probably use parts of qclone */
        PANIC_EXTRA(bp);
-       nb = allocb(blocklen(bp));
+       nb = block_alloc(blocklen(bp), MEM_WAIT);
        for (f = bp; f; f = f->next) {
                len = BLEN(f);
                memmove(nb->wp, f->rp, len);
@@ -218,45 +230,69 @@ struct block *concatblock(struct block *bp)
        return nb;
 }
 
+/* Makes an identical copy of the block, collapsing all the data into the block
+ * body.  It does not point to the contents of the original, it is a copy
+ * (unlike qclone).  Since we're copying, we might as well put the memory into
+ * one contiguous chunk. */
+struct block *copyblock(struct block *bp, int mem_flags)
+{
+       struct block *newb;
+       struct extra_bdata *ebd;
+       size_t amt;
+
+       QDEBUG checkb(bp, "copyblock 0");
+       newb = block_alloc(BLEN(bp), mem_flags);
+       if (!newb)
+               return 0;
+       amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
+       assert(amt == BHLEN(bp));
+       for (int i = 0; i < bp->nr_extra_bufs; i++) {
+               ebd = &bp->extra_data[i];
+               if (!ebd->base || !ebd->len)
+                       continue;
+               amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
+                                        ebd->len);
+               assert(amt == ebd->len);
+       }
+       block_copy_metadata(newb, bp);
+       copyblockcnt++;
+       QDEBUG checkb(newb, "copyblock 1");
+       return newb;
+}
+
 /* Returns a block with the remaining contents of b all in the main body of the
  * returned block.  Replace old references to b with the returned value (which
  * may still be 'b', if no change was needed. */
 struct block *linearizeblock(struct block *b)
 {
        struct block *newb;
-       size_t len;
-       struct extra_bdata *ebd;
 
        if (!b->extra_len)
                return b;
-
-       newb = allocb(BLEN(b));
-       len = BHLEN(b);
-       memcpy(newb->wp, b->rp, len);
-       newb->wp += len;
-       len = b->extra_len;
-       for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
-               ebd = &b->extra_data[i];
-               if (!ebd->base || !ebd->len)
-                       continue;
-               memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
-               newb->wp += ebd->len;
-               len -= ebd->len;
-       }
-       /* TODO: any other flags that need copied over? */
-       if (b->flag & BCKSUM_FLAGS) {
-               newb->flag |= (b->flag & BCKSUM_FLAGS);
-               newb->checksum_start = b->checksum_start;
-               newb->checksum_offset = b->checksum_offset;
-               newb->mss = b->mss;
-       }
+       newb = copyblock(b, MEM_WAIT);
        freeb(b);
        return newb;
 }
 
-/*
- *  make sure the first block has at least n bytes in its main body
- */
+/* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
+ * have emptied it. */
+static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
+                                   size_t amt)
+{
+       ebd->len -= amt;
+       ebd->off += amt;
+       b->extra_len -= amt;
+       if (!ebd->len) {
+               /* we don't actually have to decref here.  it's also
+                * done in freeb().  this is the earliest we can free. */
+               kfree((void*)ebd->base);
+               ebd->base = ebd->off = 0;
+       }
+}
+
+/* Make sure the first block has at least n bytes in its main body.  Pulls up
+ * data from the *list* of blocks.  Returns 0 if there is not enough data in the
+ * block list. */
 struct block *pullupblock(struct block *bp, int n)
 {
        int i, len, seglen;
@@ -270,18 +306,26 @@ struct block *pullupblock(struct block *bp, int n)
        if (BHLEN(bp) >= n)
                return bp;
 
-        /* a start at explicit main-body / header management */
+       /* If there's no chance, just bail out now.  This might be slightly
+        * wasteful if there's a long blist that does have enough data. */
+       if (n > blocklen(bp))
+               return 0;
+       /* a start at explicit main-body / header management */
        if (bp->extra_len) {
                if (n > bp->lim - bp->rp) {
-                       /* would need to realloc a new block and copy everything over. */
+                       /* would need to realloc a new block and copy everything
+                        * over. */
                        panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
-                                       n, bp->lim, bp->rp, bp->lim-bp->rp);
+                             n, bp->lim, bp->rp, bp->lim-bp->rp);
                }
                len = n - BHLEN(bp);
+               /* Would need to recursively call this, or otherwise pull from
+                * later blocks and put chunks of their data into the block
+                * we're building. */
                if (len > bp->extra_len)
                        panic("pullup more than extra (%d, %d, %d)\n",
                              n, BHLEN(bp), bp->extra_len);
-               checkb(bp, "before pullup");
+               QDEBUG checkb(bp, "before pullup");
                for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
                        ebd = &bp->extra_data[i];
                        if (!ebd->base || !ebd->len)
@@ -302,7 +346,7 @@ struct block *pullupblock(struct block *bp, int n)
                /* maybe just call pullupblock recursively here */
                if (len)
                        panic("pullup %d bytes overdrawn\n", len);
-               checkb(bp, "after pullup");
+               QDEBUG checkb(bp, "after pullup");
                return bp;
        }
 
@@ -311,7 +355,7 @@ struct block *pullupblock(struct block *bp, int n)
         *  add another to the front of the list.
         */
        if (bp->lim - bp->rp < n) {
-               nbp = allocb(n);
+               nbp = block_alloc(n, MEM_WAIT);
                nbp->next = bp;
                bp = nbp;
        }
@@ -493,365 +537,403 @@ struct block *trimblock(struct block *bp, int offset, int len)
        return bp;
 }
 
-/*
- *  copy 'count' bytes into a new block
- */
-struct block *copyblock(struct block *bp, int count)
-{
-       int l;
-       struct block *nbp;
-
-       QDEBUG checkb(bp, "copyblock 0");
-       nbp = allocb(count);
-       if (bp->flag & BCKSUM_FLAGS) {
-               nbp->flag |= (bp->flag & BCKSUM_FLAGS);
-               nbp->checksum_start = bp->checksum_start;
-               nbp->checksum_offset = bp->checksum_offset;
-               nbp->mss = bp->mss;
-       }
-       PANIC_EXTRA(bp);
-       for (; count > 0 && bp != 0; bp = bp->next) {
-               l = BLEN(bp);
-               if (l > count)
-                       l = count;
-               memmove(nbp->wp, bp->rp, l);
-               nbp->wp += l;
-               count -= l;
-       }
-       if (count > 0) {
-               memset(nbp->wp, 0, count);
-               nbp->wp += count;
-       }
-       copyblockcnt++;
-       QDEBUG checkb(nbp, "copyblock 1");
-
-       return nbp;
-}
-
+/* Adjust block @bp so that its size is exactly @len.
+ * If the size is increased, fill in the new contents with zeros.
+ * If the size is decreased, discard some of the old contents at the tail. */
 struct block *adjustblock(struct block *bp, int len)
 {
-       int n;
-       struct block *nbp;
+       struct extra_bdata *ebd;
+       void *buf;
+       int i;
 
        if (len < 0) {
                freeb(bp);
                return NULL;
        }
 
-       PANIC_EXTRA(bp);
-       if (bp->rp + len > bp->lim) {
-               nbp = copyblock(bp, len);
-               freeblist(bp);
-               QDEBUG checkb(nbp, "adjustblock 1");
+       if (len == BLEN(bp))
+               return bp;
 
-               return nbp;
+       /* Shrink within block main body. */
+       if (len <= BHLEN(bp)) {
+               free_block_extra(bp);
+               bp->wp = bp->rp + len;
+               QDEBUG checkb(bp, "adjustblock 1");
+               return bp;
        }
 
-       n = BLEN(bp);
-       if (len > n)
-               memset(bp->wp, 0, len - n);
-       bp->wp = bp->rp + len;
-       QDEBUG checkb(bp, "adjustblock 2");
+       /* Need to grow. */
+       if (len > BLEN(bp)) {
+               /* Grow within block main body. */
+               if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
+                       memset(bp->wp, 0, len - BLEN(bp));
+                       bp->wp = bp->rp + len;
+                       QDEBUG checkb(bp, "adjustblock 2");
+                       return bp;
+               }
+               /* Grow with extra data buffers. */
+               buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
+               block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
+                                  MEM_WAIT);
+               QDEBUG checkb(bp, "adjustblock 3");
+               return bp;
+       }
 
+       /* Shrink extra data buffers.
+        * len is how much of ebd we need to keep.
+        * extra_len is re-accumulated. */
+       assert(bp->extra_len > 0);
+       len -= BHLEN(bp);
+       bp->extra_len = 0;
+       for (i = 0; i < bp->nr_extra_bufs; i++) {
+               ebd = &bp->extra_data[i];
+               if (len <= ebd->len)
+                       break;
+               len -= ebd->len;
+               bp->extra_len += ebd->len;
+       }
+       /* If len becomes zero, extra_data[i] should be freed. */
+       if (len > 0) {
+               ebd = &bp->extra_data[i];
+               ebd->len = len;
+               bp->extra_len += ebd->len;
+               i++;
+       }
+       for (; i < bp->nr_extra_bufs; i++) {
+               ebd = &bp->extra_data[i];
+               if (ebd->base)
+                       kfree((void*)ebd->base);
+               ebd->base = ebd->off = ebd->len = 0;
+       }
+       QDEBUG checkb(bp, "adjustblock 4");
        return bp;
 }
 
-
-/*
- *  get next block from a queue, return null if nothing there
- */
-struct block *qget(struct queue *q)
+/* Helper: removes and returns the first block from q */
+static struct block *pop_first_block(struct queue *q)
 {
-       int dowakeup;
-       struct block *b;
+       struct block *b = q->bfirst;
 
-       /* sync with qwrite */
-       spin_lock_irqsave(&q->lock);
-
-       b = q->bfirst;
-       if (b == NULL) {
-               q->state |= Qstarve;
-               spin_unlock_irqsave(&q->lock);
-               return NULL;
-       }
+       q->dlen -= BLEN(b);
+       q->bytes_read += BLEN(b);
        q->bfirst = b->next;
        b->next = 0;
-       q->len -= BALLOC(b);
-       q->dlen -= BLEN(b);
-       QDEBUG checkb(b, "qget");
-
-       /* if writer flow controlled, restart */
-       if ((q->state & Qflow) && q->len < q->limit / 2) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       } else
-               dowakeup = 0;
+       return b;
+}
 
-       spin_unlock_irqsave(&q->lock);
+/* Accounting helper.  Block b in q lost amt extra_data */
+static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
+{
+       b->extra_len -= amt;
+       q->dlen -= amt;
+       q->bytes_read += amt;
+}
 
-       if (dowakeup) {
-               rendez_wakeup(&q->wr);
-               /* We only send the writable event on wakeup, which is edge triggered */
-               qwake_cb(q, FDTAP_FILT_WRITABLE);
-       }
+/* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
+ * fixed in 'from', so we move its contents and zero it out in 'from'.
+ *
+ * Returns the length moved (0 on failure). */
+static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
+                       struct block *from, struct queue *from_q)
+{
+       size_t ret = ebd->len;
 
-       return b;
+       if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
+               return 0;
+       block_and_q_lost_extra(from, from_q, ebd->len);
+       ebd->base = ebd->len = ebd->off = 0;
+       return ret;
 }
 
-/*
- *  throw away the next 'len' bytes in the queue
- * returning the number actually discarded
- */
-int qdiscard(struct queue *q, int len)
-{
-       struct block *b;
-       int dowakeup, n, sofar, body_amt, extra_amt;
+/* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
+ * return with less than len, but greater than 0, even if there is more
+ * available in q.
+ *
+ * At any moment that we have copied anything and things are tricky, we can just
+ * return.  The trickiness comes from a bunch of variables: is the main body
+ * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
+ * to @to's main body, but only if we haven't used it yet. */
+static size_t copy_from_first_block(struct queue *q, struct block *to,
+                                    size_t len)
+{
+       struct block *from = q->bfirst;
+       size_t copy_amt, amt;
        struct extra_bdata *ebd;
 
-       spin_lock_irqsave(&q->lock);
-       for (sofar = 0; sofar < len; sofar += n) {
-               b = q->bfirst;
-               if (b == NULL)
-                       break;
-               QDEBUG checkb(b, "qdiscard");
-               n = BLEN(b);
-               if (n <= len - sofar) {
-                       q->bfirst = b->next;
-                       b->next = 0;
-                       q->len -= BALLOC(b);
-                       q->dlen -= BLEN(b);
-                       freeb(b);
-               } else {
-                       n = len - sofar;
-                       q->dlen -= n;
-                       /* partial block removal */
-                       body_amt = MIN(BHLEN(b), n);
-                       b->rp += body_amt;
-                       extra_amt = n - body_amt;
-                       /* reduce q->len by the amount we remove from the extras.  The
-                        * header will always be accounted for above, during block removal.
-                        * */
-                       q->len -= extra_amt;
-                       for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
-                               ebd = &b->extra_data[i];
-                               if (!ebd->base || !ebd->len)
-                                       continue;
-                               if (extra_amt >= ebd->len) {
-                                       /* remove the entire entry, note the kfree release */
-                                       b->extra_len -= ebd->len;
-                                       extra_amt -= ebd->len;
-                                       kfree((void*)ebd->base);
-                                       ebd->base = ebd->off = ebd->len = 0;
-                                       continue;
-                               }
-                               ebd->off += extra_amt;
-                               ebd->len -= extra_amt;
-                               b->extra_len -= extra_amt;
-                               extra_amt = 0;
+       assert(len < BLEN(from));       /* sanity */
+       /* Try to extract from the main body */
+       copy_amt = MIN(BHLEN(from), len);
+       if (copy_amt) {
+               copy_amt = block_copy_to_body(to, from->rp, copy_amt);
+               from->rp += copy_amt;
+               /* We only change dlen, (data len), not q->len, since the q
+                * still has the same block memory allocation (no kfrees
+                * happened) */
+               q->dlen -= copy_amt;
+               q->bytes_read += copy_amt;
+       }
+       /* Try to extract the remainder from the extra data */
+       len -= copy_amt;
+       for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
+               ebd = &from->extra_data[i];
+               if (!ebd->base || !ebd->len)
+                       continue;
+               if (len >= ebd->len) {
+                       amt = move_ebd(ebd, to, from, q);
+                       if (!amt) {
+                               /* our internal alloc could have failed.   this
+                                * ebd is now the last one we'll consider.
+                                * let's handle it separately and put it in the
+                                * main body. */
+                               if (copy_amt)
+                                       return copy_amt;
+                               copy_amt = block_copy_to_body(to,
+                                                             (void*)ebd->base +
+                                                             ebd->off,
+                                                             ebd->len);
+                               block_and_q_lost_extra(from, q, copy_amt);
+                               break;
                        }
+                       len -= amt;
+                       copy_amt += amt;
+                       continue;
+               } else {
+                       /* If we're here, we reached our final ebd, which we'll
+                        * need to split to get anything from it. */
+                       if (copy_amt)
+                               return copy_amt;
+                       copy_amt = block_copy_to_body(to, (void*)ebd->base +
+                                                     ebd->off, len);
+                       ebd->off += copy_amt;
+                       ebd->len -= copy_amt;
+                       block_and_q_lost_extra(from, q, copy_amt);
+                       break;
                }
        }
-
-       /*
-        *  if writer flow controlled, restart
-        *
-        *  This used to be
-        *  q->len < q->limit/2
-        *  but it slows down tcp too much for certain write sizes.
-        *  I really don't understand it completely.  It may be
-        *  due to the queue draining so fast that the transmission
-        *  stalls waiting for the app to produce more data.  - presotto
-        */
-       if ((q->state & Qflow) && q->len < q->limit) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       } else
-               dowakeup = 0;
-
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup) {
-               rendez_wakeup(&q->wr);
-               qwake_cb(q, FDTAP_FILT_WRITABLE);
-       }
-
-       return sofar;
+       if (len)
+               assert(copy_amt);       /* sanity */
+       return copy_amt;
 }
 
-/*
- *  Interrupt level copy out of a queue, return # bytes copied.
- */
-int qconsume(struct queue *q, void *vp, int len)
-{
-       struct block *b;
-       int n, dowakeup;
-       uint8_t *p = vp;
-       struct block *tofree = NULL;
+/* Return codes for __qbread and __try_qbread. */
+enum {
+       QBR_OK,
+       QBR_FAIL,
+       QBR_SPARE,      /* we need a spare block */
+       QBR_AGAIN,      /* do it again, we are coalescing blocks */
+};
 
-       /* sync with qwrite */
-       spin_lock_irqsave(&q->lock);
+/* Helper and back-end for __qbread: extracts and returns a list of blocks
+ * containing up to len bytes.  It may contain less than len even if q has more
+ * data.
+ *
+ * Returns a code interpreted by __qbread, and the returned blist in ret. */
+static int __try_qbread(struct queue *q, size_t len, int qio_flags,
+                        struct block **real_ret, struct block *spare)
+{
+       struct block *ret, *ret_last, *first;
+       size_t blen;
+       bool was_unwritable = FALSE;
 
-       for (;;) {
-               b = q->bfirst;
-               if (b == 0) {
-                       q->state |= Qstarve;
+       if (qio_flags & QIO_CAN_ERR_SLEEP) {
+               if (!qwait_and_ilock(q, qio_flags)) {
                        spin_unlock_irqsave(&q->lock);
-                       return -1;
+                       return QBR_FAIL;
+               }
+               /* we qwaited and still hold the lock, so the q is not empty */
+               first = q->bfirst;
+       } else {
+               spin_lock_irqsave(&q->lock);
+               first = q->bfirst;
+               if (!first) {
+                       spin_unlock_irqsave(&q->lock);
+                       return QBR_FAIL;
                }
-               QDEBUG checkb(b, "qconsume 1");
-
-               n = BLEN(b);
-               if (n > 0)
-                       break;
-               q->bfirst = b->next;
-               q->len -= BALLOC(b);
-
-               /* remember to free this */
-               b->next = tofree;
-               tofree = b;
-       };
-
-       PANIC_EXTRA(b);
-       if (n < len)
-               len = n;
-       memmove(p, b->rp, len);
-       consumecnt += n;
-       b->rp += len;
-       q->dlen -= len;
-
-       /* discard the block if we're done with it */
-       if ((q->state & Qmsg) || len == n) {
-               q->bfirst = b->next;
-               b->next = 0;
-               q->len -= BALLOC(b);
-               q->dlen -= BLEN(b);
-
-               /* remember to free this */
-               b->next = tofree;
-               tofree = b;
        }
-
-       /* if writer flow controlled, restart */
-       if ((q->state & Qflow) && q->len < q->limit / 2) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       } else
-               dowakeup = 0;
-
+       /* We need to check before adjusting q->len.  We're checking the
+        * writer's sleep condition / tap condition.  When set, we *might* be
+        * making an edge transition (from unwritable to writable), which needs
+        * to wake and fire taps.  But, our read might not drain the queue below
+        * q->lim.  We'll check again later to see if we should really wake
+        * them.  */
+       was_unwritable = !qwritable(q);
+       blen = BLEN(first);
+       if ((q->state & Qcoalesce) && (blen == 0)) {
+               freeb(pop_first_block(q));
+               spin_unlock_irqsave(&q->lock);
+               /* Need to retry to make sure we have a first block */
+               return QBR_AGAIN;
+       }
+       /* Qmsg: just return the first block.  Be careful, since our caller
+        * might not read all of the block and thus drop bytes.  Similar to
+        * SOCK_DGRAM. */
+       if (q->state & Qmsg) {
+               ret = pop_first_block(q);
+               goto out_ok;
+       }
+       /* Let's get at least something first - makes the code easier.  This
+        * way, we'll only ever split the block once. */
+       if (blen <= len) {
+               ret = pop_first_block(q);
+               len -= blen;
+       } else {
+               /* need to split the block.  we won't actually take the first
+                * block out of the queue - we're just extracting a little bit.
+                */
+               if (!spare) {
+                       /* We have nothing and need a spare block.  Retry! */
+                       spin_unlock_irqsave(&q->lock);
+                       return QBR_SPARE;
+               }
+               copy_from_first_block(q, spare, len);
+               ret = spare;
+               goto out_ok;
+       }
+       /* At this point, we just grabbed the first block.  We can try to grab
+        * some more, up to len (if they want). */
+       if (qio_flags & QIO_JUST_ONE_BLOCK)
+               goto out_ok;
+       ret_last = ret;
+       while (q->bfirst && (len > 0)) {
+               blen = BLEN(q->bfirst);
+               if ((q->state & Qcoalesce) && (blen == 0)) {
+                       /* remove the intermediate 0 blocks */
+                       freeb(pop_first_block(q));
+                       continue;
+               }
+               if (blen > len) {
+                       /* We could try to split the block, but that's a huge
+                        * pain.  For instance, we might need to move the main
+                        * body of b into an extra_data of ret_last.  lots of
+                        * ways for that to fail, and lots of cases to consider.
+                        * Easier to just bail out.  This is why I did the first
+                        * block above: we don't need to worry about this. */
+                        break;
+               }
+               ret_last->next = pop_first_block(q);
+               ret_last = ret_last->next;
+               len -= blen;
+       }
+out_ok:
+       /* Don't wake them up or fire tap if we didn't drain enough. */
+       if (!qwritable(q))
+               was_unwritable = FALSE;
        spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup) {
+       if (was_unwritable) {
+               if (q->kick && !(qio_flags & QIO_DONT_KICK))
+                       q->kick(q->arg);
                rendez_wakeup(&q->wr);
                qwake_cb(q, FDTAP_FILT_WRITABLE);
        }
-
-       if (tofree != NULL)
-               freeblist(tofree);
-
-       return len;
+       *real_ret = ret;
+       return QBR_OK;
 }
 
-int qpass(struct queue *q, struct block *b)
+/* Helper and front-end for __try_qbread: extracts and returns a list of blocks
+ * containing up to len bytes.  It may contain less than len even if q has more
+ * data.
+ *
+ * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
+ * if it required a spare and the memory allocation failed.
+ *
+ * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
+ * could get a zero length block back. */
+static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
+                              int mem_flags)
 {
-       int dlen, len, dowakeup;
-
-       /* sync with qread */
-       dowakeup = 0;
-       spin_lock_irqsave(&q->lock);
-       if (q->len >= q->limit) {
-               freeblist(b);
-               spin_unlock_irqsave(&q->lock);
-               return -1;
-       }
-       if (q->state & Qclosed) {
-               len = blocklen(b);
-               freeblist(b);
-               spin_unlock_irqsave(&q->lock);
-               return len;
-       }
-
-       /* add buffer to queue */
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       len = BALLOC(b);
-       dlen = BLEN(b);
-       QDEBUG checkb(b, "qpass");
-       while (b->next) {
-               b = b->next;
-               QDEBUG checkb(b, "qpass");
-               len += BALLOC(b);
-               dlen += BLEN(b);
-       }
-       q->blast = b;
-       q->len += len;
-       q->dlen += dlen;
-
-       if (q->len >= q->limit / 2)
-               q->state |= Qflow;
+       ERRSTACK(1);
+       struct block *ret = 0;
+       struct block *volatile spare = 0;       /* volatile for the waserror */
 
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
+       /* __try_qbread can throw, based on qio flags. */
+       if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
+               if (spare)
+                       freeb(spare);
+               nexterror();
        }
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup) {
-               rendez_wakeup(&q->rr);
-               qwake_cb(q, FDTAP_FILT_READABLE);
+       while (1) {
+               switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
+               case QBR_OK:
+               case QBR_FAIL:
+                       if (spare && (ret != spare))
+                               freeb(spare);
+                       goto out_ret;
+               case QBR_SPARE:
+                       assert(!spare);
+                       /* Due to some nastiness, we need a fresh block so we
+                        * can read out anything from the queue.  'len' seems
+                        * like a reasonable amount.  Maybe we can get away with
+                        * less. */
+                       spare = block_alloc(len, mem_flags);
+                       if (!spare) {
+                               /* Careful here: a memory failure (possible with
+                                * MEM_ATOMIC) could look like 'no data in the
+                                * queue' (QBR_FAIL).  The only one who does is
+                                * this qget(), who happens to know that we
+                                * won't need a spare, due to the len argument.
+                                * Spares are only needed when we need to split
+                                * a block. */
+                               ret = 0;
+                               goto out_ret;
+                       }
+                       break;
+               case QBR_AGAIN:
+                       /* if the first block is 0 and we are Qcoalesce, then
+                        * we'll need to try again.  We bounce out of __try so
+                        * we can perform the "is there a block" logic again
+                        * from the top. */
+                       break;
+               }
        }
-
-       return len;
+       assert(0);
+out_ret:
+       if (qio_flags & QIO_CAN_ERR_SLEEP)
+               poperror();
+       return ret;
 }
 
-int qpassnolim(struct queue *q, struct block *b)
+/*
+ *  get next block from a queue, return null if nothing there
+ */
+struct block *qget(struct queue *q)
 {
-       int dlen, len, dowakeup;
-
-       /* sync with qread */
-       dowakeup = 0;
-       spin_lock_irqsave(&q->lock);
-
-       if (q->state & Qclosed) {
-               freeblist(b);
-               spin_unlock_irqsave(&q->lock);
-               return BALLOC(b);
-       }
-
-       /* add buffer to queue */
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       len = BALLOC(b);
-       dlen = BLEN(b);
-       QDEBUG checkb(b, "qpass");
-       while (b->next) {
-               b = b->next;
-               QDEBUG checkb(b, "qpass");
-               len += BALLOC(b);
-               dlen += BLEN(b);
-       }
-       q->blast = b;
-       q->len += len;
-       q->dlen += dlen;
-
-       if (q->len >= q->limit / 2)
-               q->state |= Qflow;
+       /* since len == SIZE_MAX, we should never need to do a mem alloc */
+       return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
+}
 
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
+/* Throw away the next 'len' bytes in the queue returning the number actually
+ * discarded.
+ *
+ * If the bytes are in the queue, then they must be discarded.  The only time to
+ * return less than len is if the q itself has less than len bytes.
+ *
+ * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
+ * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
+size_t qdiscard(struct queue *q, size_t len)
+{
+       struct block *blist;
+       size_t removed_amt;
+       size_t sofar = 0;
+
+       /* This is racy.  There could be multiple qdiscarders or other
+        * consumers, where the consumption could be interleaved. */
+       while (qlen(q) && len) {
+               blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
+               removed_amt = freeblist(blist);
+               sofar += removed_amt;
+               len -= removed_amt;
        }
-       spin_unlock_irqsave(&q->lock);
+       return sofar;
+}
 
-       if (dowakeup) {
-               rendez_wakeup(&q->rr);
-               qwake_cb(q, FDTAP_FILT_READABLE);
-       }
+ssize_t qpass(struct queue *q, struct block *b)
+{
+       return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
+}
 
-       return len;
+ssize_t qpassnolim(struct queue *q, struct block *b)
+{
+       return __qbwrite(q, b, 0);
 }
 
 /*
@@ -869,10 +951,11 @@ struct block *packblock(struct block *bp)
                nbp = *l;
                n = BLEN(nbp);
                if ((n << 2) < BALLOC(nbp)) {
-                       *l = allocb(n);
+                       *l = block_alloc(n, MEM_WAIT);
                        memmove((*l)->wp, nbp->rp, n);
                        (*l)->wp += n;
                        (*l)->next = nbp->next;
+                       nbp->next = NULL;
                        freeb(nbp);
                }
        }
@@ -880,83 +963,21 @@ struct block *packblock(struct block *bp)
        return bp;
 }
 
-int qproduce(struct queue *q, void *vp, int len)
+/* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
+ * body_rp, for up to len.  Returns the len consumed.
+ *
+ * The base is 'b', so that we can kfree it later.  This currently ties us to
+ * using kfree for the release method for all extra_data.
+ *
+ * It is possible to have a body size that is 0, if there is no offset, and
+ * b->wp == b->rp.  This will have an extra data entry of 0 length. */
+static size_t point_to_body(struct block *b, uint8_t *body_rp,
+                            struct block *newb, unsigned int newb_idx,
+                            size_t len)
 {
-       struct block *b;
-       int dowakeup;
-       uint8_t *p = vp;
-
-       /* sync with qread */
-       dowakeup = 0;
-       spin_lock_irqsave(&q->lock);
+       struct extra_bdata *ebd = &newb->extra_data[newb_idx];
 
-       /* no waiting receivers, room in buffer? */
-       if (q->len >= q->limit) {
-               q->state |= Qflow;
-               spin_unlock_irqsave(&q->lock);
-               return -1;
-       }
-
-       /* save in buffer */
-       /* use Qcoalesce here to save storage */
-       // TODO: Consider removing the Qcoalesce flag and force a coalescing
-       // strategy by default.
-       b = q->blast;
-       if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
-               || b->lim - b->wp < len) {
-               /* need a new block */
-               b = iallocb(len);
-               if (b == 0) {
-                       spin_unlock_irqsave(&q->lock);
-                       return 0;
-               }
-               if (q->bfirst)
-                       q->blast->next = b;
-               else
-                       q->bfirst = b;
-               q->blast = b;
-               /* b->next = 0; done by iallocb() */
-               q->len += BALLOC(b);
-       }
-       PANIC_EXTRA(b);
-       memmove(b->wp, p, len);
-       producecnt += len;
-       b->wp += len;
-       q->dlen += len;
-       QDEBUG checkb(b, "qproduce");
-
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
-       }
-
-       if (q->len >= q->limit)
-               q->state |= Qflow;
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup) {
-               rendez_wakeup(&q->rr);
-               qwake_cb(q, FDTAP_FILT_READABLE);
-       }
-
-       return len;
-}
-
-/* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
- * body_rp, for up to len.  Returns the len consumed. 
- *
- * The base is 'b', so that we can kfree it later.  This currently ties us to
- * using kfree for the release method for all extra_data.
- *
- * It is possible to have a body size that is 0, if there is no offset, and
- * b->wp == b->rp.  This will have an extra data entry of 0 length. */
-static size_t point_to_body(struct block *b, uint8_t *body_rp,
-                            struct block *newb, unsigned int newb_idx,
-                            size_t len)
-{
-       struct extra_bdata *ebd = &newb->extra_data[newb_idx];
-
-       assert(newb_idx < newb->nr_extra_bufs);
+       assert(newb_idx < newb->nr_extra_bufs);
 
        kmalloc_incref(b);
        ebd->base = (uintptr_t)b;
@@ -990,8 +1011,9 @@ static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
        return n_ebd->len;
 }
 
-/* given a string of blocks, fills the new block's extra_data  with the contents
- * of the blist [offset, len + offset)
+/* given a string of blocks, sets up the new block's extra_data such that it
+ * *points* to the contents of the blist [offset, len + offset).  This does not
+ * make a separate copy of the contents of the blist.
  *
  * returns 0 on success.  the only failure is if the extra_data array was too
  * small, so this returns a positive integer saying how big the extra_data needs
@@ -1005,25 +1027,32 @@ static int __blist_clone_to(struct block *blist, struct block *newb, int len,
        unsigned int nr_bufs = 0;
        unsigned int b_idx, newb_idx = 0;
        uint8_t *first_main_body = 0;
+       ssize_t sofar = 0;
 
-       /* find the first block; keep offset relative to the latest b in the list */
+       /* find the first block; keep offset relative to the latest b in the
+        * list */
        for (b = blist; b; b = b->next) {
                if (BLEN(b) > offset)
                        break;
                offset -= BLEN(b);
        }
-       /* qcopy semantics: if you asked for an offset outside the block list, you
-        * get an empty block back */
+       /* qcopy semantics: if you asked for an offset outside the block list,
+        * you get an empty block back */
        if (!b)
                return 0;
        first = b;
+       sofar -= offset; /* don't count the remaining offset in the first b */
        /* upper bound for how many buffers we'll need in newb */
        for (/* b is set*/; b; b = b->next) {
-               nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
+               nr_bufs += BHLEN(b) ? 1 : 0;
+               /* still assuming nr_extra == nr_valid */
+               nr_bufs += b->nr_extra_bufs;
+               sofar += BLEN(b);
+               if (sofar > len)
+                       break;
        }
        /* we might be holding a spinlock here, so we won't wait for kmalloc */
-       block_add_extd(newb, nr_bufs, 0);
-       if (newb->nr_extra_bufs < nr_bufs) {
+       if (block_add_extd(newb, nr_bufs, 0) != 0) {
                /* caller will need to alloc these, then re-call us */
                return nr_bufs;
        }
@@ -1032,31 +1061,39 @@ static int __blist_clone_to(struct block *blist, struct block *newb, int len,
                if (offset) {
                        if (offset < BHLEN(b)) {
                                /* off is in the main body */
-                               len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
+                               len -= point_to_body(b, b->rp + offset, newb,
+                                                    newb_idx, len);
                                newb_idx++;
                        } else {
-                               /* off is in one of the buffers (or just past the last one).
-                                * we're not going to point to b's main body at all. */
+                               /* off is in one of the buffers (or just past
+                                * the last one).  we're not going to point to
+                                * b's main body at all. */
                                offset -= BHLEN(b);
                                assert(b->extra_data);
-                               /* assuming these extrabufs are packed, or at least that len
-                                * isn't gibberish */
+                               /* assuming these extrabufs are packed, or at
+                                * least that len isn't gibberish */
                                while (b->extra_data[b_idx].len <= offset) {
                                        offset -= b->extra_data[b_idx].len;
                                        b_idx++;
                                }
-                               /* now offset is set to our offset in the b_idx'th buf */
-                               len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
+                               /* now offset is set to our offset in the
+                                * b_idx'th buf */
+                               len -= point_to_buf(b, b_idx, offset, newb,
+                                                   newb_idx, len);
                                newb_idx++;
                                b_idx++;
                        }
                        offset = 0;
                } else {
-                       len -= point_to_body(b, b->rp, newb, newb_idx, len);
-                       newb_idx++;
+                       if (BHLEN(b)) {
+                               len -= point_to_body(b, b->rp, newb, newb_idx,
+                                                    len);
+                               newb_idx++;
+                       }
                }
-               /* knock out all remaining bufs.  we only did one point_to_ op by now,
-                * and any point_to_ could be our last if it consumed all of len. */
+               /* knock out all remaining bufs.  we only did one point_to_ op
+                * by now, and any point_to_ could be our last if it consumed
+                * all of len. */
                for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
                        len -= point_to_buf(b, i, 0, newb, newb_idx, len);
                        newb_idx++;
@@ -1069,100 +1106,45 @@ struct block *blist_clone(struct block *blist, int header_len, int len,
                           uint32_t offset)
 {
        int ret;
-       struct block *newb = allocb(header_len);
+       struct block *newb = block_alloc(header_len, MEM_WAIT);
+
        do {
                ret = __blist_clone_to(blist, newb, len, offset);
                if (ret)
-                       block_add_extd(newb, ret, KMALLOC_WAIT);
+                       block_add_extd(newb, ret, MEM_WAIT);
        } while (ret);
        return newb;
 }
 
 /* given a queue, makes a single block with header_len reserved space in the
  * block main body, and the contents of [offset, len + offset) pointed to in the
- * new blocks ext_data. */
+ * new blocks ext_data.  This does not make a copy of the q's contents, though
+ * you do have a ref count on the memory. */
 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
 {
        int ret;
-       struct block *newb = allocb(header_len);
+       struct block *newb = block_alloc(header_len, MEM_WAIT);
        /* the while loop should rarely be used: it would require someone
         * concurrently adding to the queue. */
        do {
-               /* TODO: RCU: protecting the q list (b->next) (need read lock) */
+               /* TODO: RCU protect the q list (b->next) (need read lock) */
                spin_lock_irqsave(&q->lock);
                ret = __blist_clone_to(q->bfirst, newb, len, offset);
                spin_unlock_irqsave(&q->lock);
                if (ret)
-                       block_add_extd(newb, ret, KMALLOC_WAIT);
+                       block_add_extd(newb, ret, MEM_WAIT);
        } while (ret);
        return newb;
 }
 
-/*
- *  copy from offset in the queue
- */
-struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
-{
-       int sofar;
-       int n;
-       struct block *b, *nb;
-       uint8_t *p;
-
-       nb = allocb(len);
-
-       spin_lock_irqsave(&q->lock);
-
-       /* go to offset */
-       b = q->bfirst;
-       for (sofar = 0;; sofar += n) {
-               if (b == NULL) {
-                       spin_unlock_irqsave(&q->lock);
-                       return nb;
-               }
-               n = BLEN(b);
-               if (sofar + n > offset) {
-                       p = b->rp + offset - sofar;
-                       n -= offset - sofar;
-                       break;
-               }
-               QDEBUG checkb(b, "qcopy");
-               b = b->next;
-       }
-
-       /* copy bytes from there */
-       for (sofar = 0; sofar < len;) {
-               if (n > len - sofar)
-                       n = len - sofar;
-               PANIC_EXTRA(b);
-               memmove(nb->wp, p, n);
-               qcopycnt += n;
-               sofar += n;
-               nb->wp += n;
-               b = b->next;
-               if (b == NULL)
-                       break;
-               n = BLEN(b);
-               p = b->rp;
-       }
-       spin_unlock_irqsave(&q->lock);
-
-       return nb;
-}
-
 struct block *qcopy(struct queue *q, int len, uint32_t offset)
 {
-#ifdef CONFIG_BLOCK_EXTRAS
        return qclone(q, 0, len, offset);
-#else
-       return qcopy_old(q, len, offset);
-#endif
 }
 
 static void qinit_common(struct queue *q)
 {
        spinlock_init_irqsave(&q->lock);
-       qlock_init(&q->rlock);
-       qlock_init(&q->wlock);
        rendez_init(&q->rr);
        rendez_init(&q->wr);
 }
@@ -1183,7 +1165,6 @@ struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
-       q->state |= Qstarve;
        q->eof = 0;
 
        return q;
@@ -1214,47 +1195,53 @@ static int notempty(void *a)
        return (q->state & Qclosed) || q->bfirst != 0;
 }
 
-/* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
- * wait, FALSE on Qclose (without error)
- *
- * Called with q ilocked.  May error out, back through the caller, with
- * the irqsave lock unlocked.  */
-static bool qwait(struct queue *q)
+/* Block, waiting for the queue to be non-empty or closed.  Returns with
+ * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
+ * was naturally closed.  Throws an error o/w. */
+static bool qwait_and_ilock(struct queue *q, int qio_flags)
 {
-       /* wait for data */
-       for (;;) {
+       while (1) {
+               spin_lock_irqsave(&q->lock);
                if (q->bfirst != NULL)
-                       break;
-
+                       return TRUE;
                if (q->state & Qclosed) {
                        if (++q->eof > 3) {
                                spin_unlock_irqsave(&q->lock);
-                               error("multiple reads on a closed queue");
+                               error(EPIPE,
+                                     "multiple reads on a closed queue");
                        }
-                       if (*q->err && strcmp(q->err, Ehungup) != 0) {
+                       if (q->err[0]) {
                                spin_unlock_irqsave(&q->lock);
-                               error(q->err);
+                               error(EPIPE, q->err);
                        }
                        return FALSE;
                }
-               /* We set Qstarve regardless of whether we are non-blocking or not.
-                * Qstarve tracks the edge detection of the queue being empty. */
-               q->state |= Qstarve;
-               if (q->state & Qnonblock) {
+               if (qio_flags & QIO_NON_BLOCK) {
                        spin_unlock_irqsave(&q->lock);
-                       set_errno(EAGAIN);
-                       error("queue empty");
+                       error(EAGAIN, "queue empty");
                }
                spin_unlock_irqsave(&q->lock);
-               /* may throw an error() */
+               /* As with the producer side, we check for a condition while
+                * holding the q->lock, decide to sleep, then unlock.  It's like
+                * the "check, signal, check again" pattern, but we do it
+                * conditionally.  Both sides agree synchronously to do it, and
+                * those decisions are made while holding q->lock.  I think this
+                * is OK.
+                *
+                * The invariant is that no reader sleeps when the queue has
+                * data.  While holding the rendez lock, if we see there's no
+                * data, we'll sleep.  Since we saw there was no data, the next
+                * writer will see (or already saw) no data, and then the writer
+                * decides to rendez_wake, which will grab the rendez lock.  If
+                * the writer already did that, then we'll see notempty when we
+                * do our check-again. */
                rendez_sleep(&q->rr, notempty, q);
-               spin_lock_irqsave(&q->lock);
        }
-       return TRUE;
 }
 
 /*
  * add a block list to a queue
+ * XXX basically the same as enqueue blist, and has no locking!
  */
 void qaddlist(struct queue *q, struct block *b)
 {
@@ -1264,63 +1251,36 @@ void qaddlist(struct queue *q, struct block *b)
                q->blast->next = b;
        else
                q->bfirst = b;
-       q->len += blockalloclen(b);
        q->dlen += blocklen(b);
        while (b->next)
                b = b->next;
        q->blast = b;
 }
 
-/*
- *  called with q ilocked
- */
-struct block *qremove(struct queue *q)
-{
-       struct block *b;
-
-       b = q->bfirst;
-       if (b == NULL)
-               return NULL;
-       q->bfirst = b->next;
-       b->next = NULL;
-       q->dlen -= BLEN(b);
-       q->len -= BALLOC(b);
-       QDEBUG checkb(b, "qremove");
-       return b;
-}
-
 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
 {
        size_t copy_amt, retval = 0;
        struct extra_bdata *ebd;
-       
+
        copy_amt = MIN(BHLEN(b), amt);
        memcpy(to, b->rp, copy_amt);
-       /* advance the rp, since this block not be completely consumed and future
-        * reads need to know where to pick up from */
+       /* advance the rp, since this block might not be completely consumed and
+        * future reads need to know where to pick up from */
        b->rp += copy_amt;
        to += copy_amt;
        amt -= copy_amt;
        retval += copy_amt;
        for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
                ebd = &b->extra_data[i];
-               /* skip empty entires.  if we track this in the struct block, we can
-                * just start the for loop early */
+               /* skip empty entires.  if we track this in the struct block, we
+                * can just start the for loop early */
                if (!ebd->base || !ebd->len)
                        continue;
                copy_amt = MIN(ebd->len, amt);
                memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
-               /* we're actually consuming the entries, just like how we advance rp up
-                * above, and might only consume part of one. */
-               ebd->len -= copy_amt;
-               ebd->off += copy_amt;
-               b->extra_len -= copy_amt;
-               if (!ebd->len) {
-                       /* we don't actually have to decref here.  it's also done in
-                        * freeb().  this is the earliest we can free. */
-                       kfree((void*)ebd->base);
-                       ebd->base = ebd->off = 0;
-               }
+               /* we're actually consuming the entries, just like how we
+                * advance rp up above, and might only consume part of one. */
+               block_consume_ebd_bytes(b, ebd, copy_amt);
                to += copy_amt;
                amt -= copy_amt;
                retval += copy_amt;
@@ -1351,11 +1311,33 @@ struct block *bl2mem(uint8_t * p, struct block *b, int n)
                n -= i;
                p += i;
                next = b->next;
+               b->next = NULL;
                freeb(b);
        }
        return NULL;
 }
 
+/* Extract the contents of all blocks and copy to va, up to len.  Returns the
+ * actual amount copied. */
+static size_t read_all_blocks(struct block *b, void *va, size_t len)
+{
+       size_t sofar = 0;
+       struct block *next;
+
+       do {
+               assert(va);
+               assert(b->rp);
+               sofar += read_from_block(b, va + sofar, len - sofar);
+               if (BLEN(b) && b->next)
+                       panic("Failed to drain entire block (Qmsg?) but had a next!");
+               next = b->next;
+               b->next = NULL;
+               freeb(b);
+               b = next;
+       } while (b);
+       return sofar;
+}
+
 /*
  *  copy the contents of memory into a string of blocks.
  *  return NULL on error.
@@ -1377,7 +1359,7 @@ struct block *mem2bl(uint8_t * p, int len)
                if (n > Maxatomic)
                        n = Maxatomic;
 
-               *l = b = allocb(n);
+               *l = b = block_alloc(n, MEM_WAIT);
                /* TODO consider extra_data */
                memmove(b->wp, p, n);
                b->wp += n;
@@ -1400,259 +1382,134 @@ void qputback(struct queue *q, struct block *b)
        if (q->bfirst == NULL)
                q->blast = b;
        q->bfirst = b;
-       q->len += BALLOC(b);
        q->dlen += BLEN(b);
+       /* qputback seems to undo a read, so we can undo the accounting too. */
+       q->bytes_read -= BLEN(b);
 }
 
 /*
- *  flow control, get producer going again
- *  called with q ilocked
+ *  get next block from a queue (up to a limit)
+ *
  */
-static void qwakeup_iunlock(struct queue *q)
+struct block *qbread(struct queue *q, size_t len)
 {
-       int dowakeup = 0;
-
-       /* if writer flow controlled, restart */
-       if ((q->state & Qflow) && q->len < q->limit / 2) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       }
-
-       spin_unlock_irqsave(&q->lock);
-
-       /* wakeup flow controlled writers */
-       if (dowakeup) {
-               if (q->kick)
-                       q->kick(q->arg);
-               rendez_wakeup(&q->wr);
-               qwake_cb(q, FDTAP_FILT_WRITABLE);
-       }
+       return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
+                       MEM_WAIT);
 }
 
-/*
- *  get next block from a queue (up to a limit)
- */
-struct block *qbread(struct queue *q, int len)
+struct block *qbread_nonblock(struct queue *q, size_t len)
 {
-       ERRSTACK(1);
-       struct block *b, *nb;
-       int n;
-
-       qlock(&q->rlock);
-       if (waserror()) {
-               qunlock(&q->rlock);
-               nexterror();
-       }
-
-       spin_lock_irqsave(&q->lock);
-       if (!qwait(q)) {
-               /* queue closed */
-               spin_unlock_irqsave(&q->lock);
-               qunlock(&q->rlock);
-               poperror();
-               return NULL;
-       }
-
-       /* if we get here, there's at least one block in the queue */
-       b = qremove(q);
-       n = BLEN(b);
-
-       /* split block if it's too big and this is not a message queue */
-       nb = b;
-       if (n > len) {
-               PANIC_EXTRA(b);
-               if ((q->state & Qmsg) == 0) {
-                       n -= len;
-                       b = allocb(n);
-                       memmove(b->wp, nb->rp + len, n);
-                       b->wp += n;
-                       qputback(q, b);
-               }
-               nb->wp = nb->rp + len;
-       }
-
-       /* restart producer */
-       qwakeup_iunlock(q);
-
-       poperror();
-       qunlock(&q->rlock);
-       return nb;
+       return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
+                       QIO_NON_BLOCK, MEM_WAIT);
 }
 
-/*
- *  read a queue.  if no data is queued, post a struct block
- *  and wait on its Rendez.
- */
-long qread(struct queue *q, void *vp, int len)
+/* read up to len from a queue into vp. */
+size_t qread(struct queue *q, void *va, size_t len)
 {
-       ERRSTACK(1);
-       struct block *b, *first, **l;
-       int m, n;
+       struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
 
-       qlock(&q->rlock);
-       if (waserror()) {
-               qunlock(&q->rlock);
-               nexterror();
-       }
-
-       spin_lock_irqsave(&q->lock);
-again:
-       if (!qwait(q)) {
-               /* queue closed */
-               spin_unlock_irqsave(&q->lock);
-               qunlock(&q->rlock);
-               poperror();
+       if (!blist)
                return 0;
-       }
-
-       /* if we get here, there's at least one block in the queue */
-       // TODO: Consider removing the Qcoalesce flag and force a coalescing
-       // strategy by default.
-       if (q->state & Qcoalesce) {
-               /* when coalescing, 0 length blocks just go away */
-               b = q->bfirst;
-               if (BLEN(b) <= 0) {
-                       freeb(qremove(q));
-                       goto again;
-               }
-
-               /*  grab the first block plus as many
-                *  following blocks as will completely
-                *  fit in the read.
-                */
-               n = 0;
-               l = &first;
-               m = BLEN(b);
-               for (;;) {
-                       *l = qremove(q);
-                       l = &b->next;
-                       n += m;
-
-                       b = q->bfirst;
-                       if (b == NULL)
-                               break;
-                       m = BLEN(b);
-                       if (n + m > len)
-                               break;
-               }
-       } else {
-               first = qremove(q);
-               n = BLEN(first);
-       }
-
-       /* copy to user space outside of the ilock */
-       spin_unlock_irqsave(&q->lock);
-       b = bl2mem(vp, first, len);
-       spin_lock_irqsave(&q->lock);
-
-       /* take care of any left over partial block */
-       if (b != NULL) {
-               n -= BLEN(b);
-               if (q->state & Qmsg)
-                       freeb(b);
-               else
-                       qputback(q, b);
-       }
+       return read_all_blocks(blist, va, len);
+}
 
-       /* restart producer */
-       qwakeup_iunlock(q);
+size_t qread_nonblock(struct queue *q, void *va, size_t len)
+{
+       struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
+                                      QIO_NON_BLOCK, MEM_WAIT);
 
-       poperror();
-       qunlock(&q->rlock);
-       return n;
+       if (!blist)
+               return 0;
+       return read_all_blocks(blist, va, len);
 }
 
-static int qnotfull(void *a)
+/* This is the rendez wake condition for writers. */
+static int qwriter_should_wake(void *a)
 {
        struct queue *q = a;
 
-       return q->len < q->limit || (q->state & Qclosed);
+       return qwritable(q) || (q->state & Qclosed);
 }
 
-uint32_t dropcnt;
-
-/*
- *  add a block to a queue obeying flow control
- */
-long qbwrite(struct queue *q, struct block *b)
+/* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
+static size_t enqueue_blist(struct queue *q, struct block *b)
 {
-       ERRSTACK(1);
-       int n, dowakeup;
-       volatile bool should_free_b = TRUE;
+       size_t dlen;
+
+       if (q->bfirst)
+               q->blast->next = b;
+       else
+               q->bfirst = b;
+       dlen = BLEN(b);
+       while (b->next) {
+               b = b->next;
+               dlen += BLEN(b);
+       }
+       q->blast = b;
+       q->dlen += dlen;
+       return dlen;
+}
 
-       n = BLEN(b);
+/* Adds block (which can be a list of blocks) to the queue, subject to
+ * qio_flags.  Returns the length written on success or -1 on non-throwable
+ * error.  Adjust qio_flags to control the value-added features!. */
+static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
+{
+       ssize_t ret;
+       bool was_unreadable;
 
        if (q->bypass) {
+               ret = blocklen(b);
                (*q->bypass) (q->arg, b);
-               return n;
-       }
-
-       dowakeup = 0;
-       qlock(&q->wlock);
-       if (waserror()) {
-               if (b != NULL && should_free_b)
-                       freeb(b);
-               qunlock(&q->wlock);
-               nexterror();
+               return ret;
        }
-
        spin_lock_irqsave(&q->lock);
-
-       /* give up if the queue is closed */
+       was_unreadable = q->dlen == 0;
        if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
-               error(q->err);
+               freeblist(b);
+               if (!(qio_flags & QIO_CAN_ERR_SLEEP))
+                       return -1;
+               if (q->err[0])
+                       error(EPIPE, q->err);
+               else
+                       error(EPIPE, "connection closed");
        }
-
-       /* if nonblocking, don't queue over the limit */
-       if (q->len >= q->limit) {
+       if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
                /* drop overflow takes priority over regular non-blocking */
-               if (q->state & Qdropoverflow) {
+               if ((qio_flags & QIO_DROP_OVERFLOW)
+                   || (q->state & Qdropoverflow)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
-                       dropcnt += n;
-                       qunlock(&q->wlock);
-                       poperror();
-                       return n;
+                       return -1;
                }
-               if (q->state & Qnonblock) {
+               /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
+                * nice and catch it. */
+               if ((qio_flags & QIO_CAN_ERR_SLEEP)
+                   && (qio_flags & QIO_NON_BLOCK)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
-                       set_errno(EAGAIN);
-                       error("queue full");
+                       error(EAGAIN, "queue full");
                }
        }
-
-       /* queue the block */
-       should_free_b = FALSE;
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       q->blast = b;
-       b->next = 0;
-       q->len += BALLOC(b);
-       q->dlen += n;
-       QDEBUG checkb(b, "qbwrite");
-       b = NULL;
-
-       /* make sure other end gets awakened */
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
-       }
+       ret = enqueue_blist(q, b);
+       QDEBUG checkb(b, "__qbwrite");
        spin_unlock_irqsave(&q->lock);
-
-       /*  get output going again */
-       if (q->kick && (dowakeup || (q->state & Qkick)))
+       /* TODO: not sure if the usage of a kick is mutually exclusive with a
+        * wakeup, meaning that actual users either want a kick or have
+        * qreaders. */
+       if (q->kick && (was_unreadable || (q->state & Qkick)))
                q->kick(q->arg);
-
-       /* wakeup anyone consuming at the other end */
-       if (dowakeup) {
+       if (was_unreadable) {
+               /* Unlike the read side, there's no double-check to make sure
+                * the queue transitioned across an edge.  We know we added
+                * something, so that's enough.  We wake if the queue was empty.
+                * Both sides are the same, in that the condition for which we
+                * do the rendez_wakeup() is the same as the condition done for
+                * the rendez_sleep(). */
                rendez_wakeup(&q->rr);
                qwake_cb(q, FDTAP_FILT_READABLE);
        }
-
        /*
         *  flow control, wait for queue to get below the limit
         *  before allowing the process to continue and queue
@@ -1665,139 +1522,142 @@ long qbwrite(struct queue *q, struct block *b)
         *  that keeps getting interrupted and rewriting will
         *  queue infinite crud.
         */
-       for (;;) {
-               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
-                       break;
-
-               spin_lock_irqsave(&q->lock);
-               q->state |= Qflow;
-               spin_unlock_irqsave(&q->lock);
-               rendez_sleep(&q->wr, qnotfull, q);
-       }
-
-       qunlock(&q->wlock);
-       poperror();
-       return n;
+       if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
+           !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
+               /* This is a racy peek at the q status.  If we accidentally
+                * block, our rendez will return.  The rendez's peak
+                * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
+                * (that lock protects writes, but not reads).
+                *
+                * Here's the deal: when holding the rendez lock, if we see the
+                * sleep condition, the consumer will wake us.  The condition
+                * will only ever be changed by the next qbread() (consumer,
+                * changes q->dlen).  That code will do a rendez wake, which
+                * will spin on the rendez lock, meaning it won't procede until
+                * we either see the new state (and return) or put ourselves on
+                * the rendez, and wake up.
+                *
+                * The pattern is one side writes mem, then signals.  Our side
+                * checks the signal, then reads the mem.  The goal is to not
+                * miss seeing the signal AND missing the memory write.  In this
+                * specific case, the signal is actually synchronous (the rendez
+                * lock) and not basic shared memory.
+                *
+                * Oh, and we spin in case we woke early and someone else filled
+                * the queue, mesa-style. */
+               while (!qwriter_should_wake(q))
+                       rendez_sleep(&q->wr, qwriter_should_wake, q);
+       }
+       return ret;
 }
 
-long qibwrite(struct queue *q, struct block *b)
+/*
+ *  add a block to a queue obeying flow control
+ */
+ssize_t qbwrite(struct queue *q, struct block *b)
 {
-       int n, dowakeup;
+       return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
+}
 
-       dowakeup = 0;
+ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
+{
+       return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
+}
 
-       n = BLEN(b);
+ssize_t qibwrite(struct queue *q, struct block *b)
+{
+       return __qbwrite(q, b, 0);
+}
 
-       spin_lock_irqsave(&q->lock);
+/* Helper, allocs a block and copies [from, from + len) into it.  Returns the
+ * block on success, 0 on failure. */
+static struct block *build_block(void *from, size_t len, int mem_flags)
+{
+       struct block *b;
+       void *ext_buf;
 
-       QDEBUG checkb(b, "qibwrite");
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       q->blast = b;
-       q->len += BALLOC(b);
-       q->dlen += n;
+       /* If len is small, we don't need to bother with the extra_data.  But
+        * until the whole stack can handle extd blocks, we'll use them
+        * unconditionally.  */
 
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
+       /* allocb builds in 128 bytes of header space to all blocks, but this is
+        * only available via padblock (to the left).  we also need some space
+        * for pullupblock for some basic headers (like icmp) that get written
+        * in directly */
+       b = block_alloc(64, mem_flags);
+       if (!b)
+               return 0;
+       ext_buf = kmalloc(len, mem_flags);
+       if (!ext_buf) {
+               kfree(b);
+               return 0;
        }
-
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup) {
-               if (q->kick)
-                       q->kick(q->arg);
-               rendez_wakeup(&q->rr);
-               qwake_cb(q, FDTAP_FILT_READABLE);
+       memcpy(ext_buf, from, len);
+       if (block_add_extd(b, 1, mem_flags)) {
+               kfree(ext_buf);
+               kfree(b);
+               return 0;
        }
-
-       return n;
+       b->extra_data[0].base = (uintptr_t)ext_buf;
+       b->extra_data[0].off = 0;
+       b->extra_data[0].len = len;
+       b->extra_len += len;
+       return b;
 }
 
-/*
- *  write to a queue.  only Maxatomic bytes at a time is atomic.
- */
-int qwrite(struct queue *q, void *vp, int len)
+static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
+                        int qio_flags)
 {
-       int n, sofar;
+       ERRSTACK(1);
+       size_t n;
+       volatile size_t sofar = 0;      /* volatile for the waserror */
        struct block *b;
        uint8_t *p = vp;
        void *ext_buf;
 
-       QDEBUG if (!islo())
-                printd("qwrite hi %p\n", getcallerpc(&q));
-
-       sofar = 0;
+       /* Only some callers can throw.  Others might be in a context where
+        * waserror isn't safe. */
+       if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
+               /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
+                * after some data has been sent should be treated as a partial
+                * write. */
+               if (sofar)
+                       goto out_ok;
+               nexterror();
+       }
        do {
                n = len - sofar;
-               /* This is 64K, the max amount per single block.  Still a good value? */
+               /* This is 64K, the max amount per single block.  Still a good
+                * value? */
                if (n > Maxatomic)
                        n = Maxatomic;
-
-               /* If n is small, we don't need to bother with the extra_data.  But
-                * until the whole stack can handle extd blocks, we'll use them
-                * unconditionally. */
-#ifdef CONFIG_BLOCK_EXTRAS
-               /* allocb builds in 128 bytes of header space to all blocks, but this is
-                * only available via padblock (to the left).  we also need some space
-                * for pullupblock for some basic headers (like icmp) that get written
-                * in directly */
-               b = allocb(64);
-               ext_buf = kmalloc(n, 0);
-               memcpy(ext_buf, p + sofar, n);
-               block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
-               b->extra_data[0].base = (uintptr_t)ext_buf;
-               b->extra_data[0].off = 0;
-               b->extra_data[0].len = n;
-               b->extra_len += n;
-#else
-               b = allocb(n);
-               memmove(b->wp, p + sofar, n);
-               b->wp += n;
-#endif
-                       
-               qbwrite(q, b);
-
+               b = build_block(p + sofar, n, mem_flags);
+               if (!b)
+                       break;
+               if (__qbwrite(q, b, qio_flags) < 0)
+                       break;
                sofar += n;
-       } while (sofar < len && (q->state & Qmsg) == 0);
-
-       return len;
+       } while ((sofar < len) && (q->state & Qmsg) == 0);
+out_ok:
+       if (qio_flags & QIO_CAN_ERR_SLEEP)
+               poperror();
+       return sofar;
 }
 
-/*
- *  used by print() to write to a queue.  Since we may be splhi or not in
- *  a process, don't qlock.
- */
-int qiwrite(struct queue *q, void *vp, int len)
+ssize_t qwrite(struct queue *q, void *vp, int len)
 {
-       int n, sofar, dowakeup;
-       struct block *b;
-       uint8_t *p = vp;
-
-       dowakeup = 0;
-
-       sofar = 0;
-       do {
-               n = len - sofar;
-               if (n > Maxatomic)
-                       n = Maxatomic;
-
-               b = iallocb(n);
-               if (b == NULL)
-                       break;
-               /* TODO consider extra_data */
-               memmove(b->wp, p + sofar, n);
-               /* this adjusts BLEN to be n, or at least it should */
-               b->wp += n;
-               assert(n == BLEN(b));
-               qibwrite(q, b);
+       return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
+}
 
-               sofar += n;
-       } while (sofar < len && (q->state & Qmsg) == 0);
+ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
+{
+       return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
+                                             QIO_NON_BLOCK);
+}
 
-       return sofar;
+ssize_t qiwrite(struct queue *q, void *vp, int len)
+{
+       return __qwrite(q, vp, len, MEM_ATOMIC, 0);
 }
 
 /*
@@ -1824,11 +1684,10 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
-       strncpy(q->err, Ehungup, sizeof(q->err));
+       q->state &= ~Qdropoverflow;
+       q->err[0] = 0;
        bfirst = q->bfirst;
        q->bfirst = 0;
-       q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);
 
@@ -1841,19 +1700,21 @@ void qclose(struct queue *q)
        qwake_cb(q, FDTAP_FILT_HANGUP);
 }
 
-/*
- *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
- *  blocks.
- */
+/* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
+ *
+ * msg will be the errstr received by any waiters (qread, qbread, etc).  If
+ * there is no message, which is what also happens during a natural qclose(),
+ * those waiters will simply return 0.  qwriters will always error() on a
+ * closed/hungup queue. */
 void qhangup(struct queue *q, char *msg)
 {
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
        if (msg == 0 || *msg == 0)
-               strncpy(q->err, Ehungup, sizeof(q->err));
+               q->err[0] = 0;
        else
-               strncpy(q->err, msg, ERRMAX - 1);
+               strlcpy(q->err, msg, ERRMAX);
        spin_unlock_irqsave(&q->lock);
 
        /* wake up readers/writers */
@@ -1877,7 +1738,6 @@ void qreopen(struct queue *q)
 {
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
-       q->state |= Qstarve;
        q->eof = 0;
        q->limit = q->inilim;
        q->wake_cb = 0;
@@ -1893,14 +1753,29 @@ int qlen(struct queue *q)
        return q->dlen;
 }
 
+size_t q_bytes_read(struct queue *q)
+{
+       return q->bytes_read;
+}
+
 /*
  * return space remaining before flow control
+ *
+ *  This used to be
+ *  q->len < q->limit/2
+ *  but it slows down tcp too much for certain write sizes.
+ *  I really don't understand it completely.  It may be
+ *  due to the queue draining so fast that the transmission
+ *  stalls waiting for the app to produce more data.  - presotto
+ *
+ *  q->len was the amount of bytes, which is no longer used.  we now use
+ *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
  */
 int qwindow(struct queue *q)
 {
        int l;
 
-       l = q->limit - q->len;
+       l = q->limit - q->dlen;
        if (l < 0)
                l = 0;
        return l;
@@ -1917,9 +1792,20 @@ int qcanread(struct queue *q)
 /*
  *  change queue limit
  */
-void qsetlimit(struct queue *q, int limit)
+void qsetlimit(struct queue *q, size_t limit)
 {
+       bool was_writable = qwritable(q);
+
        q->limit = limit;
+       if (!was_writable && qwritable(q)) {
+               rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
+}
+
+size_t qgetlimit(struct queue *q)
+{
+       return q->limit;
 }
 
 /*
@@ -1927,19 +1813,36 @@ void qsetlimit(struct queue *q, int limit)
  */
 void qdropoverflow(struct queue *q, bool onoff)
 {
+       spin_lock_irqsave(&q->lock);
        if (onoff)
                q->state |= Qdropoverflow;
        else
                q->state &= ~Qdropoverflow;
+       spin_unlock_irqsave(&q->lock);
+}
+
+/* Be careful: this can affect concurrent reads/writes and code that might have
+ * built-in expectations of the q's type. */
+void q_toggle_qmsg(struct queue *q, bool onoff)
+{
+       spin_lock_irqsave(&q->lock);
+       if (onoff)
+               q->state |= Qmsg;
+       else
+               q->state &= ~Qmsg;
+       spin_unlock_irqsave(&q->lock);
 }
 
-/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
-void qnonblock(struct queue *q, bool onoff)
+/* Be careful: this can affect concurrent reads/writes and code that might have
+ * built-in expectations of the q's type. */
+void q_toggle_qcoalesce(struct queue *q, bool onoff)
 {
+       spin_lock_irqsave(&q->lock);
        if (onoff)
-               q->state |= Qnonblock;
+               q->state |= Qcoalesce;
        else
-               q->state &= ~Qnonblock;
+               q->state &= ~Qcoalesce;
+       spin_unlock_irqsave(&q->lock);
 }
 
 /*
@@ -1953,7 +1856,6 @@ void qflush(struct queue *q)
        spin_lock_irqsave(&q->lock);
        bfirst = q->bfirst;
        q->bfirst = 0;
-       q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);
 
@@ -1967,7 +1869,7 @@ void qflush(struct queue *q)
 
 int qfull(struct queue *q)
 {
-       return q->state & Qflow;
+       return !qwritable(q);
 }
 
 int qstate(struct queue *q)
@@ -1978,8 +1880,8 @@ int qstate(struct queue *q)
 void qdump(struct queue *q)
 {
        if (q)
-               printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
-                          q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
+               printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
+                          q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
 }
 
 /* On certain wakeup events, qio will call func(q, data, filter), where filter
@@ -1998,3 +1900,15 @@ void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
        wmb();  /* if we see func, we'll also see the data for it */
        q->wake_cb = func;
 }
+
+/* Helper for detecting whether we'll block on a read at this instant. */
+bool qreadable(struct queue *q)
+{
+       return qlen(q) > 0;
+}
+
+/* Helper for detecting whether we'll block on a write at this instant. */
+bool qwritable(struct queue *q)
+{
+       return !q->limit || qwindow(q) > 0;
+}