qio: Consolidate readers into __qbread()
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 30 Mar 2016 18:08:45 +0000 (14:08 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Tue, 5 Apr 2016 19:42:14 +0000 (15:42 -0400)
There are a bunch of different functions that read from a queue:
- qread()
- qbread()
- qget()
- qdiscard()

And I'll be adding more soon.

They all had slightly different semantics, but mostly did the same thing.
I consolidated them into one __qbread(), which returns a blocklist.  The
functions do whatever they want with the list.

To some extent, this will be a little slower than before.  Other than just
stuff like branching, qdiscard was able to just free as it went, instead of
transferring them to a new list (meaning it never needed the 'spare' block
for allocations).  Same goes for qread().

OTOH, qread was holding a spinlock and writing to user memory.  Not
anymore!  So that will save us from a source of complexity later.

We do have slightly different semantics now.  __qbread() will *attempt* to
get up to len, but might return early (with amt > 0) while there is more
data in the queue.  Some of the functions, like qdiscard(), can't handle
this.  I used a somewhat racy loop around __qbread() in that case.

There's now only one source of producer wakeup, where I preserved
Presotto's comment and change to the heuristic.  Who knows if that was
supposed to apply in other places, or if it was just added to the place
that popped up during profiling.  Now there's just one site.

A lot of QIO is an unclear mess.  Why does qbread return only one block?
(Note in #mnt that a dev.bread expects to get a blocklist back).  Why is a
blocklist different than a block in many places?   Good thing we have
blocklen() (length of a block list) and BLEN() (length of a block).   Why
are so many functions mucking with queues without locks?  (e.g. qaddlist())
Good times.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/drivers/dev/mnt.c
kern/include/ns.h
kern/src/ns/qio.c

index df3dd30..f1b98f0 100644 (file)
@@ -965,6 +965,9 @@ int mntrpcread(struct mnt *m, struct mntrpc *r)
                return -1;
        }
 
+       /* TODO: this should use a qio helper directly.  qputback should have the
+        * qlocked, but I guess we assume we're the only one using it. */
+
        /* hang the data off of the fcall struct */
        l = &r->b;
        *l = NULL;
index aadb2a0..8bcbe2c 100644 (file)
@@ -809,7 +809,7 @@ struct block *pullupqueue(struct queue *, int);
 void putmhead(struct mhead *);
 void putstrn(char *unused_char_p_t, int);
 void qaddlist(struct queue *, struct block *);
-struct block *qbread(struct queue *, int);
+struct block *qbread(struct queue *q, size_t len);
 ssize_t qbwrite(struct queue *, struct block *);
 ssize_t qibwrite(struct queue *q, struct block *b);
 struct queue *qbypass(void (*)(void *, struct block *), void *);
@@ -820,7 +820,7 @@ struct block *qclone(struct queue *q, int header_len, int len,
                      uint32_t offset);
 struct block *blist_clone(struct block *blist, int header_len, int len,
                           uint32_t offset);
-int qdiscard(struct queue *, int);
+size_t qdiscard(struct queue *q, size_t len);
 void qflush(struct queue *);
 void qfree(struct queue *);
 int qfull(struct queue *);
@@ -835,7 +835,7 @@ struct queue *qopen(int unused_int, int, void (*)(void *), void *);
 ssize_t qpass(struct queue *, struct block *);
 ssize_t qpassnolim(struct queue *, struct block *);
 void qputback(struct queue *, struct block *);
-long qread(struct queue *, void *, int);
+size_t qread(struct queue *q, void *va, size_t len);
 void qreopen(struct queue *);
 void qsetlimit(struct queue *, int);
 int qwindow(struct queue *);
index 33af779..a9a9c73 100644 (file)
@@ -95,11 +95,16 @@ enum {
        QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
        QIO_LIMIT = (1 << 1),                   /* respect q->limit */
        QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
+       QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
 };
 
 unsigned int qiomaxatomic = Maxatomic;
 
 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
+static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
+                              int mem_flags);
+static bool qwait_and_ilock(struct queue *q);
+static void qwakeup_iunlock(struct queue *q);
 
 /* Helper: fires a wake callback, sending 'filter' */
 static void qwake_cb(struct queue *q, int filter)
@@ -614,122 +619,278 @@ struct block *adjustblock(struct block *bp, int len)
        return bp;
 }
 
