qio: Remove q->len
[akaros.git] / kern / src / ns / qio.c
index d229b3c..07c58c0 100644 (file)
@@ -71,7 +71,6 @@ struct queue {
        struct block *bfirst;           /* buffer */
        struct block *blast;
 
-       int len;                                        /* bytes allocated to queue */
        int dlen;                                       /* data bytes in queue */
        int limit;                                      /* max bytes in queue */
        int inilim;                             /* initial limit */
@@ -107,7 +106,6 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
                               int mem_flags);
 static bool qwait_and_ilock(struct queue *q, int qio_flags);
-static void qwakeup_iunlock(struct queue *q, int qio_flags);
 
 /* Helper: fires a wake callback, sending 'filter' */
 static void qwake_cb(struct queue *q, int filter)
@@ -290,9 +288,9 @@ struct block *linearizeblock(struct block *b)
        return newb;
 }
 
-/*
- *  make sure the first block has at least n bytes in its main body
- */
+/* Make sure the first block has at least n bytes in its main body.  Pulls up
+ * data from the *list* of blocks.  Returns 0 if there is not enough data in the
+ * block list. */
 struct block *pullupblock(struct block *bp, int n)
 {
        int i, len, seglen;
@@ -306,7 +304,11 @@ struct block *pullupblock(struct block *bp, int n)
        if (BHLEN(bp) >= n)
                return bp;
 
-        /* a start at explicit main-body / header management */
+       /* If there's no chance, just bail out now.  This might be slightly wasteful
+        * if there's a long blist that does have enough data. */
+       if (n > blocklen(bp))
+               return 0;
+       /* a start at explicit main-body / header management */
        if (bp->extra_len) {
                if (n > bp->lim - bp->rp) {
                        /* would need to realloc a new block and copy everything over. */
@@ -314,6 +316,8 @@ struct block *pullupblock(struct block *bp, int n)
                                        n, bp->lim, bp->rp, bp->lim-bp->rp);
                }
                len = n - BHLEN(bp);
+               /* Would need to recursively call this, or otherwise pull from later
+                * blocks and put chunks of their data into the block we're building. */
                if (len > bp->extra_len)
                        panic("pullup more than extra (%d, %d, %d)\n",
                              n, BHLEN(bp), bp->extra_len);
@@ -605,7 +609,6 @@ static struct block *pop_first_block(struct queue *q)
 {
        struct block *b = q->bfirst;
 
-       q->len -= BALLOC(b);  // XXX all usages of q->len with extra_data are fucked
        q->dlen -= BLEN(b);
        q->bfirst = b->next;
        b->next = 0;
@@ -625,7 +628,6 @@ static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
 {
        b->extra_len -= amt;
-       q->len -= amt;
        q->dlen -= amt;
 }
 
@@ -728,6 +730,7 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
 {
        struct block *ret, *ret_last, *first;
        size_t blen;
+       bool was_unwritable = FALSE;
 
        if (qio_flags & QIO_CAN_ERR_SLEEP) {
                if (!qwait_and_ilock(q, qio_flags)) {
@@ -744,6 +747,12 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                        return QBR_FAIL;
                }
        }
+       /* We need to check before adjusting q->len.  We're checking the writer's
+        * sleep condition / tap condition.  When set, we *might* be making an edge
+        * transition (from unwritable to writable), which needs to wake and fire
+        * taps.  But, our read might not drain the queue below q->lim.  We'll check
+        * again later to see if we should really wake them.  */
+       was_unwritable = !qwritable(q);
        blen = BLEN(first);
        if ((q->state & Qcoalesce) && (blen == 0)) {
                freeb(pop_first_block(q));
@@ -751,9 +760,8 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                /* Need to retry to make sure we have a first block */
                return QBR_AGAIN;
        }
-       /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
-        * regardless of len.  We'll do the same, and just return the minimum: the
-        * first block.  I'd be happy to remove this. */
+       /* Qmsg: just return the first block.  Be careful, since our caller might
+        * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
        if (q->state & Qmsg) {
                ret = pop_first_block(q);
                goto out_ok;
@@ -800,7 +808,16 @@ static int __try_qbread(struct queue *q, size_t len, int qio_flags,
                len -= blen;
        }
 out_ok:
-       qwakeup_iunlock(q, qio_flags);
+       /* Don't wake them up or fire tap if we didn't drain enough. */
+       if (!qwritable(q))
+               was_unwritable = FALSE;
+       spin_unlock_irqsave(&q->lock);
+       if (was_unwritable) {
+               if (q->kick && !(qio_flags & QIO_DONT_KICK))
+                       q->kick(q->arg);
+               rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
        *real_ret = ret;
        return QBR_OK;
 }
@@ -1154,7 +1171,6 @@ struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
-       q->state |= Qstarve;
        q->eof = 0;
 
        return q;
@@ -1205,15 +1221,23 @@ static bool qwait_and_ilock(struct queue *q, int qio_flags)
                        }
                        return FALSE;
                }
-               /* We set Qstarve regardless of whether we are non-blocking or not.
-                * Qstarve tracks the edge detection of the queue being empty. */
-               q->state |= Qstarve;
                if (qio_flags & QIO_NON_BLOCK) {
                        spin_unlock_irqsave(&q->lock);
                        error(EAGAIN, "queue empty");
                }
                spin_unlock_irqsave(&q->lock);
-               /* may throw an error() */
+               /* As with the producer side, we check for a condition while holding the
+                * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
+                * check again" pattern, but we do it conditionally.  Both sides agree
+                * synchronously to do it, and those decisions are made while holding
+                * q->lock.  I think this is OK.
+                *
+                * The invariant is that no reader sleeps when the queue has data.
+                * While holding the rendez lock, if we see there's no data, we'll
+                * sleep.  Since we saw there was no data, the next writer will see (or
+                * already saw) no data, and then the writer decides to rendez_wake,
+                * which will grab the rendez lock.  If the writer already did that,
+                * then we'll see notempty when we do our check-again. */
                rendez_sleep(&q->rr, notempty, q);
        }
 }
