qio: Fix Qmsg panic in read_all_blocks()
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <slab.h>
30 #include <kmalloc.h>
31 #include <kref.h>
32 #include <string.h>
33 #include <stdio.h>
34 #include <assert.h>
35 #include <error.h>
36 #include <cpio.h>
37 #include <pmap.h>
38 #include <smp.h>
39 #include <net/ip.h>
40
41 #define PANIC_EXTRA(b)                                                  \
42 {                                                                       \
43         if ((b)->extra_len) {                                           \
44                 printblock(b);                                          \
45                 backtrace();                                            \
46                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
47         }                                                               \
48 }
49
50 static uint32_t padblockcnt;
51 static uint32_t concatblockcnt;
52 static uint32_t pullupblockcnt;
53 static uint32_t copyblockcnt;
54 static uint32_t consumecnt;
55 static uint32_t producecnt;
56 static uint32_t qcopycnt;
57
58 static int debugging;
59
60 #define QDEBUG  if(0)
61
62 /*
63  *  IO queues
64  */
65
66 struct queue {
67         spinlock_t lock;;
68
69         struct block *bfirst;           /* buffer */
70         struct block *blast;
71
72         int dlen;                                       /* data bytes in queue */
73         int limit;                                      /* max bytes in queue */
74         int inilim;                             /* initial limit */
75         int state;
76         int eof;                                        /* number of eofs read by user */
77         size_t bytes_read;
78
79         void (*kick) (void *);          /* restart output */
80         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
81         void *arg;                                      /* argument to kick */
82
83         struct rendez rr;                       /* process waiting to read */
84         struct rendez wr;                       /* process waiting to write */
85         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
86         void *wake_data;
87
88         char err[ERRMAX];
89 };
90
91 enum {
92         Maxatomic = 64 * 1024,
93         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
94         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
95         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
96         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
97         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
98         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
99 };
100
101 unsigned int qiomaxatomic = Maxatomic;
102
103 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
104 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
105 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
106                               int mem_flags);
107 static bool qwait_and_ilock(struct queue *q, int qio_flags);
108
109 /* Helper: fires a wake callback, sending 'filter' */
110 static void qwake_cb(struct queue *q, int filter)
111 {
112         if (q->wake_cb)
113                 q->wake_cb(q, q->wake_data, filter);
114 }
115
116 void ixsummary(void)
117 {
118         debugging ^= 1;
119         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
120                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
121         printd("consume %lu, produce %lu, qcopy %lu\n",
122                    consumecnt, producecnt, qcopycnt);
123 }
124
125 /*
126  *  pad a block to the front (or the back if size is negative)
127  */
128 struct block *padblock(struct block *bp, int size)
129 {
130         int n;
131         struct block *nbp;
132
133         QDEBUG checkb(bp, "padblock 1");
134         if (size >= 0) {
135                 if (bp->rp - bp->base >= size) {
136                         bp->network_offset += size;
137                         bp->transport_offset += size;
138                         bp->rp -= size;
139                         return bp;
140                 }
141
142                 PANIC_EXTRA(bp);
143                 if (bp->next)
144                         panic("padblock %p", getcallerpc(&bp));
145                 n = BLEN(bp);
146                 padblockcnt++;
147                 nbp = block_alloc(size + n, MEM_WAIT);
148                 block_copy_metadata(nbp, bp);
149                 nbp->rp += size;
150                 nbp->wp = nbp->rp;
151                 memmove(nbp->wp, bp->rp, n);
152                 nbp->wp += n;
153                 freeb(bp);
154                 nbp->rp -= size;
155         } else {
156                 size = -size;
157
158                 PANIC_EXTRA(bp);
159
160                 if (bp->next)
161                         panic("padblock %p", getcallerpc(&bp));
162
163                 if (bp->lim - bp->wp >= size)
164                         return bp;
165
166                 n = BLEN(bp);
167                 padblockcnt++;
168                 nbp = block_alloc(size + n, MEM_WAIT);
169                 block_copy_metadata(nbp, bp);
170                 memmove(nbp->wp, bp->rp, n);
171                 nbp->wp += n;
172                 freeb(bp);
173         }
174         QDEBUG checkb(nbp, "padblock 1");
175         return nbp;
176 }
177
178 /*
179  *  return count of bytes in a string of blocks
180  */
181 int blocklen(struct block *bp)
182 {
183         int len;
184
185         len = 0;
186         while (bp) {
187                 len += BLEN(bp);
188                 bp = bp->next;
189         }
190         return len;
191 }
192
193 /*
194  * return count of space in blocks
195  */
196 int blockalloclen(struct block *bp)
197 {
198         int len;
199
200         len = 0;
201         while (bp) {
202                 len += BALLOC(bp);
203                 bp = bp->next;
204         }
205         return len;
206 }
207
208 /*
209  *  copy the  string of blocks into
210  *  a single block and free the string
211  */
212 struct block *concatblock(struct block *bp)
213 {
214         int len;
215         struct block *nb, *f;
216
217         if (bp->next == 0)
218                 return bp;
219
220         /* probably use parts of qclone */
221         PANIC_EXTRA(bp);
222         nb = block_alloc(blocklen(bp), MEM_WAIT);
223         for (f = bp; f; f = f->next) {
224                 len = BLEN(f);
225                 memmove(nb->wp, f->rp, len);
226                 nb->wp += len;
227         }
228         concatblockcnt += BLEN(nb);
229         freeblist(bp);
230         QDEBUG checkb(nb, "concatblock 1");
231         return nb;
232 }
233
234 /* Makes an identical copy of the block, collapsing all the data into the block
235  * body.  It does not point to the contents of the original, it is a copy
236  * (unlike qclone).  Since we're copying, we might as well put the memory into
237  * one contiguous chunk. */
238 struct block *copyblock(struct block *bp, int mem_flags)
239 {
240         struct block *newb;
241         struct extra_bdata *ebd;
242         size_t amt;
243
244         QDEBUG checkb(bp, "copyblock 0");
245         newb = block_alloc(BLEN(bp), mem_flags);
246         if (!newb)
247                 return 0;
248         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
249         assert(amt == BHLEN(bp));
250         for (int i = 0; i < bp->nr_extra_bufs; i++) {
251                 ebd = &bp->extra_data[i];
252                 if (!ebd->base || !ebd->len)
253                         continue;
254                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
255                 assert(amt == ebd->len);
256         }
257         block_copy_metadata(newb, bp);
258         copyblockcnt++;
259         QDEBUG checkb(newb, "copyblock 1");
260         return newb;
261 }
262
263 /* Returns a block with the remaining contents of b all in the main body of the
264  * returned block.  Replace old references to b with the returned value (which
265  * may still be 'b', if no change was needed. */
266 struct block *linearizeblock(struct block *b)
267 {
268         struct block *newb;
269
270         if (!b->extra_len)
271                 return b;
272         newb = copyblock(b, MEM_WAIT);
273         freeb(b);
274         return newb;
275 }
276
277 /* Make sure the first block has at least n bytes in its main body.  Pulls up
278  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
279  * block list. */
280 struct block *pullupblock(struct block *bp, int n)
281 {
282         int i, len, seglen;
283         struct block *nbp;
284         struct extra_bdata *ebd;
285
286         /*
287          *  this should almost always be true, it's
288          *  just to avoid every caller checking.
289          */
290         if (BHLEN(bp) >= n)
291                 return bp;
292
293         /* If there's no chance, just bail out now.  This might be slightly wasteful
294          * if there's a long blist that does have enough data. */
295         if (n > blocklen(bp))
296                 return 0;
297         /* a start at explicit main-body / header management */
298         if (bp->extra_len) {
299                 if (n > bp->lim - bp->rp) {
300                         /* would need to realloc a new block and copy everything over. */
301                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
302                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
303                 }
304                 len = n - BHLEN(bp);
305                 /* Would need to recursively call this, or otherwise pull from later
306                  * blocks and put chunks of their data into the block we're building. */
307                 if (len > bp->extra_len)
308                         panic("pullup more than extra (%d, %d, %d)\n",
309                               n, BHLEN(bp), bp->extra_len);
310                 QDEBUG checkb(bp, "before pullup");
311                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
312                         ebd = &bp->extra_data[i];
313                         if (!ebd->base || !ebd->len)
314                                 continue;
315                         seglen = MIN(ebd->len, len);
316                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
317                         bp->wp += seglen;
318                         len -= seglen;
319                         ebd->len -= seglen;
320                         ebd->off += seglen;
321                         bp->extra_len -= seglen;
322                         if (ebd->len == 0) {
323                                 kfree((void *)ebd->base);
324                                 ebd->off = 0;
325                                 ebd->base = 0;
326                         }
327                 }
328                 /* maybe just call pullupblock recursively here */
329                 if (len)
330                         panic("pullup %d bytes overdrawn\n", len);
331                 QDEBUG checkb(bp, "after pullup");
332                 return bp;
333         }
334
335         /*
336          *  if not enough room in the first block,
337          *  add another to the front of the list.
338          */
339         if (bp->lim - bp->rp < n) {
340                 nbp = block_alloc(n, MEM_WAIT);
341                 nbp->next = bp;
342                 bp = nbp;
343         }
344
345         /*
346          *  copy bytes from the trailing blocks into the first
347          */
348         n -= BLEN(bp);
349         while ((nbp = bp->next)) {
350                 i = BLEN(nbp);
351                 if (i > n) {
352                         memmove(bp->wp, nbp->rp, n);
353                         pullupblockcnt++;
354                         bp->wp += n;
355                         nbp->rp += n;
356                         QDEBUG checkb(bp, "pullupblock 1");
357                         return bp;
358                 } else {
359                         memmove(bp->wp, nbp->rp, i);
360                         pullupblockcnt++;
361                         bp->wp += i;
362                         bp->next = nbp->next;
363                         nbp->next = 0;
364                         freeb(nbp);
365                         n -= i;
366                         if (n == 0) {
367                                 QDEBUG checkb(bp, "pullupblock 2");
368                                 return bp;
369                         }
370                 }
371         }
372         freeb(bp);
373         return 0;
374 }
375
376 /*
377  *  make sure the first block has at least n bytes in its main body
378  */
379 struct block *pullupqueue(struct queue *q, int n)
380 {
381         struct block *b;
382
383         /* TODO: lock to protect the queue links? */
384         if ((BHLEN(q->bfirst) >= n))
385                 return q->bfirst;
386         q->bfirst = pullupblock(q->bfirst, n);
387         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
388         q->blast = b;
389         return q->bfirst;
390 }
391
392 /* throw away count bytes from the front of
393  * block's extradata.  Returns count of bytes
394  * thrown away
395  */
396
397 static int pullext(struct block *bp, int count)
398 {
399         struct extra_bdata *ed;
400         int i, rem, bytes = 0;
401
402         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
403                 ed = &bp->extra_data[i];
404                 rem = MIN(count, ed->len);
405                 bp->extra_len -= rem;
406                 count -= rem;
407                 bytes += rem;
408                 ed->off += rem;
409                 ed->len -= rem;
410                 if (ed->len == 0) {
411                         kfree((void *)ed->base);
412                         ed->base = 0;
413                         ed->off = 0;
414                 }
415         }
416         return bytes;
417 }
418
419 /* throw away count bytes from the end of a
420  * block's extradata.  Returns count of bytes
421  * thrown away
422  */
423
424 static int dropext(struct block *bp, int count)
425 {
426         struct extra_bdata *ed;
427         int i, rem, bytes = 0;
428
429         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
430                 ed = &bp->extra_data[i];
431                 rem = MIN(count, ed->len);
432                 bp->extra_len -= rem;
433                 count -= rem;
434                 bytes += rem;
435                 ed->len -= rem;
436                 if (ed->len == 0) {
437                         kfree((void *)ed->base);
438                         ed->base = 0;
439                         ed->off = 0;
440                 }
441         }
442         return bytes;
443 }
444
445 /*
446  *  throw away up to count bytes from a
447  *  list of blocks.  Return count of bytes
448  *  thrown away.
449  */
450 static int _pullblock(struct block **bph, int count, int free)
451 {
452         struct block *bp;
453         int n, bytes;
454
455         bytes = 0;
456         if (bph == NULL)
457                 return 0;
458
459         while (*bph != NULL && count != 0) {
460                 bp = *bph;
461
462                 n = MIN(BHLEN(bp), count);
463                 bytes += n;
464                 count -= n;
465                 bp->rp += n;
466                 n = pullext(bp, count);
467                 bytes += n;
468                 count -= n;
469                 QDEBUG checkb(bp, "pullblock ");
470                 if (BLEN(bp) == 0 && (free || count)) {
471                         *bph = bp->next;
472                         bp->next = NULL;
473                         freeb(bp);
474                 }
475         }
476         return bytes;
477 }
478
479 int pullblock(struct block **bph, int count)
480 {
481         return _pullblock(bph, count, 1);
482 }
483
484 /*
485  *  trim to len bytes starting at offset
486  */
487 struct block *trimblock(struct block *bp, int offset, int len)
488 {
489         uint32_t l, trim;
490         int olen = len;
491
492         QDEBUG checkb(bp, "trimblock 1");
493         if (blocklen(bp) < offset + len) {
494                 freeblist(bp);
495                 return NULL;
496         }
497
498         l =_pullblock(&bp, offset, 0);
499         if (bp == NULL)
500                 return NULL;
501         if (l != offset) {
502                 freeblist(bp);
503                 return NULL;
504         }
505
506         while ((l = BLEN(bp)) < len) {
507                 len -= l;
508                 bp = bp->next;
509         }
510
511         trim = BLEN(bp) - len;
512         trim -= dropext(bp, trim);
513         bp->wp -= trim;
514
515         if (bp->next) {
516                 freeblist(bp->next);
517                 bp->next = NULL;
518         }
519         return bp;
520 }
521
522 /* Adjust block @bp so that its size is exactly @len.
523  * If the size is increased, fill in the new contents with zeros.
524  * If the size is decreased, discard some of the old contents at the tail. */
525 struct block *adjustblock(struct block *bp, int len)
526 {
527         struct extra_bdata *ebd;
528         void *buf;
529         int i;
530
531         if (len < 0) {
532                 freeb(bp);
533                 return NULL;
534         }
535
536         if (len == BLEN(bp))
537                 return bp;
538
539         /* Shrink within block main body. */
540         if (len <= BHLEN(bp)) {
541                 free_block_extra(bp);
542                 bp->wp = bp->rp + len;
543                 QDEBUG checkb(bp, "adjustblock 1");
544                 return bp;
545         }
546
547         /* Need to grow. */
548         if (len > BLEN(bp)) {
549                 /* Grow within block main body. */
550                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
551                         memset(bp->wp, 0, len - BLEN(bp));
552                         bp->wp = bp->rp + len;
553                         QDEBUG checkb(bp, "adjustblock 2");
554                         return bp;
555                 }
556                 /* Grow with extra data buffers. */
557                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
558                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
559                 QDEBUG checkb(bp, "adjustblock 3");
560                 return bp;
561         }
562
563         /* Shrink extra data buffers.
564          * len is how much of ebd we need to keep.
565          * extra_len is re-accumulated. */
566         assert(bp->extra_len > 0);
567         len -= BHLEN(bp);
568         bp->extra_len = 0;
569         for (i = 0; i < bp->nr_extra_bufs; i++) {
570                 ebd = &bp->extra_data[i];
571                 if (len <= ebd->len)
572                         break;
573                 len -= ebd->len;
574                 bp->extra_len += ebd->len;
575         }
576         /* If len becomes zero, extra_data[i] should be freed. */
577         if (len > 0) {
578                 ebd = &bp->extra_data[i];
579                 ebd->len = len;
580                 bp->extra_len += ebd->len;
581                 i++;
582         }
583         for (; i < bp->nr_extra_bufs; i++) {
584                 ebd = &bp->extra_data[i];
585                 if (ebd->base)
586                         kfree((void*)ebd->base);
587                 ebd->base = ebd->off = ebd->len = 0;
588         }
589         QDEBUG checkb(bp, "adjustblock 4");
590         return bp;
591 }
592
593 /* Helper: removes and returns the first block from q */
594 static struct block *pop_first_block(struct queue *q)
595 {
596         struct block *b = q->bfirst;
597
598         q->dlen -= BLEN(b);
599         q->bytes_read += BLEN(b);
600         q->bfirst = b->next;
601         b->next = 0;
602         return b;
603 }
604
605 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
606 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
607 {
608         copy_amt = MIN(to->lim - to->wp, copy_amt);
609         memcpy(to->wp, from, copy_amt);
610         to->wp += copy_amt;
611         return copy_amt;
612 }
613
614 /* Accounting helper.  Block b in q lost amt extra_data */
615 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
616 {
617         b->extra_len -= amt;
618         q->dlen -= amt;
619         q->bytes_read += amt;
620 }
621
622 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
623  * fixed in 'from', so we move its contents and zero it out in 'from'.
624  *
625  * Returns the length moved (0 on failure). */
626 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
627                        struct block *from, struct queue *from_q)
628 {
629         size_t ret = ebd->len;
630
631         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
632                 return 0;
633         block_and_q_lost_extra(from, from_q, ebd->len);
634         ebd->base = ebd->len = ebd->off = 0;
635         return ret;
636 }
637
638 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
639  * return with less than len, but greater than 0, even if there is more
640  * available in q.
641  *
642  * At any moment that we have copied anything and things are tricky, we can just
643  * return.  The trickiness comes from a bunch of variables: is the main body
644  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
645  * to @to's main body, but only if we haven't used it yet. */
646 static size_t copy_from_first_block(struct queue *q, struct block *to,
647                                     size_t len)
648 {
649         struct block *from = q->bfirst;
650         size_t copy_amt, amt;
651         struct extra_bdata *ebd;
652
653         assert(len < BLEN(from));       /* sanity */
654         /* Try to extract from the main body */
655         copy_amt = MIN(BHLEN(from), len);
656         if (copy_amt) {
657                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
658                 from->rp += copy_amt;
659                 /* We only change dlen, (data len), not q->len, since the q still has
660                  * the same block memory allocation (no kfrees happened) */
661                 q->dlen -= copy_amt;
662                 q->bytes_read += copy_amt;
663         }
664         /* Try to extract the remainder from the extra data */
665         len -= copy_amt;
666         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
667                 ebd = &from->extra_data[i];
668                 if (!ebd->base || !ebd->len)
669                         continue;
670                 if (len >= ebd->len) {
671                         amt = move_ebd(ebd, to, from, q);
672                         if (!amt) {
673                                 /* our internal alloc could have failed.   this ebd is now the
674                                  * last one we'll consider.  let's handle it separately and put
675                                  * it in the main body. */
676                                 if (copy_amt)
677                                         return copy_amt;
678                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
679                                                               ebd->len);
680                                 block_and_q_lost_extra(from, q, copy_amt);
681                                 break;
682                         }
683                         len -= amt;
684                         copy_amt += amt;
685                         continue;
686                 } else {
687                         /* If we're here, we reached our final ebd, which we'll need to
688                          * split to get anything from it. */
689                         if (copy_amt)
690                                 return copy_amt;
691                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
692                                                       len);
693                         ebd->off += copy_amt;
694                         ebd->len -= copy_amt;
695                         block_and_q_lost_extra(from, q, copy_amt);
696                         break;
697                 }
698         }
699         if (len)
700                 assert(copy_amt);       /* sanity */
701         return copy_amt;
702 }
703
704 /* Return codes for __qbread and __try_qbread. */
705 enum {
706         QBR_OK,
707         QBR_FAIL,
708         QBR_SPARE,      /* we need a spare block */
709         QBR_AGAIN,      /* do it again, we are coalescing blocks */
710 };
711
712 /* Helper and back-end for __qbread: extracts and returns a list of blocks
713  * containing up to len bytes.  It may contain less than len even if q has more
714  * data.
715  *
716  * Returns a code interpreted by __qbread, and the returned blist in ret. */
717 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
718                         struct block **real_ret, struct block *spare)
719 {
720         struct block *ret, *ret_last, *first;
721         size_t blen;
722         bool was_unwritable = FALSE;
723
724         if (qio_flags & QIO_CAN_ERR_SLEEP) {
725                 if (!qwait_and_ilock(q, qio_flags)) {
726                         spin_unlock_irqsave(&q->lock);
727                         return QBR_FAIL;
728                 }
729                 /* we qwaited and still hold the lock, so the q is not empty */
730                 first = q->bfirst;
731         } else {
732                 spin_lock_irqsave(&q->lock);
733                 first = q->bfirst;
734                 if (!first) {
735                         spin_unlock_irqsave(&q->lock);
736                         return QBR_FAIL;
737                 }
738         }
739         /* We need to check before adjusting q->len.  We're checking the writer's
740          * sleep condition / tap condition.  When set, we *might* be making an edge
741          * transition (from unwritable to writable), which needs to wake and fire
742          * taps.  But, our read might not drain the queue below q->lim.  We'll check
743          * again later to see if we should really wake them.  */
744         was_unwritable = !qwritable(q);
745         blen = BLEN(first);
746         if ((q->state & Qcoalesce) && (blen == 0)) {
747                 freeb(pop_first_block(q));
748                 spin_unlock_irqsave(&q->lock);
749                 /* Need to retry to make sure we have a first block */
750                 return QBR_AGAIN;
751         }
752         /* Qmsg: just return the first block.  Be careful, since our caller might
753          * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
754         if (q->state & Qmsg) {
755                 ret = pop_first_block(q);
756                 goto out_ok;
757         }
758         /* Let's get at least something first - makes the code easier.  This way,
759          * we'll only ever split the block once. */
760         if (blen <= len) {
761                 ret = pop_first_block(q);
762                 len -= blen;
763         } else {
764                 /* need to split the block.  we won't actually take the first block out
765                  * of the queue - we're just extracting a little bit. */
766                 if (!spare) {
767                         /* We have nothing and need a spare block.  Retry! */
768                         spin_unlock_irqsave(&q->lock);
769                         return QBR_SPARE;
770                 }
771                 copy_from_first_block(q, spare, len);
772                 ret = spare;
773                 goto out_ok;
774         }
775         /* At this point, we just grabbed the first block.  We can try to grab some
776          * more, up to len (if they want). */
777         if (qio_flags & QIO_JUST_ONE_BLOCK)
778                 goto out_ok;
779         ret_last = ret;
780         while (q->bfirst && (len > 0)) {
781                 blen = BLEN(q->bfirst);
782                 if ((q->state & Qcoalesce) && (blen == 0)) {
783                         /* remove the intermediate 0 blocks */
784                         freeb(pop_first_block(q));
785                         continue;
786                 }
787                 if (blen > len) {
788                         /* We could try to split the block, but that's a huge pain.  For
789                          * instance, we might need to move the main body of b into an
790                          * extra_data of ret_last.  lots of ways for that to fail, and lots
791                          * of cases to consider.  Easier to just bail out.  This is why I
792                          * did the first block above: we don't need to worry about this. */
793                          break;
794                 }
795                 ret_last->next = pop_first_block(q);
796                 ret_last = ret_last->next;
797                 len -= blen;
798         }
799 out_ok:
800         /* Don't wake them up or fire tap if we didn't drain enough. */
801         if (!qwritable(q))
802                 was_unwritable = FALSE;
803         spin_unlock_irqsave(&q->lock);
804         if (was_unwritable) {
805                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
806                         q->kick(q->arg);
807                 rendez_wakeup(&q->wr);
808                 qwake_cb(q, FDTAP_FILT_WRITABLE);
809         }
810         *real_ret = ret;
811         return QBR_OK;
812 }
813
814 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
815  * containing up to len bytes.  It may contain less than len even if q has more
816  * data.
817  *
818  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
819  * if it required a spare and the memory allocation failed.
820  *
821  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
822  * could get a zero length block back. */
823 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
824                               int mem_flags)
825 {
826         ERRSTACK(1);
827         struct block *ret = 0;
828         struct block *volatile spare = 0;       /* volatile for the waserror */
829
830         /* __try_qbread can throw, based on qio flags. */
831         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
832                 if (spare)
833                         freeb(spare);
834                 nexterror();
835         }
836         while (1) {
837                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
838                 case QBR_OK:
839                 case QBR_FAIL:
840                         if (spare && (ret != spare))
841                                 freeb(spare);
842                         goto out_ret;
843                 case QBR_SPARE:
844                         assert(!spare);
845                         /* Due to some nastiness, we need a fresh block so we can read out
846                          * anything from the queue.  'len' seems like a reasonable amount.
847                          * Maybe we can get away with less. */
848                         spare = block_alloc(len, mem_flags);
849                         if (!spare) {
850                                 /* Careful here: a memory failure (possible with MEM_ATOMIC)
851                                  * could look like 'no data in the queue' (QBR_FAIL).  The only
852                                  * one who does is this qget(), who happens to know that we
853                                  * won't need a spare, due to the len argument.  Spares are only
854                                  * needed when we need to split a block. */
855                                 ret = 0;
856                                 goto out_ret;
857                         }
858                         break;
859                 case QBR_AGAIN:
860                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
861                          * try again.  We bounce out of __try so we can perform the "is
862                          * there a block" logic again from the top. */
863                         break;
864                 }
865         }
866         assert(0);
867 out_ret:
868         if (qio_flags & QIO_CAN_ERR_SLEEP)
869                 poperror();
870         return ret;
871 }
872
873 /*
874  *  get next block from a queue, return null if nothing there
875  */
876 struct block *qget(struct queue *q)
877 {
878         /* since len == SIZE_MAX, we should never need to do a mem alloc */
879         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
880 }
881
882 /* Throw away the next 'len' bytes in the queue returning the number actually
883  * discarded.
884  *
885  * If the bytes are in the queue, then they must be discarded.  The only time to
886  * return less than len is if the q itself has less than len bytes.
887  *
888  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
889  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
890 size_t qdiscard(struct queue *q, size_t len)
891 {
892         struct block *blist;
893         size_t removed_amt;
894         size_t sofar = 0;
895
896         /* This is racy.  There could be multiple qdiscarders or other consumers,
897          * where the consumption could be interleaved. */
898         while (qlen(q) && len) {
899                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
900                 removed_amt = freeblist(blist);
901                 sofar += removed_amt;
902                 len -= removed_amt;
903         }
904         return sofar;
905 }
906
907 ssize_t qpass(struct queue *q, struct block *b)
908 {
909         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
910 }
911
912 ssize_t qpassnolim(struct queue *q, struct block *b)
913 {
914         return __qbwrite(q, b, 0);
915 }
916
917 /*
918  *  if the allocated space is way out of line with the used
919  *  space, reallocate to a smaller block
920  */
921 struct block *packblock(struct block *bp)
922 {
923         struct block **l, *nbp;
924         int n;
925
926         if (bp->extra_len)
927                 return bp;
928         for (l = &bp; *l; l = &(*l)->next) {
929                 nbp = *l;
930                 n = BLEN(nbp);
931                 if ((n << 2) < BALLOC(nbp)) {
932                         *l = block_alloc(n, MEM_WAIT);
933                         memmove((*l)->wp, nbp->rp, n);
934                         (*l)->wp += n;
935                         (*l)->next = nbp->next;
936                         freeb(nbp);
937                 }
938         }
939
940         return bp;
941 }
942
943 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
944  * body_rp, for up to len.  Returns the len consumed.
945  *
946  * The base is 'b', so that we can kfree it later.  This currently ties us to
947  * using kfree for the release method for all extra_data.
948  *
949  * It is possible to have a body size that is 0, if there is no offset, and
950  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
951 static size_t point_to_body(struct block *b, uint8_t *body_rp,
952                             struct block *newb, unsigned int newb_idx,
953                             size_t len)
954 {
955         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
956
957         assert(newb_idx < newb->nr_extra_bufs);
958
959         kmalloc_incref(b);
960         ebd->base = (uintptr_t)b;
961         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
962         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
963         assert((int)ebd->len >= 0);
964         newb->extra_len += ebd->len;
965         return ebd->len;
966 }
967
968 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
969  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
970  * consumed.
971  *
972  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
973 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
974                            struct block *newb, unsigned int newb_idx,
975                            size_t len)
976 {
977         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
978         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
979
980         assert(b_idx < b->nr_extra_bufs);
981         assert(newb_idx < newb->nr_extra_bufs);
982
983         kmalloc_incref((void*)b_ebd->base);
984         n_ebd->base = b_ebd->base;
985         n_ebd->off = b_ebd->off + b_off;
986         n_ebd->len = MIN(b_ebd->len - b_off, len);
987         newb->extra_len += n_ebd->len;
988         return n_ebd->len;
989 }
990
991 /* given a string of blocks, sets up the new block's extra_data such that it
992  * *points* to the contents of the blist [offset, len + offset).  This does not
993  * make a separate copy of the contents of the blist.
994  *
995  * returns 0 on success.  the only failure is if the extra_data array was too
996  * small, so this returns a positive integer saying how big the extra_data needs
997  * to be.
998  *
999  * callers are responsible for protecting the list structure. */
1000 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1001                             uint32_t offset)
1002 {
1003         struct block *b, *first;
1004         unsigned int nr_bufs = 0;
1005         unsigned int b_idx, newb_idx = 0;
1006         uint8_t *first_main_body = 0;
1007         ssize_t sofar = 0;
1008
1009         /* find the first block; keep offset relative to the latest b in the list */
1010         for (b = blist; b; b = b->next) {
1011                 if (BLEN(b) > offset)
1012                         break;
1013                 offset -= BLEN(b);
1014         }
1015         /* qcopy semantics: if you asked for an offset outside the block list, you
1016          * get an empty block back */
1017         if (!b)
1018                 return 0;
1019         first = b;
1020         sofar -= offset;        /* don't count the remaining offset in the first b */
1021         /* upper bound for how many buffers we'll need in newb */
1022         for (/* b is set*/; b; b = b->next) {
1023                 nr_bufs += BHLEN(b) ? 1 : 0;
1024                 nr_bufs += b->nr_extra_bufs; /* still assuming nr_extra == nr_valid */
1025                 sofar += BLEN(b);
1026                 if (sofar > len)
1027                         break;
1028         }
1029         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1030         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1031                 /* caller will need to alloc these, then re-call us */
1032                 return nr_bufs;
1033         }
1034         for (b = first; b && len; b = b->next) {
1035                 b_idx = 0;
1036                 if (offset) {
1037                         if (offset < BHLEN(b)) {
1038                                 /* off is in the main body */
1039                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1040                                 newb_idx++;
1041                         } else {
1042                                 /* off is in one of the buffers (or just past the last one).
1043                                  * we're not going to point to b's main body at all. */
1044                                 offset -= BHLEN(b);
1045                                 assert(b->extra_data);
1046                                 /* assuming these extrabufs are packed, or at least that len
1047                                  * isn't gibberish */
1048                                 while (b->extra_data[b_idx].len <= offset) {
1049                                         offset -= b->extra_data[b_idx].len;
1050                                         b_idx++;
1051                                 }
1052                                 /* now offset is set to our offset in the b_idx'th buf */
1053                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1054                                 newb_idx++;
1055                                 b_idx++;
1056                         }
1057                         offset = 0;
1058                 } else {
1059                         if (BHLEN(b)) {
1060                                 len -= point_to_body(b, b->rp, newb, newb_idx, len);
1061                                 newb_idx++;
1062                         }
1063                 }
1064                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1065                  * and any point_to_ could be our last if it consumed all of len. */
1066                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1067                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1068                         newb_idx++;
1069                 }
1070         }
1071         return 0;
1072 }
1073
1074 struct block *blist_clone(struct block *blist, int header_len, int len,
1075                           uint32_t offset)
1076 {
1077         int ret;
1078         struct block *newb = block_alloc(header_len, MEM_WAIT);
1079         do {
1080                 ret = __blist_clone_to(blist, newb, len, offset);
1081                 if (ret)
1082                         block_add_extd(newb, ret, MEM_WAIT);
1083         } while (ret);
1084         return newb;
1085 }
1086
1087 /* given a queue, makes a single block with header_len reserved space in the
1088  * block main body, and the contents of [offset, len + offset) pointed to in the
1089  * new blocks ext_data.  This does not make a copy of the q's contents, though
1090  * you do have a ref count on the memory. */
1091 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1092 {
1093         int ret;
1094         struct block *newb = block_alloc(header_len, MEM_WAIT);
1095         /* the while loop should rarely be used: it would require someone
1096          * concurrently adding to the queue. */
1097         do {
1098                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1099                 spin_lock_irqsave(&q->lock);
1100                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1101                 spin_unlock_irqsave(&q->lock);
1102                 if (ret)
1103                         block_add_extd(newb, ret, MEM_WAIT);
1104         } while (ret);
1105         return newb;
1106 }
1107
1108 /*
1109  *  copy from offset in the queue
1110  */
1111 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1112 {
1113         int sofar;
1114         int n;
1115         struct block *b, *nb;
1116         uint8_t *p;
1117
1118         nb = block_alloc(len, MEM_WAIT);
1119
1120         spin_lock_irqsave(&q->lock);
1121
1122         /* go to offset */
1123         b = q->bfirst;
1124         for (sofar = 0;; sofar += n) {
1125                 if (b == NULL) {
1126                         spin_unlock_irqsave(&q->lock);
1127                         return nb;
1128                 }
1129                 n = BLEN(b);
1130                 if (sofar + n > offset) {
1131                         p = b->rp + offset - sofar;
1132                         n -= offset - sofar;
1133                         break;
1134                 }
1135                 QDEBUG checkb(b, "qcopy");
1136                 b = b->next;
1137         }
1138
1139         /* copy bytes from there */
1140         for (sofar = 0; sofar < len;) {
1141                 if (n > len - sofar)
1142                         n = len - sofar;
1143                 PANIC_EXTRA(b);
1144                 memmove(nb->wp, p, n);
1145                 qcopycnt += n;
1146                 sofar += n;
1147                 nb->wp += n;
1148                 b = b->next;
1149                 if (b == NULL)
1150                         break;
1151                 n = BLEN(b);
1152                 p = b->rp;
1153         }
1154         spin_unlock_irqsave(&q->lock);
1155
1156         return nb;
1157 }
1158
1159 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1160 {
1161 #ifdef CONFIG_BLOCK_EXTRAS
1162         return qclone(q, 0, len, offset);
1163 #else
1164         return qcopy_old(q, len, offset);
1165 #endif
1166 }
1167
1168 static void qinit_common(struct queue *q)
1169 {
1170         spinlock_init_irqsave(&q->lock);
1171         rendez_init(&q->rr);
1172         rendez_init(&q->wr);
1173 }
1174
1175 /*
1176  *  called by non-interrupt code
1177  */
1178 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1179 {
1180         struct queue *q;
1181
1182         q = kzmalloc(sizeof(struct queue), 0);
1183         if (q == 0)
1184                 return 0;
1185         qinit_common(q);
1186
1187         q->limit = q->inilim = limit;
1188         q->kick = kick;
1189         q->arg = arg;
1190         q->state = msg;
1191         q->eof = 0;
1192
1193         return q;
1194 }
1195
1196 /* open a queue to be bypassed */
1197 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1198 {
1199         struct queue *q;
1200
1201         q = kzmalloc(sizeof(struct queue), 0);
1202         if (q == 0)
1203                 return 0;
1204         qinit_common(q);
1205
1206         q->limit = 0;
1207         q->arg = arg;
1208         q->bypass = bypass;
1209         q->state = 0;
1210
1211         return q;
1212 }
1213
1214 static int notempty(void *a)
1215 {
1216         struct queue *q = a;
1217
1218         return (q->state & Qclosed) || q->bfirst != 0;
1219 }
1220
1221 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1222  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1223  * was naturally closed.  Throws an error o/w. */
1224 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1225 {
1226         while (1) {
1227                 spin_lock_irqsave(&q->lock);
1228                 if (q->bfirst != NULL)
1229                         return TRUE;
1230                 if (q->state & Qclosed) {
1231                         if (++q->eof > 3) {
1232                                 spin_unlock_irqsave(&q->lock);
1233                                 error(EPIPE, "multiple reads on a closed queue");
1234                         }
1235                         if (q->err[0]) {
1236                                 spin_unlock_irqsave(&q->lock);
1237                                 error(EPIPE, q->err);
1238                         }
1239                         return FALSE;
1240                 }
1241                 if (qio_flags & QIO_NON_BLOCK) {
1242                         spin_unlock_irqsave(&q->lock);
1243                         error(EAGAIN, "queue empty");
1244                 }
1245                 spin_unlock_irqsave(&q->lock);
1246                 /* As with the producer side, we check for a condition while holding the
1247                  * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
1248                  * check again" pattern, but we do it conditionally.  Both sides agree
1249                  * synchronously to do it, and those decisions are made while holding
1250                  * q->lock.  I think this is OK.
1251                  *
1252                  * The invariant is that no reader sleeps when the queue has data.
1253                  * While holding the rendez lock, if we see there's no data, we'll
1254                  * sleep.  Since we saw there was no data, the next writer will see (or
1255                  * already saw) no data, and then the writer decides to rendez_wake,
1256                  * which will grab the rendez lock.  If the writer already did that,
1257                  * then we'll see notempty when we do our check-again. */
1258                 rendez_sleep(&q->rr, notempty, q);
1259         }
1260 }
1261
1262 /*
1263  * add a block list to a queue
1264  * XXX basically the same as enqueue blist, and has no locking!
1265  */
1266 void qaddlist(struct queue *q, struct block *b)
1267 {
1268         /* TODO: q lock? */
1269         /* queue the block */
1270         if (q->bfirst)
1271                 q->blast->next = b;
1272         else
1273                 q->bfirst = b;
1274         q->dlen += blocklen(b);
1275         while (b->next)
1276                 b = b->next;
1277         q->blast = b;
1278 }
1279
1280 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1281 {
1282         size_t copy_amt, retval = 0;
1283         struct extra_bdata *ebd;
1284
1285         copy_amt = MIN(BHLEN(b), amt);
1286         memcpy(to, b->rp, copy_amt);
1287         /* advance the rp, since this block might not be completely consumed and
1288          * future reads need to know where to pick up from */
1289         b->rp += copy_amt;
1290         to += copy_amt;
1291         amt -= copy_amt;
1292         retval += copy_amt;
1293         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1294                 ebd = &b->extra_data[i];
1295                 /* skip empty entires.  if we track this in the struct block, we can
1296                  * just start the for loop early */
1297                 if (!ebd->base || !ebd->len)
1298                         continue;
1299                 copy_amt = MIN(ebd->len, amt);
1300                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1301                 /* we're actually consuming the entries, just like how we advance rp up
1302                  * above, and might only consume part of one. */
1303                 ebd->len -= copy_amt;
1304                 ebd->off += copy_amt;
1305                 b->extra_len -= copy_amt;
1306                 if (!ebd->len) {
1307                         /* we don't actually have to decref here.  it's also done in
1308                          * freeb().  this is the earliest we can free. */
1309                         kfree((void*)ebd->base);
1310                         ebd->base = ebd->off = 0;
1311                 }
1312                 to += copy_amt;
1313                 amt -= copy_amt;
1314                 retval += copy_amt;
1315         }
1316         return retval;
1317 }
1318
1319 /*
1320  *  copy the contents of a string of blocks into
1321  *  memory.  emptied blocks are freed.  return
1322  *  pointer to first unconsumed block.
1323  */
1324 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1325 {
1326         int i;
1327         struct block *next;
1328
1329         /* could be slicker here, since read_from_block is smart */
1330         for (; b != NULL; b = next) {
1331                 i = BLEN(b);
1332                 if (i > n) {
1333                         /* partial block, consume some */
1334                         read_from_block(b, p, n);
1335                         return b;
1336                 }
1337                 /* full block, consume all and move on */
1338                 i = read_from_block(b, p, i);
1339                 n -= i;
1340                 p += i;
1341                 next = b->next;
1342                 freeb(b);
1343         }
1344         return NULL;
1345 }
1346
1347 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1348  * actual amount copied. */
1349 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1350 {
1351         size_t sofar = 0;
1352         struct block *next;
1353
1354         do {
1355                 assert(va);
1356                 assert(b->rp);
1357                 sofar += read_from_block(b, va + sofar, len - sofar);
1358                 if (BLEN(b) && b->next)
1359                         panic("Failed to drain entire block (Qmsg?) but had a next!");
1360                 next = b->next;
1361                 freeb(b);
1362                 b = next;
1363         } while (b);
1364         return sofar;
1365 }
1366
1367 /*
1368  *  copy the contents of memory into a string of blocks.
1369  *  return NULL on error.
1370  */
1371 struct block *mem2bl(uint8_t * p, int len)
1372 {
1373         ERRSTACK(1);
1374         int n;
1375         struct block *b, *first, **l;
1376
1377         first = NULL;
1378         l = &first;
1379         if (waserror()) {
1380                 freeblist(first);
1381                 nexterror();
1382         }
1383         do {
1384                 n = len;
1385                 if (n > Maxatomic)
1386                         n = Maxatomic;
1387
1388                 *l = b = block_alloc(n, MEM_WAIT);
1389                 /* TODO consider extra_data */
1390                 memmove(b->wp, p, n);
1391                 b->wp += n;
1392                 p += n;
1393                 len -= n;
1394                 l = &b->next;
1395         } while (len > 0);
1396         poperror();
1397
1398         return first;
1399 }
1400
1401 /*
1402  *  put a block back to the front of the queue
1403  *  called with q ilocked
1404  */
1405 void qputback(struct queue *q, struct block *b)
1406 {
1407         b->next = q->bfirst;
1408         if (q->bfirst == NULL)
1409                 q->blast = b;
1410         q->bfirst = b;
1411         q->dlen += BLEN(b);
1412         /* qputback seems to undo a read, so we can undo the accounting too. */
1413         q->bytes_read -= BLEN(b);
1414 }
1415
1416 /*
1417  *  get next block from a queue (up to a limit)
1418  *
1419  */
1420 struct block *qbread(struct queue *q, size_t len)
1421 {
1422         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1423 }
1424
1425 struct block *qbread_nonblock(struct queue *q, size_t len)
1426 {
1427         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1428                         QIO_NON_BLOCK, MEM_WAIT);
1429 }
1430
1431 /* read up to len from a queue into vp. */
1432 size_t qread(struct queue *q, void *va, size_t len)
1433 {
1434         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1435
1436         if (!blist)
1437                 return 0;
1438         return read_all_blocks(blist, va, len);
1439 }
1440
1441 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1442 {
1443         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1444                                        MEM_WAIT);
1445
1446         if (!blist)
1447                 return 0;
1448         return read_all_blocks(blist, va, len);
1449 }
1450
1451 /* This is the rendez wake condition for writers. */
1452 static int qwriter_should_wake(void *a)
1453 {
1454         struct queue *q = a;
1455
1456         return qwritable(q) || (q->state & Qclosed);
1457 }
1458
1459 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1460 static size_t enqueue_blist(struct queue *q, struct block *b)
1461 {
1462         size_t dlen;
1463
1464         if (q->bfirst)
1465                 q->blast->next = b;
1466         else
1467                 q->bfirst = b;
1468         dlen = BLEN(b);
1469         while (b->next) {
1470                 b = b->next;
1471                 dlen += BLEN(b);
1472         }
1473         q->blast = b;
1474         q->dlen += dlen;
1475         return dlen;
1476 }
1477
1478 /* Adds block (which can be a list of blocks) to the queue, subject to
1479  * qio_flags.  Returns the length written on success or -1 on non-throwable
1480  * error.  Adjust qio_flags to control the value-added features!. */
1481 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1482 {
1483         ssize_t ret;
1484         bool was_unreadable;
1485
1486         if (q->bypass) {
1487                 ret = blocklen(b);
1488                 (*q->bypass) (q->arg, b);
1489                 return ret;
1490         }
1491         spin_lock_irqsave(&q->lock);
1492         was_unreadable = q->dlen == 0;
1493         if (q->state & Qclosed) {
1494                 spin_unlock_irqsave(&q->lock);
1495                 freeblist(b);
1496                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1497                         return -1;
1498                 if (q->err[0])
1499                         error(EPIPE, q->err);
1500                 else
1501                         error(EPIPE, "connection closed");
1502         }
1503         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1504                 /* drop overflow takes priority over regular non-blocking */
1505                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1506                         spin_unlock_irqsave(&q->lock);
1507                         freeb(b);
1508                         return -1;
1509                 }
1510                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1511                  * and catch it. */
1512                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1513                         spin_unlock_irqsave(&q->lock);
1514                         freeb(b);
1515                         error(EAGAIN, "queue full");
1516                 }
1517         }
1518         ret = enqueue_blist(q, b);
1519         QDEBUG checkb(b, "__qbwrite");
1520         spin_unlock_irqsave(&q->lock);
1521         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1522          * wakeup, meaning that actual users either want a kick or have qreaders. */
1523         if (q->kick && (was_unreadable || (q->state & Qkick)))
1524                 q->kick(q->arg);
1525         if (was_unreadable) {
1526                 /* Unlike the read side, there's no double-check to make sure the queue
1527                  * transitioned across an edge.  We know we added something, so that's
1528                  * enough.  We wake if the queue was empty.  Both sides are the same, in
1529                  * that the condition for which we do the rendez_wakeup() is the same as
1530                  * the condition done for the rendez_sleep(). */
1531                 rendez_wakeup(&q->rr);
1532                 qwake_cb(q, FDTAP_FILT_READABLE);
1533         }
1534         /*
1535          *  flow control, wait for queue to get below the limit
1536          *  before allowing the process to continue and queue
1537          *  more.  We do this here so that postnote can only
1538          *  interrupt us after the data has been queued.  This
1539          *  means that things like 9p flushes and ssl messages
1540          *  will not be disrupted by software interrupts.
1541          *
1542          *  Note - this is moderately dangerous since a process
1543          *  that keeps getting interrupted and rewriting will
1544          *  queue infinite crud.
1545          */
1546         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1547             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1548                 /* This is a racy peek at the q status.  If we accidentally block, our
1549                  * rendez will return.  The rendez's peak (qwriter_should_wake) is also
1550                  * racy w.r.t.  the q's spinlock (that lock protects writes, but not
1551                  * reads).
1552                  *
1553                  * Here's the deal: when holding the rendez lock, if we see the sleep
1554                  * condition, the consumer will wake us.  The condition will only ever
1555                  * be changed by the next qbread() (consumer, changes q->dlen).  That
1556                  * code will do a rendez wake, which will spin on the rendez lock,
1557                  * meaning it won't procede until we either see the new state (and
1558                  * return) or put ourselves on the rendez, and wake up.
1559                  *
1560                  * The pattern is one side writes mem, then signals.  Our side checks
1561                  * the signal, then reads the mem.  The goal is to not miss seeing the
1562                  * signal AND missing the memory write.  In this specific case, the
1563                  * signal is actually synchronous (the rendez lock) and not basic shared
1564                  * memory.
1565                  *
1566                  * Oh, and we spin in case we woke early and someone else filled the
1567                  * queue, mesa-style. */
1568                 while (!qwriter_should_wake(q))
1569                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1570         }
1571         return ret;
1572 }
1573
1574 /*
1575  *  add a block to a queue obeying flow control
1576  */
1577 ssize_t qbwrite(struct queue *q, struct block *b)
1578 {
1579         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1580 }
1581
1582 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1583 {
1584         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1585 }
1586
1587 ssize_t qibwrite(struct queue *q, struct block *b)
1588 {
1589         return __qbwrite(q, b, 0);
1590 }
1591
1592 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1593  * block on success, 0 on failure. */
1594 static struct block *build_block(void *from, size_t len, int mem_flags)
1595 {
1596         struct block *b;
1597         void *ext_buf;
1598
1599         /* If len is small, we don't need to bother with the extra_data.  But until
1600          * the whole stack can handle extd blocks, we'll use them unconditionally.
1601          * */
1602 #ifdef CONFIG_BLOCK_EXTRAS
1603         /* allocb builds in 128 bytes of header space to all blocks, but this is
1604          * only available via padblock (to the left).  we also need some space
1605          * for pullupblock for some basic headers (like icmp) that get written
1606          * in directly */
1607         b = block_alloc(64, mem_flags);
1608         if (!b)
1609                 return 0;
1610         ext_buf = kmalloc(len, mem_flags);
1611         if (!ext_buf) {
1612                 kfree(b);
1613                 return 0;
1614         }
1615         memcpy(ext_buf, from, len);
1616         if (block_add_extd(b, 1, mem_flags)) {
1617                 kfree(ext_buf);
1618                 kfree(b);
1619                 return 0;
1620         }
1621         b->extra_data[0].base = (uintptr_t)ext_buf;
1622         b->extra_data[0].off = 0;
1623         b->extra_data[0].len = len;
1624         b->extra_len += len;
1625 #else
1626         b = block_alloc(len, mem_flags);
1627         if (!b)
1628                 return 0;
1629         memmove(b->wp, from, len);
1630         b->wp += len;
1631 #endif
1632         return b;
1633 }
1634
1635 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1636                         int qio_flags)
1637 {
1638         ERRSTACK(1);
1639         size_t n;
1640         volatile size_t sofar = 0;      /* volatile for the waserror */
1641         struct block *b;
1642         uint8_t *p = vp;
1643         void *ext_buf;
1644
1645         /* Only some callers can throw.  Others might be in a context where waserror
1646          * isn't safe. */
1647         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1648                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE) after
1649                  * some data has been sent should be treated as a partial write. */
1650                 if (sofar)
1651                         goto out_ok;
1652                 nexterror();
1653         }
1654         do {
1655                 n = len - sofar;
1656                 /* This is 64K, the max amount per single block.  Still a good value? */
1657                 if (n > Maxatomic)
1658                         n = Maxatomic;
1659                 b = build_block(p + sofar, n, mem_flags);
1660                 if (!b)
1661                         break;
1662                 if (__qbwrite(q, b, qio_flags) < 0)
1663                         break;
1664                 sofar += n;
1665         } while ((sofar < len) && (q->state & Qmsg) == 0);
1666 out_ok:
1667         if (qio_flags & QIO_CAN_ERR_SLEEP)
1668                 poperror();
1669         return sofar;
1670 }
1671
1672 ssize_t qwrite(struct queue *q, void *vp, int len)
1673 {
1674         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1675 }
1676
1677 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1678 {
1679         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1680                                               QIO_NON_BLOCK);
1681 }
1682
1683 ssize_t qiwrite(struct queue *q, void *vp, int len)
1684 {
1685         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1686 }
1687
1688 /*
1689  *  be extremely careful when calling this,
1690  *  as there is no reference accounting
1691  */
1692 void qfree(struct queue *q)
1693 {
1694         qclose(q);
1695         kfree(q);
1696 }
1697
1698 /*
1699  *  Mark a queue as closed.  No further IO is permitted.
1700  *  All blocks are released.
1701  */
1702 void qclose(struct queue *q)
1703 {
1704         struct block *bfirst;
1705
1706         if (q == NULL)
1707                 return;
1708
1709         /* mark it */
1710         spin_lock_irqsave(&q->lock);
1711         q->state |= Qclosed;
1712         q->state &= ~Qdropoverflow;
1713         q->err[0] = 0;
1714         bfirst = q->bfirst;
1715         q->bfirst = 0;
1716         q->dlen = 0;
1717         spin_unlock_irqsave(&q->lock);
1718
1719         /* free queued blocks */
1720         freeblist(bfirst);
1721
1722         /* wake up readers/writers */
1723         rendez_wakeup(&q->rr);
1724         rendez_wakeup(&q->wr);
1725         qwake_cb(q, FDTAP_FILT_HANGUP);
1726 }
1727
1728 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1729  *
1730  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1731  * there is no message, which is what also happens during a natural qclose(),
1732  * those waiters will simply return 0.  qwriters will always error() on a
1733  * closed/hungup queue. */
1734 void qhangup(struct queue *q, char *msg)
1735 {
1736         /* mark it */
1737         spin_lock_irqsave(&q->lock);
1738         q->state |= Qclosed;
1739         if (msg == 0 || *msg == 0)
1740                 q->err[0] = 0;
1741         else
1742                 strlcpy(q->err, msg, ERRMAX);
1743         spin_unlock_irqsave(&q->lock);
1744
1745         /* wake up readers/writers */
1746         rendez_wakeup(&q->rr);
1747         rendez_wakeup(&q->wr);
1748         qwake_cb(q, FDTAP_FILT_HANGUP);
1749 }
1750
1751 /*
1752  *  return non-zero if the q is hungup
1753  */
1754 int qisclosed(struct queue *q)
1755 {
1756         return q->state & Qclosed;
1757 }
1758
1759 /*
1760  *  mark a queue as no longer hung up.  resets the wake_cb.
1761  */
1762 void qreopen(struct queue *q)
1763 {
1764         spin_lock_irqsave(&q->lock);
1765         q->state &= ~Qclosed;
1766         q->eof = 0;
1767         q->limit = q->inilim;
1768         q->wake_cb = 0;
1769         q->wake_data = 0;
1770         spin_unlock_irqsave(&q->lock);
1771 }
1772
1773 /*
1774  *  return bytes queued
1775  */
1776 int qlen(struct queue *q)
1777 {
1778         return q->dlen;
1779 }
1780
1781 size_t q_bytes_read(struct queue *q)
1782 {
1783         return q->bytes_read;
1784 }
1785
1786 /*
1787  * return space remaining before flow control
1788  *
1789  *  This used to be
1790  *  q->len < q->limit/2
1791  *  but it slows down tcp too much for certain write sizes.
1792  *  I really don't understand it completely.  It may be
1793  *  due to the queue draining so fast that the transmission
1794  *  stalls waiting for the app to produce more data.  - presotto
1795  *
1796  *  q->len was the amount of bytes, which is no longer used.  we now use
1797  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1798  */
1799 int qwindow(struct queue *q)
1800 {
1801         int l;
1802
1803         l = q->limit - q->dlen;
1804         if (l < 0)
1805                 l = 0;
1806         return l;
1807 }
1808
1809 /*
1810  *  return true if we can read without blocking
1811  */
1812 int qcanread(struct queue *q)
1813 {
1814         return q->bfirst != 0;
1815 }
1816
1817 /*
1818  *  change queue limit
1819  */
1820 void qsetlimit(struct queue *q, size_t limit)
1821 {
1822         bool was_writable = qwritable(q);
1823
1824         q->limit = limit;
1825         if (!was_writable && qwritable(q)) {
1826                 rendez_wakeup(&q->wr);
1827                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1828         }
1829 }
1830
1831 size_t qgetlimit(struct queue *q)
1832 {
1833         return q->limit;
1834 }
1835
1836 /*
1837  *  set whether writes drop overflowing blocks, or if we sleep
1838  */
1839 void qdropoverflow(struct queue *q, bool onoff)
1840 {
1841         spin_lock_irqsave(&q->lock);
1842         if (onoff)
1843                 q->state |= Qdropoverflow;
1844         else
1845                 q->state &= ~Qdropoverflow;
1846         spin_unlock_irqsave(&q->lock);
1847 }
1848
1849 /* Be careful: this can affect concurrent reads/writes and code that might have
1850  * built-in expectations of the q's type. */
1851 void q_toggle_qmsg(struct queue *q, bool onoff)
1852 {
1853         spin_lock_irqsave(&q->lock);
1854         if (onoff)
1855                 q->state |= Qmsg;
1856         else
1857                 q->state &= ~Qmsg;
1858         spin_unlock_irqsave(&q->lock);
1859 }
1860
1861 /* Be careful: this can affect concurrent reads/writes and code that might have
1862  * built-in expectations of the q's type. */
1863 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1864 {
1865         spin_lock_irqsave(&q->lock);
1866         if (onoff)
1867                 q->state |= Qcoalesce;
1868         else
1869                 q->state &= ~Qcoalesce;
1870         spin_unlock_irqsave(&q->lock);
1871 }
1872
1873 /*
1874  *  flush the output queue
1875  */
1876 void qflush(struct queue *q)
1877 {
1878         struct block *bfirst;
1879
1880         /* mark it */
1881         spin_lock_irqsave(&q->lock);
1882         bfirst = q->bfirst;
1883         q->bfirst = 0;
1884         q->dlen = 0;
1885         spin_unlock_irqsave(&q->lock);
1886
1887         /* free queued blocks */
1888         freeblist(bfirst);
1889
1890         /* wake up writers */
1891         rendez_wakeup(&q->wr);
1892         qwake_cb(q, FDTAP_FILT_WRITABLE);
1893 }
1894
1895 int qfull(struct queue *q)
1896 {
1897         return !qwritable(q);
1898 }
1899
1900 int qstate(struct queue *q)
1901 {
1902         return q->state;
1903 }
1904
1905 void qdump(struct queue *q)
1906 {
1907         if (q)
1908                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1909                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1910 }
1911
1912 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1913  * marks the type of wakeup event (flags from FDTAP).
1914  *
1915  * There's no sync protection.  If you change the CB while the qio is running,
1916  * you might get a CB with the data or func from a previous set_wake_cb.  You
1917  * should set this once per queue and forget it.
1918  *
1919  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1920  * just make sure that the func(data) pair are valid until the queue is freed or
1921  * reopened. */
1922 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1923 {
1924         q->wake_data = data;
1925         wmb();  /* if we see func, we'll also see the data for it */
1926         q->wake_cb = func;
1927 }
1928
1929 /* Helper for detecting whether we'll block on a read at this instant. */
1930 bool qreadable(struct queue *q)
1931 {
1932         return qlen(q) > 0;
1933 }
1934
1935 /* Helper for detecting whether we'll block on a write at this instant. */
1936 bool qwritable(struct queue *q)
1937 {
1938         return !q->limit || qwindow(q) > 0;
1939 }