qio: Consolidate producer functions
authorBarret Rhoden <brho@cs.berkeley.edu>
Fri, 25 Mar 2016 22:11:36 +0000 (18:11 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 31 Mar 2016 20:53:42 +0000 (16:53 -0400)
There are a bunch of qio functions for adding to a queue:
- qbwrite (append a single block)
- qibwrite (append a single block from IRQ ctx)
- qwrite (wrapper and calls qbwrite)
- qiwrite (mostly the same wrapper, calls qibwrite)
- qpass (append a string of blocks)
- qpassnolim (same, but with no limit)

Anyway, all of these functions do very similar things, but with a few
options.  Now all of those functions call the same underlying function
(with the same front-wrapper for qwrite/qiwrite), subject to a few flags.

There are some subtle changes.  qpass didn't call kick or bypass before.
Although I could control that with a QIO flag, it seems like if someone
wanted a bypass, then they should always get it.

Of course, kick and bypass seem rather special purpose, and might just need
overhauled at some point.

Part of the motivation for this, other than understandability and ease of
maintenance, was that I'll be adding more QIO functions to control whether
or not we block on an individual call (think chan->flag & O_NONBLOCK).

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/include/ns.h
kern/src/ns/qio.c

index fa6fdc4..26844be 100644 (file)
@@ -810,8 +810,8 @@ void putmhead(struct mhead *);
 void putstrn(char *unused_char_p_t, int);
 void qaddlist(struct queue *, struct block *);
 struct block *qbread(struct queue *, int);
-long qbwrite(struct queue *, struct block *);
-long qibwrite(struct queue *q, struct block *b);
+ssize_t qbwrite(struct queue *, struct block *);
+ssize_t qibwrite(struct queue *q, struct block *b);
 struct queue *qbypass(void (*)(void *, struct block *), void *);
 int qcanread(struct queue *);
 void qclose(struct queue *);
@@ -828,20 +828,20 @@ int qfull(struct queue *);
 struct block *qget(struct queue *);
 void qhangup(struct queue *, char *unused_char_p_t);
 int qisclosed(struct queue *);
-int qiwrite(struct queue *, void *, int);
+ssize_t qiwrite(struct queue *, void *, int);
 int qlen(struct queue *);
 void qdropoverflow(struct queue *, bool);
 void qnonblock(struct queue *, bool);
 struct queue *qopen(int unused_int, int, void (*)(void *), void *);
-int qpass(struct queue *, struct block *);
-int qpassnolim(struct queue *, struct block *);
+ssize_t qpass(struct queue *, struct block *);
+ssize_t qpassnolim(struct queue *, struct block *);
 void qputback(struct queue *, struct block *);
 long qread(struct queue *, void *, int);
 struct block *qremove(struct queue *);
 void qreopen(struct queue *);
 void qsetlimit(struct queue *, int);
 int qwindow(struct queue *);
-int qwrite(struct queue *, void *, int);
+ssize_t qwrite(struct queue *, void *, int);
 typedef void (*qio_wake_cb_t)(struct queue *q, void *data, int filter);
 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data);
 
index 08fbc07..86a7b95 100644 (file)
@@ -92,10 +92,15 @@ struct queue {
 
 enum {
        Maxatomic = 64 * 1024,
+       QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
+       QIO_LIMIT = (1 << 1),                   /* respect q->limit */
+       QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
 };
 
 unsigned int qiomaxatomic = Maxatomic;
 
+static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
+
 /* Helper: fires a wake callback, sending 'filter' */
 static void qwake_cb(struct queue *q, int filter)
 {
@@ -812,111 +817,14 @@ int qconsume(struct queue *q, void *vp, int len)
        return len;
 }
 
-int qpass(struct queue *q, struct block *b)
+ssize_t qpass(struct queue *q, struct block *b)
 {
-       int dlen, len, dowakeup;
-       bool was_empty;
-
-       /* sync with qread */
-       dowakeup = 0;
-       spin_lock_irqsave(&q->lock);
-       was_empty = q->len == 0;
-       if (q->len >= q->limit) {
-               freeblist(b);
-               spin_unlock_irqsave(&q->lock);
-               return -1;
-       }
-       if (q->state & Qclosed) {
-               len = blocklen(b);
-               freeblist(b);
-               spin_unlock_irqsave(&q->lock);
-               return len;
-       }
-
-       /* add buffer to queue */
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       len = BALLOC(b);
-       dlen = BLEN(b);
-       QDEBUG checkb(b, "qpass");
-       while (b->next) {
-               b = b->next;
-               QDEBUG checkb(b, "qpass");
-               len += BALLOC(b);
-               dlen += BLEN(b);
-       }
-       q->blast = b;
-       q->len += len;
-       q->dlen += dlen;
-
-       if (q->len >= q->limit / 2)
-               q->state |= Qflow;
-
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
-       }
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup)
-               rendez_wakeup(&q->rr);
-       if (was_empty)
-               qwake_cb(q, FDTAP_FILT_READABLE);
-
-       return len;
+       return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
 }
 