@@ -1230,7 +1254,6 @@ void qaddlist(struct queue *q, struct block *b)
                q->blast->next = b;
        else
                q->bfirst = b;
-       q->len += blockalloclen(b);
        q->dlen += blocklen(b);
        while (b->next)
                b = b->next;
@@ -1314,6 +1337,9 @@ static size_t read_all_blocks(struct block *b, void *va, size_t len)
        do {
                /* We should be draining every block completely. */
                assert(BLEN(b) <= len - sofar);
+               assert(va);
+               assert(va + sofar);
+               assert(b->rp);
                sofar += read_from_block(b, va + sofar, len - sofar);
                next = b->next;
                freeb(b);
@@ -1366,45 +1392,10 @@ void qputback(struct queue *q, struct block *b)
        if (q->bfirst == NULL)
                q->blast = b;
        q->bfirst = b;
-       q->len += BALLOC(b);
        q->dlen += BLEN(b);
 }
 
 /*
- *  flow control, get producer going again
- *  called with q ilocked
- */
-static void qwakeup_iunlock(struct queue *q, int qio_flags)
-{
-       int dowakeup = 0;
-
-       /*
-        *  if writer flow controlled, restart
-        *
-        *  This used to be
-        *  q->len < q->limit/2
-        *  but it slows down tcp too much for certain write sizes.
-        *  I really don't understand it completely.  It may be
-        *  due to the queue draining so fast that the transmission
-        *  stalls waiting for the app to produce more data.  - presotto
-        */
-       if ((q->state & Qflow) && q->len < q->limit) {
-               q->state &= ~Qflow;
-               dowakeup = 1;
-       }
-
-       spin_unlock_irqsave(&q->lock);
-
-       /* wakeup flow controlled writers */
-       if (dowakeup) {
-               if (q->kick && !(qio_flags & QIO_DONT_KICK))
-                       q->kick(q->arg);
-               rendez_wakeup(&q->wr);
-       }
-       qwake_cb(q, FDTAP_FILT_WRITABLE);
-}
-
-/*
  *  get next block from a queue (up to a limit)
  *
  */
