qio: Add non-blocking queues
[akaros.git] / kern / src / ns / qio.c
index 54a776d..4741f6c 100644 (file)
@@ -46,7 +46,6 @@ struct queue {
        int limit;                                      /* max bytes in queue */
        int inilim;                             /* initial limit */
        int state;
-       int noblock;                            /* true if writes return immediately when q full */
        int eof;                                        /* number of eofs read by user */
 
        void (*kick) (void *);          /* restart output */
@@ -1161,7 +1160,6 @@ struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
        q->state = msg;
        q->state |= Qstarve;
        q->eof = 0;
-       q->noblock = 0;
 
        return q;
 }
@@ -1191,11 +1189,12 @@ static int notempty(void *a)
        return (q->state & Qclosed) || q->bfirst != 0;
 }
 
-/* wait for the queue to be non-empty or closed.
+/* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
+ * wait, FALSE on Qclose (without error)
  *
- * called with q ilocked.  rendez may error out, back through the caller, with
+ * Called with q ilocked.  May error out, back through the caller, with
  * the irqsave lock unlocked.  */
-static int qwait(struct queue *q)
+static bool qwait(struct queue *q)
 {
        /* wait for data */
        for (;;) {
@@ -1203,20 +1202,28 @@ static int qwait(struct queue *q)
                        break;
 
                if (q->state & Qclosed) {
-                       if (++q->eof > 3)
-                               return -1;
-                       if (*q->err && strcmp(q->err, Ehungup) != 0)
-                               return -1;
-                       return 0;
+                       if (++q->eof > 3) {
+                               spin_unlock_irqsave(&q->lock);
+                               error("multiple reads on a closed queue");
+                       }
+                       if (*q->err && strcmp(q->err, Ehungup) != 0) {
+                               spin_unlock_irqsave(&q->lock);
+                               error(q->err);
+                       }
+                       return FALSE;
+               }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       set_errno(EAGAIN);
+                       error("queue empty");
                }
-
                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
                /* may throw an error() */
                rendez_sleep(&q->rr, notempty, q);
                spin_lock_irqsave(&q->lock);
        }
-       return 1;
+       return TRUE;
 }
 
 /*
@@ -1410,17 +1417,12 @@ struct block *qbread(struct queue *q, int len)
        }
 
        spin_lock_irqsave(&q->lock);
-       switch (qwait(q)) {
-               case 0:
-                       /* queue closed */
-                       spin_unlock_irqsave(&q->lock);
-                       qunlock(&q->rlock);
-                       poperror();
-                       return NULL;
-               case -1:
-                       /* multiple reads on a closed queue */
-                       spin_unlock_irqsave(&q->lock);
-                       error(q->err);
+       if (!qwait(q)) {
+               /* queue closed */
+               spin_unlock_irqsave(&q->lock);
+               qunlock(&q->rlock);
+               poperror();
+               return NULL;
        }
 
        /* if we get here, there's at least one block in the queue */
@@ -1467,17 +1469,12 @@ long qread(struct queue *q, void *vp, int len)
 
        spin_lock_irqsave(&q->lock);
 again:
-       switch (qwait(q)) {
-               case 0:
-                       /* queue closed */
-                       spin_unlock_irqsave(&q->lock);
-                       qunlock(&q->rlock);
-                       poperror();
-                       return 0;
-               case -1:
-                       /* multiple reads on a closed queue */
-                       spin_unlock_irqsave(&q->lock);
-                       error(q->err);
+       if (!qwait(q)) {
+               /* queue closed */
+               spin_unlock_irqsave(&q->lock);
+               qunlock(&q->rlock);
+               poperror();
+               return 0;
        }
 
        /* if we get here, there's at least one block in the queue */
@@ -1544,7 +1541,7 @@ static int qnotfull(void *a)
        return q->len < q->limit || (q->state & Qclosed);
 }
 
-uint32_t noblockcnt;
+uint32_t dropcnt;
 
 /*
  *  add a block to a queue obeying flow control
@@ -1581,14 +1578,21 @@ long qbwrite(struct queue *q, struct block *b)
 
        /* if nonblocking, don't queue over the limit */
        if (q->len >= q->limit) {
-               if (q->noblock) {
+               /* drop overflow takes priority over regular non-blocking */
+               if (q->state & Qdropoverflow) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
-                       noblockcnt += n;
+                       dropcnt += n;
                        qunlock(&q->wlock);
                        poperror();
                        return n;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       freeb(b);
+                       set_errno(EAGAIN);
+                       error("queue full");
+               }
        }
 
        /* queue the block */
@@ -1632,7 +1636,7 @@ long qbwrite(struct queue *q, struct block *b)
         *  queue infinite crud.
         */
        for (;;) {
-               if (q->noblock || qnotfull(q))
+               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
                        break;
 
                spin_lock_irqsave(&q->lock);
@@ -1789,13 +1793,12 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve);
+       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
        strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
        q->len = 0;
        q->dlen = 0;
-       q->noblock = 0;
        spin_unlock_irqsave(&q->lock);
 
        /* free queued blocks */
@@ -1889,7 +1892,19 @@ void qsetlimit(struct queue *q, int limit)
  */
 void qdropoverflow(struct queue *q, bool onoff)
 {
-       q->noblock = onoff;
+       if (onoff)
+               q->state |= Qdropoverflow;
+       else
+               q->state &= ~Qdropoverflow;
+}
+
+/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
+void qnonblock(struct queue *q, bool onoff)
+{
+       if (onoff)
+               q->state |= Qnonblock;
+       else
+               q->state &= ~Qnonblock;
 }
 
 /*