qio: Remove q->len
[akaros.git] / kern / src / ns / qio.c
index b1c6b7c..07c58c0 100644 (file)
@@ -71,7 +71,6 @@ struct queue {
        struct block *bfirst;           /* buffer */
        struct block *blast;
 
-       int len;                                        /* bytes allocated to queue */
        int dlen;                                       /* data bytes in queue */
        int limit;                                      /* max bytes in queue */
        int inilim;                             /* initial limit */
@@ -97,15 +96,16 @@ enum {
        QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
        QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
        QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
+       QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
 };
 
 unsigned int qiomaxatomic = Maxatomic;
 
+static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                               int mem_flags);
 static bool qwait_and_ilock(struct queue *q, int qio_flags);
-static void qwakeup_iunlock(struct queue *q);
 
 /* Helper: fires a wake callback, sending 'filter' */
 static void qwake_cb(struct queue *q, int filter)
@@ -239,45 +239,58 @@ struct block *concatblock(struct block *bp)
        return nb;
 }
 
-/* Returns a block with the remaining contents of b all in the main body of the
- * returned block.  Replace old references to b with the returned value (which
- * may still be 'b', if no change was needed. */
-struct block *linearizeblock(struct block *b)
+/* Makes an identical copy of the block, collapsing all the data into the block
+ * body.  It does not point to the contents of the original, it is a copy
+ * (unlike qclone).  Since we're copying, we might as well put the memory into
+ * one contiguous chunk. */
+struct block *copyblock(struct block *bp, int mem_flags)
 {
        struct block *newb;
-       size_t len;
        struct extra_bdata *ebd;
+       size_t amt;
 
-       if (!b->extra_len)
-               return b;
-
-       newb = block_alloc(BLEN(b), MEM_WAIT);
-       len = BHLEN(b);
-       memcpy(newb->wp, b->rp, len);
-       newb->wp += len;
-       len = b->extra_len;
-       for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
-               ebd = &b->extra_data[i];
+       QDEBUG checkb(bp, "copyblock 0");
+       newb = block_alloc(BLEN(bp), mem_flags);
+       if (!newb)
+               return 0;
+       amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
+       assert(amt == BHLEN(bp));
+       for (int i = 0; i < bp->nr_extra_bufs; i++) {
+               ebd = &bp->extra_data[i];
                if (!ebd->base || !ebd->len)
                        continue;
-               memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
-               newb->wp += ebd->len;
-               len -= ebd->len;
+               amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
+               assert(amt == ebd->len);
        }
        /* TODO: any other flags that need copied over? */
-       if (b->flag & BCKSUM_FLAGS) {
-               newb->flag |= (b->flag & BCKSUM_FLAGS);
-               newb->checksum_start = b->checksum_start;
-               newb->checksum_offset = b->checksum_offset;
-               newb->mss = b->mss;
+       if (bp->flag & BCKSUM_FLAGS) {
+               newb->flag |= (bp->flag & BCKSUM_FLAGS);
+               newb->checksum_start = bp->checksum_start;
+               newb->checksum_offset = bp->checksum_offset;
+               newb->mss = bp->mss;
        }
+       copyblockcnt++;
+       QDEBUG checkb(newb, "copyblock 1");
+       return newb;
+}
+
+/* Returns a block with the remaining contents of b all in the main body of the
+ * returned block.  Replace old references to b with the returned value (which
+ * may still be 'b', if no change was needed. */
+struct block *linearizeblock(struct block *b)
+{
+       struct block *newb;
+
+       if (!b->extra_len)
+               return b;
+       newb = copyblock(b, MEM_WAIT);
        freeb(b);
        return newb;
 }
 
-/*
- *  make sure the first block has at least n bytes in its main body
- */
+/* Make sure the first block has at least n bytes in its main body.  Pulls up
+ * data from the *list* of blocks.  Returns 0 if there is not enough data in the
+ * block list. */
 struct block *pullupblock(struct block *bp, int n)
 {
        int i, len, seglen;
@@ -291,7 +304,11 @@ struct block *pullupblock(struct block *bp, int n)
        if (BHLEN(bp) >= n)
                return bp;
 
-        /* a start at explicit main-body / header management */
+       /* If there's no chance, just bail out now.  This might be slightly wasteful
+        * if there's a long blist that does have enough data. */
+       if (n > blocklen(bp))
+               return 0;
+       /* a start at explicit main-body / header management */
        if (bp->extra_len) {
                if (n > bp->lim - bp->rp) {
                        /* would need to realloc a new block and copy everything over. */
@@ -299,10 +316,12 @@ struct block *pullupblock(struct block *bp, int n)
                                        n, bp->lim, bp->rp, bp->lim-bp->rp);
                }
                len = n - BHLEN(bp);
+               /* Would need to recursively call this, or otherwise pull from later
+                * blocks and put chunks of their data into the block we're building. */
                if (len > bp->extra_len)
                        panic("pullup more than extra (%d, %d, %d)\n",
                              n, BHLEN(bp), bp->extra_len);
-               checkb(bp, "before pullup");
+               QDEBUG checkb(bp, "before pullup");
                for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
                        ebd = &bp->extra_data[i];
                        if (!ebd->base || !ebd->len)
@@ -323,7 +342,7 @@ struct block *pullupblock(struct block *bp, int n)
                /* maybe just call pullupblock recursively here */
                if (len)
                        panic("pullup %d bytes overdrawn\n", len);
-               checkb(bp, "after pullup");
+               QDEBUG checkb(bp, "after pullup");
                return bp;
        }
 
@@ -514,41 +533,6 @@ struct block *trimblock(struct block *bp, int offset, int len)
        return bp;
 }
 