@@ -1443,27 +1434,24 @@ static int qnotfull(void *a)
 {
        struct queue *q = a;
 
-       return q->len < q->limit || (q->state & Qclosed);
+       return qwritable(q) || (q->state & Qclosed);
 }
 
 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
 static size_t enqueue_blist(struct queue *q, struct block *b)
 {
-       size_t len, dlen;
+       size_t dlen;
 
        if (q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
-       len = BALLOC(b);
        dlen = BLEN(b);
        while (b->next) {
                b = b->next;
-               len += BALLOC(b);
                dlen += BLEN(b);
        }
        q->blast = b;
-       q->len += len;
        q->dlen += dlen;
        return dlen;
 }
@@ -1474,8 +1462,7 @@ static size_t enqueue_blist(struct queue *q, struct block *b)
 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
 {
        ssize_t ret;
-       bool dowakeup = FALSE;
-       bool was_empty;
+       bool was_unreadable;
 
        if (q->bypass) {
                ret = blocklen(b);
@@ -1483,7 +1470,7 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
                return ret;
        }
        spin_lock_irqsave(&q->lock);
-       was_empty = q->len == 0;
+       was_unreadable = q->dlen == 0;
        if (q->state & Qclosed) {
                spin_unlock_irqsave(&q->lock);
                freeblist(b);
@@ -1494,7 +1481,7 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
                else
                        error(EPIPE, "connection closed");
        }
-       if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
+       if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
                /* drop overflow takes priority over regular non-blocking */
                if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
                        spin_unlock_irqsave(&q->lock);
@@ -1511,20 +1498,20 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
        }
        ret = enqueue_blist(q, b);
        QDEBUG checkb(b, "__qbwrite");
-       /* make sure other end gets awakened */
-       if (q->state & Qstarve) {
-               q->state &= ~Qstarve;
-               dowakeup = TRUE;
-       }
        spin_unlock_irqsave(&q->lock);
        /* TODO: not sure if the usage of a kick is mutually exclusive with a
         * wakeup, meaning that actual users either want a kick or have qreaders. */
-       if (q->kick && (dowakeup || (q->state & Qkick)))
+       if (q->kick && (was_unreadable || (q->state & Qkick)))
                q->kick(q->arg);
-       if (dowakeup)
+       if (was_unreadable) {
+               /* Unlike the read side, there's no double-check to make sure the queue
+                * transitioned across an edge.  We know we added something, so that's
+                * enough.  We wake if the queue was empty.  Both sides are the same, in
+                * that the condition for which we do the rendez_wakeup() is the same as
+                * the condition done for the rendez_sleep(). */
                rendez_wakeup(&q->rr);
-       if (was_empty)
                qwake_cb(q, FDTAP_FILT_READABLE);
+       }
        /*
         *  flow control, wait for queue to get below the limit
         *  before allowing the process to continue and queue
@@ -1539,16 +1526,27 @@ static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
         */
        if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
            !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
-               /* This is a racy peek at the q status.  If we accidentally block, we
-                * set Qflow, so someone should wake us.  If we accidentally don't
-                * block, we just returned to the user and let them slip a block past
-                * flow control. */
-               while (!qnotfull(q)) {
-                       spin_lock_irqsave(&q->lock);
-                       q->state |= Qflow;
-                       spin_unlock_irqsave(&q->lock);
+               /* This is a racy peek at the q status.  If we accidentally block, our
+                * rendez will return.  The rendez's peak (qnotfull) is also racy w.r.t.
+                * the q's spinlock (that lock protects writes, but not reads).
+                *
+                * Here's the deal: when holding the rendez lock, if we see the sleep
+                * condition, the consumer will wake us.  The condition will only ever
+                * be changed by the next qbread() (consumer, changes q->dlen).  That
+                * code will do a rendez wake, which will spin on the rendez lock,
+                * meaning it won't procede until we either see the new state (and
+                * return) or put ourselves on the rendez, and wake up.
+                *
+                * The pattern is one side writes mem, then signals.  Our side checks
+                * the signal, then reads the mem.  The goal is to not miss seeing the
+                * signal AND missing the memory write.  In this specific case, the
+                * signal is actually synchronous (the rendez lock) and not basic shared
+                * memory.
+                *
+                * Oh, and we spin in case we woke early and someone else filled the
+                * queue, mesa-style. */
+               while (!qnotfull(q))
                        rendez_sleep(&q->wr, qnotfull, q);
-               }
        }
        return ret;
 }
