qio: Add non-blocking queues
[akaros.git] / kern / src / ns / qio.c
index cc12165..4741f6c 100644 (file)
 #include <smp.h>
 #include <ip.h>
 
-#define WARN_EXTRA(b)                                                          \
+#define PANIC_EXTRA(b)                                                          \
 {                                                                              \
        if ((b)->extra_len)                                                        \
-               warn_once("%s doesn't handle extra_data", __FUNCTION__);               \
+               panic("%s doesn't handle extra_data", __FUNCTION__);               \
 }
 
 static uint32_t padblockcnt;
@@ -46,7 +46,6 @@ struct queue {
        int limit;                                      /* max bytes in queue */
        int inilim;                             /* initial limit */
        int state;
-       int noblock;                            /* true if writes return immediately when q full */
        int eof;                                        /* number of eofs read by user */
 
        void (*kick) (void *);          /* restart output */
@@ -101,6 +100,7 @@ struct block *padblock(struct block *bp, int size)
        uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
        uint16_t checksum_start = bp->checksum_start;
        uint16_t checksum_offset = bp->checksum_offset;
+       uint16_t mss = bp->mss;
 
        QDEBUG checkb(bp, "padblock 1");
        if (size >= 0) {
@@ -110,7 +110,7 @@ struct block *padblock(struct block *bp, int size)
                        return bp;
                }
 
-               WARN_EXTRA(bp);
+               PANIC_EXTRA(bp);
                if (bp->next)
                        panic("padblock %p", getcallerpc(&bp));
                n = BLEN(bp);
@@ -125,7 +125,7 @@ struct block *padblock(struct block *bp, int size)
        } else {
                size = -size;
 
-               WARN_EXTRA(bp);
+               PANIC_EXTRA(bp);
 
                if (bp->next)
                        panic("padblock %p", getcallerpc(&bp));
@@ -144,6 +144,7 @@ struct block *padblock(struct block *bp, int size)
                nbp->flag |= bcksum;
                nbp->checksum_start = checksum_start;
                nbp->checksum_offset = checksum_offset;
+               nbp->mss = mss;
        }
        QDEBUG checkb(nbp, "padblock 1");
        return nbp;
@@ -192,7 +193,7 @@ struct block *concatblock(struct block *bp)
                return bp;
 
        /* probably use parts of qclone */
-       WARN_EXTRA(bp);
+       PANIC_EXTRA(bp);
        nb = allocb(blocklen(bp));
        for (f = bp; f; f = f->next) {
                len = BLEN(f);
@@ -235,6 +236,7 @@ struct block *linearizeblock(struct block *b)
                newb->flag |= (b->flag & BCKSUM_FLAGS);
                newb->checksum_start = b->checksum_start;
                newb->checksum_offset = b->checksum_offset;
+               newb->mss = b->mss;
        }
        freeb(b);
        return newb;
@@ -260,7 +262,8 @@ struct block *pullupblock(struct block *bp, int n)
        if (bp->extra_len) {
                if (n > bp->lim - bp->rp) {
                        /* would need to realloc a new block and copy everything over. */
-                       panic("can't pullup, no place to put it\n");
+                       panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
+                                       n, bp->lim, bp->rp, bp->lim-bp->rp);
                }
                len = n - BHLEN(bp);
                if (len > bp->extra_len)
@@ -280,7 +283,6 @@ struct block *pullupblock(struct block *bp, int n)
                        bp->extra_len -= seglen;
                        if (ebd->len == 0) {
                                kfree((void *)ebd->base);
-                               ebd->len = 0;
                                ebd->off = 0;
                                ebd->base = 0;
                        }
@@ -349,13 +351,105 @@ struct block *pullupqueue(struct queue *q, int n)
        return q->bfirst;
 }
 
