qio: add a few block helpers
[akaros.git] / kern / src / ns / qio.c
index f5b5430..16caad3 100644 (file)
@@ -26,8 +26,6 @@
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  * SOFTWARE. */
 
-#include <vfs.h>
-#include <kfs.h>
 #include <slab.h>
 #include <kmalloc.h>
 #include <kref.h>
@@ -38,7 +36,7 @@
 #include <cpio.h>
 #include <pmap.h>
 #include <smp.h>
-#include <ip.h>
+#include <net/ip.h>
 
 #define PANIC_EXTRA(b)                                                 \
 {                                                                      \
@@ -71,18 +69,19 @@ struct queue {
        struct block *bfirst;           /* buffer */
        struct block *blast;
 
-       int dlen;                                       /* data bytes in queue */
-       int limit;                                      /* max bytes in queue */
-       int inilim;                             /* initial limit */
+       int dlen;                       /* data bytes in queue */
+       int limit;                      /* max bytes in queue */
+       int inilim;                     /* initial limit */
        int state;
-       int eof;                                        /* number of eofs read by user */
+       int eof;                        /* number of eofs read by user */
+       size_t bytes_read;
 
        void (*kick) (void *);          /* restart output */
-       void (*bypass) (void *, struct block *);        /* bypass queue altogether */
-       void *arg;                                      /* argument to kick */
+       void (*bypass) (void *, struct block *); /* bypass queue altogether */
+       void *arg;                      /* argument to kick */
 
-       struct rendez rr;                       /* process waiting to read */
-       struct rendez wr;                       /* process waiting to write */
+       struct rendez rr;               /* process waiting to read */
+       struct rendez wr;               /* process waiting to write */
        qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
        void *wake_data;
 
@@ -92,16 +91,15 @@ struct queue {
 enum {
        Maxatomic = 64 * 1024,
        QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
-       QIO_LIMIT = (1 << 1),                   /* respect q->limit */
-       QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
+       QIO_LIMIT = (1 << 1),           /* respect q->limit */
+       QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
        QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
-       QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
-       QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
+       QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
+       QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
 };
 
 unsigned int qiomaxatomic = Maxatomic;
 
-static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                               int mem_flags);
@@ -130,25 +128,23 @@ struct block *padblock(struct block *bp, int size)
 {
        int n;
        struct block *nbp;
-       uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
-       uint16_t checksum_start = bp->checksum_start;
-       uint16_t checksum_offset = bp->checksum_offset;
-       uint16_t mss = bp->mss;
 
        QDEBUG checkb(bp, "padblock 1");
        if (size >= 0) {
                if (bp->rp - bp->base >= size) {
-                       bp->checksum_start += size;
+                       bp->network_offset += size;
+                       bp->transport_offset += size;
                        bp->rp -= size;
                        return bp;
                }
 
                PANIC_EXTRA(bp);
                if (bp->next)
-                       panic("padblock %p", getcallerpc(&bp));
+                       panic("%s %p had a next", __func__, bp);
                n = BLEN(bp);
                padblockcnt++;
                nbp = block_alloc(size + n, MEM_WAIT);
+               block_copy_metadata(nbp, bp);
                nbp->rp += size;
                nbp->wp = nbp->rp;
                memmove(nbp->wp, bp->rp, n);
@@ -161,7 +157,7 @@ struct block *padblock(struct block *bp, int size)
                PANIC_EXTRA(bp);
 
                if (bp->next)
-                       panic("padblock %p", getcallerpc(&bp));
+                       panic("%s %p had a next", __func__, bp);
 
                if (bp->lim - bp->wp >= size)
                        return bp;
@@ -169,16 +165,11 @@ struct block *padblock(struct block *bp, int size)
                n = BLEN(bp);
                padblockcnt++;
                nbp = block_alloc(size + n, MEM_WAIT);
+               block_copy_metadata(nbp, bp);
                memmove(nbp->wp, bp->rp, n);
                nbp->wp += n;
                freeb(bp);
        }
-       if (bcksum) {
-               nbp->flag |= bcksum;
-               nbp->checksum_start = checksum_start;
-               nbp->checksum_offset = checksum_offset;
-               nbp->mss = mss;
-       }
        QDEBUG checkb(nbp, "padblock 1");
        return nbp;
 }
@@ -253,22 +244,17 @@ struct block *copyblock(struct block *bp, int mem_flags)
        newb = block_alloc(BLEN(bp), mem_flags);
        if (!newb)
                return 0;
-       amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
+       amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
        assert(amt == BHLEN(bp));
        for (int i = 0; i < bp->nr_extra_bufs; i++) {
                ebd = &bp->extra_data[i];
                if (!ebd->base || !ebd->len)
                        continue;
-               amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
+               amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
+                                        ebd->len);
                assert(amt == ebd->len);
        }
-       /* TODO: any other flags that need copied over? */
-       if (bp->flag & BCKSUM_FLAGS) {
-               newb->flag |= (bp->flag & BCKSUM_FLAGS);
-               newb->checksum_start = bp->checksum_start;
-               newb->checksum_offset = bp->checksum_offset;
-               newb->mss = bp->mss;
-       }
+       block_copy_metadata(newb, bp);
        copyblockcnt++;
        QDEBUG checkb(newb, "copyblock 1");
        return newb;
@@ -288,6 +274,22 @@ struct block *linearizeblock(struct block *b)
        return newb;
 }
 
