qio: Add non-blocking queues
[akaros.git] / kern / src / ns / qio.c
index 783c88b..4741f6c 100644 (file)
@@ -1212,6 +1212,10 @@ static bool qwait(struct queue *q)
                        }
                        return FALSE;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       set_errno(EAGAIN);
+                       error("queue empty");
                }
                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
@@ -1574,6 +1578,7 @@ long qbwrite(struct queue *q, struct block *b)
 
        /* if nonblocking, don't queue over the limit */
        if (q->len >= q->limit) {
+               /* drop overflow takes priority over regular non-blocking */
                if (q->state & Qdropoverflow) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
@@ -1582,6 +1587,12 @@ long qbwrite(struct queue *q, struct block *b)
                        poperror();
                        return n;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       freeb(b);
+                       set_errno(EAGAIN);
+                       error("queue full");
+               }
        }
 
        /* queue the block */
@@ -1625,7 +1636,7 @@ long qbwrite(struct queue *q, struct block *b)
         *  queue infinite crud.
         */
        for (;;) {
-               if ((q->state & Qdropoverflow) || qnotfull(q))
+               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
                        break;
 
                spin_lock_irqsave(&q->lock);
@@ -1782,7 +1793,7 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve | Qdropoverflow);
+       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
        strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
@@ -1887,6 +1898,15 @@ void qdropoverflow(struct queue *q, bool onoff)
                q->state &= ~Qdropoverflow;
 }
 
+/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
+void qnonblock(struct queue *q, bool onoff)
+{
+       if (onoff)
+               q->state |= Qnonblock;
+       else
+               q->state &= ~Qnonblock;
+}
+
 /*
  *  flush the output queue
  */