+/* throw away count bytes from the front of
+ * block's extradata.  Returns count of bytes
+ * thrown away
+ */
+
+static int pullext(struct block *bp, int count)
+{
+       struct extra_bdata *ed;
+       int i, rem, bytes = 0;
+
+       for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
+               ed = &bp->extra_data[i];
+               rem = MIN(count, ed->len);
+               bp->extra_len -= rem;
+               count -= rem;
+               bytes += rem;
+               ed->off += rem;
+               ed->len -= rem;
+               if (ed->len == 0) {
+                       kfree((void *)ed->base);
+                       ed->base = 0;
+                       ed->off = 0;
+               }
+       }
+       return bytes;
+}
+
+/* throw away count bytes from the end of a
+ * block's extradata.  Returns count of bytes
+ * thrown away
+ */
+
+static int dropext(struct block *bp, int count)
+{
+       struct extra_bdata *ed;
+       int i, rem, bytes = 0;
+
+       for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
+               ed = &bp->extra_data[i];
+               rem = MIN(count, ed->len);
+               bp->extra_len -= rem;
+               count -= rem;
+               bytes += rem;
+               ed->len -= rem;
+               if (ed->len == 0) {
+                       kfree((void *)ed->base);
+                       ed->base = 0;
+                       ed->off = 0;
+               }
+       }
+       return bytes;
+}
+
+/*
+ *  throw away up to count bytes from a
+ *  list of blocks.  Return count of bytes
+ *  thrown away.
+ */
+static int _pullblock(struct block **bph, int count, int free)
+{
+       struct block *bp;
+       int n, bytes;
+
+       bytes = 0;
+       if (bph == NULL)
+               return 0;
+
+       while (*bph != NULL && count != 0) {
+               bp = *bph;
+
+               n = MIN(BHLEN(bp), count);
+               bytes += n;
+               count -= n;
+               bp->rp += n;
+               n = pullext(bp, count);
+               bytes += n;
+               count -= n;
+               QDEBUG checkb(bp, "pullblock ");
+               if (BLEN(bp) == 0 && (free || count)) {
+                       *bph = bp->next;
+                       bp->next = NULL;
+                       freeb(bp);
+               }
+       }
+       return bytes;
+}
+
+int pullblock(struct block **bph, int count)
+{
+       return _pullblock(bph, count, 1);
+}
+
 /*
  *  trim to len bytes starting at offset
  */
 struct block *trimblock(struct block *bp, int offset, int len)
 {
-       uint32_t l;
-       struct block *nb, *startb;
+       uint32_t l, trim;
+       int olen = len;
 
        QDEBUG checkb(bp, "trimblock 1");
        if (blocklen(bp) < offset + len) {
@@ -363,31 +457,28 @@ struct block *trimblock(struct block *bp, int offset, int len)
                return NULL;
        }
 
-       while ((l = BLEN(bp)) < offset) {
-               offset -= l;
-               nb = bp->next;
-               bp->next = NULL;
-               freeb(bp);
-               bp = nb;
+       l =_pullblock(&bp, offset, 0);
+       if (bp == NULL)
+               return NULL;
+       if (l != offset) {
+               freeblist(bp);
+               return NULL;
        }
 
-       WARN_EXTRA(bp);
-       startb = bp;
-       bp->rp += offset;
-
        while ((l = BLEN(bp)) < len) {
                len -= l;
                bp = bp->next;
        }
 
-       bp->wp -= (BLEN(bp) - len);
+       trim = BLEN(bp) - len;
+       trim -= dropext(bp, trim);
+       bp->wp -= trim;
 
        if (bp->next) {
                freeblist(bp->next);
                bp->next = NULL;
        }
-
-       return startb;
+       return bp;
 }
 
 /*
@@ -404,8 +495,9 @@ struct block *copyblock(struct block *bp, int count)
                nbp->flag |= (bp->flag & BCKSUM_FLAGS);
                nbp->checksum_start = bp->checksum_start;
                nbp->checksum_offset = bp->checksum_offset;
+               nbp->mss = bp->mss;
        }
-       WARN_EXTRA(bp);
+       PANIC_EXTRA(bp);
        for (; count > 0 && bp != 0; bp = bp->next) {
                l = BLEN(bp);
                if (l > count)
@@ -434,7 +526,7 @@ struct block *adjustblock(struct block *bp, int len)
                return NULL;
        }
 
-       WARN_EXTRA(bp);
+       PANIC_EXTRA(bp);
        if (bp->rp + len > bp->lim) {
                nbp = copyblock(bp, len);
                freeblist(bp);
@@ -452,39 +544,6 @@ struct block *adjustblock(struct block *bp, int len)
        return bp;
 }
 
-/*
- *  throw away up to count bytes from a
- *  list of blocks.  Return count of bytes
- *  thrown away.
- */
-int pullblock(struct block **bph, int count)
-{
-       struct block *bp;
-       int n, bytes;
-
-       bytes = 0;
-       if (bph == NULL)
-               return 0;
-
-       while (*bph != NULL && count != 0) {
-               bp = *bph;
-       WARN_EXTRA(bp);
-
-               n = BLEN(bp);
-               if (count < n)
-                       n = count;
-               bytes += n;
-               count -= n;
-               bp->rp += n;
-               QDEBUG checkb(bp, "pullblock ");
-               if (BLEN(bp) == 0) {
-                       *bph = bp->next;
-                       bp->next = NULL;
-                       freeb(bp);
-               }
-       }
-       return bytes;
-}
 
 /*
  *  get next block from a queue, return null if nothing there
@@ -635,7 +694,7 @@ int qconsume(struct queue *q, void *vp, int len)
                tofree = b;
        };
 
-       WARN_EXTRA(b);
+       PANIC_EXTRA(b);
        if (n < len)
                len = n;
        memmove(p, b->rp, len);
@@ -781,7 +840,8 @@ struct block *packblock(struct block *bp)
        struct block **l, *nbp;
        int n;
 
-       WARN_EXTRA(bp);
+       if (bp->extra_len)
+               return bp;
        for (l = &bp; *l; l = &(*l)->next) {
                nbp = *l;
                n = BLEN(nbp);
@@ -835,7 +895,7 @@ int qproduce(struct queue *q, void *vp, int len)
                /* b->next = 0; done by iallocb() */
                q->len += BALLOC(b);
        }