+/* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
+ * have emptied it. */
+static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
+                                   size_t amt)
+{
+       ebd->len -= amt;
+       ebd->off += amt;
+       b->extra_len -= amt;
+       if (!ebd->len) {
+               /* we don't actually have to decref here.  it's also
+                * done in freeb().  this is the earliest we can free. */
+               kfree((void*)ebd->base);
+               ebd->base = ebd->off = 0;
+       }
+}
+
 /* Make sure the first block has at least n bytes in its main body.  Pulls up
  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
  * block list. */
@@ -304,20 +306,22 @@ struct block *pullupblock(struct block *bp, int n)
        if (BHLEN(bp) >= n)
                return bp;
 
-       /* If there's no chance, just bail out now.  This might be slightly wasteful
-        * if there's a long blist that does have enough data. */
+       /* If there's no chance, just bail out now.  This might be slightly
+        * wasteful if there's a long blist that does have enough data. */
        if (n > blocklen(bp))
                return 0;
        /* a start at explicit main-body / header management */
        if (bp->extra_len) {
                if (n > bp->lim - bp->rp) {
-                       /* would need to realloc a new block and copy everything over. */
+                       /* would need to realloc a new block and copy everything
+                        * over. */
                        panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
-                                       n, bp->lim, bp->rp, bp->lim-bp->rp);
+                             n, bp->lim, bp->rp, bp->lim-bp->rp);
                }
                len = n - BHLEN(bp);
-               /* Would need to recursively call this, or otherwise pull from later
-                * blocks and put chunks of their data into the block we're building. */
+               /* Would need to recursively call this, or otherwise pull from
+                * later blocks and put chunks of their data into the block
+                * we're building. */
                if (len > bp->extra_len)
                        panic("pullup more than extra (%d, %d, %d)\n",
                              n, BHLEN(bp), bp->extra_len);
@@ -569,7 +573,8 @@ struct block *adjustblock(struct block *bp, int len)
                }
                /* Grow with extra data buffers. */
                buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
-               block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
+               block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
+                                  MEM_WAIT);
                QDEBUG checkb(bp, "adjustblock 3");
                return bp;
        }
@@ -610,25 +615,18 @@ static struct block *pop_first_block(struct queue *q)
        struct block *b = q->bfirst;
 
        q->dlen -= BLEN(b);
+       q->bytes_read += BLEN(b);
        q->bfirst = b->next;
        b->next = 0;
        return b;
 }
 
-/* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
-static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
-{
-       copy_amt = MIN(to->lim - to->wp, copy_amt);
-       memcpy(to->wp, from, copy_amt);
-       to->wp += copy_amt;
-       return copy_amt;
-}
-
 /* Accounting helper.  Block b in q lost amt extra_data */
 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
 {
        b->extra_len -= amt;
        q->dlen -= amt;
+       q->bytes_read += amt;
 }
 
 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