-
-/*
- *  get next block from a queue, return null if nothing there
- */
-struct block *qget(struct queue *q)
+/* Helper: removes and returns the first block from q */
+static struct block *pop_first_block(struct queue *q)
 {
-       int dowakeup;
-       struct block *b;
+       struct block *b = q->bfirst;
 
-       /* sync with qwrite */
-       spin_lock_irqsave(&q->lock);
-
-       b = q->bfirst;
-       if (b == NULL) {
-               q->state |= Qstarve;
-               spin_unlock_irqsave(&q->lock);
-               return NULL;
-       }
-       q->bfirst = b->next;
-       b->next = 0;
        q->len -= BALLOC(b);
        q->dlen -= BLEN(b);
-       QDEBUG checkb(b, "qget");
+       q->bfirst = b->next;
+       b->next = 0;
+       return b;
+}
 
-       /* if writer flow controlled, restart */
-       if ((q->state & Qflow) && q->len < q->limit / 2) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       } else
-               dowakeup = 0;
+/* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
+static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
+{
+       copy_amt = MIN(to->lim - to->wp, copy_amt);
+       memcpy(to->wp, from, copy_amt);
+       to->wp += copy_amt;
+       return copy_amt;
+}
 
-       spin_unlock_irqsave(&q->lock);
+/* Accounting helper.  Block b in q lost amt extra_data */
+static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
+{
+       b->extra_len -= amt;
+       q->len -= amt;
+       q->dlen -= amt;
+}
 
-       if (dowakeup)
-               rendez_wakeup(&q->wr);
-       qwake_cb(q, FDTAP_FILT_WRITABLE);
+/* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
+ * fixed in 'from', so we move its contents and zero it out in 'from'.
+ *
+ * Returns the length moved (0 on failure). */
+static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
+                       struct block *from, struct queue *from_q)
+{
+       size_t ret = ebd->len;
 
-       return b;
+       if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
+               return 0;
+       block_and_q_lost_extra(from, from_q, ebd->len);
+       ebd->base = ebd->len = ebd->off = 0;
+       return ret;
 }
 
-/*
- *  throw away the next 'len' bytes in the queue
- * returning the number actually discarded
- */
-int qdiscard(struct queue *q, int len)
-{
-       struct block *b;
-       int dowakeup, n, sofar, body_amt, extra_amt;
+/* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
+ * return with less than len, but greater than 0, even if there is more
+ * available in q.
+ *
+ * At any moment that we have copied anything and things are tricky, we can just
+ * return.  The trickiness comes from a bunch of variables: is the main body
+ * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
+ * to @to's main body, but only if we haven't used it yet. */
+static size_t copy_from_first_block(struct queue *q, struct block *to,
+                                    size_t len)
+{
+       struct block *from = q->bfirst;
+       size_t copy_amt, amt;
        struct extra_bdata *ebd;
 
-       spin_lock_irqsave(&q->lock);
-       for (sofar = 0; sofar < len; sofar += n) {
-               b = q->bfirst;
-               if (b == NULL)
-                       break;
-               QDEBUG checkb(b, "qdiscard");
-               n = BLEN(b);
-               if (n <= len - sofar) {
-                       q->bfirst = b->next;
-                       b->next = 0;
-                       q->len -= BALLOC(b);
-                       q->dlen -= BLEN(b);
-                       freeb(b);
-               } else {
-                       n = len - sofar;
-                       q->dlen -= n;
-                       /* partial block removal */
-                       body_amt = MIN(BHLEN(b), n);
-                       b->rp += body_amt;
-                       extra_amt = n - body_amt;
-                       /* reduce q->len by the amount we remove from the extras.  The
-                        * header will always be accounted for above, during block removal.
-                        * */
-                       q->len -= extra_amt;
-                       for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
-                               ebd = &b->extra_data[i];
-                               if (!ebd->base || !ebd->len)
-                                       continue;
-                               if (extra_amt >= ebd->len) {
-                                       /* remove the entire entry, note the kfree release */
-                                       b->extra_len -= ebd->len;
-                                       extra_amt -= ebd->len;
-                                       kfree((void*)ebd->base);
-                                       ebd->base = ebd->off = ebd->len = 0;
-                                       continue;
-                               }
-                               ebd->off += extra_amt;
-                               ebd->len -= extra_amt;
-                               b->extra_len -= extra_amt;
-                               extra_amt = 0;
+       assert(len < BLEN(from));       /* sanity */
+       /* Try to extract from the main body */
+       copy_amt = MIN(BHLEN(from), len);
+       if (copy_amt) {
+               copy_amt = copy_to_block_body(to, from->rp, copy_amt);
+               from->rp += copy_amt;
+               /* We only change dlen, (data len), not q->len, since the q still has
+                * the same block memory allocation (no kfrees happened) */
+               q->dlen -= copy_amt;
+       }
+       /* Try to extract the remainder from the extra data */
+       len -= copy_amt;
+       for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
+               ebd = &from->extra_data[i];
+               if (!ebd->base || !ebd->len)
+                       continue;
+               if (len >= ebd->len) {
+                       amt = move_ebd(ebd, to, from, q);
+                       if (!amt) {
+                               /* our internal alloc could have failed.   this ebd is now the
+                                * last one we'll consider.  let's handle it separately and put
+                                * it in the main body. */
+                               if (copy_amt)
+                                       return copy_amt;
+                               copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
+                                                             ebd->len);
+                               block_and_q_lost_extra(from, q, copy_amt);
+                               break;
                        }
+                       len -= amt;
+                       copy_amt += amt;
+                       continue;
+               } else {
+                       /* If we're here, we reached our final ebd, which we'll need to
+                        * split to get anything from it. */
+                       if (copy_amt)
+                               return copy_amt;
+                       copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
+                                                     len);
+                       ebd->off += copy_amt;
+                       ebd->len -= copy_amt;
+                       block_and_q_lost_extra(from, q, copy_amt);
+                       break;
                }
        }
+       if (len)
+               assert(copy_amt);       /* sanity */
+       return copy_amt;
+}
 
