qio: Provide helpers for O_NONBLOCK operations
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 30 Mar 2016 20:49:29 +0000 (16:49 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Tue, 5 Apr 2016 19:42:18 +0000 (15:42 -0400)
We can now q{b,}{read,write}() in a non-blocking manner, such that each
operation is non-blocking, independently of the rest of the queue.

The old qnonblock style was setting the entire queue to be non-blocking.
This would be a big surprise to the distant end of the queue.  We didn't
notice it with #ip, since the other end of the queue was usually doing a
non-blocking op anyways (or was just buggy!).  We definitely noticed with
pipe, especially when busybox suddenly had its read of stdin throw EAGAIN.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/include/ns.h
kern/src/ns/qio.c

index 8bcbe2c..88f5dc2 100644 (file)
@@ -810,7 +810,9 @@ void putmhead(struct mhead *);
 void putstrn(char *unused_char_p_t, int);
 void qaddlist(struct queue *, struct block *);
 struct block *qbread(struct queue *q, size_t len);
+struct block *qbread_nonblock(struct queue *q, size_t len);
 ssize_t qbwrite(struct queue *, struct block *);
+ssize_t qbwrite_nonblock(struct queue *, struct block *);
 ssize_t qibwrite(struct queue *q, struct block *b);
 struct queue *qbypass(void (*)(void *, struct block *), void *);
 int qcanread(struct queue *);
@@ -836,10 +838,12 @@ ssize_t qpass(struct queue *, struct block *);
 ssize_t qpassnolim(struct queue *, struct block *);
 void qputback(struct queue *, struct block *);
 size_t qread(struct queue *q, void *va, size_t len);
+size_t qread_nonblock(struct queue *q, void *va, size_t len);
 void qreopen(struct queue *);
 void qsetlimit(struct queue *, int);
 int qwindow(struct queue *);
 ssize_t qwrite(struct queue *, void *, int);
+ssize_t qwrite_nonblock(struct queue *, void *, int);
 typedef void (*qio_wake_cb_t)(struct queue *q, void *data, int filter);
 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data);
 
index a9a9c73..b1c6b7c 100644 (file)
@@ -96,6 +96,7 @@ enum {
        QIO_LIMIT = (1 << 1),                   /* respect q->limit */
        QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
        QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
+       QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
 };
 
 unsigned int qiomaxatomic = Maxatomic;
@@ -103,7 +104,7 @@ unsigned int qiomaxatomic = Maxatomic;
 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                               int mem_flags);
-static bool qwait_and_ilock(struct queue *q);
+static bool qwait_and_ilock(struct queue *q, int qio_flags);
 static void qwakeup_iunlock(struct queue *q);
 
 /* Helper: fires a wake callback, sending 'filter' */
@@ -749,7 +750,7 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
        size_t blen;
 
        if (qio_flags & QIO_CAN_ERR_SLEEP) {
-               if (!qwait_and_ilock(q)) {
+               if (!qwait_and_ilock(q, qio_flags)) {
                        spin_unlock_irqsave(&q->lock);
                        return QBR_FAIL;
                }
@@ -1202,7 +1203,7 @@ static int notempty(void *a)
 /* Block, waiting for the queue to be non-empty or closed.  Returns with
  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
  * was naturally closed.  Throws an error o/w. */
-static bool qwait_and_ilock(struct queue *q)
+static bool qwait_and_ilock(struct queue *q, int qio_flags)
 {
        while (1) {
                spin_lock_irqsave(&q->lock);
@@ -1222,7 +1223,7 @@ static bool qwait_and_ilock(struct queue *q)
                /* We set Qstarve regardless of whether we are non-blocking or not.
                 * Qstarve tracks the edge detection of the queue being empty. */
                q->state |= Qstarve;
-               if (q->state & Qnonblock) {
+               if (qio_flags & QIO_NON_BLOCK) {
                        spin_unlock_irqsave(&q->lock);
                        error(EAGAIN, "queue empty");
                }
@@ -1426,6 +1427,12 @@ struct block *qbread(struct queue *q, size_t len)
        return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
 }
 
+struct block *qbread_nonblock(struct queue *q, size_t len)
+{
+       return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
+                       QIO_NON_BLOCK, MEM_WAIT);
+}
+
 /* read up to len from a queue into vp. */
 size_t qread(struct queue *q, void *va, size_t len)
 {
@@ -1436,6 +1443,16 @@ size_t qread(struct queue *q, void *va, size_t len)
        return read_all_blocks(blist, va, len);
 }
 
+size_t qread_nonblock(struct queue *q, void *va, size_t len)
+{
+       struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
+                                      MEM_WAIT);
+
+       if (!blist)
+               return 0;
+       return read_all_blocks(blist, va, len);
+}
+
 static int qnotfull(void *a)
 {
        struct queue *q = a;
@@ -1497,7 +1514,9 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
                        freeb(b);
                        return -1;
                }
-               if ((qio_flags & QIO_CAN_ERR_SLEEP) && (q->state & Qnonblock)) {
+               /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
+                * and catch it. */
+               if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        error(EAGAIN, "queue full");
@@ -1532,7 +1551,7 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
         *  queue infinite crud.
         */
        if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
-           !(q->state & (Qdropoverflow | Qnonblock))) {
+           !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
                /* This is a racy peek at the q status.  If we accidentally block, we
                 * set Qflow, so someone should wake us.  If we accidentally don't
                 * block, we just returned to the user and let them slip a block past
@@ -1555,6 +1574,11 @@ ssize_t qbwrite(struct queue *q, struct block *b)
        return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
 }
 
+ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
+{
+       return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
+}
+
 ssize_t qibwrite(struct queue *q, struct block *b)
 {
        return __qbwrite(q, b, 0);
@@ -1632,6 +1656,12 @@ ssize_t qwrite(struct queue *q, void *vp, int len)
        return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
 }
 
+ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
+{
+       return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
+                                             QIO_NON_BLOCK);
+}
+
 ssize_t qiwrite(struct queue *q, void *vp, int len)
 {
        return __qwrite(q, vp, len, MEM_ATOMIC, 0);