-       WARN_EXTRA(b);
+       PANIC_EXTRA(b);
        memmove(b->wp, p, len);
        producecnt += len;
        b->wp += len;
@@ -1048,7 +1108,7 @@ struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
        for (sofar = 0; sofar < len;) {
                if (n > len - sofar)
                        n = len - sofar;
-               WARN_EXTRA(b);
+               PANIC_EXTRA(b);
                memmove(nb->wp, p, n);
                qcopycnt += n;
                sofar += n;
@@ -1100,7 +1160,6 @@ struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
        q->state = msg;
        q->state |= Qstarve;
        q->eof = 0;
-       q->noblock = 0;
 
        return q;
 }
@@ -1130,11 +1189,12 @@ static int notempty(void *a)
        return (q->state & Qclosed) || q->bfirst != 0;
 }
 
-/* wait for the queue to be non-empty or closed.
+/* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
+ * wait, FALSE on Qclose (without error)
  *
- * called with q ilocked.  rendez may error out, back through the caller, with
+ * Called with q ilocked.  May error out, back through the caller, with
  * the irqsave lock unlocked.  */
-static int qwait(struct queue *q)
+static bool qwait(struct queue *q)
 {
        /* wait for data */
        for (;;) {
@@ -1142,20 +1202,28 @@ static int qwait(struct queue *q)
                        break;
 
                if (q->state & Qclosed) {
-                       if (++q->eof > 3)
-                               return -1;
-                       if (*q->err && strcmp(q->err, Ehungup) != 0)
-                               return -1;
-                       return 0;
+                       if (++q->eof > 3) {
+                               spin_unlock_irqsave(&q->lock);
+                               error("multiple reads on a closed queue");
+                       }
+                       if (*q->err && strcmp(q->err, Ehungup) != 0) {
+                               spin_unlock_irqsave(&q->lock);
+                               error(q->err);
+                       }
+                       return FALSE;
+               }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       set_errno(EAGAIN);
+                       error("queue empty");
                }
-
                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
                /* may throw an error() */
                rendez_sleep(&q->rr, notempty, q);
                spin_lock_irqsave(&q->lock);
        }