@@ -666,11 +664,13 @@ static size_t copy_from_first_block(struct queue *q, struct block *to,
        /* Try to extract from the main body */
        copy_amt = MIN(BHLEN(from), len);
        if (copy_amt) {
-               copy_amt = copy_to_block_body(to, from->rp, copy_amt);
+               copy_amt = block_copy_to_body(to, from->rp, copy_amt);
                from->rp += copy_amt;
-               /* We only change dlen, (data len), not q->len, since the q still has
-                * the same block memory allocation (no kfrees happened) */
+               /* We only change dlen, (data len), not q->len, since the q
+                * still has the same block memory allocation (no kfrees
+                * happened) */
                q->dlen -= copy_amt;
+               q->bytes_read += copy_amt;
        }
        /* Try to extract the remainder from the extra data */
        len -= copy_amt;
@@ -681,12 +681,15 @@ static size_t copy_from_first_block(struct queue *q, struct block *to,
                if (len >= ebd->len) {
                        amt = move_ebd(ebd, to, from, q);
                        if (!amt) {
-                               /* our internal alloc could have failed.   this ebd is now the
-                                * last one we'll consider.  let's handle it separately and put
-                                * it in the main body. */
+                               /* our internal alloc could have failed.   this
+                                * ebd is now the last one we'll consider.
+                                * let's handle it separately and put it in the
+                                * main body. */
                                if (copy_amt)
                                        return copy_amt;
-                               copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
+                               copy_amt = block_copy_to_body(to,
+                                                             (void*)ebd->base +
+                                                             ebd->off,
                                                              ebd->len);
                                block_and_q_lost_extra(from, q, copy_amt);
                                break;
@@ -695,12 +698,12 @@ static size_t copy_from_first_block(struct queue *q, struct block *to,
                        copy_amt += amt;
                        continue;
                } else {
-                       /* If we're here, we reached our final ebd, which we'll need to
-                        * split to get anything from it. */
+                       /* If we're here, we reached our final ebd, which we'll
+                        * need to split to get anything from it. */
                        if (copy_amt)
                                return copy_amt;
-                       copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
-                                                     len);
+                       copy_amt = block_copy_to_body(to, (void*)ebd->base +
+                                                     ebd->off, len);
                        ebd->off += copy_amt;
                        ebd->len -= copy_amt;
                        block_and_q_lost_extra(from, q, copy_amt);
@@ -747,11 +750,12 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                        return QBR_FAIL;
                }
        }
-       /* We need to check before adjusting q->len.  We're checking the writer's
-        * sleep condition / tap condition.  When set, we *might* be making an edge
-        * transition (from unwritable to writable), which needs to wake and fire
-        * taps.  But, our read might not drain the queue below q->lim.  We'll check
-        * again later to see if we should really wake them.  */
+       /* We need to check before adjusting q->len.  We're checking the
+        * writer's sleep condition / tap condition.  When set, we *might* be
+        * making an edge transition (from unwritable to writable), which needs
+        * to wake and fire taps.  But, our read might not drain the queue below
+        * q->lim.  We'll check again later to see if we should really wake
+        * them.  */
        was_unwritable = !qwritable(q);
        blen = BLEN(first);
        if ((q->state & Qcoalesce) && (blen == 0)) {
@@ -760,20 +764,22 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                /* Need to retry to make sure we have a first block */
                return QBR_AGAIN;
        }
-       /* Qmsg: just return the first block.  Be careful, since our caller might
-        * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
+       /* Qmsg: just return the first block.  Be careful, since our caller
+        * might not read all of the block and thus drop bytes.  Similar to
+        * SOCK_DGRAM. */
        if (q->state & Qmsg) {
                ret = pop_first_block(q);
                goto out_ok;
        }
-       /* Let's get at least something first - makes the code easier.  This way,
-        * we'll only ever split the block once. */
+       /* Let's get at least something first - makes the code easier.  This
+        * way, we'll only ever split the block once. */
        if (blen <= len) {
                ret = pop_first_block(q);
                len -= blen;
        } else {
-               /* need to split the block.  we won't actually take the first block out
-                * of the queue - we're just extracting a little bit. */
+               /* need to split the block.  we won't actually take the first
+                * block out of the queue - we're just extracting a little bit.
+                */
                if (!spare) {
                        /* We have nothing and need a spare block.  Retry! */
                        spin_unlock_irqsave(&q->lock);
@@ -783,8 +789,8 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                ret = spare;
                goto out_ok;
        }
-       /* At this point, we just grabbed the first block.  We can try to grab some
-        * more, up to len (if they want). */
+       /* At this point, we just grabbed the first block.  We can try to grab
+        * some more, up to len (if they want). */
        if (qio_flags & QIO_JUST_ONE_BLOCK)
                goto out_ok;
        ret_last = ret;
@@ -796,11 +802,12 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                        continue;
                }
                if (blen > len) {
-                       /* We could try to split the block, but that's a huge pain.  For
-                        * instance, we might need to move the main body of b into an
-                        * extra_data of ret_last.  lots of ways for that to fail, and lots
-                        * of cases to consider.  Easier to just bail out.  This is why I
-                        * did the first block above: we don't need to worry about this. */
+                       /* We could try to split the block, but that's a huge
+                        * pain.  For instance, we might need to move the main
+                        * body of b into an extra_data of ret_last.  lots of
+                        * ways for that to fail, and lots of cases to consider.
+                        * Easier to just bail out.  This is why I did the first
+                        * block above: we don't need to worry about this. */
                         break;
                }
                ret_last->next = pop_first_block(q);
@@ -826,39 +833,63 @@ out_ok:
  * containing up to len bytes.  It may contain less than len even if q has more
  * data.
  *
- * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
+ * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
+ * if it required a spare and the memory allocation failed.
  *
  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
  * could get a zero length block back. */
 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                               int mem_flags)
 {
+       ERRSTACK(1);
        struct block *ret = 0;
-       struct block *spare = 0;
+       struct block *volatile spare = 0;       /* volatile for the waserror */
 
+       /* __try_qbread can throw, based on qio flags. */
+       if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
+               if (spare)
+                       freeb(spare);
+               nexterror();
+       }
        while (1) {
                switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
                case QBR_OK:
                case QBR_FAIL:
                        if (spare && (ret != spare))
                                freeb(spare);
-                       return ret;
+                       goto out_ret;
                case QBR_SPARE:
                        assert(!spare);
-                       /* Due to some nastiness, we need a fresh block so we can read out
-                        * anything from the queue.  'len' seems like a reasonable amount.
-                        * Maybe we can get away with less. */
+                       /* Due to some nastiness, we need a fresh block so we
+                        * can read out anything from the queue.  'len' seems
+                        * like a reasonable amount.  Maybe we can get away with
+                        * less. */
                        spare = block_alloc(len, mem_flags);
-                       if (!spare)
-                               return 0;
+                       if (!spare) {
+                               /* Careful here: a memory failure (possible with
+                                * MEM_ATOMIC) could look like 'no data in the
+                                * queue' (QBR_FAIL).  The only one who does is
+                                * this qget(), who happens to know that we
+                                * won't need a spare, due to the len argument.
+                                * Spares are only needed when we need to split
+                                * a block. */
+                               ret = 0;
+                               goto out_ret;
+                       }
                        break;
                case QBR_AGAIN:
-                       /* if the first block is 0 and we are Qcoalesce, then we'll need to
-                        * try again.  We bounce out of __try so we can perform the "is
-                        * there a block" logic again from the top. */
+                       /* if the first block is 0 and we are Qcoalesce, then
+                        * we'll need to try again.  We bounce out of __try so
+                        * we can perform the "is there a block" logic again
+                        * from the top. */
                        break;
                }
        }