-/*
- *  copy 'count' bytes into a new block
- */
-struct block *copyblock(struct block *bp, int count)
-{
-       int l;
-       struct block *nbp;
-
-       QDEBUG checkb(bp, "copyblock 0");
-       nbp = block_alloc(count, MEM_WAIT);
-       if (bp->flag & BCKSUM_FLAGS) {
-               nbp->flag |= (bp->flag & BCKSUM_FLAGS);
-               nbp->checksum_start = bp->checksum_start;
-               nbp->checksum_offset = bp->checksum_offset;
-               nbp->mss = bp->mss;
-       }
-       PANIC_EXTRA(bp);
-       for (; count > 0 && bp != 0; bp = bp->next) {
-               l = BLEN(bp);
-               if (l > count)
-                       l = count;
-               memmove(nbp->wp, bp->rp, l);
-               nbp->wp += l;
-               count -= l;
-       }
-       if (count > 0) {
-               memset(nbp->wp, 0, count);
-               nbp->wp += count;
-       }
-       copyblockcnt++;
-       QDEBUG checkb(nbp, "copyblock 1");
-
-       return nbp;
-}
-
 /* Adjust block @bp so that its size is exactly @len.
  * If the size is increased, fill in the new contents with zeros.
  * If the size is decreased, discard some of the old contents at the tail. */
