16caad33078ffe06a740791b481b903257c1f12b
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <slab.h>
30 #include <kmalloc.h>
31 #include <kref.h>
32 #include <string.h>
33 #include <stdio.h>
34 #include <assert.h>
35 #include <error.h>
36 #include <cpio.h>
37 #include <pmap.h>
38 #include <smp.h>
39 #include <net/ip.h>
40
41 #define PANIC_EXTRA(b)                                                  \
42 {                                                                       \
43         if ((b)->extra_len) {                                           \
44                 printblock(b);                                          \
45                 backtrace();                                            \
46                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
47         }                                                               \
48 }
49
50 static uint32_t padblockcnt;
51 static uint32_t concatblockcnt;
52 static uint32_t pullupblockcnt;
53 static uint32_t copyblockcnt;
54 static uint32_t consumecnt;
55 static uint32_t producecnt;
56 static uint32_t qcopycnt;
57
58 static int debugging;
59
60 #define QDEBUG  if(0)
61
62 /*
63  *  IO queues
64  */
65
66 struct queue {
67         spinlock_t lock;;
68
69         struct block *bfirst;           /* buffer */
70         struct block *blast;
71
72         int dlen;                       /* data bytes in queue */
73         int limit;                      /* max bytes in queue */
74         int inilim;                     /* initial limit */
75         int state;
76         int eof;                        /* number of eofs read by user */
77         size_t bytes_read;
78
79         void (*kick) (void *);          /* restart output */
80         void (*bypass) (void *, struct block *); /* bypass queue altogether */
81         void *arg;                      /* argument to kick */
82
83         struct rendez rr;               /* process waiting to read */
84         struct rendez wr;               /* process waiting to write */
85         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
86         void *wake_data;
87
88         char err[ERRMAX];
89 };
90
91 enum {
92         Maxatomic = 64 * 1024,
93         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
94         QIO_LIMIT = (1 << 1),           /* respect q->limit */
95         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
96         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
97         QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
98         QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
99 };
100
101 unsigned int qiomaxatomic = Maxatomic;
102
103 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
104 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
105                               int mem_flags);
106 static bool qwait_and_ilock(struct queue *q, int qio_flags);
107
108 /* Helper: fires a wake callback, sending 'filter' */
109 static void qwake_cb(struct queue *q, int filter)
110 {
111         if (q->wake_cb)
112                 q->wake_cb(q, q->wake_data, filter);
113 }
114
115 void ixsummary(void)
116 {
117         debugging ^= 1;
118         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
119                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
120         printd("consume %lu, produce %lu, qcopy %lu\n",
121                    consumecnt, producecnt, qcopycnt);
122 }
123
124 /*
125  *  pad a block to the front (or the back if size is negative)
126  */
127 struct block *padblock(struct block *bp, int size)
128 {
129         int n;
130         struct block *nbp;
131
132         QDEBUG checkb(bp, "padblock 1");
133         if (size >= 0) {
134                 if (bp->rp - bp->base >= size) {
135                         bp->network_offset += size;
136                         bp->transport_offset += size;
137                         bp->rp -= size;
138                         return bp;
139                 }
140
141                 PANIC_EXTRA(bp);
142                 if (bp->next)
143                         panic("%s %p had a next", __func__, bp);
144                 n = BLEN(bp);
145                 padblockcnt++;
146                 nbp = block_alloc(size + n, MEM_WAIT);
147                 block_copy_metadata(nbp, bp);
148                 nbp->rp += size;
149                 nbp->wp = nbp->rp;
150                 memmove(nbp->wp, bp->rp, n);
151                 nbp->wp += n;
152                 freeb(bp);
153                 nbp->rp -= size;
154         } else {
155                 size = -size;
156
157                 PANIC_EXTRA(bp);
158
159                 if (bp->next)
160                         panic("%s %p had a next", __func__, bp);
161
162                 if (bp->lim - bp->wp >= size)
163                         return bp;
164
165                 n = BLEN(bp);
166                 padblockcnt++;
167                 nbp = block_alloc(size + n, MEM_WAIT);
168                 block_copy_metadata(nbp, bp);
169                 memmove(nbp->wp, bp->rp, n);
170                 nbp->wp += n;
171                 freeb(bp);
172         }
173         QDEBUG checkb(nbp, "padblock 1");
174         return nbp;
175 }
176
177 /*
178  *  return count of bytes in a string of blocks
179  */
180 int blocklen(struct block *bp)
181 {
182         int len;
183
184         len = 0;
185         while (bp) {
186                 len += BLEN(bp);
187                 bp = bp->next;
188         }
189         return len;
190 }
191
192 /*
193  * return count of space in blocks
194  */
195 int blockalloclen(struct block *bp)
196 {
197         int len;
198
199         len = 0;
200         while (bp) {
201                 len += BALLOC(bp);
202                 bp = bp->next;
203         }
204         return len;
205 }
206
207 /*
208  *  copy the  string of blocks into
209  *  a single block and free the string
210  */
211 struct block *concatblock(struct block *bp)
212 {
213         int len;
214         struct block *nb, *f;
215
216         if (bp->next == 0)
217                 return bp;
218
219         /* probably use parts of qclone */
220         PANIC_EXTRA(bp);
221         nb = block_alloc(blocklen(bp), MEM_WAIT);
222         for (f = bp; f; f = f->next) {
223                 len = BLEN(f);
224                 memmove(nb->wp, f->rp, len);
225                 nb->wp += len;
226         }
227         concatblockcnt += BLEN(nb);
228         freeblist(bp);
229         QDEBUG checkb(nb, "concatblock 1");
230         return nb;
231 }
232
233 /* Makes an identical copy of the block, collapsing all the data into the block
234  * body.  It does not point to the contents of the original, it is a copy
235  * (unlike qclone).  Since we're copying, we might as well put the memory into
236  * one contiguous chunk. */
237 struct block *copyblock(struct block *bp, int mem_flags)
238 {
239         struct block *newb;
240         struct extra_bdata *ebd;
241         size_t amt;
242
243         QDEBUG checkb(bp, "copyblock 0");
244         newb = block_alloc(BLEN(bp), mem_flags);
245         if (!newb)
246                 return 0;
247         amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
248         assert(amt == BHLEN(bp));
249         for (int i = 0; i < bp->nr_extra_bufs; i++) {
250                 ebd = &bp->extra_data[i];
251                 if (!ebd->base || !ebd->len)
252                         continue;
253                 amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
254                                          ebd->len);
255                 assert(amt == ebd->len);
256         }
257         block_copy_metadata(newb, bp);
258         copyblockcnt++;
259         QDEBUG checkb(newb, "copyblock 1");
260         return newb;
261 }
262
263 /* Returns a block with the remaining contents of b all in the main body of the
264  * returned block.  Replace old references to b with the returned value (which
265  * may still be 'b', if no change was needed. */
266 struct block *linearizeblock(struct block *b)
267 {
268         struct block *newb;
269
270         if (!b->extra_len)
271                 return b;
272         newb = copyblock(b, MEM_WAIT);
273         freeb(b);
274         return newb;
275 }
276
277 /* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
278  * have emptied it. */
279 static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
280                                     size_t amt)
281 {
282         ebd->len -= amt;
283         ebd->off += amt;
284         b->extra_len -= amt;
285         if (!ebd->len) {
286                 /* we don't actually have to decref here.  it's also
287                  * done in freeb().  this is the earliest we can free. */
288                 kfree((void*)ebd->base);
289                 ebd->base = ebd->off = 0;
290         }
291 }
292
293 /* Make sure the first block has at least n bytes in its main body.  Pulls up
294  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
295  * block list. */
296 struct block *pullupblock(struct block *bp, int n)
297 {
298         int i, len, seglen;
299         struct block *nbp;
300         struct extra_bdata *ebd;
301
302         /*
303          *  this should almost always be true, it's
304          *  just to avoid every caller checking.
305          */
306         if (BHLEN(bp) >= n)
307                 return bp;
308
309         /* If there's no chance, just bail out now.  This might be slightly
310          * wasteful if there's a long blist that does have enough data. */
311         if (n > blocklen(bp))
312                 return 0;
313         /* a start at explicit main-body / header management */
314         if (bp->extra_len) {
315                 if (n > bp->lim - bp->rp) {
316                         /* would need to realloc a new block and copy everything
317                          * over. */
318                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
319                               n, bp->lim, bp->rp, bp->lim-bp->rp);
320                 }
321                 len = n - BHLEN(bp);
322                 /* Would need to recursively call this, or otherwise pull from
323                  * later blocks and put chunks of their data into the block
324                  * we're building. */
325                 if (len > bp->extra_len)
326                         panic("pullup more than extra (%d, %d, %d)\n",
327                               n, BHLEN(bp), bp->extra_len);
328                 QDEBUG checkb(bp, "before pullup");
329                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
330                         ebd = &bp->extra_data[i];
331                         if (!ebd->base || !ebd->len)
332                                 continue;
333                         seglen = MIN(ebd->len, len);
334                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
335                         bp->wp += seglen;
336                         len -= seglen;
337                         ebd->len -= seglen;
338                         ebd->off += seglen;
339                         bp->extra_len -= seglen;
340                         if (ebd->len == 0) {
341                                 kfree((void *)ebd->base);
342                                 ebd->off = 0;
343                                 ebd->base = 0;
344                         }
345                 }
346                 /* maybe just call pullupblock recursively here */
347                 if (len)
348                         panic("pullup %d bytes overdrawn\n", len);
349                 QDEBUG checkb(bp, "after pullup");
350                 return bp;
351         }
352
353         /*
354          *  if not enough room in the first block,
355          *  add another to the front of the list.
356          */
357         if (bp->lim - bp->rp < n) {
358                 nbp = block_alloc(n, MEM_WAIT);
359                 nbp->next = bp;
360                 bp = nbp;
361         }
362
363         /*
364          *  copy bytes from the trailing blocks into the first
365          */
366         n -= BLEN(bp);
367         while ((nbp = bp->next)) {
368                 i = BLEN(nbp);
369                 if (i > n) {
370                         memmove(bp->wp, nbp->rp, n);
371                         pullupblockcnt++;
372                         bp->wp += n;
373                         nbp->rp += n;
374                         QDEBUG checkb(bp, "pullupblock 1");
375                         return bp;
376                 } else {
377                         memmove(bp->wp, nbp->rp, i);
378                         pullupblockcnt++;
379                         bp->wp += i;
380                         bp->next = nbp->next;
381                         nbp->next = 0;
382                         freeb(nbp);
383                         n -= i;
384                         if (n == 0) {
385                                 QDEBUG checkb(bp, "pullupblock 2");
386                                 return bp;
387                         }
388                 }
389         }
390         freeb(bp);
391         return 0;
392 }
393
394 /*
395  *  make sure the first block has at least n bytes in its main body
396  */
397 struct block *pullupqueue(struct queue *q, int n)
398 {
399         struct block *b;
400
401         /* TODO: lock to protect the queue links? */
402         if ((BHLEN(q->bfirst) >= n))
403                 return q->bfirst;
404         q->bfirst = pullupblock(q->bfirst, n);
405         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
406         q->blast = b;
407         return q->bfirst;
408 }
409
410 /* throw away count bytes from the front of
411  * block's extradata.  Returns count of bytes
412  * thrown away
413  */
414
415 static int pullext(struct block *bp, int count)
416 {
417         struct extra_bdata *ed;
418         int i, rem, bytes = 0;
419
420         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
421                 ed = &bp->extra_data[i];
422                 rem = MIN(count, ed->len);
423                 bp->extra_len -= rem;
424                 count -= rem;
425                 bytes += rem;
426                 ed->off += rem;
427                 ed->len -= rem;
428                 if (ed->len == 0) {
429                         kfree((void *)ed->base);
430                         ed->base = 0;
431                         ed->off = 0;
432                 }
433         }
434         return bytes;
435 }
436
437 /* throw away count bytes from the end of a
438  * block's extradata.  Returns count of bytes
439  * thrown away
440  */
441
442 static int dropext(struct block *bp, int count)
443 {
444         struct extra_bdata *ed;
445         int i, rem, bytes = 0;
446
447         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
448                 ed = &bp->extra_data[i];
449                 rem = MIN(count, ed->len);
450                 bp->extra_len -= rem;
451                 count -= rem;
452                 bytes += rem;
453                 ed->len -= rem;
454                 if (ed->len == 0) {
455                         kfree((void *)ed->base);
456                         ed->base = 0;
457                         ed->off = 0;
458                 }
459         }
460         return bytes;
461 }
462
463 /*
464  *  throw away up to count bytes from a
465  *  list of blocks.  Return count of bytes
466  *  thrown away.
467  */
468 static int _pullblock(struct block **bph, int count, int free)
469 {
470         struct block *bp;
471         int n, bytes;
472
473         bytes = 0;
474         if (bph == NULL)
475                 return 0;
476
477         while (*bph != NULL && count != 0) {
478                 bp = *bph;
479
480                 n = MIN(BHLEN(bp), count);
481                 bytes += n;
482                 count -= n;
483                 bp->rp += n;
484                 n = pullext(bp, count);
485                 bytes += n;
486                 count -= n;
487                 QDEBUG checkb(bp, "pullblock ");
488                 if (BLEN(bp) == 0 && (free || count)) {
489                         *bph = bp->next;
490                         bp->next = NULL;
491                         freeb(bp);
492                 }
493         }
494         return bytes;
495 }
496
497 int pullblock(struct block **bph, int count)
498 {
499         return _pullblock(bph, count, 1);
500 }
501
502 /*
503  *  trim to len bytes starting at offset
504  */
505 struct block *trimblock(struct block *bp, int offset, int len)
506 {
507         uint32_t l, trim;
508         int olen = len;
509
510         QDEBUG checkb(bp, "trimblock 1");
511         if (blocklen(bp) < offset + len) {
512                 freeblist(bp);
513                 return NULL;
514         }
515
516         l =_pullblock(&bp, offset, 0);
517         if (bp == NULL)
518                 return NULL;
519         if (l != offset) {
520                 freeblist(bp);
521                 return NULL;
522         }
523
524         while ((l = BLEN(bp)) < len) {
525                 len -= l;
526                 bp = bp->next;
527         }
528
529         trim = BLEN(bp) - len;
530         trim -= dropext(bp, trim);
531         bp->wp -= trim;
532
533         if (bp->next) {
534                 freeblist(bp->next);
535                 bp->next = NULL;
536         }
537         return bp;
538 }
539
540 /* Adjust block @bp so that its size is exactly @len.
541  * If the size is increased, fill in the new contents with zeros.
542  * If the size is decreased, discard some of the old contents at the tail. */
543 struct block *adjustblock(struct block *bp, int len)
544 {
545         struct extra_bdata *ebd;
546         void *buf;
547         int i;
548
549         if (len < 0) {
550                 freeb(bp);
551                 return NULL;
552         }
553
554         if (len == BLEN(bp))
555                 return bp;
556
557         /* Shrink within block main body. */
558         if (len <= BHLEN(bp)) {
559                 free_block_extra(bp);
560                 bp->wp = bp->rp + len;
561                 QDEBUG checkb(bp, "adjustblock 1");
562                 return bp;
563         }
564
565         /* Need to grow. */
566         if (len > BLEN(bp)) {
567                 /* Grow within block main body. */
568                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
569                         memset(bp->wp, 0, len - BLEN(bp));
570                         bp->wp = bp->rp + len;
571                         QDEBUG checkb(bp, "adjustblock 2");
572                         return bp;
573                 }
574                 /* Grow with extra data buffers. */
575                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
576                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
577                                    MEM_WAIT);
578                 QDEBUG checkb(bp, "adjustblock 3");
579                 return bp;
580         }
581
582         /* Shrink extra data buffers.
583          * len is how much of ebd we need to keep.
584          * extra_len is re-accumulated. */
585         assert(bp->extra_len > 0);
586         len -= BHLEN(bp);
587         bp->extra_len = 0;
588         for (i = 0; i < bp->nr_extra_bufs; i++) {
589                 ebd = &bp->extra_data[i];
590                 if (len <= ebd->len)
591                         break;
592                 len -= ebd->len;
593                 bp->extra_len += ebd->len;
594         }
595         /* If len becomes zero, extra_data[i] should be freed. */
596         if (len > 0) {
597                 ebd = &bp->extra_data[i];
598                 ebd->len = len;
599                 bp->extra_len += ebd->len;
600                 i++;
601         }
602         for (; i < bp->nr_extra_bufs; i++) {
603                 ebd = &bp->extra_data[i];
604                 if (ebd->base)
605                         kfree((void*)ebd->base);
606                 ebd->base = ebd->off = ebd->len = 0;
607         }
608         QDEBUG checkb(bp, "adjustblock 4");
609         return bp;
610 }
611
612 /* Helper: removes and returns the first block from q */
613 static struct block *pop_first_block(struct queue *q)
614 {
615         struct block *b = q->bfirst;
616
617         q->dlen -= BLEN(b);
618         q->bytes_read += BLEN(b);
619         q->bfirst = b->next;
620         b->next = 0;
621         return b;
622 }
623
624 /* Accounting helper.  Block b in q lost amt extra_data */
625 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
626 {
627         b->extra_len -= amt;
628         q->dlen -= amt;
629         q->bytes_read += amt;
630 }
631
632 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
633  * fixed in 'from', so we move its contents and zero it out in 'from'.
634  *
635  * Returns the length moved (0 on failure). */
636 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
637                        struct block *from, struct queue *from_q)
638 {
639         size_t ret = ebd->len;
640
641         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
642                 return 0;
643         block_and_q_lost_extra(from, from_q, ebd->len);
644         ebd->base = ebd->len = ebd->off = 0;
645         return ret;
646 }
647
648 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
649  * return with less than len, but greater than 0, even if there is more
650  * available in q.
651  *
652  * At any moment that we have copied anything and things are tricky, we can just
653  * return.  The trickiness comes from a bunch of variables: is the main body
654  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
655  * to @to's main body, but only if we haven't used it yet. */
656 static size_t copy_from_first_block(struct queue *q, struct block *to,
657                                     size_t len)
658 {
659         struct block *from = q->bfirst;
660         size_t copy_amt, amt;
661         struct extra_bdata *ebd;
662
663         assert(len < BLEN(from));       /* sanity */
664         /* Try to extract from the main body */
665         copy_amt = MIN(BHLEN(from), len);
666         if (copy_amt) {
667                 copy_amt = block_copy_to_body(to, from->rp, copy_amt);
668                 from->rp += copy_amt;
669                 /* We only change dlen, (data len), not q->len, since the q
670                  * still has the same block memory allocation (no kfrees
671                  * happened) */
672                 q->dlen -= copy_amt;
673                 q->bytes_read += copy_amt;
674         }
675         /* Try to extract the remainder from the extra data */
676         len -= copy_amt;
677         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
678                 ebd = &from->extra_data[i];
679                 if (!ebd->base || !ebd->len)
680                         continue;
681                 if (len >= ebd->len) {
682                         amt = move_ebd(ebd, to, from, q);
683                         if (!amt) {
684                                 /* our internal alloc could have failed.   this
685                                  * ebd is now the last one we'll consider.
686                                  * let's handle it separately and put it in the
687                                  * main body. */
688                                 if (copy_amt)
689                                         return copy_amt;
690                                 copy_amt = block_copy_to_body(to,
691                                                               (void*)ebd->base +
692                                                               ebd->off,
693                                                               ebd->len);
694                                 block_and_q_lost_extra(from, q, copy_amt);
695                                 break;
696                         }
697                         len -= amt;
698                         copy_amt += amt;
699                         continue;
700                 } else {
701                         /* If we're here, we reached our final ebd, which we'll
702                          * need to split to get anything from it. */
703                         if (copy_amt)
704                                 return copy_amt;
705                         copy_amt = block_copy_to_body(to, (void*)ebd->base +
706                                                       ebd->off, len);
707                         ebd->off += copy_amt;
708                         ebd->len -= copy_amt;
709                         block_and_q_lost_extra(from, q, copy_amt);
710                         break;
711                 }
712         }
713         if (len)
714                 assert(copy_amt);       /* sanity */
715         return copy_amt;
716 }
717
718 /* Return codes for __qbread and __try_qbread. */
719 enum {
720         QBR_OK,
721         QBR_FAIL,
722         QBR_SPARE,      /* we need a spare block */
723         QBR_AGAIN,      /* do it again, we are coalescing blocks */
724 };
725
726 /* Helper and back-end for __qbread: extracts and returns a list of blocks
727  * containing up to len bytes.  It may contain less than len even if q has more
728  * data.
729  *
730  * Returns a code interpreted by __qbread, and the returned blist in ret. */
731 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
732                         struct block **real_ret, struct block *spare)
733 {
734         struct block *ret, *ret_last, *first;
735         size_t blen;
736         bool was_unwritable = FALSE;
737
738         if (qio_flags & QIO_CAN_ERR_SLEEP) {
739                 if (!qwait_and_ilock(q, qio_flags)) {
740                         spin_unlock_irqsave(&q->lock);
741                         return QBR_FAIL;
742                 }
743                 /* we qwaited and still hold the lock, so the q is not empty */
744                 first = q->bfirst;
745         } else {
746                 spin_lock_irqsave(&q->lock);
747                 first = q->bfirst;
748                 if (!first) {
749                         spin_unlock_irqsave(&q->lock);
750                         return QBR_FAIL;
751                 }
752         }
753         /* We need to check before adjusting q->len.  We're checking the
754          * writer's sleep condition / tap condition.  When set, we *might* be
755          * making an edge transition (from unwritable to writable), which needs
756          * to wake and fire taps.  But, our read might not drain the queue below
757          * q->lim.  We'll check again later to see if we should really wake
758          * them.  */
759         was_unwritable = !qwritable(q);
760         blen = BLEN(first);
761         if ((q->state & Qcoalesce) && (blen == 0)) {
762                 freeb(pop_first_block(q));
763                 spin_unlock_irqsave(&q->lock);
764                 /* Need to retry to make sure we have a first block */
765                 return QBR_AGAIN;
766         }
767         /* Qmsg: just return the first block.  Be careful, since our caller
768          * might not read all of the block and thus drop bytes.  Similar to
769          * SOCK_DGRAM. */
770         if (q->state & Qmsg) {
771                 ret = pop_first_block(q);
772                 goto out_ok;
773         }
774         /* Let's get at least something first - makes the code easier.  This
775          * way, we'll only ever split the block once. */
776         if (blen <= len) {
777                 ret = pop_first_block(q);
778                 len -= blen;
779         } else {
780                 /* need to split the block.  we won't actually take the first
781                  * block out of the queue - we're just extracting a little bit.
782                  */
783                 if (!spare) {
784                         /* We have nothing and need a spare block.  Retry! */
785                         spin_unlock_irqsave(&q->lock);
786                         return QBR_SPARE;
787                 }
788                 copy_from_first_block(q, spare, len);
789                 ret = spare;
790                 goto out_ok;
791         }
792         /* At this point, we just grabbed the first block.  We can try to grab
793          * some more, up to len (if they want). */
794         if (qio_flags & QIO_JUST_ONE_BLOCK)
795                 goto out_ok;
796         ret_last = ret;
797         while (q->bfirst && (len > 0)) {
798                 blen = BLEN(q->bfirst);
799                 if ((q->state & Qcoalesce) && (blen == 0)) {
800                         /* remove the intermediate 0 blocks */
801                         freeb(pop_first_block(q));
802                         continue;
803                 }
804                 if (blen > len) {
805                         /* We could try to split the block, but that's a huge
806                          * pain.  For instance, we might need to move the main
807                          * body of b into an extra_data of ret_last.  lots of
808                          * ways for that to fail, and lots of cases to consider.
809                          * Easier to just bail out.  This is why I did the first
810                          * block above: we don't need to worry about this. */
811                          break;
812                 }
813                 ret_last->next = pop_first_block(q);
814                 ret_last = ret_last->next;
815                 len -= blen;
816         }
817 out_ok:
818         /* Don't wake them up or fire tap if we didn't drain enough. */
819         if (!qwritable(q))
820                 was_unwritable = FALSE;
821         spin_unlock_irqsave(&q->lock);
822         if (was_unwritable) {
823                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
824                         q->kick(q->arg);
825                 rendez_wakeup(&q->wr);
826                 qwake_cb(q, FDTAP_FILT_WRITABLE);
827         }
828         *real_ret = ret;
829         return QBR_OK;
830 }
831
832 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
833  * containing up to len bytes.  It may contain less than len even if q has more
834  * data.
835  *
836  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
837  * if it required a spare and the memory allocation failed.
838  *
839  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
840  * could get a zero length block back. */
841 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
842                               int mem_flags)
843 {
844         ERRSTACK(1);
845         struct block *ret = 0;
846         struct block *volatile spare = 0;       /* volatile for the waserror */
847
848         /* __try_qbread can throw, based on qio flags. */
849         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
850                 if (spare)
851                         freeb(spare);
852                 nexterror();
853         }
854         while (1) {
855                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
856                 case QBR_OK:
857                 case QBR_FAIL:
858                         if (spare && (ret != spare))
859                                 freeb(spare);
860                         goto out_ret;
861                 case QBR_SPARE:
862                         assert(!spare);
863                         /* Due to some nastiness, we need a fresh block so we
864                          * can read out anything from the queue.  'len' seems
865                          * like a reasonable amount.  Maybe we can get away with
866                          * less. */
867                         spare = block_alloc(len, mem_flags);
868                         if (!spare) {
869                                 /* Careful here: a memory failure (possible with
870                                  * MEM_ATOMIC) could look like 'no data in the
871                                  * queue' (QBR_FAIL).  The only one who does is
872                                  * this qget(), who happens to know that we
873                                  * won't need a spare, due to the len argument.
874                                  * Spares are only needed when we need to split
875                                  * a block. */
876                                 ret = 0;
877                                 goto out_ret;
878                         }
879                         break;
880                 case QBR_AGAIN:
881                         /* if the first block is 0 and we are Qcoalesce, then
882                          * we'll need to try again.  We bounce out of __try so
883                          * we can perform the "is there a block" logic again
884                          * from the top. */
885                         break;
886                 }
887         }
888         assert(0);
889 out_ret:
890         if (qio_flags & QIO_CAN_ERR_SLEEP)
891                 poperror();
892         return ret;
893 }
894
895 /*
896  *  get next block from a queue, return null if nothing there
897  */
898 struct block *qget(struct queue *q)
899 {
900         /* since len == SIZE_MAX, we should never need to do a mem alloc */
901         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
902 }
903
904 /* Throw away the next 'len' bytes in the queue returning the number actually
905  * discarded.
906  *
907  * If the bytes are in the queue, then they must be discarded.  The only time to
908  * return less than len is if the q itself has less than len bytes.
909  *
910  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
911  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
912 size_t qdiscard(struct queue *q, size_t len)
913 {
914         struct block *blist;
915         size_t removed_amt;
916         size_t sofar = 0;
917
918         /* This is racy.  There could be multiple qdiscarders or other
919          * consumers, where the consumption could be interleaved. */
920         while (qlen(q) && len) {
921                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
922                 removed_amt = freeblist(blist);
923                 sofar += removed_amt;
924                 len -= removed_amt;
925         }
926         return sofar;
927 }
928
929 ssize_t qpass(struct queue *q, struct block *b)
930 {
931         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
932 }
933
934 ssize_t qpassnolim(struct queue *q, struct block *b)
935 {
936         return __qbwrite(q, b, 0);
937 }
938
939 /*
940  *  if the allocated space is way out of line with the used
941  *  space, reallocate to a smaller block
942  */
943 struct block *packblock(struct block *bp)
944 {
945         struct block **l, *nbp;
946         int n;
947
948         if (bp->extra_len)
949                 return bp;
950         for (l = &bp; *l; l = &(*l)->next) {
951                 nbp = *l;
952                 n = BLEN(nbp);
953                 if ((n << 2) < BALLOC(nbp)) {
954                         *l = block_alloc(n, MEM_WAIT);
955                         memmove((*l)->wp, nbp->rp, n);
956                         (*l)->wp += n;
957                         (*l)->next = nbp->next;
958                         nbp->next = NULL;
959                         freeb(nbp);
960                 }
961         }
962
963         return bp;
964 }
965
966 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
967  * body_rp, for up to len.  Returns the len consumed.
968  *
969  * The base is 'b', so that we can kfree it later.  This currently ties us to
970  * using kfree for the release method for all extra_data.
971  *
972  * It is possible to have a body size that is 0, if there is no offset, and
973  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
974 static size_t point_to_body(struct block *b, uint8_t *body_rp,
975                             struct block *newb, unsigned int newb_idx,
976                             size_t len)
977 {
978         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
979
980         assert(newb_idx < newb->nr_extra_bufs);
981
982         kmalloc_incref(b);
983         ebd->base = (uintptr_t)b;
984         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
985         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
986         assert((int)ebd->len >= 0);
987         newb->extra_len += ebd->len;
988         return ebd->len;
989 }
990
991 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
992  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
993  * consumed.
994  *
995  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
996 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
997                            struct block *newb, unsigned int newb_idx,
998                            size_t len)
999 {
1000         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1001         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1002
1003         assert(b_idx < b->nr_extra_bufs);
1004         assert(newb_idx < newb->nr_extra_bufs);
1005
1006         kmalloc_incref((void*)b_ebd->base);
1007         n_ebd->base = b_ebd->base;
1008         n_ebd->off = b_ebd->off + b_off;
1009         n_ebd->len = MIN(b_ebd->len - b_off, len);
1010         newb->extra_len += n_ebd->len;
1011         return n_ebd->len;
1012 }
1013
1014 /* given a string of blocks, sets up the new block's extra_data such that it
1015  * *points* to the contents of the blist [offset, len + offset).  This does not
1016  * make a separate copy of the contents of the blist.
1017  *
1018  * returns 0 on success.  the only failure is if the extra_data array was too
1019  * small, so this returns a positive integer saying how big the extra_data needs
1020  * to be.
1021  *
1022  * callers are responsible for protecting the list structure. */
1023 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1024                             uint32_t offset)
1025 {
1026         struct block *b, *first;
1027         unsigned int nr_bufs = 0;
1028         unsigned int b_idx, newb_idx = 0;
1029         uint8_t *first_main_body = 0;
1030         ssize_t sofar = 0;
1031
1032         /* find the first block; keep offset relative to the latest b in the
1033          * list */
1034         for (b = blist; b; b = b->next) {
1035                 if (BLEN(b) > offset)
1036                         break;
1037                 offset -= BLEN(b);
1038         }
1039         /* qcopy semantics: if you asked for an offset outside the block list,
1040          * you get an empty block back */
1041         if (!b)
1042                 return 0;
1043         first = b;
1044         sofar -= offset; /* don't count the remaining offset in the first b */
1045         /* upper bound for how many buffers we'll need in newb */
1046         for (/* b is set*/; b; b = b->next) {
1047                 nr_bufs += BHLEN(b) ? 1 : 0;
1048                 /* still assuming nr_extra == nr_valid */
1049                 nr_bufs += b->nr_extra_bufs;
1050                 sofar += BLEN(b);
1051                 if (sofar > len)
1052                         break;
1053         }
1054         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1055         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1056                 /* caller will need to alloc these, then re-call us */
1057                 return nr_bufs;
1058         }
1059         for (b = first; b && len; b = b->next) {
1060                 b_idx = 0;
1061                 if (offset) {
1062                         if (offset < BHLEN(b)) {
1063                                 /* off is in the main body */
1064                                 len -= point_to_body(b, b->rp + offset, newb,
1065                                                      newb_idx, len);
1066                                 newb_idx++;
1067                         } else {
1068                                 /* off is in one of the buffers (or just past
1069                                  * the last one).  we're not going to point to
1070                                  * b's main body at all. */
1071                                 offset -= BHLEN(b);
1072                                 assert(b->extra_data);
1073                                 /* assuming these extrabufs are packed, or at
1074                                  * least that len isn't gibberish */
1075                                 while (b->extra_data[b_idx].len <= offset) {
1076                                         offset -= b->extra_data[b_idx].len;
1077                                         b_idx++;
1078                                 }
1079                                 /* now offset is set to our offset in the
1080                                  * b_idx'th buf */
1081                                 len -= point_to_buf(b, b_idx, offset, newb,
1082                                                     newb_idx, len);
1083                                 newb_idx++;
1084                                 b_idx++;
1085                         }
1086                         offset = 0;
1087                 } else {
1088                         if (BHLEN(b)) {
1089                                 len -= point_to_body(b, b->rp, newb, newb_idx,
1090                                                      len);
1091                                 newb_idx++;
1092                         }
1093                 }
1094                 /* knock out all remaining bufs.  we only did one point_to_ op
1095                  * by now, and any point_to_ could be our last if it consumed
1096                  * all of len. */
1097                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1098                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1099                         newb_idx++;
1100                 }
1101         }
1102         return 0;
1103 }
1104
1105 struct block *blist_clone(struct block *blist, int header_len, int len,
1106                           uint32_t offset)
1107 {
1108         int ret;
1109         struct block *newb = block_alloc(header_len, MEM_WAIT);
1110
1111         do {
1112                 ret = __blist_clone_to(blist, newb, len, offset);
1113                 if (ret)
1114                         block_add_extd(newb, ret, MEM_WAIT);
1115         } while (ret);
1116         return newb;
1117 }
1118
1119 /* given a queue, makes a single block with header_len reserved space in the
1120  * block main body, and the contents of [offset, len + offset) pointed to in the
1121  * new blocks ext_data.  This does not make a copy of the q's contents, though
1122  * you do have a ref count on the memory. */
1123 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1124 {
1125         int ret;
1126         struct block *newb = block_alloc(header_len, MEM_WAIT);
1127         /* the while loop should rarely be used: it would require someone
1128          * concurrently adding to the queue. */
1129         do {
1130                 /* TODO: RCU protect the q list (b->next) (need read lock) */
1131                 spin_lock_irqsave(&q->lock);
1132                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1133                 spin_unlock_irqsave(&q->lock);
1134                 if (ret)
1135                         block_add_extd(newb, ret, MEM_WAIT);
1136         } while (ret);
1137         return newb;
1138 }
1139
1140 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1141 {
1142         return qclone(q, 0, len, offset);
1143 }
1144
1145 static void qinit_common(struct queue *q)
1146 {
1147         spinlock_init_irqsave(&q->lock);
1148         rendez_init(&q->rr);
1149         rendez_init(&q->wr);
1150 }
1151
1152 /*
1153  *  called by non-interrupt code
1154  */
1155 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1156 {
1157         struct queue *q;
1158
1159         q = kzmalloc(sizeof(struct queue), 0);
1160         if (q == 0)
1161                 return 0;
1162         qinit_common(q);
1163
1164         q->limit = q->inilim = limit;
1165         q->kick = kick;
1166         q->arg = arg;
1167         q->state = msg;
1168         q->eof = 0;
1169
1170         return q;
1171 }
1172
1173 /* open a queue to be bypassed */
1174 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1175 {
1176         struct queue *q;
1177
1178         q = kzmalloc(sizeof(struct queue), 0);
1179         if (q == 0)
1180                 return 0;
1181         qinit_common(q);
1182
1183         q->limit = 0;
1184         q->arg = arg;
1185         q->bypass = bypass;
1186         q->state = 0;
1187
1188         return q;
1189 }
1190
1191 static int notempty(void *a)
1192 {
1193         struct queue *q = a;
1194
1195         return (q->state & Qclosed) || q->bfirst != 0;
1196 }
1197
1198 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1199  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1200  * was naturally closed.  Throws an error o/w. */
1201 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1202 {
1203         while (1) {
1204                 spin_lock_irqsave(&q->lock);
1205                 if (q->bfirst != NULL)
1206                         return TRUE;
1207                 if (q->state & Qclosed) {
1208                         if (++q->eof > 3) {
1209                                 spin_unlock_irqsave(&q->lock);
1210                                 error(EPIPE,
1211                                       "multiple reads on a closed queue");
1212                         }
1213                         if (q->err[0]) {
1214                                 spin_unlock_irqsave(&q->lock);
1215                                 error(EPIPE, q->err);
1216                         }
1217                         return FALSE;
1218                 }
1219                 if (qio_flags & QIO_NON_BLOCK) {
1220                         spin_unlock_irqsave(&q->lock);
1221                         error(EAGAIN, "queue empty");
1222                 }
1223                 spin_unlock_irqsave(&q->lock);
1224                 /* As with the producer side, we check for a condition while
1225                  * holding the q->lock, decide to sleep, then unlock.  It's like
1226                  * the "check, signal, check again" pattern, but we do it
1227                  * conditionally.  Both sides agree synchronously to do it, and
1228                  * those decisions are made while holding q->lock.  I think this
1229                  * is OK.
1230                  *
1231                  * The invariant is that no reader sleeps when the queue has
1232                  * data.  While holding the rendez lock, if we see there's no
1233                  * data, we'll sleep.  Since we saw there was no data, the next
1234                  * writer will see (or already saw) no data, and then the writer
1235                  * decides to rendez_wake, which will grab the rendez lock.  If
1236                  * the writer already did that, then we'll see notempty when we
1237                  * do our check-again. */
1238                 rendez_sleep(&q->rr, notempty, q);
1239         }
1240 }
1241
1242 /*
1243  * add a block list to a queue
1244  * XXX basically the same as enqueue blist, and has no locking!
1245  */
1246 void qaddlist(struct queue *q, struct block *b)
1247 {
1248         /* TODO: q lock? */
1249         /* queue the block */
1250         if (q->bfirst)
1251                 q->blast->next = b;
1252         else
1253                 q->bfirst = b;
1254         q->dlen += blocklen(b);
1255         while (b->next)
1256                 b = b->next;
1257         q->blast = b;
1258 }
1259
1260 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1261 {
1262         size_t copy_amt, retval = 0;
1263         struct extra_bdata *ebd;
1264
1265         copy_amt = MIN(BHLEN(b), amt);
1266         memcpy(to, b->rp, copy_amt);
1267         /* advance the rp, since this block might not be completely consumed and
1268          * future reads need to know where to pick up from */
1269         b->rp += copy_amt;
1270         to += copy_amt;
1271         amt -= copy_amt;
1272         retval += copy_amt;
1273         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1274                 ebd = &b->extra_data[i];
1275                 /* skip empty entires.  if we track this in the struct block, we
1276                  * can just start the for loop early */
1277                 if (!ebd->base || !ebd->len)
1278                         continue;
1279                 copy_amt = MIN(ebd->len, amt);
1280                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1281                 /* we're actually consuming the entries, just like how we
1282                  * advance rp up above, and might only consume part of one. */
1283                 block_consume_ebd_bytes(b, ebd, copy_amt);
1284                 to += copy_amt;
1285                 amt -= copy_amt;
1286                 retval += copy_amt;
1287         }
1288         return retval;
1289 }
1290
1291 /*
1292  *  copy the contents of a string of blocks into
1293  *  memory.  emptied blocks are freed.  return
1294  *  pointer to first unconsumed block.
1295  */
1296 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1297 {
1298         int i;
1299         struct block *next;
1300
1301         /* could be slicker here, since read_from_block is smart */
1302         for (; b != NULL; b = next) {
1303                 i = BLEN(b);
1304                 if (i > n) {
1305                         /* partial block, consume some */
1306                         read_from_block(b, p, n);
1307                         return b;
1308                 }
1309                 /* full block, consume all and move on */
1310                 i = read_from_block(b, p, i);
1311                 n -= i;
1312                 p += i;
1313                 next = b->next;
1314                 b->next = NULL;
1315                 freeb(b);
1316         }
1317         return NULL;
1318 }
1319
1320 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1321  * actual amount copied. */
1322 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1323 {
1324         size_t sofar = 0;
1325         struct block *next;
1326
1327         do {
1328                 assert(va);
1329                 assert(b->rp);
1330                 sofar += read_from_block(b, va + sofar, len - sofar);
1331                 if (BLEN(b) && b->next)
1332                         panic("Failed to drain entire block (Qmsg?) but had a next!");
1333                 next = b->next;
1334                 b->next = NULL;
1335                 freeb(b);
1336                 b = next;
1337         } while (b);
1338         return sofar;
1339 }
1340
1341 /*
1342  *  copy the contents of memory into a string of blocks.
1343  *  return NULL on error.
1344  */
1345 struct block *mem2bl(uint8_t * p, int len)
1346 {
1347         ERRSTACK(1);
1348         int n;
1349         struct block *b, *first, **l;
1350
1351         first = NULL;
1352         l = &first;
1353         if (waserror()) {
1354                 freeblist(first);
1355                 nexterror();
1356         }
1357         do {
1358                 n = len;
1359                 if (n > Maxatomic)
1360                         n = Maxatomic;
1361
1362                 *l = b = block_alloc(n, MEM_WAIT);
1363                 /* TODO consider extra_data */
1364                 memmove(b->wp, p, n);
1365                 b->wp += n;
1366                 p += n;
1367                 len -= n;
1368                 l = &b->next;
1369         } while (len > 0);
1370         poperror();
1371
1372         return first;
1373 }
1374
1375 /*
1376  *  put a block back to the front of the queue
1377  *  called with q ilocked
1378  */
1379 void qputback(struct queue *q, struct block *b)
1380 {
1381         b->next = q->bfirst;
1382         if (q->bfirst == NULL)
1383                 q->blast = b;
1384         q->bfirst = b;
1385         q->dlen += BLEN(b);
1386         /* qputback seems to undo a read, so we can undo the accounting too. */
1387         q->bytes_read -= BLEN(b);
1388 }
1389
1390 /*
1391  *  get next block from a queue (up to a limit)
1392  *
1393  */
1394 struct block *qbread(struct queue *q, size_t len)
1395 {
1396         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
1397                         MEM_WAIT);
1398 }
1399
1400 struct block *qbread_nonblock(struct queue *q, size_t len)
1401 {
1402         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1403                         QIO_NON_BLOCK, MEM_WAIT);
1404 }
1405
1406 /* read up to len from a queue into vp. */
1407 size_t qread(struct queue *q, void *va, size_t len)
1408 {
1409         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1410
1411         if (!blist)
1412                 return 0;
1413         return read_all_blocks(blist, va, len);
1414 }
1415
1416 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1417 {
1418         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
1419                                        QIO_NON_BLOCK, MEM_WAIT);
1420
1421         if (!blist)
1422                 return 0;
1423         return read_all_blocks(blist, va, len);
1424 }
1425
1426 /* This is the rendez wake condition for writers. */
1427 static int qwriter_should_wake(void *a)
1428 {
1429         struct queue *q = a;
1430
1431         return qwritable(q) || (q->state & Qclosed);
1432 }
1433
1434 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1435 static size_t enqueue_blist(struct queue *q, struct block *b)
1436 {
1437         size_t dlen;
1438
1439         if (q->bfirst)
1440                 q->blast->next = b;
1441         else
1442                 q->bfirst = b;
1443         dlen = BLEN(b);
1444         while (b->next) {
1445                 b = b->next;
1446                 dlen += BLEN(b);
1447         }
1448         q->blast = b;
1449         q->dlen += dlen;
1450         return dlen;
1451 }
1452
1453 /* Adds block (which can be a list of blocks) to the queue, subject to
1454  * qio_flags.  Returns the length written on success or -1 on non-throwable
1455  * error.  Adjust qio_flags to control the value-added features!. */
1456 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1457 {
1458         ssize_t ret;
1459         bool was_unreadable;
1460
1461         if (q->bypass) {
1462                 ret = blocklen(b);
1463                 (*q->bypass) (q->arg, b);
1464                 return ret;
1465         }
1466         spin_lock_irqsave(&q->lock);
1467         was_unreadable = q->dlen == 0;
1468         if (q->state & Qclosed) {
1469                 spin_unlock_irqsave(&q->lock);
1470                 freeblist(b);
1471                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1472                         return -1;
1473                 if (q->err[0])
1474                         error(EPIPE, q->err);
1475                 else
1476                         error(EPIPE, "connection closed");
1477         }
1478         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1479                 /* drop overflow takes priority over regular non-blocking */
1480                 if ((qio_flags & QIO_DROP_OVERFLOW)
1481                     || (q->state & Qdropoverflow)) {
1482                         spin_unlock_irqsave(&q->lock);
1483                         freeb(b);
1484                         return -1;
1485                 }
1486                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
1487                  * nice and catch it. */
1488                 if ((qio_flags & QIO_CAN_ERR_SLEEP)
1489                     && (qio_flags & QIO_NON_BLOCK)) {
1490                         spin_unlock_irqsave(&q->lock);
1491                         freeb(b);
1492                         error(EAGAIN, "queue full");
1493                 }
1494         }
1495         ret = enqueue_blist(q, b);
1496         QDEBUG checkb(b, "__qbwrite");
1497         spin_unlock_irqsave(&q->lock);
1498         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1499          * wakeup, meaning that actual users either want a kick or have
1500          * qreaders. */
1501         if (q->kick && (was_unreadable || (q->state & Qkick)))
1502                 q->kick(q->arg);
1503         if (was_unreadable) {
1504                 /* Unlike the read side, there's no double-check to make sure
1505                  * the queue transitioned across an edge.  We know we added
1506                  * something, so that's enough.  We wake if the queue was empty.
1507                  * Both sides are the same, in that the condition for which we
1508                  * do the rendez_wakeup() is the same as the condition done for
1509                  * the rendez_sleep(). */
1510                 rendez_wakeup(&q->rr);
1511                 qwake_cb(q, FDTAP_FILT_READABLE);
1512         }
1513         /*
1514          *  flow control, wait for queue to get below the limit
1515          *  before allowing the process to continue and queue
1516          *  more.  We do this here so that postnote can only
1517          *  interrupt us after the data has been queued.  This
1518          *  means that things like 9p flushes and ssl messages
1519          *  will not be disrupted by software interrupts.
1520          *
1521          *  Note - this is moderately dangerous since a process
1522          *  that keeps getting interrupted and rewriting will
1523          *  queue infinite crud.
1524          */
1525         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1526             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1527                 /* This is a racy peek at the q status.  If we accidentally
1528                  * block, our rendez will return.  The rendez's peak
1529                  * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
1530                  * (that lock protects writes, but not reads).
1531                  *
1532                  * Here's the deal: when holding the rendez lock, if we see the
1533                  * sleep condition, the consumer will wake us.  The condition
1534                  * will only ever be changed by the next qbread() (consumer,
1535                  * changes q->dlen).  That code will do a rendez wake, which
1536                  * will spin on the rendez lock, meaning it won't procede until
1537                  * we either see the new state (and return) or put ourselves on
1538                  * the rendez, and wake up.
1539                  *
1540                  * The pattern is one side writes mem, then signals.  Our side
1541                  * checks the signal, then reads the mem.  The goal is to not
1542                  * miss seeing the signal AND missing the memory write.  In this
1543                  * specific case, the signal is actually synchronous (the rendez
1544                  * lock) and not basic shared memory.
1545                  *
1546                  * Oh, and we spin in case we woke early and someone else filled
1547                  * the queue, mesa-style. */
1548                 while (!qwriter_should_wake(q))
1549                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1550         }
1551         return ret;
1552 }
1553
1554 /*
1555  *  add a block to a queue obeying flow control
1556  */
1557 ssize_t qbwrite(struct queue *q, struct block *b)
1558 {
1559         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1560 }
1561
1562 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1563 {
1564         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1565 }
1566
1567 ssize_t qibwrite(struct queue *q, struct block *b)
1568 {
1569         return __qbwrite(q, b, 0);
1570 }
1571
1572 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1573  * block on success, 0 on failure. */
1574 static struct block *build_block(void *from, size_t len, int mem_flags)
1575 {
1576         struct block *b;
1577         void *ext_buf;
1578
1579         /* If len is small, we don't need to bother with the extra_data.  But
1580          * until the whole stack can handle extd blocks, we'll use them
1581          * unconditionally.  */
1582
1583         /* allocb builds in 128 bytes of header space to all blocks, but this is
1584          * only available via padblock (to the left).  we also need some space
1585          * for pullupblock for some basic headers (like icmp) that get written
1586          * in directly */
1587         b = block_alloc(64, mem_flags);
1588         if (!b)
1589                 return 0;
1590         ext_buf = kmalloc(len, mem_flags);
1591         if (!ext_buf) {
1592                 kfree(b);
1593                 return 0;
1594         }
1595         memcpy(ext_buf, from, len);
1596         if (block_add_extd(b, 1, mem_flags)) {
1597                 kfree(ext_buf);
1598                 kfree(b);
1599                 return 0;
1600         }
1601         b->extra_data[0].base = (uintptr_t)ext_buf;
1602         b->extra_data[0].off = 0;
1603         b->extra_data[0].len = len;
1604         b->extra_len += len;
1605         return b;
1606 }
1607
1608 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1609                         int qio_flags)
1610 {
1611         ERRSTACK(1);
1612         size_t n;
1613         volatile size_t sofar = 0;      /* volatile for the waserror */
1614         struct block *b;
1615         uint8_t *p = vp;
1616         void *ext_buf;
1617
1618         /* Only some callers can throw.  Others might be in a context where
1619          * waserror isn't safe. */
1620         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1621                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
1622                  * after some data has been sent should be treated as a partial
1623                  * write. */
1624                 if (sofar)
1625                         goto out_ok;
1626                 nexterror();
1627         }
1628         do {
1629                 n = len - sofar;
1630                 /* This is 64K, the max amount per single block.  Still a good
1631                  * value? */
1632                 if (n > Maxatomic)
1633                         n = Maxatomic;
1634                 b = build_block(p + sofar, n, mem_flags);
1635                 if (!b)
1636                         break;
1637                 if (__qbwrite(q, b, qio_flags) < 0)
1638                         break;
1639                 sofar += n;
1640         } while ((sofar < len) && (q->state & Qmsg) == 0);
1641 out_ok:
1642         if (qio_flags & QIO_CAN_ERR_SLEEP)
1643                 poperror();
1644         return sofar;
1645 }
1646
1647 ssize_t qwrite(struct queue *q, void *vp, int len)
1648 {
1649         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1650 }
1651
1652 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1653 {
1654         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1655                                               QIO_NON_BLOCK);
1656 }
1657
1658 ssize_t qiwrite(struct queue *q, void *vp, int len)
1659 {
1660         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1661 }
1662
1663 /*
1664  *  be extremely careful when calling this,
1665  *  as there is no reference accounting
1666  */
1667 void qfree(struct queue *q)
1668 {
1669         qclose(q);
1670         kfree(q);
1671 }
1672
1673 /*
1674  *  Mark a queue as closed.  No further IO is permitted.
1675  *  All blocks are released.
1676  */
1677 void qclose(struct queue *q)
1678 {
1679         struct block *bfirst;
1680
1681         if (q == NULL)
1682                 return;
1683
1684         /* mark it */
1685         spin_lock_irqsave(&q->lock);
1686         q->state |= Qclosed;
1687         q->state &= ~Qdropoverflow;
1688         q->err[0] = 0;
1689         bfirst = q->bfirst;
1690         q->bfirst = 0;
1691         q->dlen = 0;
1692         spin_unlock_irqsave(&q->lock);
1693
1694         /* free queued blocks */
1695         freeblist(bfirst);
1696
1697         /* wake up readers/writers */
1698         rendez_wakeup(&q->rr);
1699         rendez_wakeup(&q->wr);
1700         qwake_cb(q, FDTAP_FILT_HANGUP);
1701 }
1702
1703 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1704  *
1705  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1706  * there is no message, which is what also happens during a natural qclose(),
1707  * those waiters will simply return 0.  qwriters will always error() on a
1708  * closed/hungup queue. */
1709 void qhangup(struct queue *q, char *msg)
1710 {
1711         /* mark it */
1712         spin_lock_irqsave(&q->lock);
1713         q->state |= Qclosed;
1714         if (msg == 0 || *msg == 0)
1715                 q->err[0] = 0;
1716         else
1717                 strlcpy(q->err, msg, ERRMAX);
1718         spin_unlock_irqsave(&q->lock);
1719
1720         /* wake up readers/writers */
1721         rendez_wakeup(&q->rr);
1722         rendez_wakeup(&q->wr);
1723         qwake_cb(q, FDTAP_FILT_HANGUP);
1724 }
1725
1726 /*
1727  *  return non-zero if the q is hungup
1728  */
1729 int qisclosed(struct queue *q)
1730 {
1731         return q->state & Qclosed;
1732 }
1733
1734 /*
1735  *  mark a queue as no longer hung up.  resets the wake_cb.
1736  */
1737 void qreopen(struct queue *q)
1738 {
1739         spin_lock_irqsave(&q->lock);
1740         q->state &= ~Qclosed;
1741         q->eof = 0;
1742         q->limit = q->inilim;
1743         q->wake_cb = 0;
1744         q->wake_data = 0;
1745         spin_unlock_irqsave(&q->lock);
1746 }
1747
1748 /*
1749  *  return bytes queued
1750  */
1751 int qlen(struct queue *q)
1752 {
1753         return q->dlen;
1754 }
1755
1756 size_t q_bytes_read(struct queue *q)
1757 {
1758         return q->bytes_read;
1759 }
1760
1761 /*
1762  * return space remaining before flow control
1763  *
1764  *  This used to be
1765  *  q->len < q->limit/2
1766  *  but it slows down tcp too much for certain write sizes.
1767  *  I really don't understand it completely.  It may be
1768  *  due to the queue draining so fast that the transmission
1769  *  stalls waiting for the app to produce more data.  - presotto
1770  *
1771  *  q->len was the amount of bytes, which is no longer used.  we now use
1772  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1773  */
1774 int qwindow(struct queue *q)
1775 {
1776         int l;
1777
1778         l = q->limit - q->dlen;
1779         if (l < 0)
1780                 l = 0;
1781         return l;
1782 }
1783
1784 /*
1785  *  return true if we can read without blocking
1786  */
1787 int qcanread(struct queue *q)
1788 {
1789         return q->bfirst != 0;
1790 }
1791
1792 /*
1793  *  change queue limit
1794  */
1795 void qsetlimit(struct queue *q, size_t limit)
1796 {
1797         bool was_writable = qwritable(q);
1798
1799         q->limit = limit;
1800         if (!was_writable && qwritable(q)) {
1801                 rendez_wakeup(&q->wr);
1802                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1803         }
1804 }
1805
1806 size_t qgetlimit(struct queue *q)
1807 {
1808         return q->limit;
1809 }
1810
1811 /*
1812  *  set whether writes drop overflowing blocks, or if we sleep
1813  */
1814 void qdropoverflow(struct queue *q, bool onoff)
1815 {
1816         spin_lock_irqsave(&q->lock);
1817         if (onoff)
1818                 q->state |= Qdropoverflow;
1819         else
1820                 q->state &= ~Qdropoverflow;
1821         spin_unlock_irqsave(&q->lock);
1822 }
1823
1824 /* Be careful: this can affect concurrent reads/writes and code that might have
1825  * built-in expectations of the q's type. */
1826 void q_toggle_qmsg(struct queue *q, bool onoff)
1827 {
1828         spin_lock_irqsave(&q->lock);
1829         if (onoff)
1830                 q->state |= Qmsg;
1831         else
1832                 q->state &= ~Qmsg;
1833         spin_unlock_irqsave(&q->lock);
1834 }
1835
1836 /* Be careful: this can affect concurrent reads/writes and code that might have
1837  * built-in expectations of the q's type. */
1838 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1839 {
1840         spin_lock_irqsave(&q->lock);
1841         if (onoff)
1842                 q->state |= Qcoalesce;
1843         else
1844                 q->state &= ~Qcoalesce;
1845         spin_unlock_irqsave(&q->lock);
1846 }
1847
1848 /*
1849  *  flush the output queue
1850  */
1851 void qflush(struct queue *q)
1852 {
1853         struct block *bfirst;
1854
1855         /* mark it */
1856         spin_lock_irqsave(&q->lock);
1857         bfirst = q->bfirst;
1858         q->bfirst = 0;
1859         q->dlen = 0;
1860         spin_unlock_irqsave(&q->lock);
1861
1862         /* free queued blocks */
1863         freeblist(bfirst);
1864
1865         /* wake up writers */
1866         rendez_wakeup(&q->wr);
1867         qwake_cb(q, FDTAP_FILT_WRITABLE);
1868 }
1869
1870 int qfull(struct queue *q)
1871 {
1872         return !qwritable(q);
1873 }
1874
1875 int qstate(struct queue *q)
1876 {
1877         return q->state;
1878 }
1879
1880 void qdump(struct queue *q)
1881 {
1882         if (q)
1883                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1884                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1885 }
1886
1887 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1888  * marks the type of wakeup event (flags from FDTAP).
1889  *
1890  * There's no sync protection.  If you change the CB while the qio is running,
1891  * you might get a CB with the data or func from a previous set_wake_cb.  You
1892  * should set this once per queue and forget it.
1893  *
1894  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1895  * just make sure that the func(data) pair are valid until the queue is freed or
1896  * reopened. */
1897 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1898 {
1899         q->wake_data = data;
1900         wmb();  /* if we see func, we'll also see the data for it */
1901         q->wake_cb = func;
1902 }
1903
1904 /* Helper for detecting whether we'll block on a read at this instant. */
1905 bool qreadable(struct queue *q)
1906 {
1907         return qlen(q) > 0;
1908 }
1909
1910 /* Helper for detecting whether we'll block on a write at this instant. */
1911 bool qwritable(struct queue *q)
1912 {
1913         return !q->limit || qwindow(q) > 0;
1914 }