kconfig: use pkg-config for ncurses detection
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <slab.h>
30 #include <kmalloc.h>
31 #include <kref.h>
32 #include <string.h>
33 #include <stdio.h>
34 #include <assert.h>
35 #include <error.h>
36 #include <cpio.h>
37 #include <pmap.h>
38 #include <smp.h>
39 #include <net/ip.h>
40
41 static uint32_t padblockcnt;
42 static uint32_t concatblockcnt;
43 static uint32_t pullupblockcnt;
44 static uint32_t copyblockcnt;
45 static uint32_t consumecnt;
46 static uint32_t producecnt;
47 static uint32_t qcopycnt;
48
49 static int debugging;
50
51 #define QDEBUG  if(0)
52
53 /*
54  *  IO queues
55  */
56
57 struct queue {
58         spinlock_t lock;;
59
60         struct block *bfirst;           /* buffer */
61         struct block *blast;
62
63         int dlen;                       /* data bytes in queue */
64         int limit;                      /* max bytes in queue */
65         int inilim;                     /* initial limit */
66         int state;
67         int eof;                        /* number of eofs read by user */
68         size_t bytes_read;
69
70         void (*kick) (void *);          /* restart output */
71         void (*bypass) (void *, struct block *); /* bypass queue altogether */
72         void *arg;                      /* argument to kick */
73
74         struct rendez rr;               /* process waiting to read */
75         struct rendez wr;               /* process waiting to write */
76         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
77         void *wake_data;
78
79         char err[ERRMAX];
80 };
81
82 enum {
83         Maxatomic = 64 * 1024,
84         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
85         QIO_LIMIT = (1 << 1),           /* respect q->limit */
86         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
87         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
88         QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
89         QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
90 };
91
92 unsigned int qiomaxatomic = Maxatomic;
93
94 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
95 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
96                               int mem_flags);
97 static bool qwait_and_ilock(struct queue *q, int qio_flags);
98
99 /* Helper: fires a wake callback, sending 'filter' */
100 static void qwake_cb(struct queue *q, int filter)
101 {
102         if (q->wake_cb)
103                 q->wake_cb(q, q->wake_data, filter);
104 }
105
106 void ixsummary(void)
107 {
108         debugging ^= 1;
109         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
110                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
111         printd("consume %lu, produce %lu, qcopy %lu\n",
112                    consumecnt, producecnt, qcopycnt);
113 }
114
115 /* Pad a block to the front (or the back if size is negative).  Returns the
116  * block pointer that you must use instead of bp.  i.e. bp = padblock(bp, x);
117  *
118  * This space is in the main body / header space, not the extra data.  In
119  * essence, the block has size bytes of uninitialized data placed in the front
120  * (left) of the old header data.  The caller needs to fill in that data,
121  * presumably with packet headers.  Any block[offset] in the old block is now at
122  * block[offset + size].
123  *
124  * Negative padding applies at the end of the block.  This means that there is
125  * space in block header for size bytes (in between wp and lim).  Given that all
126  * of the block 'padding' needs to be in the main header, that means we need to
127  * linearize the entire block, such that the end padding is in the main
128  * body/header space.
129  */
130 struct block *padblock(struct block *bp, int size)
131 {
132         int n;
133         struct block *nbp;
134
135         QDEBUG checkb(bp, "padblock 1");
136         if (size >= 0) {
137                 if (bp->rp - bp->base >= size) {
138                         block_add_to_offsets(bp, size);
139                         bp->rp -= size;
140                         return bp;
141                 }
142                 if (bp->next)
143                         panic("%s %p had a next", __func__, bp);
144                 n = BHLEN(bp);
145                 padblockcnt++;
146                 nbp = block_alloc(size + n, MEM_WAIT);
147                 block_copy_metadata(nbp, bp);
148                 block_replace_extras(nbp, bp);
149                 /* This effectively copies the old block main body such that we
150                  * know we have size bytes to the left of rp.  All of the
151                  * metadata offsets (e.g. tx_csum) are relative from this blob
152                  * of data: i.e. nbp->rp + size. */
153                 nbp->rp += size;
154                 nbp->wp = nbp->rp;
155                 memmove(nbp->wp, bp->rp, n);
156                 nbp->wp += n;
157                 freeb(bp);
158                 block_add_to_offsets(nbp, size);
159                 nbp->rp -= size;
160         } else {
161                 /* No one I know of calls this yet, so this is untested.  Maybe
162                  * we can remove it. */
163                 warn_once("pad attempt with negative size %d", size);
164                 size = -size;
165                 if (bp->next)
166                         panic("%s %p had a next", __func__, bp);
167                 if (bp->lim - bp->wp >= size)
168                         return bp;
169                 /* Negative padding goes after all data.  In essence, we'll need
170                  * to pull all block extra data up into the headers.  The
171                  * easiest thing is to linearize, then do the old algorithm.  We
172                  * may do extra allocations - if we ever use this, we can fix it
173                  * up.  Maybe make linearizeblock/copyblock take neg-padding. */
174                 if (bp->extra_len) {
175                         bp = linearizeblock(bp);
176                         if (bp->lim - bp->wp >= size)
177                                 return bp;
178                 }
179                 n = BLEN(bp);
180                 padblockcnt++;
181                 nbp = block_alloc(size + n, MEM_WAIT);
182                 block_copy_metadata(nbp, bp);
183                 memmove(nbp->wp, bp->rp, n);
184                 nbp->wp += n;
185                 freeb(bp);
186         }
187         QDEBUG checkb(nbp, "padblock 1");
188         return nbp;
189 }
190
191 /*
192  *  return count of bytes in a string of blocks
193  */
194 int blocklen(struct block *bp)
195 {
196         int len;
197
198         len = 0;
199         while (bp) {
200                 len += BLEN(bp);
201                 bp = bp->next;
202         }
203         return len;
204 }
205
206 /*
207  * return count of space in blocks
208  */
209 int blockalloclen(struct block *bp)
210 {
211         int len;
212
213         len = 0;
214         while (bp) {
215                 len += BALLOC(bp);
216                 bp = bp->next;
217         }
218         return len;
219 }
220
221 /*
222  *  copy the  string of blocks into
223  *  a single block and free the string
224  */
225 struct block *concatblock(struct block *bp)
226 {
227         if (bp->next == 0)
228                 return bp;
229         /* If this is slow, we can get fancy and append a bunch of ebds to bp
230          * for each subsequent block in the list. */
231         return pullupblock(bp, blocklen(bp));
232 }
233
234 /* Makes an identical copy of the block, collapsing all the data into the block
235  * body.  It does not point to the contents of the original, it is a copy
236  * (unlike qclone).  Since we're copying, we might as well put the memory into
237  * one contiguous chunk. */
238 struct block *copyblock(struct block *bp, int mem_flags)
239 {
240         struct block *newb;
241         struct extra_bdata *ebd;
242         size_t amt;
243
244         QDEBUG checkb(bp, "copyblock 0");
245         newb = block_alloc(BLEN(bp), mem_flags);
246         if (!newb)
247                 return 0;
248         amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
249         assert(amt == BHLEN(bp));
250         for (int i = 0; i < bp->nr_extra_bufs; i++) {
251                 ebd = &bp->extra_data[i];
252                 if (!ebd->base || !ebd->len)
253                         continue;
254                 amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
255                                          ebd->len);
256                 assert(amt == ebd->len);
257         }
258         block_copy_metadata(newb, bp);
259         copyblockcnt++;
260         QDEBUG checkb(newb, "copyblock 1");
261         return newb;
262 }
263
264 /* Returns a block with the remaining contents of b all in the main body of the
265  * returned block.  Replace old references to b with the returned value (which
266  * may still be 'b', if no change was needed. */
267 struct block *linearizeblock(struct block *b)
268 {
269         struct block *newb;
270
271         if (!b->extra_len)
272                 return b;
273         newb = copyblock(b, MEM_WAIT);
274         freeb(b);
275         return newb;
276 }
277
278 /* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
279  * have emptied it. */
280 static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
281                                     size_t amt)
282 {
283         ebd->len -= amt;
284         ebd->off += amt;
285         b->extra_len -= amt;
286         if (!ebd->len) {
287                 /* we don't actually have to decref here.  it's also
288                  * done in freeb().  this is the earliest we can free. */
289                 kfree((void*)ebd->base);
290                 ebd->base = ebd->off = 0;
291         }
292 }
293
294 /* Helper: Yanks up to size bytes worth of extra data blocks from 'from', puts
295  * them in the main body of 'to'.  Returns the amount yanked.  Will panic if
296  * there is not enough room in 'to'. */
297 static size_t block_yank_ebd_bytes(struct block *to, struct block *from,
298                                    size_t amt)
299 {
300         size_t copy_amt, total = 0;
301         struct extra_bdata *ebd;
302
303         for (int i = 0; amt && i < from->nr_extra_bufs; i++) {
304                 ebd = &from->extra_data[i];
305                 if (!ebd->base || !ebd->len)
306                         continue;
307                 copy_amt = MIN(amt, ebd->len);
308                 copy_amt = block_copy_to_body(to, (void*)ebd->base + ebd->off,
309                                               copy_amt);
310                 if (copy_amt != MIN(amt, ebd->len))
311                         panic("'to' block didn't have enough space! %d %d %d",
312                               amt, ebd->len, copy_amt);
313                 block_consume_ebd_bytes(from, ebd, copy_amt);
314                 total += copy_amt;
315                 amt -= copy_amt;
316         }
317         return total;
318 }
319
320 /* Make sure the first block has at least n bytes in its headers/main body.
321  * Pulls up data from the *list* of blocks.  Returns 0 if there is not enough
322  * data in the block list.
323  *
324  * Any data to the *left* of rp, such as old network headers that were popped
325  * off when processing an inbound packet, may be discarded. */
326 struct block *pullupblock(struct block *bp, size_t n)
327 {
328         struct block *i;
329         struct extra_bdata *ebd;
330         size_t copy_amt;
331
332         /*
333          *  this should almost always be true, it's
334          *  just to avoid every caller checking.
335          */
336         if (BHLEN(bp) >= n)
337                 return bp;
338
339         /* If there's no chance, just bail out now.  This might be slightly
340          * wasteful if there's a long blist that does have enough data. */
341         if (n > blocklen(bp))
342                 return 0;
343
344         /* Replace bp with a new block with enough size, and yank up enough of
345          * its ebds. */
346         bp = block_realloc(bp, n);
347         pullupblockcnt++;
348         n -= BHLEN(bp);
349         n -= block_yank_ebd_bytes(bp, bp, n);
350         /* Need to pull from the remainder of the list, both the main bodies and
351          * the ebds. */
352         while (n) {
353                 i = bp->next;
354                 if (!i)
355                         panic("Not enough b's in the list; we checked the len");
356                 copy_amt = MIN(BHLEN(i), n);
357                 block_copy_to_body(bp, i->rp, copy_amt);
358                 /* That may or may not have consumed all of the main body */
359                 i->rp += copy_amt;
360                 /* Subsequent blocks with offsets shouldn't be subject to
361                  * pullup. */
362                 warn_on(i->tx_csum_offset || i->network_offset ||
363                         i->transport_offset);
364                 n -= copy_amt;
365                 n -= block_yank_ebd_bytes(bp, i, n);
366                 if (!BLEN(i)) {
367                         bp->next = i->next;
368                         i->next = NULL;
369                         freeb(i);
370                 } else {
371                         assert(!n);
372                 }
373         }
374         return bp;
375 }
376
377 /*
378  *  make sure the first block has at least n bytes in its main body
379  */
380 struct block *pullupqueue(struct queue *q, size_t n)
381 {
382         struct block *b;
383
384         /* TODO: lock to protect the queue links? */
385         if ((BHLEN(q->bfirst) >= n))
386                 return q->bfirst;
387         /* This is restoring qio metadata.  If pullupblock did any work, it
388          * changed the list - at least the first block and possibly the last in
389          * the blist. */
390         q->bfirst = pullupblock(q->bfirst, n);
391         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
392         q->blast = b;
393         return q->bfirst;
394 }
395
396 /* throw away count bytes from the front of
397  * block's extradata.  Returns count of bytes
398  * thrown away
399  */
400
401 static int pullext(struct block *bp, int count)
402 {
403         struct extra_bdata *ed;
404         int i, rem, bytes = 0;
405
406         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
407                 ed = &bp->extra_data[i];
408                 rem = MIN(count, ed->len);
409                 bp->extra_len -= rem;
410                 count -= rem;
411                 bytes += rem;
412                 ed->off += rem;
413                 ed->len -= rem;
414                 if (ed->len == 0) {
415                         kfree((void *)ed->base);
416                         ed->base = 0;
417                         ed->off = 0;
418                 }
419         }
420         return bytes;
421 }
422
423 /* throw away count bytes from the end of a
424  * block's extradata.  Returns count of bytes
425  * thrown away
426  */
427
428 static int dropext(struct block *bp, int count)
429 {
430         struct extra_bdata *ed;
431         int i, rem, bytes = 0;
432
433         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
434                 ed = &bp->extra_data[i];
435                 rem = MIN(count, ed->len);
436                 bp->extra_len -= rem;
437                 count -= rem;
438                 bytes += rem;
439                 ed->len -= rem;
440                 if (ed->len == 0) {
441                         kfree((void *)ed->base);
442                         ed->base = 0;
443                         ed->off = 0;
444                 }
445         }
446         return bytes;
447 }
448
449 /*
450  *  throw away up to count bytes from a
451  *  list of blocks.  Return count of bytes
452  *  thrown away.
453  */
454 static int _pullblock(struct block **bph, int count, int free)
455 {
456         struct block *bp;
457         int n, bytes;
458
459         bytes = 0;
460         if (bph == NULL)
461                 return 0;
462
463         while (*bph != NULL && count != 0) {
464                 bp = *bph;
465
466                 n = MIN(BHLEN(bp), count);
467                 bytes += n;
468                 count -= n;
469                 bp->rp += n;
470                 n = pullext(bp, count);
471                 bytes += n;
472                 count -= n;
473                 QDEBUG checkb(bp, "pullblock ");
474                 if (BLEN(bp) == 0 && (free || count)) {
475                         *bph = bp->next;
476                         bp->next = NULL;
477                         freeb(bp);
478                 }
479         }
480         return bytes;
481 }
482
483 int pullblock(struct block **bph, int count)
484 {
485         return _pullblock(bph, count, 1);
486 }
487
488 /*
489  *  trim to len bytes starting at offset
490  */
491 struct block *trimblock(struct block *bp, int offset, int len)
492 {
493         uint32_t l, trim;
494         int olen = len;
495
496         QDEBUG checkb(bp, "trimblock 1");
497         if (blocklen(bp) < offset + len) {
498                 freeblist(bp);
499                 return NULL;
500         }
501
502         l =_pullblock(&bp, offset, 0);
503         if (bp == NULL)
504                 return NULL;
505         if (l != offset) {
506                 freeblist(bp);
507                 return NULL;
508         }
509
510         while ((l = BLEN(bp)) < len) {
511                 len -= l;
512                 bp = bp->next;
513         }
514
515         trim = BLEN(bp) - len;
516         trim -= dropext(bp, trim);
517         bp->wp -= trim;
518
519         if (bp->next) {
520                 freeblist(bp->next);
521                 bp->next = NULL;
522         }
523         return bp;
524 }
525
526 /* Adjust block @bp so that its size is exactly @len.
527  * If the size is increased, fill in the new contents with zeros.
528  * If the size is decreased, discard some of the old contents at the tail. */
529 struct block *adjustblock(struct block *bp, int len)
530 {
531         struct extra_bdata *ebd;
532         void *buf;
533         int i;
534
535         if (len < 0) {
536                 freeb(bp);
537                 return NULL;
538         }
539
540         if (len == BLEN(bp))
541                 return bp;
542
543         /* Shrink within block main body. */
544         if (len <= BHLEN(bp)) {
545                 free_block_extra(bp);
546                 bp->wp = bp->rp + len;
547                 QDEBUG checkb(bp, "adjustblock 1");
548                 return bp;
549         }
550
551         /* Need to grow. */
552         if (len > BLEN(bp)) {
553                 /* Grow within block main body. */
554                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
555                         memset(bp->wp, 0, len - BLEN(bp));
556                         bp->wp = bp->rp + len;
557                         QDEBUG checkb(bp, "adjustblock 2");
558                         return bp;
559                 }
560                 /* Grow with extra data buffers. */
561                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
562                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
563                                    MEM_WAIT);
564                 QDEBUG checkb(bp, "adjustblock 3");
565                 return bp;
566         }
567
568         /* Shrink extra data buffers.
569          * len is how much of ebd we need to keep.
570          * extra_len is re-accumulated. */
571         assert(bp->extra_len > 0);
572         len -= BHLEN(bp);
573         bp->extra_len = 0;
574         for (i = 0; i < bp->nr_extra_bufs; i++) {
575                 ebd = &bp->extra_data[i];
576                 if (len <= ebd->len)
577                         break;
578                 len -= ebd->len;
579                 bp->extra_len += ebd->len;
580         }
581         /* If len becomes zero, extra_data[i] should be freed. */
582         if (len > 0) {
583                 ebd = &bp->extra_data[i];
584                 ebd->len = len;
585                 bp->extra_len += ebd->len;
586                 i++;
587         }
588         for (; i < bp->nr_extra_bufs; i++) {
589                 ebd = &bp->extra_data[i];
590                 if (ebd->base)
591                         kfree((void*)ebd->base);
592                 ebd->base = ebd->off = ebd->len = 0;
593         }
594         QDEBUG checkb(bp, "adjustblock 4");
595         return bp;
596 }
597
598 /* Helper: removes and returns the first block from q */
599 static struct block *pop_first_block(struct queue *q)
600 {
601         struct block *b = q->bfirst;
602
603         q->dlen -= BLEN(b);
604         q->bytes_read += BLEN(b);
605         q->bfirst = b->next;
606         b->next = 0;
607         return b;
608 }
609
610 /* Accounting helper.  Block b in q lost amt extra_data */
611 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
612 {
613         b->extra_len -= amt;
614         q->dlen -= amt;
615         q->bytes_read += amt;
616 }
617
618 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
619  * fixed in 'from', so we move its contents and zero it out in 'from'.
620  *
621  * Returns the length moved (0 on failure). */
622 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
623                        struct block *from, struct queue *from_q)
624 {
625         size_t ret = ebd->len;
626
627         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
628                 return 0;
629         block_and_q_lost_extra(from, from_q, ebd->len);
630         ebd->base = ebd->len = ebd->off = 0;
631         return ret;
632 }
633
634 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
635  * return with less than len, but greater than 0, even if there is more
636  * available in q.
637  *
638  * At any moment that we have copied anything and things are tricky, we can just
639  * return.  The trickiness comes from a bunch of variables: is the main body
640  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
641  * to @to's main body, but only if we haven't used it yet. */
642 static size_t copy_from_first_block(struct queue *q, struct block *to,
643                                     size_t len)
644 {
645         struct block *from = q->bfirst;
646         size_t copy_amt, amt;
647         struct extra_bdata *ebd;
648
649         assert(len < BLEN(from));       /* sanity */
650         /* Try to extract from the main body */
651         copy_amt = MIN(BHLEN(from), len);
652         if (copy_amt) {
653                 copy_amt = block_copy_to_body(to, from->rp, copy_amt);
654                 from->rp += copy_amt;
655                 /* We only change dlen, (data len), not q->len, since the q
656                  * still has the same block memory allocation (no kfrees
657                  * happened) */
658                 q->dlen -= copy_amt;
659                 q->bytes_read += copy_amt;
660         }
661         /* Try to extract the remainder from the extra data */
662         len -= copy_amt;
663         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
664                 ebd = &from->extra_data[i];
665                 if (!ebd->base || !ebd->len)
666                         continue;
667                 if (len >= ebd->len) {
668                         amt = move_ebd(ebd, to, from, q);
669                         if (!amt) {
670                                 /* our internal alloc could have failed.   this
671                                  * ebd is now the last one we'll consider.
672                                  * let's handle it separately and put it in the
673                                  * main body. */
674                                 if (copy_amt)
675                                         return copy_amt;
676                                 copy_amt = block_copy_to_body(to,
677                                                               (void*)ebd->base +
678                                                               ebd->off,
679                                                               ebd->len);
680                                 block_and_q_lost_extra(from, q, copy_amt);
681                                 break;
682                         }
683                         len -= amt;
684                         copy_amt += amt;
685                         continue;
686                 } else {
687                         /* If we're here, we reached our final ebd, which we'll
688                          * need to split to get anything from it. */
689                         if (copy_amt)
690                                 return copy_amt;
691                         copy_amt = block_copy_to_body(to, (void*)ebd->base +
692                                                       ebd->off, len);
693                         ebd->off += copy_amt;
694                         ebd->len -= copy_amt;
695                         block_and_q_lost_extra(from, q, copy_amt);
696                         break;
697                 }
698         }
699         if (len)
700                 assert(copy_amt);       /* sanity */
701         return copy_amt;
702 }
703
704 /* Return codes for __qbread and __try_qbread. */
705 enum {
706         QBR_OK,
707         QBR_FAIL,
708         QBR_SPARE,      /* we need a spare block */
709         QBR_AGAIN,      /* do it again, we are coalescing blocks */
710 };
711
712 /* Helper and back-end for __qbread: extracts and returns a list of blocks
713  * containing up to len bytes.  It may contain less than len even if q has more
714  * data.
715  *
716  * Returns a code interpreted by __qbread, and the returned blist in ret. */
717 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
718                         struct block **real_ret, struct block *spare)
719 {
720         struct block *ret, *ret_last, *first;
721         size_t blen;
722         bool was_unwritable = FALSE;
723
724         if (qio_flags & QIO_CAN_ERR_SLEEP) {
725                 if (!qwait_and_ilock(q, qio_flags)) {
726                         spin_unlock_irqsave(&q->lock);
727                         return QBR_FAIL;
728                 }
729                 /* we qwaited and still hold the lock, so the q is not empty */
730                 first = q->bfirst;
731         } else {
732                 spin_lock_irqsave(&q->lock);
733                 first = q->bfirst;
734                 if (!first) {
735                         spin_unlock_irqsave(&q->lock);
736                         return QBR_FAIL;
737                 }
738         }
739         /* We need to check before adjusting q->len.  We're checking the
740          * writer's sleep condition / tap condition.  When set, we *might* be
741          * making an edge transition (from unwritable to writable), which needs
742          * to wake and fire taps.  But, our read might not drain the queue below
743          * q->lim.  We'll check again later to see if we should really wake
744          * them.  */
745         was_unwritable = !qwritable(q);
746         blen = BLEN(first);
747         if ((q->state & Qcoalesce) && (blen == 0)) {
748                 freeb(pop_first_block(q));
749                 spin_unlock_irqsave(&q->lock);
750                 /* Need to retry to make sure we have a first block */
751                 return QBR_AGAIN;
752         }
753         /* Qmsg: just return the first block.  Be careful, since our caller
754          * might not read all of the block and thus drop bytes.  Similar to
755          * SOCK_DGRAM. */
756         if (q->state & Qmsg) {
757                 ret = pop_first_block(q);
758                 goto out_ok;
759         }
760         /* Let's get at least something first - makes the code easier.  This
761          * way, we'll only ever split the block once. */
762         if (blen <= len) {
763                 ret = pop_first_block(q);
764                 len -= blen;
765         } else {
766                 /* need to split the block.  we won't actually take the first
767                  * block out of the queue - we're just extracting a little bit.
768                  */
769                 if (!spare) {
770                         /* We have nothing and need a spare block.  Retry! */
771                         spin_unlock_irqsave(&q->lock);
772                         return QBR_SPARE;
773                 }
774                 copy_from_first_block(q, spare, len);
775                 ret = spare;
776                 goto out_ok;
777         }
778         /* At this point, we just grabbed the first block.  We can try to grab
779          * some more, up to len (if they want). */
780         if (qio_flags & QIO_JUST_ONE_BLOCK)
781                 goto out_ok;
782         ret_last = ret;
783         while (q->bfirst && (len > 0)) {
784                 blen = BLEN(q->bfirst);
785                 if ((q->state & Qcoalesce) && (blen == 0)) {
786                         /* remove the intermediate 0 blocks */
787                         freeb(pop_first_block(q));
788                         continue;
789                 }
790                 if (blen > len) {
791                         /* We could try to split the block, but that's a huge
792                          * pain.  For instance, we might need to move the main
793                          * body of b into an extra_data of ret_last.  lots of
794                          * ways for that to fail, and lots of cases to consider.
795                          * Easier to just bail out.  This is why I did the first
796                          * block above: we don't need to worry about this. */
797                          break;
798                 }
799                 ret_last->next = pop_first_block(q);
800                 ret_last = ret_last->next;
801                 len -= blen;
802         }
803 out_ok:
804         /* Don't wake them up or fire tap if we didn't drain enough. */
805         if (!qwritable(q))
806                 was_unwritable = FALSE;
807         spin_unlock_irqsave(&q->lock);
808         if (was_unwritable) {
809                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
810                         q->kick(q->arg);
811                 rendez_wakeup(&q->wr);
812                 qwake_cb(q, FDTAP_FILT_WRITABLE);
813         }
814         *real_ret = ret;
815         return QBR_OK;
816 }
817
818 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
819  * containing up to len bytes.  It may contain less than len even if q has more
820  * data.
821  *
822  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
823  * if it required a spare and the memory allocation failed.
824  *
825  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
826  * could get a zero length block back. */
827 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
828                               int mem_flags)
829 {
830         ERRSTACK(1);
831         struct block *ret = 0;
832         struct block *volatile spare = 0;       /* volatile for the waserror */
833
834         /* __try_qbread can throw, based on qio flags. */
835         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
836                 if (spare)
837                         freeb(spare);
838                 nexterror();
839         }
840         while (1) {
841                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
842                 case QBR_OK:
843                 case QBR_FAIL:
844                         if (spare && (ret != spare))
845                                 freeb(spare);
846                         goto out_ret;
847                 case QBR_SPARE:
848                         assert(!spare);
849                         /* Due to some nastiness, we need a fresh block so we
850                          * can read out anything from the queue.  'len' seems
851                          * like a reasonable amount.  Maybe we can get away with
852                          * less. */
853                         spare = block_alloc(len, mem_flags);
854                         if (!spare) {
855                                 /* Careful here: a memory failure (possible with
856                                  * MEM_ATOMIC) could look like 'no data in the
857                                  * queue' (QBR_FAIL).  The only one who does is
858                                  * this qget(), who happens to know that we
859                                  * won't need a spare, due to the len argument.
860                                  * Spares are only needed when we need to split
861                                  * a block. */
862                                 ret = 0;
863                                 goto out_ret;
864                         }
865                         break;
866                 case QBR_AGAIN:
867                         /* if the first block is 0 and we are Qcoalesce, then
868                          * we'll need to try again.  We bounce out of __try so
869                          * we can perform the "is there a block" logic again
870                          * from the top. */
871                         break;
872                 }
873         }
874         assert(0);
875 out_ret:
876         if (qio_flags & QIO_CAN_ERR_SLEEP)
877                 poperror();
878         return ret;
879 }
880
881 /*
882  *  get next block from a queue, return null if nothing there
883  */
884 struct block *qget(struct queue *q)
885 {
886         /* since len == SIZE_MAX, we should never need to do a mem alloc */
887         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
888 }
889
890 /* Throw away the next 'len' bytes in the queue returning the number actually
891  * discarded.
892  *
893  * If the bytes are in the queue, then they must be discarded.  The only time to
894  * return less than len is if the q itself has less than len bytes.
895  *
896  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
897  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
898 size_t qdiscard(struct queue *q, size_t len)
899 {
900         struct block *blist;
901         size_t removed_amt;
902         size_t sofar = 0;
903
904         /* This is racy.  There could be multiple qdiscarders or other
905          * consumers, where the consumption could be interleaved. */
906         while (qlen(q) && len) {
907                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
908                 removed_amt = freeblist(blist);
909                 sofar += removed_amt;
910                 len -= removed_amt;
911         }
912         return sofar;
913 }
914
915 ssize_t qpass(struct queue *q, struct block *b)
916 {
917         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
918 }
919
920 ssize_t qpassnolim(struct queue *q, struct block *b)
921 {
922         return __qbwrite(q, b, 0);
923 }
924
925 /*
926  *  if the allocated space is way out of line with the used
927  *  space, reallocate to a smaller block
928  */
929 struct block *packblock(struct block *bp)
930 {
931         struct block **l, *nbp;
932         int n;
933
934         if (bp->extra_len)
935                 return bp;
936         for (l = &bp; *l; l = &(*l)->next) {
937                 nbp = *l;
938                 n = BLEN(nbp);
939                 if ((n << 2) < BALLOC(nbp)) {
940                         *l = block_alloc(n, MEM_WAIT);
941                         memmove((*l)->wp, nbp->rp, n);
942                         (*l)->wp += n;
943                         (*l)->next = nbp->next;
944                         nbp->next = NULL;
945                         freeb(nbp);
946                 }
947         }
948
949         return bp;
950 }
951
952 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
953  * body_rp, for up to len.  Returns the len consumed.
954  *
955  * The base is 'b', so that we can kfree it later.  This currently ties us to
956  * using kfree for the release method for all extra_data.
957  *
958  * It is possible to have a body size that is 0, if there is no offset, and
959  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
960 static size_t point_to_body(struct block *b, uint8_t *body_rp,
961                             struct block *newb, unsigned int newb_idx,
962                             size_t len)
963 {
964         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
965
966         assert(newb_idx < newb->nr_extra_bufs);
967
968         kmalloc_incref(b);
969         ebd->base = (uintptr_t)b;
970         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
971         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
972         assert((int)ebd->len >= 0);
973         newb->extra_len += ebd->len;
974         return ebd->len;
975 }
976
977 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
978  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
979  * consumed.
980  *
981  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
982 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
983                            struct block *newb, unsigned int newb_idx,
984                            size_t len)
985 {
986         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
987         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
988
989         assert(b_idx < b->nr_extra_bufs);
990         assert(newb_idx < newb->nr_extra_bufs);
991
992         kmalloc_incref((void*)b_ebd->base);
993         n_ebd->base = b_ebd->base;
994         n_ebd->off = b_ebd->off + b_off;
995         n_ebd->len = MIN(b_ebd->len - b_off, len);
996         newb->extra_len += n_ebd->len;
997         return n_ebd->len;
998 }
999
1000 /* given a string of blocks, sets up the new block's extra_data such that it
1001  * *points* to the contents of the blist [offset, len + offset).  This does not
1002  * make a separate copy of the contents of the blist.
1003  *
1004  * returns 0 on success.  the only failure is if the extra_data array was too
1005  * small, so this returns a positive integer saying how big the extra_data needs
1006  * to be.
1007  *
1008  * callers are responsible for protecting the list structure. */
1009 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1010                             uint32_t offset)
1011 {
1012         struct block *b, *first;
1013         unsigned int nr_bufs = 0;
1014         unsigned int b_idx, newb_idx = 0;
1015         uint8_t *first_main_body = 0;
1016         ssize_t sofar = 0;
1017
1018         /* find the first block; keep offset relative to the latest b in the
1019          * list */
1020         for (b = blist; b; b = b->next) {
1021                 if (BLEN(b) > offset)
1022                         break;
1023                 offset -= BLEN(b);
1024         }
1025         /* qcopy semantics: if you asked for an offset outside the block list,
1026          * you get an empty block back */
1027         if (!b)
1028                 return 0;
1029         first = b;
1030         sofar -= offset; /* don't count the remaining offset in the first b */
1031         /* upper bound for how many buffers we'll need in newb */
1032         for (/* b is set*/; b; b = b->next) {
1033                 nr_bufs += BHLEN(b) ? 1 : 0;
1034                 /* still assuming nr_extra == nr_valid */
1035                 nr_bufs += b->nr_extra_bufs;
1036                 sofar += BLEN(b);
1037                 if (sofar > len)
1038                         break;
1039         }
1040         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1041         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1042                 /* caller will need to alloc these, then re-call us */
1043                 return nr_bufs;
1044         }
1045         for (b = first; b && len; b = b->next) {
1046                 b_idx = 0;
1047                 if (offset) {
1048                         if (offset < BHLEN(b)) {
1049                                 /* off is in the main body */
1050                                 len -= point_to_body(b, b->rp + offset, newb,
1051                                                      newb_idx, len);
1052                                 newb_idx++;
1053                         } else {
1054                                 /* off is in one of the buffers (or just past
1055                                  * the last one).  we're not going to point to
1056                                  * b's main body at all. */
1057                                 offset -= BHLEN(b);
1058                                 assert(b->extra_data);
1059                                 /* assuming these extrabufs are packed, or at
1060                                  * least that len isn't gibberish */
1061                                 while (b->extra_data[b_idx].len <= offset) {
1062                                         offset -= b->extra_data[b_idx].len;
1063                                         b_idx++;
1064                                 }
1065                                 /* now offset is set to our offset in the
1066                                  * b_idx'th buf */
1067                                 len -= point_to_buf(b, b_idx, offset, newb,
1068                                                     newb_idx, len);
1069                                 newb_idx++;
1070                                 b_idx++;
1071                         }
1072                         offset = 0;
1073                 } else {
1074                         if (BHLEN(b)) {
1075                                 len -= point_to_body(b, b->rp, newb, newb_idx,
1076                                                      len);
1077                                 newb_idx++;
1078                         }
1079                 }
1080                 /* knock out all remaining bufs.  we only did one point_to_ op
1081                  * by now, and any point_to_ could be our last if it consumed
1082                  * all of len. */
1083                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1084                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1085                         newb_idx++;
1086                 }
1087         }
1088         return 0;
1089 }
1090
1091 struct block *blist_clone(struct block *blist, int header_len, int len,
1092                           uint32_t offset)
1093 {
1094         int ret;
1095         struct block *newb = block_alloc(header_len, MEM_WAIT);
1096
1097         do {
1098                 ret = __blist_clone_to(blist, newb, len, offset);
1099                 if (ret)
1100                         block_add_extd(newb, ret, MEM_WAIT);
1101         } while (ret);
1102         return newb;
1103 }
1104
1105 /* given a queue, makes a single block with header_len reserved space in the
1106  * block main body, and the contents of [offset, len + offset) pointed to in the
1107  * new blocks ext_data.  This does not make a copy of the q's contents, though
1108  * you do have a ref count on the memory. */
1109 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1110 {
1111         int ret;
1112         struct block *newb = block_alloc(header_len, MEM_WAIT);
1113         /* the while loop should rarely be used: it would require someone
1114          * concurrently adding to the queue. */
1115         do {
1116                 /* TODO: RCU protect the q list (b->next) (need read lock) */
1117                 spin_lock_irqsave(&q->lock);
1118                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1119                 spin_unlock_irqsave(&q->lock);
1120                 if (ret)
1121                         block_add_extd(newb, ret, MEM_WAIT);
1122         } while (ret);
1123         return newb;
1124 }
1125
1126 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1127 {
1128         return qclone(q, 0, len, offset);
1129 }
1130
1131 static void qinit_common(struct queue *q)
1132 {
1133         spinlock_init_irqsave(&q->lock);
1134         rendez_init(&q->rr);
1135         rendez_init(&q->wr);
1136 }
1137
1138 /*
1139  *  called by non-interrupt code
1140  */
1141 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1142 {
1143         struct queue *q;
1144
1145         q = kzmalloc(sizeof(struct queue), 0);
1146         if (q == 0)
1147                 return 0;
1148         qinit_common(q);
1149
1150         q->limit = q->inilim = limit;
1151         q->kick = kick;
1152         q->arg = arg;
1153         q->state = msg;
1154         q->eof = 0;
1155
1156         return q;
1157 }
1158
1159 /* open a queue to be bypassed */
1160 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1161 {
1162         struct queue *q;
1163
1164         q = kzmalloc(sizeof(struct queue), 0);
1165         if (q == 0)
1166                 return 0;
1167         qinit_common(q);
1168
1169         q->limit = 0;
1170         q->arg = arg;
1171         q->bypass = bypass;
1172         q->state = 0;
1173
1174         return q;
1175 }
1176
1177 static int notempty(void *a)
1178 {
1179         struct queue *q = a;
1180
1181         return (q->state & Qclosed) || q->bfirst != 0;
1182 }
1183
1184 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1185  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1186  * was naturally closed.  Throws an error o/w. */
1187 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1188 {
1189         while (1) {
1190                 spin_lock_irqsave(&q->lock);
1191                 if (q->bfirst != NULL)
1192                         return TRUE;
1193                 if (q->state & Qclosed) {
1194                         if (++q->eof > 3) {
1195                                 spin_unlock_irqsave(&q->lock);
1196                                 error(EPIPE,
1197                                       "multiple reads on a closed queue");
1198                         }
1199                         if (q->err[0]) {
1200                                 spin_unlock_irqsave(&q->lock);
1201                                 error(EPIPE, q->err);
1202                         }
1203                         return FALSE;
1204                 }
1205                 if (qio_flags & QIO_NON_BLOCK) {
1206                         spin_unlock_irqsave(&q->lock);
1207                         error(EAGAIN, "queue empty");
1208                 }
1209                 spin_unlock_irqsave(&q->lock);
1210                 /* As with the producer side, we check for a condition while
1211                  * holding the q->lock, decide to sleep, then unlock.  It's like
1212                  * the "check, signal, check again" pattern, but we do it
1213                  * conditionally.  Both sides agree synchronously to do it, and
1214                  * those decisions are made while holding q->lock.  I think this
1215                  * is OK.
1216                  *
1217                  * The invariant is that no reader sleeps when the queue has
1218                  * data.  While holding the rendez lock, if we see there's no
1219                  * data, we'll sleep.  Since we saw there was no data, the next
1220                  * writer will see (or already saw) no data, and then the writer
1221                  * decides to rendez_wake, which will grab the rendez lock.  If
1222                  * the writer already did that, then we'll see notempty when we
1223                  * do our check-again. */
1224                 rendez_sleep(&q->rr, notempty, q);
1225         }
1226 }
1227
1228 /*
1229  * add a block list to a queue
1230  * XXX basically the same as enqueue blist, and has no locking!
1231  */
1232 void qaddlist(struct queue *q, struct block *b)
1233 {
1234         /* TODO: q lock? */
1235         /* queue the block */
1236         if (q->bfirst)
1237                 q->blast->next = b;
1238         else
1239                 q->bfirst = b;
1240         q->dlen += blocklen(b);
1241         while (b->next)
1242                 b = b->next;
1243         q->blast = b;
1244 }
1245
1246 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1247 {
1248         size_t copy_amt, retval = 0;
1249         struct extra_bdata *ebd;
1250
1251         copy_amt = MIN(BHLEN(b), amt);
1252         memcpy(to, b->rp, copy_amt);
1253         /* advance the rp, since this block might not be completely consumed and
1254          * future reads need to know where to pick up from */
1255         b->rp += copy_amt;
1256         to += copy_amt;
1257         amt -= copy_amt;
1258         retval += copy_amt;
1259         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1260                 ebd = &b->extra_data[i];
1261                 /* skip empty entires.  if we track this in the struct block, we
1262                  * can just start the for loop early */
1263                 if (!ebd->base || !ebd->len)
1264                         continue;
1265                 copy_amt = MIN(ebd->len, amt);
1266                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1267                 /* we're actually consuming the entries, just like how we
1268                  * advance rp up above, and might only consume part of one. */
1269                 block_consume_ebd_bytes(b, ebd, copy_amt);
1270                 to += copy_amt;
1271                 amt -= copy_amt;
1272                 retval += copy_amt;
1273         }
1274         return retval;
1275 }
1276
1277 /*
1278  *  copy the contents of a string of blocks into
1279  *  memory.  emptied blocks are freed.  return
1280  *  pointer to first unconsumed block.
1281  */
1282 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1283 {
1284         int i;
1285         struct block *next;
1286
1287         /* could be slicker here, since read_from_block is smart */
1288         for (; b != NULL; b = next) {
1289                 i = BLEN(b);
1290                 if (i > n) {
1291                         /* partial block, consume some */
1292                         read_from_block(b, p, n);
1293                         return b;
1294                 }
1295                 /* full block, consume all and move on */
1296                 i = read_from_block(b, p, i);
1297                 n -= i;
1298                 p += i;
1299                 next = b->next;
1300                 b->next = NULL;
1301                 freeb(b);
1302         }
1303         return NULL;
1304 }
1305
1306 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1307  * actual amount copied. */
1308 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1309 {
1310         size_t sofar = 0;
1311         struct block *next;
1312
1313         do {
1314                 assert(va);
1315                 assert(b->rp);
1316                 sofar += read_from_block(b, va + sofar, len - sofar);
1317                 if (BLEN(b) && b->next)
1318                         panic("Failed to drain entire block (Qmsg?) but had a next!");
1319                 next = b->next;
1320                 b->next = NULL;
1321                 freeb(b);
1322                 b = next;
1323         } while (b);
1324         return sofar;
1325 }
1326
1327 /*
1328  *  copy the contents of memory into a string of blocks.
1329  *  return NULL on error.
1330  */
1331 struct block *mem2bl(uint8_t * p, int len)
1332 {
1333         ERRSTACK(1);
1334         int n;
1335         struct block *b, *first, **l;
1336
1337         first = NULL;
1338         l = &first;
1339         if (waserror()) {
1340                 freeblist(first);
1341                 nexterror();
1342         }
1343         do {
1344                 n = len;
1345                 if (n > Maxatomic)
1346                         n = Maxatomic;
1347
1348                 *l = b = block_alloc(n, MEM_WAIT);
1349                 /* TODO consider extra_data */
1350                 memmove(b->wp, p, n);
1351                 b->wp += n;
1352                 p += n;
1353                 len -= n;
1354                 l = &b->next;
1355         } while (len > 0);
1356         poperror();
1357
1358         return first;
1359 }
1360
1361 /*
1362  *  put a block back to the front of the queue
1363  *  called with q ilocked
1364  */
1365 void qputback(struct queue *q, struct block *b)
1366 {
1367         b->next = q->bfirst;
1368         if (q->bfirst == NULL)
1369                 q->blast = b;
1370         q->bfirst = b;
1371         q->dlen += BLEN(b);
1372         /* qputback seems to undo a read, so we can undo the accounting too. */
1373         q->bytes_read -= BLEN(b);
1374 }
1375
1376 /*
1377  *  get next block from a queue (up to a limit)
1378  *
1379  */
1380 struct block *qbread(struct queue *q, size_t len)
1381 {
1382         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
1383                         MEM_WAIT);
1384 }
1385
1386 struct block *qbread_nonblock(struct queue *q, size_t len)
1387 {
1388         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1389                         QIO_NON_BLOCK, MEM_WAIT);
1390 }
1391
1392 /* read up to len from a queue into vp. */
1393 size_t qread(struct queue *q, void *va, size_t len)
1394 {
1395         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1396
1397         if (!blist)
1398                 return 0;
1399         return read_all_blocks(blist, va, len);
1400 }
1401
1402 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1403 {
1404         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
1405                                        QIO_NON_BLOCK, MEM_WAIT);
1406
1407         if (!blist)
1408                 return 0;
1409         return read_all_blocks(blist, va, len);
1410 }
1411
1412 /* This is the rendez wake condition for writers. */
1413 static int qwriter_should_wake(void *a)
1414 {
1415         struct queue *q = a;
1416
1417         return qwritable(q) || (q->state & Qclosed);
1418 }
1419
1420 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1421 static size_t enqueue_blist(struct queue *q, struct block *b)
1422 {
1423         size_t dlen;
1424
1425         if (q->bfirst)
1426                 q->blast->next = b;
1427         else
1428                 q->bfirst = b;
1429         dlen = BLEN(b);
1430         while (b->next) {
1431                 b = b->next;
1432                 dlen += BLEN(b);
1433         }
1434         q->blast = b;
1435         q->dlen += dlen;
1436         return dlen;
1437 }
1438
1439 /* Adds block (which can be a list of blocks) to the queue, subject to
1440  * qio_flags.  Returns the length written on success or -1 on non-throwable
1441  * error.  Adjust qio_flags to control the value-added features!. */
1442 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1443 {
1444         ssize_t ret;
1445         bool was_unreadable;
1446
1447         if (q->bypass) {
1448                 ret = blocklen(b);
1449                 (*q->bypass) (q->arg, b);
1450                 return ret;
1451         }
1452         spin_lock_irqsave(&q->lock);
1453         was_unreadable = q->dlen == 0;
1454         if (q->state & Qclosed) {
1455                 spin_unlock_irqsave(&q->lock);
1456                 freeblist(b);
1457                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1458                         return -1;
1459                 if (q->err[0])
1460                         error(EPIPE, q->err);
1461                 else
1462                         error(EPIPE, "connection closed");
1463         }
1464         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1465                 /* drop overflow takes priority over regular non-blocking */
1466                 if ((qio_flags & QIO_DROP_OVERFLOW)
1467                     || (q->state & Qdropoverflow)) {
1468                         spin_unlock_irqsave(&q->lock);
1469                         freeb(b);
1470                         return -1;
1471                 }
1472                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
1473                  * nice and catch it. */
1474                 if ((qio_flags & QIO_CAN_ERR_SLEEP)
1475                     && (qio_flags & QIO_NON_BLOCK)) {
1476                         spin_unlock_irqsave(&q->lock);
1477                         freeb(b);
1478                         error(EAGAIN, "queue full");
1479                 }
1480         }
1481         ret = enqueue_blist(q, b);
1482         QDEBUG checkb(b, "__qbwrite");
1483         spin_unlock_irqsave(&q->lock);
1484         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1485          * wakeup, meaning that actual users either want a kick or have
1486          * qreaders. */
1487         if (q->kick && (was_unreadable || (q->state & Qkick)))
1488                 q->kick(q->arg);
1489         if (was_unreadable) {
1490                 /* Unlike the read side, there's no double-check to make sure
1491                  * the queue transitioned across an edge.  We know we added
1492                  * something, so that's enough.  We wake if the queue was empty.
1493                  * Both sides are the same, in that the condition for which we
1494                  * do the rendez_wakeup() is the same as the condition done for
1495                  * the rendez_sleep(). */
1496                 rendez_wakeup(&q->rr);
1497                 qwake_cb(q, FDTAP_FILT_READABLE);
1498         }
1499         /*
1500          *  flow control, wait for queue to get below the limit
1501          *  before allowing the process to continue and queue
1502          *  more.  We do this here so that postnote can only
1503          *  interrupt us after the data has been queued.  This
1504          *  means that things like 9p flushes and ssl messages
1505          *  will not be disrupted by software interrupts.
1506          *
1507          *  Note - this is moderately dangerous since a process
1508          *  that keeps getting interrupted and rewriting will
1509          *  queue infinite crud.
1510          */
1511         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1512             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1513                 /* This is a racy peek at the q status.  If we accidentally
1514                  * block, our rendez will return.  The rendez's peak
1515                  * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
1516                  * (that lock protects writes, but not reads).
1517                  *
1518                  * Here's the deal: when holding the rendez lock, if we see the
1519                  * sleep condition, the consumer will wake us.  The condition
1520                  * will only ever be changed by the next qbread() (consumer,
1521                  * changes q->dlen).  That code will do a rendez wake, which
1522                  * will spin on the rendez lock, meaning it won't procede until
1523                  * we either see the new state (and return) or put ourselves on
1524                  * the rendez, and wake up.
1525                  *
1526                  * The pattern is one side writes mem, then signals.  Our side
1527                  * checks the signal, then reads the mem.  The goal is to not
1528                  * miss seeing the signal AND missing the memory write.  In this
1529                  * specific case, the signal is actually synchronous (the rendez
1530                  * lock) and not basic shared memory.
1531                  *
1532                  * Oh, and we spin in case we woke early and someone else filled
1533                  * the queue, mesa-style. */
1534                 while (!qwriter_should_wake(q))
1535                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1536         }
1537         return ret;
1538 }
1539
1540 /*
1541  *  add a block to a queue obeying flow control
1542  */
1543 ssize_t qbwrite(struct queue *q, struct block *b)
1544 {
1545         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1546 }
1547
1548 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1549 {
1550         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1551 }
1552
1553 ssize_t qibwrite(struct queue *q, struct block *b)
1554 {
1555         return __qbwrite(q, b, 0);
1556 }
1557
1558 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1559  * block on success, 0 on failure. */
1560 static struct block *build_block(void *from, size_t len, int mem_flags)
1561 {
1562         struct block *b;
1563         void *ext_buf;
1564
1565         /* If len is small, we don't need to bother with the extra_data.  But
1566          * until the whole stack can handle extd blocks, we'll use them
1567          * unconditionally.  */
1568
1569         /* allocb builds in 128 bytes of header space to all blocks, but this is
1570          * only available via padblock (to the left).  we also need some space
1571          * for pullupblock for some basic headers (like icmp) that get written
1572          * in directly */
1573         b = block_alloc(64, mem_flags);
1574         if (!b)
1575                 return 0;
1576         ext_buf = kmalloc(len, mem_flags);
1577         if (!ext_buf) {
1578                 kfree(b);
1579                 return 0;
1580         }
1581         memcpy(ext_buf, from, len);
1582         if (block_add_extd(b, 1, mem_flags)) {
1583                 kfree(ext_buf);
1584                 kfree(b);
1585                 return 0;
1586         }
1587         b->extra_data[0].base = (uintptr_t)ext_buf;
1588         b->extra_data[0].off = 0;
1589         b->extra_data[0].len = len;
1590         b->extra_len += len;
1591         return b;
1592 }
1593
1594 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1595                         int qio_flags)
1596 {
1597         ERRSTACK(1);
1598         size_t n;
1599         volatile size_t sofar = 0;      /* volatile for the waserror */
1600         struct block *b;
1601         uint8_t *p = vp;
1602         void *ext_buf;
1603
1604         /* Only some callers can throw.  Others might be in a context where
1605          * waserror isn't safe. */
1606         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1607                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
1608                  * after some data has been sent should be treated as a partial
1609                  * write. */
1610                 if (sofar)
1611                         goto out_ok;
1612                 nexterror();
1613         }
1614         do {
1615                 n = len - sofar;
1616                 /* This is 64K, the max amount per single block.  Still a good
1617                  * value? */
1618                 if (n > Maxatomic)
1619                         n = Maxatomic;
1620                 b = build_block(p + sofar, n, mem_flags);
1621                 if (!b)
1622                         break;
1623                 if (__qbwrite(q, b, qio_flags) < 0)
1624                         break;
1625                 sofar += n;
1626         } while ((sofar < len) && (q->state & Qmsg) == 0);
1627 out_ok:
1628         if (qio_flags & QIO_CAN_ERR_SLEEP)
1629                 poperror();
1630         return sofar;
1631 }
1632
1633 ssize_t qwrite(struct queue *q, void *vp, int len)
1634 {
1635         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1636 }
1637
1638 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1639 {
1640         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1641                                               QIO_NON_BLOCK);
1642 }
1643
1644 ssize_t qiwrite(struct queue *q, void *vp, int len)
1645 {
1646         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1647 }
1648
1649 /*
1650  *  be extremely careful when calling this,
1651  *  as there is no reference accounting
1652  */
1653 void qfree(struct queue *q)
1654 {
1655         qclose(q);
1656         kfree(q);
1657 }
1658
1659 /*
1660  *  Mark a queue as closed.  No further IO is permitted.
1661  *  All blocks are released.
1662  */
1663 void qclose(struct queue *q)
1664 {
1665         struct block *bfirst;
1666
1667         if (q == NULL)
1668                 return;
1669
1670         /* mark it */
1671         spin_lock_irqsave(&q->lock);
1672         q->state |= Qclosed;
1673         q->state &= ~Qdropoverflow;
1674         q->err[0] = 0;
1675         bfirst = q->bfirst;
1676         q->bfirst = 0;
1677         q->dlen = 0;
1678         spin_unlock_irqsave(&q->lock);
1679
1680         /* free queued blocks */
1681         freeblist(bfirst);
1682
1683         /* wake up readers/writers */
1684         rendez_wakeup(&q->rr);
1685         rendez_wakeup(&q->wr);
1686         qwake_cb(q, FDTAP_FILT_HANGUP);
1687 }
1688
1689 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1690  *
1691  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1692  * there is no message, which is what also happens during a natural qclose(),
1693  * those waiters will simply return 0.  qwriters will always error() on a
1694  * closed/hungup queue. */
1695 void qhangup(struct queue *q, char *msg)
1696 {
1697         /* mark it */
1698         spin_lock_irqsave(&q->lock);
1699         q->state |= Qclosed;
1700         if (msg == 0 || *msg == 0)
1701                 q->err[0] = 0;
1702         else
1703                 strlcpy(q->err, msg, ERRMAX);
1704         spin_unlock_irqsave(&q->lock);
1705
1706         /* wake up readers/writers */
1707         rendez_wakeup(&q->rr);
1708         rendez_wakeup(&q->wr);
1709         qwake_cb(q, FDTAP_FILT_HANGUP);
1710 }
1711
1712 /*
1713  *  return non-zero if the q is hungup
1714  */
1715 int qisclosed(struct queue *q)
1716 {
1717         return q->state & Qclosed;
1718 }
1719
1720 /*
1721  *  mark a queue as no longer hung up.  resets the wake_cb.
1722  */
1723 void qreopen(struct queue *q)
1724 {
1725         spin_lock_irqsave(&q->lock);
1726         q->state &= ~Qclosed;
1727         q->eof = 0;
1728         q->limit = q->inilim;
1729         q->wake_cb = 0;
1730         q->wake_data = 0;
1731         spin_unlock_irqsave(&q->lock);
1732 }
1733
1734 /*
1735  *  return bytes queued
1736  */
1737 int qlen(struct queue *q)
1738 {
1739         return q->dlen;
1740 }
1741
1742 size_t q_bytes_read(struct queue *q)
1743 {
1744         return q->bytes_read;
1745 }
1746
1747 /*
1748  * return space remaining before flow control
1749  *
1750  *  This used to be
1751  *  q->len < q->limit/2
1752  *  but it slows down tcp too much for certain write sizes.
1753  *  I really don't understand it completely.  It may be
1754  *  due to the queue draining so fast that the transmission
1755  *  stalls waiting for the app to produce more data.  - presotto
1756  *
1757  *  q->len was the amount of bytes, which is no longer used.  we now use
1758  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1759  */
1760 int qwindow(struct queue *q)
1761 {
1762         int l;
1763
1764         l = q->limit - q->dlen;
1765         if (l < 0)
1766                 l = 0;
1767         return l;
1768 }
1769
1770 /*
1771  *  return true if we can read without blocking
1772  */
1773 int qcanread(struct queue *q)
1774 {
1775         return q->bfirst != 0;
1776 }
1777
1778 /*
1779  *  change queue limit
1780  */
1781 void qsetlimit(struct queue *q, size_t limit)
1782 {
1783         bool was_writable = qwritable(q);
1784
1785         q->limit = limit;
1786         if (!was_writable && qwritable(q)) {
1787                 rendez_wakeup(&q->wr);
1788                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1789         }
1790 }
1791
1792 size_t qgetlimit(struct queue *q)
1793 {
1794         return q->limit;
1795 }
1796
1797 /*
1798  *  set whether writes drop overflowing blocks, or if we sleep
1799  */
1800 void qdropoverflow(struct queue *q, bool onoff)
1801 {
1802         spin_lock_irqsave(&q->lock);
1803         if (onoff)
1804                 q->state |= Qdropoverflow;
1805         else
1806                 q->state &= ~Qdropoverflow;
1807         spin_unlock_irqsave(&q->lock);
1808 }
1809
1810 /* Be careful: this can affect concurrent reads/writes and code that might have
1811  * built-in expectations of the q's type. */
1812 void q_toggle_qmsg(struct queue *q, bool onoff)
1813 {
1814         spin_lock_irqsave(&q->lock);
1815         if (onoff)
1816                 q->state |= Qmsg;
1817         else
1818                 q->state &= ~Qmsg;
1819         spin_unlock_irqsave(&q->lock);
1820 }
1821
1822 /* Be careful: this can affect concurrent reads/writes and code that might have
1823  * built-in expectations of the q's type. */
1824 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1825 {
1826         spin_lock_irqsave(&q->lock);
1827         if (onoff)
1828                 q->state |= Qcoalesce;
1829         else
1830                 q->state &= ~Qcoalesce;
1831         spin_unlock_irqsave(&q->lock);
1832 }
1833
1834 /*
1835  *  flush the output queue
1836  */
1837 void qflush(struct queue *q)
1838 {
1839         struct block *bfirst;
1840
1841         /* mark it */
1842         spin_lock_irqsave(&q->lock);
1843         bfirst = q->bfirst;
1844         q->bfirst = 0;
1845         q->dlen = 0;
1846         spin_unlock_irqsave(&q->lock);
1847
1848         /* free queued blocks */
1849         freeblist(bfirst);
1850
1851         /* wake up writers */
1852         rendez_wakeup(&q->wr);
1853         qwake_cb(q, FDTAP_FILT_WRITABLE);
1854 }
1855
1856 int qfull(struct queue *q)
1857 {
1858         return !qwritable(q);
1859 }
1860
1861 int qstate(struct queue *q)
1862 {
1863         return q->state;
1864 }
1865
1866 void qdump(struct queue *q)
1867 {
1868         if (q)
1869                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1870                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1871 }
1872
1873 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1874  * marks the type of wakeup event (flags from FDTAP).
1875  *
1876  * There's no sync protection.  If you change the CB while the qio is running,
1877  * you might get a CB with the data or func from a previous set_wake_cb.  You
1878  * should set this once per queue and forget it.
1879  *
1880  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1881  * just make sure that the func(data) pair are valid until the queue is freed or
1882  * reopened. */
1883 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1884 {
1885         q->wake_data = data;
1886         wmb();  /* if we see func, we'll also see the data for it */
1887         q->wake_cb = func;
1888 }
1889
1890 /* Helper for detecting whether we'll block on a read at this instant. */
1891 bool qreadable(struct queue *q)
1892 {
1893         return qlen(q) > 0;
1894 }
1895
1896 /* Helper for detecting whether we'll block on a write at this instant. */
1897 bool qwritable(struct queue *q)
1898 {
1899         return !q->limit || qwindow(q) > 0;
1900 }