-       return 1;
+       return TRUE;
 }
 
 /*
@@ -1191,7 +1259,6 @@ struct block *qremove(struct queue *q)
        q->dlen -= BLEN(b);
        q->len -= BALLOC(b);
        QDEBUG checkb(b, "qremove");
-       b = linearizeblock(b);
        return b;
 }
 
@@ -1350,17 +1417,12 @@ struct block *qbread(struct queue *q, int len)
        }
 
        spin_lock_irqsave(&q->lock);
-       switch (qwait(q)) {
-               case 0:
-                       /* queue closed */
-                       spin_unlock_irqsave(&q->lock);
-                       qunlock(&q->rlock);
-                       poperror();
-                       return NULL;
-               case -1:
-                       /* multiple reads on a closed queue */
-                       spin_unlock_irqsave(&q->lock);
-                       error(q->err);
+       if (!qwait(q)) {
+               /* queue closed */
+               spin_unlock_irqsave(&q->lock);
+               qunlock(&q->rlock);
+               poperror();
+               return NULL;
        }
 
        /* if we get here, there's at least one block in the queue */
@@ -1370,7 +1432,7 @@ struct block *qbread(struct queue *q, int len)
        /* split block if it's too big and this is not a message queue */
        nb = b;
        if (n > len) {
-               WARN_EXTRA(b);
+               PANIC_EXTRA(b);
                if ((q->state & Qmsg) == 0) {
                        n -= len;
                        b = allocb(n);
@@ -1407,17 +1469,12 @@ long qread(struct queue *q, void *vp, int len)
 
        spin_lock_irqsave(&q->lock);
 again:
-       switch (qwait(q)) {
-               case 0:
-                       /* queue closed */
-                       spin_unlock_irqsave(&q->lock);
-                       qunlock(&q->rlock);
-                       poperror();
-                       return 0;
-               case -1:
-                       /* multiple reads on a closed queue */
-                       spin_unlock_irqsave(&q->lock);
-                       error(q->err);
+       if (!qwait(q)) {
+               /* queue closed */
+               spin_unlock_irqsave(&q->lock);
+               qunlock(&q->rlock);
+               poperror();
+               return 0;
        }
 
        /* if we get here, there's at least one block in the queue */
@@ -1484,7 +1541,7 @@ static int qnotfull(void *a)
        return q->len < q->limit || (q->state & Qclosed);
 }
 
-uint32_t noblockcnt;
+uint32_t dropcnt;
 
 /*
  *  add a block to a queue obeying flow control
@@ -1521,14 +1578,21 @@ long qbwrite(struct queue *q, struct block *b)
 
        /* if nonblocking, don't queue over the limit */
        if (q->len >= q->limit) {
-               if (q->noblock) {
+               /* drop overflow takes priority over regular non-blocking */
+               if (q->state & Qdropoverflow) {
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
-                       noblockcnt += n;
+                       dropcnt += n;
                        qunlock(&q->wlock);
                        poperror();
                        return n;
                }
+               if (q->state & Qnonblock) {
+                       spin_unlock_irqsave(&q->lock);
+                       freeb(b);
+                       set_errno(EAGAIN);
+                       error("queue full");
+               }
        }
 
        /* queue the block */
@@ -1572,7 +1636,7 @@ long qbwrite(struct queue *q, struct block *b)
         *  queue infinite crud.
         */
        for (;;) {
-               if (q->noblock || qnotfull(q))
+               if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
                        break;
 
                spin_lock_irqsave(&q->lock);
@@ -1729,13 +1793,12 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve);
+       q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
        strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
        q->len = 0;
        q->dlen = 0;
-       q->noblock = 0;
        spin_unlock_irqsave(&q->lock);
 
        /* free queued blocks */
@@ -1825,11 +1888,23 @@ void qsetlimit(struct queue *q, int limit)
 }
 
 /*
- *  set blocking/nonblocking
+ *  set whether writes drop overflowing blocks, or if we sleep
  */
-void qnoblock(struct queue *q, int onoff)
+void qdropoverflow(struct queue *q, bool onoff)
 {
-       q->noblock = onoff;
+       if (onoff)
+               q->state |= Qdropoverflow;
+       else
+               q->state &= ~Qdropoverflow;
+}
+
+/* set whether or not the queue is nonblocking, in the EAGAIN sense. */
+void qnonblock(struct queue *q, bool onoff)
+{
+       if (onoff)
+               q->state |= Qnonblock;
+       else
+               q->state &= ~Qnonblock;
 }
 
 /*