+       assert(0);
+out_ret:
+       if (qio_flags & QIO_CAN_ERR_SLEEP)
+               poperror();
+       return ret;
 }
 
 /*
@@ -884,8 +915,8 @@ size_t qdiscard(struct queue *q, size_t len)
        size_t removed_amt;
        size_t sofar = 0;
 
-       /* This is racy.  There could be multiple qdiscarders or other consumers,
-        * where the consumption could be interleaved. */
+       /* This is racy.  There could be multiple qdiscarders or other
+        * consumers, where the consumption could be interleaved. */
        while (qlen(q) && len) {
                blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
                removed_amt = freeblist(blist);
@@ -924,6 +955,7 @@ struct block *packblock(struct block *bp)
                        memmove((*l)->wp, nbp->rp, n);
                        (*l)->wp += n;
                        (*l)->next = nbp->next;
+                       nbp->next = NULL;
                        freeb(nbp);
                }
        }
@@ -995,21 +1027,29 @@ static int __blist_clone_to(struct block *blist, struct block *newb, int len,
        unsigned int nr_bufs = 0;
        unsigned int b_idx, newb_idx = 0;
        uint8_t *first_main_body = 0;
+       ssize_t sofar = 0;
 
-       /* find the first block; keep offset relative to the latest b in the list */
+       /* find the first block; keep offset relative to the latest b in the
+        * list */
        for (b = blist; b; b = b->next) {
                if (BLEN(b) > offset)
                        break;
                offset -= BLEN(b);
        }
-       /* qcopy semantics: if you asked for an offset outside the block list, you
-        * get an empty block back */
+       /* qcopy semantics: if you asked for an offset outside the block list,
+        * you get an empty block back */
        if (!b)
                return 0;
        first = b;
+       sofar -= offset; /* don't count the remaining offset in the first b */
        /* upper bound for how many buffers we'll need in newb */
        for (/* b is set*/; b; b = b->next) {
-               nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
+               nr_bufs += BHLEN(b) ? 1 : 0;
+               /* still assuming nr_extra == nr_valid */
+               nr_bufs += b->nr_extra_bufs;
+               sofar += BLEN(b);
+               if (sofar > len)
+                       break;
        }
        /* we might be holding a spinlock here, so we won't wait for kmalloc */
        if (block_add_extd(newb, nr_bufs, 0) != 0) {
@@ -1021,31 +1061,39 @@ static int __blist_clone_to(struct block *blist, struct block *newb, int len,
                if (offset) {
                        if (offset < BHLEN(b)) {
                                /* off is in the main body */
-                               len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
+                               len -= point_to_body(b, b->rp + offset, newb,
+                                                    newb_idx, len);
                                newb_idx++;
                        } else {
-                               /* off is in one of the buffers (or just past the last one).
-                                * we're not going to point to b's main body at all. */
+                               /* off is in one of the buffers (or just past
+                                * the last one).  we're not going to point to
+                                * b's main body at all. */
                                offset -= BHLEN(b);
                                assert(b->extra_data);
-                               /* assuming these extrabufs are packed, or at least that len
-                                * isn't gibberish */
+                               /* assuming these extrabufs are packed, or at
+                                * least that len isn't gibberish */
                                while (b->extra_data[b_idx].len <= offset) {
                                        offset -= b->extra_data[b_idx].len;
                                        b_idx++;
                                }
-                               /* now offset is set to our offset in the b_idx'th buf */
-                               len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
+                               /* now offset is set to our offset in the
+                                * b_idx'th buf */
+                               len -= point_to_buf(b, b_idx, offset, newb,
+                                                   newb_idx, len);
                                newb_idx++;
                                b_idx++;
                        }
                        offset = 0;
                } else {
-                       len -= point_to_body(b, b->rp, newb, newb_idx, len);
-                       newb_idx++;
+                       if (BHLEN(b)) {
+                               len -= point_to_body(b, b->rp, newb, newb_idx,
+                                                    len);
+                               newb_idx++;
+                       }
                }
-               /* knock out all remaining bufs.  we only did one point_to_ op by now,
-                * and any point_to_ could be our last if it consumed all of len. */
+               /* knock out all remaining bufs.  we only did one point_to_ op
+                * by now, and any point_to_ could be our last if it consumed
+                * all of len. */
                for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
                        len -= point_to_buf(b, i, 0, newb, newb_idx, len);
                        newb_idx++;
@@ -1059,6 +1107,7 @@ struct block *blist_clone(struct block *blist, int header_len, int len,
 {
        int ret;
        struct block *newb = block_alloc(header_len, MEM_WAIT);
+
        do {
                ret = __blist_clone_to(blist, newb, len, offset);
                if (ret)
@@ -1078,7 +1127,7 @@ struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
        /* the while loop should rarely be used: it would require someone
         * concurrently adding to the queue. */
        do {
-               /* TODO: RCU: protecting the q list (b->next) (need read lock) */
+               /* TODO: RCU protect the q list (b->next) (need read lock) */
                spin_lock_irqsave(&q->lock);
                ret = __blist_clone_to(q->bfirst, newb, len, offset);
                spin_unlock_irqsave(&q->lock);
@@ -1088,64 +1137,9 @@ struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
        return newb;
 }
 
-/*
- *  copy from offset in the queue
- */
-struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
-{
-       int sofar;
-       int n;
-       struct block *b, *nb;
-       uint8_t *p;
-
-       nb = block_alloc(len, MEM_WAIT);
-
-       spin_lock_irqsave(&q->lock);
-
-       /* go to offset */
-       b = q->bfirst;
-       for (sofar = 0;; sofar += n) {
-               if (b == NULL) {
-                       spin_unlock_irqsave(&q->lock);
-                       return nb;
-               }
-               n = BLEN(b);
-               if (sofar + n > offset) {
-                       p = b->rp + offset - sofar;
-                       n -= offset - sofar;
-                       break;
-               }
-               QDEBUG checkb(b, "qcopy");
-               b = b->next;
-       }
-
-       /* copy bytes from there */
-       for (sofar = 0; sofar < len;) {
-               if (n > len - sofar)
-                       n = len - sofar;
-               PANIC_EXTRA(b);
-               memmove(nb->wp, p, n);
-               qcopycnt += n;
-               sofar += n;
-               nb->wp += n;
-               b = b->next;
-               if (b == NULL)
-                       break;
-               n = BLEN(b);
-               p = b->rp;
-       }
-       spin_unlock_irqsave(&q->lock);
-
-       return nb;
-}
-
 struct block *qcopy(struct queue *q, int len, uint32_t offset)
 {
-#ifdef CONFIG_BLOCK_EXTRAS
        return qclone(q, 0, len, offset);
-#else
-       return qcopy_old(q, len, offset);
-#endif
 }
 
 static void qinit_common(struct queue *q)
@@ -1213,7 +1207,8 @@ static bool qwait_and_ilock(struct queue *q, int qio_flags)
                if (q->state & Qclosed) {
                        if (++q->eof > 3) {
                                spin_unlock_irqsave(&q->lock);
-                               error(EPIPE, "multiple reads on a closed queue");
+                               error(EPIPE,
+                                     "multiple reads on a closed queue");
                        }
                        if (q->err[0]) {
                                spin_unlock_irqsave(&q->lock);
@@ -1226,18 +1221,20 @@ static bool qwait_and_ilock(struct queue *q, int qio_flags)
                        error(EAGAIN, "queue empty");
                }
                spin_unlock_irqsave(&q->lock);
-               /* As with the producer side, we check for a condition while holding the
-                * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
-                * check again" pattern, but we do it conditionally.  Both sides agree
-                * synchronously to do it, and those decisions are made while holding
-                * q->lock.  I think this is OK.
+               /* As with the producer side, we check for a condition while
+                * holding the q->lock, decide to sleep, then unlock.  It's like
+                * the "check, signal, check again" pattern, but we do it
+                * conditionally.  Both sides agree synchronously to do it, and
+                * those decisions are made while holding q->lock.  I think this
+                * is OK.
                 *
-                * The invariant is that no reader sleeps when the queue has data.
-                * While holding the rendez lock, if we see there's no data, we'll
-                * sleep.  Since we saw there was no data, the next writer will see (or
-                * already saw) no data, and then the writer decides to rendez_wake,
-                * which will grab the rendez lock.  If the writer already did that,
-                * then we'll see notempty when we do our check-again. */
+                * The invariant is that no reader sleeps when the queue has
+                * data.  While holding the rendez lock, if we see there's no
+                * data, we'll sleep.  Since we saw there was no data, the next
+                * writer will see (or already saw) no data, and then the writer
+                * decides to rendez_wake, which will grab the rendez lock.  If
+                * the writer already did that, then we'll see notempty when we
+                * do our check-again. */
                rendez_sleep(&q->rr, notempty, q);
        }
 }
@@ -1267,31 +1264,23 @@ static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
 
        copy_amt = MIN(BHLEN(b), amt);
        memcpy(to, b->rp, copy_amt);
-       /* advance the rp, since this block not be completely consumed and future
-        * reads need to know where to pick up from */
+       /* advance the rp, since this block might not be completely consumed and
+        * future reads need to know where to pick up from */
        b->rp += copy_amt;
        to += copy_amt;
        amt -= copy_amt;
        retval += copy_amt;
        for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
                ebd = &b->extra_data[i];
-               /* skip empty entires.  if we track this in the struct block, we can
-                * just start the for loop early */
+               /* skip empty entires.  if we track this in the struct block, we
+                * can just start the for loop early */
                if (!ebd->base || !ebd->len)
                        continue;
                copy_amt = MIN(ebd->len, amt);
                memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
-               /* we're actually consuming the entries, just like how we advance rp up
-                * above, and might only consume part of one. */
-               ebd->len -= copy_amt;
-               ebd->off += copy_amt;
-               b->extra_len -= copy_amt;
-               if (!ebd->len) {
-                       /* we don't actually have to decref here.  it's also done in
-                        * freeb().  this is the earliest we can free. */
-                       kfree((void*)ebd->base);
-                       ebd->base = ebd->off = 0;
-               }
+               /* we're actually consuming the entries, just like how we
+                * advance rp up above, and might only consume part of one. */
+               block_consume_ebd_bytes(b, ebd, copy_amt);
                to += copy_amt;
                amt -= copy_amt;
                retval += copy_amt;
@@ -1322,6 +1311,7 @@ struct block *bl2mem(uint8_t * p, struct block *b, int n)
                n -= i;
                p += i;
                next = b->next;
+               b->next = NULL;
                freeb(b);
        }
        return NULL;
@@ -1335,13 +1325,13 @@ static size_t read_all_blocks(struct block *b, void *va, size_t len)
        struct block *next;
 
        do {
-               /* We should be draining every block completely. */
-               assert(BLEN(b) <= len - sofar);
                assert(va);
-               assert(va + sofar);
                assert(b->rp);
                sofar += read_from_block(b, va + sofar, len - sofar);
+               if (BLEN(b) && b->next)
+                       panic("Failed to drain entire block (Qmsg?) but had a next!");
                next = b->next;
+               b->next = NULL;
                freeb(b);
                b = next;
        } while (b);
@@ -1393,6 +1383,8 @@ void qputback(struct queue *q, struct block *b)
                q->blast = b;
        q->bfirst = b;
        q->dlen += BLEN(b);
+       /* qputback seems to undo a read, so we can undo the accounting too. */
+       q->bytes_read -= BLEN(b);
 }
 
 /*
@@ -1401,7 +1393,8 @@ void qputback(struct queue *q, struct block *b)
  */
 struct block *qbread(struct queue *q, size_t len)
 {
-       return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
+       return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
+                       MEM_WAIT);
 }
 
 struct block *qbread_nonblock(struct queue *q, size_t len)
@@ -1422,8 +1415,8 @@ size_t qread(struct queue *q, void *va, size_t len)
 
 size_t qread_nonblock(struct queue *q, void *va, size_t len)
 {
-       struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
-                                      MEM_WAIT);
+       struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
+                                      QIO_NON_BLOCK, MEM_WAIT);
 
        if (!blist)
                return 0;
@@ -1484,14 +1477,16 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
        }
        if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
                /* drop overflow takes priority over regular non-blocking */
