qio: implement pullupblock() for block extra data
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <slab.h>
30 #include <kmalloc.h>
31 #include <kref.h>
32 #include <string.h>
33 #include <stdio.h>
34 #include <assert.h>
35 #include <error.h>
36 #include <cpio.h>
37 #include <pmap.h>
38 #include <smp.h>
39 #include <net/ip.h>
40
41 #define PANIC_EXTRA(b)                                                  \
42 {                                                                       \
43         if ((b)->extra_len) {                                           \
44                 printblock(b);                                          \
45                 backtrace();                                            \
46                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
47         }                                                               \
48 }
49
50 static uint32_t padblockcnt;
51 static uint32_t concatblockcnt;
52 static uint32_t pullupblockcnt;
53 static uint32_t copyblockcnt;
54 static uint32_t consumecnt;
55 static uint32_t producecnt;
56 static uint32_t qcopycnt;
57
58 static int debugging;
59
60 #define QDEBUG  if(0)
61
62 /*
63  *  IO queues
64  */
65
66 struct queue {
67         spinlock_t lock;;
68
69         struct block *bfirst;           /* buffer */
70         struct block *blast;
71
72         int dlen;                       /* data bytes in queue */
73         int limit;                      /* max bytes in queue */
74         int inilim;                     /* initial limit */
75         int state;
76         int eof;                        /* number of eofs read by user */
77         size_t bytes_read;
78
79         void (*kick) (void *);          /* restart output */
80         void (*bypass) (void *, struct block *); /* bypass queue altogether */
81         void *arg;                      /* argument to kick */
82
83         struct rendez rr;               /* process waiting to read */
84         struct rendez wr;               /* process waiting to write */
85         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
86         void *wake_data;
87
88         char err[ERRMAX];
89 };
90
91 enum {
92         Maxatomic = 64 * 1024,
93         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
94         QIO_LIMIT = (1 << 1),           /* respect q->limit */
95         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
96         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
97         QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
98         QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
99 };
100
101 unsigned int qiomaxatomic = Maxatomic;
102
103 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
104 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
105                               int mem_flags);
106 static bool qwait_and_ilock(struct queue *q, int qio_flags);
107
108 /* Helper: fires a wake callback, sending 'filter' */
109 static void qwake_cb(struct queue *q, int filter)
110 {
111         if (q->wake_cb)
112                 q->wake_cb(q, q->wake_data, filter);
113 }
114
115 void ixsummary(void)
116 {
117         debugging ^= 1;
118         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
119                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
120         printd("consume %lu, produce %lu, qcopy %lu\n",
121                    consumecnt, producecnt, qcopycnt);
122 }
123
124 /* Pad a block to the front (or the back if size is negative).  Returns the
125  * block pointer that you must use instead of bp.  i.e. bp = padblock(bp, x);
126  *
127  * This space is in the main body / header space, not the extra data.  In
128  * essence, the block has size bytes of uninitialized data placed in the front
129  * (left) of the old header data.  The caller needs to fill in that data,
130  * presumably with packet headers.  Any block[offset] in the old block is now at
131  * block[offset + size].
132  *
133  * Negative padding applies at the end of the block.  This means that there is
134  * space in block header for size bytes (in between wp and lim).  Given that all
135  * of the block 'padding' needs to be in the main header, that means we need to
136  * linearize the entire block, such that the end padding is in the main
137  * body/header space.
138  */
139 struct block *padblock(struct block *bp, int size)
140 {
141         int n;
142         struct block *nbp;
143
144         QDEBUG checkb(bp, "padblock 1");
145         if (size >= 0) {
146                 if (bp->rp - bp->base >= size) {
147                         block_add_to_offsets(bp, size);
148                         bp->rp -= size;
149                         return bp;
150                 }
151                 if (bp->next)
152                         panic("%s %p had a next", __func__, bp);
153                 n = BHLEN(bp);
154                 padblockcnt++;
155                 nbp = block_alloc(size + n, MEM_WAIT);
156                 block_copy_metadata(nbp, bp);
157                 block_replace_extras(nbp, bp);
158                 /* This effectively copies the old block main body such that we
159                  * know we have size bytes to the left of rp.  All of the
160                  * metadata offsets (e.g. tx_csum) are relative from this blob
161                  * of data: i.e. nbp->rp + size. */
162                 nbp->rp += size;
163                 nbp->wp = nbp->rp;
164                 memmove(nbp->wp, bp->rp, n);
165                 nbp->wp += n;
166                 freeb(bp);
167                 block_add_to_offsets(nbp, size);
168                 nbp->rp -= size;
169         } else {
170                 /* No one I know of calls this yet, so this is untested.  Maybe
171                  * we can remove it. */
172                 warn_once("pad attempt with negative size %d", size);
173                 size = -size;
174                 if (bp->next)
175                         panic("%s %p had a next", __func__, bp);
176                 if (bp->lim - bp->wp >= size)
177                         return bp;
178                 /* Negative padding goes after all data.  In essence, we'll need
179                  * to pull all block extra data up into the headers.  The
180                  * easiest thing is to linearize, then do the old algorithm.  We
181                  * may do extra allocations - if we ever use this, we can fix it
182                  * up.  Maybe make linearizeblock/copyblock take neg-padding. */
183                 if (bp->extra_len) {
184                         bp = linearizeblock(bp);
185                         if (bp->lim - bp->wp >= size)
186                                 return bp;
187                 }
188                 n = BLEN(bp);
189                 padblockcnt++;
190                 nbp = block_alloc(size + n, MEM_WAIT);
191                 block_copy_metadata(nbp, bp);
192                 memmove(nbp->wp, bp->rp, n);
193                 nbp->wp += n;
194                 freeb(bp);
195         }
196         QDEBUG checkb(nbp, "padblock 1");
197         return nbp;
198 }
199
200 /*
201  *  return count of bytes in a string of blocks
202  */
203 int blocklen(struct block *bp)
204 {
205         int len;
206
207         len = 0;
208         while (bp) {
209                 len += BLEN(bp);
210                 bp = bp->next;
211         }
212         return len;
213 }
214
215 /*
216  * return count of space in blocks
217  */
218 int blockalloclen(struct block *bp)
219 {
220         int len;
221
222         len = 0;
223         while (bp) {
224                 len += BALLOC(bp);
225                 bp = bp->next;
226         }
227         return len;
228 }
229
230 /*
231  *  copy the  string of blocks into
232  *  a single block and free the string
233  */
234 struct block *concatblock(struct block *bp)
235 {
236         int len;
237         struct block *nb, *f;
238
239         if (bp->next == 0)
240                 return bp;
241
242         /* probably use parts of qclone */
243         PANIC_EXTRA(bp);
244         nb = block_alloc(blocklen(bp), MEM_WAIT);
245         for (f = bp; f; f = f->next) {
246                 len = BLEN(f);
247                 memmove(nb->wp, f->rp, len);
248                 nb->wp += len;
249         }
250         concatblockcnt += BLEN(nb);
251         freeblist(bp);
252         QDEBUG checkb(nb, "concatblock 1");
253         return nb;
254 }
255
256 /* Makes an identical copy of the block, collapsing all the data into the block
257  * body.  It does not point to the contents of the original, it is a copy
258  * (unlike qclone).  Since we're copying, we might as well put the memory into
259  * one contiguous chunk. */
260 struct block *copyblock(struct block *bp, int mem_flags)
261 {
262         struct block *newb;
263         struct extra_bdata *ebd;
264         size_t amt;
265
266         QDEBUG checkb(bp, "copyblock 0");
267         newb = block_alloc(BLEN(bp), mem_flags);
268         if (!newb)
269                 return 0;
270         amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
271         assert(amt == BHLEN(bp));
272         for (int i = 0; i < bp->nr_extra_bufs; i++) {
273                 ebd = &bp->extra_data[i];
274                 if (!ebd->base || !ebd->len)
275                         continue;
276                 amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
277                                          ebd->len);
278                 assert(amt == ebd->len);
279         }
280         block_copy_metadata(newb, bp);
281         copyblockcnt++;
282         QDEBUG checkb(newb, "copyblock 1");
283         return newb;
284 }
285
286 /* Returns a block with the remaining contents of b all in the main body of the
287  * returned block.  Replace old references to b with the returned value (which
288  * may still be 'b', if no change was needed. */
289 struct block *linearizeblock(struct block *b)
290 {
291         struct block *newb;
292
293         if (!b->extra_len)
294                 return b;
295         newb = copyblock(b, MEM_WAIT);
296         freeb(b);
297         return newb;
298 }
299
300 /* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
301  * have emptied it. */
302 static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
303                                     size_t amt)
304 {
305         ebd->len -= amt;
306         ebd->off += amt;
307         b->extra_len -= amt;
308         if (!ebd->len) {
309                 /* we don't actually have to decref here.  it's also
310                  * done in freeb().  this is the earliest we can free. */
311                 kfree((void*)ebd->base);
312                 ebd->base = ebd->off = 0;
313         }
314 }
315
316 /* Helper: Yanks up to size bytes worth of extra data blocks from 'from', puts
317  * them in the main body of 'to'.  Returns the amount yanked.  Will panic if
318  * there is not enough room in 'to'. */
319 static size_t block_yank_ebd_bytes(struct block *to, struct block *from,
320                                    size_t amt)
321 {
322         size_t copy_amt, total = 0;
323         struct extra_bdata *ebd;
324
325         for (int i = 0; amt && i < from->nr_extra_bufs; i++) {
326                 ebd = &from->extra_data[i];
327                 if (!ebd->base || !ebd->len)
328                         continue;
329                 copy_amt = MIN(amt, ebd->len);
330                 copy_amt = block_copy_to_body(to, (void*)ebd->base + ebd->off,
331                                               copy_amt);
332                 if (copy_amt != MIN(amt, ebd->len))
333                         panic("'to' block didn't have enough space! %d %d %d",
334                               amt, ebd->len, copy_amt);
335                 block_consume_ebd_bytes(from, ebd, copy_amt);
336                 total += copy_amt;
337                 amt -= copy_amt;
338         }
339         return total;
340 }
341
342 /* Make sure the first block has at least n bytes in its headers/main body.
343  * Pulls up data from the *list* of blocks.  Returns 0 if there is not enough
344  * data in the block list.
345  *
346  * Any data to the *left* of rp, such as old network headers that were popped
347  * off when processing an inbound packet, may be discarded. */
348 struct block *pullupblock(struct block *bp, size_t n)
349 {
350         struct block *i;
351         struct extra_bdata *ebd;
352         size_t copy_amt;
353
354         /*
355          *  this should almost always be true, it's
356          *  just to avoid every caller checking.
357          */
358         if (BHLEN(bp) >= n)
359                 return bp;
360
361         /* If there's no chance, just bail out now.  This might be slightly
362          * wasteful if there's a long blist that does have enough data. */
363         if (n > blocklen(bp))
364                 return 0;
365
366         /* Replace bp with a new block with enough size, and yank up enough of
367          * its ebds. */
368         bp = block_realloc(bp, n);
369         pullupblockcnt++;
370         n -= BHLEN(bp);
371         n -= block_yank_ebd_bytes(bp, bp, n);
372         /* Need to pull from the remainder of the list, both the main bodies and
373          * the ebds. */
374         while (n) {
375                 i = bp->next;
376                 if (!i)
377                         panic("Not enough b's in the list; we checked the len");
378                 copy_amt = MIN(BHLEN(i), n);
379                 block_copy_to_body(bp, i->rp, copy_amt);
380                 /* That may or may not have consumed all of the main body */
381                 i->rp += copy_amt;
382                 /* Subsequent blocks with offsets shouldn't be subject to
383                  * pullup. */
384                 warn_on(i->tx_csum_offset || i->network_offset ||
385                         i->transport_offset);
386                 n -= copy_amt;
387                 n -= block_yank_ebd_bytes(bp, i, n);
388                 if (!BLEN(i)) {
389                         bp->next = i->next;
390                         i->next = NULL;
391                         freeb(i);
392                 } else {
393                         assert(!n);
394                 }
395         }
396         return bp;
397 }
398
399 /*
400  *  make sure the first block has at least n bytes in its main body
401  */
402 struct block *pullupqueue(struct queue *q, size_t n)
403 {
404         struct block *b;
405
406         /* TODO: lock to protect the queue links? */
407         if ((BHLEN(q->bfirst) >= n))
408                 return q->bfirst;
409         /* This is restoring qio metadata.  If pullupblock did any work, it
410          * changed the list - at least the first block and possibly the last in
411          * the blist. */
412         q->bfirst = pullupblock(q->bfirst, n);
413         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
414         q->blast = b;
415         return q->bfirst;
416 }
417
418 /* throw away count bytes from the front of
419  * block's extradata.  Returns count of bytes
420  * thrown away
421  */
422
423 static int pullext(struct block *bp, int count)
424 {
425         struct extra_bdata *ed;
426         int i, rem, bytes = 0;
427
428         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
429                 ed = &bp->extra_data[i];
430                 rem = MIN(count, ed->len);
431                 bp->extra_len -= rem;
432                 count -= rem;
433                 bytes += rem;
434                 ed->off += rem;
435                 ed->len -= rem;
436                 if (ed->len == 0) {
437                         kfree((void *)ed->base);
438                         ed->base = 0;
439                         ed->off = 0;
440                 }
441         }
442         return bytes;
443 }
444
445 /* throw away count bytes from the end of a
446  * block's extradata.  Returns count of bytes
447  * thrown away
448  */
449
450 static int dropext(struct block *bp, int count)
451 {
452         struct extra_bdata *ed;
453         int i, rem, bytes = 0;
454
455         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
456                 ed = &bp->extra_data[i];
457                 rem = MIN(count, ed->len);
458                 bp->extra_len -= rem;
459                 count -= rem;
460                 bytes += rem;
461                 ed->len -= rem;
462                 if (ed->len == 0) {
463                         kfree((void *)ed->base);
464                         ed->base = 0;
465                         ed->off = 0;
466                 }
467         }
468         return bytes;
469 }
470
471 /*
472  *  throw away up to count bytes from a
473  *  list of blocks.  Return count of bytes
474  *  thrown away.
475  */
476 static int _pullblock(struct block **bph, int count, int free)
477 {
478         struct block *bp;
479         int n, bytes;
480
481         bytes = 0;
482         if (bph == NULL)
483                 return 0;
484
485         while (*bph != NULL && count != 0) {
486                 bp = *bph;
487
488                 n = MIN(BHLEN(bp), count);
489                 bytes += n;
490                 count -= n;
491                 bp->rp += n;
492                 n = pullext(bp, count);
493                 bytes += n;
494                 count -= n;
495                 QDEBUG checkb(bp, "pullblock ");
496                 if (BLEN(bp) == 0 && (free || count)) {
497                         *bph = bp->next;
498                         bp->next = NULL;
499                         freeb(bp);
500                 }
501         }
502         return bytes;
503 }
504
505 int pullblock(struct block **bph, int count)
506 {
507         return _pullblock(bph, count, 1);
508 }
509
510 /*
511  *  trim to len bytes starting at offset
512  */
513 struct block *trimblock(struct block *bp, int offset, int len)
514 {
515         uint32_t l, trim;
516         int olen = len;
517
518         QDEBUG checkb(bp, "trimblock 1");
519         if (blocklen(bp) < offset + len) {
520                 freeblist(bp);
521                 return NULL;
522         }
523
524         l =_pullblock(&bp, offset, 0);
525         if (bp == NULL)
526                 return NULL;
527         if (l != offset) {
528                 freeblist(bp);
529                 return NULL;
530         }
531
532         while ((l = BLEN(bp)) < len) {
533                 len -= l;
534                 bp = bp->next;
535         }
536
537         trim = BLEN(bp) - len;
538         trim -= dropext(bp, trim);
539         bp->wp -= trim;
540
541         if (bp->next) {
542                 freeblist(bp->next);
543                 bp->next = NULL;
544         }
545         return bp;
546 }
547
548 /* Adjust block @bp so that its size is exactly @len.
549  * If the size is increased, fill in the new contents with zeros.
550  * If the size is decreased, discard some of the old contents at the tail. */
551 struct block *adjustblock(struct block *bp, int len)
552 {
553         struct extra_bdata *ebd;
554         void *buf;
555         int i;
556
557         if (len < 0) {
558                 freeb(bp);
559                 return NULL;
560         }
561
562         if (len == BLEN(bp))
563                 return bp;
564
565         /* Shrink within block main body. */
566         if (len <= BHLEN(bp)) {
567                 free_block_extra(bp);
568                 bp->wp = bp->rp + len;
569                 QDEBUG checkb(bp, "adjustblock 1");
570                 return bp;
571         }
572
573         /* Need to grow. */
574         if (len > BLEN(bp)) {
575                 /* Grow within block main body. */
576                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
577                         memset(bp->wp, 0, len - BLEN(bp));
578                         bp->wp = bp->rp + len;
579                         QDEBUG checkb(bp, "adjustblock 2");
580                         return bp;
581                 }
582                 /* Grow with extra data buffers. */
583                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
584                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
585                                    MEM_WAIT);
586                 QDEBUG checkb(bp, "adjustblock 3");
587                 return bp;
588         }
589
590         /* Shrink extra data buffers.
591          * len is how much of ebd we need to keep.
592          * extra_len is re-accumulated. */
593         assert(bp->extra_len > 0);
594         len -= BHLEN(bp);
595         bp->extra_len = 0;
596         for (i = 0; i < bp->nr_extra_bufs; i++) {
597                 ebd = &bp->extra_data[i];
598                 if (len <= ebd->len)
599                         break;
600                 len -= ebd->len;
601                 bp->extra_len += ebd->len;
602         }
603         /* If len becomes zero, extra_data[i] should be freed. */
604         if (len > 0) {
605                 ebd = &bp->extra_data[i];
606                 ebd->len = len;
607                 bp->extra_len += ebd->len;
608                 i++;
609         }
610         for (; i < bp->nr_extra_bufs; i++) {
611                 ebd = &bp->extra_data[i];
612                 if (ebd->base)
613                         kfree((void*)ebd->base);
614                 ebd->base = ebd->off = ebd->len = 0;
615         }
616         QDEBUG checkb(bp, "adjustblock 4");
617         return bp;
618 }
619
620 /* Helper: removes and returns the first block from q */
621 static struct block *pop_first_block(struct queue *q)
622 {
623         struct block *b = q->bfirst;
624
625         q->dlen -= BLEN(b);
626         q->bytes_read += BLEN(b);
627         q->bfirst = b->next;
628         b->next = 0;
629         return b;
630 }
631
632 /* Accounting helper.  Block b in q lost amt extra_data */
633 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
634 {
635         b->extra_len -= amt;
636         q->dlen -= amt;
637         q->bytes_read += amt;
638 }
639
640 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
641  * fixed in 'from', so we move its contents and zero it out in 'from'.
642  *
643  * Returns the length moved (0 on failure). */
644 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
645                        struct block *from, struct queue *from_q)
646 {
647         size_t ret = ebd->len;
648
649         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
650                 return 0;
651         block_and_q_lost_extra(from, from_q, ebd->len);
652         ebd->base = ebd->len = ebd->off = 0;
653         return ret;
654 }
655
656 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
657  * return with less than len, but greater than 0, even if there is more
658  * available in q.
659  *
660  * At any moment that we have copied anything and things are tricky, we can just
661  * return.  The trickiness comes from a bunch of variables: is the main body
662  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
663  * to @to's main body, but only if we haven't used it yet. */
664 static size_t copy_from_first_block(struct queue *q, struct block *to,
665                                     size_t len)
666 {
667         struct block *from = q->bfirst;
668         size_t copy_amt, amt;
669         struct extra_bdata *ebd;
670
671         assert(len < BLEN(from));       /* sanity */
672         /* Try to extract from the main body */
673         copy_amt = MIN(BHLEN(from), len);
674         if (copy_amt) {
675                 copy_amt = block_copy_to_body(to, from->rp, copy_amt);
676                 from->rp += copy_amt;
677                 /* We only change dlen, (data len), not q->len, since the q
678                  * still has the same block memory allocation (no kfrees
679                  * happened) */
680                 q->dlen -= copy_amt;
681                 q->bytes_read += copy_amt;
682         }
683         /* Try to extract the remainder from the extra data */
684         len -= copy_amt;
685         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
686                 ebd = &from->extra_data[i];
687                 if (!ebd->base || !ebd->len)
688                         continue;
689                 if (len >= ebd->len) {
690                         amt = move_ebd(ebd, to, from, q);
691                         if (!amt) {
692                                 /* our internal alloc could have failed.   this
693                                  * ebd is now the last one we'll consider.
694                                  * let's handle it separately and put it in the
695                                  * main body. */
696                                 if (copy_amt)
697                                         return copy_amt;
698                                 copy_amt = block_copy_to_body(to,
699                                                               (void*)ebd->base +
700                                                               ebd->off,
701                                                               ebd->len);
702                                 block_and_q_lost_extra(from, q, copy_amt);
703                                 break;
704                         }
705                         len -= amt;
706                         copy_amt += amt;
707                         continue;
708                 } else {
709                         /* If we're here, we reached our final ebd, which we'll
710                          * need to split to get anything from it. */
711                         if (copy_amt)
712                                 return copy_amt;
713                         copy_amt = block_copy_to_body(to, (void*)ebd->base +
714                                                       ebd->off, len);
715                         ebd->off += copy_amt;
716                         ebd->len -= copy_amt;
717                         block_and_q_lost_extra(from, q, copy_amt);
718                         break;
719                 }
720         }
721         if (len)
722                 assert(copy_amt);       /* sanity */
723         return copy_amt;
724 }
725
726 /* Return codes for __qbread and __try_qbread. */
727 enum {
728         QBR_OK,
729         QBR_FAIL,
730         QBR_SPARE,      /* we need a spare block */
731         QBR_AGAIN,      /* do it again, we are coalescing blocks */
732 };
733
734 /* Helper and back-end for __qbread: extracts and returns a list of blocks
735  * containing up to len bytes.  It may contain less than len even if q has more
736  * data.
737  *
738  * Returns a code interpreted by __qbread, and the returned blist in ret. */
739 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
740                         struct block **real_ret, struct block *spare)
741 {
742         struct block *ret, *ret_last, *first;
743         size_t blen;
744         bool was_unwritable = FALSE;
745
746         if (qio_flags & QIO_CAN_ERR_SLEEP) {
747                 if (!qwait_and_ilock(q, qio_flags)) {
748                         spin_unlock_irqsave(&q->lock);
749                         return QBR_FAIL;
750                 }
751                 /* we qwaited and still hold the lock, so the q is not empty */
752                 first = q->bfirst;
753         } else {
754                 spin_lock_irqsave(&q->lock);
755                 first = q->bfirst;
756                 if (!first) {
757                         spin_unlock_irqsave(&q->lock);
758                         return QBR_FAIL;
759                 }
760         }
761         /* We need to check before adjusting q->len.  We're checking the
762          * writer's sleep condition / tap condition.  When set, we *might* be
763          * making an edge transition (from unwritable to writable), which needs
764          * to wake and fire taps.  But, our read might not drain the queue below
765          * q->lim.  We'll check again later to see if we should really wake
766          * them.  */
767         was_unwritable = !qwritable(q);
768         blen = BLEN(first);
769         if ((q->state & Qcoalesce) && (blen == 0)) {
770                 freeb(pop_first_block(q));
771                 spin_unlock_irqsave(&q->lock);
772                 /* Need to retry to make sure we have a first block */
773                 return QBR_AGAIN;
774         }
775         /* Qmsg: just return the first block.  Be careful, since our caller
776          * might not read all of the block and thus drop bytes.  Similar to
777          * SOCK_DGRAM. */
778         if (q->state & Qmsg) {
779                 ret = pop_first_block(q);
780                 goto out_ok;
781         }
782         /* Let's get at least something first - makes the code easier.  This
783          * way, we'll only ever split the block once. */
784         if (blen <= len) {
785                 ret = pop_first_block(q);
786                 len -= blen;
787         } else {
788                 /* need to split the block.  we won't actually take the first
789                  * block out of the queue - we're just extracting a little bit.
790                  */
791                 if (!spare) {
792                         /* We have nothing and need a spare block.  Retry! */
793                         spin_unlock_irqsave(&q->lock);
794                         return QBR_SPARE;
795                 }
796                 copy_from_first_block(q, spare, len);
797                 ret = spare;
798                 goto out_ok;
799         }
800         /* At this point, we just grabbed the first block.  We can try to grab
801          * some more, up to len (if they want). */
802         if (qio_flags & QIO_JUST_ONE_BLOCK)
803                 goto out_ok;
804         ret_last = ret;
805         while (q->bfirst && (len > 0)) {
806                 blen = BLEN(q->bfirst);
807                 if ((q->state & Qcoalesce) && (blen == 0)) {
808                         /* remove the intermediate 0 blocks */
809                         freeb(pop_first_block(q));
810                         continue;
811                 }
812                 if (blen > len) {
813                         /* We could try to split the block, but that's a huge
814                          * pain.  For instance, we might need to move the main
815                          * body of b into an extra_data of ret_last.  lots of
816                          * ways for that to fail, and lots of cases to consider.
817                          * Easier to just bail out.  This is why I did the first
818                          * block above: we don't need to worry about this. */
819                          break;
820                 }
821                 ret_last->next = pop_first_block(q);
822                 ret_last = ret_last->next;
823                 len -= blen;
824         }
825 out_ok:
826         /* Don't wake them up or fire tap if we didn't drain enough. */
827         if (!qwritable(q))
828                 was_unwritable = FALSE;
829         spin_unlock_irqsave(&q->lock);
830         if (was_unwritable) {
831                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
832                         q->kick(q->arg);
833                 rendez_wakeup(&q->wr);
834                 qwake_cb(q, FDTAP_FILT_WRITABLE);
835         }
836         *real_ret = ret;
837         return QBR_OK;
838 }
839
840 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
841  * containing up to len bytes.  It may contain less than len even if q has more
842  * data.
843  *
844  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
845  * if it required a spare and the memory allocation failed.
846  *
847  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
848  * could get a zero length block back. */
849 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
850                               int mem_flags)
851 {
852         ERRSTACK(1);
853         struct block *ret = 0;
854         struct block *volatile spare = 0;       /* volatile for the waserror */
855
856         /* __try_qbread can throw, based on qio flags. */
857         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
858                 if (spare)
859                         freeb(spare);
860                 nexterror();
861         }
862         while (1) {
863                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
864                 case QBR_OK:
865                 case QBR_FAIL:
866                         if (spare && (ret != spare))
867                                 freeb(spare);
868                         goto out_ret;
869                 case QBR_SPARE:
870                         assert(!spare);
871                         /* Due to some nastiness, we need a fresh block so we
872                          * can read out anything from the queue.  'len' seems
873                          * like a reasonable amount.  Maybe we can get away with
874                          * less. */
875                         spare = block_alloc(len, mem_flags);
876                         if (!spare) {
877                                 /* Careful here: a memory failure (possible with
878                                  * MEM_ATOMIC) could look like 'no data in the
879                                  * queue' (QBR_FAIL).  The only one who does is
880                                  * this qget(), who happens to know that we
881                                  * won't need a spare, due to the len argument.
882                                  * Spares are only needed when we need to split
883                                  * a block. */
884                                 ret = 0;
885                                 goto out_ret;
886                         }
887                         break;
888                 case QBR_AGAIN:
889                         /* if the first block is 0 and we are Qcoalesce, then
890                          * we'll need to try again.  We bounce out of __try so
891                          * we can perform the "is there a block" logic again
892                          * from the top. */
893                         break;
894                 }
895         }
896         assert(0);
897 out_ret:
898         if (qio_flags & QIO_CAN_ERR_SLEEP)
899                 poperror();
900         return ret;
901 }
902
903 /*
904  *  get next block from a queue, return null if nothing there
905  */
906 struct block *qget(struct queue *q)
907 {
908         /* since len == SIZE_MAX, we should never need to do a mem alloc */
909         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
910 }
911
912 /* Throw away the next 'len' bytes in the queue returning the number actually
913  * discarded.
914  *
915  * If the bytes are in the queue, then they must be discarded.  The only time to
916  * return less than len is if the q itself has less than len bytes.
917  *
918  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
919  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
920 size_t qdiscard(struct queue *q, size_t len)
921 {
922         struct block *blist;
923         size_t removed_amt;
924         size_t sofar = 0;
925
926         /* This is racy.  There could be multiple qdiscarders or other
927          * consumers, where the consumption could be interleaved. */
928         while (qlen(q) && len) {
929                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
930                 removed_amt = freeblist(blist);
931                 sofar += removed_amt;
932                 len -= removed_amt;
933         }
934         return sofar;
935 }
936
937 ssize_t qpass(struct queue *q, struct block *b)
938 {
939         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
940 }
941
942 ssize_t qpassnolim(struct queue *q, struct block *b)
943 {
944         return __qbwrite(q, b, 0);
945 }
946
947 /*
948  *  if the allocated space is way out of line with the used
949  *  space, reallocate to a smaller block
950  */
951 struct block *packblock(struct block *bp)
952 {
953         struct block **l, *nbp;
954         int n;
955
956         if (bp->extra_len)
957                 return bp;
958         for (l = &bp; *l; l = &(*l)->next) {
959                 nbp = *l;
960                 n = BLEN(nbp);
961                 if ((n << 2) < BALLOC(nbp)) {
962                         *l = block_alloc(n, MEM_WAIT);
963                         memmove((*l)->wp, nbp->rp, n);
964                         (*l)->wp += n;
965                         (*l)->next = nbp->next;
966                         nbp->next = NULL;
967                         freeb(nbp);
968                 }
969         }
970
971         return bp;
972 }
973
974 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
975  * body_rp, for up to len.  Returns the len consumed.
976  *
977  * The base is 'b', so that we can kfree it later.  This currently ties us to
978  * using kfree for the release method for all extra_data.
979  *
980  * It is possible to have a body size that is 0, if there is no offset, and
981  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
982 static size_t point_to_body(struct block *b, uint8_t *body_rp,
983                             struct block *newb, unsigned int newb_idx,
984                             size_t len)
985 {
986         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
987
988         assert(newb_idx < newb->nr_extra_bufs);
989
990         kmalloc_incref(b);
991         ebd->base = (uintptr_t)b;
992         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
993         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
994         assert((int)ebd->len >= 0);
995         newb->extra_len += ebd->len;
996         return ebd->len;
997 }
998
999 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1000  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1001  * consumed.
1002  *
1003  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1004 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1005                            struct block *newb, unsigned int newb_idx,
1006                            size_t len)
1007 {
1008         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1009         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1010
1011         assert(b_idx < b->nr_extra_bufs);
1012         assert(newb_idx < newb->nr_extra_bufs);
1013
1014         kmalloc_incref((void*)b_ebd->base);
1015         n_ebd->base = b_ebd->base;
1016         n_ebd->off = b_ebd->off + b_off;
1017         n_ebd->len = MIN(b_ebd->len - b_off, len);
1018         newb->extra_len += n_ebd->len;
1019         return n_ebd->len;
1020 }
1021
1022 /* given a string of blocks, sets up the new block's extra_data such that it
1023  * *points* to the contents of the blist [offset, len + offset).  This does not
1024  * make a separate copy of the contents of the blist.
1025  *
1026  * returns 0 on success.  the only failure is if the extra_data array was too
1027  * small, so this returns a positive integer saying how big the extra_data needs
1028  * to be.
1029  *
1030  * callers are responsible for protecting the list structure. */
1031 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1032                             uint32_t offset)
1033 {
1034         struct block *b, *first;
1035         unsigned int nr_bufs = 0;
1036         unsigned int b_idx, newb_idx = 0;
1037         uint8_t *first_main_body = 0;
1038         ssize_t sofar = 0;
1039
1040         /* find the first block; keep offset relative to the latest b in the
1041          * list */
1042         for (b = blist; b; b = b->next) {
1043                 if (BLEN(b) > offset)
1044                         break;
1045                 offset -= BLEN(b);
1046         }
1047         /* qcopy semantics: if you asked for an offset outside the block list,
1048          * you get an empty block back */
1049         if (!b)
1050                 return 0;
1051         first = b;
1052         sofar -= offset; /* don't count the remaining offset in the first b */
1053         /* upper bound for how many buffers we'll need in newb */
1054         for (/* b is set*/; b; b = b->next) {
1055                 nr_bufs += BHLEN(b) ? 1 : 0;
1056                 /* still assuming nr_extra == nr_valid */
1057                 nr_bufs += b->nr_extra_bufs;
1058                 sofar += BLEN(b);
1059                 if (sofar > len)
1060                         break;
1061         }
1062         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1063         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1064                 /* caller will need to alloc these, then re-call us */
1065                 return nr_bufs;
1066         }
1067         for (b = first; b && len; b = b->next) {
1068                 b_idx = 0;
1069                 if (offset) {
1070                         if (offset < BHLEN(b)) {
1071                                 /* off is in the main body */
1072                                 len -= point_to_body(b, b->rp + offset, newb,
1073                                                      newb_idx, len);
1074                                 newb_idx++;
1075                         } else {
1076                                 /* off is in one of the buffers (or just past
1077                                  * the last one).  we're not going to point to
1078                                  * b's main body at all. */
1079                                 offset -= BHLEN(b);
1080                                 assert(b->extra_data);
1081                                 /* assuming these extrabufs are packed, or at
1082                                  * least that len isn't gibberish */
1083                                 while (b->extra_data[b_idx].len <= offset) {
1084                                         offset -= b->extra_data[b_idx].len;
1085                                         b_idx++;
1086                                 }
1087                                 /* now offset is set to our offset in the
1088                                  * b_idx'th buf */
1089                                 len -= point_to_buf(b, b_idx, offset, newb,
1090                                                     newb_idx, len);
1091                                 newb_idx++;
1092                                 b_idx++;
1093                         }
1094                         offset = 0;
1095                 } else {
1096                         if (BHLEN(b)) {
1097                                 len -= point_to_body(b, b->rp, newb, newb_idx,
1098                                                      len);
1099                                 newb_idx++;
1100                         }
1101                 }
1102                 /* knock out all remaining bufs.  we only did one point_to_ op
1103                  * by now, and any point_to_ could be our last if it consumed
1104                  * all of len. */
1105                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1106                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1107                         newb_idx++;
1108                 }
1109         }
1110         return 0;
1111 }
1112
1113 struct block *blist_clone(struct block *blist, int header_len, int len,
1114                           uint32_t offset)
1115 {
1116         int ret;
1117         struct block *newb = block_alloc(header_len, MEM_WAIT);
1118
1119         do {
1120                 ret = __blist_clone_to(blist, newb, len, offset);
1121                 if (ret)
1122                         block_add_extd(newb, ret, MEM_WAIT);
1123         } while (ret);
1124         return newb;
1125 }
1126
1127 /* given a queue, makes a single block with header_len reserved space in the
1128  * block main body, and the contents of [offset, len + offset) pointed to in the
1129  * new blocks ext_data.  This does not make a copy of the q's contents, though
1130  * you do have a ref count on the memory. */
1131 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1132 {
1133         int ret;
1134         struct block *newb = block_alloc(header_len, MEM_WAIT);
1135         /* the while loop should rarely be used: it would require someone
1136          * concurrently adding to the queue. */
1137         do {
1138                 /* TODO: RCU protect the q list (b->next) (need read lock) */
1139                 spin_lock_irqsave(&q->lock);
1140                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1141                 spin_unlock_irqsave(&q->lock);
1142                 if (ret)
1143                         block_add_extd(newb, ret, MEM_WAIT);
1144         } while (ret);
1145         return newb;
1146 }
1147
1148 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1149 {
1150         return qclone(q, 0, len, offset);
1151 }
1152
1153 static void qinit_common(struct queue *q)
1154 {
1155         spinlock_init_irqsave(&q->lock);
1156         rendez_init(&q->rr);
1157         rendez_init(&q->wr);
1158 }
1159
1160 /*
1161  *  called by non-interrupt code
1162  */
1163 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1164 {
1165         struct queue *q;
1166
1167         q = kzmalloc(sizeof(struct queue), 0);
1168         if (q == 0)
1169                 return 0;
1170         qinit_common(q);
1171
1172         q->limit = q->inilim = limit;
1173         q->kick = kick;
1174         q->arg = arg;
1175         q->state = msg;
1176         q->eof = 0;
1177
1178         return q;
1179 }
1180
1181 /* open a queue to be bypassed */
1182 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1183 {
1184         struct queue *q;
1185
1186         q = kzmalloc(sizeof(struct queue), 0);
1187         if (q == 0)
1188                 return 0;
1189         qinit_common(q);
1190
1191         q->limit = 0;
1192         q->arg = arg;
1193         q->bypass = bypass;
1194         q->state = 0;
1195
1196         return q;
1197 }
1198
1199 static int notempty(void *a)
1200 {
1201         struct queue *q = a;
1202
1203         return (q->state & Qclosed) || q->bfirst != 0;
1204 }
1205
1206 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1207  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1208  * was naturally closed.  Throws an error o/w. */
1209 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1210 {
1211         while (1) {
1212                 spin_lock_irqsave(&q->lock);
1213                 if (q->bfirst != NULL)
1214                         return TRUE;
1215                 if (q->state & Qclosed) {
1216                         if (++q->eof > 3) {
1217                                 spin_unlock_irqsave(&q->lock);
1218                                 error(EPIPE,
1219                                       "multiple reads on a closed queue");
1220                         }
1221                         if (q->err[0]) {
1222                                 spin_unlock_irqsave(&q->lock);
1223                                 error(EPIPE, q->err);
1224                         }
1225                         return FALSE;
1226                 }
1227                 if (qio_flags & QIO_NON_BLOCK) {
1228                         spin_unlock_irqsave(&q->lock);
1229                         error(EAGAIN, "queue empty");
1230                 }
1231                 spin_unlock_irqsave(&q->lock);
1232                 /* As with the producer side, we check for a condition while
1233                  * holding the q->lock, decide to sleep, then unlock.  It's like
1234                  * the "check, signal, check again" pattern, but we do it
1235                  * conditionally.  Both sides agree synchronously to do it, and
1236                  * those decisions are made while holding q->lock.  I think this
1237                  * is OK.
1238                  *
1239                  * The invariant is that no reader sleeps when the queue has
1240                  * data.  While holding the rendez lock, if we see there's no
1241                  * data, we'll sleep.  Since we saw there was no data, the next
1242                  * writer will see (or already saw) no data, and then the writer
1243                  * decides to rendez_wake, which will grab the rendez lock.  If
1244                  * the writer already did that, then we'll see notempty when we
1245                  * do our check-again. */
1246                 rendez_sleep(&q->rr, notempty, q);
1247         }
1248 }
1249
1250 /*
1251  * add a block list to a queue
1252  * XXX basically the same as enqueue blist, and has no locking!
1253  */
1254 void qaddlist(struct queue *q, struct block *b)
1255 {
1256         /* TODO: q lock? */
1257         /* queue the block */
1258         if (q->bfirst)
1259                 q->blast->next = b;
1260         else
1261                 q->bfirst = b;
1262         q->dlen += blocklen(b);
1263         while (b->next)
1264                 b = b->next;
1265         q->blast = b;
1266 }
1267
1268 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1269 {
1270         size_t copy_amt, retval = 0;
1271         struct extra_bdata *ebd;
1272
1273         copy_amt = MIN(BHLEN(b), amt);
1274         memcpy(to, b->rp, copy_amt);
1275         /* advance the rp, since this block might not be completely consumed and
1276          * future reads need to know where to pick up from */
1277         b->rp += copy_amt;
1278         to += copy_amt;
1279         amt -= copy_amt;
1280         retval += copy_amt;
1281         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1282                 ebd = &b->extra_data[i];
1283                 /* skip empty entires.  if we track this in the struct block, we
1284                  * can just start the for loop early */
1285                 if (!ebd->base || !ebd->len)
1286                         continue;
1287                 copy_amt = MIN(ebd->len, amt);
1288                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1289                 /* we're actually consuming the entries, just like how we
1290                  * advance rp up above, and might only consume part of one. */
1291                 block_consume_ebd_bytes(b, ebd, copy_amt);
1292                 to += copy_amt;
1293                 amt -= copy_amt;
1294                 retval += copy_amt;
1295         }
1296         return retval;
1297 }
1298
1299 /*
1300  *  copy the contents of a string of blocks into
1301  *  memory.  emptied blocks are freed.  return
1302  *  pointer to first unconsumed block.
1303  */
1304 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1305 {
1306         int i;
1307         struct block *next;
1308
1309         /* could be slicker here, since read_from_block is smart */
1310         for (; b != NULL; b = next) {
1311                 i = BLEN(b);
1312                 if (i > n) {
1313                         /* partial block, consume some */
1314                         read_from_block(b, p, n);
1315                         return b;
1316                 }
1317                 /* full block, consume all and move on */
1318                 i = read_from_block(b, p, i);
1319                 n -= i;
1320                 p += i;
1321                 next = b->next;
1322                 b->next = NULL;
1323                 freeb(b);
1324         }
1325         return NULL;
1326 }
1327
1328 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1329  * actual amount copied. */
1330 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1331 {
1332         size_t sofar = 0;
1333         struct block *next;
1334
1335         do {
1336                 assert(va);
1337                 assert(b->rp);
1338                 sofar += read_from_block(b, va + sofar, len - sofar);
1339                 if (BLEN(b) && b->next)
1340                         panic("Failed to drain entire block (Qmsg?) but had a next!");
1341                 next = b->next;
1342                 b->next = NULL;
1343                 freeb(b);
1344                 b = next;
1345         } while (b);
1346         return sofar;
1347 }
1348
1349 /*
1350  *  copy the contents of memory into a string of blocks.
1351  *  return NULL on error.
1352  */
1353 struct block *mem2bl(uint8_t * p, int len)
1354 {
1355         ERRSTACK(1);
1356         int n;
1357         struct block *b, *first, **l;
1358
1359         first = NULL;
1360         l = &first;
1361         if (waserror()) {
1362                 freeblist(first);
1363                 nexterror();
1364         }
1365         do {
1366                 n = len;
1367                 if (n > Maxatomic)
1368                         n = Maxatomic;
1369
1370                 *l = b = block_alloc(n, MEM_WAIT);
1371                 /* TODO consider extra_data */
1372                 memmove(b->wp, p, n);
1373                 b->wp += n;
1374                 p += n;
1375                 len -= n;
1376                 l = &b->next;
1377         } while (len > 0);
1378         poperror();
1379
1380         return first;
1381 }
1382
1383 /*
1384  *  put a block back to the front of the queue
1385  *  called with q ilocked
1386  */
1387 void qputback(struct queue *q, struct block *b)
1388 {
1389         b->next = q->bfirst;
1390         if (q->bfirst == NULL)
1391                 q->blast = b;
1392         q->bfirst = b;
1393         q->dlen += BLEN(b);
1394         /* qputback seems to undo a read, so we can undo the accounting too. */
1395         q->bytes_read -= BLEN(b);
1396 }
1397
1398 /*
1399  *  get next block from a queue (up to a limit)
1400  *
1401  */
1402 struct block *qbread(struct queue *q, size_t len)
1403 {
1404         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
1405                         MEM_WAIT);
1406 }
1407
1408 struct block *qbread_nonblock(struct queue *q, size_t len)
1409 {
1410         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1411                         QIO_NON_BLOCK, MEM_WAIT);
1412 }
1413
1414 /* read up to len from a queue into vp. */
1415 size_t qread(struct queue *q, void *va, size_t len)
1416 {
1417         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1418
1419         if (!blist)
1420                 return 0;
1421         return read_all_blocks(blist, va, len);
1422 }
1423
1424 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1425 {
1426         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
1427                                        QIO_NON_BLOCK, MEM_WAIT);
1428
1429         if (!blist)
1430                 return 0;
1431         return read_all_blocks(blist, va, len);
1432 }
1433
1434 /* This is the rendez wake condition for writers. */
1435 static int qwriter_should_wake(void *a)
1436 {
1437         struct queue *q = a;
1438
1439         return qwritable(q) || (q->state & Qclosed);
1440 }
1441
1442 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1443 static size_t enqueue_blist(struct queue *q, struct block *b)
1444 {
1445         size_t dlen;
1446
1447         if (q->bfirst)
1448                 q->blast->next = b;
1449         else
1450                 q->bfirst = b;
1451         dlen = BLEN(b);
1452         while (b->next) {
1453                 b = b->next;
1454                 dlen += BLEN(b);
1455         }
1456         q->blast = b;
1457         q->dlen += dlen;
1458         return dlen;
1459 }
1460
1461 /* Adds block (which can be a list of blocks) to the queue, subject to
1462  * qio_flags.  Returns the length written on success or -1 on non-throwable
1463  * error.  Adjust qio_flags to control the value-added features!. */
1464 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1465 {
1466         ssize_t ret;
1467         bool was_unreadable;
1468
1469         if (q->bypass) {
1470                 ret = blocklen(b);
1471                 (*q->bypass) (q->arg, b);
1472                 return ret;
1473         }
1474         spin_lock_irqsave(&q->lock);
1475         was_unreadable = q->dlen == 0;
1476         if (q->state & Qclosed) {
1477                 spin_unlock_irqsave(&q->lock);
1478                 freeblist(b);
1479                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1480                         return -1;
1481                 if (q->err[0])
1482                         error(EPIPE, q->err);
1483                 else
1484                         error(EPIPE, "connection closed");
1485         }
1486         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1487                 /* drop overflow takes priority over regular non-blocking */
1488                 if ((qio_flags & QIO_DROP_OVERFLOW)
1489                     || (q->state & Qdropoverflow)) {
1490                         spin_unlock_irqsave(&q->lock);
1491                         freeb(b);
1492                         return -1;
1493                 }
1494                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
1495                  * nice and catch it. */
1496                 if ((qio_flags & QIO_CAN_ERR_SLEEP)
1497                     && (qio_flags & QIO_NON_BLOCK)) {
1498                         spin_unlock_irqsave(&q->lock);
1499                         freeb(b);
1500                         error(EAGAIN, "queue full");
1501                 }
1502         }
1503         ret = enqueue_blist(q, b);
1504         QDEBUG checkb(b, "__qbwrite");
1505         spin_unlock_irqsave(&q->lock);
1506         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1507          * wakeup, meaning that actual users either want a kick or have
1508          * qreaders. */
1509         if (q->kick && (was_unreadable || (q->state & Qkick)))
1510                 q->kick(q->arg);
1511         if (was_unreadable) {
1512                 /* Unlike the read side, there's no double-check to make sure
1513                  * the queue transitioned across an edge.  We know we added
1514                  * something, so that's enough.  We wake if the queue was empty.
1515                  * Both sides are the same, in that the condition for which we
1516                  * do the rendez_wakeup() is the same as the condition done for
1517                  * the rendez_sleep(). */
1518                 rendez_wakeup(&q->rr);
1519                 qwake_cb(q, FDTAP_FILT_READABLE);
1520         }
1521         /*
1522          *  flow control, wait for queue to get below the limit
1523          *  before allowing the process to continue and queue
1524          *  more.  We do this here so that postnote can only
1525          *  interrupt us after the data has been queued.  This
1526          *  means that things like 9p flushes and ssl messages
1527          *  will not be disrupted by software interrupts.
1528          *
1529          *  Note - this is moderately dangerous since a process
1530          *  that keeps getting interrupted and rewriting will
1531          *  queue infinite crud.
1532          */
1533         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1534             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1535                 /* This is a racy peek at the q status.  If we accidentally
1536                  * block, our rendez will return.  The rendez's peak
1537                  * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
1538                  * (that lock protects writes, but not reads).
1539                  *
1540                  * Here's the deal: when holding the rendez lock, if we see the
1541                  * sleep condition, the consumer will wake us.  The condition
1542                  * will only ever be changed by the next qbread() (consumer,
1543                  * changes q->dlen).  That code will do a rendez wake, which
1544                  * will spin on the rendez lock, meaning it won't procede until
1545                  * we either see the new state (and return) or put ourselves on
1546                  * the rendez, and wake up.
1547                  *
1548                  * The pattern is one side writes mem, then signals.  Our side
1549                  * checks the signal, then reads the mem.  The goal is to not
1550                  * miss seeing the signal AND missing the memory write.  In this
1551                  * specific case, the signal is actually synchronous (the rendez
1552                  * lock) and not basic shared memory.
1553                  *
1554                  * Oh, and we spin in case we woke early and someone else filled
1555                  * the queue, mesa-style. */
1556                 while (!qwriter_should_wake(q))
1557                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1558         }
1559         return ret;
1560 }
1561
1562 /*
1563  *  add a block to a queue obeying flow control
1564  */
1565 ssize_t qbwrite(struct queue *q, struct block *b)
1566 {
1567         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1568 }
1569
1570 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1571 {
1572         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1573 }
1574
1575 ssize_t qibwrite(struct queue *q, struct block *b)
1576 {
1577         return __qbwrite(q, b, 0);
1578 }
1579
1580 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1581  * block on success, 0 on failure. */
1582 static struct block *build_block(void *from, size_t len, int mem_flags)
1583 {
1584         struct block *b;
1585         void *ext_buf;
1586
1587         /* If len is small, we don't need to bother with the extra_data.  But
1588          * until the whole stack can handle extd blocks, we'll use them
1589          * unconditionally.  */
1590
1591         /* allocb builds in 128 bytes of header space to all blocks, but this is
1592          * only available via padblock (to the left).  we also need some space
1593          * for pullupblock for some basic headers (like icmp) that get written
1594          * in directly */
1595         b = block_alloc(64, mem_flags);
1596         if (!b)
1597                 return 0;
1598         ext_buf = kmalloc(len, mem_flags);
1599         if (!ext_buf) {
1600                 kfree(b);
1601                 return 0;
1602         }
1603         memcpy(ext_buf, from, len);
1604         if (block_add_extd(b, 1, mem_flags)) {
1605                 kfree(ext_buf);
1606                 kfree(b);
1607                 return 0;
1608         }
1609         b->extra_data[0].base = (uintptr_t)ext_buf;
1610         b->extra_data[0].off = 0;
1611         b->extra_data[0].len = len;
1612         b->extra_len += len;
1613         return b;
1614 }
1615
1616 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1617                         int qio_flags)
1618 {
1619         ERRSTACK(1);
1620         size_t n;
1621         volatile size_t sofar = 0;      /* volatile for the waserror */
1622         struct block *b;
1623         uint8_t *p = vp;
1624         void *ext_buf;
1625
1626         /* Only some callers can throw.  Others might be in a context where
1627          * waserror isn't safe. */
1628         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1629                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
1630                  * after some data has been sent should be treated as a partial
1631                  * write. */
1632                 if (sofar)
1633                         goto out_ok;
1634                 nexterror();
1635         }
1636         do {
1637                 n = len - sofar;
1638                 /* This is 64K, the max amount per single block.  Still a good
1639                  * value? */
1640                 if (n > Maxatomic)
1641                         n = Maxatomic;
1642                 b = build_block(p + sofar, n, mem_flags);
1643                 if (!b)
1644                         break;
1645                 if (__qbwrite(q, b, qio_flags) < 0)
1646                         break;
1647                 sofar += n;
1648         } while ((sofar < len) && (q->state & Qmsg) == 0);
1649 out_ok:
1650         if (qio_flags & QIO_CAN_ERR_SLEEP)
1651                 poperror();
1652         return sofar;
1653 }
1654
1655 ssize_t qwrite(struct queue *q, void *vp, int len)
1656 {
1657         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1658 }
1659
1660 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1661 {
1662         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1663                                               QIO_NON_BLOCK);
1664 }
1665
1666 ssize_t qiwrite(struct queue *q, void *vp, int len)
1667 {
1668         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1669 }
1670
1671 /*
1672  *  be extremely careful when calling this,
1673  *  as there is no reference accounting
1674  */
1675 void qfree(struct queue *q)
1676 {
1677         qclose(q);
1678         kfree(q);
1679 }
1680
1681 /*
1682  *  Mark a queue as closed.  No further IO is permitted.
1683  *  All blocks are released.
1684  */
1685 void qclose(struct queue *q)
1686 {
1687         struct block *bfirst;
1688
1689         if (q == NULL)
1690                 return;
1691
1692         /* mark it */
1693         spin_lock_irqsave(&q->lock);
1694         q->state |= Qclosed;
1695         q->state &= ~Qdropoverflow;
1696         q->err[0] = 0;
1697         bfirst = q->bfirst;
1698         q->bfirst = 0;
1699         q->dlen = 0;
1700         spin_unlock_irqsave(&q->lock);
1701
1702         /* free queued blocks */
1703         freeblist(bfirst);
1704
1705         /* wake up readers/writers */
1706         rendez_wakeup(&q->rr);
1707         rendez_wakeup(&q->wr);
1708         qwake_cb(q, FDTAP_FILT_HANGUP);
1709 }
1710
1711 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1712  *
1713  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1714  * there is no message, which is what also happens during a natural qclose(),
1715  * those waiters will simply return 0.  qwriters will always error() on a
1716  * closed/hungup queue. */
1717 void qhangup(struct queue *q, char *msg)
1718 {
1719         /* mark it */
1720         spin_lock_irqsave(&q->lock);
1721         q->state |= Qclosed;
1722         if (msg == 0 || *msg == 0)
1723                 q->err[0] = 0;
1724         else
1725                 strlcpy(q->err, msg, ERRMAX);
1726         spin_unlock_irqsave(&q->lock);
1727
1728         /* wake up readers/writers */
1729         rendez_wakeup(&q->rr);
1730         rendez_wakeup(&q->wr);
1731         qwake_cb(q, FDTAP_FILT_HANGUP);
1732 }
1733
1734 /*
1735  *  return non-zero if the q is hungup
1736  */
1737 int qisclosed(struct queue *q)
1738 {
1739         return q->state & Qclosed;
1740 }
1741
1742 /*
1743  *  mark a queue as no longer hung up.  resets the wake_cb.
1744  */
1745 void qreopen(struct queue *q)
1746 {
1747         spin_lock_irqsave(&q->lock);
1748         q->state &= ~Qclosed;
1749         q->eof = 0;
1750         q->limit = q->inilim;
1751         q->wake_cb = 0;
1752         q->wake_data = 0;
1753         spin_unlock_irqsave(&q->lock);
1754 }
1755
1756 /*
1757  *  return bytes queued
1758  */
1759 int qlen(struct queue *q)
1760 {
1761         return q->dlen;
1762 }
1763
1764 size_t q_bytes_read(struct queue *q)
1765 {
1766         return q->bytes_read;
1767 }
1768
1769 /*
1770  * return space remaining before flow control
1771  *
1772  *  This used to be
1773  *  q->len < q->limit/2
1774  *  but it slows down tcp too much for certain write sizes.
1775  *  I really don't understand it completely.  It may be
1776  *  due to the queue draining so fast that the transmission
1777  *  stalls waiting for the app to produce more data.  - presotto
1778  *
1779  *  q->len was the amount of bytes, which is no longer used.  we now use
1780  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1781  */
1782 int qwindow(struct queue *q)
1783 {
1784         int l;
1785
1786         l = q->limit - q->dlen;
1787         if (l < 0)
1788                 l = 0;
1789         return l;
1790 }
1791
1792 /*
1793  *  return true if we can read without blocking
1794  */
1795 int qcanread(struct queue *q)
1796 {
1797         return q->bfirst != 0;
1798 }
1799
1800 /*
1801  *  change queue limit
1802  */
1803 void qsetlimit(struct queue *q, size_t limit)
1804 {
1805         bool was_writable = qwritable(q);
1806
1807         q->limit = limit;
1808         if (!was_writable && qwritable(q)) {
1809                 rendez_wakeup(&q->wr);
1810                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1811         }
1812 }
1813
1814 size_t qgetlimit(struct queue *q)
1815 {
1816         return q->limit;
1817 }
1818
1819 /*
1820  *  set whether writes drop overflowing blocks, or if we sleep
1821  */
1822 void qdropoverflow(struct queue *q, bool onoff)
1823 {
1824         spin_lock_irqsave(&q->lock);
1825         if (onoff)
1826                 q->state |= Qdropoverflow;
1827         else
1828                 q->state &= ~Qdropoverflow;
1829         spin_unlock_irqsave(&q->lock);
1830 }
1831
1832 /* Be careful: this can affect concurrent reads/writes and code that might have
1833  * built-in expectations of the q's type. */
1834 void q_toggle_qmsg(struct queue *q, bool onoff)
1835 {
1836         spin_lock_irqsave(&q->lock);
1837         if (onoff)
1838                 q->state |= Qmsg;
1839         else
1840                 q->state &= ~Qmsg;
1841         spin_unlock_irqsave(&q->lock);
1842 }
1843
1844 /* Be careful: this can affect concurrent reads/writes and code that might have
1845  * built-in expectations of the q's type. */
1846 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1847 {
1848         spin_lock_irqsave(&q->lock);
1849         if (onoff)
1850                 q->state |= Qcoalesce;
1851         else
1852                 q->state &= ~Qcoalesce;
1853         spin_unlock_irqsave(&q->lock);
1854 }
1855
1856 /*
1857  *  flush the output queue
1858  */
1859 void qflush(struct queue *q)
1860 {
1861         struct block *bfirst;
1862
1863         /* mark it */
1864         spin_lock_irqsave(&q->lock);
1865         bfirst = q->bfirst;
1866         q->bfirst = 0;
1867         q->dlen = 0;
1868         spin_unlock_irqsave(&q->lock);
1869
1870         /* free queued blocks */
1871         freeblist(bfirst);
1872
1873         /* wake up writers */
1874         rendez_wakeup(&q->wr);
1875         qwake_cb(q, FDTAP_FILT_WRITABLE);
1876 }
1877
1878 int qfull(struct queue *q)
1879 {
1880         return !qwritable(q);
1881 }
1882
1883 int qstate(struct queue *q)
1884 {
1885         return q->state;
1886 }
1887
1888 void qdump(struct queue *q)
1889 {
1890         if (q)
1891                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1892                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1893 }
1894
1895 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1896  * marks the type of wakeup event (flags from FDTAP).
1897  *
1898  * There's no sync protection.  If you change the CB while the qio is running,
1899  * you might get a CB with the data or func from a previous set_wake_cb.  You
1900  * should set this once per queue and forget it.
1901  *
1902  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1903  * just make sure that the func(data) pair are valid until the queue is freed or
1904  * reopened. */
1905 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1906 {
1907         q->wake_data = data;
1908         wmb();  /* if we see func, we'll also see the data for it */
1909         q->wake_cb = func;
1910 }
1911
1912 /* Helper for detecting whether we'll block on a read at this instant. */
1913 bool qreadable(struct queue *q)
1914 {
1915         return qlen(q) > 0;
1916 }
1917
1918 /* Helper for detecting whether we'll block on a write at this instant. */
1919 bool qwritable(struct queue *q)
1920 {
1921         return !q->limit || qwindow(q) > 0;
1922 }