-int qpassnolim(struct queue *q, struct block *b)
+ssize_t qpassnolim(struct queue *q, struct block *b)
 {
-       int dlen, len, dowakeup;
-       bool was_empty;
-
-       /* sync with qread */
-       dowakeup = 0;
-       spin_lock_irqsave(&q->lock);
-       was_empty = q->len == 0;
-
-       if (q->state & Qclosed) {
-               freeblist(b);
-               spin_unlock_irqsave(&q->lock);
-               return BALLOC(b);
-       }
-
-       /* add buffer to queue */
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       len = BALLOC(b);
-       dlen = BLEN(b);
-       QDEBUG checkb(b, "qpass");
-       while (b->next) {
-               b = b->next;
-               QDEBUG checkb(b, "qpass");
-               len += BALLOC(b);
-               dlen += BLEN(b);
-       }
-       q->blast = b;
-       q->len += len;
-       q->dlen += dlen;
-
-       if (q->len >= q->limit / 2)
-               q->state |= Qflow;
-
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
-       }
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup)
-               rendez_wakeup(&q->rr);
-       if (was_empty)
-               qwake_cb(q, FDTAP_FILT_READABLE);
-
-       return len;
+       return __qbwrite(q, b, 0);
 }
 
 /*
@@ -1535,83 +1443,82 @@ static int qnotfull(void *a)
        return q->len < q->limit || (q->state & Qclosed);
 }
 
-uint32_t dropcnt;
+/* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
+static size_t enqueue_blist(struct queue *q, struct block *b)
+{
+       size_t len, dlen;
 
-/*
- *  add a block to a queue obeying flow control
- */
-long qbwrite(struct queue *q, struct block *b)
+       if (q->bfirst)
+               q->blast->next = b;
+       else
+               q->bfirst = b;
+       len = BALLOC(b);
+       dlen = BLEN(b);
+       while (b->next) {
+               b = b->next;
+               len += BALLOC(b);
+               dlen += BLEN(b);
+       }
+       q->blast = b;
+       q->len += len;
+       q->dlen += dlen;
+       return dlen;
+}
+
+/* Adds block (which can be a list of blocks) to the queue, subject to
+ * qio_flags.  Returns the length written on success or -1 on non-throwable
+ * error.  Adjust qio_flags to control the value-added features!. */
+static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
 {
-       int n, dowakeup;
+       ssize_t ret;
+       bool dowakeup = FALSE;
        bool was_empty;
 
-       n = BLEN(b);
-
        if (q->bypass) {
                (*q->bypass) (q->arg, b);
-               return n;
+               return blocklen(b);
        }
-
-       dowakeup = 0;
-
        spin_lock_irqsave(&q->lock);
        was_empty = q->len == 0;
-
-       /* give up if the queue is closed */
        if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
-               freeb(b);
+               freeblist(b);
+               if (!(qio_flags & QIO_CAN_ERR_SLEEP))
+                       return -1;
                if (q->err[0])
                        error(EFAIL, q->err);
                else
                        error(EFAIL, "connection closed");
        }