-               if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
+               if ((qio_flags & QIO_DROP_OVERFLOW)
+                   || (q->state & Qdropoverflow)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        return -1;
                }
-               /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
-                * and catch it. */
-               if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
+               /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
+                * nice and catch it. */
+               if ((qio_flags & QIO_CAN_ERR_SLEEP)
+                   && (qio_flags & QIO_NON_BLOCK)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        error(EAGAIN, "queue full");
@@ -1501,15 +1496,17 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
        QDEBUG checkb(b, "__qbwrite");
        spin_unlock_irqsave(&q->lock);
        /* TODO: not sure if the usage of a kick is mutually exclusive with a
-        * wakeup, meaning that actual users either want a kick or have qreaders. */
+        * wakeup, meaning that actual users either want a kick or have
+        * qreaders. */
        if (q->kick && (was_unreadable || (q->state & Qkick)))
                q->kick(q->arg);
        if (was_unreadable) {
-               /* Unlike the read side, there's no double-check to make sure the queue
-                * transitioned across an edge.  We know we added something, so that's
-                * enough.  We wake if the queue was empty.  Both sides are the same, in
-                * that the condition for which we do the rendez_wakeup() is the same as
-                * the condition done for the rendez_sleep(). */
+               /* Unlike the read side, there's no double-check to make sure
+                * the queue transitioned across an edge.  We know we added
+                * something, so that's enough.  We wake if the queue was empty.
+                * Both sides are the same, in that the condition for which we
+                * do the rendez_wakeup() is the same as the condition done for
+                * the rendez_sleep(). */
                rendez_wakeup(&q->rr);
                qwake_cb(q, FDTAP_FILT_READABLE);
        }
@@ -1527,26 +1524,27 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
         */
        if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
            !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
-               /* This is a racy peek at the q status.  If we accidentally block, our
-                * rendez will return.  The rendez's peak (qwriter_should_wake) is also
-                * racy w.r.t.  the q's spinlock (that lock protects writes, but not
-                * reads).
+               /* This is a racy peek at the q status.  If we accidentally
+                * block, our rendez will return.  The rendez's peak
+                * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
+                * (that lock protects writes, but not reads).
                 *
-                * Here's the deal: when holding the rendez lock, if we see the sleep
-                * condition, the consumer will wake us.  The condition will only ever
-                * be changed by the next qbread() (consumer, changes q->dlen).  That
-                * code will do a rendez wake, which will spin on the rendez lock,
-                * meaning it won't procede until we either see the new state (and
-                * return) or put ourselves on the rendez, and wake up.
+                * Here's the deal: when holding the rendez lock, if we see the
+                * sleep condition, the consumer will wake us.  The condition
+                * will only ever be changed by the next qbread() (consumer,
+                * changes q->dlen).  That code will do a rendez wake, which
+                * will spin on the rendez lock, meaning it won't procede until
+                * we either see the new state (and return) or put ourselves on
+                * the rendez, and wake up.
                 *
-                * The pattern is one side writes mem, then signals.  Our side checks
-                * the signal, then reads the mem.  The goal is to not miss seeing the
-                * signal AND missing the memory write.  In this specific case, the
-                * signal is actually synchronous (the rendez lock) and not basic shared
-                * memory.
+                * The pattern is one side writes mem, then signals.  Our side
+                * checks the signal, then reads the mem.  The goal is to not
+                * miss seeing the signal AND missing the memory write.  In this
+                * specific case, the signal is actually synchronous (the rendez
+                * lock) and not basic shared memory.
                 *
-                * Oh, and we spin in case we woke early and someone else filled the
-                * queue, mesa-style. */
+                * Oh, and we spin in case we woke early and someone else filled
+                * the queue, mesa-style. */
                while (!qwriter_should_wake(q))
                        rendez_sleep(&q->wr, qwriter_should_wake, q);
        }