-       /*
-        *  if writer flow controlled, restart
-        *
-        *  This used to be
-        *  q->len < q->limit/2
-        *  but it slows down tcp too much for certain write sizes.
-        *  I really don't understand it completely.  It may be
-        *  due to the queue draining so fast that the transmission
-        *  stalls waiting for the app to produce more data.  - presotto
-        */
-       if ((q->state & Qflow) && q->len < q->limit) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       } else
-               dowakeup = 0;
+/* Return codes for __qbread and __try_qbread. */
+enum {
+       QBR_OK,
+       QBR_FAIL,
+       QBR_SPARE,      /* we need a spare block */
+       QBR_AGAIN,      /* do it again, we are coalescing blocks */
+};
 
-       spin_unlock_irqsave(&q->lock);
+/* Helper and back-end for __qbread: extracts and returns a list of blocks
+ * containing up to len bytes.  It may contain less than len even if q has more
+ * data.
+ *
+ * Returns a code interpreted by __qbread, and the returned blist in ret. */
+static int __try_qbread(struct queue *q, size_t len, int qio_flags,
+                        struct block **real_ret, struct block *spare)
+{
+       struct block *ret, *ret_last, *first;
+       size_t blen;
 
-       if (dowakeup)
-               rendez_wakeup(&q->wr);
-       qwake_cb(q, FDTAP_FILT_WRITABLE);
+       if (qio_flags & QIO_CAN_ERR_SLEEP) {
+               if (!qwait_and_ilock(q)) {
+                       spin_unlock_irqsave(&q->lock);
+                       return QBR_FAIL;
+               }
+               /* we qwaited and still hold the lock, so the q is not empty */
+               first = q->bfirst;
+       } else {
+               spin_lock_irqsave(&q->lock);
+               first = q->bfirst;
+               if (!first) {
+                       spin_unlock_irqsave(&q->lock);
+                       return QBR_FAIL;
+               }
+       }
+       blen = BLEN(first);
+       if ((q->state & Qcoalesce) && (blen == 0)) {
+               freeb(pop_first_block(q));
+               spin_unlock_irqsave(&q->lock);
+               /* Need to retry to make sure we have a first block */
+               return QBR_AGAIN;
+       }
+       /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
+        * regardless of len.  We'll do the same, and just return the minimum: the
+        * first block.  I'd be happy to remove this. */
+       if (q->state & Qmsg) {
+               ret = pop_first_block(q);
+               goto out_ok;
+       }
+       /* Let's get at least something first - makes the code easier.  This way,
+        * we'll only ever split the block once. */
+       if (blen <= len) {
+               ret = pop_first_block(q);
+               len -= blen;
+       } else {
+               /* need to split the block.  we won't actually take the first block out
+                * of the queue - we're just extracting a little bit. */
+               if (!spare) {
+                       /* We have nothing and need a spare block.  Retry! */
+                       spin_unlock_irqsave(&q->lock);
+                       return QBR_SPARE;
+               }
+               copy_from_first_block(q, spare, len);
+               ret = spare;
+               goto out_ok;
+       }
+       /* At this point, we just grabbed the first block.  We can try to grab some
+        * more, up to len (if they want). */
+       if (qio_flags & QIO_JUST_ONE_BLOCK)
+               goto out_ok;
+       ret_last = ret;
+       while (q->bfirst && (len > 0)) {
+               blen = BLEN(q->bfirst);
+               if ((q->state & Qcoalesce) && (blen == 0)) {
+                       /* remove the intermediate 0 blocks */
+                       freeb(pop_first_block(q));
+                       continue;
+               }
+               if (blen > len) {
+                       /* We could try to split the block, but that's a huge pain.  For
+                        * instance, we might need to move the main body of b into an
+                        * extra_data of ret_last.  lots of ways for that to fail, and lots
+                        * of cases to consider.  Easier to just bail out.  This is why I
+                        * did the first block above: we don't need to worry about this. */
+                        break;
+               }
+               ret_last->next = pop_first_block(q);
+               ret_last = ret_last->next;
+               len -= blen;
+       }
+out_ok:
+       qwakeup_iunlock(q);
+       *real_ret = ret;
+       return QBR_OK;
+}
+
+/* Helper and front-end for __try_qbread: extracts and returns a list of blocks
+ * containing up to len bytes.  It may contain less than len even if q has more
+ * data.
+ *
+ * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
+ *
+ * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
+ * could get a zero length block back. */
+static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
+                              int mem_flags)
+{
+       struct block *ret = 0;
+       struct block *spare = 0;
+
+       while (1) {
+               switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
+               case QBR_OK:
+               case QBR_FAIL:
+                       if (spare && (ret != spare))
+                               freeb(spare);
+                       return ret;
+               case QBR_SPARE:
+                       assert(!spare);
+                       /* Due to some nastiness, we need a fresh block so we can read out
+                        * anything from the queue.  'len' seems like a reasonable amount.
+                        * Maybe we can get away with less. */
+                       spare = block_alloc(len, mem_flags);
+                       if (!spare)
+                               return 0;
+                       break;
+               case QBR_AGAIN:
+                       /* if the first block is 0 and we are Qcoalesce, then we'll need to
+                        * try again.  We bounce out of __try so we can perform the "is
+                        * there a block" logic again from the top. */
+                       break;
+               }
+       }
+}
+
+/*
+ *  get next block from a queue, return null if nothing there
+ */
+struct block *qget(struct queue *q)
+{
+       /* since len == SIZE_MAX, we should never need to do a mem alloc */
+       return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
+}
 
