qio: Add non-blocking queues
[akaros.git] / kern / src / ns / qio.c
index e2d477c..4741f6c 100644 (file)
 #include <smp.h>
 #include <ip.h>
 
+#define PANIC_EXTRA(b)                                                          \
+{                                                                              \
+       if ((b)->extra_len)                                                        \
+               panic("%s doesn't handle extra_data", __FUNCTION__);               \
+}
+
 static uint32_t padblockcnt;
 static uint32_t concatblockcnt;
 static uint32_t pullupblockcnt;
@@ -29,60 +35,55 @@ static int debugging;
  *  IO queues
  */
 
-struct queue
-{
+struct queue {
        spinlock_t lock;;
 
-       struct block*   bfirst;         /* buffer */
-       struct block*   blast;
+       struct block *bfirst;           /* buffer */
+       struct block *blast;
 
-       int     len;            /* bytes allocated to queue */
-       int     dlen;           /* data bytes in queue */
-       int     limit;          /* max bytes in queue */
-       int     iNULLim;                /* initial limit */
-       int     state;
-       int     noblock;        /* true if writes return immediately when q full */
-       int     eof;            /* number of eofs read by user */
+       int len;                                        /* bytes allocated to queue */
+       int dlen;                                       /* data bytes in queue */
+       int limit;                                      /* max bytes in queue */
+       int inilim;                             /* initial limit */
+       int state;
+       int eof;                                        /* number of eofs read by user */
 
-       void    (*kick)(void*); /* restart output */
-       void    (*bypass)(void*, struct block*);        /* bypass queue altogether */
-       void*   arg;            /* argument to kick */
+       void (*kick) (void *);          /* restart output */
+       void (*bypass) (void *, struct block *);        /* bypass queue altogether */
+       void *arg;                                      /* argument to kick */
 
-       qlock_t rlock;          /* mutex for reading processes */
-       struct rendez   rr;             /* process waiting to read */
-       qlock_t wlock;          /* mutex for writing processes */
-       struct rendez   wr;             /* process waiting to write */
+       qlock_t rlock;                          /* mutex for reading processes */
+       struct rendez rr;                       /* process waiting to read */
+       qlock_t wlock;                          /* mutex for writing processes */
+       struct rendez wr;                       /* process waiting to write */
 
-       char    err[ERRMAX];
+       char err[ERRMAX];
 };
 
-enum
-{
-       Maxatomic       = 64*1024,
+enum {
+       Maxatomic = 64 * 1024,
 };
 
-unsigned int   qiomaxatomic = Maxatomic;
+unsigned int qiomaxatomic = Maxatomic;
 
-void
-ixsummary(void)
+void ixsummary(void)
 {
        debugging ^= 1;
        iallocsummary();
-       printd("pad %lud, concat %lud, pullup %lud, copy %lud\n",
-               padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
-       printd("consume %lud, produce %lud, qcopy %lud\n",
-               consumecnt, producecnt, qcopycnt);
+       printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
+                  padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
+       printd("consume %lu, produce %lu, qcopy %lu\n",
+                  consumecnt, producecnt, qcopycnt);
 }
 
 /*
  *  free a list of blocks
  */
-void
-freeblist(struct block *b)
+void freeblist(struct block *b)
 {
        struct block *next;
 
-       for(; b != 0; b = next){
+       for (; b != 0; b = next) {
                next = b->next;
                b->next = 0;
                freeb(b);
@@ -92,24 +93,29 @@ freeblist(struct block *b)
 /*
  *  pad a block to the front (or the back if size is negative)
  */
-struct block*
-padblock(struct block *bp, int size)
+struct block *padblock(struct block *bp, int size)
 {
        int n;
        struct block *nbp;
+       uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
+       uint16_t checksum_start = bp->checksum_start;
+       uint16_t checksum_offset = bp->checksum_offset;
+       uint16_t mss = bp->mss;
 
        QDEBUG checkb(bp, "padblock 1");
-       if(size >= 0){
-               if(bp->rp - bp->base >= size){
+       if (size >= 0) {
+               if (bp->rp - bp->base >= size) {
+                       bp->checksum_start += size;
                        bp->rp -= size;
                        return bp;
                }
 
-               if(bp->next)
-                       panic("padblock 0x%luX", getcallerpc(&bp));
+               PANIC_EXTRA(bp);
+               if (bp->next)
+                       panic("padblock %p", getcallerpc(&bp));
                n = BLEN(bp);
                padblockcnt++;
-               nbp = allocb(size+n);
+               nbp = allocb(size + n);
                nbp->rp += size;
                nbp->wp = nbp->rp;
                memmove(nbp->wp, bp->rp, n);
@@ -119,19 +125,27 @@ padblock(struct block *bp, int size)
        } else {
                size = -size;
 
-               if(bp->next)
-                       panic("padblock 0x%luX", getcallerpc(&bp));
+               PANIC_EXTRA(bp);
 
-               if(bp->lim - bp->wp >= size)
+               if (bp->next)
+                       panic("padblock %p", getcallerpc(&bp));
+
+               if (bp->lim - bp->wp >= size)
                        return bp;
 
                n = BLEN(bp);
                padblockcnt++;
-               nbp = allocb(size+n);
+               nbp = allocb(size + n);
                memmove(nbp->wp, bp->rp, n);
                nbp->wp += n;
                freeb(bp);
        }
+       if (bcksum) {
+               nbp->flag |= bcksum;
+               nbp->checksum_start = checksum_start;
+               nbp->checksum_offset = checksum_offset;
+               nbp->mss = mss;
+       }
        QDEBUG checkb(nbp, "padblock 1");
        return nbp;
 }
@@ -139,13 +153,12 @@ padblock(struct block *bp, int size)
 /*
  *  return count of bytes in a string of blocks
  */
-int
-blocklen(struct block *bp)
+int blocklen(struct block *bp)
 {
        int len;
 
        len = 0;
-       while(bp) {
+       while (bp) {
                len += BLEN(bp);
                bp = bp->next;
        }
@@ -155,13 +168,12 @@ blocklen(struct block *bp)
 /*
  * return count of space in blocks
  */
-int
-blockalloclen(struct block *bp)
+int blockalloclen(struct block *bp)
 {
        int len;
 
        len = 0;
-       while(bp) {
+       while (bp) {
                len += BALLOC(bp);
                bp = bp->next;
        }
@@ -172,17 +184,18 @@ blockalloclen(struct block *bp)
  *  copy the  string of blocks into
  *  a single block and free the string
  */
-struct block*
-concatblock(struct block *bp)
+struct block *concatblock(struct block *bp)
 {
        int len;
        struct block *nb, *f;
 
-       if(bp->next == 0)
+       if (bp->next == 0)
                return bp;
 
+       /* probably use parts of qclone */
+       PANIC_EXTRA(bp);
        nb = allocb(blocklen(bp));
-       for(f = bp; f; f = f->next) {
+       for (f = bp; f; f = f->next) {
                len = BLEN(f);
                memmove(nb->wp, f->rp, len);
                nb->wp += len;
@@ -193,27 +206,99 @@ concatblock(struct block *bp)
        return nb;
 }
 
+/* Returns a block with the remaining contents of b all in the main body of the
+ * returned block.  Replace old references to b with the returned value (which
+ * may still be 'b', if no change was needed. */
+struct block *linearizeblock(struct block *b)
+{
+       struct block *newb;
+       size_t len;
+       struct extra_bdata *ebd;
+
+       if (!b->extra_len)
+               return b;
+
+       newb = allocb(BLEN(b));
+       len = BHLEN(b);
+       memcpy(newb->wp, b->rp, len);
+       newb->wp += len;
+       len = b->extra_len;
+       for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
+               ebd = &b->extra_data[i];
+               if (!ebd->base || !ebd->len)
+                       continue;
+               memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
+               newb->wp += ebd->len;
+               len -= ebd->len;
+       }
+       /* TODO: any other flags that need copied over? */
+       if (b->flag & BCKSUM_FLAGS) {
+               newb->flag |= (b->flag & BCKSUM_FLAGS);
+               newb->checksum_start = b->checksum_start;
+               newb->checksum_offset = b->checksum_offset;
+               newb->mss = b->mss;
+       }
+       freeb(b);
+       return newb;
+}
+
 /*
- *  make sure the first block has at least n bytes
+ *  make sure the first block has at least n bytes in its main body
  */
-struct block*
-pullupblock(struct block *bp, int n)
+struct block *pullupblock(struct block *bp, int n)
 {
-       int i;
+       int i, len, seglen;
        struct block *nbp;
+       struct extra_bdata *ebd;
 
        /*
         *  this should almost always be true, it's
         *  just to avoid every caller checking.
         */
-       if(BLEN(bp) >= n)
+       if (BHLEN(bp) >= n)
                return bp;
 
+        /* a start at explicit main-body / header management */
+       if (bp->extra_len) {
+               if (n > bp->lim - bp->rp) {
+                       /* would need to realloc a new block and copy everything over. */
+                       panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
+                                       n, bp->lim, bp->rp, bp->lim-bp->rp);
+               }
+               len = n - BHLEN(bp);
+               if (len > bp->extra_len)
+                       panic("pullup more than extra (%d, %d, %d)\n",
+                             n, BHLEN(bp), bp->extra_len);
+               checkb(bp, "before pullup");
+               for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
+                       ebd = &bp->extra_data[i];
+                       if (!ebd->base || !ebd->len)
+                               continue;
+                       seglen = MIN(ebd->len, len);
+                       memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
+                       bp->wp += seglen;
+                       len -= seglen;
+                       ebd->len -= seglen;
+                       ebd->off += seglen;
+                       bp->extra_len -= seglen;
+                       if (ebd->len == 0) {
+                               kfree((void *)ebd->base);
+                               ebd->off = 0;
+                               ebd->base = 0;
+                       }
+               }
+               /* maybe just call pullupblock recursively here */
+               if (len)
+                       panic("pullup %d bytes overdrawn\n", len);
+               checkb(bp, "after pullup");
+               return bp;
+       }
+
        /*
         *  if not enough room in the first block,
         *  add another to the front of the list.
         */
-       if(bp->lim - bp->rp < n){
+       if (bp->lim - bp->rp < n) {
                nbp = allocb(n);
                nbp->next = bp;
                bp = nbp;
@@ -223,17 +308,16 @@ pullupblock(struct block *bp, int n)
         *  copy bytes from the trailing blocks into the first
         */
        n -= BLEN(bp);
-       while((nbp = bp->next)){
+       while ((nbp = bp->next)) {
                i = BLEN(nbp);
-               if(i > n) {
+               if (i > n) {
                        memmove(bp->wp, nbp->rp, n);
                        pullupblockcnt++;
                        bp->wp += n;
                        nbp->rp += n;
                        QDEBUG checkb(bp, "pullupblock 1");
                        return bp;
-               }
-               else {
+               } else {
                        memmove(bp->wp, nbp->rp, i);
                        pullupblockcnt++;
                        bp->wp += i;
@@ -241,7 +325,7 @@ pullupblock(struct block *bp, int n)
                        nbp->next = 0;
                        freeb(nbp);
                        n -= i;
-                       if(n == 0){
+                       if (n == 0) {
                                QDEBUG checkb(bp, "pullupblock 2");
                                return bp;
                        }
@@ -252,83 +336,177 @@ pullupblock(struct block *bp, int n)
 }
 
 /*
- *  make sure the first block has at least n bytes
+ *  make sure the first block has at least n bytes in its main body
  */
-struct block*
-pullupqueue(struct queue *q, int n)
+struct block *pullupqueue(struct queue *q, int n)
 {
        struct block *b;
 
-       if((BLEN(q->bfirst) >= n))
+       /* TODO: lock to protect the queue links? */
+       if ((BHLEN(q->bfirst) >= n))
                return q->bfirst;
        q->bfirst = pullupblock(q->bfirst, n);
-       for(b = q->bfirst; b != NULL && b->next != NULL; b = b->next)
-               ;
+       for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
        q->blast = b;
        return q->bfirst;
 }
 
+/* throw away count bytes from the front of
+ * block's extradata.  Returns count of bytes
+ * thrown away
+ */
+
+static int pullext(struct block *bp, int count)
+{
+       struct extra_bdata *ed;
+       int i, rem, bytes = 0;
+
+       for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
+               ed = &bp->extra_data[i];
+               rem = MIN(count, ed->len);
+               bp->extra_len -= rem;
+               count -= rem;
+               bytes += rem;
+               ed->off += rem;
+               ed->len -= rem;
+               if (ed->len == 0) {
+                       kfree((void *)ed->base);
+                       ed->base = 0;
+                       ed->off = 0;
+               }
+       }
+       return bytes;
+}
+
+/* throw away count bytes from the end of a
+ * block's extradata.  Returns count of bytes
+ * thrown away
+ */
+
+static int dropext(struct block *bp, int count)
+{
+       struct extra_bdata *ed;
+       int i, rem, bytes = 0;
+
+       for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
+               ed = &bp->extra_data[i];
+               rem = MIN(count, ed->len);
+               bp->extra_len -= rem;
+               count -= rem;
+               bytes += rem;
+               ed->len -= rem;
+               if (ed->len == 0) {
+                       kfree((void *)ed->base);
+                       ed->base = 0;
+                       ed->off = 0;
+               }
+       }
+       return bytes;
+}
+
+/*
+ *  throw away up to count bytes from a
+ *  list of blocks.  Return count of bytes
+ *  thrown away.
+ */
+static int _pullblock(struct block **bph, int count, int free)
+{
+       struct block *bp;
+       int n, bytes;
+
+       bytes = 0;
+       if (bph == NULL)
+               return 0;
+
+       while (*bph != NULL && count != 0) {
+               bp = *bph;
+
+               n = MIN(BHLEN(bp), count);
+               bytes += n;
+               count -= n;
+               bp->rp += n;
+               n = pullext(bp, count);
+               bytes += n;
+               count -= n;
+               QDEBUG checkb(bp, "pullblock ");
+               if (BLEN(bp) == 0 && (free || count)) {
+                       *bph = bp->next;
+                       bp->next = NULL;
+                       freeb(bp);
+               }
+       }
+       return bytes;
+}
+
+int pullblock(struct block **bph, int count)
+{
+       return _pullblock(bph, count, 1);
+}
+
 /*
  *  trim to len bytes starting at offset
  */
-struct block *
-trimblock(struct block *bp, int offset, int len)
+struct block *trimblock(struct block *bp, int offset, int len)
 {
-       uint32_t l;
-       struct block *nb, *startb;
+       uint32_t l, trim;
+       int olen = len;
 
        QDEBUG checkb(bp, "trimblock 1");
-       if(blocklen(bp) < offset+len) {
+       if (blocklen(bp) < offset + len) {
                freeblist(bp);
                return NULL;
        }
 
-       while((l = BLEN(bp)) < offset) {
-               offset -= l;
-               nb = bp->next;
-               bp->next = NULL;
-               freeb(bp);
-               bp = nb;
+       l =_pullblock(&bp, offset, 0);
+       if (bp == NULL)
+               return NULL;
+       if (l != offset) {
+               freeblist(bp);
+               return NULL;
        }
 
-       startb = bp;
-       bp->rp += offset;
-
-       while((l = BLEN(bp)) < len) {
+       while ((l = BLEN(bp)) < len) {
                len -= l;
                bp = bp->next;
        }
 
-       bp->wp -= (BLEN(bp) - len);
+       trim = BLEN(bp) - len;
+       trim -= dropext(bp, trim);
+       bp->wp -= trim;
 
-       if(bp->next) {
+       if (bp->next) {
                freeblist(bp->next);
                bp->next = NULL;
        }
-
-       return startb;
+       return bp;
 }
 
 /*
  *  copy 'count' bytes into a new block
  */
-struct block*
-copyblock(struct block *bp, int count)
+struct block *copyblock(struct block *bp, int count)
 {
        int l;
        struct block *nbp;
 
        QDEBUG checkb(bp, "copyblock 0");
        nbp = allocb(count);
-       for(; count > 0 && bp != 0; bp = bp->next){
+       if (bp->flag & BCKSUM_FLAGS) {
+               nbp->flag |= (bp->flag & BCKSUM_FLAGS);
+               nbp->checksum_start = bp->checksum_start;
+               nbp->checksum_offset = bp->checksum_offset;
+               nbp->mss = bp->mss;
+       }
+       PANIC_EXTRA(bp);
+       for (; count > 0 && bp != 0; bp = bp->next) {
                l = BLEN(bp);
-               if(l > count)
+               if (l > count)
                        l = count;
                memmove(nbp->wp, bp->rp, l);
                nbp->wp += l;
                count -= l;
        }
-       if(count > 0){
+       if (count > 0) {
                memset(nbp->wp, 0, count);
                nbp->wp += count;
        }
@@ -338,18 +516,18 @@ copyblock(struct block *bp, int count)
        return nbp;
 }
 
-struct block*
-adjustblock(struct block* bp, int len)
+struct block *adjustblock(struct block *bp, int len)
 {
        int n;
        struct block *nbp;
 
-       if(len < 0){
+       if (len < 0) {
                freeb(bp);
                return NULL;
        }
 
-       if(bp->rp+len > bp->lim){
+       PANIC_EXTRA(bp);
+       if (bp->rp + len > bp->lim) {
                nbp = copyblock(bp, len);
                freeblist(bp);
                QDEBUG checkb(nbp, "adjustblock 1");
@@ -358,9 +536,9 @@ adjustblock(struct block* bp, int len)
        }
 
        n = BLEN(bp);
-       if(len > n)
-               memset(bp->wp, 0, len-n);
-       bp->wp = bp->rp+len;
+       if (len > n)
+               memset(bp->wp, 0, len - n);
+       bp->wp = bp->rp + len;
        QDEBUG checkb(bp, "adjustblock 2");
 
        return bp;
@@ -368,43 +546,9 @@ adjustblock(struct block* bp, int len)
 
 
 /*
- *  throw away up to count bytes from a
- *  list of blocks.  Return count of bytes
- *  thrown away.
- */
-int
-pullblock(struct block **bph, int count)
-{
-       struct block *bp;
-       int n, bytes;
-
-       bytes = 0;
-       if(bph == NULL)
-               return 0;
-
-       while(*bph != NULL && count != 0) {
-               bp = *bph;
-               n = BLEN(bp);
-               if(count < n)
-                       n = count;
-               bytes += n;
-               count -= n;
-               bp->rp += n;
-               QDEBUG checkb(bp, "pullblock ");
-               if(BLEN(bp) == 0) {
-                       *bph = bp->next;
-                       bp->next = NULL;
-                       freeb(bp);
-               }
-       }
-       return bytes;
-}
-
-/*
  *  get next block from a queue, return null if nothing there
  */
-struct block*
-qget(struct queue *q)
+struct block *qget(struct queue *q)
 {
        int dowakeup;
        struct block *b;
@@ -413,7 +557,7 @@ qget(struct queue *q)
        spin_lock_irqsave(&q->lock);
 
        b = q->bfirst;
-       if(b == NULL){
+       if (b == NULL) {
                q->state |= Qstarve;
                spin_unlock_irqsave(&q->lock);
                return NULL;
@@ -425,7 +569,7 @@ qget(struct queue *q)
        QDEBUG checkb(b, "qget");
 
        /* if writer flow controlled, restart */
-       if((q->state & Qflow) && q->len < q->limit/2){
+       if ((q->state & Qflow) && q->len < q->limit / 2) {
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
@@ -433,8 +577,8 @@ qget(struct queue *q)
 
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->wr);
+       if (dowakeup)
+               rendez_wakeup(&q->wr);
 
        return b;
 }
@@ -443,20 +587,20 @@ qget(struct queue *q)
  *  throw away the next 'len' bytes in the queue
  * returning the number actually discarded
  */
-int
-qdiscard(struct queue *q, int len)
+int qdiscard(struct queue *q, int len)
 {
        struct block *b;
-       int dowakeup, n, sofar;
+       int dowakeup, n, sofar, body_amt, extra_amt;
+       struct extra_bdata *ebd;
 
        spin_lock_irqsave(&q->lock);
-       for(sofar = 0; sofar < len; sofar += n){
+       for (sofar = 0; sofar < len; sofar += n) {
                b = q->bfirst;
-               if(b == NULL)
+               if (b == NULL)
                        break;
                QDEBUG checkb(b, "qdiscard");
                n = BLEN(b);
-               if(n <= len - sofar){
+               if (n <= len - sofar) {
                        q->bfirst = b->next;
                        b->next = 0;
                        q->len -= BALLOC(b);
@@ -464,8 +608,32 @@ qdiscard(struct queue *q, int len)
                        freeb(b);
                } else {
                        n = len - sofar;
-                       b->rp += n;
                        q->dlen -= n;
+                       /* partial block removal */
+                       body_amt = MIN(BHLEN(b), n);
+                       b->rp += body_amt;
+                       extra_amt = n - body_amt;
+                       /* reduce q->len by the amount we remove from the extras.  The
+                        * header will always be accounted for above, during block removal.
+                        * */
+                       q->len -= extra_amt;
+                       for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
+                               ebd = &b->extra_data[i];
+                               if (!ebd->base || !ebd->len)
+                                       continue;
+                               if (extra_amt >= ebd->len) {
+                                       /* remove the entire entry, note the kfree release */
+                                       b->extra_len -= ebd->len;
+                                       extra_amt -= ebd->len;
+                                       kfree((void*)ebd->base);
+                                       ebd->base = ebd->off = ebd->len = 0;
+                                       continue;
+                               }
+                               ebd->off += extra_amt;
+                               ebd->len -= extra_amt;
+                               b->extra_len -= extra_amt;
+                               extra_amt = 0;
+                       }
                }
        }
 
@@ -473,13 +641,13 @@ qdiscard(struct queue *q, int len)
         *  if writer flow controlled, restart
         *
         *  This used to be
-        *      q->len < q->limit/2
+        *  q->len < q->limit/2
         *  but it slows down tcp too much for certain write sizes.
         *  I really don't understand it completely.  It may be
         *  due to the queue draining so fast that the transmission
         *  stalls waiting for the app to produce more data.  - presotto
         */
-       if((q->state & Qflow) && q->len < q->limit){
+       if ((q->state & Qflow) && q->len < q->limit) {
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
@@ -487,8 +655,8 @@ qdiscard(struct queue *q, int len)
 
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->wr);
+       if (dowakeup)
+               rendez_wakeup(&q->wr);
 
        return sofar;
 }
@@ -496,8 +664,7 @@ qdiscard(struct queue *q, int len)
 /*
  *  Interrupt level copy out of a queue, return # bytes copied.
  */
-int
-qconsume(struct queue *q, void *vp, int len)
+int qconsume(struct queue *q, void *vp, int len)
 {
        struct block *b;
        int n, dowakeup;
@@ -507,9 +674,9 @@ qconsume(struct queue *q, void *vp, int len)
        /* sync with qwrite */
        spin_lock_irqsave(&q->lock);
 
-       for(;;) {
+       for (;;) {
                b = q->bfirst;
-               if(b == 0){
+               if (b == 0) {
                        q->state |= Qstarve;
                        spin_unlock_irqsave(&q->lock);
                        return -1;
@@ -517,7 +684,7 @@ qconsume(struct queue *q, void *vp, int len)
                QDEBUG checkb(b, "qconsume 1");
 
                n = BLEN(b);
-               if(n > 0)
+               if (n > 0)
                        break;
                q->bfirst = b->next;
                q->len -= BALLOC(b);
@@ -527,7 +694,8 @@ qconsume(struct queue *q, void *vp, int len)
                tofree = b;
        };
 
-       if(n < len)
+       PANIC_EXTRA(b);
+       if (n < len)
                len = n;
        memmove(p, b->rp, len);
        consumecnt += n;
@@ -535,7 +703,7 @@ qconsume(struct queue *q, void *vp, int len)
        q->dlen -= len;
 
        /* discard the block if we're done with it */
-       if((q->state & Qmsg) || len == n){
+       if ((q->state & Qmsg) || len == n) {
                q->bfirst = b->next;
                b->next = 0;
                q->len -= BALLOC(b);
@@ -547,7 +715,7 @@ qconsume(struct queue *q, void *vp, int len)
        }
 
        /* if writer flow controlled, restart */
-       if((q->state & Qflow) && q->len < q->limit/2){
+       if ((q->state & Qflow) && q->len < q->limit / 2) {
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
@@ -555,29 +723,28 @@ qconsume(struct queue *q, void *vp, int len)
 
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->wr);
+       if (dowakeup)
+               rendez_wakeup(&q->wr);
 
-       if(tofree != NULL)
+       if (tofree != NULL)
                freeblist(tofree);
 
        return len;
 }
 
-int
-qpass(struct queue *q, struct block *b)
+int qpass(struct queue *q, struct block *b)
 {
        int dlen, len, dowakeup;
 
        /* sync with qread */
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);
-       if(q->len >= q->limit){
+       if (q->len >= q->limit) {
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return -1;
        }
-       if(q->state & Qclosed){
+       if (q->state & Qclosed) {
                len = blocklen(b);
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
@@ -585,14 +752,14 @@ qpass(struct queue *q, struct block *b)
        }
 
        /* add buffer to queue */
-       if(q->bfirst)
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        len = BALLOC(b);
        dlen = BLEN(b);
        QDEBUG checkb(b, "qpass");
-       while(b->next){
+       while (b->next) {
                b = b->next;
                QDEBUG checkb(b, "qpass");
                len += BALLOC(b);
@@ -602,23 +769,22 @@ qpass(struct queue *q, struct block *b)
        q->len += len;
        q->dlen += dlen;
 
-       if(q->len >= q->limit/2)
+       if (q->len >= q->limit / 2)
                q->state |= Qflow;
 
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        return len;
 }
 
-int
-qpassnolim(struct queue *q, struct block *b)
+int qpassnolim(struct queue *q, struct block *b)
 {
        int dlen, len, dowakeup;
 
@@ -626,21 +792,21 @@ qpassnolim(struct queue *q, struct block *b)
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);
 
-       if(q->state & Qclosed){
+       if (q->state & Qclosed) {
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return BALLOC(b);
        }
 
        /* add buffer to queue */
-       if(q->bfirst)
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        len = BALLOC(b);
        dlen = BLEN(b);
        QDEBUG checkb(b, "qpass");
-       while(b->next){
+       while (b->next) {
                b = b->next;
                QDEBUG checkb(b, "qpass");
                len += BALLOC(b);
@@ -650,17 +816,17 @@ qpassnolim(struct queue *q, struct block *b)
        q->len += len;
        q->dlen += dlen;
 
-       if(q->len >= q->limit/2)
+       if (q->len >= q->limit / 2)
                q->state |= Qflow;
 
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        return len;
 }
@@ -669,16 +835,17 @@ qpassnolim(struct queue *q, struct block *b)
  *  if the allocated space is way out of line with the used
  *  space, reallocate to a smaller block
  */
-struct block*
-packblock(struct block *bp)
+struct block *packblock(struct block *bp)
 {
        struct block **l, *nbp;
        int n;
 
-       for(l = &bp; *l; l = &(*l)->next){
+       if (bp->extra_len)
+               return bp;
+       for (l = &bp; *l; l = &(*l)->next) {
                nbp = *l;
                n = BLEN(nbp);
-               if((n<<2) < BALLOC(nbp)){
+               if ((n << 2) < BALLOC(nbp)) {
                        *l = allocb(n);
                        memmove((*l)->wp, nbp->rp, n);
                        (*l)->wp += n;
@@ -690,8 +857,7 @@ packblock(struct block *bp)
        return bp;
 }
 
-int
-qproduce(struct queue *q, void *vp, int len)
+int qproduce(struct queue *q, void *vp, int len)
 {
        struct block *b;
        int dowakeup;
@@ -702,7 +868,7 @@ qproduce(struct queue *q, void *vp, int len)
        spin_lock_irqsave(&q->lock);
 
        /* no waiting receivers, room in buffer? */
-       if(q->len >= q->limit){
+       if (q->len >= q->limit) {
                q->state |= Qflow;
                spin_unlock_irqsave(&q->lock);
                return -1;
@@ -710,15 +876,18 @@ qproduce(struct queue *q, void *vp, int len)
 
        /* save in buffer */
        /* use Qcoalesce here to save storage */
+       // TODO: Consider removing the Qcoalesce flag and force a coalescing
+       // strategy by default.
        b = q->blast;
-       if((q->state & Qcoalesce)==0 || q->bfirst==NULL || b->lim-b->wp < len){
+       if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
+               || b->lim - b->wp < len) {
                /* need a new block */
                b = iallocb(len);
-               if(b == 0){
+               if (b == 0) {
                        spin_unlock_irqsave(&q->lock);
                        return 0;
                }
-               if(q->bfirst)
+               if (q->bfirst)
                        q->blast->next = b;
                else
                        q->bfirst = b;
@@ -726,32 +895,188 @@ qproduce(struct queue *q, void *vp, int len)
                /* b->next = 0; done by iallocb() */
                q->len += BALLOC(b);
        }
+       PANIC_EXTRA(b);
        memmove(b->wp, p, len);
        producecnt += len;
        b->wp += len;
        q->dlen += len;
        QDEBUG checkb(b, "qproduce");
 
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
 
-       if(q->len >= q->limit)
+       if (q->len >= q->limit)
                q->state |= Qflow;
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        return len;
 }
 
+/* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
+ * body_rp, for up to len.  Returns the len consumed. 
+ *
+ * The base is 'b', so that we can kfree it later.  This currently ties us to
+ * using kfree for the release method for all extra_data.
+ *
+ * It is possible to have a body size that is 0, if there is no offset, and
+ * b->wp == b->rp.  This will have an extra data entry of 0 length. */
+static size_t point_to_body(struct block *b, uint8_t *body_rp,
+                            struct block *newb, unsigned int newb_idx,
+                            size_t len)
+{
+       struct extra_bdata *ebd = &newb->extra_data[newb_idx];
+
+       assert(newb_idx < newb->nr_extra_bufs);
+
+       kmalloc_incref(b);
+       ebd->base = (uintptr_t)b;
+       ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
+       ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
+       assert((int)ebd->len >= 0);
+       newb->extra_len += ebd->len;
+       return ebd->len;
+}
+
+/* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
+ * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
+ * consumed.
+ *
+ * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
+static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
+                           struct block *newb, unsigned int newb_idx,
+                           size_t len)
+{
+       struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
+       struct extra_bdata *b_ebd = &b->extra_data[b_idx];
+
+       assert(b_idx < b->nr_extra_bufs);
+       assert(newb_idx < newb->nr_extra_bufs);
+
+       kmalloc_incref((void*)b_ebd->base);
+       n_ebd->base = b_ebd->base;
+       n_ebd->off = b_ebd->off + b_off;
+       n_ebd->len = MIN(b_ebd->len - b_off, len);
+       newb->extra_len += n_ebd->len;
+       return n_ebd->len;
+}
+
+/* given a string of blocks, fills the new block's extra_data  with the contents
+ * of the blist [offset, len + offset)
+ *
+ * returns 0 on success.  the only failure is if the extra_data array was too
+ * small, so this returns a positive integer saying how big the extra_data needs
+ * to be.
+ *
+ * callers are responsible for protecting the list structure. */
+static int __blist_clone_to(struct block *blist, struct block *newb, int len,
+                            uint32_t offset)
+{
+       struct block *b, *first;
+       unsigned int nr_bufs = 0;
+       unsigned int b_idx, newb_idx = 0;
+       uint8_t *first_main_body = 0;
+
+       /* find the first block; keep offset relative to the latest b in the list */
+       for (b = blist; b; b = b->next) {
+               if (BLEN(b) > offset)
+                       break;
+               offset -= BLEN(b);
+       }
+       /* qcopy semantics: if you asked for an offset outside the block list, you
+        * get an empty block back */
+       if (!b)
+               return 0;
+       first = b;
+       /* upper bound for how many buffers we'll need in newb */
+       for (/* b is set*/; b; b = b->next) {
+               nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
+       }
+       /* we might be holding a spinlock here, so we won't wait for kmalloc */
+       block_add_extd(newb, nr_bufs, 0);
+       if (newb->nr_extra_bufs < nr_bufs) {
+               /* caller will need to alloc these, then re-call us */
+               return nr_bufs;
+       }
+       for (b = first; b && len; b = b->next) {
+               b_idx = 0;
+               if (offset) {
+                       if (offset < BHLEN(b)) {
+                               /* off is in the main body */
+                               len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
+                               newb_idx++;
+                       } else {
+                               /* off is in one of the buffers (or just past the last one).
+                                * we're not going to point to b's main body at all. */
+                               offset -= BHLEN(b);
+                               assert(b->extra_data);
+                               /* assuming these extrabufs are packed, or at least that len
+                                * isn't gibberish */
+                               while (b->extra_data[b_idx].len <= offset) {
+                                       offset -= b->extra_data[b_idx].len;
+                                       b_idx++;
+                               }
+                               /* now offset is set to our offset in the b_idx'th buf */
+                               len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
+                               newb_idx++;
+                               b_idx++;
+                       }
+                       offset = 0;
+               } else {
+                       len -= point_to_body(b, b->rp, newb, newb_idx, len);
+                       newb_idx++;
+               }
+               /* knock out all remaining bufs.  we only did one point_to_ op by now,
+                * and any point_to_ could be our last if it consumed all of len. */
+               for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
+                       len -= point_to_buf(b, i, 0, newb, newb_idx, len);
+                       newb_idx++;
+               }
+       }
+       return 0;
+}
+
+struct block *blist_clone(struct block *blist, int header_len, int len,
+                          uint32_t offset)
+{
+       int ret;
+       struct block *newb = allocb(header_len);
+       do {
+               ret = __blist_clone_to(blist, newb, len, offset);
+               if (ret)
+                       block_add_extd(newb, ret, KMALLOC_WAIT);
+       } while (ret);
+       return newb;
+}
+
+/* given a queue, makes a single block with header_len reserved space in the
+ * block main body, and the contents of [offset, len + offset) pointed to in the
+ * new blocks ext_data. */
+struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
+{
+       int ret;
+       struct block *newb = allocb(header_len);
+       /* the while loop should rarely be used: it would require someone
+        * concurrently adding to the queue. */
+       do {
+               /* TODO: RCU: protecting the q list (b->next) (need read lock) */
+               spin_lock_irqsave(&q->lock);
+               ret = __blist_clone_to(q->bfirst, newb, len, offset);
+               spin_unlock_irqsave(&q->lock);
+               if (ret)
+                       block_add_extd(newb, ret, KMALLOC_WAIT);
+       } while (ret);
+       return newb;
+}
+
 /*
  *  copy from offset in the queue
  */
-struct block*
-qcopy(struct queue *q, int len, uint32_t offset)
+struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
 {
        int sofar;
        int n;
@@ -764,13 +1089,13 @@ qcopy(struct queue *q, int len, uint32_t offset)
 
        /* go to offset */
        b = q->bfirst;
-       for(sofar = 0; ; sofar += n){
-               if(b == NULL){
+       for (sofar = 0;; sofar += n) {
+               if (b == NULL) {
                        spin_unlock_irqsave(&q->lock);
                        return nb;
                }
                n = BLEN(b);
-               if(sofar + n > offset){
+               if (sofar + n > offset) {
                        p = b->rp + offset - sofar;
                        n -= offset - sofar;
                        break;
@@ -780,15 +1105,16 @@ qcopy(struct queue *q, int len, uint32_t offset)
        }
 
        /* copy bytes from there */
-       for(sofar = 0; sofar < len;){
-               if(n > len - sofar)
+       for (sofar = 0; sofar < len;) {
+               if (n > len - sofar)
                        n = len - sofar;
+               PANIC_EXTRA(b);
                memmove(nb->wp, p, n);
                qcopycnt += n;
                sofar += n;
                nb->wp += n;
                b = b->next;
-               if(b == NULL)
+               if (b == NULL)
                        break;
                n = BLEN(b);
                p = b->rp;
@@ -798,38 +1124,55 @@ qcopy(struct queue *q, int len, uint32_t offset)
        return nb;
 }
 
+struct block *qcopy(struct queue *q, int len, uint32_t offset)
+{
+#ifdef CONFIG_BLOCK_EXTRAS
+       return qclone(q, 0, len, offset);
+#else
+       return qcopy_old(q, len, offset);
+#endif
+}
+
+static void qinit_common(struct queue *q)
+{
+       spinlock_init_irqsave(&q->lock);
+       qlock_init(&q->rlock);
+       qlock_init(&q->wlock);
+       rendez_init(&q->rr);
+       rendez_init(&q->wr);
+}
+
 /*
  *  called by non-interrupt code
  */
-struct queue*
-qopen(int limit, int msg, void (*kick)(void*), void *arg)
+struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
 {
        struct queue *q;
 
        q = kzmalloc(sizeof(struct queue), 0);
-       if(q == 0)
+       if (q == 0)
                return 0;
+       qinit_common(q);
 
-       q->limit = q->iNULLim = limit;
+       q->limit = q->inilim = limit;
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
        q->state |= Qstarve;
        q->eof = 0;
-       q->noblock = 0;
 
        return q;
 }
 
 /* open a queue to be bypassed */
-struct queue*
-qbypass(void (*bypass)(void*, struct block*), void *arg)
+struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
 {
        struct queue *q;
 
        q = kzmalloc(sizeof(struct queue), 0);
-       if(q == 0)
+       if (q == 0)
                return 0;
+       qinit_common(q);
 
        q->limit = 0;
        q->arg = arg;
@@ -839,56 +1182,64 @@ qbypass(void (*bypass)(void*, struct block*), void *arg)
        return q;
 }
 
-static int
-notempty(void *a)
+static int notempty(void *a)
 {
        struct queue *q = a;
 
        return (q->state & Qclosed) || q->bfirst != 0;
 }
 
-/*
- *  wait for the queue to be non-empty or closed.
- *  called with q ilocked.
- */
-static int
-qwait(struct queue *q)
+/* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
+ * wait, FALSE on Qclose (without error)
+ *
+ * Called with q ilocked.  May error out, back through the caller, with
+ * the irqsave lock unlocked.  */
+static bool qwait(struct queue *q)
 {
        /* wait for data */
-       for(;;){
-               if(q->bfirst != NULL)
+       for (;;) {
+               if (q->bfirst != NULL)
                        break;
 
-               if(q->state & Qclosed){
-                       if(++q->eof > 3)
-                               return -1;
-                       if(*q->err && strcmp(q->err, Ehungup) != 0)
-                               return -1;
-                       return 0;
+               if (q->state & Qclosed) {
+                       if (++q->eof > 3) {
+                               spin_unlock_irqsave(&q->lock);
+                               error("multiple reads on a closed queue");
+                       }
+                       if (*q->err && strcmp(q->err, Ehungup) != 0) {
+                               spin_unlock_irqsave(&q->lock);
+                               error(q->err);
+                       }
+                       return FALSE;
+               }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       set_errno(EAGAIN);
+                       error("queue empty");
                }
-
                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
-               sleep(&q->rr, notempty, q);
+               /* may throw an error() */
+               rendez_sleep(&q->rr, notempty, q);
                spin_lock_irqsave(&q->lock);
        }
-       return 1;
+       return TRUE;
 }
 
 /*
  * add a block list to a queue
  */
-void
-qaddlist(struct queue *q, struct block *b)
+void qaddlist(struct queue *q, struct block *b)
 {
+       /* TODO: q lock? */
        /* queue the block */
-       if(q->bfirst)
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        q->len += blockalloclen(b);
        q->dlen += blocklen(b);
-       while(b->next)
+       while (b->next)
                b = b->next;
        q->blast = b;
 }
@@ -896,13 +1247,12 @@ qaddlist(struct queue *q, struct block *b)
 /*
  *  called with q ilocked
  */
-struct block*
-qremove(struct queue *q)
+struct block *qremove(struct queue *q)
 {
        struct block *b;
 
        b = q->bfirst;
-       if(b == NULL)
+       if (b == NULL)
                return NULL;
        q->bfirst = b->next;
        b->next = NULL;
@@ -912,28 +1262,67 @@ qremove(struct queue *q)
        return b;
 }
 
+static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
+{
+       size_t copy_amt, retval = 0;
+       struct extra_bdata *ebd;
+       
+       copy_amt = MIN(BHLEN(b), amt);
+       memcpy(to, b->rp, copy_amt);
+       /* advance the rp, since this block not be completely consumed and future
+        * reads need to know where to pick up from */
+       b->rp += copy_amt;
+       to += copy_amt;
+       amt -= copy_amt;
+       retval += copy_amt;
+       for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
+               ebd = &b->extra_data[i];
+               /* skip empty entires.  if we track this in the struct block, we can
+                * just start the for loop early */
+               if (!ebd->base || !ebd->len)
+                       continue;
+               copy_amt = MIN(ebd->len, amt);
+               memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
+               /* we're actually consuming the entries, just like how we advance rp up
+                * above, and might only consume part of one. */
+               ebd->len -= copy_amt;
+               ebd->off += copy_amt;
+               b->extra_len -= copy_amt;
+               if (!ebd->len) {
+                       /* we don't actually have to decref here.  it's also done in
+                        * freeb().  this is the earliest we can free. */
+                       kfree((void*)ebd->base);
+                       ebd->base = ebd->off = 0;
+               }
+               to += copy_amt;
+               amt -= copy_amt;
+               retval += copy_amt;
+       }
+       return retval;
+}
+
 /*
  *  copy the contents of a string of blocks into
  *  memory.  emptied blocks are freed.  return
  *  pointer to first unconsumed block.
  */
-struct block*
-bl2mem(uint8_t *p, struct block *b, int n)
+struct block *bl2mem(uint8_t * p, struct block *b, int n)
 {
        int i;
        struct block *next;
 
-       for(; b != NULL; b = next){
+       /* could be slicker here, since read_from_block is smart */
+       for (; b != NULL; b = next) {
                i = BLEN(b);
-               if(i > n){
-                       memmove(p, b->rp, n);
-                       b->rp += n;
+               if (i > n) {
+                       /* partial block, consume some */
+                       read_from_block(b, p, n);
                        return b;
                }
-               memmove(p, b->rp, i);
+               /* full block, consume all and move on */
+               i = read_from_block(b, p, i);
                n -= i;
                p += i;
-               b->rp += i;
                next = b->next;
                freeb(b);
        }
@@ -944,31 +1333,31 @@ bl2mem(uint8_t *p, struct block *b, int n)
  *  copy the contents of memory into a string of blocks.
  *  return NULL on error.
  */
-struct block*
-mem2bl(uint8_t *p, int len)
+struct block *mem2bl(uint8_t * p, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        int n;
        struct block *b, *first, **l;
 
        first = NULL;
        l = &first;
-       if(waserror()){
+       if (waserror()) {
                freeblist(first);
                nexterror();
        }
        do {
                n = len;
-               if(n > Maxatomic)
+               if (n > Maxatomic)
                        n = Maxatomic;
 
                *l = b = allocb(n);
+               /* TODO consider extra_data */
                memmove(b->wp, p, n);
                b->wp += n;
                p += n;
                len -= n;
                l = &b->next;
-       } while(len > 0);
+       } while (len > 0);
        poperror();
 
        return first;
@@ -978,11 +1367,10 @@ mem2bl(uint8_t *p, int len)
  *  put a block back to the front of the queue
  *  called with q ilocked
  */
-void
-qputback(struct queue *q, struct block *b)
+void qputback(struct queue *q, struct block *b)
 {
        b->next = q->bfirst;
-       if(q->bfirst == NULL)
+       if (q->bfirst == NULL)
                q->blast = b;
        q->bfirst = b;
        q->len += BALLOC(b);
@@ -993,13 +1381,12 @@ qputback(struct queue *q, struct block *b)
  *  flow control, get producer going again
  *  called with q ilocked
  */
-static void
-qwakeup_iunlock(struct queue *q)
+static void qwakeup_iunlock(struct queue *q)
 {
        int dowakeup = 0;
 
        /* if writer flow controlled, restart */
-       if((q->state & Qflow) && q->len < q->limit/2){
+       if ((q->state & Qflow) && q->len < q->limit / 2) {
                q->state &= ~Qflow;
                dowakeup = 1;
        }
@@ -1007,41 +1394,35 @@ qwakeup_iunlock(struct queue *q)
        spin_unlock_irqsave(&q->lock);
 
        /* wakeup flow controlled writers */
-       if(dowakeup){
-               if(q->kick)
+       if (dowakeup) {
+               if (q->kick)
                        q->kick(q->arg);
-               wakeup(&q->wr);
+               rendez_wakeup(&q->wr);
        }
 }
 
 /*
  *  get next block from a queue (up to a limit)
  */
-struct block*
-qbread(struct queue *q, int len)
+struct block *qbread(struct queue *q, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        struct block *b, *nb;
        int n;
 
        qlock(&q->rlock);
-       if(waserror()){
+       if (waserror()) {
                qunlock(&q->rlock);
                nexterror();
        }
 
        spin_lock_irqsave(&q->lock);
-       switch(qwait(q)){
-       case 0:
+       if (!qwait(q)) {
                /* queue closed */
                spin_unlock_irqsave(&q->lock);
                qunlock(&q->rlock);
                poperror();
                return NULL;
-       case -1:
-               /* multiple reads on a closed queue */
-               spin_unlock_irqsave(&q->lock);
-               error(q->err);
        }
 
        /* if we get here, there's at least one block in the queue */
@@ -1050,11 +1431,12 @@ qbread(struct queue *q, int len)
 
        /* split block if it's too big and this is not a message queue */
        nb = b;
-       if(n > len){
-               if((q->state&Qmsg) == 0){
+       if (n > len) {
+               PANIC_EXTRA(b);
+               if ((q->state & Qmsg) == 0) {
                        n -= len;
                        b = allocb(n);
-                       memmove(b->wp, nb->rp+len, n);
+                       memmove(b->wp, nb->rp + len, n);
                        b->wp += n;
                        qputback(q, b);
                }
@@ -1073,39 +1455,35 @@ qbread(struct queue *q, int len)
  *  read a queue.  if no data is queued, post a struct block
  *  and wait on its Rendez.
  */
-long
-qread(struct queue *q, void *vp, int len)
+long qread(struct queue *q, void *vp, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        struct block *b, *first, **l;
        int m, n;
 
        qlock(&q->rlock);
-       if(waserror()){
+       if (waserror()) {
                qunlock(&q->rlock);
                nexterror();
        }
 
        spin_lock_irqsave(&q->lock);
 again:
-       switch(qwait(q)){
-       case 0:
+       if (!qwait(q)) {
                /* queue closed */
                spin_unlock_irqsave(&q->lock);
                qunlock(&q->rlock);
                poperror();
                return 0;
-       case -1:
-               /* multiple reads on a closed queue */
-               spin_unlock_irqsave(&q->lock);
-               error(q->err);
        }
 
        /* if we get here, there's at least one block in the queue */
-       if(q->state & Qcoalesce){
+       // TODO: Consider removing the Qcoalesce flag and force a coalescing
+       // strategy by default.
+       if (q->state & Qcoalesce) {
                /* when coalescing, 0 length blocks just go away */
                b = q->bfirst;
-               if(BLEN(b) <= 0){
+               if (BLEN(b) <= 0) {
                        freeb(qremove(q));
                        goto again;
                }
@@ -1117,16 +1495,16 @@ again:
                n = 0;
                l = &first;
                m = BLEN(b);
-               for(;;) {
+               for (;;) {
                        *l = qremove(q);
                        l = &b->next;
                        n += m;
 
                        b = q->bfirst;
-                       if(b == NULL)
+                       if (b == NULL)
                                break;
                        m = BLEN(b);
-                       if(n+m > len)
+                       if (n + m > len)
                                break;
                }
        } else {
@@ -1140,9 +1518,9 @@ again:
        spin_lock_irqsave(&q->lock);
 
        /* take care of any left over partial block */
-       if(b != NULL){
+       if (b != NULL) {
                n -= BLEN(b);
-               if(q->state & Qmsg)
+               if (q->state & Qmsg)
                        freeb(b);
                else
                        qputback(q, b);
@@ -1156,36 +1534,35 @@ again:
        return n;
 }
 
-static int
-qnotfull(void *a)
+static int qnotfull(void *a)
 {
        struct queue *q = a;
 
        return q->len < q->limit || (q->state & Qclosed);
 }
 
-uint32_t noblockcnt;
+uint32_t dropcnt;
 
 /*
  *  add a block to a queue obeying flow control
  */
-long
-qbwrite(struct queue *q, struct block *b)
+long qbwrite(struct queue *q, struct block *b)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        int n, dowakeup;
+       volatile bool should_free_b = TRUE;
 
        n = BLEN(b);
 
-       if(q->bypass){
-               (*q->bypass)(q->arg, b);
+       if (q->bypass) {
+               (*q->bypass) (q->arg, b);
                return n;
        }
 
        dowakeup = 0;
        qlock(&q->wlock);
-       if(waserror()){
-               if(b != NULL)
+       if (waserror()) {
+               if (b != NULL && should_free_b)
                        freeb(b);
                qunlock(&q->wlock);
                nexterror();
@@ -1194,25 +1571,33 @@ qbwrite(struct queue *q, struct block *b)
        spin_lock_irqsave(&q->lock);
 
        /* give up if the queue is closed */
-       if(q->state & Qclosed){
+       if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
                error(q->err);
        }
 
        /* if nonblocking, don't queue over the limit */
-       if(q->len >= q->limit){
-               if(q->noblock){
+       if (q->len >= q->limit) {
+               /* drop overflow takes priority over regular non-blocking */
+               if (q->state & Qdropoverflow) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
-                       noblockcnt += n;
+                       dropcnt += n;
                        qunlock(&q->wlock);
                        poperror();
                        return n;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       freeb(b);
+                       set_errno(EAGAIN);
+                       error("queue full");
+               }
        }
 
        /* queue the block */
-       if(q->bfirst)
+       should_free_b = FALSE;
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
@@ -1224,19 +1609,19 @@ qbwrite(struct queue *q, struct block *b)
        b = NULL;
 
        /* make sure other end gets awakened */
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);
 
        /*  get output going again */
-       if(q->kick && (dowakeup || (q->state&Qkick)))
+       if (q->kick && (dowakeup || (q->state & Qkick)))
                q->kick(q->arg);
 
        /* wakeup anyone consuming at the other end */
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        /*
         *  flow control, wait for queue to get below the limit
@@ -1250,14 +1635,14 @@ qbwrite(struct queue *q, struct block *b)
         *  that keeps getting interrupted and rewriting will
         *  queue infinite crud.
         */
-       for(;;){
-               if(q->noblock || qnotfull(q))
+       for (;;) {
+               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
                        break;
 
                spin_lock_irqsave(&q->lock);
                q->state |= Qflow;
                spin_unlock_irqsave(&q->lock);
-               sleep(&q->wr, qnotfull, q);
+               rendez_sleep(&q->wr, qnotfull, q);
        }
 
        qunlock(&q->wlock);
@@ -1265,39 +1650,87 @@ qbwrite(struct queue *q, struct block *b)
        return n;
 }
 
+long qibwrite(struct queue *q, struct block *b)
+{
+       int n, dowakeup;
+
+       dowakeup = 0;
+
+       n = BLEN(b);
+
+       spin_lock_irqsave(&q->lock);
+
+       QDEBUG checkb(b, "qibwrite");
+       if (q->bfirst)
+               q->blast->next = b;
+       else
+               q->bfirst = b;
+       q->blast = b;
+       q->len += BALLOC(b);
+       q->dlen += n;
+
+       if (q->state & Qstarve) {
+               q->state &= ~Qstarve;
+               dowakeup = 1;
+       }
+
+       spin_unlock_irqsave(&q->lock);
+
+       if (dowakeup) {
+               if (q->kick)
+                       q->kick(q->arg);
+               rendez_wakeup(&q->rr);
+       }
+
+       return n;
+}
+
 /*
  *  write to a queue.  only Maxatomic bytes at a time is atomic.
  */
-int
-qwrite(struct queue *q, void *vp, int len)
+int qwrite(struct queue *q, void *vp, int len)
 {
-       ERRSTACK(2);
        int n, sofar;
        struct block *b;
        uint8_t *p = vp;
+       void *ext_buf;
 
-       QDEBUG if(!islo())
-               printd("qwrite hi %lux\n", getcallerpc(&q));
+       QDEBUG if (!islo())
+                printd("qwrite hi %p\n", getcallerpc(&q));
 
        sofar = 0;
        do {
-               n = len-sofar;
-               if(n > Maxatomic)
+               n = len - sofar;
+               /* This is 64K, the max amount per single block.  Still a good value? */
+               if (n > Maxatomic)
                        n = Maxatomic;
 
+               /* If n is small, we don't need to bother with the extra_data.  But
+                * until the whole stack can handle extd blocks, we'll use them
+                * unconditionally. */
+#ifdef CONFIG_BLOCK_EXTRAS
+               /* allocb builds in 128 bytes of header space to all blocks, but this is
+                * only available via padblock (to the left).  we also need some space
+                * for pullupblock for some basic headers (like icmp) that get written
+                * in directly */
+               b = allocb(64);
+               ext_buf = kmalloc(n, 0);
+               memcpy(ext_buf, p + sofar, n);
+               block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
+               b->extra_data[0].base = (uintptr_t)ext_buf;
+               b->extra_data[0].off = 0;
+               b->extra_data[0].len = n;
+               b->extra_len += n;
+#else
                b = allocb(n);
-               if(waserror()){
-                       freeb(b);
-                       nexterror();
-               }
-               memmove(b->wp, p+sofar, n);
-               poperror();
+               memmove(b->wp, p + sofar, n);
                b->wp += n;
-
+#endif
+                       
                qbwrite(q, b);
 
                sofar += n;
-       } while(sofar < len && (q->state & Qmsg) == 0);
+       } while (sofar < len && (q->state & Qmsg) == 0);
 
        return len;
 }
@@ -1306,8 +1739,7 @@ qwrite(struct queue *q, void *vp, int len)
  *  used by print() to write to a queue.  Since we may be splhi or not in
  *  a process, don't qlock.
  */
-int
-qiwrite(struct queue *q, void *vp, int len)
+int qiwrite(struct queue *q, void *vp, int len)
 {
        int n, sofar, dowakeup;
        struct block *b;
@@ -1317,42 +1749,22 @@ qiwrite(struct queue *q, void *vp, int len)
 
        sofar = 0;
        do {
-               n = len-sofar;
-               if(n > Maxatomic)
+               n = len - sofar;
+               if (n > Maxatomic)
                        n = Maxatomic;
 
                b = iallocb(n);
-               if(b == NULL)
+               if (b == NULL)
                        break;
-               memmove(b->wp, p+sofar, n);
+               /* TODO consider extra_data */
+               memmove(b->wp, p + sofar, n);
+               /* this adjusts BLEN to be n, or at least it should */
                b->wp += n;
-
-               spin_lock_irqsave(&q->lock);
-
-               QDEBUG checkb(b, "qiwrite");
-               if(q->bfirst)
-                       q->blast->next = b;
-               else
-                       q->bfirst = b;
-               q->blast = b;
-               q->len += BALLOC(b);
-               q->dlen += n;
-
-               if(q->state & Qstarve){
-                       q->state &= ~Qstarve;
-                       dowakeup = 1;
-               }
-
-               spin_unlock_irqsave(&q->lock);
-
-               if(dowakeup){
-                       if(q->kick)
-                               q->kick(q->arg);
-                       wakeup(&q->rr);
-               }
+               assert(n == BLEN(b));
+               qibwrite(q, b);
 
                sofar += n;
-       } while(sofar < len && (q->state & Qmsg) == 0);
+       } while (sofar < len && (q->state & Qmsg) == 0);
 
        return sofar;
 }
@@ -1361,8 +1773,7 @@ qiwrite(struct queue *q, void *vp, int len)
  *  be extremely careful when calling this,
  *  as there is no reference accounting
  */
-void
-qfree(struct queue *q)
+void qfree(struct queue *q)
 {
        qclose(q);
        kfree(q);
@@ -1372,60 +1783,56 @@ qfree(struct queue *q)
  *  Mark a queue as closed.  No further IO is permitted.
  *  All blocks are released.
  */
-void
-qclose(struct queue *q)
+void qclose(struct queue *q)
 {
        struct block *bfirst;
 
-       if(q == NULL)
+       if (q == NULL)
                return;
 
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow|Qstarve);
-       strncpy(q->err,  Ehungup, sizeof(q->err));
+       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
+       strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
        q->len = 0;
        q->dlen = 0;
-       q->noblock = 0;
        spin_unlock_irqsave(&q->lock);
 
        /* free queued blocks */
        freeblist(bfirst);
 
        /* wake up readers/writers */
-       wakeup(&q->rr);
-       wakeup(&q->wr);
+       rendez_wakeup(&q->rr);
+       rendez_wakeup(&q->wr);
 }
 
 /*
  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
  *  blocks.
  */
-void
-qhangup(struct queue *q, char *msg)
+void qhangup(struct queue *q, char *msg)
 {
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       if(msg == 0 || *msg == 0)
-               strncpy(q->err,  Ehungup, sizeof(q->err));
+       if (msg == 0 || *msg == 0)
+               strncpy(q->err, Ehungup, sizeof(q->err));
        else
-               strncpy(q->err, msg, ERRMAX-1);
+               strncpy(q->err, msg, ERRMAX - 1);
        spin_unlock_irqsave(&q->lock);
 
        /* wake up readers/writers */
-       wakeup(&q->rr);
-       wakeup(&q->wr);
+       rendez_wakeup(&q->rr);
+       rendez_wakeup(&q->wr);
 }
 
 /*
  *  return non-zero if the q is hungup
  */
-int
-qisclosed(struct queue *q)
+int qisclosed(struct queue *q)
 {
        return q->state & Qclosed;
 }
@@ -1433,22 +1840,20 @@ qisclosed(struct queue *q)
 /*
  *  mark a queue as no longer hung up
  */
-void
-qreopen(struct queue *q)
+void qreopen(struct queue *q)
 {
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
        q->state |= Qstarve;
        q->eof = 0;
-       q->limit = q->iNULLim;
+       q->limit = q->inilim;
        spin_unlock_irqsave(&q->lock);
 }
 
 /*
  *  return bytes queued
  */
-int
-qlen(struct queue *q)
+int qlen(struct queue *q)
 {
        return q->dlen;
 }
@@ -1456,13 +1861,12 @@ qlen(struct queue *q)
 /*
  * return space remaining before flow control
  */
-int
-qwindow(struct queue *q)
+int qwindow(struct queue *q)
 {
        int l;
 
        l = q->limit - q->len;
-       if(l < 0)
+       if (l < 0)
                l = 0;
        return l;
 }
@@ -1470,35 +1874,43 @@ qwindow(struct queue *q)
 /*
  *  return true if we can read without blocking
  */
-int
-qcanread(struct queue *q)
+int qcanread(struct queue *q)
 {
-       return q->bfirst!=0;
+       return q->bfirst != 0;
 }
 
 /*
  *  change queue limit
  */
-void
-qsetlimit(struct queue *q, int limit)
+void qsetlimit(struct queue *q, int limit)
 {
        q->limit = limit;
 }
 
 /*
- *  set blocking/nonblocking
+ *  set whether writes drop overflowing blocks, or if we sleep
  */
-void
-qnoblock(struct queue *q, int onoff)
+void qdropoverflow(struct queue *q, bool onoff)
+{
+       if (onoff)
+               q->state |= Qdropoverflow;
+       else
+               q->state &= ~Qdropoverflow;
+}
+
+/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
+void qnonblock(struct queue *q, bool onoff)
 {
-       q->noblock = onoff;
+       if (onoff)
+               q->state |= Qnonblock;
+       else
+               q->state &= ~Qnonblock;
 }
 
 /*
  *  flush the output queue
  */
-void
-qflush(struct queue *q)
+void qflush(struct queue *q)
 {
        struct block *bfirst;
 
@@ -1514,25 +1926,22 @@ qflush(struct queue *q)
        freeblist(bfirst);
 
        /* wake up readers/writers */
-       wakeup(&q->wr);
+       rendez_wakeup(&q->wr);
 }
 
-int
-qfull(struct queue *q)
+int qfull(struct queue *q)
 {
        return q->state & Qflow;
 }
 
-int
-qstate(struct queue *q)
+int qstate(struct queue *q)
 {
        return q->state;
 }
 
-void
-qdump(struct queue *q)
+void qdump(struct queue *q)
 {
-       if(q)
-       printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
-               q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
+       if (q)
+               printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
+                          q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
 }