qio: Add non-blocking queues
authorBarret Rhoden <brho@cs.berkeley.edu>
Thu, 16 Jul 2015 17:25:23 +0000 (13:25 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Fri, 24 Jul 2015 07:05:13 +0000 (03:05 -0400)
These queues are non-blocking in the O_NONBLOCK sense, which is distinct
from the old Plan 9 style of where data blocks get dropped when a writer
overflows the queue.

Conversation readers and writers will get an errno of EAGAIN with an
appropriate errstr, such as "queue full".

Userspace can't set O_NONBLOCK yet.

kern/include/ns.h
kern/src/ns/qio.c

index d102584..dd289dc 100644 (file)
@@ -607,6 +607,7 @@ enum {
        Qcoalesce = (1 << 4),   /* coallesce packets on read */
        Qkick = (1 << 5),       /* always call the kick routine after qwrite */
        Qdropoverflow = (1 << 6),       /* writes that would block will be dropped */
+       Qnonblock = (1 << 7),   /* do not block, throw EAGAIN */
 };
 
 #define DEVDOTDOT -1
@@ -816,6 +817,7 @@ int qisclosed(struct queue *);
 int qiwrite(struct queue *, void *, int);
 int qlen(struct queue *);
 void qdropoverflow(struct queue *, bool);
+void qnonblock(struct queue *, bool);
 struct queue *qopen(int unused_int, int, void (*)(void *), void *);
 int qpass(struct queue *, struct block *);
 int qpassnolim(struct queue *, struct block *);
index 783c88b..4741f6c 100644 (file)
@@ -1212,6 +1212,10 @@ static bool qwait(struct queue *q)
                        }
                        return FALSE;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       set_errno(EAGAIN);
+                       error("queue empty");
                }
                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
@@ -1574,6 +1578,7 @@ long qbwrite(struct queue *q, struct block *b)
 
        /* if nonblocking, don't queue over the limit */
        if (q->len >= q->limit) {
+               /* drop overflow takes priority over regular non-blocking */
                if (q->state & Qdropoverflow) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
@@ -1582,6 +1587,12 @@ long qbwrite(struct queue *q, struct block *b)
                        poperror();
                        return n;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       freeb(b);
+                       set_errno(EAGAIN);
+                       error("queue full");
+               }
        }
 
        /* queue the block */
@@ -1625,7 +1636,7 @@ long qbwrite(struct queue *q, struct block *b)
         *  queue infinite crud.
         */
        for (;;) {
-               if ((q->state & Qdropoverflow) || qnotfull(q))
+               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
                        break;
 
                spin_lock_irqsave(&q->lock);
@@ -1782,7 +1793,7 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve | Qdropoverflow);
+       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
        strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
@@ -1887,6 +1898,15 @@ void qdropoverflow(struct queue *q, bool onoff)
                q->state &= ~Qdropoverflow;
 }
 
+/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
+void qnonblock(struct queue *q, bool onoff)
+{
+       if (onoff)
+               q->state |= Qnonblock;
+       else
+               q->state &= ~Qnonblock;
+}
+
 /*
  *  flush the output queue
  */