@@ -625,7 +609,6 @@ static struct block *pop_first_block(struct queue *q)
 {
        struct block *b = q->bfirst;
 
-       q->len -= BALLOC(b);
        q->dlen -= BLEN(b);
        q->bfirst = b->next;
        b->next = 0;
@@ -645,7 +628,6 @@ static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
 {
        b->extra_len -= amt;
-       q->len -= amt;
        q->dlen -= amt;
 }
 
@@ -748,6 +730,7 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
 {
        struct block *ret, *ret_last, *first;
        size_t blen;
+       bool was_unwritable = FALSE;
 
        if (qio_flags & QIO_CAN_ERR_SLEEP) {
                if (!qwait_and_ilock(q, qio_flags)) {
@@ -764,6 +747,12 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                        return QBR_FAIL;
                }
        }
+       /* We need to check before adjusting q->len.  We're checking the writer's
+        * sleep condition / tap condition.  When set, we *might* be making an edge
+        * transition (from unwritable to writable), which needs to wake and fire
+        * taps.  But, our read might not drain the queue below q->lim.  We'll check
+        * again later to see if we should really wake them.  */
+       was_unwritable = !qwritable(q);
        blen = BLEN(first);
        if ((q->state & Qcoalesce) && (blen == 0)) {
                freeb(pop_first_block(q));
@@ -771,9 +760,8 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                /* Need to retry to make sure we have a first block */
                return QBR_AGAIN;
        }
-       /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
-        * regardless of len.  We'll do the same, and just return the minimum: the
-        * first block.  I'd be happy to remove this. */
+       /* Qmsg: just return the first block.  Be careful, since our caller might
+        * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
        if (q->state & Qmsg) {
                ret = pop_first_block(q);
                goto out_ok;
@@ -820,7 +808,16 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                len -= blen;
        }
 out_ok:
-       qwakeup_iunlock(q);
+       /* Don't wake them up or fire tap if we didn't drain enough. */
+       if (!qwritable(q))
+               was_unwritable = FALSE;
+       spin_unlock_irqsave(&q->lock);
+       if (was_unwritable) {
+               if (q->kick && !(qio_flags & QIO_DONT_KICK))
+                       q->kick(q->arg);
+               rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
        *real_ret = ret;
        return QBR_OK;
 }
@@ -877,7 +874,10 @@ struct block *qget(struct queue *q)
  * discarded.
  *
  * If the bytes are in the queue, then they must be discarded.  The only time to
- * return less than len is if the q itself has less than len bytes.  */
+ * return less than len is if the q itself has less than len bytes.
+ *
+ * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
+ * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
 size_t qdiscard(struct queue *q, size_t len)
 {
        struct block *blist;
@@ -887,7 +887,7 @@ size_t qdiscard(struct queue *q, size_t len)
        /* This is racy.  There could be multiple qdiscarders or other consumers,
         * where the consumption could be interleaved. */
        while (qlen(q) && len) {
-               blist = __qbread(q, len, 0, MEM_WAIT);
+               blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
                removed_amt = freeblist(blist);
                sofar += removed_amt;
                len -= removed_amt;
@@ -932,7 +932,7 @@ struct block *packblock(struct block *bp)
 }
 
 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
- * body_rp, for up to len.  Returns the len consumed. 
+ * body_rp, for up to len.  Returns the len consumed.
  *
  * The base is 'b', so that we can kfree it later.  This currently ties us to
  * using kfree for the release method for all extra_data.
@@ -979,8 +979,9 @@ static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
        return n_ebd->len;
 }
 
-/* given a string of blocks, fills the new block's extra_data  with the contents
- * of the blist [offset, len + offset)
+/* given a string of blocks, sets up the new block's extra_data such that it
+ * *points* to the contents of the blist [offset, len + offset).  This does not
+ * make a separate copy of the contents of the blist.
  *
  * returns 0 on success.  the only failure is if the extra_data array was too
  * small, so this returns a positive integer saying how big the extra_data needs
@@ -1068,7 +1069,8 @@ struct block *blist_clone(struct block *blist, int header_len, int len,
 
 /* given a queue, makes a single block with header_len reserved space in the
  * block main body, and the contents of [offset, len + offset) pointed to in the
- * new blocks ext_data. */
+ * new blocks ext_data.  This does not make a copy of the q's contents, though
+ * you do have a ref count on the memory. */
 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
 {
        int ret;
@@ -1169,7 +1171,6 @@ struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
-       q->state |= Qstarve;
        q->eof = 0;
 
        return q;
@@ -1212,29 +1213,38 @@ static bool qwait_and_ilock(struct queue *q, int qio_flags)
                if (q->state & Qclosed) {
                        if (++q->eof > 3) {
                                spin_unlock_irqsave(&q->lock);
-                               error(EFAIL, "multiple reads on a closed queue");
+                               error(EPIPE, "multiple reads on a closed queue");
                        }
                        if (q->err[0]) {
                                spin_unlock_irqsave(&q->lock);
-                               error(EFAIL, q->err);
+                               error(EPIPE, q->err);
                        }
                        return FALSE;
                }
-               /* We set Qstarve regardless of whether we are non-blocking or not.
-                * Qstarve tracks the edge detection of the queue being empty. */
-               q->state |= Qstarve;
                if (qio_flags & QIO_NON_BLOCK) {
                        spin_unlock_irqsave(&q->lock);
                        error(EAGAIN, "queue empty");
                }
                spin_unlock_irqsave(&q->lock);
-               /* may throw an error() */
+               /* As with the producer side, we check for a condition while holding the
+                * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
+                * check again" pattern, but we do it conditionally.  Both sides agree
+                * synchronously to do it, and those decisions are made while holding
+                * q->lock.  I think this is OK.
+                *
+                * The invariant is that no reader sleeps when the queue has data.
+                * While holding the rendez lock, if we see there's no data, we'll
+                * sleep.  Since we saw there was no data, the next writer will see (or
+                * already saw) no data, and then the writer decides to rendez_wake,
+                * which will grab the rendez lock.  If the writer already did that,
+                * then we'll see notempty when we do our check-again. */
                rendez_sleep(&q->rr, notempty, q);
        }
 }
 
 /*
  * add a block list to a queue
+ * XXX basically the same as enqueue blist, and has no locking!
  */
 void qaddlist(struct queue *q, struct block *b)
 {
@@ -1244,7 +1254,6 @@ void qaddlist(struct queue *q, struct block *b)
                q->blast->next = b;
        else
                q->bfirst = b;
-       q->len += blockalloclen(b);
        q->dlen += blocklen(b);
        while (b->next)
                b = b->next;
@@ -1255,7 +1264,7 @@ static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
 {
        size_t copy_amt, retval = 0;
        struct extra_bdata *ebd;
-       
+
        copy_amt = MIN(BHLEN(b), amt);
        memcpy(to, b->rp, copy_amt);
        /* advance the rp, since this block not be completely consumed and future
@@ -1328,6 +1337,9 @@ static size_t read_all_blocks(struct block *b, void *va, size_t len)
        do {
                /* We should be draining every block completely. */
                assert(BLEN(b) <= len - sofar);
+               assert(va);
+               assert(va + sofar);
+               assert(b->rp);
                sofar += read_from_block(b, va + sofar, len - sofar);
                next = b->next;
                freeb(b);
@@ -1380,45 +1392,10 @@ void qputback(struct queue *q, struct block *b)
        if (q->bfirst == NULL)
                q->blast = b;
        q->bfirst = b;
-       q->len += BALLOC(b);
        q->dlen += BLEN(b);
 }
 
 /*
- *  flow control, get producer going again
- *  called with q ilocked
- */
-static void qwakeup_iunlock(struct queue *q)
-{
-       int dowakeup = 0;
-
-       /*
-        *  if writer flow controlled, restart
-        *
-        *  This used to be
-        *  q->len < q->limit/2
-        *  but it slows down tcp too much for certain write sizes.
-        *  I really don't understand it completely.  It may be
-        *  due to the queue draining so fast that the transmission
-        *  stalls waiting for the app to produce more data.  - presotto
-        */
-       if ((q->state & Qflow) && q->len < q->limit) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       }
-
-       spin_unlock_irqsave(&q->lock);
-
-       /* wakeup flow controlled writers */
-       if (dowakeup) {
-               if (q->kick)
-                       q->kick(q->arg);
-               rendez_wakeup(&q->wr);
-       }
-       qwake_cb(q, FDTAP_FILT_WRITABLE);
-}
-
-/*
  *  get next block from a queue (up to a limit)
  *
  */
@@ -1457,27 +1434,24 @@ static int qnotfull(void *a)
 {
        struct queue *q = a;
 
-       return q->len < q->limit || (q->state & Qclosed);
+       return qwritable(q) || (q->state & Qclosed);
 }
 
 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
 static size_t enqueue_blist(struct queue *q, struct block *b)
 {
-       size_t len, dlen;
+       size_t dlen;
 
        if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
-       len = BALLOC(b);
        dlen = BLEN(b);
        while (b->next) {
                b = b->next;
-               len += BALLOC(b);
                dlen += BLEN(b);
        }
        q->blast = b;
-       q->len += len;
        q->dlen += dlen;
        return dlen;
 }
@@ -1488,26 +1462,26 @@ static size_t enqueue_blist(struct queue *q, struct block *b)
 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
 {
        ssize_t ret;
-       bool dowakeup = FALSE;
-       bool was_empty;
+       bool was_unreadable;
 
        if (q->bypass) {
+               ret = blocklen(b);
                (*q->bypass) (q->arg, b);
-               return blocklen(b);
+               return ret;
        }
        spin_lock_irqsave(&q->lock);
-       was_empty = q->len == 0;
+       was_unreadable = q->dlen == 0;
        if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
                freeblist(b);
                if (!(qio_flags & QIO_CAN_ERR_SLEEP))
                        return -1;
                if (q->err[0])
-                       error(EFAIL, q->err);
+                       error(EPIPE, q->err);
                else
-                       error(EFAIL, "connection closed");
+                       error(EPIPE, "connection closed");
        }
-       if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
+       if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
                /* drop overflow takes priority over regular non-blocking */
                if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
                        spin_unlock_irqsave(&q->lock);
@@ -1524,20 +1498,20 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
        }
        ret = enqueue_blist(q, b);
        QDEBUG checkb(b, "__qbwrite");