+/* Throw away the next 'len' bytes in the queue returning the number actually
+ * discarded.
+ *
+ * If the bytes are in the queue, then they must be discarded.  The only time to
+ * return less than len is if the q itself has less than len bytes.  */
+size_t qdiscard(struct queue *q, size_t len)
+{
+       struct block *blist;
+       size_t removed_amt;
+       size_t sofar = 0;
+
+       /* This is racy.  There could be multiple qdiscarders or other consumers,
+        * where the consumption could be interleaved. */
+       while (qlen(q) && len) {
+               blist = __qbread(q, len, 0, MEM_WAIT);
+               removed_amt = freeblist(blist);
+               sofar += removed_amt;
+               len -= removed_amt;
+       }
        return sofar;
 }
 
@@ -1089,24 +1250,6 @@ void qaddlist(struct queue *q, struct block *b)
        q->blast = b;
 }
 
-/*
- *  called with q ilocked
- */
-static struct block *qremove(struct queue *q)
-{
-       struct block *b;
-
-       b = q->bfirst;
-       if (b == NULL)
-               return NULL;
-       q->bfirst = b->next;
-       b->next = NULL;
-       q->dlen -= BLEN(b);
-       q->len -= BALLOC(b);
-       QDEBUG checkb(b, "qremove");
-       return b;
-}
-
 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
 {
        size_t copy_amt, retval = 0;
@@ -1174,6 +1317,24 @@ struct block *bl2mem(uint8_t * p, struct block *b, int n)
        return NULL;
 }
 
