WIP-pop-3000
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <net/ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int dlen;                                       /* data bytes in queue */
75         int limit;                                      /* max bytes in queue */
76         int inilim;                             /* initial limit */
77         int state;
78         int eof;                                        /* number of eofs read by user */
79         size_t bytes_read;
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
101 };
102
103 unsigned int qiomaxatomic = Maxatomic;
104
105 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
106 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
107 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
108                               int mem_flags);
109 static bool qwait_and_ilock(struct queue *q, int qio_flags);
110
111 /* Helper: fires a wake callback, sending 'filter' */
112 static void qwake_cb(struct queue *q, int filter)
113 {
114         if (q->wake_cb)
115                 q->wake_cb(q, q->wake_data, filter);
116 }
117
118 void ixsummary(void)
119 {
120         debugging ^= 1;
121         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
122                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
123         printd("consume %lu, produce %lu, qcopy %lu\n",
124                    consumecnt, producecnt, qcopycnt);
125 }
126
127 /*
128  *  pad a block to the front (or the back if size is negative)
129  */
130 struct block *padblock(struct block *bp, int size)
131 {
132         int n;
133         struct block *nbp;
134
135         QDEBUG checkb(bp, "padblock 1");
136         if (size >= 0) {
137                 if (bp->rp - bp->base >= size) {
138                         bp->network_offset += size;
139                         bp->transport_offset += size;
140                         bp->rp -= size;
141                         return bp;
142                 }
143
144                 PANIC_EXTRA(bp);
145                 if (bp->next)
146                         panic("padblock %p", getcallerpc(&bp));
147                 n = BLEN(bp);
148                 padblockcnt++;
149                 nbp = block_alloc(size + n, MEM_WAIT);
150                 block_copy_metadata(nbp, bp);
151                 nbp->rp += size;
152                 nbp->wp = nbp->rp;
153                 memmove(nbp->wp, bp->rp, n);
154                 nbp->wp += n;
155                 freeb(bp);
156                 nbp->rp -= size;
157         } else {
158                 size = -size;
159
160                 PANIC_EXTRA(bp);
161
162                 if (bp->next)
163                         panic("padblock %p", getcallerpc(&bp));
164
165                 if (bp->lim - bp->wp >= size)
166                         return bp;
167
168                 n = BLEN(bp);
169                 padblockcnt++;
170                 nbp = block_alloc(size + n, MEM_WAIT);
171                 block_copy_metadata(nbp, bp);
172                 memmove(nbp->wp, bp->rp, n);
173                 nbp->wp += n;
174                 freeb(bp);
175         }
176         QDEBUG checkb(nbp, "padblock 1");
177         return nbp;
178 }
179
180 /*
181  *  return count of bytes in a string of blocks
182  */
183 int blocklen(struct block *bp)
184 {
185         int len;
186
187         len = 0;
188         while (bp) {
189                 len += BLEN(bp);
190                 bp = bp->next;
191         }
192         return len;
193 }
194
195 /*
196  * return count of space in blocks
197  */
198 int blockalloclen(struct block *bp)
199 {
200         int len;
201
202         len = 0;
203         while (bp) {
204                 len += BALLOC(bp);
205                 bp = bp->next;
206         }
207         return len;
208 }
209
210 /*
211  *  copy the  string of blocks into
212  *  a single block and free the string
213  */
214 struct block *concatblock(struct block *bp)
215 {
216         int len;
217         struct block *nb, *f;
218
219         if (bp->next == 0)
220                 return bp;
221
222         /* probably use parts of qclone */
223         PANIC_EXTRA(bp);
224         nb = block_alloc(blocklen(bp), MEM_WAIT);
225         for (f = bp; f; f = f->next) {
226                 len = BLEN(f);
227                 memmove(nb->wp, f->rp, len);
228                 nb->wp += len;
229         }
230         concatblockcnt += BLEN(nb);
231         freeblist(bp);
232         QDEBUG checkb(nb, "concatblock 1");
233         return nb;
234 }
235
236 /* Makes an identical copy of the block, collapsing all the data into the block
237  * body.  It does not point to the contents of the original, it is a copy
238  * (unlike qclone).  Since we're copying, we might as well put the memory into
239  * one contiguous chunk. */
240 struct block *copyblock(struct block *bp, int mem_flags)
241 {
242         struct block *newb;
243         struct extra_bdata *ebd;
244         size_t amt;
245
246         QDEBUG checkb(bp, "copyblock 0");
247         newb = block_alloc(BLEN(bp), mem_flags);
248         if (!newb)
249                 return 0;
250         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
251         assert(amt == BHLEN(bp));
252         for (int i = 0; i < bp->nr_extra_bufs; i++) {
253                 ebd = &bp->extra_data[i];
254                 if (!ebd->base || !ebd->len)
255                         continue;
256                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
257                 assert(amt == ebd->len);
258         }
259         block_copy_metadata(newb, bp);
260         copyblockcnt++;
261         QDEBUG checkb(newb, "copyblock 1");
262         return newb;
263 }
264
265 /* Returns a block with the remaining contents of b all in the main body of the
266  * returned block.  Replace old references to b with the returned value (which
267  * may still be 'b', if no change was needed. */
268 struct block *linearizeblock(struct block *b)
269 {
270         struct block *newb;
271
272         if (!b->extra_len)
273                 return b;
274         newb = copyblock(b, MEM_WAIT);
275         freeb(b);
276         return newb;
277 }
278
279 /* Make sure the first block has at least n bytes in its main body.  Pulls up
280  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
281  * block list. */
282 struct block *pullupblock(struct block *bp, int n)
283 {
284         int i, len, seglen;
285         struct block *nbp;
286         struct extra_bdata *ebd;
287
288         /*
289          *  this should almost always be true, it's
290          *  just to avoid every caller checking.
291          */
292         if (BHLEN(bp) >= n)
293                 return bp;
294
295         /* If there's no chance, just bail out now.  This might be slightly wasteful
296          * if there's a long blist that does have enough data. */
297         if (n > blocklen(bp))
298                 return 0;
299         /* a start at explicit main-body / header management */
300         if (bp->extra_len) {
301                 if (n > bp->lim - bp->rp) {
302                         /* would need to realloc a new block and copy everything over. */
303                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
304                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
305                 }
306                 len = n - BHLEN(bp);
307                 /* Would need to recursively call this, or otherwise pull from later
308                  * blocks and put chunks of their data into the block we're building. */
309                 if (len > bp->extra_len)
310                         panic("pullup more than extra (%d, %d, %d)\n",
311                               n, BHLEN(bp), bp->extra_len);
312                 QDEBUG checkb(bp, "before pullup");
313                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
314                         ebd = &bp->extra_data[i];
315                         if (!ebd->base || !ebd->len)
316                                 continue;
317                         seglen = MIN(ebd->len, len);
318                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
319                         bp->wp += seglen;
320                         len -= seglen;
321                         ebd->len -= seglen;
322                         ebd->off += seglen;
323                         bp->extra_len -= seglen;
324                         if (ebd->len == 0) {
325                                 kfree((void *)ebd->base);
326                                 ebd->off = 0;
327                                 ebd->base = 0;
328                         }
329                 }
330                 /* maybe just call pullupblock recursively here */
331                 if (len)
332                         panic("pullup %d bytes overdrawn\n", len);
333                 QDEBUG checkb(bp, "after pullup");
334                 return bp;
335         }
336
337         /*
338          *  if not enough room in the first block,
339          *  add another to the front of the list.
340          */
341         if (bp->lim - bp->rp < n) {
342                 nbp = block_alloc(n, MEM_WAIT);
343                 nbp->next = bp;
344                 bp = nbp;
345         }
346
347         /*
348          *  copy bytes from the trailing blocks into the first
349          */
350         n -= BLEN(bp);
351         while ((nbp = bp->next)) {
352                 i = BLEN(nbp);
353                 if (i > n) {
354                         memmove(bp->wp, nbp->rp, n);
355                         pullupblockcnt++;
356                         bp->wp += n;
357                         nbp->rp += n;
358                         QDEBUG checkb(bp, "pullupblock 1");
359                         return bp;
360                 } else {
361                         memmove(bp->wp, nbp->rp, i);
362                         pullupblockcnt++;
363                         bp->wp += i;
364                         bp->next = nbp->next;
365                         nbp->next = 0;
366                         freeb(nbp);
367                         n -= i;
368                         if (n == 0) {
369                                 QDEBUG checkb(bp, "pullupblock 2");
370                                 return bp;
371                         }
372                 }
373         }
374         freeb(bp);
375         return 0;
376 }
377
378 /*
379  *  make sure the first block has at least n bytes in its main body
380  */
381 struct block *pullupqueue(struct queue *q, int n)
382 {
383         struct block *b;
384
385         /* TODO: lock to protect the queue links? */
386         if ((BHLEN(q->bfirst) >= n))
387                 return q->bfirst;
388         q->bfirst = pullupblock(q->bfirst, n);
389         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
390         q->blast = b;
391         return q->bfirst;
392 }
393
394 /* throw away count bytes from the front of
395  * block's extradata.  Returns count of bytes
396  * thrown away
397  */
398
399 static int pullext(struct block *bp, int count)
400 {
401         struct extra_bdata *ed;
402         int i, rem, bytes = 0;
403
404         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
405                 ed = &bp->extra_data[i];
406                 rem = MIN(count, ed->len);
407                 bp->extra_len -= rem;
408                 count -= rem;
409                 bytes += rem;
410                 ed->off += rem;
411                 ed->len -= rem;
412                 if (ed->len == 0) {
413                         kfree((void *)ed->base);
414                         ed->base = 0;
415                         ed->off = 0;
416                 }
417         }
418         return bytes;
419 }
420
421 /* throw away count bytes from the end of a
422  * block's extradata.  Returns count of bytes
423  * thrown away
424  */
425
426 static int dropext(struct block *bp, int count)
427 {
428         struct extra_bdata *ed;
429         int i, rem, bytes = 0;
430
431         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
432                 ed = &bp->extra_data[i];
433                 rem = MIN(count, ed->len);
434                 bp->extra_len -= rem;
435                 count -= rem;
436                 bytes += rem;
437                 ed->len -= rem;
438                 if (ed->len == 0) {
439                         kfree((void *)ed->base);
440                         ed->base = 0;
441                         ed->off = 0;
442                 }
443         }
444         return bytes;
445 }
446
447 /*
448  *  throw away up to count bytes from a
449  *  list of blocks.  Return count of bytes
450  *  thrown away.
451  */
452 static int _pullblock(struct block **bph, int count, int free)
453 {
454         struct block *bp;
455         int n, bytes;
456
457         bytes = 0;
458         if (bph == NULL)
459                 return 0;
460
461         while (*bph != NULL && count != 0) {
462                 bp = *bph;
463
464                 n = MIN(BHLEN(bp), count);
465                 bytes += n;
466                 count -= n;
467                 bp->rp += n;
468                 n = pullext(bp, count);
469                 bytes += n;
470                 count -= n;
471                 QDEBUG checkb(bp, "pullblock ");
472                 if (BLEN(bp) == 0 && (free || count)) {
473                         *bph = bp->next;
474                         bp->next = NULL;
475                         freeb(bp);
476                 }
477         }
478         return bytes;
479 }
480
481 int pullblock(struct block **bph, int count)
482 {
483         return _pullblock(bph, count, 1);
484 }
485
486 /*
487  *  trim to len bytes starting at offset
488  */
489 struct block *trimblock(struct block *bp, int offset, int len)
490 {
491         uint32_t l, trim;
492         int olen = len;
493
494         QDEBUG checkb(bp, "trimblock 1");
495         if (blocklen(bp) < offset + len) {
496                 freeblist(bp);
497                 return NULL;
498         }
499
500         l =_pullblock(&bp, offset, 0);
501         if (bp == NULL)
502                 return NULL;
503         if (l != offset) {
504                 freeblist(bp);
505                 return NULL;
506         }
507
508         while ((l = BLEN(bp)) < len) {
509                 len -= l;
510                 bp = bp->next;
511         }
512
513         trim = BLEN(bp) - len;
514         trim -= dropext(bp, trim);
515         bp->wp -= trim;
516
517         if (bp->next) {
518                 freeblist(bp->next);
519                 bp->next = NULL;
520         }
521         return bp;
522 }
523
524 /* Adjust block @bp so that its size is exactly @len.
525  * If the size is increased, fill in the new contents with zeros.
526  * If the size is decreased, discard some of the old contents at the tail. */
527 struct block *adjustblock(struct block *bp, int len)
528 {
529         struct extra_bdata *ebd;
530         void *buf;
531         int i;
532
533         if (len < 0) {
534                 freeb(bp);
535                 return NULL;
536         }
537
538         if (len == BLEN(bp))
539                 return bp;
540
541         /* Shrink within block main body. */
542         if (len <= BHLEN(bp)) {
543                 free_block_extra(bp);
544                 bp->wp = bp->rp + len;
545                 QDEBUG checkb(bp, "adjustblock 1");
546                 return bp;
547         }
548
549         /* Need to grow. */
550         if (len > BLEN(bp)) {
551                 /* Grow within block main body. */
552                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
553                         memset(bp->wp, 0, len - BLEN(bp));
554                         bp->wp = bp->rp + len;
555                         QDEBUG checkb(bp, "adjustblock 2");
556                         return bp;
557                 }
558                 /* Grow with extra data buffers. */
559                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
560                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
561                 QDEBUG checkb(bp, "adjustblock 3");
562                 return bp;
563         }
564
565         /* Shrink extra data buffers.
566          * len is how much of ebd we need to keep.
567          * extra_len is re-accumulated. */
568         assert(bp->extra_len > 0);
569         len -= BHLEN(bp);
570         bp->extra_len = 0;
571         for (i = 0; i < bp->nr_extra_bufs; i++) {
572                 ebd = &bp->extra_data[i];
573                 if (len <= ebd->len)
574                         break;
575                 len -= ebd->len;
576                 bp->extra_len += ebd->len;
577         }
578         /* If len becomes zero, extra_data[i] should be freed. */
579         if (len > 0) {
580                 ebd = &bp->extra_data[i];
581                 ebd->len = len;
582                 bp->extra_len += ebd->len;
583                 i++;
584         }
585         for (; i < bp->nr_extra_bufs; i++) {
586                 ebd = &bp->extra_data[i];
587                 if (ebd->base)
588                         kfree((void*)ebd->base);
589                 ebd->base = ebd->off = ebd->len = 0;
590         }
591         QDEBUG checkb(bp, "adjustblock 4");
592         return bp;
593 }
594
595 /* Helper: removes and returns the first block from q */
596 static struct block *pop_first_block(struct queue *q)
597 {
598         struct block *b = q->bfirst;
599
600         q->dlen -= BLEN(b);
601         q->bytes_read += BLEN(b);
602         q->bfirst = b->next;
603         b->next = 0;
604         return b;
605 }
606
607 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
608 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
609 {
610         copy_amt = MIN(to->lim - to->wp, copy_amt);
611         memcpy(to->wp, from, copy_amt);
612         to->wp += copy_amt;
613         return copy_amt;
614 }
615
616 /* Accounting helper.  Block b in q lost amt extra_data */
617 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
618 {
619         b->extra_len -= amt;
620         q->dlen -= amt;
621         q->bytes_read += amt;
622 }
623
624 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
625  * fixed in 'from', so we move its contents and zero it out in 'from'.
626  *
627  * Returns the length moved (0 on failure). */
628 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
629                        struct block *from, struct queue *from_q)
630 {
631         size_t ret = ebd->len;
632
633         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
634                 return 0;
635         block_and_q_lost_extra(from, from_q, ebd->len);
636         ebd->base = ebd->len = ebd->off = 0;
637         return ret;
638 }
639
640 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
641  * return with less than len, but greater than 0, even if there is more
642  * available in q.
643  *
644  * At any moment that we have copied anything and things are tricky, we can just
645  * return.  The trickiness comes from a bunch of variables: is the main body
646  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
647  * to @to's main body, but only if we haven't used it yet. */
648 static size_t copy_from_first_block(struct queue *q, struct block *to,
649                                     size_t len)
650 {
651         struct block *from = q->bfirst;
652         size_t copy_amt, amt;
653         struct extra_bdata *ebd;
654
655         assert(len < BLEN(from));       /* sanity */
656         /* Try to extract from the main body */
657         copy_amt = MIN(BHLEN(from), len);
658         if (copy_amt) {
659                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
660                 from->rp += copy_amt;
661                 /* We only change dlen, (data len), not q->len, since the q still has
662                  * the same block memory allocation (no kfrees happened) */
663                 q->dlen -= copy_amt;
664                 q->bytes_read += copy_amt;
665         }
666         /* Try to extract the remainder from the extra data */
667         len -= copy_amt;
668         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
669                 ebd = &from->extra_data[i];
670                 if (!ebd->base || !ebd->len)
671                         continue;
672                 if (len >= ebd->len) {
673                         amt = move_ebd(ebd, to, from, q);
674                         if (!amt) {
675                                 /* our internal alloc could have failed.   this ebd is now the
676                                  * last one we'll consider.  let's handle it separately and put
677                                  * it in the main body. */
678                                 if (copy_amt)
679                                         return copy_amt;
680                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
681                                                               ebd->len);
682                                 block_and_q_lost_extra(from, q, copy_amt);
683                                 break;
684                         }
685                         len -= amt;
686                         copy_amt += amt;
687                         continue;
688                 } else {
689                         /* If we're here, we reached our final ebd, which we'll need to
690                          * split to get anything from it. */
691                         if (copy_amt)
692                                 return copy_amt;
693                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
694                                                       len);
695                         ebd->off += copy_amt;
696                         ebd->len -= copy_amt;
697                         block_and_q_lost_extra(from, q, copy_amt);
698                         break;
699                 }
700         }
701         if (len)
702                 assert(copy_amt);       /* sanity */
703         return copy_amt;
704 }
705
706 /* Return codes for __qbread and __try_qbread. */
707 enum {
708         QBR_OK,
709         QBR_FAIL,
710         QBR_SPARE,      /* we need a spare block */
711         QBR_AGAIN,      /* do it again, we are coalescing blocks */
712 };
713
714 /* Helper and back-end for __qbread: extracts and returns a list of blocks
715  * containing up to len bytes.  It may contain less than len even if q has more
716  * data.
717  *
718  * Returns a code interpreted by __qbread, and the returned blist in ret. */
719 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
720                         struct block **real_ret, struct block *spare)
721 {
722         struct block *ret, *ret_last, *first;
723         size_t blen;
724         bool was_unwritable = FALSE;
725
726         if (qio_flags & QIO_CAN_ERR_SLEEP) {
727                 if (!qwait_and_ilock(q, qio_flags)) {
728                         spin_unlock_irqsave(&q->lock);
729                         return QBR_FAIL;
730                 }
731                 /* we qwaited and still hold the lock, so the q is not empty */
732                 first = q->bfirst;
733         } else {
734                 spin_lock_irqsave(&q->lock);
735                 first = q->bfirst;
736                 if (!first) {
737                         spin_unlock_irqsave(&q->lock);
738                         return QBR_FAIL;
739                 }
740         }
741         /* We need to check before adjusting q->len.  We're checking the writer's
742          * sleep condition / tap condition.  When set, we *might* be making an edge
743          * transition (from unwritable to writable), which needs to wake and fire
744          * taps.  But, our read might not drain the queue below q->lim.  We'll check
745          * again later to see if we should really wake them.  */
746         was_unwritable = !qwritable(q);
747         blen = BLEN(first);
748         if ((q->state & Qcoalesce) && (blen == 0)) {
749                 freeb(pop_first_block(q));
750                 spin_unlock_irqsave(&q->lock);
751                 /* Need to retry to make sure we have a first block */
752                 return QBR_AGAIN;
753         }
754         /* Qmsg: just return the first block.  Be careful, since our caller might
755          * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
756         if (q->state & Qmsg) {
757                 ret = pop_first_block(q);
758                 goto out_ok;
759         }
760         /* Let's get at least something first - makes the code easier.  This way,
761          * we'll only ever split the block once. */
762         if (blen <= len) {
763                 ret = pop_first_block(q);
764                 len -= blen;
765         } else {
766                 /* need to split the block.  we won't actually take the first block out
767                  * of the queue - we're just extracting a little bit. */
768                 if (!spare) {
769                         /* We have nothing and need a spare block.  Retry! */
770                         spin_unlock_irqsave(&q->lock);
771                         return QBR_SPARE;
772                 }
773                 copy_from_first_block(q, spare, len);
774                 ret = spare;
775                 goto out_ok;
776         }
777         /* At this point, we just grabbed the first block.  We can try to grab some
778          * more, up to len (if they want). */
779         if (qio_flags & QIO_JUST_ONE_BLOCK)
780                 goto out_ok;
781         ret_last = ret;
782         while (q->bfirst && (len > 0)) {
783                 blen = BLEN(q->bfirst);
784                 if ((q->state & Qcoalesce) && (blen == 0)) {
785                         /* remove the intermediate 0 blocks */
786                         freeb(pop_first_block(q));
787                         continue;
788                 }
789                 if (blen > len) {
790                         /* We could try to split the block, but that's a huge pain.  For
791                          * instance, we might need to move the main body of b into an
792                          * extra_data of ret_last.  lots of ways for that to fail, and lots
793                          * of cases to consider.  Easier to just bail out.  This is why I
794                          * did the first block above: we don't need to worry about this. */
795                          break;
796                 }
797                 ret_last->next = pop_first_block(q);
798                 ret_last = ret_last->next;
799                 len -= blen;
800         }
801 out_ok:
802         /* Don't wake them up or fire tap if we didn't drain enough. */
803         if (!qwritable(q))
804                 was_unwritable = FALSE;
805         spin_unlock_irqsave(&q->lock);
806         if (was_unwritable) {
807                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
808                         q->kick(q->arg);
809                 rendez_wakeup(&q->wr);
810                 qwake_cb(q, FDTAP_FILT_WRITABLE);
811         }
812         *real_ret = ret;
813         return QBR_OK;
814 }
815
816 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
817  * containing up to len bytes.  It may contain less than len even if q has more
818  * data.
819  *
820  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
821  * if it required a spare and the memory allocation failed.
822  *
823  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
824  * could get a zero length block back. */
825 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
826                               int mem_flags)
827 {
828         ERRSTACK(1);
829         struct block *ret = 0;
830         struct block *volatile spare = 0;       /* volatile for the waserror */
831
832         /* __try_qbread can throw, based on qio flags. */
833         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
834                 if (spare)
835                         freeb(spare);
836                 nexterror();
837         }
838         while (1) {
839                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
840                 case QBR_OK:
841                 case QBR_FAIL:
842                         if (spare && (ret != spare))
843                                 freeb(spare);
844                         goto out_ret;
845                 case QBR_SPARE:
846                         assert(!spare);
847                         /* Due to some nastiness, we need a fresh block so we can read out
848                          * anything from the queue.  'len' seems like a reasonable amount.
849                          * Maybe we can get away with less. */
850                         spare = block_alloc(len, mem_flags);
851                         if (!spare) {
852                                 /* Careful here: a memory failure (possible with MEM_ATOMIC)
853                                  * could look like 'no data in the queue' (QBR_FAIL).  The only
854                                  * one who does is this qget(), who happens to know that we
855                                  * won't need a spare, due to the len argument.  Spares are only
856                                  * needed when we need to split a block. */
857                                 ret = 0;
858                                 goto out_ret;
859                         }
860                         break;
861                 case QBR_AGAIN:
862                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
863                          * try again.  We bounce out of __try so we can perform the "is
864                          * there a block" logic again from the top. */
865                         break;
866                 }
867         }
868         assert(0);
869 out_ret:
870         if (qio_flags & QIO_CAN_ERR_SLEEP)
871                 poperror();
872         return ret;
873 }
874
875 /*
876  *  get next block from a queue, return null if nothing there
877  */
878 struct block *qget(struct queue *q)
879 {
880         /* since len == SIZE_MAX, we should never need to do a mem alloc */
881         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
882 }
883
884 /* Throw away the next 'len' bytes in the queue returning the number actually
885  * discarded.
886  *
887  * If the bytes are in the queue, then they must be discarded.  The only time to
888  * return less than len is if the q itself has less than len bytes.
889  *
890  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
891  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
892 size_t qdiscard(struct queue *q, size_t len)
893 {
894         struct block *blist;
895         size_t removed_amt;
896         size_t sofar = 0;
897
898         /* This is racy.  There could be multiple qdiscarders or other consumers,
899          * where the consumption could be interleaved. */
900         while (qlen(q) && len) {
901                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
902                 removed_amt = freeblist(blist);
903                 sofar += removed_amt;
904                 len -= removed_amt;
905         }
906         return sofar;
907 }
908
909 ssize_t qpass(struct queue *q, struct block *b)
910 {
911         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
912 }
913
914 ssize_t qpassnolim(struct queue *q, struct block *b)
915 {
916         return __qbwrite(q, b, 0);
917 }
918
919 /*
920  *  if the allocated space is way out of line with the used
921  *  space, reallocate to a smaller block
922  */
923 struct block *packblock(struct block *bp)
924 {
925         struct block **l, *nbp;
926         int n;
927
928         if (bp->extra_len)
929                 return bp;
930         for (l = &bp; *l; l = &(*l)->next) {
931                 nbp = *l;
932                 n = BLEN(nbp);
933                 if ((n << 2) < BALLOC(nbp)) {
934                         *l = block_alloc(n, MEM_WAIT);
935                         memmove((*l)->wp, nbp->rp, n);
936                         (*l)->wp += n;
937                         (*l)->next = nbp->next;
938                         freeb(nbp);
939                 }
940         }
941
942         return bp;
943 }
944
945 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
946  * body_rp, for up to len.  Returns the len consumed.
947  *
948  * The base is 'b', so that we can kfree it later.  This currently ties us to
949  * using kfree for the release method for all extra_data.
950  *
951  * It is possible to have a body size that is 0, if there is no offset, and
952  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
953 static size_t point_to_body(struct block *b, uint8_t *body_rp,
954                             struct block *newb, unsigned int newb_idx,
955                             size_t len)
956 {
957         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
958
959         assert(newb_idx < newb->nr_extra_bufs);
960
961         kmalloc_incref(b);
962         ebd->base = (uintptr_t)b;
963         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
964         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
965         assert((int)ebd->len >= 0);
966         newb->extra_len += ebd->len;
967         return ebd->len;
968 }
969
970 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
971  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
972  * consumed.
973  *
974  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
975 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
976                            struct block *newb, unsigned int newb_idx,
977                            size_t len)
978 {
979         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
980         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
981
982         assert(b_idx < b->nr_extra_bufs);
983         assert(newb_idx < newb->nr_extra_bufs);
984
985         kmalloc_incref((void*)b_ebd->base);
986         n_ebd->base = b_ebd->base;
987         n_ebd->off = b_ebd->off + b_off;
988         n_ebd->len = MIN(b_ebd->len - b_off, len);
989         newb->extra_len += n_ebd->len;
990         return n_ebd->len;
991 }
992
993 /* given a string of blocks, sets up the new block's extra_data such that it
994  * *points* to the contents of the blist [offset, len + offset).  This does not
995  * make a separate copy of the contents of the blist.
996  *
997  * returns 0 on success.  the only failure is if the extra_data array was too
998  * small, so this returns a positive integer saying how big the extra_data needs
999  * to be.
1000  *
1001  * callers are responsible for protecting the list structure. */
1002 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1003                             uint32_t offset)
1004 {
1005         struct block *b, *first;
1006         unsigned int nr_bufs = 0;
1007         unsigned int b_idx, newb_idx = 0;
1008         uint8_t *first_main_body = 0;
1009         ssize_t sofar = 0;
1010
1011         /* find the first block; keep offset relative to the latest b in the list */
1012         for (b = blist; b; b = b->next) {
1013                 if (BLEN(b) > offset)
1014                         break;
1015                 offset -= BLEN(b);
1016         }
1017         /* qcopy semantics: if you asked for an offset outside the block list, you
1018          * get an empty block back */
1019         if (!b)
1020                 return 0;
1021         first = b;
1022         sofar -= offset;        /* don't count the remaining offset in the first b */
1023         /* upper bound for how many buffers we'll need in newb */
1024         for (/* b is set*/; b; b = b->next) {
1025                 nr_bufs += BHLEN(b) ? 1 : 0;
1026                 nr_bufs += b->nr_extra_bufs; /* still assuming nr_extra == nr_valid */
1027                 sofar += BLEN(b);
1028                 if (sofar > len)
1029                         break;
1030         }
1031         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1032         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1033                 /* caller will need to alloc these, then re-call us */
1034                 return nr_bufs;
1035         }
1036         for (b = first; b && len; b = b->next) {
1037                 b_idx = 0;
1038                 if (offset) {
1039                         if (offset < BHLEN(b)) {
1040                                 /* off is in the main body */
1041                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1042                                 newb_idx++;
1043                         } else {
1044                                 /* off is in one of the buffers (or just past the last one).
1045                                  * we're not going to point to b's main body at all. */
1046                                 offset -= BHLEN(b);
1047                                 assert(b->extra_data);
1048                                 /* assuming these extrabufs are packed, or at least that len
1049                                  * isn't gibberish */
1050                                 while (b->extra_data[b_idx].len <= offset) {
1051                                         offset -= b->extra_data[b_idx].len;
1052                                         b_idx++;
1053                                 }
1054                                 /* now offset is set to our offset in the b_idx'th buf */
1055                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1056                                 newb_idx++;
1057                                 b_idx++;
1058                         }
1059                         offset = 0;
1060                 } else {
1061                         if (BHLEN(b)) {
1062                                 len -= point_to_body(b, b->rp, newb, newb_idx, len);
1063                                 newb_idx++;
1064                         }
1065                 }
1066                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1067                  * and any point_to_ could be our last if it consumed all of len. */
1068                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1069                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1070                         newb_idx++;
1071                 }
1072         }
1073         return 0;
1074 }
1075
1076 struct block *blist_clone(struct block *blist, int header_len, int len,
1077                           uint32_t offset)
1078 {
1079         int ret;
1080         struct block *newb = block_alloc(header_len, MEM_WAIT);
1081         do {
1082                 ret = __blist_clone_to(blist, newb, len, offset);
1083                 if (ret)
1084                         block_add_extd(newb, ret, MEM_WAIT);
1085         } while (ret);
1086         return newb;
1087 }
1088
1089 /* given a queue, makes a single block with header_len reserved space in the
1090  * block main body, and the contents of [offset, len + offset) pointed to in the
1091  * new blocks ext_data.  This does not make a copy of the q's contents, though
1092  * you do have a ref count on the memory. */
1093 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1094 {
1095         int ret;
1096         struct block *newb = block_alloc(header_len, MEM_WAIT);
1097         /* the while loop should rarely be used: it would require someone
1098          * concurrently adding to the queue. */
1099         do {
1100                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1101                 spin_lock_irqsave(&q->lock);
1102                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1103                 spin_unlock_irqsave(&q->lock);
1104                 if (ret)
1105                         block_add_extd(newb, ret, MEM_WAIT);
1106         } while (ret);
1107         return newb;
1108 }
1109
1110 /*
1111  *  copy from offset in the queue
1112  */
1113 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1114 {
1115         int sofar;
1116         int n;
1117         struct block *b, *nb;
1118         uint8_t *p;
1119
1120         nb = block_alloc(len, MEM_WAIT);
1121
1122         spin_lock_irqsave(&q->lock);
1123
1124         /* go to offset */
1125         b = q->bfirst;
1126         for (sofar = 0;; sofar += n) {
1127                 if (b == NULL) {
1128                         spin_unlock_irqsave(&q->lock);
1129                         return nb;
1130                 }
1131                 n = BLEN(b);
1132                 if (sofar + n > offset) {
1133                         p = b->rp + offset - sofar;
1134                         n -= offset - sofar;
1135                         break;
1136                 }
1137                 QDEBUG checkb(b, "qcopy");
1138                 b = b->next;
1139         }
1140
1141         /* copy bytes from there */
1142         for (sofar = 0; sofar < len;) {
1143                 if (n > len - sofar)
1144                         n = len - sofar;
1145                 PANIC_EXTRA(b);
1146                 memmove(nb->wp, p, n);
1147                 qcopycnt += n;
1148                 sofar += n;
1149                 nb->wp += n;
1150                 b = b->next;
1151                 if (b == NULL)
1152                         break;
1153                 n = BLEN(b);
1154                 p = b->rp;
1155         }
1156         spin_unlock_irqsave(&q->lock);
1157
1158         return nb;
1159 }
1160
1161 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1162 {
1163 #ifdef CONFIG_BLOCK_EXTRAS
1164         return qclone(q, 0, len, offset);
1165 #else
1166         return qcopy_old(q, len, offset);
1167 #endif
1168 }
1169
1170 static void qinit_common(struct queue *q)
1171 {
1172         spinlock_init_irqsave(&q->lock);
1173         rendez_init(&q->rr);
1174         rendez_init(&q->wr);
1175 }
1176
1177 /*
1178  *  called by non-interrupt code
1179  */
1180 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1181 {
1182         struct queue *q;
1183
1184         q = kzmalloc(sizeof(struct queue), 0);
1185         if (q == 0)
1186                 return 0;
1187         qinit_common(q);
1188
1189         q->limit = q->inilim = limit;
1190         q->kick = kick;
1191         q->arg = arg;
1192         q->state = msg;
1193         q->eof = 0;
1194
1195         return q;
1196 }
1197
1198 /* open a queue to be bypassed */
1199 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1200 {
1201         struct queue *q;
1202
1203         q = kzmalloc(sizeof(struct queue), 0);
1204         if (q == 0)
1205                 return 0;
1206         qinit_common(q);
1207
1208         q->limit = 0;
1209         q->arg = arg;
1210         q->bypass = bypass;
1211         q->state = 0;
1212
1213         return q;
1214 }
1215
1216 static int notempty(void *a)
1217 {
1218         struct queue *q = a;
1219
1220         return (q->state & Qclosed) || q->bfirst != 0;
1221 }
1222
1223 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1224  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1225  * was naturally closed.  Throws an error o/w. */
1226 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1227 {
1228         while (1) {
1229                 spin_lock_irqsave(&q->lock);
1230                 if (q->bfirst != NULL)
1231                         return TRUE;
1232                 if (q->state & Qclosed) {
1233                         if (++q->eof > 3) {
1234                                 spin_unlock_irqsave(&q->lock);
1235                                 error(EPIPE, "multiple reads on a closed queue");
1236                         }
1237                         if (q->err[0]) {
1238                                 spin_unlock_irqsave(&q->lock);
1239                                 error(EPIPE, q->err);
1240                         }
1241                         return FALSE;
1242                 }
1243                 if (qio_flags & QIO_NON_BLOCK) {
1244                         spin_unlock_irqsave(&q->lock);
1245                         error(EAGAIN, "queue empty");
1246                 }
1247                 spin_unlock_irqsave(&q->lock);
1248                 /* As with the producer side, we check for a condition while holding the
1249                  * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
1250                  * check again" pattern, but we do it conditionally.  Both sides agree
1251                  * synchronously to do it, and those decisions are made while holding
1252                  * q->lock.  I think this is OK.
1253                  *
1254                  * The invariant is that no reader sleeps when the queue has data.
1255                  * While holding the rendez lock, if we see there's no data, we'll
1256                  * sleep.  Since we saw there was no data, the next writer will see (or
1257                  * already saw) no data, and then the writer decides to rendez_wake,
1258                  * which will grab the rendez lock.  If the writer already did that,
1259                  * then we'll see notempty when we do our check-again. */
1260                 rendez_sleep(&q->rr, notempty, q);
1261         }
1262 }
1263
1264 /*
1265  * add a block list to a queue
1266  * XXX basically the same as enqueue blist, and has no locking!
1267  */
1268 void qaddlist(struct queue *q, struct block *b)
1269 {
1270         /* TODO: q lock? */
1271         /* queue the block */
1272         if (q->bfirst)
1273                 q->blast->next = b;
1274         else
1275                 q->bfirst = b;
1276         q->dlen += blocklen(b);
1277         while (b->next)
1278                 b = b->next;
1279         q->blast = b;
1280 }
1281
1282 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1283 {
1284         size_t copy_amt, retval = 0;
1285         struct extra_bdata *ebd;
1286
1287         copy_amt = MIN(BHLEN(b), amt);
1288         memcpy(to, b->rp, copy_amt);
1289         /* advance the rp, since this block not be completely consumed and future
1290          * reads need to know where to pick up from */
1291         b->rp += copy_amt;
1292         to += copy_amt;
1293         amt -= copy_amt;
1294         retval += copy_amt;
1295         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1296                 ebd = &b->extra_data[i];
1297                 /* skip empty entires.  if we track this in the struct block, we can
1298                  * just start the for loop early */
1299                 if (!ebd->base || !ebd->len)
1300                         continue;
1301                 copy_amt = MIN(ebd->len, amt);
1302                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1303                 /* we're actually consuming the entries, just like how we advance rp up
1304                  * above, and might only consume part of one. */
1305                 ebd->len -= copy_amt;
1306                 ebd->off += copy_amt;
1307                 b->extra_len -= copy_amt;
1308                 if (!ebd->len) {
1309                         /* we don't actually have to decref here.  it's also done in
1310                          * freeb().  this is the earliest we can free. */
1311                         kfree((void*)ebd->base);
1312                         ebd->base = ebd->off = 0;
1313                 }
1314                 to += copy_amt;
1315                 amt -= copy_amt;
1316                 retval += copy_amt;
1317         }
1318         return retval;
1319 }
1320
1321 /*
1322  *  copy the contents of a string of blocks into
1323  *  memory.  emptied blocks are freed.  return
1324  *  pointer to first unconsumed block.
1325  */
1326 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1327 {
1328         int i;
1329         struct block *next;
1330
1331         /* could be slicker here, since read_from_block is smart */
1332         for (; b != NULL; b = next) {
1333                 i = BLEN(b);
1334                 if (i > n) {
1335                         /* partial block, consume some */
1336                         read_from_block(b, p, n);
1337                         return b;
1338                 }
1339                 /* full block, consume all and move on */
1340                 i = read_from_block(b, p, i);
1341                 n -= i;
1342                 p += i;
1343                 next = b->next;
1344                 freeb(b);
1345         }
1346         return NULL;
1347 }
1348
1349 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1350  * actual amount copied. */
1351 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1352 {
1353         size_t sofar = 0;
1354         struct block *next;
1355
1356         do {
1357                 /* We should be draining every block completely. */
1358                 assert(BLEN(b) <= len - sofar);
1359                 assert(va);
1360                 assert(va + sofar);
1361                 assert(b->rp);
1362                 sofar += read_from_block(b, va + sofar, len - sofar);
1363                 next = b->next;
1364                 freeb(b);
1365                 b = next;
1366         } while (b);
1367         return sofar;
1368 }
1369
1370 /*
1371  *  copy the contents of memory into a string of blocks.
1372  *  return NULL on error.
1373  */
1374 struct block *mem2bl(uint8_t * p, int len)
1375 {
1376         ERRSTACK(1);
1377         int n;
1378         struct block *b, *first, **l;
1379
1380         first = NULL;
1381         l = &first;
1382         if (waserror()) {
1383                 freeblist(first);
1384                 nexterror();
1385         }
1386         do {
1387                 n = len;
1388                 if (n > Maxatomic)
1389                         n = Maxatomic;
1390
1391                 *l = b = block_alloc(n, MEM_WAIT);
1392                 /* TODO consider extra_data */
1393                 memmove(b->wp, p, n);
1394                 b->wp += n;
1395                 p += n;
1396                 len -= n;
1397                 l = &b->next;
1398         } while (len > 0);
1399         poperror();
1400
1401         return first;
1402 }
1403
1404 /*
1405  *  put a block back to the front of the queue
1406  *  called with q ilocked
1407  */
1408 void qputback(struct queue *q, struct block *b)
1409 {
1410         b->next = q->bfirst;
1411         if (q->bfirst == NULL)
1412                 q->blast = b;
1413         q->bfirst = b;
1414         q->dlen += BLEN(b);
1415         /* qputback seems to undo a read, so we can undo the accounting too. */
1416         q->bytes_read -= BLEN(b);
1417 }
1418
1419 /*
1420  *  get next block from a queue (up to a limit)
1421  *
1422  */
1423 struct block *qbread(struct queue *q, size_t len)
1424 {
1425         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1426 }
1427
1428 struct block *qbread_nonblock(struct queue *q, size_t len)
1429 {
1430         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1431                         QIO_NON_BLOCK, MEM_WAIT);
1432 }
1433
1434 /* read up to len from a queue into vp. */
1435 size_t qread(struct queue *q, void *va, size_t len)
1436 {
1437         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1438
1439         if (!blist)
1440                 return 0;
1441         return read_all_blocks(blist, va, len);
1442 }
1443
1444 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1445 {
1446         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1447                                        MEM_WAIT);
1448
1449         if (!blist)
1450                 return 0;
1451         return read_all_blocks(blist, va, len);
1452 }
1453
1454 /* This is the rendez wake condition for writers. */
1455 static int qwriter_should_wake(void *a)
1456 {
1457         struct queue *q = a;
1458
1459         return qwritable(q) || (q->state & Qclosed);
1460 }
1461
1462 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1463 static size_t enqueue_blist(struct queue *q, struct block *b)
1464 {
1465         size_t dlen;
1466
1467         if (q->bfirst)
1468                 q->blast->next = b;
1469         else
1470                 q->bfirst = b;
1471         dlen = BLEN(b);
1472         while (b->next) {
1473                 b = b->next;
1474                 dlen += BLEN(b);
1475         }
1476         q->blast = b;
1477         q->dlen += dlen;
1478         return dlen;
1479 }
1480
1481 /* Adds block (which can be a list of blocks) to the queue, subject to
1482  * qio_flags.  Returns the length written on success or -1 on non-throwable
1483  * error.  Adjust qio_flags to control the value-added features!. */
1484 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1485 {
1486         ssize_t ret;
1487         bool was_unreadable;
1488
1489         if (q->bypass) {
1490                 ret = blocklen(b);
1491                 (*q->bypass) (q->arg, b);
1492                 return ret;
1493         }
1494         spin_lock_irqsave(&q->lock);
1495         was_unreadable = q->dlen == 0;
1496         if (q->state & Qclosed) {
1497                 spin_unlock_irqsave(&q->lock);
1498                 freeblist(b);
1499                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1500                         return -1;
1501                 if (q->err[0])
1502                         error(EPIPE, q->err);
1503                 else
1504                         error(EPIPE, "connection closed");
1505         }
1506         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1507                 /* drop overflow takes priority over regular non-blocking */
1508                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1509                         spin_unlock_irqsave(&q->lock);
1510                         freeb(b);
1511                         return -1;
1512                 }
1513                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1514                  * and catch it. */
1515                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1516                         spin_unlock_irqsave(&q->lock);
1517                         freeb(b);
1518                         error(EAGAIN, "queue full");
1519                 }
1520         }
1521         ret = enqueue_blist(q, b);
1522         QDEBUG checkb(b, "__qbwrite");
1523         spin_unlock_irqsave(&q->lock);
1524         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1525          * wakeup, meaning that actual users either want a kick or have qreaders. */
1526         if (q->kick && (was_unreadable || (q->state & Qkick)))
1527                 q->kick(q->arg);
1528         if (was_unreadable) {
1529                 /* Unlike the read side, there's no double-check to make sure the queue
1530                  * transitioned across an edge.  We know we added something, so that's
1531                  * enough.  We wake if the queue was empty.  Both sides are the same, in
1532                  * that the condition for which we do the rendez_wakeup() is the same as
1533                  * the condition done for the rendez_sleep(). */
1534                 rendez_wakeup(&q->rr);
1535                 qwake_cb(q, FDTAP_FILT_READABLE);
1536         }
1537         /*
1538          *  flow control, wait for queue to get below the limit
1539          *  before allowing the process to continue and queue
1540          *  more.  We do this here so that postnote can only
1541          *  interrupt us after the data has been queued.  This
1542          *  means that things like 9p flushes and ssl messages
1543          *  will not be disrupted by software interrupts.
1544          *
1545          *  Note - this is moderately dangerous since a process
1546          *  that keeps getting interrupted and rewriting will
1547          *  queue infinite crud.
1548          */
1549         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1550             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1551                 /* This is a racy peek at the q status.  If we accidentally block, our
1552                  * rendez will return.  The rendez's peak (qwriter_should_wake) is also
1553                  * racy w.r.t.  the q's spinlock (that lock protects writes, but not
1554                  * reads).
1555                  *
1556                  * Here's the deal: when holding the rendez lock, if we see the sleep
1557                  * condition, the consumer will wake us.  The condition will only ever
1558                  * be changed by the next qbread() (consumer, changes q->dlen).  That
1559                  * code will do a rendez wake, which will spin on the rendez lock,
1560                  * meaning it won't procede until we either see the new state (and
1561                  * return) or put ourselves on the rendez, and wake up.
1562                  *
1563                  * The pattern is one side writes mem, then signals.  Our side checks
1564                  * the signal, then reads the mem.  The goal is to not miss seeing the
1565                  * signal AND missing the memory write.  In this specific case, the
1566                  * signal is actually synchronous (the rendez lock) and not basic shared
1567                  * memory.
1568                  *
1569                  * Oh, and we spin in case we woke early and someone else filled the
1570                  * queue, mesa-style. */
1571                 while (!qwriter_should_wake(q))
1572                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1573         }
1574         return ret;
1575 }
1576
1577 /*
1578  *  add a block to a queue obeying flow control
1579  */
1580 ssize_t qbwrite(struct queue *q, struct block *b)
1581 {
1582         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1583 }
1584
1585 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1586 {
1587         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1588 }
1589
1590 ssize_t qibwrite(struct queue *q, struct block *b)
1591 {
1592         return __qbwrite(q, b, 0);
1593 }
1594
1595 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1596  * block on success, 0 on failure. */
1597 static struct block *build_block(void *from, size_t len, int mem_flags)
1598 {
1599         struct block *b;
1600         void *ext_buf;
1601
1602         /* If len is small, we don't need to bother with the extra_data.  But until
1603          * the whole stack can handle extd blocks, we'll use them unconditionally.
1604          * */
1605 #ifdef CONFIG_BLOCK_EXTRAS
1606         /* allocb builds in 128 bytes of header space to all blocks, but this is
1607          * only available via padblock (to the left).  we also need some space
1608          * for pullupblock for some basic headers (like icmp) that get written
1609          * in directly */
1610                         // XXX might need to increase for TCP opts
1611         b = block_alloc(64, mem_flags);
1612         if (!b)
1613                 return 0;
1614         ext_buf = kmalloc(len, mem_flags);
1615         if (!ext_buf) {
1616                 kfree(b);
1617                 return 0;
1618         }
1619         memcpy(ext_buf, from, len);
1620         if (block_add_extd(b, 1, mem_flags)) {
1621                 kfree(ext_buf);
1622                 kfree(b);
1623                 return 0;
1624         }
1625         b->extra_data[0].base = (uintptr_t)ext_buf;
1626         b->extra_data[0].off = 0;
1627         b->extra_data[0].len = len;
1628         b->extra_len += len;
1629 #else
1630         b = block_alloc(len, mem_flags);
1631         if (!b)
1632                 return 0;
1633         memmove(b->wp, from, len);
1634         b->wp += len;
1635 #endif
1636         return b;
1637 }
1638
1639 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1640                         int qio_flags)
1641 {
1642         ERRSTACK(1);
1643         size_t n;
1644         volatile size_t sofar = 0;      /* volatile for the waserror */
1645         struct block *b;
1646         uint8_t *p = vp;
1647         void *ext_buf;
1648
1649         /* Only some callers can throw.  Others might be in a context where waserror
1650          * isn't safe. */
1651         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1652                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE) after
1653                  * some data has been sent should be treated as a partial write. */
1654                 if (sofar)
1655                         goto out_ok;
1656                 nexterror();
1657         }
1658         do {
1659                 n = len - sofar;
1660                 /* This is 64K, the max amount per single block.  Still a good value? */
1661                 // XXX btw, Maxatomic will probably fragment memory a bit, since the
1662                 // kmalloc in build_block is a little more than 64K, all together.
1663                 //              or don't use kmalloc
1664                 //
1665                 if (n > Maxatomic)
1666                         n = Maxatomic;
1667                 b = build_block(p + sofar, n, mem_flags);
1668                 if (!b)
1669                         break;
1670                 if (__qbwrite(q, b, qio_flags) < 0)
1671                         break;
1672                 sofar += n;
1673         } while ((sofar < len) && (q->state & Qmsg) == 0);
1674 out_ok:
1675         if (qio_flags & QIO_CAN_ERR_SLEEP)
1676                 poperror();
1677         return sofar;
1678 }
1679
1680 ssize_t qwrite(struct queue *q, void *vp, int len)
1681 {
1682         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1683 }
1684
1685 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1686 {
1687         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1688                                               QIO_NON_BLOCK);
1689 }
1690
1691 ssize_t qiwrite(struct queue *q, void *vp, int len)
1692 {
1693         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1694 }
1695
1696 /*
1697  *  be extremely careful when calling this,
1698  *  as there is no reference accounting
1699  */
1700 void qfree(struct queue *q)
1701 {
1702         qclose(q);
1703         kfree(q);
1704 }
1705
1706 /*
1707  *  Mark a queue as closed.  No further IO is permitted.
1708  *  All blocks are released.
1709  */
1710 void qclose(struct queue *q)
1711 {
1712         struct block *bfirst;
1713
1714         if (q == NULL)
1715                 return;
1716
1717         /* mark it */
1718         spin_lock_irqsave(&q->lock);
1719         q->state |= Qclosed;
1720         q->state &= ~Qdropoverflow;
1721         q->err[0] = 0;
1722         bfirst = q->bfirst;
1723         q->bfirst = 0;
1724         q->dlen = 0;
1725         spin_unlock_irqsave(&q->lock);
1726
1727         /* free queued blocks */
1728         freeblist(bfirst);
1729
1730         /* wake up readers/writers */
1731         rendez_wakeup(&q->rr);
1732         rendez_wakeup(&q->wr);
1733         qwake_cb(q, FDTAP_FILT_HANGUP);
1734 }
1735
1736 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1737  *
1738  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1739  * there is no message, which is what also happens during a natural qclose(),
1740  * those waiters will simply return 0.  qwriters will always error() on a
1741  * closed/hungup queue. */
1742 void qhangup(struct queue *q, char *msg)
1743 {
1744         /* mark it */
1745         spin_lock_irqsave(&q->lock);
1746         q->state |= Qclosed;
1747         if (msg == 0 || *msg == 0)
1748                 q->err[0] = 0;
1749         else
1750                 strlcpy(q->err, msg, ERRMAX);
1751         spin_unlock_irqsave(&q->lock);
1752
1753         /* wake up readers/writers */
1754         rendez_wakeup(&q->rr);
1755         rendez_wakeup(&q->wr);
1756         qwake_cb(q, FDTAP_FILT_HANGUP);
1757 }
1758
1759 /*
1760  *  return non-zero if the q is hungup
1761  */
1762 int qisclosed(struct queue *q)
1763 {
1764         return q->state & Qclosed;
1765 }
1766
1767 /*
1768  *  mark a queue as no longer hung up.  resets the wake_cb.
1769  */
1770 void qreopen(struct queue *q)
1771 {
1772         spin_lock_irqsave(&q->lock);
1773         q->state &= ~Qclosed;
1774         q->eof = 0;
1775         q->limit = q->inilim;
1776         q->wake_cb = 0;
1777         q->wake_data = 0;
1778         spin_unlock_irqsave(&q->lock);
1779 }
1780
1781 /*
1782  *  return bytes queued
1783  */
1784 int qlen(struct queue *q)
1785 {
1786         return q->dlen;
1787 }
1788
1789 size_t q_bytes_read(struct queue *q)
1790 {
1791         return q->bytes_read;
1792 }
1793
1794 /*
1795  * return space remaining before flow control
1796  *
1797  *  This used to be
1798  *  q->len < q->limit/2
1799  *  but it slows down tcp too much for certain write sizes.
1800  *  I really don't understand it completely.  It may be
1801  *  due to the queue draining so fast that the transmission
1802  *  stalls waiting for the app to produce more data.  - presotto
1803  *
1804  *  q->len was the amount of bytes, which is no longer used.  we now use
1805  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1806  */
1807 int qwindow(struct queue *q)
1808 {
1809         int l;
1810
1811         l = q->limit - q->dlen;
1812         if (l < 0)
1813                 l = 0;
1814         return l;
1815 }
1816
1817 /*
1818  *  return true if we can read without blocking
1819  */
1820 int qcanread(struct queue *q)
1821 {
1822         return q->bfirst != 0;
1823 }
1824
1825 /*
1826  *  change queue limit
1827  */
1828 void qsetlimit(struct queue *q, size_t limit)
1829 {
1830         bool was_writable = qwritable(q);
1831
1832         q->limit = limit;
1833         if (!was_writable && qwritable(q)) {
1834                 rendez_wakeup(&q->wr);
1835                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1836         }
1837 }
1838
1839 size_t qgetlimit(struct queue *q)
1840 {
1841         return q->limit;
1842 }
1843
1844 /*
1845  *  set whether writes drop overflowing blocks, or if we sleep
1846  */
1847 void qdropoverflow(struct queue *q, bool onoff)
1848 {
1849         spin_lock_irqsave(&q->lock);
1850         if (onoff)
1851                 q->state |= Qdropoverflow;
1852         else
1853                 q->state &= ~Qdropoverflow;
1854         spin_unlock_irqsave(&q->lock);
1855 }
1856
1857 /* Be careful: this can affect concurrent reads/writes and code that might have
1858  * built-in expectations of the q's type. */
1859 void q_toggle_qmsg(struct queue *q, bool onoff)
1860 {
1861         spin_lock_irqsave(&q->lock);
1862         if (onoff)
1863                 q->state |= Qmsg;
1864         else
1865                 q->state &= ~Qmsg;
1866         spin_unlock_irqsave(&q->lock);
1867 }
1868
1869 /* Be careful: this can affect concurrent reads/writes and code that might have
1870  * built-in expectations of the q's type. */
1871 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1872 {
1873         spin_lock_irqsave(&q->lock);
1874         if (onoff)
1875                 q->state |= Qcoalesce;
1876         else
1877                 q->state &= ~Qcoalesce;
1878         spin_unlock_irqsave(&q->lock);
1879 }
1880
1881 /*
1882  *  flush the output queue
1883  */
1884 void qflush(struct queue *q)
1885 {
1886         struct block *bfirst;
1887
1888         /* mark it */
1889         spin_lock_irqsave(&q->lock);
1890         bfirst = q->bfirst;
1891         q->bfirst = 0;
1892         q->dlen = 0;
1893         spin_unlock_irqsave(&q->lock);
1894
1895         /* free queued blocks */
1896         freeblist(bfirst);
1897
1898         /* wake up writers */
1899         rendez_wakeup(&q->wr);
1900         qwake_cb(q, FDTAP_FILT_WRITABLE);
1901 }
1902
1903 int qfull(struct queue *q)
1904 {
1905         return !qwritable(q);
1906 }
1907
1908 int qstate(struct queue *q)
1909 {
1910         return q->state;
1911 }
1912
1913 void qdump(struct queue *q)
1914 {
1915         if (q)
1916                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1917                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1918 }
1919
1920 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1921  * marks the type of wakeup event (flags from FDTAP).
1922  *
1923  * There's no sync protection.  If you change the CB while the qio is running,
1924  * you might get a CB with the data or func from a previous set_wake_cb.  You
1925  * should set this once per queue and forget it.
1926  *
1927  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1928  * just make sure that the func(data) pair are valid until the queue is freed or
1929  * reopened. */
1930 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1931 {
1932         q->wake_data = data;
1933         wmb();  /* if we see func, we'll also see the data for it */
1934         q->wake_cb = func;
1935 }
1936
1937 /* Helper for detecting whether we'll block on a read at this instant. */
1938 bool qreadable(struct queue *q)
1939 {
1940         return qlen(q) > 0;
1941 }
1942
1943 /* Helper for detecting whether we'll block on a write at this instant. */
1944 bool qwritable(struct queue *q)
1945 {
1946         return !q->limit || qwindow(q) > 0;
1947 }