-       /* make sure other end gets awakened */
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = TRUE;
-       }
        spin_unlock_irqsave(&q->lock);
        /* TODO: not sure if the usage of a kick is mutually exclusive with a
         * wakeup, meaning that actual users either want a kick or have qreaders. */
-       if (q->kick && (dowakeup || (q->state & Qkick)))
+       if (q->kick && (was_unreadable || (q->state & Qkick)))
                q->kick(q->arg);
-       if (dowakeup)
+       if (was_unreadable) {
+               /* Unlike the read side, there's no double-check to make sure the queue
+                * transitioned across an edge.  We know we added something, so that's
+                * enough.  We wake if the queue was empty.  Both sides are the same, in
+                * that the condition for which we do the rendez_wakeup() is the same as
+                * the condition done for the rendez_sleep(). */
                rendez_wakeup(&q->rr);
-       if (was_empty)
                qwake_cb(q, FDTAP_FILT_READABLE);
+       }
        /*
         *  flow control, wait for queue to get below the limit
         *  before allowing the process to continue and queue
@@ -1552,16 +1526,27 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
         */
        if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
            !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
-               /* This is a racy peek at the q status.  If we accidentally block, we
-                * set Qflow, so someone should wake us.  If we accidentally don't
-                * block, we just returned to the user and let them slip a block past
-                * flow control. */
-               while (!qnotfull(q)) {
-                       spin_lock_irqsave(&q->lock);
-                       q->state |= Qflow;
-                       spin_unlock_irqsave(&q->lock);
+               /* This is a racy peek at the q status.  If we accidentally block, our
+                * rendez will return.  The rendez's peak (qnotfull) is also racy w.r.t.
+                * the q's spinlock (that lock protects writes, but not reads).
+                *
+                * Here's the deal: when holding the rendez lock, if we see the sleep
+                * condition, the consumer will wake us.  The condition will only ever
+                * be changed by the next qbread() (consumer, changes q->dlen).  That
+                * code will do a rendez wake, which will spin on the rendez lock,
+                * meaning it won't procede until we either see the new state (and
+                * return) or put ourselves on the rendez, and wake up.
+                *
+                * The pattern is one side writes mem, then signals.  Our side checks
+                * the signal, then reads the mem.  The goal is to not miss seeing the
+                * signal AND missing the memory write.  In this specific case, the
+                * signal is actually synchronous (the rendez lock) and not basic shared
+                * memory.
+                *
+                * Oh, and we spin in case we woke early and someone else filled the
+                * queue, mesa-style. */
+               while (!qnotfull(q))
                        rendez_sleep(&q->wr, qnotfull, q);
-               }
        }
        return ret;
 }
