net: Add accounting to help TSO/LSO/GSO
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int dlen;                                       /* data bytes in queue */
75         int limit;                                      /* max bytes in queue */
76         int inilim;                             /* initial limit */
77         int state;
78         int eof;                                        /* number of eofs read by user */
79         size_t bytes_read;
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
101 };
102
103 unsigned int qiomaxatomic = Maxatomic;
104
105 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
106 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
107 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
108                               int mem_flags);
109 static bool qwait_and_ilock(struct queue *q, int qio_flags);
110
111 /* Helper: fires a wake callback, sending 'filter' */
112 static void qwake_cb(struct queue *q, int filter)
113 {
114         if (q->wake_cb)
115                 q->wake_cb(q, q->wake_data, filter);
116 }
117
118 void ixsummary(void)
119 {
120         debugging ^= 1;
121         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
122                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
123         printd("consume %lu, produce %lu, qcopy %lu\n",
124                    consumecnt, producecnt, qcopycnt);
125 }
126
127 /*
128  *  pad a block to the front (or the back if size is negative)
129  */
130 struct block *padblock(struct block *bp, int size)
131 {
132         int n;
133         struct block *nbp;
134         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
135         uint16_t checksum_start = bp->checksum_start;
136         uint16_t checksum_offset = bp->checksum_offset;
137         uint16_t mss = bp->mss;
138         uint16_t transport_header_end = bp->transport_header_end;
139
140         QDEBUG checkb(bp, "padblock 1");
141         if (size >= 0) {
142                 if (bp->rp - bp->base >= size) {
143                         bp->checksum_start += size;
144                         bp->transport_header_end += size;
145                         bp->rp -= size;
146                         return bp;
147                 }
148
149                 PANIC_EXTRA(bp);
150                 if (bp->next)
151                         panic("padblock %p", getcallerpc(&bp));
152                 n = BLEN(bp);
153                 padblockcnt++;
154                 nbp = block_alloc(size + n, MEM_WAIT);
155                 nbp->rp += size;
156                 nbp->wp = nbp->rp;
157                 memmove(nbp->wp, bp->rp, n);
158                 nbp->wp += n;
159                 freeb(bp);
160                 nbp->rp -= size;
161         } else {
162                 size = -size;
163
164                 PANIC_EXTRA(bp);
165
166                 if (bp->next)
167                         panic("padblock %p", getcallerpc(&bp));
168
169                 if (bp->lim - bp->wp >= size)
170                         return bp;
171
172                 n = BLEN(bp);
173                 padblockcnt++;
174                 nbp = block_alloc(size + n, MEM_WAIT);
175                 memmove(nbp->wp, bp->rp, n);
176                 nbp->wp += n;
177                 freeb(bp);
178         }
179         if (bcksum) {
180                 nbp->flag |= bcksum;
181                 nbp->checksum_start = checksum_start;
182                 nbp->checksum_offset = checksum_offset;
183                 nbp->mss = mss;
184                 nbp->transport_header_end = transport_header_end;
185         }
186         QDEBUG checkb(nbp, "padblock 1");
187         return nbp;
188 }
189
190 /*
191  *  return count of bytes in a string of blocks
192  */
193 int blocklen(struct block *bp)
194 {
195         int len;
196
197         len = 0;
198         while (bp) {
199                 len += BLEN(bp);
200                 bp = bp->next;
201         }
202         return len;
203 }
204
205 /*
206  * return count of space in blocks
207  */
208 int blockalloclen(struct block *bp)
209 {
210         int len;
211
212         len = 0;
213         while (bp) {
214                 len += BALLOC(bp);
215                 bp = bp->next;
216         }
217         return len;
218 }
219
220 /*
221  *  copy the  string of blocks into
222  *  a single block and free the string
223  */
224 struct block *concatblock(struct block *bp)
225 {
226         int len;
227         struct block *nb, *f;
228
229         if (bp->next == 0)
230                 return bp;
231
232         /* probably use parts of qclone */
233         PANIC_EXTRA(bp);
234         nb = block_alloc(blocklen(bp), MEM_WAIT);
235         for (f = bp; f; f = f->next) {
236                 len = BLEN(f);
237                 memmove(nb->wp, f->rp, len);
238                 nb->wp += len;
239         }
240         concatblockcnt += BLEN(nb);
241         freeblist(bp);
242         QDEBUG checkb(nb, "concatblock 1");
243         return nb;
244 }
245
246 /* Makes an identical copy of the block, collapsing all the data into the block
247  * body.  It does not point to the contents of the original, it is a copy
248  * (unlike qclone).  Since we're copying, we might as well put the memory into
249  * one contiguous chunk. */
250 struct block *copyblock(struct block *bp, int mem_flags)
251 {
252         struct block *newb;
253         struct extra_bdata *ebd;
254         size_t amt;
255
256         QDEBUG checkb(bp, "copyblock 0");
257         newb = block_alloc(BLEN(bp), mem_flags);
258         if (!newb)
259                 return 0;
260         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
261         assert(amt == BHLEN(bp));
262         for (int i = 0; i < bp->nr_extra_bufs; i++) {
263                 ebd = &bp->extra_data[i];
264                 if (!ebd->base || !ebd->len)
265                         continue;
266                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
267                 assert(amt == ebd->len);
268         }
269         /* TODO: any other flags that need copied over? */
270         if (bp->flag & BCKSUM_FLAGS) {
271                 newb->flag |= (bp->flag & BCKSUM_FLAGS);
272                 newb->checksum_start = bp->checksum_start;
273                 newb->checksum_offset = bp->checksum_offset;
274                 newb->mss = bp->mss;
275                 newb->transport_header_end = bp->transport_header_end;
276         }
277         copyblockcnt++;
278         QDEBUG checkb(newb, "copyblock 1");
279         return newb;
280 }
281
282 /* Returns a block with the remaining contents of b all in the main body of the
283  * returned block.  Replace old references to b with the returned value (which
284  * may still be 'b', if no change was needed. */
285 struct block *linearizeblock(struct block *b)
286 {
287         struct block *newb;
288
289         if (!b->extra_len)
290                 return b;
291         newb = copyblock(b, MEM_WAIT);
292         freeb(b);
293         return newb;
294 }
295
296 /* Make sure the first block has at least n bytes in its main body.  Pulls up
297  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
298  * block list. */
299 struct block *pullupblock(struct block *bp, int n)
300 {
301         int i, len, seglen;
302         struct block *nbp;
303         struct extra_bdata *ebd;
304
305         /*
306          *  this should almost always be true, it's
307          *  just to avoid every caller checking.
308          */
309         if (BHLEN(bp) >= n)
310                 return bp;
311
312         /* If there's no chance, just bail out now.  This might be slightly wasteful
313          * if there's a long blist that does have enough data. */
314         if (n > blocklen(bp))
315                 return 0;
316         /* a start at explicit main-body / header management */
317         if (bp->extra_len) {
318                 if (n > bp->lim - bp->rp) {
319                         /* would need to realloc a new block and copy everything over. */
320                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
321                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
322                 }
323                 len = n - BHLEN(bp);
324                 /* Would need to recursively call this, or otherwise pull from later
325                  * blocks and put chunks of their data into the block we're building. */
326                 if (len > bp->extra_len)
327                         panic("pullup more than extra (%d, %d, %d)\n",
328                               n, BHLEN(bp), bp->extra_len);
329                 QDEBUG checkb(bp, "before pullup");
330                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
331                         ebd = &bp->extra_data[i];
332                         if (!ebd->base || !ebd->len)
333                                 continue;
334                         seglen = MIN(ebd->len, len);
335                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
336                         bp->wp += seglen;
337                         len -= seglen;
338                         ebd->len -= seglen;
339                         ebd->off += seglen;
340                         bp->extra_len -= seglen;
341                         if (ebd->len == 0) {
342                                 kfree((void *)ebd->base);
343                                 ebd->off = 0;
344                                 ebd->base = 0;
345                         }
346                 }
347                 /* maybe just call pullupblock recursively here */
348                 if (len)
349                         panic("pullup %d bytes overdrawn\n", len);
350                 QDEBUG checkb(bp, "after pullup");
351                 return bp;
352         }
353
354         /*
355          *  if not enough room in the first block,
356          *  add another to the front of the list.
357          */
358         if (bp->lim - bp->rp < n) {
359                 nbp = block_alloc(n, MEM_WAIT);
360                 nbp->next = bp;
361                 bp = nbp;
362         }
363
364         /*
365          *  copy bytes from the trailing blocks into the first
366          */
367         n -= BLEN(bp);
368         while ((nbp = bp->next)) {
369                 i = BLEN(nbp);
370                 if (i > n) {
371                         memmove(bp->wp, nbp->rp, n);
372                         pullupblockcnt++;
373                         bp->wp += n;
374                         nbp->rp += n;
375                         QDEBUG checkb(bp, "pullupblock 1");
376                         return bp;
377                 } else {
378                         memmove(bp->wp, nbp->rp, i);
379                         pullupblockcnt++;
380                         bp->wp += i;
381                         bp->next = nbp->next;
382                         nbp->next = 0;
383                         freeb(nbp);
384                         n -= i;
385                         if (n == 0) {
386                                 QDEBUG checkb(bp, "pullupblock 2");
387                                 return bp;
388                         }
389                 }
390         }
391         freeb(bp);
392         return 0;
393 }
394
395 /*
396  *  make sure the first block has at least n bytes in its main body
397  */
398 struct block *pullupqueue(struct queue *q, int n)
399 {
400         struct block *b;
401
402         /* TODO: lock to protect the queue links? */
403         if ((BHLEN(q->bfirst) >= n))
404                 return q->bfirst;
405         q->bfirst = pullupblock(q->bfirst, n);
406         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
407         q->blast = b;
408         return q->bfirst;
409 }
410
411 /* throw away count bytes from the front of
412  * block's extradata.  Returns count of bytes
413  * thrown away
414  */
415
416 static int pullext(struct block *bp, int count)
417 {
418         struct extra_bdata *ed;
419         int i, rem, bytes = 0;
420
421         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
422                 ed = &bp->extra_data[i];
423                 rem = MIN(count, ed->len);
424                 bp->extra_len -= rem;
425                 count -= rem;
426                 bytes += rem;
427                 ed->off += rem;
428                 ed->len -= rem;
429                 if (ed->len == 0) {
430                         kfree((void *)ed->base);
431                         ed->base = 0;
432                         ed->off = 0;
433                 }
434         }
435         return bytes;
436 }
437
438 /* throw away count bytes from the end of a
439  * block's extradata.  Returns count of bytes
440  * thrown away
441  */
442
443 static int dropext(struct block *bp, int count)
444 {
445         struct extra_bdata *ed;
446         int i, rem, bytes = 0;
447
448         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
449                 ed = &bp->extra_data[i];
450                 rem = MIN(count, ed->len);
451                 bp->extra_len -= rem;
452                 count -= rem;
453                 bytes += rem;
454                 ed->len -= rem;
455                 if (ed->len == 0) {
456                         kfree((void *)ed->base);
457                         ed->base = 0;
458                         ed->off = 0;
459                 }
460         }
461         return bytes;
462 }
463
464 /*
465  *  throw away up to count bytes from a
466  *  list of blocks.  Return count of bytes
467  *  thrown away.
468  */
469 static int _pullblock(struct block **bph, int count, int free)
470 {
471         struct block *bp;
472         int n, bytes;
473
474         bytes = 0;
475         if (bph == NULL)
476                 return 0;
477
478         while (*bph != NULL && count != 0) {
479                 bp = *bph;
480
481                 n = MIN(BHLEN(bp), count);
482                 bytes += n;
483                 count -= n;
484                 bp->rp += n;
485                 n = pullext(bp, count);
486                 bytes += n;
487                 count -= n;
488                 QDEBUG checkb(bp, "pullblock ");
489                 if (BLEN(bp) == 0 && (free || count)) {
490                         *bph = bp->next;
491                         bp->next = NULL;
492                         freeb(bp);
493                 }
494         }
495         return bytes;
496 }
497
498 int pullblock(struct block **bph, int count)
499 {
500         return _pullblock(bph, count, 1);
501 }
502
503 /*
504  *  trim to len bytes starting at offset
505  */
506 struct block *trimblock(struct block *bp, int offset, int len)
507 {
508         uint32_t l, trim;
509         int olen = len;
510
511         QDEBUG checkb(bp, "trimblock 1");
512         if (blocklen(bp) < offset + len) {
513                 freeblist(bp);
514                 return NULL;
515         }
516
517         l =_pullblock(&bp, offset, 0);
518         if (bp == NULL)
519                 return NULL;
520         if (l != offset) {
521                 freeblist(bp);
522                 return NULL;
523         }
524
525         while ((l = BLEN(bp)) < len) {
526                 len -= l;
527                 bp = bp->next;
528         }
529
530         trim = BLEN(bp) - len;
531         trim -= dropext(bp, trim);
532         bp->wp -= trim;
533
534         if (bp->next) {
535                 freeblist(bp->next);
536                 bp->next = NULL;
537         }
538         return bp;
539 }
540
541 /* Adjust block @bp so that its size is exactly @len.
542  * If the size is increased, fill in the new contents with zeros.
543  * If the size is decreased, discard some of the old contents at the tail. */
544 struct block *adjustblock(struct block *bp, int len)
545 {
546         struct extra_bdata *ebd;
547         void *buf;
548         int i;
549
550         if (len < 0) {
551                 freeb(bp);
552                 return NULL;
553         }
554
555         if (len == BLEN(bp))
556                 return bp;
557
558         /* Shrink within block main body. */
559         if (len <= BHLEN(bp)) {
560                 free_block_extra(bp);
561                 bp->wp = bp->rp + len;
562                 QDEBUG checkb(bp, "adjustblock 1");
563                 return bp;
564         }
565
566         /* Need to grow. */
567         if (len > BLEN(bp)) {
568                 /* Grow within block main body. */
569                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
570                         memset(bp->wp, 0, len - BLEN(bp));
571                         bp->wp = bp->rp + len;
572                         QDEBUG checkb(bp, "adjustblock 2");
573                         return bp;
574                 }
575                 /* Grow with extra data buffers. */
576                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
577                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
578                 QDEBUG checkb(bp, "adjustblock 3");
579                 return bp;
580         }
581
582         /* Shrink extra data buffers.
583          * len is how much of ebd we need to keep.
584          * extra_len is re-accumulated. */
585         assert(bp->extra_len > 0);
586         len -= BHLEN(bp);
587         bp->extra_len = 0;
588         for (i = 0; i < bp->nr_extra_bufs; i++) {
589                 ebd = &bp->extra_data[i];
590                 if (len <= ebd->len)
591                         break;
592                 len -= ebd->len;
593                 bp->extra_len += ebd->len;
594         }
595         /* If len becomes zero, extra_data[i] should be freed. */
596         if (len > 0) {
597                 ebd = &bp->extra_data[i];
598                 ebd->len = len;
599                 bp->extra_len += ebd->len;
600                 i++;
601         }
602         for (; i < bp->nr_extra_bufs; i++) {
603                 ebd = &bp->extra_data[i];
604                 if (ebd->base)
605                         kfree((void*)ebd->base);
606                 ebd->base = ebd->off = ebd->len = 0;
607         }
608         QDEBUG checkb(bp, "adjustblock 4");
609         return bp;
610 }
611
612 /* Helper: removes and returns the first block from q */
613 static struct block *pop_first_block(struct queue *q)
614 {
615         struct block *b = q->bfirst;
616
617         q->dlen -= BLEN(b);
618         q->bytes_read += BLEN(b);
619         q->bfirst = b->next;
620         b->next = 0;
621         return b;
622 }
623
624 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
625 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
626 {
627         copy_amt = MIN(to->lim - to->wp, copy_amt);
628         memcpy(to->wp, from, copy_amt);
629         to->wp += copy_amt;
630         return copy_amt;
631 }
632
633 /* Accounting helper.  Block b in q lost amt extra_data */
634 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
635 {
636         b->extra_len -= amt;
637         q->dlen -= amt;
638         q->bytes_read += amt;
639 }
640
641 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
642  * fixed in 'from', so we move its contents and zero it out in 'from'.
643  *
644  * Returns the length moved (0 on failure). */
645 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
646                        struct block *from, struct queue *from_q)
647 {
648         size_t ret = ebd->len;
649
650         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
651                 return 0;
652         block_and_q_lost_extra(from, from_q, ebd->len);
653         ebd->base = ebd->len = ebd->off = 0;
654         return ret;
655 }
656
657 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
658  * return with less than len, but greater than 0, even if there is more
659  * available in q.
660  *
661  * At any moment that we have copied anything and things are tricky, we can just
662  * return.  The trickiness comes from a bunch of variables: is the main body
663  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
664  * to @to's main body, but only if we haven't used it yet. */
665 static size_t copy_from_first_block(struct queue *q, struct block *to,
666                                     size_t len)
667 {
668         struct block *from = q->bfirst;
669         size_t copy_amt, amt;
670         struct extra_bdata *ebd;
671
672         assert(len < BLEN(from));       /* sanity */
673         /* Try to extract from the main body */
674         copy_amt = MIN(BHLEN(from), len);
675         if (copy_amt) {
676                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
677                 from->rp += copy_amt;
678                 /* We only change dlen, (data len), not q->len, since the q still has
679                  * the same block memory allocation (no kfrees happened) */
680                 q->dlen -= copy_amt;
681                 q->bytes_read += copy_amt;
682         }
683         /* Try to extract the remainder from the extra data */
684         len -= copy_amt;
685         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
686                 ebd = &from->extra_data[i];
687                 if (!ebd->base || !ebd->len)
688                         continue;
689                 if (len >= ebd->len) {
690                         amt = move_ebd(ebd, to, from, q);
691                         if (!amt) {
692                                 /* our internal alloc could have failed.   this ebd is now the
693                                  * last one we'll consider.  let's handle it separately and put
694                                  * it in the main body. */
695                                 if (copy_amt)
696                                         return copy_amt;
697                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
698                                                               ebd->len);
699                                 block_and_q_lost_extra(from, q, copy_amt);
700                                 break;
701                         }
702                         len -= amt;
703                         copy_amt += amt;
704                         continue;
705                 } else {
706                         /* If we're here, we reached our final ebd, which we'll need to
707                          * split to get anything from it. */
708                         if (copy_amt)
709                                 return copy_amt;
710                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
711                                                       len);
712                         ebd->off += copy_amt;
713                         ebd->len -= copy_amt;
714                         block_and_q_lost_extra(from, q, copy_amt);
715                         break;
716                 }
717         }
718         if (len)
719                 assert(copy_amt);       /* sanity */
720         return copy_amt;
721 }
722
723 /* Return codes for __qbread and __try_qbread. */
724 enum {
725         QBR_OK,
726         QBR_FAIL,
727         QBR_SPARE,      /* we need a spare block */
728         QBR_AGAIN,      /* do it again, we are coalescing blocks */
729 };
730
731 /* Helper and back-end for __qbread: extracts and returns a list of blocks
732  * containing up to len bytes.  It may contain less than len even if q has more
733  * data.
734  *
735  * Returns a code interpreted by __qbread, and the returned blist in ret. */
736 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
737                         struct block **real_ret, struct block *spare)
738 {
739         struct block *ret, *ret_last, *first;
740         size_t blen;
741         bool was_unwritable = FALSE;
742
743         if (qio_flags & QIO_CAN_ERR_SLEEP) {
744                 if (!qwait_and_ilock(q, qio_flags)) {
745                         spin_unlock_irqsave(&q->lock);
746                         return QBR_FAIL;
747                 }
748                 /* we qwaited and still hold the lock, so the q is not empty */
749                 first = q->bfirst;
750         } else {
751                 spin_lock_irqsave(&q->lock);
752                 first = q->bfirst;
753                 if (!first) {
754                         spin_unlock_irqsave(&q->lock);
755                         return QBR_FAIL;
756                 }
757         }
758         /* We need to check before adjusting q->len.  We're checking the writer's
759          * sleep condition / tap condition.  When set, we *might* be making an edge
760          * transition (from unwritable to writable), which needs to wake and fire
761          * taps.  But, our read might not drain the queue below q->lim.  We'll check
762          * again later to see if we should really wake them.  */
763         was_unwritable = !qwritable(q);
764         blen = BLEN(first);
765         if ((q->state & Qcoalesce) && (blen == 0)) {
766                 freeb(pop_first_block(q));
767                 spin_unlock_irqsave(&q->lock);
768                 /* Need to retry to make sure we have a first block */
769                 return QBR_AGAIN;
770         }
771         /* Qmsg: just return the first block.  Be careful, since our caller might
772          * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
773         if (q->state & Qmsg) {
774                 ret = pop_first_block(q);
775                 goto out_ok;
776         }
777         /* Let's get at least something first - makes the code easier.  This way,
778          * we'll only ever split the block once. */
779         if (blen <= len) {
780                 ret = pop_first_block(q);
781                 len -= blen;
782         } else {
783                 /* need to split the block.  we won't actually take the first block out
784                  * of the queue - we're just extracting a little bit. */
785                 if (!spare) {
786                         /* We have nothing and need a spare block.  Retry! */
787                         spin_unlock_irqsave(&q->lock);
788                         return QBR_SPARE;
789                 }
790                 copy_from_first_block(q, spare, len);
791                 ret = spare;
792                 goto out_ok;
793         }
794         /* At this point, we just grabbed the first block.  We can try to grab some
795          * more, up to len (if they want). */
796         if (qio_flags & QIO_JUST_ONE_BLOCK)
797                 goto out_ok;
798         ret_last = ret;
799         while (q->bfirst && (len > 0)) {
800                 blen = BLEN(q->bfirst);
801                 if ((q->state & Qcoalesce) && (blen == 0)) {
802                         /* remove the intermediate 0 blocks */
803                         freeb(pop_first_block(q));
804                         continue;
805                 }
806                 if (blen > len) {
807                         /* We could try to split the block, but that's a huge pain.  For
808                          * instance, we might need to move the main body of b into an
809                          * extra_data of ret_last.  lots of ways for that to fail, and lots
810                          * of cases to consider.  Easier to just bail out.  This is why I
811                          * did the first block above: we don't need to worry about this. */
812                          break;
813                 }
814                 ret_last->next = pop_first_block(q);
815                 ret_last = ret_last->next;
816                 len -= blen;
817         }
818 out_ok:
819         /* Don't wake them up or fire tap if we didn't drain enough. */
820         if (!qwritable(q))
821                 was_unwritable = FALSE;
822         spin_unlock_irqsave(&q->lock);
823         if (was_unwritable) {
824                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
825                         q->kick(q->arg);
826                 rendez_wakeup(&q->wr);
827                 qwake_cb(q, FDTAP_FILT_WRITABLE);
828         }
829         *real_ret = ret;
830         return QBR_OK;
831 }
832
833 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
834  * containing up to len bytes.  It may contain less than len even if q has more
835  * data.
836  *
837  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
838  * if it required a spare and the memory allocation failed.
839  *
840  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
841  * could get a zero length block back. */
842 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
843                               int mem_flags)
844 {
845         ERRSTACK(1);
846         struct block *ret = 0;
847         struct block *volatile spare = 0;       /* volatile for the waserror */
848
849         /* __try_qbread can throw, based on qio flags. */
850         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
851                 if (spare)
852                         freeb(spare);
853                 nexterror();
854         }
855         while (1) {
856                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
857                 case QBR_OK:
858                 case QBR_FAIL:
859                         if (spare && (ret != spare))
860                                 freeb(spare);
861                         goto out_ret;
862                 case QBR_SPARE:
863                         assert(!spare);
864                         /* Due to some nastiness, we need a fresh block so we can read out
865                          * anything from the queue.  'len' seems like a reasonable amount.
866                          * Maybe we can get away with less. */
867                         spare = block_alloc(len, mem_flags);
868                         if (!spare) {
869                                 /* Careful here: a memory failure (possible with MEM_ATOMIC)
870                                  * could look like 'no data in the queue' (QBR_FAIL).  The only
871                                  * one who does is this qget(), who happens to know that we
872                                  * won't need a spare, due to the len argument.  Spares are only
873                                  * needed when we need to split a block. */
874                                 ret = 0;
875                                 goto out_ret;
876                         }
877                         break;
878                 case QBR_AGAIN:
879                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
880                          * try again.  We bounce out of __try so we can perform the "is
881                          * there a block" logic again from the top. */
882                         break;
883                 }
884         }
885         assert(0);
886 out_ret:
887         if (qio_flags & QIO_CAN_ERR_SLEEP)
888                 poperror();
889         return ret;
890 }
891
892 /*
893  *  get next block from a queue, return null if nothing there
894  */
895 struct block *qget(struct queue *q)
896 {
897         /* since len == SIZE_MAX, we should never need to do a mem alloc */
898         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
899 }
900
901 /* Throw away the next 'len' bytes in the queue returning the number actually
902  * discarded.
903  *
904  * If the bytes are in the queue, then they must be discarded.  The only time to
905  * return less than len is if the q itself has less than len bytes.
906  *
907  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
908  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
909 size_t qdiscard(struct queue *q, size_t len)
910 {
911         struct block *blist;
912         size_t removed_amt;
913         size_t sofar = 0;
914
915         /* This is racy.  There could be multiple qdiscarders or other consumers,
916          * where the consumption could be interleaved. */
917         while (qlen(q) && len) {
918                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
919                 removed_amt = freeblist(blist);
920                 sofar += removed_amt;
921                 len -= removed_amt;
922         }
923         return sofar;
924 }
925
926 ssize_t qpass(struct queue *q, struct block *b)
927 {
928         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
929 }
930
931 ssize_t qpassnolim(struct queue *q, struct block *b)
932 {
933         return __qbwrite(q, b, 0);
934 }
935
936 /*
937  *  if the allocated space is way out of line with the used
938  *  space, reallocate to a smaller block
939  */
940 struct block *packblock(struct block *bp)
941 {
942         struct block **l, *nbp;
943         int n;
944
945         if (bp->extra_len)
946                 return bp;
947         for (l = &bp; *l; l = &(*l)->next) {
948                 nbp = *l;
949                 n = BLEN(nbp);
950                 if ((n << 2) < BALLOC(nbp)) {
951                         *l = block_alloc(n, MEM_WAIT);
952                         memmove((*l)->wp, nbp->rp, n);
953                         (*l)->wp += n;
954                         (*l)->next = nbp->next;
955                         freeb(nbp);
956                 }
957         }
958
959         return bp;
960 }
961
962 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
963  * body_rp, for up to len.  Returns the len consumed.
964  *
965  * The base is 'b', so that we can kfree it later.  This currently ties us to
966  * using kfree for the release method for all extra_data.
967  *
968  * It is possible to have a body size that is 0, if there is no offset, and
969  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
970 static size_t point_to_body(struct block *b, uint8_t *body_rp,
971                             struct block *newb, unsigned int newb_idx,
972                             size_t len)
973 {
974         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
975
976         assert(newb_idx < newb->nr_extra_bufs);
977
978         kmalloc_incref(b);
979         ebd->base = (uintptr_t)b;
980         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
981         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
982         assert((int)ebd->len >= 0);
983         newb->extra_len += ebd->len;
984         return ebd->len;
985 }
986
987 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
988  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
989  * consumed.
990  *
991  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
992 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
993                            struct block *newb, unsigned int newb_idx,
994                            size_t len)
995 {
996         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
997         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
998
999         assert(b_idx < b->nr_extra_bufs);
1000         assert(newb_idx < newb->nr_extra_bufs);
1001
1002         kmalloc_incref((void*)b_ebd->base);
1003         n_ebd->base = b_ebd->base;
1004         n_ebd->off = b_ebd->off + b_off;
1005         n_ebd->len = MIN(b_ebd->len - b_off, len);
1006         newb->extra_len += n_ebd->len;
1007         return n_ebd->len;
1008 }
1009
1010 /* given a string of blocks, sets up the new block's extra_data such that it
1011  * *points* to the contents of the blist [offset, len + offset).  This does not
1012  * make a separate copy of the contents of the blist.
1013  *
1014  * returns 0 on success.  the only failure is if the extra_data array was too
1015  * small, so this returns a positive integer saying how big the extra_data needs
1016  * to be.
1017  *
1018  * callers are responsible for protecting the list structure. */
1019 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1020                             uint32_t offset)
1021 {
1022         struct block *b, *first;
1023         unsigned int nr_bufs = 0;
1024         unsigned int b_idx, newb_idx = 0;
1025         uint8_t *first_main_body = 0;
1026
1027         /* find the first block; keep offset relative to the latest b in the list */
1028         for (b = blist; b; b = b->next) {
1029                 if (BLEN(b) > offset)
1030                         break;
1031                 offset -= BLEN(b);
1032         }
1033         /* qcopy semantics: if you asked for an offset outside the block list, you
1034          * get an empty block back */
1035         if (!b)
1036                 return 0;
1037         first = b;
1038         /* upper bound for how many buffers we'll need in newb */
1039         for (/* b is set*/; b; b = b->next) {
1040                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1041         }
1042         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1043         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1044                 /* caller will need to alloc these, then re-call us */
1045                 return nr_bufs;
1046         }
1047         for (b = first; b && len; b = b->next) {
1048                 b_idx = 0;
1049                 if (offset) {
1050                         if (offset < BHLEN(b)) {
1051                                 /* off is in the main body */
1052                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1053                                 newb_idx++;
1054                         } else {
1055                                 /* off is in one of the buffers (or just past the last one).
1056                                  * we're not going to point to b's main body at all. */
1057                                 offset -= BHLEN(b);
1058                                 assert(b->extra_data);
1059                                 /* assuming these extrabufs are packed, or at least that len
1060                                  * isn't gibberish */
1061                                 while (b->extra_data[b_idx].len <= offset) {
1062                                         offset -= b->extra_data[b_idx].len;
1063                                         b_idx++;
1064                                 }
1065                                 /* now offset is set to our offset in the b_idx'th buf */
1066                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1067                                 newb_idx++;
1068                                 b_idx++;
1069                         }
1070                         offset = 0;
1071                 } else {
1072                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1073                         newb_idx++;
1074                 }
1075                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1076                  * and any point_to_ could be our last if it consumed all of len. */
1077                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1078                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1079                         newb_idx++;
1080                 }
1081         }
1082         return 0;
1083 }
1084
1085 struct block *blist_clone(struct block *blist, int header_len, int len,
1086                           uint32_t offset)
1087 {
1088         int ret;
1089         struct block *newb = block_alloc(header_len, MEM_WAIT);
1090         do {
1091                 ret = __blist_clone_to(blist, newb, len, offset);
1092                 if (ret)
1093                         block_add_extd(newb, ret, MEM_WAIT);
1094         } while (ret);
1095         return newb;
1096 }
1097
1098 /* given a queue, makes a single block with header_len reserved space in the
1099  * block main body, and the contents of [offset, len + offset) pointed to in the
1100  * new blocks ext_data.  This does not make a copy of the q's contents, though
1101  * you do have a ref count on the memory. */
1102 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1103 {
1104         int ret;
1105         struct block *newb = block_alloc(header_len, MEM_WAIT);
1106         /* the while loop should rarely be used: it would require someone
1107          * concurrently adding to the queue. */
1108         do {
1109                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1110                 spin_lock_irqsave(&q->lock);
1111                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1112                 spin_unlock_irqsave(&q->lock);
1113                 if (ret)
1114                         block_add_extd(newb, ret, MEM_WAIT);
1115         } while (ret);
1116         return newb;
1117 }
1118
1119 /*
1120  *  copy from offset in the queue
1121  */
1122 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1123 {
1124         int sofar;
1125         int n;
1126         struct block *b, *nb;
1127         uint8_t *p;
1128
1129         nb = block_alloc(len, MEM_WAIT);
1130
1131         spin_lock_irqsave(&q->lock);
1132
1133         /* go to offset */
1134         b = q->bfirst;
1135         for (sofar = 0;; sofar += n) {
1136                 if (b == NULL) {
1137                         spin_unlock_irqsave(&q->lock);
1138                         return nb;
1139                 }
1140                 n = BLEN(b);
1141                 if (sofar + n > offset) {
1142                         p = b->rp + offset - sofar;
1143                         n -= offset - sofar;
1144                         break;
1145                 }
1146                 QDEBUG checkb(b, "qcopy");
1147                 b = b->next;
1148         }
1149
1150         /* copy bytes from there */
1151         for (sofar = 0; sofar < len;) {
1152                 if (n > len - sofar)
1153                         n = len - sofar;
1154                 PANIC_EXTRA(b);
1155                 memmove(nb->wp, p, n);
1156                 qcopycnt += n;
1157                 sofar += n;
1158                 nb->wp += n;
1159                 b = b->next;
1160                 if (b == NULL)
1161                         break;
1162                 n = BLEN(b);
1163                 p = b->rp;
1164         }
1165         spin_unlock_irqsave(&q->lock);
1166
1167         return nb;
1168 }
1169
1170 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1171 {
1172 #ifdef CONFIG_BLOCK_EXTRAS
1173         return qclone(q, 0, len, offset);
1174 #else
1175         return qcopy_old(q, len, offset);
1176 #endif
1177 }
1178
1179 static void qinit_common(struct queue *q)
1180 {
1181         spinlock_init_irqsave(&q->lock);
1182         rendez_init(&q->rr);
1183         rendez_init(&q->wr);
1184 }
1185
1186 /*
1187  *  called by non-interrupt code
1188  */
1189 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1190 {
1191         struct queue *q;
1192
1193         q = kzmalloc(sizeof(struct queue), 0);
1194         if (q == 0)
1195                 return 0;
1196         qinit_common(q);
1197
1198         q->limit = q->inilim = limit;
1199         q->kick = kick;
1200         q->arg = arg;
1201         q->state = msg;
1202         q->eof = 0;
1203
1204         return q;
1205 }
1206
1207 /* open a queue to be bypassed */
1208 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1209 {
1210         struct queue *q;
1211
1212         q = kzmalloc(sizeof(struct queue), 0);
1213         if (q == 0)
1214                 return 0;
1215         qinit_common(q);
1216
1217         q->limit = 0;
1218         q->arg = arg;
1219         q->bypass = bypass;
1220         q->state = 0;
1221
1222         return q;
1223 }
1224
1225 static int notempty(void *a)
1226 {
1227         struct queue *q = a;
1228
1229         return (q->state & Qclosed) || q->bfirst != 0;
1230 }
1231
1232 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1233  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1234  * was naturally closed.  Throws an error o/w. */
1235 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1236 {
1237         while (1) {
1238                 spin_lock_irqsave(&q->lock);
1239                 if (q->bfirst != NULL)
1240                         return TRUE;
1241                 if (q->state & Qclosed) {
1242                         if (++q->eof > 3) {
1243                                 spin_unlock_irqsave(&q->lock);
1244                                 error(EPIPE, "multiple reads on a closed queue");
1245                         }
1246                         if (q->err[0]) {
1247                                 spin_unlock_irqsave(&q->lock);
1248                                 error(EPIPE, q->err);
1249                         }
1250                         return FALSE;
1251                 }
1252                 if (qio_flags & QIO_NON_BLOCK) {
1253                         spin_unlock_irqsave(&q->lock);
1254                         error(EAGAIN, "queue empty");
1255                 }
1256                 spin_unlock_irqsave(&q->lock);
1257                 /* As with the producer side, we check for a condition while holding the
1258                  * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
1259                  * check again" pattern, but we do it conditionally.  Both sides agree
1260                  * synchronously to do it, and those decisions are made while holding
1261                  * q->lock.  I think this is OK.
1262                  *
1263                  * The invariant is that no reader sleeps when the queue has data.
1264                  * While holding the rendez lock, if we see there's no data, we'll
1265                  * sleep.  Since we saw there was no data, the next writer will see (or
1266                  * already saw) no data, and then the writer decides to rendez_wake,
1267                  * which will grab the rendez lock.  If the writer already did that,
1268                  * then we'll see notempty when we do our check-again. */
1269                 rendez_sleep(&q->rr, notempty, q);
1270         }
1271 }
1272
1273 /*
1274  * add a block list to a queue
1275  * XXX basically the same as enqueue blist, and has no locking!
1276  */
1277 void qaddlist(struct queue *q, struct block *b)
1278 {
1279         /* TODO: q lock? */
1280         /* queue the block */
1281         if (q->bfirst)
1282                 q->blast->next = b;
1283         else
1284                 q->bfirst = b;
1285         q->dlen += blocklen(b);
1286         while (b->next)
1287                 b = b->next;
1288         q->blast = b;
1289 }
1290
1291 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1292 {
1293         size_t copy_amt, retval = 0;
1294         struct extra_bdata *ebd;
1295
1296         copy_amt = MIN(BHLEN(b), amt);
1297         memcpy(to, b->rp, copy_amt);
1298         /* advance the rp, since this block not be completely consumed and future
1299          * reads need to know where to pick up from */
1300         b->rp += copy_amt;
1301         to += copy_amt;
1302         amt -= copy_amt;
1303         retval += copy_amt;
1304         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1305                 ebd = &b->extra_data[i];
1306                 /* skip empty entires.  if we track this in the struct block, we can
1307                  * just start the for loop early */
1308                 if (!ebd->base || !ebd->len)
1309                         continue;
1310                 copy_amt = MIN(ebd->len, amt);
1311                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1312                 /* we're actually consuming the entries, just like how we advance rp up
1313                  * above, and might only consume part of one. */
1314                 ebd->len -= copy_amt;
1315                 ebd->off += copy_amt;
1316                 b->extra_len -= copy_amt;
1317                 if (!ebd->len) {
1318                         /* we don't actually have to decref here.  it's also done in
1319                          * freeb().  this is the earliest we can free. */
1320                         kfree((void*)ebd->base);
1321                         ebd->base = ebd->off = 0;
1322                 }
1323                 to += copy_amt;
1324                 amt -= copy_amt;
1325                 retval += copy_amt;
1326         }
1327         return retval;
1328 }
1329
1330 /*
1331  *  copy the contents of a string of blocks into
1332  *  memory.  emptied blocks are freed.  return
1333  *  pointer to first unconsumed block.
1334  */
1335 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1336 {
1337         int i;
1338         struct block *next;
1339
1340         /* could be slicker here, since read_from_block is smart */
1341         for (; b != NULL; b = next) {
1342                 i = BLEN(b);
1343                 if (i > n) {
1344                         /* partial block, consume some */
1345                         read_from_block(b, p, n);
1346                         return b;
1347                 }
1348                 /* full block, consume all and move on */
1349                 i = read_from_block(b, p, i);
1350                 n -= i;
1351                 p += i;
1352                 next = b->next;
1353                 freeb(b);
1354         }
1355         return NULL;
1356 }
1357
1358 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1359  * actual amount copied. */
1360 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1361 {
1362         size_t sofar = 0;
1363         struct block *next;
1364
1365         do {
1366                 /* We should be draining every block completely. */
1367                 assert(BLEN(b) <= len - sofar);
1368                 assert(va);
1369                 assert(va + sofar);
1370                 assert(b->rp);
1371                 sofar += read_from_block(b, va + sofar, len - sofar);
1372                 next = b->next;
1373                 freeb(b);
1374                 b = next;
1375         } while (b);
1376         return sofar;
1377 }
1378
1379 /*
1380  *  copy the contents of memory into a string of blocks.
1381  *  return NULL on error.
1382  */
1383 struct block *mem2bl(uint8_t * p, int len)
1384 {
1385         ERRSTACK(1);
1386         int n;
1387         struct block *b, *first, **l;
1388
1389         first = NULL;
1390         l = &first;
1391         if (waserror()) {
1392                 freeblist(first);
1393                 nexterror();
1394         }
1395         do {
1396                 n = len;
1397                 if (n > Maxatomic)
1398                         n = Maxatomic;
1399
1400                 *l = b = block_alloc(n, MEM_WAIT);
1401                 /* TODO consider extra_data */
1402                 memmove(b->wp, p, n);
1403                 b->wp += n;
1404                 p += n;
1405                 len -= n;
1406                 l = &b->next;
1407         } while (len > 0);
1408         poperror();
1409
1410         return first;
1411 }
1412
1413 /*
1414  *  put a block back to the front of the queue
1415  *  called with q ilocked
1416  */
1417 void qputback(struct queue *q, struct block *b)
1418 {
1419         b->next = q->bfirst;
1420         if (q->bfirst == NULL)
1421                 q->blast = b;
1422         q->bfirst = b;
1423         q->dlen += BLEN(b);
1424         /* qputback seems to undo a read, so we can undo the accounting too. */
1425         q->bytes_read -= BLEN(b);
1426 }
1427
1428 /*
1429  *  get next block from a queue (up to a limit)
1430  *
1431  */
1432 struct block *qbread(struct queue *q, size_t len)
1433 {
1434         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1435 }
1436
1437 struct block *qbread_nonblock(struct queue *q, size_t len)
1438 {
1439         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1440                         QIO_NON_BLOCK, MEM_WAIT);
1441 }
1442
1443 /* read up to len from a queue into vp. */
1444 size_t qread(struct queue *q, void *va, size_t len)
1445 {
1446         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1447
1448         if (!blist)
1449                 return 0;
1450         return read_all_blocks(blist, va, len);
1451 }
1452
1453 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1454 {
1455         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1456                                        MEM_WAIT);
1457
1458         if (!blist)
1459                 return 0;
1460         return read_all_blocks(blist, va, len);
1461 }
1462
1463 /* This is the rendez wake condition for writers. */
1464 static int qwriter_should_wake(void *a)
1465 {
1466         struct queue *q = a;
1467
1468         return qwritable(q) || (q->state & Qclosed);
1469 }
1470
1471 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1472 static size_t enqueue_blist(struct queue *q, struct block *b)
1473 {
1474         size_t dlen;
1475
1476         if (q->bfirst)
1477                 q->blast->next = b;
1478         else
1479                 q->bfirst = b;
1480         dlen = BLEN(b);
1481         while (b->next) {
1482                 b = b->next;
1483                 dlen += BLEN(b);
1484         }
1485         q->blast = b;
1486         q->dlen += dlen;
1487         return dlen;
1488 }
1489
1490 /* Adds block (which can be a list of blocks) to the queue, subject to
1491  * qio_flags.  Returns the length written on success or -1 on non-throwable
1492  * error.  Adjust qio_flags to control the value-added features!. */
1493 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1494 {
1495         ssize_t ret;
1496         bool was_unreadable;
1497
1498         if (q->bypass) {
1499                 ret = blocklen(b);
1500                 (*q->bypass) (q->arg, b);
1501                 return ret;
1502         }
1503         spin_lock_irqsave(&q->lock);
1504         was_unreadable = q->dlen == 0;
1505         if (q->state & Qclosed) {
1506                 spin_unlock_irqsave(&q->lock);
1507                 freeblist(b);
1508                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1509                         return -1;
1510                 if (q->err[0])
1511                         error(EPIPE, q->err);
1512                 else
1513                         error(EPIPE, "connection closed");
1514         }
1515         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1516                 /* drop overflow takes priority over regular non-blocking */
1517                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1518                         spin_unlock_irqsave(&q->lock);
1519                         freeb(b);
1520                         return -1;
1521                 }
1522                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1523                  * and catch it. */
1524                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1525                         spin_unlock_irqsave(&q->lock);
1526                         freeb(b);
1527                         error(EAGAIN, "queue full");
1528                 }
1529         }
1530         ret = enqueue_blist(q, b);
1531         QDEBUG checkb(b, "__qbwrite");
1532         spin_unlock_irqsave(&q->lock);
1533         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1534          * wakeup, meaning that actual users either want a kick or have qreaders. */
1535         if (q->kick && (was_unreadable || (q->state & Qkick)))
1536                 q->kick(q->arg);
1537         if (was_unreadable) {
1538                 /* Unlike the read side, there's no double-check to make sure the queue
1539                  * transitioned across an edge.  We know we added something, so that's
1540                  * enough.  We wake if the queue was empty.  Both sides are the same, in
1541                  * that the condition for which we do the rendez_wakeup() is the same as
1542                  * the condition done for the rendez_sleep(). */
1543                 rendez_wakeup(&q->rr);
1544                 qwake_cb(q, FDTAP_FILT_READABLE);
1545         }
1546         /*
1547          *  flow control, wait for queue to get below the limit
1548          *  before allowing the process to continue and queue
1549          *  more.  We do this here so that postnote can only
1550          *  interrupt us after the data has been queued.  This
1551          *  means that things like 9p flushes and ssl messages
1552          *  will not be disrupted by software interrupts.
1553          *
1554          *  Note - this is moderately dangerous since a process
1555          *  that keeps getting interrupted and rewriting will
1556          *  queue infinite crud.
1557          */
1558         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1559             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1560                 /* This is a racy peek at the q status.  If we accidentally block, our
1561                  * rendez will return.  The rendez's peak (qwriter_should_wake) is also
1562                  * racy w.r.t.  the q's spinlock (that lock protects writes, but not
1563                  * reads).
1564                  *
1565                  * Here's the deal: when holding the rendez lock, if we see the sleep
1566                  * condition, the consumer will wake us.  The condition will only ever
1567                  * be changed by the next qbread() (consumer, changes q->dlen).  That
1568                  * code will do a rendez wake, which will spin on the rendez lock,
1569                  * meaning it won't procede until we either see the new state (and
1570                  * return) or put ourselves on the rendez, and wake up.
1571                  *
1572                  * The pattern is one side writes mem, then signals.  Our side checks
1573                  * the signal, then reads the mem.  The goal is to not miss seeing the
1574                  * signal AND missing the memory write.  In this specific case, the
1575                  * signal is actually synchronous (the rendez lock) and not basic shared
1576                  * memory.
1577                  *
1578                  * Oh, and we spin in case we woke early and someone else filled the
1579                  * queue, mesa-style. */
1580                 while (!qwriter_should_wake(q))
1581                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1582         }
1583         return ret;
1584 }
1585
1586 /*
1587  *  add a block to a queue obeying flow control
1588  */
1589 ssize_t qbwrite(struct queue *q, struct block *b)
1590 {
1591         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1592 }
1593
1594 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1595 {
1596         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1597 }
1598
1599 ssize_t qibwrite(struct queue *q, struct block *b)
1600 {
1601         return __qbwrite(q, b, 0);
1602 }
1603
1604 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1605  * block on success, 0 on failure. */
1606 static struct block *build_block(void *from, size_t len, int mem_flags)
1607 {
1608         struct block *b;
1609         void *ext_buf;
1610
1611         /* If len is small, we don't need to bother with the extra_data.  But until
1612          * the whole stack can handle extd blocks, we'll use them unconditionally.
1613          * */
1614 #ifdef CONFIG_BLOCK_EXTRAS
1615         /* allocb builds in 128 bytes of header space to all blocks, but this is
1616          * only available via padblock (to the left).  we also need some space
1617          * for pullupblock for some basic headers (like icmp) that get written
1618          * in directly */
1619         b = block_alloc(64, mem_flags);
1620         if (!b)
1621                 return 0;
1622         ext_buf = kmalloc(len, mem_flags);
1623         if (!ext_buf) {
1624                 kfree(b);
1625                 return 0;
1626         }
1627         memcpy(ext_buf, from, len);
1628         if (block_add_extd(b, 1, mem_flags)) {
1629                 kfree(ext_buf);
1630                 kfree(b);
1631                 return 0;
1632         }
1633         b->extra_data[0].base = (uintptr_t)ext_buf;
1634         b->extra_data[0].off = 0;
1635         b->extra_data[0].len = len;
1636         b->extra_len += len;
1637 #else
1638         b = block_alloc(len, mem_flags);
1639         if (!b)
1640                 return 0;
1641         memmove(b->wp, from, len);
1642         b->wp += len;
1643 #endif
1644         return b;
1645 }
1646
1647 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1648                         int qio_flags)
1649 {
1650         ERRSTACK(1);
1651         size_t n;
1652         volatile size_t sofar = 0;      /* volatile for the waserror */
1653         struct block *b;
1654         uint8_t *p = vp;
1655         void *ext_buf;
1656
1657         /* Only some callers can throw.  Others might be in a context where waserror
1658          * isn't safe. */
1659         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1660                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE) after
1661                  * some data has been sent should be treated as a partial write. */
1662                 if (sofar)
1663                         goto out_ok;
1664                 nexterror();
1665         }
1666         do {
1667                 n = len - sofar;
1668                 /* This is 64K, the max amount per single block.  Still a good value? */
1669                 if (n > Maxatomic)
1670                         n = Maxatomic;
1671                 b = build_block(p + sofar, n, mem_flags);
1672                 if (!b)
1673                         break;
1674                 if (__qbwrite(q, b, qio_flags) < 0)
1675                         break;
1676                 sofar += n;
1677         } while ((sofar < len) && (q->state & Qmsg) == 0);
1678 out_ok:
1679         if (qio_flags & QIO_CAN_ERR_SLEEP)
1680                 poperror();
1681         return sofar;
1682 }
1683
1684 ssize_t qwrite(struct queue *q, void *vp, int len)
1685 {
1686         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1687 }
1688
1689 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1690 {
1691         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1692                                               QIO_NON_BLOCK);
1693 }
1694
1695 ssize_t qiwrite(struct queue *q, void *vp, int len)
1696 {
1697         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1698 }
1699
1700 /*
1701  *  be extremely careful when calling this,
1702  *  as there is no reference accounting
1703  */
1704 void qfree(struct queue *q)
1705 {
1706         qclose(q);
1707         kfree(q);
1708 }
1709
1710 /*
1711  *  Mark a queue as closed.  No further IO is permitted.
1712  *  All blocks are released.
1713  */
1714 void qclose(struct queue *q)
1715 {
1716         struct block *bfirst;
1717
1718         if (q == NULL)
1719                 return;
1720
1721         /* mark it */
1722         spin_lock_irqsave(&q->lock);
1723         q->state |= Qclosed;
1724         q->state &= ~Qdropoverflow;
1725         q->err[0] = 0;
1726         bfirst = q->bfirst;
1727         q->bfirst = 0;
1728         q->dlen = 0;
1729         spin_unlock_irqsave(&q->lock);
1730
1731         /* free queued blocks */
1732         freeblist(bfirst);
1733
1734         /* wake up readers/writers */
1735         rendez_wakeup(&q->rr);
1736         rendez_wakeup(&q->wr);
1737         qwake_cb(q, FDTAP_FILT_HANGUP);
1738 }
1739
1740 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1741  *
1742  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1743  * there is no message, which is what also happens during a natural qclose(),
1744  * those waiters will simply return 0.  qwriters will always error() on a
1745  * closed/hungup queue. */
1746 void qhangup(struct queue *q, char *msg)
1747 {
1748         /* mark it */
1749         spin_lock_irqsave(&q->lock);
1750         q->state |= Qclosed;
1751         if (msg == 0 || *msg == 0)
1752                 q->err[0] = 0;
1753         else
1754                 strlcpy(q->err, msg, ERRMAX);
1755         spin_unlock_irqsave(&q->lock);
1756
1757         /* wake up readers/writers */
1758         rendez_wakeup(&q->rr);
1759         rendez_wakeup(&q->wr);
1760         qwake_cb(q, FDTAP_FILT_HANGUP);
1761 }
1762
1763 /*
1764  *  return non-zero if the q is hungup
1765  */
1766 int qisclosed(struct queue *q)
1767 {
1768         return q->state & Qclosed;
1769 }
1770
1771 /*
1772  *  mark a queue as no longer hung up.  resets the wake_cb.
1773  */
1774 void qreopen(struct queue *q)
1775 {
1776         spin_lock_irqsave(&q->lock);
1777         q->state &= ~Qclosed;
1778         q->eof = 0;
1779         q->limit = q->inilim;
1780         q->wake_cb = 0;
1781         q->wake_data = 0;
1782         spin_unlock_irqsave(&q->lock);
1783 }
1784
1785 /*
1786  *  return bytes queued
1787  */
1788 int qlen(struct queue *q)
1789 {
1790         return q->dlen;
1791 }
1792
1793 size_t q_bytes_read(struct queue *q)
1794 {
1795         return q->bytes_read;
1796 }
1797
1798 /*
1799  * return space remaining before flow control
1800  *
1801  *  This used to be
1802  *  q->len < q->limit/2
1803  *  but it slows down tcp too much for certain write sizes.
1804  *  I really don't understand it completely.  It may be
1805  *  due to the queue draining so fast that the transmission
1806  *  stalls waiting for the app to produce more data.  - presotto
1807  *
1808  *  q->len was the amount of bytes, which is no longer used.  we now use
1809  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1810  */
1811 int qwindow(struct queue *q)
1812 {
1813         int l;
1814
1815         l = q->limit - q->dlen;
1816         if (l < 0)
1817                 l = 0;
1818         return l;
1819 }
1820
1821 /*
1822  *  return true if we can read without blocking
1823  */
1824 int qcanread(struct queue *q)
1825 {
1826         return q->bfirst != 0;
1827 }
1828
1829 /*
1830  *  change queue limit
1831  */
1832 void qsetlimit(struct queue *q, int limit)
1833 {
1834         q->limit = limit;
1835 }
1836
1837 /*
1838  *  set whether writes drop overflowing blocks, or if we sleep
1839  */
1840 void qdropoverflow(struct queue *q, bool onoff)
1841 {
1842         spin_lock_irqsave(&q->lock);
1843         if (onoff)
1844                 q->state |= Qdropoverflow;
1845         else
1846                 q->state &= ~Qdropoverflow;
1847         spin_unlock_irqsave(&q->lock);
1848 }
1849
1850 /* Be careful: this can affect concurrent reads/writes and code that might have
1851  * built-in expectations of the q's type. */
1852 void q_toggle_qmsg(struct queue *q, bool onoff)
1853 {
1854         spin_lock_irqsave(&q->lock);
1855         if (onoff)
1856                 q->state |= Qmsg;
1857         else
1858                 q->state &= ~Qmsg;
1859         spin_unlock_irqsave(&q->lock);
1860 }
1861
1862 /* Be careful: this can affect concurrent reads/writes and code that might have
1863  * built-in expectations of the q's type. */
1864 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1865 {
1866         spin_lock_irqsave(&q->lock);
1867         if (onoff)
1868                 q->state |= Qcoalesce;
1869         else
1870                 q->state &= ~Qcoalesce;
1871         spin_unlock_irqsave(&q->lock);
1872 }
1873
1874 /*
1875  *  flush the output queue
1876  */
1877 void qflush(struct queue *q)
1878 {
1879         struct block *bfirst;
1880
1881         /* mark it */
1882         spin_lock_irqsave(&q->lock);
1883         bfirst = q->bfirst;
1884         q->bfirst = 0;
1885         q->dlen = 0;
1886         spin_unlock_irqsave(&q->lock);
1887
1888         /* free queued blocks */
1889         freeblist(bfirst);
1890
1891         /* wake up writers */
1892         rendez_wakeup(&q->wr);
1893         qwake_cb(q, FDTAP_FILT_WRITABLE);
1894 }
1895
1896 int qfull(struct queue *q)
1897 {
1898         return !qwritable(q);
1899 }
1900
1901 int qstate(struct queue *q)
1902 {
1903         return q->state;
1904 }
1905
1906 void qdump(struct queue *q)
1907 {
1908         if (q)
1909                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1910                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1911 }
1912
1913 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1914  * marks the type of wakeup event (flags from FDTAP).
1915  *
1916  * There's no sync protection.  If you change the CB while the qio is running,
1917  * you might get a CB with the data or func from a previous set_wake_cb.  You
1918  * should set this once per queue and forget it.
1919  *
1920  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1921  * just make sure that the func(data) pair are valid until the queue is freed or
1922  * reopened. */
1923 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1924 {
1925         q->wake_data = data;
1926         wmb();  /* if we see func, we'll also see the data for it */
1927         q->wake_cb = func;
1928 }
1929
1930 /* Helper for detecting whether we'll block on a read at this instant. */
1931 bool qreadable(struct queue *q)
1932 {
1933         return qlen(q) > 0;
1934 }
1935
1936 /* Helper for detecting whether we'll block on a write at this instant. */
1937 bool qwritable(struct queue *q)
1938 {
1939         return !q->limit || qwindow(q) > 0;
1940 }