-
-       /* if nonblocking, don't queue over the limit */
-       if (q->len >= q->limit) {
+       if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
                /* drop overflow takes priority over regular non-blocking */
-               if (q->state & Qdropoverflow) {
+               if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
-                       dropcnt += n;
-                       return n;
+                       return -1;
                }
-               if (q->state & Qnonblock) {
+               if ((qio_flags & QIO_CAN_ERR_SLEEP) && (q->state & Qnonblock)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        error(EAGAIN, "queue full");
                }
        }
-
-       /* queue the block */
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       q->blast = b;
-       b->next = 0;
-       q->len += BALLOC(b);
-       q->dlen += n;
-       QDEBUG checkb(b, "qbwrite");
-       b = NULL;
-
+       ret = enqueue_blist(q, b);
+       QDEBUG checkb(b, "__qbwrite");
        /* make sure other end gets awakened */
        if (q->state & Qstarve) {
                q->state &= ~Qstarve;
-               dowakeup = 1;
+               dowakeup = TRUE;
        }
        spin_unlock_irqsave(&q->lock);
-
-       /*  get output going again */
+       /* TODO: not sure if the usage of a kick is mutually exclusive with a
+        * wakeup, meaning that actual users either want a kick or have qreaders. */
        if (q->kick && (dowakeup || (q->state & Qkick)))
                q->kick(q->arg);
-
-       /* wakeup anyone consuming at the other end */
        if (dowakeup)
                rendez_wakeup(&q->rr);
        if (was_empty)
                qwake_cb(q, FDTAP_FILT_READABLE);
-
        /*
         *  flow control, wait for queue to get below the limit
         *  before allowing the process to continue and queue
@@ -1624,140 +1531,112 @@ long qbwrite(struct queue *q, struct block *b)
         *  that keeps getting interrupted and rewriting will
         *  queue infinite crud.
         */
-       for (;;) {
-               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
-                       break;
-
-               spin_lock_irqsave(&q->lock);
-               q->state |= Qflow;
-               spin_unlock_irqsave(&q->lock);
-               rendez_sleep(&q->wr, qnotfull, q);
+       if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
+           !(q->state & (Qdropoverflow | Qnonblock))) {
+               /* This is a racy peek at the q status.  If we accidentally block, we
+                * set Qflow, so someone should wake us.  If we accidentally don't
+                * block, we just returned to the user and let them slip a block past
+                * flow control. */
+               while (!qnotfull(q)) {
+                       spin_lock_irqsave(&q->lock);
+                       q->state |= Qflow;
+                       spin_unlock_irqsave(&q->lock);
+                       rendez_sleep(&q->wr, qnotfull, q);
+               }
        }
-
-       return n;
+       return ret;
 }
 
-long qibwrite(struct queue *q, struct block *b)
+/*
+ *  add a block to a queue obeying flow control
+ */
+ssize_t qbwrite(struct queue *q, struct block *b)
 {
-       int n, dowakeup;
-       bool was_empty;
-
-       dowakeup = 0;
-
-       n = BLEN(b);
-
-       spin_lock_irqsave(&q->lock);
-       was_empty = q->len == 0;
-
-       QDEBUG checkb(b, "qibwrite");
-       if (q->bfirst)
-               q->blast->next = b;
-       else
-               q->bfirst = b;
-       q->blast = b;
-       q->len += BALLOC(b);
-       q->dlen += n;
-
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = 1;
-       }
-
-       spin_unlock_irqsave(&q->lock);
-
-       if (dowakeup) {
-               if (q->kick)
-                       q->kick(q->arg);
-               rendez_wakeup(&q->rr);
-       }
-       if (was_empty)
-               qwake_cb(q, FDTAP_FILT_READABLE);
+       return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
+}
 
-       return n;
+ssize_t qibwrite(struct queue *q, struct block *b)
+{
+       return __qbwrite(q, b, 0);
 }
 