+/* Extract the contents of all blocks and copy to va, up to len.  Returns the
+ * actual amount copied. */
+static size_t read_all_blocks(struct block *b, void *va, size_t len)
+{
+       size_t sofar = 0;
+       struct block *next;
+
+       do {
+               /* We should be draining every block completely. */
+               assert(BLEN(b) <= len - sofar);
+               sofar += read_from_block(b, va + sofar, len - sofar);
+               next = b->next;
+               freeb(b);
+               b = next;
+       } while (b);
+       return sofar;
+}
+
 /*
  *  copy the contents of memory into a string of blocks.
  *  return NULL on error.
@@ -1230,8 +1391,17 @@ static void qwakeup_iunlock(struct queue *q)
 {
        int dowakeup = 0;
 
-       /* if writer flow controlled, restart */
-       if ((q->state & Qflow) && q->len < q->limit / 2) {
+       /*
+        *  if writer flow controlled, restart
+        *
+        *  This used to be
+        *  q->len < q->limit/2
+        *  but it slows down tcp too much for certain write sizes.
+        *  I really don't understand it completely.  It may be
+        *  due to the queue draining so fast that the transmission
+        *  stalls waiting for the app to produce more data.  - presotto
+        */
+       if ((q->state & Qflow) && q->len < q->limit) {
                q->state &= ~Qflow;
                dowakeup = 1;
        }
@@ -1249,107 +1419,21 @@ static void qwakeup_iunlock(struct queue *q)
 
 /*
  *  get next block from a queue (up to a limit)
+ *
  */
-struct block *qbread(struct queue *q, int len)
+struct block *qbread(struct queue *q, size_t len)
 {
-       struct block *b, *nb;
-       int n;
-
-       if (!qwait_and_ilock(q)) {
-               /* queue closed */
-               spin_unlock_irqsave(&q->lock);
-               return NULL;
-       }
-
-       /* if we get here, there's at least one block in the queue */
-       b = qremove(q);
-       n = BLEN(b);
-
-       /* split block if it's too big and this is not a message queue */
-       nb = b;
-       if (n > len) {
-               PANIC_EXTRA(b);
-               if ((q->state & Qmsg) == 0) {
-                       n -= len;
-                       b = block_alloc(n, MEM_WAIT);
-                       memmove(b->wp, nb->rp + len, n);
-                       b->wp += n;
-                       qputback(q, b);
-               }
-               nb->wp = nb->rp + len;
-       }
-
-       /* restart producer */
-       qwakeup_iunlock(q);
-
-       return nb;
+       return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
 }
 
-/*
- *  read a queue.  if no data is queued, post a struct block
- *  and wait on its Rendez.
- */
-long qread(struct queue *q, void *vp, int len)
+/* read up to len from a queue into vp. */
+size_t qread(struct queue *q, void *va, size_t len)
 {
-       struct block *b, *first, **l;
-       int m, n;
+       struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
 
-again:
-       if (!qwait_and_ilock(q)) {
-               /* queue closed */
-               spin_unlock_irqsave(&q->lock);
+       if (!blist)
                return 0;
-       }
-
-       /* if we get here, there's at least one block in the queue */
-       // TODO: Consider removing the Qcoalesce flag and force a coalescing
-       // strategy by default.
-       if (q->state & Qcoalesce) {
-               /* when coalescing, 0 length blocks just go away */
-               b = q->bfirst;
-               if (BLEN(b) <= 0) {
-                       freeb(qremove(q));
-                       spin_unlock_irqsave(&q->lock);
-                       goto again;
-               }
-
-               /*  grab the first block plus as many
-                *  following blocks as will completely
-                *  fit in the read.
-                */
-               n = 0;
-               l = &first;
-               m = BLEN(b);
-               for (;;) {
-                       *l = qremove(q);
-                       l = &b->next;
-                       n += m;
-
-                       b = q->bfirst;
-                       if (b == NULL)
-                               break;
-                       m = BLEN(b);
-                       if (n + m > len)
-                               break;
-               }
-       } else {
-               first = qremove(q);
-               n = BLEN(first);
-       }
-       b = bl2mem(vp, first, len);
-       /* take care of any left over partial block */
-       if (b != NULL) {
-               n -= BLEN(b);
-               if (q->state & Qmsg)
-                       freeb(b);
-               else
-                       qputback(q, b);
-       }
-
-       /* restart producer */
-       qwakeup_iunlock(q);
-
-       return n;
+       return read_all_blocks(blist, va, len);
 }
 
 static int qnotfull(void *a)