qio: implement padblock() for block extra data
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <slab.h>
30 #include <kmalloc.h>
31 #include <kref.h>
32 #include <string.h>
33 #include <stdio.h>
34 #include <assert.h>
35 #include <error.h>
36 #include <cpio.h>
37 #include <pmap.h>
38 #include <smp.h>
39 #include <net/ip.h>
40
41 #define PANIC_EXTRA(b)                                                  \
42 {                                                                       \
43         if ((b)->extra_len) {                                           \
44                 printblock(b);                                          \
45                 backtrace();                                            \
46                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
47         }                                                               \
48 }
49
50 static uint32_t padblockcnt;
51 static uint32_t concatblockcnt;
52 static uint32_t pullupblockcnt;
53 static uint32_t copyblockcnt;
54 static uint32_t consumecnt;
55 static uint32_t producecnt;
56 static uint32_t qcopycnt;
57
58 static int debugging;
59
60 #define QDEBUG  if(0)
61
62 /*
63  *  IO queues
64  */
65
66 struct queue {
67         spinlock_t lock;;
68
69         struct block *bfirst;           /* buffer */
70         struct block *blast;
71
72         int dlen;                       /* data bytes in queue */
73         int limit;                      /* max bytes in queue */
74         int inilim;                     /* initial limit */
75         int state;
76         int eof;                        /* number of eofs read by user */
77         size_t bytes_read;
78
79         void (*kick) (void *);          /* restart output */
80         void (*bypass) (void *, struct block *); /* bypass queue altogether */
81         void *arg;                      /* argument to kick */
82
83         struct rendez rr;               /* process waiting to read */
84         struct rendez wr;               /* process waiting to write */
85         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
86         void *wake_data;
87
88         char err[ERRMAX];
89 };
90
91 enum {
92         Maxatomic = 64 * 1024,
93         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
94         QIO_LIMIT = (1 << 1),           /* respect q->limit */
95         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
96         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
97         QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
98         QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
99 };
100
101 unsigned int qiomaxatomic = Maxatomic;
102
103 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
104 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
105                               int mem_flags);
106 static bool qwait_and_ilock(struct queue *q, int qio_flags);
107
108 /* Helper: fires a wake callback, sending 'filter' */
109 static void qwake_cb(struct queue *q, int filter)
110 {
111         if (q->wake_cb)
112                 q->wake_cb(q, q->wake_data, filter);
113 }
114
115 void ixsummary(void)
116 {
117         debugging ^= 1;
118         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
119                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
120         printd("consume %lu, produce %lu, qcopy %lu\n",
121                    consumecnt, producecnt, qcopycnt);
122 }
123
124 /* Pad a block to the front (or the back if size is negative).  Returns the
125  * block pointer that you must use instead of bp.  i.e. bp = padblock(bp, x);
126  *
127  * This space is in the main body / header space, not the extra data.  In
128  * essence, the block has size bytes of uninitialized data placed in the front
129  * (left) of the old header data.  The caller needs to fill in that data,
130  * presumably with packet headers.  Any block[offset] in the old block is now at
131  * block[offset + size].
132  *
133  * Negative padding applies at the end of the block.  This means that there is
134  * space in block header for size bytes (in between wp and lim).  Given that all
135  * of the block 'padding' needs to be in the main header, that means we need to
136  * linearize the entire block, such that the end padding is in the main
137  * body/header space.
138  */
139 struct block *padblock(struct block *bp, int size)
140 {
141         int n;
142         struct block *nbp;
143
144         QDEBUG checkb(bp, "padblock 1");
145         if (size >= 0) {
146                 if (bp->rp - bp->base >= size) {
147                         block_add_to_offsets(bp, size);
148                         bp->rp -= size;
149                         return bp;
150                 }
151                 if (bp->next)
152                         panic("%s %p had a next", __func__, bp);
153                 n = BHLEN(bp);
154                 padblockcnt++;
155                 nbp = block_alloc(size + n, MEM_WAIT);
156                 block_copy_metadata(nbp, bp);
157                 block_replace_extras(nbp, bp);
158                 /* This effectively copies the old block main body such that we
159                  * know we have size bytes to the left of rp.  All of the
160                  * metadata offsets (e.g. tx_csum) are relative from this blob
161                  * of data: i.e. nbp->rp + size. */
162                 nbp->rp += size;
163                 nbp->wp = nbp->rp;
164                 memmove(nbp->wp, bp->rp, n);
165                 nbp->wp += n;
166                 freeb(bp);
167                 block_add_to_offsets(nbp, size);
168                 nbp->rp -= size;
169         } else {
170                 /* No one I know of calls this yet, so this is untested.  Maybe
171                  * we can remove it. */
172                 warn_once("pad attempt with negative size %d", size);
173                 size = -size;
174                 if (bp->next)
175                         panic("%s %p had a next", __func__, bp);
176                 if (bp->lim - bp->wp >= size)
177                         return bp;
178                 /* Negative padding goes after all data.  In essence, we'll need
179                  * to pull all block extra data up into the headers.  The
180                  * easiest thing is to linearize, then do the old algorithm.  We
181                  * may do extra allocations - if we ever use this, we can fix it
182                  * up.  Maybe make linearizeblock/copyblock take neg-padding. */
183                 if (bp->extra_len) {
184                         bp = linearizeblock(bp);
185                         if (bp->lim - bp->wp >= size)
186                                 return bp;
187                 }
188                 n = BLEN(bp);
189                 padblockcnt++;
190                 nbp = block_alloc(size + n, MEM_WAIT);
191                 block_copy_metadata(nbp, bp);
192                 memmove(nbp->wp, bp->rp, n);
193                 nbp->wp += n;
194                 freeb(bp);
195         }
196         QDEBUG checkb(nbp, "padblock 1");
197         return nbp;
198 }
199
200 /*
201  *  return count of bytes in a string of blocks
202  */
203 int blocklen(struct block *bp)
204 {
205         int len;
206
207         len = 0;
208         while (bp) {
209                 len += BLEN(bp);
210                 bp = bp->next;
211         }
212         return len;
213 }
214
215 /*
216  * return count of space in blocks
217  */
218 int blockalloclen(struct block *bp)
219 {
220         int len;
221
222         len = 0;
223         while (bp) {
224                 len += BALLOC(bp);
225                 bp = bp->next;
226         }
227         return len;
228 }
229
230 /*
231  *  copy the  string of blocks into
232  *  a single block and free the string
233  */
234 struct block *concatblock(struct block *bp)
235 {
236         int len;
237         struct block *nb, *f;
238
239         if (bp->next == 0)
240                 return bp;
241
242         /* probably use parts of qclone */
243         PANIC_EXTRA(bp);
244         nb = block_alloc(blocklen(bp), MEM_WAIT);
245         for (f = bp; f; f = f->next) {
246                 len = BLEN(f);
247                 memmove(nb->wp, f->rp, len);
248                 nb->wp += len;
249         }
250         concatblockcnt += BLEN(nb);
251         freeblist(bp);
252         QDEBUG checkb(nb, "concatblock 1");
253         return nb;
254 }
255
256 /* Makes an identical copy of the block, collapsing all the data into the block
257  * body.  It does not point to the contents of the original, it is a copy
258  * (unlike qclone).  Since we're copying, we might as well put the memory into
259  * one contiguous chunk. */
260 struct block *copyblock(struct block *bp, int mem_flags)
261 {
262         struct block *newb;
263         struct extra_bdata *ebd;
264         size_t amt;
265
266         QDEBUG checkb(bp, "copyblock 0");
267         newb = block_alloc(BLEN(bp), mem_flags);
268         if (!newb)
269                 return 0;
270         amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
271         assert(amt == BHLEN(bp));
272         for (int i = 0; i < bp->nr_extra_bufs; i++) {
273                 ebd = &bp->extra_data[i];
274                 if (!ebd->base || !ebd->len)
275                         continue;
276                 amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
277                                          ebd->len);
278                 assert(amt == ebd->len);
279         }
280         block_copy_metadata(newb, bp);
281         copyblockcnt++;
282         QDEBUG checkb(newb, "copyblock 1");
283         return newb;
284 }
285
286 /* Returns a block with the remaining contents of b all in the main body of the
287  * returned block.  Replace old references to b with the returned value (which
288  * may still be 'b', if no change was needed. */
289 struct block *linearizeblock(struct block *b)
290 {
291         struct block *newb;
292
293         if (!b->extra_len)
294                 return b;
295         newb = copyblock(b, MEM_WAIT);
296         freeb(b);
297         return newb;
298 }
299
300 /* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
301  * have emptied it. */
302 static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
303                                     size_t amt)
304 {
305         ebd->len -= amt;
306         ebd->off += amt;
307         b->extra_len -= amt;
308         if (!ebd->len) {
309                 /* we don't actually have to decref here.  it's also
310                  * done in freeb().  this is the earliest we can free. */
311                 kfree((void*)ebd->base);
312                 ebd->base = ebd->off = 0;
313         }
314 }
315
316 /* Make sure the first block has at least n bytes in its main body.  Pulls up
317  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
318  * block list. */
319 struct block *pullupblock(struct block *bp, int n)
320 {
321         int i, len, seglen;
322         struct block *nbp;
323         struct extra_bdata *ebd;
324
325         /*
326          *  this should almost always be true, it's
327          *  just to avoid every caller checking.
328          */
329         if (BHLEN(bp) >= n)
330                 return bp;
331
332         /* If there's no chance, just bail out now.  This might be slightly
333          * wasteful if there's a long blist that does have enough data. */
334         if (n > blocklen(bp))
335                 return 0;
336         /* a start at explicit main-body / header management */
337         if (bp->extra_len) {
338                 if (n > bp->lim - bp->rp) {
339                         /* would need to realloc a new block and copy everything
340                          * over. */
341                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
342                               n, bp->lim, bp->rp, bp->lim-bp->rp);
343                 }
344                 len = n - BHLEN(bp);
345                 /* Would need to recursively call this, or otherwise pull from
346                  * later blocks and put chunks of their data into the block
347                  * we're building. */
348                 if (len > bp->extra_len)
349                         panic("pullup more than extra (%d, %d, %d)\n",
350                               n, BHLEN(bp), bp->extra_len);
351                 QDEBUG checkb(bp, "before pullup");
352                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
353                         ebd = &bp->extra_data[i];
354                         if (!ebd->base || !ebd->len)
355                                 continue;
356                         seglen = MIN(ebd->len, len);
357                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
358                         bp->wp += seglen;
359                         len -= seglen;
360                         ebd->len -= seglen;
361                         ebd->off += seglen;
362                         bp->extra_len -= seglen;
363                         if (ebd->len == 0) {
364                                 kfree((void *)ebd->base);
365                                 ebd->off = 0;
366                                 ebd->base = 0;
367                         }
368                 }
369                 /* maybe just call pullupblock recursively here */
370                 if (len)
371                         panic("pullup %d bytes overdrawn\n", len);
372                 QDEBUG checkb(bp, "after pullup");
373                 return bp;
374         }
375
376         /*
377          *  if not enough room in the first block,
378          *  add another to the front of the list.
379          */
380         if (bp->lim - bp->rp < n) {
381                 nbp = block_alloc(n, MEM_WAIT);
382                 nbp->next = bp;
383                 bp = nbp;
384         }
385
386         /*
387          *  copy bytes from the trailing blocks into the first
388          */
389         n -= BLEN(bp);
390         while ((nbp = bp->next)) {
391                 i = BLEN(nbp);
392                 if (i > n) {
393                         memmove(bp->wp, nbp->rp, n);
394                         pullupblockcnt++;
395                         bp->wp += n;
396                         nbp->rp += n;
397                         QDEBUG checkb(bp, "pullupblock 1");
398                         return bp;
399                 } else {
400                         memmove(bp->wp, nbp->rp, i);
401                         pullupblockcnt++;
402                         bp->wp += i;
403                         bp->next = nbp->next;
404                         nbp->next = 0;
405                         freeb(nbp);
406                         n -= i;
407                         if (n == 0) {
408                                 QDEBUG checkb(bp, "pullupblock 2");
409                                 return bp;
410                         }
411                 }
412         }
413         freeb(bp);
414         return 0;
415 }
416
417 /*
418  *  make sure the first block has at least n bytes in its main body
419  */
420 struct block *pullupqueue(struct queue *q, int n)
421 {
422         struct block *b;
423
424         /* TODO: lock to protect the queue links? */
425         if ((BHLEN(q->bfirst) >= n))
426                 return q->bfirst;
427         q->bfirst = pullupblock(q->bfirst, n);
428         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
429         q->blast = b;
430         return q->bfirst;
431 }
432
433 /* throw away count bytes from the front of
434  * block's extradata.  Returns count of bytes
435  * thrown away
436  */
437
438 static int pullext(struct block *bp, int count)
439 {
440         struct extra_bdata *ed;
441         int i, rem, bytes = 0;
442
443         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
444                 ed = &bp->extra_data[i];
445                 rem = MIN(count, ed->len);
446                 bp->extra_len -= rem;
447                 count -= rem;
448                 bytes += rem;
449                 ed->off += rem;
450                 ed->len -= rem;
451                 if (ed->len == 0) {
452                         kfree((void *)ed->base);
453                         ed->base = 0;
454                         ed->off = 0;
455                 }
456         }
457         return bytes;
458 }
459
460 /* throw away count bytes from the end of a
461  * block's extradata.  Returns count of bytes
462  * thrown away
463  */
464
465 static int dropext(struct block *bp, int count)
466 {
467         struct extra_bdata *ed;
468         int i, rem, bytes = 0;
469
470         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
471                 ed = &bp->extra_data[i];
472                 rem = MIN(count, ed->len);
473                 bp->extra_len -= rem;
474                 count -= rem;
475                 bytes += rem;
476                 ed->len -= rem;
477                 if (ed->len == 0) {
478                         kfree((void *)ed->base);
479                         ed->base = 0;
480                         ed->off = 0;
481                 }
482         }
483         return bytes;
484 }
485
486 /*
487  *  throw away up to count bytes from a
488  *  list of blocks.  Return count of bytes
489  *  thrown away.
490  */
491 static int _pullblock(struct block **bph, int count, int free)
492 {
493         struct block *bp;
494         int n, bytes;
495
496         bytes = 0;
497         if (bph == NULL)
498                 return 0;
499
500         while (*bph != NULL && count != 0) {
501                 bp = *bph;
502
503                 n = MIN(BHLEN(bp), count);
504                 bytes += n;
505                 count -= n;
506                 bp->rp += n;
507                 n = pullext(bp, count);
508                 bytes += n;
509                 count -= n;
510                 QDEBUG checkb(bp, "pullblock ");
511                 if (BLEN(bp) == 0 && (free || count)) {
512                         *bph = bp->next;
513                         bp->next = NULL;
514                         freeb(bp);
515                 }
516         }
517         return bytes;
518 }
519
520 int pullblock(struct block **bph, int count)
521 {
522         return _pullblock(bph, count, 1);
523 }
524
525 /*
526  *  trim to len bytes starting at offset
527  */
528 struct block *trimblock(struct block *bp, int offset, int len)
529 {
530         uint32_t l, trim;
531         int olen = len;
532
533         QDEBUG checkb(bp, "trimblock 1");
534         if (blocklen(bp) < offset + len) {
535                 freeblist(bp);
536                 return NULL;
537         }
538
539         l =_pullblock(&bp, offset, 0);
540         if (bp == NULL)
541                 return NULL;
542         if (l != offset) {
543                 freeblist(bp);
544                 return NULL;
545         }
546
547         while ((l = BLEN(bp)) < len) {
548                 len -= l;
549                 bp = bp->next;
550         }
551
552         trim = BLEN(bp) - len;
553         trim -= dropext(bp, trim);
554         bp->wp -= trim;
555
556         if (bp->next) {
557                 freeblist(bp->next);
558                 bp->next = NULL;
559         }
560         return bp;
561 }
562
563 /* Adjust block @bp so that its size is exactly @len.
564  * If the size is increased, fill in the new contents with zeros.
565  * If the size is decreased, discard some of the old contents at the tail. */
566 struct block *adjustblock(struct block *bp, int len)
567 {
568         struct extra_bdata *ebd;
569         void *buf;
570         int i;
571
572         if (len < 0) {
573                 freeb(bp);
574                 return NULL;
575         }
576
577         if (len == BLEN(bp))
578                 return bp;
579
580         /* Shrink within block main body. */
581         if (len <= BHLEN(bp)) {
582                 free_block_extra(bp);
583                 bp->wp = bp->rp + len;
584                 QDEBUG checkb(bp, "adjustblock 1");
585                 return bp;
586         }
587
588         /* Need to grow. */
589         if (len > BLEN(bp)) {
590                 /* Grow within block main body. */
591                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
592                         memset(bp->wp, 0, len - BLEN(bp));
593                         bp->wp = bp->rp + len;
594                         QDEBUG checkb(bp, "adjustblock 2");
595                         return bp;
596                 }
597                 /* Grow with extra data buffers. */
598                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
599                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
600                                    MEM_WAIT);
601                 QDEBUG checkb(bp, "adjustblock 3");
602                 return bp;
603         }
604
605         /* Shrink extra data buffers.
606          * len is how much of ebd we need to keep.
607          * extra_len is re-accumulated. */
608         assert(bp->extra_len > 0);
609         len -= BHLEN(bp);
610         bp->extra_len = 0;
611         for (i = 0; i < bp->nr_extra_bufs; i++) {
612                 ebd = &bp->extra_data[i];
613                 if (len <= ebd->len)
614                         break;
615                 len -= ebd->len;
616                 bp->extra_len += ebd->len;
617         }
618         /* If len becomes zero, extra_data[i] should be freed. */
619         if (len > 0) {
620                 ebd = &bp->extra_data[i];
621                 ebd->len = len;
622                 bp->extra_len += ebd->len;
623                 i++;
624         }
625         for (; i < bp->nr_extra_bufs; i++) {
626                 ebd = &bp->extra_data[i];
627                 if (ebd->base)
628                         kfree((void*)ebd->base);
629                 ebd->base = ebd->off = ebd->len = 0;
630         }
631         QDEBUG checkb(bp, "adjustblock 4");
632         return bp;
633 }
634
635 /* Helper: removes and returns the first block from q */
636 static struct block *pop_first_block(struct queue *q)
637 {
638         struct block *b = q->bfirst;
639
640         q->dlen -= BLEN(b);
641         q->bytes_read += BLEN(b);
642         q->bfirst = b->next;
643         b->next = 0;
644         return b;
645 }
646
647 /* Accounting helper.  Block b in q lost amt extra_data */
648 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
649 {
650         b->extra_len -= amt;
651         q->dlen -= amt;
652         q->bytes_read += amt;
653 }
654
655 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
656  * fixed in 'from', so we move its contents and zero it out in 'from'.
657  *
658  * Returns the length moved (0 on failure). */
659 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
660                        struct block *from, struct queue *from_q)
661 {
662         size_t ret = ebd->len;
663
664         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
665                 return 0;
666         block_and_q_lost_extra(from, from_q, ebd->len);
667         ebd->base = ebd->len = ebd->off = 0;
668         return ret;
669 }
670
671 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
672  * return with less than len, but greater than 0, even if there is more
673  * available in q.
674  *
675  * At any moment that we have copied anything and things are tricky, we can just
676  * return.  The trickiness comes from a bunch of variables: is the main body
677  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
678  * to @to's main body, but only if we haven't used it yet. */
679 static size_t copy_from_first_block(struct queue *q, struct block *to,
680                                     size_t len)
681 {
682         struct block *from = q->bfirst;
683         size_t copy_amt, amt;
684         struct extra_bdata *ebd;
685
686         assert(len < BLEN(from));       /* sanity */
687         /* Try to extract from the main body */
688         copy_amt = MIN(BHLEN(from), len);
689         if (copy_amt) {
690                 copy_amt = block_copy_to_body(to, from->rp, copy_amt);
691                 from->rp += copy_amt;
692                 /* We only change dlen, (data len), not q->len, since the q
693                  * still has the same block memory allocation (no kfrees
694                  * happened) */
695                 q->dlen -= copy_amt;
696                 q->bytes_read += copy_amt;
697         }
698         /* Try to extract the remainder from the extra data */
699         len -= copy_amt;
700         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
701                 ebd = &from->extra_data[i];
702                 if (!ebd->base || !ebd->len)
703                         continue;
704                 if (len >= ebd->len) {
705                         amt = move_ebd(ebd, to, from, q);
706                         if (!amt) {
707                                 /* our internal alloc could have failed.   this
708                                  * ebd is now the last one we'll consider.
709                                  * let's handle it separately and put it in the
710                                  * main body. */
711                                 if (copy_amt)
712                                         return copy_amt;
713                                 copy_amt = block_copy_to_body(to,
714                                                               (void*)ebd->base +
715                                                               ebd->off,
716                                                               ebd->len);
717                                 block_and_q_lost_extra(from, q, copy_amt);
718                                 break;
719                         }
720                         len -= amt;
721                         copy_amt += amt;
722                         continue;
723                 } else {
724                         /* If we're here, we reached our final ebd, which we'll
725                          * need to split to get anything from it. */
726                         if (copy_amt)
727                                 return copy_amt;
728                         copy_amt = block_copy_to_body(to, (void*)ebd->base +
729                                                       ebd->off, len);
730                         ebd->off += copy_amt;
731                         ebd->len -= copy_amt;
732                         block_and_q_lost_extra(from, q, copy_amt);
733                         break;
734                 }
735         }
736         if (len)
737                 assert(copy_amt);       /* sanity */
738         return copy_amt;
739 }
740
741 /* Return codes for __qbread and __try_qbread. */
742 enum {
743         QBR_OK,
744         QBR_FAIL,
745         QBR_SPARE,      /* we need a spare block */
746         QBR_AGAIN,      /* do it again, we are coalescing blocks */
747 };
748
749 /* Helper and back-end for __qbread: extracts and returns a list of blocks
750  * containing up to len bytes.  It may contain less than len even if q has more
751  * data.
752  *
753  * Returns a code interpreted by __qbread, and the returned blist in ret. */
754 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
755                         struct block **real_ret, struct block *spare)
756 {
757         struct block *ret, *ret_last, *first;
758         size_t blen;
759         bool was_unwritable = FALSE;
760
761         if (qio_flags & QIO_CAN_ERR_SLEEP) {
762                 if (!qwait_and_ilock(q, qio_flags)) {
763                         spin_unlock_irqsave(&q->lock);
764                         return QBR_FAIL;
765                 }
766                 /* we qwaited and still hold the lock, so the q is not empty */
767                 first = q->bfirst;
768         } else {
769                 spin_lock_irqsave(&q->lock);
770                 first = q->bfirst;
771                 if (!first) {
772                         spin_unlock_irqsave(&q->lock);
773                         return QBR_FAIL;
774                 }
775         }
776         /* We need to check before adjusting q->len.  We're checking the
777          * writer's sleep condition / tap condition.  When set, we *might* be
778          * making an edge transition (from unwritable to writable), which needs
779          * to wake and fire taps.  But, our read might not drain the queue below
780          * q->lim.  We'll check again later to see if we should really wake
781          * them.  */
782         was_unwritable = !qwritable(q);
783         blen = BLEN(first);
784         if ((q->state & Qcoalesce) && (blen == 0)) {
785                 freeb(pop_first_block(q));
786                 spin_unlock_irqsave(&q->lock);
787                 /* Need to retry to make sure we have a first block */
788                 return QBR_AGAIN;
789         }
790         /* Qmsg: just return the first block.  Be careful, since our caller
791          * might not read all of the block and thus drop bytes.  Similar to
792          * SOCK_DGRAM. */
793         if (q->state & Qmsg) {
794                 ret = pop_first_block(q);
795                 goto out_ok;
796         }
797         /* Let's get at least something first - makes the code easier.  This
798          * way, we'll only ever split the block once. */
799         if (blen <= len) {
800                 ret = pop_first_block(q);
801                 len -= blen;
802         } else {
803                 /* need to split the block.  we won't actually take the first
804                  * block out of the queue - we're just extracting a little bit.
805                  */
806                 if (!spare) {
807                         /* We have nothing and need a spare block.  Retry! */
808                         spin_unlock_irqsave(&q->lock);
809                         return QBR_SPARE;
810                 }
811                 copy_from_first_block(q, spare, len);
812                 ret = spare;
813                 goto out_ok;
814         }
815         /* At this point, we just grabbed the first block.  We can try to grab
816          * some more, up to len (if they want). */
817         if (qio_flags & QIO_JUST_ONE_BLOCK)
818                 goto out_ok;
819         ret_last = ret;
820         while (q->bfirst && (len > 0)) {
821                 blen = BLEN(q->bfirst);
822                 if ((q->state & Qcoalesce) && (blen == 0)) {
823                         /* remove the intermediate 0 blocks */
824                         freeb(pop_first_block(q));
825                         continue;
826                 }
827                 if (blen > len) {
828                         /* We could try to split the block, but that's a huge
829                          * pain.  For instance, we might need to move the main
830                          * body of b into an extra_data of ret_last.  lots of
831                          * ways for that to fail, and lots of cases to consider.
832                          * Easier to just bail out.  This is why I did the first
833                          * block above: we don't need to worry about this. */
834                          break;
835                 }
836                 ret_last->next = pop_first_block(q);
837                 ret_last = ret_last->next;
838                 len -= blen;
839         }
840 out_ok:
841         /* Don't wake them up or fire tap if we didn't drain enough. */
842         if (!qwritable(q))
843                 was_unwritable = FALSE;
844         spin_unlock_irqsave(&q->lock);
845         if (was_unwritable) {
846                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
847                         q->kick(q->arg);
848                 rendez_wakeup(&q->wr);
849                 qwake_cb(q, FDTAP_FILT_WRITABLE);
850         }
851         *real_ret = ret;
852         return QBR_OK;
853 }
854
855 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
856  * containing up to len bytes.  It may contain less than len even if q has more
857  * data.
858  *
859  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
860  * if it required a spare and the memory allocation failed.
861  *
862  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
863  * could get a zero length block back. */
864 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
865                               int mem_flags)
866 {
867         ERRSTACK(1);
868         struct block *ret = 0;
869         struct block *volatile spare = 0;       /* volatile for the waserror */
870
871         /* __try_qbread can throw, based on qio flags. */
872         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
873                 if (spare)
874                         freeb(spare);
875                 nexterror();
876         }
877         while (1) {
878                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
879                 case QBR_OK:
880                 case QBR_FAIL:
881                         if (spare && (ret != spare))
882                                 freeb(spare);
883                         goto out_ret;
884                 case QBR_SPARE:
885                         assert(!spare);
886                         /* Due to some nastiness, we need a fresh block so we
887                          * can read out anything from the queue.  'len' seems
888                          * like a reasonable amount.  Maybe we can get away with
889                          * less. */
890                         spare = block_alloc(len, mem_flags);
891                         if (!spare) {
892                                 /* Careful here: a memory failure (possible with
893                                  * MEM_ATOMIC) could look like 'no data in the
894                                  * queue' (QBR_FAIL).  The only one who does is
895                                  * this qget(), who happens to know that we
896                                  * won't need a spare, due to the len argument.
897                                  * Spares are only needed when we need to split
898                                  * a block. */
899                                 ret = 0;
900                                 goto out_ret;
901                         }
902                         break;
903                 case QBR_AGAIN:
904                         /* if the first block is 0 and we are Qcoalesce, then
905                          * we'll need to try again.  We bounce out of __try so
906                          * we can perform the "is there a block" logic again
907                          * from the top. */
908                         break;
909                 }
910         }
911         assert(0);
912 out_ret:
913         if (qio_flags & QIO_CAN_ERR_SLEEP)
914                 poperror();
915         return ret;
916 }
917
918 /*
919  *  get next block from a queue, return null if nothing there
920  */
921 struct block *qget(struct queue *q)
922 {
923         /* since len == SIZE_MAX, we should never need to do a mem alloc */
924         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
925 }
926
927 /* Throw away the next 'len' bytes in the queue returning the number actually
928  * discarded.
929  *
930  * If the bytes are in the queue, then they must be discarded.  The only time to
931  * return less than len is if the q itself has less than len bytes.
932  *
933  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
934  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
935 size_t qdiscard(struct queue *q, size_t len)
936 {
937         struct block *blist;
938         size_t removed_amt;
939         size_t sofar = 0;
940
941         /* This is racy.  There could be multiple qdiscarders or other
942          * consumers, where the consumption could be interleaved. */
943         while (qlen(q) && len) {
944                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
945                 removed_amt = freeblist(blist);
946                 sofar += removed_amt;
947                 len -= removed_amt;
948         }
949         return sofar;
950 }
951
952 ssize_t qpass(struct queue *q, struct block *b)
953 {
954         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
955 }
956
957 ssize_t qpassnolim(struct queue *q, struct block *b)
958 {
959         return __qbwrite(q, b, 0);
960 }
961
962 /*
963  *  if the allocated space is way out of line with the used
964  *  space, reallocate to a smaller block
965  */
966 struct block *packblock(struct block *bp)
967 {
968         struct block **l, *nbp;
969         int n;
970
971         if (bp->extra_len)
972                 return bp;
973         for (l = &bp; *l; l = &(*l)->next) {
974                 nbp = *l;
975                 n = BLEN(nbp);
976                 if ((n << 2) < BALLOC(nbp)) {
977                         *l = block_alloc(n, MEM_WAIT);
978                         memmove((*l)->wp, nbp->rp, n);
979                         (*l)->wp += n;
980                         (*l)->next = nbp->next;
981                         nbp->next = NULL;
982                         freeb(nbp);
983                 }
984         }
985
986         return bp;
987 }
988
989 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
990  * body_rp, for up to len.  Returns the len consumed.
991  *
992  * The base is 'b', so that we can kfree it later.  This currently ties us to
993  * using kfree for the release method for all extra_data.
994  *
995  * It is possible to have a body size that is 0, if there is no offset, and
996  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
997 static size_t point_to_body(struct block *b, uint8_t *body_rp,
998                             struct block *newb, unsigned int newb_idx,
999                             size_t len)
1000 {
1001         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
1002
1003         assert(newb_idx < newb->nr_extra_bufs);
1004
1005         kmalloc_incref(b);
1006         ebd->base = (uintptr_t)b;
1007         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
1008         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
1009         assert((int)ebd->len >= 0);
1010         newb->extra_len += ebd->len;
1011         return ebd->len;
1012 }
1013
1014 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1015  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1016  * consumed.
1017  *
1018  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1019 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1020                            struct block *newb, unsigned int newb_idx,
1021                            size_t len)
1022 {
1023         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1024         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1025
1026         assert(b_idx < b->nr_extra_bufs);
1027         assert(newb_idx < newb->nr_extra_bufs);
1028
1029         kmalloc_incref((void*)b_ebd->base);
1030         n_ebd->base = b_ebd->base;
1031         n_ebd->off = b_ebd->off + b_off;
1032         n_ebd->len = MIN(b_ebd->len - b_off, len);
1033         newb->extra_len += n_ebd->len;
1034         return n_ebd->len;
1035 }
1036
1037 /* given a string of blocks, sets up the new block's extra_data such that it
1038  * *points* to the contents of the blist [offset, len + offset).  This does not
1039  * make a separate copy of the contents of the blist.
1040  *
1041  * returns 0 on success.  the only failure is if the extra_data array was too
1042  * small, so this returns a positive integer saying how big the extra_data needs
1043  * to be.
1044  *
1045  * callers are responsible for protecting the list structure. */
1046 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1047                             uint32_t offset)
1048 {
1049         struct block *b, *first;
1050         unsigned int nr_bufs = 0;
1051         unsigned int b_idx, newb_idx = 0;
1052         uint8_t *first_main_body = 0;
1053         ssize_t sofar = 0;
1054
1055         /* find the first block; keep offset relative to the latest b in the
1056          * list */
1057         for (b = blist; b; b = b->next) {
1058                 if (BLEN(b) > offset)
1059                         break;
1060                 offset -= BLEN(b);
1061         }
1062         /* qcopy semantics: if you asked for an offset outside the block list,
1063          * you get an empty block back */
1064         if (!b)
1065                 return 0;
1066         first = b;
1067         sofar -= offset; /* don't count the remaining offset in the first b */
1068         /* upper bound for how many buffers we'll need in newb */
1069         for (/* b is set*/; b; b = b->next) {
1070                 nr_bufs += BHLEN(b) ? 1 : 0;
1071                 /* still assuming nr_extra == nr_valid */
1072                 nr_bufs += b->nr_extra_bufs;
1073                 sofar += BLEN(b);
1074                 if (sofar > len)
1075                         break;
1076         }
1077         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1078         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1079                 /* caller will need to alloc these, then re-call us */
1080                 return nr_bufs;
1081         }
1082         for (b = first; b && len; b = b->next) {
1083                 b_idx = 0;
1084                 if (offset) {
1085                         if (offset < BHLEN(b)) {
1086                                 /* off is in the main body */
1087                                 len -= point_to_body(b, b->rp + offset, newb,
1088                                                      newb_idx, len);
1089                                 newb_idx++;
1090                         } else {
1091                                 /* off is in one of the buffers (or just past
1092                                  * the last one).  we're not going to point to
1093                                  * b's main body at all. */
1094                                 offset -= BHLEN(b);
1095                                 assert(b->extra_data);
1096                                 /* assuming these extrabufs are packed, or at
1097                                  * least that len isn't gibberish */
1098                                 while (b->extra_data[b_idx].len <= offset) {
1099                                         offset -= b->extra_data[b_idx].len;
1100                                         b_idx++;
1101                                 }
1102                                 /* now offset is set to our offset in the
1103                                  * b_idx'th buf */
1104                                 len -= point_to_buf(b, b_idx, offset, newb,
1105                                                     newb_idx, len);
1106                                 newb_idx++;
1107                                 b_idx++;
1108                         }
1109                         offset = 0;
1110                 } else {
1111                         if (BHLEN(b)) {
1112                                 len -= point_to_body(b, b->rp, newb, newb_idx,
1113                                                      len);
1114                                 newb_idx++;
1115                         }
1116                 }
1117                 /* knock out all remaining bufs.  we only did one point_to_ op
1118                  * by now, and any point_to_ could be our last if it consumed
1119                  * all of len. */
1120                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1121                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1122                         newb_idx++;
1123                 }
1124         }
1125         return 0;
1126 }
1127
1128 struct block *blist_clone(struct block *blist, int header_len, int len,
1129                           uint32_t offset)
1130 {
1131         int ret;
1132         struct block *newb = block_alloc(header_len, MEM_WAIT);
1133
1134         do {
1135                 ret = __blist_clone_to(blist, newb, len, offset);
1136                 if (ret)
1137                         block_add_extd(newb, ret, MEM_WAIT);
1138         } while (ret);
1139         return newb;
1140 }
1141
1142 /* given a queue, makes a single block with header_len reserved space in the
1143  * block main body, and the contents of [offset, len + offset) pointed to in the
1144  * new blocks ext_data.  This does not make a copy of the q's contents, though
1145  * you do have a ref count on the memory. */
1146 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1147 {
1148         int ret;
1149         struct block *newb = block_alloc(header_len, MEM_WAIT);
1150         /* the while loop should rarely be used: it would require someone
1151          * concurrently adding to the queue. */
1152         do {
1153                 /* TODO: RCU protect the q list (b->next) (need read lock) */
1154                 spin_lock_irqsave(&q->lock);
1155                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1156                 spin_unlock_irqsave(&q->lock);
1157                 if (ret)
1158                         block_add_extd(newb, ret, MEM_WAIT);
1159         } while (ret);
1160         return newb;
1161 }
1162
1163 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1164 {
1165         return qclone(q, 0, len, offset);
1166 }
1167
1168 static void qinit_common(struct queue *q)
1169 {
1170         spinlock_init_irqsave(&q->lock);
1171         rendez_init(&q->rr);
1172         rendez_init(&q->wr);
1173 }
1174
1175 /*
1176  *  called by non-interrupt code
1177  */
1178 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1179 {
1180         struct queue *q;
1181
1182         q = kzmalloc(sizeof(struct queue), 0);
1183         if (q == 0)
1184                 return 0;
1185         qinit_common(q);
1186
1187         q->limit = q->inilim = limit;
1188         q->kick = kick;
1189         q->arg = arg;
1190         q->state = msg;
1191         q->eof = 0;
1192
1193         return q;
1194 }
1195
1196 /* open a queue to be bypassed */
1197 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1198 {
1199         struct queue *q;
1200
1201         q = kzmalloc(sizeof(struct queue), 0);
1202         if (q == 0)
1203                 return 0;
1204         qinit_common(q);
1205
1206         q->limit = 0;
1207         q->arg = arg;
1208         q->bypass = bypass;
1209         q->state = 0;
1210
1211         return q;
1212 }
1213
1214 static int notempty(void *a)
1215 {
1216         struct queue *q = a;
1217
1218         return (q->state & Qclosed) || q->bfirst != 0;
1219 }
1220
1221 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1222  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1223  * was naturally closed.  Throws an error o/w. */
1224 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1225 {
1226         while (1) {
1227                 spin_lock_irqsave(&q->lock);
1228                 if (q->bfirst != NULL)
1229                         return TRUE;
1230                 if (q->state & Qclosed) {
1231                         if (++q->eof > 3) {
1232                                 spin_unlock_irqsave(&q->lock);
1233                                 error(EPIPE,
1234                                       "multiple reads on a closed queue");
1235                         }
1236                         if (q->err[0]) {
1237                                 spin_unlock_irqsave(&q->lock);
1238                                 error(EPIPE, q->err);
1239                         }
1240                         return FALSE;
1241                 }
1242                 if (qio_flags & QIO_NON_BLOCK) {
1243                         spin_unlock_irqsave(&q->lock);
1244                         error(EAGAIN, "queue empty");
1245                 }
1246                 spin_unlock_irqsave(&q->lock);
1247                 /* As with the producer side, we check for a condition while
1248                  * holding the q->lock, decide to sleep, then unlock.  It's like
1249                  * the "check, signal, check again" pattern, but we do it
1250                  * conditionally.  Both sides agree synchronously to do it, and
1251                  * those decisions are made while holding q->lock.  I think this
1252                  * is OK.
1253                  *
1254                  * The invariant is that no reader sleeps when the queue has
1255                  * data.  While holding the rendez lock, if we see there's no
1256                  * data, we'll sleep.  Since we saw there was no data, the next
1257                  * writer will see (or already saw) no data, and then the writer
1258                  * decides to rendez_wake, which will grab the rendez lock.  If
1259                  * the writer already did that, then we'll see notempty when we
1260                  * do our check-again. */
1261                 rendez_sleep(&q->rr, notempty, q);
1262         }
1263 }
1264
1265 /*
1266  * add a block list to a queue
1267  * XXX basically the same as enqueue blist, and has no locking!
1268  */
1269 void qaddlist(struct queue *q, struct block *b)
1270 {
1271         /* TODO: q lock? */
1272         /* queue the block */
1273         if (q->bfirst)
1274                 q->blast->next = b;
1275         else
1276                 q->bfirst = b;
1277         q->dlen += blocklen(b);
1278         while (b->next)
1279                 b = b->next;
1280         q->blast = b;
1281 }
1282
1283 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1284 {
1285         size_t copy_amt, retval = 0;
1286         struct extra_bdata *ebd;
1287
1288         copy_amt = MIN(BHLEN(b), amt);
1289         memcpy(to, b->rp, copy_amt);
1290         /* advance the rp, since this block might not be completely consumed and
1291          * future reads need to know where to pick up from */
1292         b->rp += copy_amt;
1293         to += copy_amt;
1294         amt -= copy_amt;
1295         retval += copy_amt;
1296         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1297                 ebd = &b->extra_data[i];
1298                 /* skip empty entires.  if we track this in the struct block, we
1299                  * can just start the for loop early */
1300                 if (!ebd->base || !ebd->len)
1301                         continue;
1302                 copy_amt = MIN(ebd->len, amt);
1303                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1304                 /* we're actually consuming the entries, just like how we
1305                  * advance rp up above, and might only consume part of one. */
1306                 block_consume_ebd_bytes(b, ebd, copy_amt);
1307                 to += copy_amt;
1308                 amt -= copy_amt;
1309                 retval += copy_amt;
1310         }
1311         return retval;
1312 }
1313
1314 /*
1315  *  copy the contents of a string of blocks into
1316  *  memory.  emptied blocks are freed.  return
1317  *  pointer to first unconsumed block.
1318  */
1319 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1320 {
1321         int i;
1322         struct block *next;
1323
1324         /* could be slicker here, since read_from_block is smart */
1325         for (; b != NULL; b = next) {
1326                 i = BLEN(b);
1327                 if (i > n) {
1328                         /* partial block, consume some */
1329                         read_from_block(b, p, n);
1330                         return b;
1331                 }
1332                 /* full block, consume all and move on */
1333                 i = read_from_block(b, p, i);
1334                 n -= i;
1335                 p += i;
1336                 next = b->next;
1337                 b->next = NULL;
1338                 freeb(b);
1339         }
1340         return NULL;
1341 }
1342
1343 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1344  * actual amount copied. */
1345 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1346 {
1347         size_t sofar = 0;
1348         struct block *next;
1349
1350         do {
1351                 assert(va);
1352                 assert(b->rp);
1353                 sofar += read_from_block(b, va + sofar, len - sofar);
1354                 if (BLEN(b) && b->next)
1355                         panic("Failed to drain entire block (Qmsg?) but had a next!");
1356                 next = b->next;
1357                 b->next = NULL;
1358                 freeb(b);
1359                 b = next;
1360         } while (b);
1361         return sofar;
1362 }
1363
1364 /*
1365  *  copy the contents of memory into a string of blocks.
1366  *  return NULL on error.
1367  */
1368 struct block *mem2bl(uint8_t * p, int len)
1369 {
1370         ERRSTACK(1);
1371         int n;
1372         struct block *b, *first, **l;
1373
1374         first = NULL;
1375         l = &first;
1376         if (waserror()) {
1377                 freeblist(first);
1378                 nexterror();
1379         }
1380         do {
1381                 n = len;
1382                 if (n > Maxatomic)
1383                         n = Maxatomic;
1384
1385                 *l = b = block_alloc(n, MEM_WAIT);
1386                 /* TODO consider extra_data */
1387                 memmove(b->wp, p, n);
1388                 b->wp += n;
1389                 p += n;
1390                 len -= n;
1391                 l = &b->next;
1392         } while (len > 0);
1393         poperror();
1394
1395         return first;
1396 }
1397
1398 /*
1399  *  put a block back to the front of the queue
1400  *  called with q ilocked
1401  */
1402 void qputback(struct queue *q, struct block *b)
1403 {
1404         b->next = q->bfirst;
1405         if (q->bfirst == NULL)
1406                 q->blast = b;
1407         q->bfirst = b;
1408         q->dlen += BLEN(b);
1409         /* qputback seems to undo a read, so we can undo the accounting too. */
1410         q->bytes_read -= BLEN(b);
1411 }
1412
1413 /*
1414  *  get next block from a queue (up to a limit)
1415  *
1416  */
1417 struct block *qbread(struct queue *q, size_t len)
1418 {
1419         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
1420                         MEM_WAIT);
1421 }
1422
1423 struct block *qbread_nonblock(struct queue *q, size_t len)
1424 {
1425         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1426                         QIO_NON_BLOCK, MEM_WAIT);
1427 }
1428
1429 /* read up to len from a queue into vp. */
1430 size_t qread(struct queue *q, void *va, size_t len)
1431 {
1432         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1433
1434         if (!blist)
1435                 return 0;
1436         return read_all_blocks(blist, va, len);
1437 }
1438
1439 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1440 {
1441         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
1442                                        QIO_NON_BLOCK, MEM_WAIT);
1443
1444         if (!blist)
1445                 return 0;
1446         return read_all_blocks(blist, va, len);
1447 }
1448
1449 /* This is the rendez wake condition for writers. */
1450 static int qwriter_should_wake(void *a)
1451 {
1452         struct queue *q = a;
1453
1454         return qwritable(q) || (q->state & Qclosed);
1455 }
1456
1457 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1458 static size_t enqueue_blist(struct queue *q, struct block *b)
1459 {
1460         size_t dlen;
1461
1462         if (q->bfirst)
1463                 q->blast->next = b;
1464         else
1465                 q->bfirst = b;
1466         dlen = BLEN(b);
1467         while (b->next) {
1468                 b = b->next;
1469                 dlen += BLEN(b);
1470         }
1471         q->blast = b;
1472         q->dlen += dlen;
1473         return dlen;
1474 }
1475
1476 /* Adds block (which can be a list of blocks) to the queue, subject to
1477  * qio_flags.  Returns the length written on success or -1 on non-throwable
1478  * error.  Adjust qio_flags to control the value-added features!. */
1479 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1480 {
1481         ssize_t ret;
1482         bool was_unreadable;
1483
1484         if (q->bypass) {
1485                 ret = blocklen(b);
1486                 (*q->bypass) (q->arg, b);
1487                 return ret;
1488         }
1489         spin_lock_irqsave(&q->lock);
1490         was_unreadable = q->dlen == 0;
1491         if (q->state & Qclosed) {
1492                 spin_unlock_irqsave(&q->lock);
1493                 freeblist(b);
1494                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1495                         return -1;
1496                 if (q->err[0])
1497                         error(EPIPE, q->err);
1498                 else
1499                         error(EPIPE, "connection closed");
1500         }
1501         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1502                 /* drop overflow takes priority over regular non-blocking */
1503                 if ((qio_flags & QIO_DROP_OVERFLOW)
1504                     || (q->state & Qdropoverflow)) {
1505                         spin_unlock_irqsave(&q->lock);
1506                         freeb(b);
1507                         return -1;
1508                 }
1509                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
1510                  * nice and catch it. */
1511                 if ((qio_flags & QIO_CAN_ERR_SLEEP)
1512                     && (qio_flags & QIO_NON_BLOCK)) {
1513                         spin_unlock_irqsave(&q->lock);
1514                         freeb(b);
1515                         error(EAGAIN, "queue full");
1516                 }
1517         }
1518         ret = enqueue_blist(q, b);
1519         QDEBUG checkb(b, "__qbwrite");
1520         spin_unlock_irqsave(&q->lock);
1521         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1522          * wakeup, meaning that actual users either want a kick or have
1523          * qreaders. */
1524         if (q->kick && (was_unreadable || (q->state & Qkick)))
1525                 q->kick(q->arg);
1526         if (was_unreadable) {
1527                 /* Unlike the read side, there's no double-check to make sure
1528                  * the queue transitioned across an edge.  We know we added
1529                  * something, so that's enough.  We wake if the queue was empty.
1530                  * Both sides are the same, in that the condition for which we
1531                  * do the rendez_wakeup() is the same as the condition done for
1532                  * the rendez_sleep(). */
1533                 rendez_wakeup(&q->rr);
1534                 qwake_cb(q, FDTAP_FILT_READABLE);
1535         }
1536         /*
1537          *  flow control, wait for queue to get below the limit
1538          *  before allowing the process to continue and queue
1539          *  more.  We do this here so that postnote can only
1540          *  interrupt us after the data has been queued.  This
1541          *  means that things like 9p flushes and ssl messages
1542          *  will not be disrupted by software interrupts.
1543          *
1544          *  Note - this is moderately dangerous since a process
1545          *  that keeps getting interrupted and rewriting will
1546          *  queue infinite crud.
1547          */
1548         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1549             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1550                 /* This is a racy peek at the q status.  If we accidentally
1551                  * block, our rendez will return.  The rendez's peak
1552                  * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
1553                  * (that lock protects writes, but not reads).
1554                  *
1555                  * Here's the deal: when holding the rendez lock, if we see the
1556                  * sleep condition, the consumer will wake us.  The condition
1557                  * will only ever be changed by the next qbread() (consumer,
1558                  * changes q->dlen).  That code will do a rendez wake, which
1559                  * will spin on the rendez lock, meaning it won't procede until
1560                  * we either see the new state (and return) or put ourselves on
1561                  * the rendez, and wake up.
1562                  *
1563                  * The pattern is one side writes mem, then signals.  Our side
1564                  * checks the signal, then reads the mem.  The goal is to not
1565                  * miss seeing the signal AND missing the memory write.  In this
1566                  * specific case, the signal is actually synchronous (the rendez
1567                  * lock) and not basic shared memory.
1568                  *
1569                  * Oh, and we spin in case we woke early and someone else filled
1570                  * the queue, mesa-style. */
1571                 while (!qwriter_should_wake(q))
1572                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1573         }
1574         return ret;
1575 }
1576
1577 /*
1578  *  add a block to a queue obeying flow control
1579  */
1580 ssize_t qbwrite(struct queue *q, struct block *b)
1581 {
1582         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1583 }
1584
1585 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1586 {
1587         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1588 }
1589
1590 ssize_t qibwrite(struct queue *q, struct block *b)
1591 {
1592         return __qbwrite(q, b, 0);
1593 }
1594
1595 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1596  * block on success, 0 on failure. */
1597 static struct block *build_block(void *from, size_t len, int mem_flags)
1598 {
1599         struct block *b;
1600         void *ext_buf;
1601
1602         /* If len is small, we don't need to bother with the extra_data.  But
1603          * until the whole stack can handle extd blocks, we'll use them
1604          * unconditionally.  */
1605
1606         /* allocb builds in 128 bytes of header space to all blocks, but this is
1607          * only available via padblock (to the left).  we also need some space
1608          * for pullupblock for some basic headers (like icmp) that get written
1609          * in directly */
1610         b = block_alloc(64, mem_flags);
1611         if (!b)
1612                 return 0;
1613         ext_buf = kmalloc(len, mem_flags);
1614         if (!ext_buf) {
1615                 kfree(b);
1616                 return 0;
1617         }
1618         memcpy(ext_buf, from, len);
1619         if (block_add_extd(b, 1, mem_flags)) {
1620                 kfree(ext_buf);
1621                 kfree(b);
1622                 return 0;
1623         }
1624         b->extra_data[0].base = (uintptr_t)ext_buf;
1625         b->extra_data[0].off = 0;
1626         b->extra_data[0].len = len;
1627         b->extra_len += len;
1628         return b;
1629 }
1630
1631 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1632                         int qio_flags)
1633 {
1634         ERRSTACK(1);
1635         size_t n;
1636         volatile size_t sofar = 0;      /* volatile for the waserror */
1637         struct block *b;
1638         uint8_t *p = vp;
1639         void *ext_buf;
1640
1641         /* Only some callers can throw.  Others might be in a context where
1642          * waserror isn't safe. */
1643         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1644                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
1645                  * after some data has been sent should be treated as a partial
1646                  * write. */
1647                 if (sofar)
1648                         goto out_ok;
1649                 nexterror();
1650         }
1651         do {
1652                 n = len - sofar;
1653                 /* This is 64K, the max amount per single block.  Still a good
1654                  * value? */
1655                 if (n > Maxatomic)
1656                         n = Maxatomic;
1657                 b = build_block(p + sofar, n, mem_flags);
1658                 if (!b)
1659                         break;
1660                 if (__qbwrite(q, b, qio_flags) < 0)
1661                         break;
1662                 sofar += n;
1663         } while ((sofar < len) && (q->state & Qmsg) == 0);
1664 out_ok:
1665         if (qio_flags & QIO_CAN_ERR_SLEEP)
1666                 poperror();
1667         return sofar;
1668 }
1669
1670 ssize_t qwrite(struct queue *q, void *vp, int len)
1671 {
1672         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1673 }
1674
1675 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1676 {
1677         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1678                                               QIO_NON_BLOCK);
1679 }
1680
1681 ssize_t qiwrite(struct queue *q, void *vp, int len)
1682 {
1683         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1684 }
1685
1686 /*
1687  *  be extremely careful when calling this,
1688  *  as there is no reference accounting
1689  */
1690 void qfree(struct queue *q)
1691 {
1692         qclose(q);
1693         kfree(q);
1694 }
1695
1696 /*
1697  *  Mark a queue as closed.  No further IO is permitted.
1698  *  All blocks are released.
1699  */
1700 void qclose(struct queue *q)
1701 {
1702         struct block *bfirst;
1703
1704         if (q == NULL)
1705                 return;
1706
1707         /* mark it */
1708         spin_lock_irqsave(&q->lock);
1709         q->state |= Qclosed;
1710         q->state &= ~Qdropoverflow;
1711         q->err[0] = 0;
1712         bfirst = q->bfirst;
1713         q->bfirst = 0;
1714         q->dlen = 0;
1715         spin_unlock_irqsave(&q->lock);
1716
1717         /* free queued blocks */
1718         freeblist(bfirst);
1719
1720         /* wake up readers/writers */
1721         rendez_wakeup(&q->rr);
1722         rendez_wakeup(&q->wr);
1723         qwake_cb(q, FDTAP_FILT_HANGUP);
1724 }
1725
1726 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1727  *
1728  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1729  * there is no message, which is what also happens during a natural qclose(),
1730  * those waiters will simply return 0.  qwriters will always error() on a
1731  * closed/hungup queue. */
1732 void qhangup(struct queue *q, char *msg)
1733 {
1734         /* mark it */
1735         spin_lock_irqsave(&q->lock);
1736         q->state |= Qclosed;
1737         if (msg == 0 || *msg == 0)
1738                 q->err[0] = 0;
1739         else
1740                 strlcpy(q->err, msg, ERRMAX);
1741         spin_unlock_irqsave(&q->lock);
1742
1743         /* wake up readers/writers */
1744         rendez_wakeup(&q->rr);
1745         rendez_wakeup(&q->wr);
1746         qwake_cb(q, FDTAP_FILT_HANGUP);
1747 }
1748
1749 /*
1750  *  return non-zero if the q is hungup
1751  */
1752 int qisclosed(struct queue *q)
1753 {
1754         return q->state & Qclosed;
1755 }
1756
1757 /*
1758  *  mark a queue as no longer hung up.  resets the wake_cb.
1759  */
1760 void qreopen(struct queue *q)
1761 {
1762         spin_lock_irqsave(&q->lock);
1763         q->state &= ~Qclosed;
1764         q->eof = 0;
1765         q->limit = q->inilim;
1766         q->wake_cb = 0;
1767         q->wake_data = 0;
1768         spin_unlock_irqsave(&q->lock);
1769 }
1770
1771 /*
1772  *  return bytes queued
1773  */
1774 int qlen(struct queue *q)
1775 {
1776         return q->dlen;
1777 }
1778
1779 size_t q_bytes_read(struct queue *q)
1780 {
1781         return q->bytes_read;
1782 }
1783
1784 /*
1785  * return space remaining before flow control
1786  *
1787  *  This used to be
1788  *  q->len < q->limit/2
1789  *  but it slows down tcp too much for certain write sizes.
1790  *  I really don't understand it completely.  It may be
1791  *  due to the queue draining so fast that the transmission
1792  *  stalls waiting for the app to produce more data.  - presotto
1793  *
1794  *  q->len was the amount of bytes, which is no longer used.  we now use
1795  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1796  */
1797 int qwindow(struct queue *q)
1798 {
1799         int l;
1800
1801         l = q->limit - q->dlen;
1802         if (l < 0)
1803                 l = 0;
1804         return l;
1805 }
1806
1807 /*
1808  *  return true if we can read without blocking
1809  */
1810 int qcanread(struct queue *q)
1811 {
1812         return q->bfirst != 0;
1813 }
1814
1815 /*
1816  *  change queue limit
1817  */
1818 void qsetlimit(struct queue *q, size_t limit)
1819 {
1820         bool was_writable = qwritable(q);
1821
1822         q->limit = limit;
1823         if (!was_writable && qwritable(q)) {
1824                 rendez_wakeup(&q->wr);
1825                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1826         }
1827 }
1828
1829 size_t qgetlimit(struct queue *q)
1830 {
1831         return q->limit;
1832 }
1833
1834 /*
1835  *  set whether writes drop overflowing blocks, or if we sleep
1836  */
1837 void qdropoverflow(struct queue *q, bool onoff)
1838 {
1839         spin_lock_irqsave(&q->lock);
1840         if (onoff)
1841                 q->state |= Qdropoverflow;
1842         else
1843                 q->state &= ~Qdropoverflow;
1844         spin_unlock_irqsave(&q->lock);
1845 }
1846
1847 /* Be careful: this can affect concurrent reads/writes and code that might have
1848  * built-in expectations of the q's type. */
1849 void q_toggle_qmsg(struct queue *q, bool onoff)
1850 {
1851         spin_lock_irqsave(&q->lock);
1852         if (onoff)
1853                 q->state |= Qmsg;
1854         else
1855                 q->state &= ~Qmsg;
1856         spin_unlock_irqsave(&q->lock);
1857 }
1858
1859 /* Be careful: this can affect concurrent reads/writes and code that might have
1860  * built-in expectations of the q's type. */
1861 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1862 {
1863         spin_lock_irqsave(&q->lock);
1864         if (onoff)
1865                 q->state |= Qcoalesce;
1866         else
1867                 q->state &= ~Qcoalesce;
1868         spin_unlock_irqsave(&q->lock);
1869 }
1870
1871 /*
1872  *  flush the output queue
1873  */
1874 void qflush(struct queue *q)
1875 {
1876         struct block *bfirst;
1877
1878         /* mark it */
1879         spin_lock_irqsave(&q->lock);
1880         bfirst = q->bfirst;
1881         q->bfirst = 0;
1882         q->dlen = 0;
1883         spin_unlock_irqsave(&q->lock);
1884
1885         /* free queued blocks */
1886         freeblist(bfirst);
1887
1888         /* wake up writers */
1889         rendez_wakeup(&q->wr);
1890         qwake_cb(q, FDTAP_FILT_WRITABLE);
1891 }
1892
1893 int qfull(struct queue *q)
1894 {
1895         return !qwritable(q);
1896 }
1897
1898 int qstate(struct queue *q)
1899 {
1900         return q->state;
1901 }
1902
1903 void qdump(struct queue *q)
1904 {
1905         if (q)
1906                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1907                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1908 }
1909
1910 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1911  * marks the type of wakeup event (flags from FDTAP).
1912  *
1913  * There's no sync protection.  If you change the CB while the qio is running,
1914  * you might get a CB with the data or func from a previous set_wake_cb.  You
1915  * should set this once per queue and forget it.
1916  *
1917  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1918  * just make sure that the func(data) pair are valid until the queue is freed or
1919  * reopened. */
1920 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1921 {
1922         q->wake_data = data;
1923         wmb();  /* if we see func, we'll also see the data for it */
1924         q->wake_cb = func;
1925 }
1926
1927 /* Helper for detecting whether we'll block on a read at this instant. */
1928 bool qreadable(struct queue *q)
1929 {
1930         return qlen(q) > 0;
1931 }
1932
1933 /* Helper for detecting whether we'll block on a write at this instant. */
1934 bool qwritable(struct queue *q)
1935 {
1936         return !q->limit || qwindow(q) > 0;
1937 }