-/*
- *  write to a queue.  only Maxatomic bytes at a time is atomic.
- */
-int qwrite(struct queue *q, void *vp, int len)
+/* Helper, allocs a block and copies [from, from + len) into it.  Returns the
+ * block on success, 0 on failure. */
+static struct block *build_block(void *from, size_t len, int mem_flags)
 {
-       int n, sofar;
        struct block *b;
-       uint8_t *p = vp;
        void *ext_buf;
 
-       QDEBUG if (!islo())
-                printd("qwrite hi %p\n", getcallerpc(&q));
-
-       sofar = 0;
-       do {
-               n = len - sofar;
-               /* This is 64K, the max amount per single block.  Still a good value? */
-               if (n > Maxatomic)
-                       n = Maxatomic;
-
-               /* If n is small, we don't need to bother with the extra_data.  But
-                * until the whole stack can handle extd blocks, we'll use them
-                * unconditionally. */
+       /* If len is small, we don't need to bother with the extra_data.  But until
+        * the whole stack can handle extd blocks, we'll use them unconditionally.
+        * */
 #ifdef CONFIG_BLOCK_EXTRAS
-               /* allocb builds in 128 bytes of header space to all blocks, but this is
-                * only available via padblock (to the left).  we also need some space
-                * for pullupblock for some basic headers (like icmp) that get written
-                * in directly */
-               b = block_alloc(64, MEM_WAIT);
-               ext_buf = kmalloc(n, 0);
-               memcpy(ext_buf, p + sofar, n);
-               block_add_extd(b, 1, MEM_WAIT); /* returns 0 on success */
-               b->extra_data[0].base = (uintptr_t)ext_buf;
-               b->extra_data[0].off = 0;
-               b->extra_data[0].len = n;
-               b->extra_len += n;
+       /* allocb builds in 128 bytes of header space to all blocks, but this is
+        * only available via padblock (to the left).  we also need some space
+        * for pullupblock for some basic headers (like icmp) that get written
+        * in directly */
+       b = block_alloc(64, mem_flags);
+       if (!b)
+               return 0;
+       ext_buf = kmalloc(len, mem_flags);
+       if (!ext_buf) {
+               kfree(b);
+               return 0;
+       }
+       memcpy(ext_buf, from, len);
+       if (block_add_extd(b, 1, mem_flags)) {
+               kfree(ext_buf);
+               kfree(b);
+               return 0;
+       }
+       b->extra_data[0].base = (uintptr_t)ext_buf;
+       b->extra_data[0].off = 0;
+       b->extra_data[0].len = len;
+       b->extra_len += len;
 #else
-               b = block_alloc(n, MEM_WAIT);
-               memmove(b->wp, p + sofar, n);
-               b->wp += n;
+       b = block_alloc(n, mem_flags);
+       if (!b)
+               return 0;
+       memmove(b->wp, from, len);
+       b->wp += len;
 #endif
-                       
-               qbwrite(q, b);
-
-               sofar += n;
-       } while (sofar < len && (q->state & Qmsg) == 0);
-
-       return len;
+       return b;
 }
 
-/*
- *  used by print() to write to a queue.  Since we may be splhi or not in
- *  a process, don't qlock.
- */
-int qiwrite(struct queue *q, void *vp, int len)
+static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
+                        int qio_flags)
 {
-       int n, sofar;
+       size_t n, sofar;
        struct block *b;
        uint8_t *p = vp;
+       void *ext_buf;
 
        sofar = 0;
        do {
                n = len - sofar;
+               /* This is 64K, the max amount per single block.  Still a good value? */
                if (n > Maxatomic)
                        n = Maxatomic;
-
-               b = block_alloc(n, MEM_ATOMIC);
-               if (b == NULL)
+               b = build_block(p + sofar, n, mem_flags);
+               if (!b)
+                       break;
+               if (__qbwrite(q, b, qio_flags) < 0)
                        break;
-               /* TODO consider extra_data */
-               memmove(b->wp, p + sofar, n);
-               /* this adjusts BLEN to be n, or at least it should */
-               b->wp += n;
-               assert(n == BLEN(b));
-               qibwrite(q, b);
-
                sofar += n;
-       } while (sofar < len && (q->state & Qmsg) == 0);
-
+       } while ((sofar < len) && (q->state & Qmsg) == 0);
        return sofar;
 }
 
+ssize_t qwrite(struct queue *q, void *vp, int len)
+{
+       return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
+}
+
+ssize_t qiwrite(struct queue *q, void *vp, int len)
+{
+       return __qwrite(q, vp, len, MEM_ATOMIC, 0);
+}
+
 /*
  *  be extremely careful when calling this,
  *  as there is no reference accounting