@@ -1678,11 +1676,10 @@ void qclose(struct queue *q)
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
-       q->state &= ~(Qflow | Qstarve | Qdropoverflow);
+       q->state &= ~Qdropoverflow;
        q->err[0] = 0;
        bfirst = q->bfirst;
        q->bfirst = 0;
-       q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);
 
@@ -1733,7 +1730,6 @@ void qreopen(struct queue *q)
 {
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
-       q->state |= Qstarve;
        q->eof = 0;
        q->limit = q->inilim;
        q->wake_cb = 0;
@@ -1751,12 +1747,22 @@ int qlen(struct queue *q)
 
 /*
  * return space remaining before flow control
+ *
+ *  This used to be
+ *  q->len < q->limit/2
+ *  but it slows down tcp too much for certain write sizes.
+ *  I really don't understand it completely.  It may be
+ *  due to the queue draining so fast that the transmission
+ *  stalls waiting for the app to produce more data.  - presotto
+ *
+ *  q->len was the amount of bytes, which is no longer used.  we now use
+ *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
  */
 int qwindow(struct queue *q)
 {
        int l;
 
-       l = q->limit - q->len;
+       l = q->limit - q->dlen;
        if (l < 0)
                l = 0;
        return l;
@@ -1791,6 +1797,30 @@ void qdropoverflow(struct queue *q, bool onoff)
        spin_unlock_irqsave(&q->lock);
 }
 
+/* Be careful: this can affect concurrent reads/writes and code that might have
+ * built-in expectations of the q's type. */
+void q_toggle_qmsg(struct queue *q, bool onoff)
+{
+       spin_lock_irqsave(&q->lock);
+       if (onoff)
+               q->state |= Qmsg;
+       else
+               q->state &= ~Qmsg;
+       spin_unlock_irqsave(&q->lock);
+}
+
+/* Be careful: this can affect concurrent reads/writes and code that might have
+ * built-in expectations of the q's type. */
+void q_toggle_qcoalesce(struct queue *q, bool onoff)
+{
+       spin_lock_irqsave(&q->lock);
+       if (onoff)
+               q->state |= Qcoalesce;
+       else
+               q->state &= ~Qcoalesce;
+       spin_unlock_irqsave(&q->lock);
+}
+
 /*
  *  flush the output queue
  */
@@ -1802,7 +1832,6 @@ void qflush(struct queue *q)
        spin_lock_irqsave(&q->lock);
        bfirst = q->bfirst;
        q->bfirst = 0;
-       q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);
 
@@ -1816,7 +1845,7 @@ void qflush(struct queue *q)
 
 int qfull(struct queue *q)
 {
-       return q->len >= q->limit;
+       return q->dlen >= q->limit;
 }
 
 int qstate(struct queue *q)
@@ -1827,8 +1856,8 @@ int qstate(struct queue *q)
 void qdump(struct queue *q)
 {
        if (q)
-               printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
-                          q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
+               printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
+                          q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
 }
 
 /* On certain wakeup events, qio will call func(q, data, filter), where filter
@@ -1857,5 +1886,5 @@ bool qreadable(struct queue *q)
 /* Helper for detecting whether we'll block on a write at this instant. */
 bool qwritable(struct queue *q)
 {
-       return qwindow(q) > 0;
+       return !q->limit || qwindow(q) > 0;
 }