Change iNULLim to inilim in qio.c
[akaros.git] / kern / src / ns / qio.c
index e2d477c..23e535c 100644 (file)
@@ -29,60 +29,56 @@ static int debugging;
  *  IO queues
  */
 
-struct queue
-{
+struct queue {
        spinlock_t lock;;
 
-       struct block*   bfirst;         /* buffer */
-       struct block*   blast;
+       struct block *bfirst;           /* buffer */
+       struct block *blast;
 
-       int     len;            /* bytes allocated to queue */
-       int     dlen;           /* data bytes in queue */
-       int     limit;          /* max bytes in queue */
-       int     iNULLim;                /* initial limit */
-       int     state;
-       int     noblock;        /* true if writes return immediately when q full */
-       int     eof;            /* number of eofs read by user */
+       int len;                                        /* bytes allocated to queue */
+       int dlen;                                       /* data bytes in queue */
+       int limit;                                      /* max bytes in queue */
+       int inilim;                             /* initial limit */
+       int state;
+       int noblock;                            /* true if writes return immediately when q full */
+       int eof;                                        /* number of eofs read by user */
 
-       void    (*kick)(void*); /* restart output */
-       void    (*bypass)(void*, struct block*);        /* bypass queue altogether */
-       void*   arg;            /* argument to kick */
+       void (*kick) (void *);          /* restart output */
+       void (*bypass) (void *, struct block *);        /* bypass queue altogether */
+       void *arg;                                      /* argument to kick */
 
-       qlock_t rlock;          /* mutex for reading processes */
-       struct rendez   rr;             /* process waiting to read */
-       qlock_t wlock;          /* mutex for writing processes */
-       struct rendez   wr;             /* process waiting to write */
+       qlock_t rlock;                          /* mutex for reading processes */
+       struct rendez rr;                       /* process waiting to read */
+       qlock_t wlock;                          /* mutex for writing processes */
+       struct rendez wr;                       /* process waiting to write */
 
-       char    err[ERRMAX];
+       char err[ERRMAX];
 };
 
-enum
-{
-       Maxatomic       = 64*1024,
+enum {
+       Maxatomic = 64 * 1024,
 };
 
-unsigned int   qiomaxatomic = Maxatomic;
+unsigned int qiomaxatomic = Maxatomic;
 
-void
-ixsummary(void)
+void ixsummary(void)
 {
        debugging ^= 1;
        iallocsummary();
-       printd("pad %lud, concat %lud, pullup %lud, copy %lud\n",
-               padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
-       printd("consume %lud, produce %lud, qcopy %lud\n",
-               consumecnt, producecnt, qcopycnt);
+       printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
+                  padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
+       printd("consume %lu, produce %lu, qcopy %lu\n",
+                  consumecnt, producecnt, qcopycnt);
 }
 
 /*
  *  free a list of blocks
  */
-void
-freeblist(struct block *b)
+void freeblist(struct block *b)
 {
        struct block *next;
 
-       for(; b != 0; b = next){
+       for (; b != 0; b = next) {
                next = b->next;
                b->next = 0;
                freeb(b);
@@ -92,24 +88,23 @@ freeblist(struct block *b)
 /*
  *  pad a block to the front (or the back if size is negative)
  */
-struct block*
-padblock(struct block *bp, int size)
+struct block *padblock(struct block *bp, int size)
 {
        int n;
        struct block *nbp;
 
        QDEBUG checkb(bp, "padblock 1");
-       if(size >= 0){
-               if(bp->rp - bp->base >= size){
+       if (size >= 0) {
+               if (bp->rp - bp->base >= size) {
                        bp->rp -= size;
                        return bp;
                }
 
-               if(bp->next)
-                       panic("padblock 0x%luX", getcallerpc(&bp));
+               if (bp->next)
+                       panic("padblock %p", getcallerpc(&bp));
                n = BLEN(bp);
                padblockcnt++;
-               nbp = allocb(size+n);
+               nbp = allocb(size + n);
                nbp->rp += size;
                nbp->wp = nbp->rp;
                memmove(nbp->wp, bp->rp, n);
@@ -119,15 +114,15 @@ padblock(struct block *bp, int size)
        } else {
                size = -size;
 
-               if(bp->next)
-                       panic("padblock 0x%luX", getcallerpc(&bp));
+               if (bp->next)
+                       panic("padblock %p", getcallerpc(&bp));
 
-               if(bp->lim - bp->wp >= size)
+               if (bp->lim - bp->wp >= size)
                        return bp;
 
                n = BLEN(bp);
                padblockcnt++;
-               nbp = allocb(size+n);
+               nbp = allocb(size + n);
                memmove(nbp->wp, bp->rp, n);
                nbp->wp += n;
                freeb(bp);
@@ -139,13 +134,12 @@ padblock(struct block *bp, int size)
 /*
  *  return count of bytes in a string of blocks
  */
-int
-blocklen(struct block *bp)
+int blocklen(struct block *bp)
 {
        int len;
 
        len = 0;
-       while(bp) {
+       while (bp) {
                len += BLEN(bp);
                bp = bp->next;
        }
@@ -155,13 +149,12 @@ blocklen(struct block *bp)
 /*
  * return count of space in blocks
  */
-int
-blockalloclen(struct block *bp)
+int blockalloclen(struct block *bp)
 {
        int len;
 
        len = 0;
-       while(bp) {
+       while (bp) {
                len += BALLOC(bp);
                bp = bp->next;
        }
@@ -172,17 +165,16 @@ blockalloclen(struct block *bp)
  *  copy the  string of blocks into
  *  a single block and free the string
  */
-struct block*
-concatblock(struct block *bp)
+struct block *concatblock(struct block *bp)
 {
        int len;
        struct block *nb, *f;
 
-       if(bp->next == 0)
+       if (bp->next == 0)
                return bp;
 
        nb = allocb(blocklen(bp));
-       for(f = bp; f; f = f->next) {
+       for (f = bp; f; f = f->next) {
                len = BLEN(f);
                memmove(nb->wp, f->rp, len);
                nb->wp += len;
@@ -196,8 +188,7 @@ concatblock(struct block *bp)
 /*
  *  make sure the first block has at least n bytes
  */
-struct block*
-pullupblock(struct block *bp, int n)
+struct block *pullupblock(struct block *bp, int n)
 {
        int i;
        struct block *nbp;
@@ -206,14 +197,14 @@ pullupblock(struct block *bp, int n)
         *  this should almost always be true, it's
         *  just to avoid every caller checking.
         */
-       if(BLEN(bp) >= n)
+       if (BLEN(bp) >= n)
                return bp;
 
        /*
         *  if not enough room in the first block,
         *  add another to the front of the list.
         */
-       if(bp->lim - bp->rp < n){
+       if (bp->lim - bp->rp < n) {
                nbp = allocb(n);
                nbp->next = bp;
                bp = nbp;
@@ -223,17 +214,16 @@ pullupblock(struct block *bp, int n)
         *  copy bytes from the trailing blocks into the first
         */
        n -= BLEN(bp);
-       while((nbp = bp->next)){
+       while ((nbp = bp->next)) {
                i = BLEN(nbp);
-               if(i > n) {
+               if (i > n) {
                        memmove(bp->wp, nbp->rp, n);
                        pullupblockcnt++;
                        bp->wp += n;
                        nbp->rp += n;
                        QDEBUG checkb(bp, "pullupblock 1");
                        return bp;
-               }
-               else {
+               } else {
                        memmove(bp->wp, nbp->rp, i);
                        pullupblockcnt++;
                        bp->wp += i;
@@ -241,7 +231,7 @@ pullupblock(struct block *bp, int n)
                        nbp->next = 0;
                        freeb(nbp);
                        n -= i;
-                       if(n == 0){
+                       if (n == 0) {
                                QDEBUG checkb(bp, "pullupblock 2");
                                return bp;
                        }
@@ -254,16 +244,14 @@ pullupblock(struct block *bp, int n)
 /*
  *  make sure the first block has at least n bytes
  */
-struct block*
-pullupqueue(struct queue *q, int n)
+struct block *pullupqueue(struct queue *q, int n)
 {
        struct block *b;
 
-       if((BLEN(q->bfirst) >= n))
+       if ((BLEN(q->bfirst) >= n))
                return q->bfirst;
        q->bfirst = pullupblock(q->bfirst, n);
-       for(b = q->bfirst; b != NULL && b->next != NULL; b = b->next)
-               ;
+       for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
        q->blast = b;
        return q->bfirst;
 }
@@ -271,19 +259,18 @@ pullupqueue(struct queue *q, int n)
 /*
  *  trim to len bytes starting at offset
  */
-struct block *
-trimblock(struct block *bp, int offset, int len)
+struct block *trimblock(struct block *bp, int offset, int len)
 {
        uint32_t l;
        struct block *nb, *startb;
 
        QDEBUG checkb(bp, "trimblock 1");
-       if(blocklen(bp) < offset+len) {
+       if (blocklen(bp) < offset + len) {
                freeblist(bp);
                return NULL;
        }
 
-       while((l = BLEN(bp)) < offset) {
+       while ((l = BLEN(bp)) < offset) {
                offset -= l;
                nb = bp->next;
                bp->next = NULL;
@@ -294,14 +281,14 @@ trimblock(struct block *bp, int offset, int len)
        startb = bp;
        bp->rp += offset;
 
-       while((l = BLEN(bp)) < len) {
+       while ((l = BLEN(bp)) < len) {
                len -= l;
                bp = bp->next;
        }
 
        bp->wp -= (BLEN(bp) - len);
 
-       if(bp->next) {
+       if (bp->next) {
                freeblist(bp->next);
                bp->next = NULL;
        }
@@ -312,23 +299,22 @@ trimblock(struct block *bp, int offset, int len)
 /*
  *  copy 'count' bytes into a new block
  */
-struct block*
-copyblock(struct block *bp, int count)
+struct block *copyblock(struct block *bp, int count)
 {
        int l;
        struct block *nbp;
 
        QDEBUG checkb(bp, "copyblock 0");
        nbp = allocb(count);
-       for(; count > 0 && bp != 0; bp = bp->next){
+       for (; count > 0 && bp != 0; bp = bp->next) {
                l = BLEN(bp);
-               if(l > count)
+               if (l > count)
                        l = count;
                memmove(nbp->wp, bp->rp, l);
                nbp->wp += l;
                count -= l;
        }
-       if(count > 0){
+       if (count > 0) {
                memset(nbp->wp, 0, count);
                nbp->wp += count;
        }
@@ -338,18 +324,17 @@ copyblock(struct block *bp, int count)
        return nbp;
 }
 
-struct block*
-adjustblock(struct block* bp, int len)
+struct block *adjustblock(struct block *bp, int len)
 {
        int n;
        struct block *nbp;
 
-       if(len < 0){
+       if (len < 0) {
                freeb(bp);
                return NULL;
        }
 
-       if(bp->rp+len > bp->lim){
+       if (bp->rp + len > bp->lim) {
                nbp = copyblock(bp, len);
                freeblist(bp);
                QDEBUG checkb(nbp, "adjustblock 1");
@@ -358,40 +343,38 @@ adjustblock(struct block* bp, int len)
        }
 
        n = BLEN(bp);
-       if(len > n)
-               memset(bp->wp, 0, len-n);
-       bp->wp = bp->rp+len;
+       if (len > n)
+               memset(bp->wp, 0, len - n);
+       bp->wp = bp->rp + len;
        QDEBUG checkb(bp, "adjustblock 2");
 
        return bp;
 }
 
-
 /*
  *  throw away up to count bytes from a
  *  list of blocks.  Return count of bytes
  *  thrown away.
  */
-int
-pullblock(struct block **bph, int count)
+int pullblock(struct block **bph, int count)
 {
        struct block *bp;
        int n, bytes;
 
        bytes = 0;
-       if(bph == NULL)
+       if (bph == NULL)
                return 0;
 
-       while(*bph != NULL && count != 0) {
+       while (*bph != NULL && count != 0) {
                bp = *bph;
                n = BLEN(bp);
-               if(count < n)
+               if (count < n)
                        n = count;
                bytes += n;
                count -= n;
                bp->rp += n;
                QDEBUG checkb(bp, "pullblock ");
-               if(BLEN(bp) == 0) {
+               if (BLEN(bp) == 0) {
                        *bph = bp->next;
                        bp->next = NULL;
                        freeb(bp);
@@ -403,8 +386,7 @@ pullblock(struct block **bph, int count)
 /*
  *  get next block from a queue, return null if nothing there
  */
-struct block*
-qget(struct queue *q)
+struct block *qget(struct queue *q)
 {
        int dowakeup;
        struct block *b;
@@ -413,7 +395,7 @@ qget(struct queue *q)
        spin_lock_irqsave(&q->lock);
 
        b = q->bfirst;
-       if(b == NULL){
+       if (b == NULL) {
                q->state |= Qstarve;
                spin_unlock_irqsave(&q->lock);
                return NULL;
@@ -425,7 +407,7 @@ qget(struct queue *q)
        QDEBUG checkb(b, "qget");
 
        /* if writer flow controlled, restart */
-       if((q->state & Qflow) && q->len < q->limit/2){
+       if ((q->state & Qflow) && q->len < q->limit / 2) {
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
@@ -433,8 +415,8 @@ qget(struct queue *q)
 
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->wr);
+       if (dowakeup)
+               rendez_wakeup(&q->wr);
 
        return b;
 }
@@ -443,20 +425,19 @@ qget(struct queue *q)
  *  throw away the next 'len' bytes in the queue
  * returning the number actually discarded
  */
-int
-qdiscard(struct queue *q, int len)
+int qdiscard(struct queue *q, int len)
 {
        struct block *b;
        int dowakeup, n, sofar;
 
        spin_lock_irqsave(&q->lock);
-       for(sofar = 0; sofar < len; sofar += n){
+       for (sofar = 0; sofar < len; sofar += n) {
                b = q->bfirst;
-               if(b == NULL)
+               if (b == NULL)
                        break;
                QDEBUG checkb(b, "qdiscard");
                n = BLEN(b);
-               if(n <= len - sofar){
+               if (n <= len - sofar) {
                        q->bfirst = b->next;
                        b->next = 0;
                        q->len -= BALLOC(b);
@@ -473,13 +454,13 @@ qdiscard(struct queue *q, int len)
         *  if writer flow controlled, restart
         *
         *  This used to be
-        *      q->len < q->limit/2
+        *  q->len < q->limit/2
         *  but it slows down tcp too much for certain write sizes.
         *  I really don't understand it completely.  It may be
         *  due to the queue draining so fast that the transmission
         *  stalls waiting for the app to produce more data.  - presotto
         */
-       if((q->state & Qflow) && q->len < q->limit){
+       if ((q->state & Qflow) && q->len < q->limit) {
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
@@ -487,8 +468,8 @@ qdiscard(struct queue *q, int len)
 
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->wr);
+       if (dowakeup)
+               rendez_wakeup(&q->wr);
 
        return sofar;
 }
@@ -496,8 +477,7 @@ qdiscard(struct queue *q, int len)
 /*
  *  Interrupt level copy out of a queue, return # bytes copied.
  */
-int
-qconsume(struct queue *q, void *vp, int len)
+int qconsume(struct queue *q, void *vp, int len)
 {
        struct block *b;
        int n, dowakeup;
@@ -507,9 +487,9 @@ qconsume(struct queue *q, void *vp, int len)
        /* sync with qwrite */
        spin_lock_irqsave(&q->lock);
 
-       for(;;) {
+       for (;;) {
                b = q->bfirst;
-               if(b == 0){
+               if (b == 0) {
                        q->state |= Qstarve;
                        spin_unlock_irqsave(&q->lock);
                        return -1;
@@ -517,7 +497,7 @@ qconsume(struct queue *q, void *vp, int len)
                QDEBUG checkb(b, "qconsume 1");
 
                n = BLEN(b);
-               if(n > 0)
+               if (n > 0)
                        break;
                q->bfirst = b->next;
                q->len -= BALLOC(b);
@@ -527,7 +507,7 @@ qconsume(struct queue *q, void *vp, int len)
                tofree = b;
        };
 
-       if(n < len)
+       if (n < len)
                len = n;
        memmove(p, b->rp, len);
        consumecnt += n;
@@ -535,7 +515,7 @@ qconsume(struct queue *q, void *vp, int len)
        q->dlen -= len;
 
        /* discard the block if we're done with it */
-       if((q->state & Qmsg) || len == n){
+       if ((q->state & Qmsg) || len == n) {
                q->bfirst = b->next;
                b->next = 0;
                q->len -= BALLOC(b);
@@ -547,7 +527,7 @@ qconsume(struct queue *q, void *vp, int len)
        }
 
        /* if writer flow controlled, restart */
-       if((q->state & Qflow) && q->len < q->limit/2){
+       if ((q->state & Qflow) && q->len < q->limit / 2) {
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
@@ -555,29 +535,28 @@ qconsume(struct queue *q, void *vp, int len)
 
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->wr);
+       if (dowakeup)
+               rendez_wakeup(&q->wr);
 
-       if(tofree != NULL)
+       if (tofree != NULL)
                freeblist(tofree);
 
        return len;
 }
 
-int
-qpass(struct queue *q, struct block *b)
+int qpass(struct queue *q, struct block *b)
 {
        int dlen, len, dowakeup;
 
        /* sync with qread */
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);
-       if(q->len >= q->limit){
+       if (q->len >= q->limit) {
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return -1;
        }
-       if(q->state & Qclosed){
+       if (q->state & Qclosed) {
                len = blocklen(b);
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
@@ -585,14 +564,14 @@ qpass(struct queue *q, struct block *b)
        }
 
        /* add buffer to queue */
-       if(q->bfirst)
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        len = BALLOC(b);
        dlen = BLEN(b);
        QDEBUG checkb(b, "qpass");
-       while(b->next){
+       while (b->next) {
                b = b->next;
                QDEBUG checkb(b, "qpass");
                len += BALLOC(b);
@@ -602,23 +581,22 @@ qpass(struct queue *q, struct block *b)
        q->len += len;
        q->dlen += dlen;
 
-       if(q->len >= q->limit/2)
+       if (q->len >= q->limit / 2)
                q->state |= Qflow;
 
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        return len;
 }
 
-int
-qpassnolim(struct queue *q, struct block *b)
+int qpassnolim(struct queue *q, struct block *b)
 {
        int dlen, len, dowakeup;
 
@@ -626,21 +604,21 @@ qpassnolim(struct queue *q, struct block *b)
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);
 
-       if(q->state & Qclosed){
+       if (q->state & Qclosed) {
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return BALLOC(b);
        }
 
        /* add buffer to queue */
-       if(q->bfirst)
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        len = BALLOC(b);
        dlen = BLEN(b);
        QDEBUG checkb(b, "qpass");
-       while(b->next){
+       while (b->next) {
                b = b->next;
                QDEBUG checkb(b, "qpass");
                len += BALLOC(b);
@@ -650,17 +628,17 @@ qpassnolim(struct queue *q, struct block *b)
        q->len += len;
        q->dlen += dlen;
 
-       if(q->len >= q->limit/2)
+       if (q->len >= q->limit / 2)
                q->state |= Qflow;
 
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        return len;
 }
@@ -669,16 +647,15 @@ qpassnolim(struct queue *q, struct block *b)
  *  if the allocated space is way out of line with the used
  *  space, reallocate to a smaller block
  */
-struct block*
-packblock(struct block *bp)
+struct block *packblock(struct block *bp)
 {
        struct block **l, *nbp;
        int n;
 
-       for(l = &bp; *l; l = &(*l)->next){
+       for (l = &bp; *l; l = &(*l)->next) {
                nbp = *l;
                n = BLEN(nbp);
-               if((n<<2) < BALLOC(nbp)){
+               if ((n << 2) < BALLOC(nbp)) {
                        *l = allocb(n);
                        memmove((*l)->wp, nbp->rp, n);
                        (*l)->wp += n;
@@ -690,8 +667,7 @@ packblock(struct block *bp)
        return bp;
 }
 
-int
-qproduce(struct queue *q, void *vp, int len)
+int qproduce(struct queue *q, void *vp, int len)
 {
        struct block *b;
        int dowakeup;
@@ -702,7 +678,7 @@ qproduce(struct queue *q, void *vp, int len)
        spin_lock_irqsave(&q->lock);
 
        /* no waiting receivers, room in buffer? */
-       if(q->len >= q->limit){
+       if (q->len >= q->limit) {
                q->state |= Qflow;
                spin_unlock_irqsave(&q->lock);
                return -1;
@@ -710,15 +686,18 @@ qproduce(struct queue *q, void *vp, int len)
 
        /* save in buffer */
        /* use Qcoalesce here to save storage */
+       // TODO: Consider removing the Qcoalesce flag and force a coalescing
+       // strategy by default.
        b = q->blast;
-       if((q->state & Qcoalesce)==0 || q->bfirst==NULL || b->lim-b->wp < len){
+       if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
+               || b->lim - b->wp < len) {
                /* need a new block */
                b = iallocb(len);
-               if(b == 0){
+               if (b == 0) {
                        spin_unlock_irqsave(&q->lock);
                        return 0;
                }
-               if(q->bfirst)
+               if (q->bfirst)
                        q->blast->next = b;
                else
                        q->bfirst = b;
@@ -732,17 +711,17 @@ qproduce(struct queue *q, void *vp, int len)
        q->dlen += len;
        QDEBUG checkb(b, "qproduce");
 
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
 
-       if(q->len >= q->limit)
+       if (q->len >= q->limit)
                q->state |= Qflow;
        spin_unlock_irqsave(&q->lock);
 
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        return len;
 }
@@ -750,8 +729,7 @@ qproduce(struct queue *q, void *vp, int len)
 /*
  *  copy from offset in the queue
  */
-struct block*
-qcopy(struct queue *q, int len, uint32_t offset)
+struct block *qcopy(struct queue *q, int len, uint32_t offset)
 {
        int sofar;
        int n;
@@ -764,13 +742,13 @@ qcopy(struct queue *q, int len, uint32_t offset)
 
        /* go to offset */
        b = q->bfirst;
-       for(sofar = 0; ; sofar += n){
-               if(b == NULL){
+       for (sofar = 0;; sofar += n) {
+               if (b == NULL) {
                        spin_unlock_irqsave(&q->lock);
                        return nb;
                }
                n = BLEN(b);
-               if(sofar + n > offset){
+               if (sofar + n > offset) {
                        p = b->rp + offset - sofar;
                        n -= offset - sofar;
                        break;
@@ -780,15 +758,15 @@ qcopy(struct queue *q, int len, uint32_t offset)
        }
 
        /* copy bytes from there */
-       for(sofar = 0; sofar < len;){
-               if(n > len - sofar)
+       for (sofar = 0; sofar < len;) {
+               if (n > len - sofar)
                        n = len - sofar;
                memmove(nb->wp, p, n);
                qcopycnt += n;
                sofar += n;
                nb->wp += n;
                b = b->next;
-               if(b == NULL)
+               if (b == NULL)
                        break;
                n = BLEN(b);
                p = b->rp;
@@ -798,19 +776,28 @@ qcopy(struct queue *q, int len, uint32_t offset)
        return nb;
 }
 
+static void qinit_common(struct queue *q)
+{
+       spinlock_init_irqsave(&q->lock);
+       qlock_init(&q->rlock);
+       qlock_init(&q->wlock);
+       rendez_init(&q->rr);
+       rendez_init(&q->wr);
+}
+
 /*
  *  called by non-interrupt code
  */
-struct queue*
-qopen(int limit, int msg, void (*kick)(void*), void *arg)
+struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
 {
        struct queue *q;
 
        q = kzmalloc(sizeof(struct queue), 0);
-       if(q == 0)
+       if (q == 0)
                return 0;
+       qinit_common(q);
 
-       q->limit = q->iNULLim = limit;
+       q->limit = q->inilim = limit;
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
@@ -822,14 +809,14 @@ qopen(int limit, int msg, void (*kick)(void*), void *arg)
 }
 
 /* open a queue to be bypassed */
-struct queue*
-qbypass(void (*bypass)(void*, struct block*), void *arg)
+struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
 {
        struct queue *q;
 
        q = kzmalloc(sizeof(struct queue), 0);
-       if(q == 0)
+       if (q == 0)
                return 0;
+       qinit_common(q);
 
        q->limit = 0;
        q->arg = arg;
@@ -839,37 +826,36 @@ qbypass(void (*bypass)(void*, struct block*), void *arg)
        return q;
 }
 
-static int
-notempty(void *a)
+static int notempty(void *a)
 {
        struct queue *q = a;
 
        return (q->state & Qclosed) || q->bfirst != 0;
 }
 
-/*
- *  wait for the queue to be non-empty or closed.
- *  called with q ilocked.
- */
-static int
-qwait(struct queue *q)
+/* wait for the queue to be non-empty or closed.
+ *
+ * called with q ilocked.  rendez may error out, back through the caller, with
+ * the irqsave lock unlocked.  */
+static int qwait(struct queue *q)
 {
        /* wait for data */
-       for(;;){
-               if(q->bfirst != NULL)
+       for (;;) {
+               if (q->bfirst != NULL)
                        break;
 
-               if(q->state & Qclosed){
-                       if(++q->eof > 3)
+               if (q->state & Qclosed) {
+                       if (++q->eof > 3)
                                return -1;
-                       if(*q->err && strcmp(q->err, Ehungup) != 0)
+                       if (*q->err && strcmp(q->err, Ehungup) != 0)
                                return -1;
                        return 0;
                }
 
                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
-               sleep(&q->rr, notempty, q);
+               /* may throw an error() */
+               rendez_sleep(&q->rr, notempty, q);
                spin_lock_irqsave(&q->lock);
        }
        return 1;
@@ -878,17 +864,16 @@ qwait(struct queue *q)
 /*
  * add a block list to a queue
  */
-void
-qaddlist(struct queue *q, struct block *b)
+void qaddlist(struct queue *q, struct block *b)
 {
        /* queue the block */
-       if(q->bfirst)
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        q->len += blockalloclen(b);
        q->dlen += blocklen(b);
-       while(b->next)
+       while (b->next)
                b = b->next;
        q->blast = b;
 }
@@ -896,13 +881,12 @@ qaddlist(struct queue *q, struct block *b)
 /*
  *  called with q ilocked
  */
-struct block*
-qremove(struct queue *q)
+struct block *qremove(struct queue *q)
 {
        struct block *b;
 
        b = q->bfirst;
-       if(b == NULL)
+       if (b == NULL)
                return NULL;
        q->bfirst = b->next;
        b->next = NULL;
@@ -917,15 +901,14 @@ qremove(struct queue *q)
  *  memory.  emptied blocks are freed.  return
  *  pointer to first unconsumed block.
  */
-struct block*
-bl2mem(uint8_t *p, struct block *b, int n)
+struct block *bl2mem(uint8_t * p, struct block *b, int n)
 {
        int i;
        struct block *next;
 
-       for(; b != NULL; b = next){
+       for (; b != NULL; b = next) {
                i = BLEN(b);
-               if(i > n){
+               if (i > n) {
                        memmove(p, b->rp, n);
                        b->rp += n;
                        return b;
@@ -944,22 +927,21 @@ bl2mem(uint8_t *p, struct block *b, int n)
  *  copy the contents of memory into a string of blocks.
  *  return NULL on error.
  */
-struct block*
-mem2bl(uint8_t *p, int len)
+struct block *mem2bl(uint8_t * p, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        int n;
        struct block *b, *first, **l;
 
        first = NULL;
        l = &first;
-       if(waserror()){
+       if (waserror()) {
                freeblist(first);
                nexterror();
        }
        do {
                n = len;
-               if(n > Maxatomic)
+               if (n > Maxatomic)
                        n = Maxatomic;
 
                *l = b = allocb(n);
@@ -968,7 +950,7 @@ mem2bl(uint8_t *p, int len)
                p += n;
                len -= n;
                l = &b->next;
-       } while(len > 0);
+       } while (len > 0);
        poperror();
 
        return first;
@@ -978,11 +960,10 @@ mem2bl(uint8_t *p, int len)
  *  put a block back to the front of the queue
  *  called with q ilocked
  */
-void
-qputback(struct queue *q, struct block *b)
+void qputback(struct queue *q, struct block *b)
 {
        b->next = q->bfirst;
-       if(q->bfirst == NULL)
+       if (q->bfirst == NULL)
                q->blast = b;
        q->bfirst = b;
        q->len += BALLOC(b);
@@ -993,13 +974,12 @@ qputback(struct queue *q, struct block *b)
  *  flow control, get producer going again
  *  called with q ilocked
  */
-static void
-qwakeup_iunlock(struct queue *q)
+static void qwakeup_iunlock(struct queue *q)
 {
        int dowakeup = 0;
 
        /* if writer flow controlled, restart */
-       if((q->state & Qflow) && q->len < q->limit/2){
+       if ((q->state & Qflow) && q->len < q->limit / 2) {
                q->state &= ~Qflow;
                dowakeup = 1;
        }
@@ -1007,41 +987,40 @@ qwakeup_iunlock(struct queue *q)
        spin_unlock_irqsave(&q->lock);
 
        /* wakeup flow controlled writers */
-       if(dowakeup){
-               if(q->kick)
+       if (dowakeup) {
+               if (q->kick)
                        q->kick(q->arg);
-               wakeup(&q->wr);
+               rendez_wakeup(&q->wr);
        }
 }
 
 /*
  *  get next block from a queue (up to a limit)
  */
-struct block*
-qbread(struct queue *q, int len)
+struct block *qbread(struct queue *q, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        struct block *b, *nb;
        int n;
 
        qlock(&q->rlock);
-       if(waserror()){
+       if (waserror()) {
                qunlock(&q->rlock);
                nexterror();
        }
 
        spin_lock_irqsave(&q->lock);
-       switch(qwait(q)){
-       case 0:
-               /* queue closed */
-               spin_unlock_irqsave(&q->lock);
-               qunlock(&q->rlock);
-               poperror();
-               return NULL;
-       case -1:
-               /* multiple reads on a closed queue */
-               spin_unlock_irqsave(&q->lock);
-               error(q->err);
+       switch (qwait(q)) {
+               case 0:
+                       /* queue closed */
+                       spin_unlock_irqsave(&q->lock);
+                       qunlock(&q->rlock);
+                       poperror();
+                       return NULL;
+               case -1:
+                       /* multiple reads on a closed queue */
+                       spin_unlock_irqsave(&q->lock);
+                       error(q->err);
        }
 
        /* if we get here, there's at least one block in the queue */
@@ -1050,11 +1029,11 @@ qbread(struct queue *q, int len)
 
        /* split block if it's too big and this is not a message queue */
        nb = b;
-       if(n > len){
-               if((q->state&Qmsg) == 0){
+       if (n > len) {
+               if ((q->state & Qmsg) == 0) {
                        n -= len;
                        b = allocb(n);
-                       memmove(b->wp, nb->rp+len, n);
+                       memmove(b->wp, nb->rp + len, n);
                        b->wp += n;
                        qputback(q, b);
                }
@@ -1073,39 +1052,40 @@ qbread(struct queue *q, int len)
  *  read a queue.  if no data is queued, post a struct block
  *  and wait on its Rendez.
  */
-long
-qread(struct queue *q, void *vp, int len)
+long qread(struct queue *q, void *vp, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        struct block *b, *first, **l;
        int m, n;
 
        qlock(&q->rlock);
-       if(waserror()){
+       if (waserror()) {
                qunlock(&q->rlock);
                nexterror();
        }
 
        spin_lock_irqsave(&q->lock);
 again:
-       switch(qwait(q)){
-       case 0:
-               /* queue closed */
-               spin_unlock_irqsave(&q->lock);
-               qunlock(&q->rlock);
-               poperror();
-               return 0;
-       case -1:
-               /* multiple reads on a closed queue */
-               spin_unlock_irqsave(&q->lock);
-               error(q->err);
+       switch (qwait(q)) {
+               case 0:
+                       /* queue closed */
+                       spin_unlock_irqsave(&q->lock);
+                       qunlock(&q->rlock);
+                       poperror();
+                       return 0;
+               case -1:
+                       /* multiple reads on a closed queue */
+                       spin_unlock_irqsave(&q->lock);
+                       error(q->err);
        }
 
        /* if we get here, there's at least one block in the queue */
-       if(q->state & Qcoalesce){
+       // TODO: Consider removing the Qcoalesce flag and force a coalescing
+       // strategy by default.
+       if (q->state & Qcoalesce) {
                /* when coalescing, 0 length blocks just go away */
                b = q->bfirst;
-               if(BLEN(b) <= 0){
+               if (BLEN(b) <= 0) {
                        freeb(qremove(q));
                        goto again;
                }
@@ -1117,16 +1097,16 @@ again:
                n = 0;
                l = &first;
                m = BLEN(b);
-               for(;;) {
+               for (;;) {
                        *l = qremove(q);
                        l = &b->next;
                        n += m;
 
                        b = q->bfirst;
-                       if(b == NULL)
+                       if (b == NULL)
                                break;
                        m = BLEN(b);
-                       if(n+m > len)
+                       if (n + m > len)
                                break;
                }
        } else {
@@ -1140,9 +1120,9 @@ again:
        spin_lock_irqsave(&q->lock);
 
        /* take care of any left over partial block */
-       if(b != NULL){
+       if (b != NULL) {
                n -= BLEN(b);
-               if(q->state & Qmsg)
+               if (q->state & Qmsg)
                        freeb(b);
                else
                        qputback(q, b);
@@ -1156,8 +1136,7 @@ again:
        return n;
 }
 
-static int
-qnotfull(void *a)
+static int qnotfull(void *a)
 {
        struct queue *q = a;
 
@@ -1169,23 +1148,23 @@ uint32_t noblockcnt;
 /*
  *  add a block to a queue obeying flow control
  */
-long
-qbwrite(struct queue *q, struct block *b)
+long qbwrite(struct queue *q, struct block *b)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        int n, dowakeup;
+       volatile bool should_free_b = TRUE;
 
        n = BLEN(b);
 
-       if(q->bypass){
-               (*q->bypass)(q->arg, b);
+       if (q->bypass) {
+               (*q->bypass) (q->arg, b);
                return n;
        }
 
        dowakeup = 0;
        qlock(&q->wlock);
-       if(waserror()){
-               if(b != NULL)
+       if (waserror()) {
+               if (b != NULL && should_free_b)
                        freeb(b);
                qunlock(&q->wlock);
                nexterror();
@@ -1194,14 +1173,14 @@ qbwrite(struct queue *q, struct block *b)
        spin_lock_irqsave(&q->lock);
 
        /* give up if the queue is closed */
-       if(q->state & Qclosed){
+       if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
                error(q->err);
        }
 
        /* if nonblocking, don't queue over the limit */
-       if(q->len >= q->limit){
-               if(q->noblock){
+       if (q->len >= q->limit) {
+               if (q->noblock) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        noblockcnt += n;
@@ -1212,7 +1191,8 @@ qbwrite(struct queue *q, struct block *b)
        }
 
        /* queue the block */
-       if(q->bfirst)
+       should_free_b = FALSE;
+       if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
@@ -1224,19 +1204,19 @@ qbwrite(struct queue *q, struct block *b)
        b = NULL;
 
        /* make sure other end gets awakened */
-       if(q->state & Qstarve){
+       if (q->state & Qstarve) {
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);
 
        /*  get output going again */
-       if(q->kick && (dowakeup || (q->state&Qkick)))
+       if (q->kick && (dowakeup || (q->state & Qkick)))
                q->kick(q->arg);
 
        /* wakeup anyone consuming at the other end */
-       if(dowakeup)
-               wakeup(&q->rr);
+       if (dowakeup)
+               rendez_wakeup(&q->rr);
 
        /*
         *  flow control, wait for queue to get below the limit
@@ -1250,14 +1230,14 @@ qbwrite(struct queue *q, struct block *b)
         *  that keeps getting interrupted and rewriting will
         *  queue infinite crud.
         */
-       for(;;){
-               if(q->noblock || qnotfull(q))
+       for (;;) {
+               if (q->noblock || qnotfull(q))
                        break;
 
                spin_lock_irqsave(&q->lock);
                q->state |= Qflow;
                spin_unlock_irqsave(&q->lock);
-               sleep(&q->wr, qnotfull, q);
+               rendez_sleep(&q->wr, qnotfull, q);
        }
 
        qunlock(&q->wlock);
@@ -1268,36 +1248,35 @@ qbwrite(struct queue *q, struct block *b)
 /*
  *  write to a queue.  only Maxatomic bytes at a time is atomic.
  */
-int
-qwrite(struct queue *q, void *vp, int len)
+int qwrite(struct queue *q, void *vp, int len)
 {
-       ERRSTACK(2);
+       ERRSTACK(1);
        int n, sofar;
        struct block *b;
        uint8_t *p = vp;
 
-       QDEBUG if(!islo())
-               printd("qwrite hi %lux\n", getcallerpc(&q));
+       QDEBUG if (!islo())
+                printd("qwrite hi %p\n", getcallerpc(&q));
 
        sofar = 0;
        do {
-               n = len-sofar;
-               if(n > Maxatomic)
+               n = len - sofar;
+               if (n > Maxatomic)
                        n = Maxatomic;
 
                b = allocb(n);
-               if(waserror()){
+               if (waserror()) {
                        freeb(b);
                        nexterror();
                }
-               memmove(b->wp, p+sofar, n);
+               memmove(b->wp, p + sofar, n);
                poperror();
                b->wp += n;
 
                qbwrite(q, b);
 
                sofar += n;
-       } while(sofar < len && (q->state & Qmsg) == 0);
+       } while (sofar < len && (q->state & Qmsg) == 0);
 
        return len;
 }
@@ -1306,8 +1285,7 @@ qwrite(struct queue *q, void *vp, int len)
  *  used by print() to write to a queue.  Since we may be splhi or not in
  *  a process, don't qlock.
  */
-int
-qiwrite(struct queue *q, void *vp, int len)
+int qiwrite(struct queue *q, void *vp, int len)
 {
        int n, sofar, dowakeup;
        struct block *b;
@@ -1317,20 +1295,20 @@ qiwrite(struct queue *q, void *vp, int len)
 
        sofar = 0;
        do {
-               n = len-sofar;
-               if(n > Maxatomic)
+               n = len - sofar;
+               if (n > Maxatomic)
                        n = Maxatomic;
 
                b = iallocb(n);
-               if(b == NULL)
+               if (b == NULL)
                        break;
-               memmove(b->wp, p+sofar, n);
+               memmove(b->wp, p + sofar, n);
                b->wp += n;
 
                spin_lock_irqsave(&q->lock);
 
                QDEBUG checkb(b, "qiwrite");
-               if(q->bfirst)
+               if (q->bfirst)
                        q->blast->next = b;
                else
                        q->bfirst = b;
@@ -1338,21 +1316,21 @@ qiwrite(struct queue *q, void *vp, int len)
                q->len += BALLOC(b);
                q->dlen += n;
 
-               if(q->state & Qstarve){
+               if (q->state & Qstarve) {
                        q->state &= ~Qstarve;
                        dowakeup = 1;
                }
 
                spin_unlock_irqsave(&q->lock);
 
-               if(dowakeup){
-                       if(q->kick)
+               if (dowakeup) {
+                       if (q->kick)
                                q->kick(q->arg);
-                       wakeup(&q->rr);
+                       rendez_wakeup(&q->rr);
                }
 
                sofar += n;
-       } while(sofar < len && (q->state & Qmsg) == 0);
+       } while (sofar < len && (q->state & Qmsg) == 0);
 
        return sofar;
 }
@@ -1361,8 +1339,7 @@ qiwrite(struct queue *q, void *vp, int len)
  *  be extremely careful when calling this,
  *  as there is no reference accounting
  */
-void
-qfree(struct queue *q)
+void qfree(struct queue *q)
 {
        qclose(q);
        kfree(q);
@@ -1372,19 +1349,18 @@ qfree(struct queue *q)
  *  Mark a queue as closed.  No further IO is permitted.
  *  All blocks are released.
  */
-void
-qclose(struct queue *q)
+void qclose(struct queue *q)
 {
        struct block *bfirst;
 
-       if(q == NULL)
+       if (q == NULL)
                return;
 
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow|Qstarve);
-       strncpy(q->err,  Ehungup, sizeof(q->err));
+       q->state &= ~(Qflow | Qstarve);
+       strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
        q->len = 0;
@@ -1396,36 +1372,34 @@ qclose(struct queue *q)
        freeblist(bfirst);
 
        /* wake up readers/writers */
-       wakeup(&q->rr);
-       wakeup(&q->wr);
+       rendez_wakeup(&q->rr);
+       rendez_wakeup(&q->wr);
 }
 
 /*
  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
  *  blocks.
  */
-void
-qhangup(struct queue *q, char *msg)
+void qhangup(struct queue *q, char *msg)
 {
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       if(msg == 0 || *msg == 0)
-               strncpy(q->err,  Ehungup, sizeof(q->err));
+       if (msg == 0 || *msg == 0)
+               strncpy(q->err, Ehungup, sizeof(q->err));
        else
-               strncpy(q->err, msg, ERRMAX-1);
+               strncpy(q->err, msg, ERRMAX - 1);
        spin_unlock_irqsave(&q->lock);
 
        /* wake up readers/writers */
-       wakeup(&q->rr);
-       wakeup(&q->wr);
+       rendez_wakeup(&q->rr);
+       rendez_wakeup(&q->wr);
 }
 
 /*
  *  return non-zero if the q is hungup
  */
-int
-qisclosed(struct queue *q)
+int qisclosed(struct queue *q)
 {
        return q->state & Qclosed;
 }
@@ -1433,22 +1407,20 @@ qisclosed(struct queue *q)
 /*
  *  mark a queue as no longer hung up
  */
-void
-qreopen(struct queue *q)
+void qreopen(struct queue *q)
 {
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
        q->state |= Qstarve;
        q->eof = 0;
-       q->limit = q->iNULLim;
+       q->limit = q->inilim;
        spin_unlock_irqsave(&q->lock);
 }
 
 /*
  *  return bytes queued
  */
-int
-qlen(struct queue *q)
+int qlen(struct queue *q)
 {
        return q->dlen;
 }
@@ -1456,13 +1428,12 @@ qlen(struct queue *q)
 /*
  * return space remaining before flow control
  */
-int
-qwindow(struct queue *q)
+int qwindow(struct queue *q)
 {
        int l;
 
        l = q->limit - q->len;
-       if(l < 0)
+       if (l < 0)
                l = 0;
        return l;
 }
@@ -1470,17 +1441,15 @@ qwindow(struct queue *q)
 /*
  *  return true if we can read without blocking
  */
-int
-qcanread(struct queue *q)
+int qcanread(struct queue *q)
 {
-       return q->bfirst!=0;
+       return q->bfirst != 0;
 }
 
 /*
  *  change queue limit
  */
-void
-qsetlimit(struct queue *q, int limit)
+void qsetlimit(struct queue *q, int limit)
 {
        q->limit = limit;
 }
@@ -1488,8 +1457,7 @@ qsetlimit(struct queue *q, int limit)
 /*
  *  set blocking/nonblocking
  */
-void
-qnoblock(struct queue *q, int onoff)
+void qnoblock(struct queue *q, int onoff)
 {
        q->noblock = onoff;
 }
@@ -1497,8 +1465,7 @@ qnoblock(struct queue *q, int onoff)
 /*
  *  flush the output queue
  */
-void
-qflush(struct queue *q)
+void qflush(struct queue *q)
 {
        struct block *bfirst;
 
@@ -1514,25 +1481,22 @@ qflush(struct queue *q)
        freeblist(bfirst);
 
        /* wake up readers/writers */
-       wakeup(&q->wr);
+       rendez_wakeup(&q->wr);
 }
 
-int
-qfull(struct queue *q)
+int qfull(struct queue *q)
 {
        return q->state & Qflow;
 }
 
-int
-qstate(struct queue *q)
+int qstate(struct queue *q)
 {
        return q->state;
 }
 
-void
-qdump(struct queue *q)
+void qdump(struct queue *q)
 {
-       if(q)
-       printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
-               q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
+       if (q)
+               printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
+                          q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
 }