@@ -1618,7 +1603,7 @@ static struct block *build_block(void *from, size_t len, int mem_flags)
        b->extra_data[0].len = len;
        b->extra_len += len;
 #else
-       b = block_alloc(n, mem_flags);
+       b = block_alloc(len, mem_flags);
        if (!b)
                return 0;
        memmove(b->wp, from, len);
@@ -1691,11 +1676,10 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
+       q->state &= ~Qdropoverflow;
        q->err[0] = 0;
        bfirst = q->bfirst;
        q->bfirst = 0;
-       q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);
 
@@ -1746,7 +1730,6 @@ void qreopen(struct queue *q)
 {
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
-       q->state |= Qstarve;
        q->eof = 0;
        q->limit = q->inilim;
        q->wake_cb = 0;
@@ -1764,12 +1747,22 @@ int qlen(struct queue *q)
 
 /*
  * return space remaining before flow control
+ *
+ *  This used to be
+ *  q->len < q->limit/2
+ *  but it slows down tcp too much for certain write sizes.
+ *  I really don't understand it completely.  It may be
+ *  due to the queue draining so fast that the transmission
+ *  stalls waiting for the app to produce more data.  - presotto
+ *
+ *  q->len was the amount of bytes, which is no longer used.  we now use
+ *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
  */
 int qwindow(struct queue *q)
 {
        int l;
 
-       l = q->limit - q->len;
+       l = q->limit - q->dlen;
        if (l < 0)
                l = 0;
        return l;
@@ -1796,19 +1789,36 @@ void qsetlimit(struct queue *q, int limit)
  */
 void qdropoverflow(struct queue *q, bool onoff)
 {
+       spin_lock_irqsave(&q->lock);
        if (onoff)
                q->state |= Qdropoverflow;
        else
                q->state &= ~Qdropoverflow;
+       spin_unlock_irqsave(&q->lock);
 }
 
-/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
-void qnonblock(struct queue *q, bool onoff)
+/* Be careful: this can affect concurrent reads/writes and code that might have
+ * built-in expectations of the q's type. */
+void q_toggle_qmsg(struct queue *q, bool onoff)
 {
+       spin_lock_irqsave(&q->lock);
        if (onoff)
-               q->state |= Qnonblock;
+               q->state |= Qmsg;
        else
-               q->state &= ~Qnonblock;
+               q->state &= ~Qmsg;
+       spin_unlock_irqsave(&q->lock);
+}
+
+/* Be careful: this can affect concurrent reads/writes and code that might have
+ * built-in expectations of the q's type. */
+void q_toggle_qcoalesce(struct queue *q, bool onoff)
+{
+       spin_lock_irqsave(&q->lock);
+       if (onoff)
+               q->state |= Qcoalesce;
+       else
+               q->state &= ~Qcoalesce;
+       spin_unlock_irqsave(&q->lock);
 }
 
 /*
@@ -1822,7 +1832,6 @@ void qflush(struct queue *q)
        spin_lock_irqsave(&q->lock);
        bfirst = q->bfirst;
        q->bfirst = 0;
-       q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);
 
@@ -1836,7 +1845,7 @@ void qflush(struct queue *q)
 
 int qfull(struct queue *q)
 {
-       return q->len >= q->limit;
+       return q->dlen >= q->limit;
 }
 
 int qstate(struct queue *q)
@@ -1847,8 +1856,8 @@ int qstate(struct queue *q)
 void qdump(struct queue *q)
 {
        if (q)
-               printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
-                          q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
+               printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
+                          q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
 }
 
 /* On certain wakeup events, qio will call func(q, data, filter), where filter
@@ -1867,3 +1876,15 @@ void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
        wmb();  /* if we see func, we'll also see the data for it */
        q->wake_cb = func;
 }
+
+/* Helper for detecting whether we'll block on a read at this instant. */
+bool qreadable(struct queue *q)
+{
+       return qlen(q) > 0;
+}
+
+/* Helper for detecting whether we'll block on a write at this instant. */
+bool qwritable(struct queue *q)
+{
+       return !q->limit || qwindow(q) > 0;
+}