@@ -1578,10 +1576,10 @@ static struct block *build_block(void *from, size_t len, int mem_flags)
        struct block *b;
        void *ext_buf;
 
-       /* If len is small, we don't need to bother with the extra_data.  But until
-        * the whole stack can handle extd blocks, we'll use them unconditionally.
-        * */
-#ifdef CONFIG_BLOCK_EXTRAS
+       /* If len is small, we don't need to bother with the extra_data.  But
+        * until the whole stack can handle extd blocks, we'll use them
+        * unconditionally.  */
+
        /* allocb builds in 128 bytes of header space to all blocks, but this is
         * only available via padblock (to the left).  we also need some space
         * for pullupblock for some basic headers (like icmp) that get written
@@ -1604,13 +1602,6 @@ static struct block *build_block(void *from, size_t len, int mem_flags)
        b->extra_data[0].off = 0;
        b->extra_data[0].len = len;
        b->extra_len += len;
-#else
-       b = block_alloc(len, mem_flags);
-       if (!b)
-               return 0;
-       memmove(b->wp, from, len);
-       b->wp += len;
-#endif
        return b;
 }
 
@@ -1624,18 +1615,20 @@ static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
        uint8_t *p = vp;
        void *ext_buf;
 
-       /* Only some callers can throw.  Others might be in a context where waserror
-        * isn't safe. */
+       /* Only some callers can throw.  Others might be in a context where
+        * waserror isn't safe. */
        if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
-               /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE) after
-                * some data has been sent should be treated as a partial write. */
+               /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
+                * after some data has been sent should be treated as a partial
+                * write. */
                if (sofar)
                        goto out_ok;
                nexterror();
        }
        do {
                n = len - sofar;
-               /* This is 64K, the max amount per single block.  Still a good value? */
+               /* This is 64K, the max amount per single block.  Still a good
+                * value? */
                if (n > Maxatomic)
                        n = Maxatomic;
                b = build_block(p + sofar, n, mem_flags);
@@ -1760,6 +1753,11 @@ int qlen(struct queue *q)
        return q->dlen;
 }
 
+size_t q_bytes_read(struct queue *q)
+{
+       return q->bytes_read;
+}
+
 /*
  * return space remaining before flow control
  *
@@ -1794,9 +1792,20 @@ int qcanread(struct queue *q)
 /*
  *  change queue limit
  */
-void qsetlimit(struct queue *q, int limit)
+void qsetlimit(struct queue *q, size_t limit)
 {
+       bool was_writable = qwritable(q);
+
        q->limit = limit;
+       if (!was_writable && qwritable(q)) {
+               rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
+}
+
+size_t qgetlimit(struct queue *q)
+{
+       return q->limit;
 }
 
 /*