Remove getcallerpc()
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <slab.h>
30 #include <kmalloc.h>
31 #include <kref.h>
32 #include <string.h>
33 #include <stdio.h>
34 #include <assert.h>
35 #include <error.h>
36 #include <cpio.h>
37 #include <pmap.h>
38 #include <smp.h>
39 #include <net/ip.h>
40
41 #define PANIC_EXTRA(b)                                                  \
42 {                                                                       \
43         if ((b)->extra_len) {                                           \
44                 printblock(b);                                          \
45                 backtrace();                                            \
46                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
47         }                                                               \
48 }
49
50 static uint32_t padblockcnt;
51 static uint32_t concatblockcnt;
52 static uint32_t pullupblockcnt;
53 static uint32_t copyblockcnt;
54 static uint32_t consumecnt;
55 static uint32_t producecnt;
56 static uint32_t qcopycnt;
57
58 static int debugging;
59
60 #define QDEBUG  if(0)
61
62 /*
63  *  IO queues
64  */
65
66 struct queue {
67         spinlock_t lock;;
68
69         struct block *bfirst;           /* buffer */
70         struct block *blast;
71
72         int dlen;                       /* data bytes in queue */
73         int limit;                      /* max bytes in queue */
74         int inilim;                     /* initial limit */
75         int state;
76         int eof;                        /* number of eofs read by user */
77         size_t bytes_read;
78
79         void (*kick) (void *);          /* restart output */
80         void (*bypass) (void *, struct block *); /* bypass queue altogether */
81         void *arg;                      /* argument to kick */
82
83         struct rendez rr;               /* process waiting to read */
84         struct rendez wr;               /* process waiting to write */
85         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
86         void *wake_data;
87
88         char err[ERRMAX];
89 };
90
91 enum {
92         Maxatomic = 64 * 1024,
93         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
94         QIO_LIMIT = (1 << 1),           /* respect q->limit */
95         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
96         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
97         QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
98         QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
99 };
100
101 unsigned int qiomaxatomic = Maxatomic;
102
103 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
104 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
105 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
106                               int mem_flags);
107 static bool qwait_and_ilock(struct queue *q, int qio_flags);
108
109 /* Helper: fires a wake callback, sending 'filter' */
110 static void qwake_cb(struct queue *q, int filter)
111 {
112         if (q->wake_cb)
113                 q->wake_cb(q, q->wake_data, filter);
114 }
115
116 void ixsummary(void)
117 {
118         debugging ^= 1;
119         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
120                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
121         printd("consume %lu, produce %lu, qcopy %lu\n",
122                    consumecnt, producecnt, qcopycnt);
123 }
124
125 /*
126  *  pad a block to the front (or the back if size is negative)
127  */
128 struct block *padblock(struct block *bp, int size)
129 {
130         int n;
131         struct block *nbp;
132
133         QDEBUG checkb(bp, "padblock 1");
134         if (size >= 0) {
135                 if (bp->rp - bp->base >= size) {
136                         bp->network_offset += size;
137                         bp->transport_offset += size;
138                         bp->rp -= size;
139                         return bp;
140                 }
141
142                 PANIC_EXTRA(bp);
143                 if (bp->next)
144                         panic("%s %p had a next", __func__, bp);
145                 n = BLEN(bp);
146                 padblockcnt++;
147                 nbp = block_alloc(size + n, MEM_WAIT);
148                 block_copy_metadata(nbp, bp);
149                 nbp->rp += size;
150                 nbp->wp = nbp->rp;
151                 memmove(nbp->wp, bp->rp, n);
152                 nbp->wp += n;
153                 freeb(bp);
154                 nbp->rp -= size;
155         } else {
156                 size = -size;
157
158                 PANIC_EXTRA(bp);
159
160                 if (bp->next)
161                         panic("%s %p had a next", __func__, bp);
162
163                 if (bp->lim - bp->wp >= size)
164                         return bp;
165
166                 n = BLEN(bp);
167                 padblockcnt++;
168                 nbp = block_alloc(size + n, MEM_WAIT);
169                 block_copy_metadata(nbp, bp);
170                 memmove(nbp->wp, bp->rp, n);
171                 nbp->wp += n;
172                 freeb(bp);
173         }
174         QDEBUG checkb(nbp, "padblock 1");
175         return nbp;
176 }
177
178 /*
179  *  return count of bytes in a string of blocks
180  */
181 int blocklen(struct block *bp)
182 {
183         int len;
184
185         len = 0;
186         while (bp) {
187                 len += BLEN(bp);
188                 bp = bp->next;
189         }
190         return len;
191 }
192
193 /*
194  * return count of space in blocks
195  */
196 int blockalloclen(struct block *bp)
197 {
198         int len;
199
200         len = 0;
201         while (bp) {
202                 len += BALLOC(bp);
203                 bp = bp->next;
204         }
205         return len;
206 }
207
208 /*
209  *  copy the  string of blocks into
210  *  a single block and free the string
211  */
212 struct block *concatblock(struct block *bp)
213 {
214         int len;
215         struct block *nb, *f;
216
217         if (bp->next == 0)
218                 return bp;
219
220         /* probably use parts of qclone */
221         PANIC_EXTRA(bp);
222         nb = block_alloc(blocklen(bp), MEM_WAIT);
223         for (f = bp; f; f = f->next) {
224                 len = BLEN(f);
225                 memmove(nb->wp, f->rp, len);
226                 nb->wp += len;
227         }
228         concatblockcnt += BLEN(nb);
229         freeblist(bp);
230         QDEBUG checkb(nb, "concatblock 1");
231         return nb;
232 }
233
234 /* Makes an identical copy of the block, collapsing all the data into the block
235  * body.  It does not point to the contents of the original, it is a copy
236  * (unlike qclone).  Since we're copying, we might as well put the memory into
237  * one contiguous chunk. */
238 struct block *copyblock(struct block *bp, int mem_flags)
239 {
240         struct block *newb;
241         struct extra_bdata *ebd;
242         size_t amt;
243
244         QDEBUG checkb(bp, "copyblock 0");
245         newb = block_alloc(BLEN(bp), mem_flags);
246         if (!newb)
247                 return 0;
248         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
249         assert(amt == BHLEN(bp));
250         for (int i = 0; i < bp->nr_extra_bufs; i++) {
251                 ebd = &bp->extra_data[i];
252                 if (!ebd->base || !ebd->len)
253                         continue;
254                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off,
255                                          ebd->len);
256                 assert(amt == ebd->len);
257         }
258         block_copy_metadata(newb, bp);
259         copyblockcnt++;
260         QDEBUG checkb(newb, "copyblock 1");
261         return newb;
262 }
263
264 /* Returns a block with the remaining contents of b all in the main body of the
265  * returned block.  Replace old references to b with the returned value (which
266  * may still be 'b', if no change was needed. */
267 struct block *linearizeblock(struct block *b)
268 {
269         struct block *newb;
270
271         if (!b->extra_len)
272                 return b;
273         newb = copyblock(b, MEM_WAIT);
274         freeb(b);
275         return newb;
276 }
277
278 /* Make sure the first block has at least n bytes in its main body.  Pulls up
279  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
280  * block list. */
281 struct block *pullupblock(struct block *bp, int n)
282 {
283         int i, len, seglen;
284         struct block *nbp;
285         struct extra_bdata *ebd;
286
287         /*
288          *  this should almost always be true, it's
289          *  just to avoid every caller checking.
290          */
291         if (BHLEN(bp) >= n)
292                 return bp;
293
294         /* If there's no chance, just bail out now.  This might be slightly
295          * wasteful if there's a long blist that does have enough data. */
296         if (n > blocklen(bp))
297                 return 0;
298         /* a start at explicit main-body / header management */
299         if (bp->extra_len) {
300                 if (n > bp->lim - bp->rp) {
301                         /* would need to realloc a new block and copy everything
302                          * over. */
303                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
304                               n, bp->lim, bp->rp, bp->lim-bp->rp);
305                 }
306                 len = n - BHLEN(bp);
307                 /* Would need to recursively call this, or otherwise pull from
308                  * later blocks and put chunks of their data into the block
309                  * we're building. */
310                 if (len > bp->extra_len)
311                         panic("pullup more than extra (%d, %d, %d)\n",
312                               n, BHLEN(bp), bp->extra_len);
313                 QDEBUG checkb(bp, "before pullup");
314                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
315                         ebd = &bp->extra_data[i];
316                         if (!ebd->base || !ebd->len)
317                                 continue;
318                         seglen = MIN(ebd->len, len);
319                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
320                         bp->wp += seglen;
321                         len -= seglen;
322                         ebd->len -= seglen;
323                         ebd->off += seglen;
324                         bp->extra_len -= seglen;
325                         if (ebd->len == 0) {
326                                 kfree((void *)ebd->base);
327                                 ebd->off = 0;
328                                 ebd->base = 0;
329                         }
330                 }
331                 /* maybe just call pullupblock recursively here */
332                 if (len)
333                         panic("pullup %d bytes overdrawn\n", len);
334                 QDEBUG checkb(bp, "after pullup");
335                 return bp;
336         }
337
338         /*
339          *  if not enough room in the first block,
340          *  add another to the front of the list.
341          */
342         if (bp->lim - bp->rp < n) {
343                 nbp = block_alloc(n, MEM_WAIT);
344                 nbp->next = bp;
345                 bp = nbp;
346         }
347
348         /*
349          *  copy bytes from the trailing blocks into the first
350          */
351         n -= BLEN(bp);
352         while ((nbp = bp->next)) {
353                 i = BLEN(nbp);
354                 if (i > n) {
355                         memmove(bp->wp, nbp->rp, n);
356                         pullupblockcnt++;
357                         bp->wp += n;
358                         nbp->rp += n;
359                         QDEBUG checkb(bp, "pullupblock 1");
360                         return bp;
361                 } else {
362                         memmove(bp->wp, nbp->rp, i);
363                         pullupblockcnt++;
364                         bp->wp += i;
365                         bp->next = nbp->next;
366                         nbp->next = 0;
367                         freeb(nbp);
368                         n -= i;
369                         if (n == 0) {
370                                 QDEBUG checkb(bp, "pullupblock 2");
371                                 return bp;
372                         }
373                 }
374         }
375         freeb(bp);
376         return 0;
377 }
378
379 /*
380  *  make sure the first block has at least n bytes in its main body
381  */
382 struct block *pullupqueue(struct queue *q, int n)
383 {
384         struct block *b;
385
386         /* TODO: lock to protect the queue links? */
387         if ((BHLEN(q->bfirst) >= n))
388                 return q->bfirst;
389         q->bfirst = pullupblock(q->bfirst, n);
390         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
391         q->blast = b;
392         return q->bfirst;
393 }
394
395 /* throw away count bytes from the front of
396  * block's extradata.  Returns count of bytes
397  * thrown away
398  */
399
400 static int pullext(struct block *bp, int count)
401 {
402         struct extra_bdata *ed;
403         int i, rem, bytes = 0;
404
405         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
406                 ed = &bp->extra_data[i];
407                 rem = MIN(count, ed->len);
408                 bp->extra_len -= rem;
409                 count -= rem;
410                 bytes += rem;
411                 ed->off += rem;
412                 ed->len -= rem;
413                 if (ed->len == 0) {
414                         kfree((void *)ed->base);
415                         ed->base = 0;
416                         ed->off = 0;
417                 }
418         }
419         return bytes;
420 }
421
422 /* throw away count bytes from the end of a
423  * block's extradata.  Returns count of bytes
424  * thrown away
425  */
426
427 static int dropext(struct block *bp, int count)
428 {
429         struct extra_bdata *ed;
430         int i, rem, bytes = 0;
431
432         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
433                 ed = &bp->extra_data[i];
434                 rem = MIN(count, ed->len);
435                 bp->extra_len -= rem;
436                 count -= rem;
437                 bytes += rem;
438                 ed->len -= rem;
439                 if (ed->len == 0) {
440                         kfree((void *)ed->base);
441                         ed->base = 0;
442                         ed->off = 0;
443                 }
444         }
445         return bytes;
446 }
447
448 /*
449  *  throw away up to count bytes from a
450  *  list of blocks.  Return count of bytes
451  *  thrown away.
452  */
453 static int _pullblock(struct block **bph, int count, int free)
454 {
455         struct block *bp;
456         int n, bytes;
457
458         bytes = 0;
459         if (bph == NULL)
460                 return 0;
461
462         while (*bph != NULL && count != 0) {
463                 bp = *bph;
464
465                 n = MIN(BHLEN(bp), count);
466                 bytes += n;
467                 count -= n;
468                 bp->rp += n;
469                 n = pullext(bp, count);
470                 bytes += n;
471                 count -= n;
472                 QDEBUG checkb(bp, "pullblock ");
473                 if (BLEN(bp) == 0 && (free || count)) {
474                         *bph = bp->next;
475                         bp->next = NULL;
476                         freeb(bp);
477                 }
478         }
479         return bytes;
480 }
481
482 int pullblock(struct block **bph, int count)
483 {
484         return _pullblock(bph, count, 1);
485 }
486
487 /*
488  *  trim to len bytes starting at offset
489  */
490 struct block *trimblock(struct block *bp, int offset, int len)
491 {
492         uint32_t l, trim;
493         int olen = len;
494
495         QDEBUG checkb(bp, "trimblock 1");
496         if (blocklen(bp) < offset + len) {
497                 freeblist(bp);
498                 return NULL;
499         }
500
501         l =_pullblock(&bp, offset, 0);
502         if (bp == NULL)
503                 return NULL;
504         if (l != offset) {
505                 freeblist(bp);
506                 return NULL;
507         }
508
509         while ((l = BLEN(bp)) < len) {
510                 len -= l;
511                 bp = bp->next;
512         }
513
514         trim = BLEN(bp) - len;
515         trim -= dropext(bp, trim);
516         bp->wp -= trim;
517
518         if (bp->next) {
519                 freeblist(bp->next);
520                 bp->next = NULL;
521         }
522         return bp;
523 }
524
525 /* Adjust block @bp so that its size is exactly @len.
526  * If the size is increased, fill in the new contents with zeros.
527  * If the size is decreased, discard some of the old contents at the tail. */
528 struct block *adjustblock(struct block *bp, int len)
529 {
530         struct extra_bdata *ebd;
531         void *buf;
532         int i;
533
534         if (len < 0) {
535                 freeb(bp);
536                 return NULL;
537         }
538
539         if (len == BLEN(bp))
540                 return bp;
541
542         /* Shrink within block main body. */
543         if (len <= BHLEN(bp)) {
544                 free_block_extra(bp);
545                 bp->wp = bp->rp + len;
546                 QDEBUG checkb(bp, "adjustblock 1");
547                 return bp;
548         }
549
550         /* Need to grow. */
551         if (len > BLEN(bp)) {
552                 /* Grow within block main body. */
553                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
554                         memset(bp->wp, 0, len - BLEN(bp));
555                         bp->wp = bp->rp + len;
556                         QDEBUG checkb(bp, "adjustblock 2");
557                         return bp;
558                 }
559                 /* Grow with extra data buffers. */
560                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
561                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
562                                    MEM_WAIT);
563                 QDEBUG checkb(bp, "adjustblock 3");
564                 return bp;
565         }
566
567         /* Shrink extra data buffers.
568          * len is how much of ebd we need to keep.
569          * extra_len is re-accumulated. */
570         assert(bp->extra_len > 0);
571         len -= BHLEN(bp);
572         bp->extra_len = 0;
573         for (i = 0; i < bp->nr_extra_bufs; i++) {
574                 ebd = &bp->extra_data[i];
575                 if (len <= ebd->len)
576                         break;
577                 len -= ebd->len;
578                 bp->extra_len += ebd->len;
579         }
580         /* If len becomes zero, extra_data[i] should be freed. */
581         if (len > 0) {
582                 ebd = &bp->extra_data[i];
583                 ebd->len = len;
584                 bp->extra_len += ebd->len;
585                 i++;
586         }
587         for (; i < bp->nr_extra_bufs; i++) {
588                 ebd = &bp->extra_data[i];
589                 if (ebd->base)
590                         kfree((void*)ebd->base);
591                 ebd->base = ebd->off = ebd->len = 0;
592         }
593         QDEBUG checkb(bp, "adjustblock 4");
594         return bp;
595 }
596
597 /* Helper: removes and returns the first block from q */
598 static struct block *pop_first_block(struct queue *q)
599 {
600         struct block *b = q->bfirst;
601
602         q->dlen -= BLEN(b);
603         q->bytes_read += BLEN(b);
604         q->bfirst = b->next;
605         b->next = 0;
606         return b;
607 }
608
609 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
610 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
611 {
612         copy_amt = MIN(to->lim - to->wp, copy_amt);
613         memcpy(to->wp, from, copy_amt);
614         to->wp += copy_amt;
615         return copy_amt;
616 }
617
618 /* Accounting helper.  Block b in q lost amt extra_data */
619 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
620 {
621         b->extra_len -= amt;
622         q->dlen -= amt;
623         q->bytes_read += amt;
624 }
625
626 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
627  * fixed in 'from', so we move its contents and zero it out in 'from'.
628  *
629  * Returns the length moved (0 on failure). */
630 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
631                        struct block *from, struct queue *from_q)
632 {
633         size_t ret = ebd->len;
634
635         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
636                 return 0;
637         block_and_q_lost_extra(from, from_q, ebd->len);
638         ebd->base = ebd->len = ebd->off = 0;
639         return ret;
640 }
641
642 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
643  * return with less than len, but greater than 0, even if there is more
644  * available in q.
645  *
646  * At any moment that we have copied anything and things are tricky, we can just
647  * return.  The trickiness comes from a bunch of variables: is the main body
648  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
649  * to @to's main body, but only if we haven't used it yet. */
650 static size_t copy_from_first_block(struct queue *q, struct block *to,
651                                     size_t len)
652 {
653         struct block *from = q->bfirst;
654         size_t copy_amt, amt;
655         struct extra_bdata *ebd;
656
657         assert(len < BLEN(from));       /* sanity */
658         /* Try to extract from the main body */
659         copy_amt = MIN(BHLEN(from), len);
660         if (copy_amt) {
661                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
662                 from->rp += copy_amt;
663                 /* We only change dlen, (data len), not q->len, since the q
664                  * still has the same block memory allocation (no kfrees
665                  * happened) */
666                 q->dlen -= copy_amt;
667                 q->bytes_read += copy_amt;
668         }
669         /* Try to extract the remainder from the extra data */
670         len -= copy_amt;
671         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
672                 ebd = &from->extra_data[i];
673                 if (!ebd->base || !ebd->len)
674                         continue;
675                 if (len >= ebd->len) {
676                         amt = move_ebd(ebd, to, from, q);
677                         if (!amt) {
678                                 /* our internal alloc could have failed.   this
679                                  * ebd is now the last one we'll consider.
680                                  * let's handle it separately and put it in the
681                                  * main body. */
682                                 if (copy_amt)
683                                         return copy_amt;
684                                 copy_amt = copy_to_block_body(to,
685                                                               (void*)ebd->base +
686                                                               ebd->off,
687                                                               ebd->len);
688                                 block_and_q_lost_extra(from, q, copy_amt);
689                                 break;
690                         }
691                         len -= amt;
692                         copy_amt += amt;
693                         continue;
694                 } else {
695                         /* If we're here, we reached our final ebd, which we'll
696                          * need to split to get anything from it. */
697                         if (copy_amt)
698                                 return copy_amt;
699                         copy_amt = copy_to_block_body(to, (void*)ebd->base +
700                                                       ebd->off, len);
701                         ebd->off += copy_amt;
702                         ebd->len -= copy_amt;
703                         block_and_q_lost_extra(from, q, copy_amt);
704                         break;
705                 }
706         }
707         if (len)
708                 assert(copy_amt);       /* sanity */
709         return copy_amt;
710 }
711
712 /* Return codes for __qbread and __try_qbread. */
713 enum {
714         QBR_OK,
715         QBR_FAIL,
716         QBR_SPARE,      /* we need a spare block */
717         QBR_AGAIN,      /* do it again, we are coalescing blocks */
718 };
719
720 /* Helper and back-end for __qbread: extracts and returns a list of blocks
721  * containing up to len bytes.  It may contain less than len even if q has more
722  * data.
723  *
724  * Returns a code interpreted by __qbread, and the returned blist in ret. */
725 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
726                         struct block **real_ret, struct block *spare)
727 {
728         struct block *ret, *ret_last, *first;
729         size_t blen;
730         bool was_unwritable = FALSE;
731
732         if (qio_flags & QIO_CAN_ERR_SLEEP) {
733                 if (!qwait_and_ilock(q, qio_flags)) {
734                         spin_unlock_irqsave(&q->lock);
735                         return QBR_FAIL;
736                 }
737                 /* we qwaited and still hold the lock, so the q is not empty */
738                 first = q->bfirst;
739         } else {
740                 spin_lock_irqsave(&q->lock);
741                 first = q->bfirst;
742                 if (!first) {
743                         spin_unlock_irqsave(&q->lock);
744                         return QBR_FAIL;
745                 }
746         }
747         /* We need to check before adjusting q->len.  We're checking the
748          * writer's sleep condition / tap condition.  When set, we *might* be
749          * making an edge transition (from unwritable to writable), which needs
750          * to wake and fire taps.  But, our read might not drain the queue below
751          * q->lim.  We'll check again later to see if we should really wake
752          * them.  */
753         was_unwritable = !qwritable(q);
754         blen = BLEN(first);
755         if ((q->state & Qcoalesce) && (blen == 0)) {
756                 freeb(pop_first_block(q));
757                 spin_unlock_irqsave(&q->lock);
758                 /* Need to retry to make sure we have a first block */
759                 return QBR_AGAIN;
760         }
761         /* Qmsg: just return the first block.  Be careful, since our caller
762          * might not read all of the block and thus drop bytes.  Similar to
763          * SOCK_DGRAM. */
764         if (q->state & Qmsg) {
765                 ret = pop_first_block(q);
766                 goto out_ok;
767         }
768         /* Let's get at least something first - makes the code easier.  This
769          * way, we'll only ever split the block once. */
770         if (blen <= len) {
771                 ret = pop_first_block(q);
772                 len -= blen;
773         } else {
774                 /* need to split the block.  we won't actually take the first
775                  * block out of the queue - we're just extracting a little bit.
776                  */
777                 if (!spare) {
778                         /* We have nothing and need a spare block.  Retry! */
779                         spin_unlock_irqsave(&q->lock);
780                         return QBR_SPARE;
781                 }
782                 copy_from_first_block(q, spare, len);
783                 ret = spare;
784                 goto out_ok;
785         }
786         /* At this point, we just grabbed the first block.  We can try to grab
787          * some more, up to len (if they want). */
788         if (qio_flags & QIO_JUST_ONE_BLOCK)
789                 goto out_ok;
790         ret_last = ret;
791         while (q->bfirst && (len > 0)) {
792                 blen = BLEN(q->bfirst);
793                 if ((q->state & Qcoalesce) && (blen == 0)) {
794                         /* remove the intermediate 0 blocks */
795                         freeb(pop_first_block(q));
796                         continue;
797                 }
798                 if (blen > len) {
799                         /* We could try to split the block, but that's a huge
800                          * pain.  For instance, we might need to move the main
801                          * body of b into an extra_data of ret_last.  lots of
802                          * ways for that to fail, and lots of cases to consider.
803                          * Easier to just bail out.  This is why I did the first
804                          * block above: we don't need to worry about this. */
805                          break;
806                 }
807                 ret_last->next = pop_first_block(q);
808                 ret_last = ret_last->next;
809                 len -= blen;
810         }
811 out_ok:
812         /* Don't wake them up or fire tap if we didn't drain enough. */
813         if (!qwritable(q))
814                 was_unwritable = FALSE;
815         spin_unlock_irqsave(&q->lock);
816         if (was_unwritable) {
817                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
818                         q->kick(q->arg);
819                 rendez_wakeup(&q->wr);
820                 qwake_cb(q, FDTAP_FILT_WRITABLE);
821         }
822         *real_ret = ret;
823         return QBR_OK;
824 }
825
826 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
827  * containing up to len bytes.  It may contain less than len even if q has more
828  * data.
829  *
830  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
831  * if it required a spare and the memory allocation failed.
832  *
833  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
834  * could get a zero length block back. */
835 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
836                               int mem_flags)
837 {
838         ERRSTACK(1);
839         struct block *ret = 0;
840         struct block *volatile spare = 0;       /* volatile for the waserror */
841
842         /* __try_qbread can throw, based on qio flags. */
843         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
844                 if (spare)
845                         freeb(spare);
846                 nexterror();
847         }
848         while (1) {
849                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
850                 case QBR_OK:
851                 case QBR_FAIL:
852                         if (spare && (ret != spare))
853                                 freeb(spare);
854                         goto out_ret;
855                 case QBR_SPARE:
856                         assert(!spare);
857                         /* Due to some nastiness, we need a fresh block so we
858                          * can read out anything from the queue.  'len' seems
859                          * like a reasonable amount.  Maybe we can get away with
860                          * less. */
861                         spare = block_alloc(len, mem_flags);
862                         if (!spare) {
863                                 /* Careful here: a memory failure (possible with
864                                  * MEM_ATOMIC) could look like 'no data in the
865                                  * queue' (QBR_FAIL).  The only one who does is
866                                  * this qget(), who happens to know that we
867                                  * won't need a spare, due to the len argument.
868                                  * Spares are only needed when we need to split
869                                  * a block. */
870                                 ret = 0;
871                                 goto out_ret;
872                         }
873                         break;
874                 case QBR_AGAIN:
875                         /* if the first block is 0 and we are Qcoalesce, then
876                          * we'll need to try again.  We bounce out of __try so
877                          * we can perform the "is there a block" logic again
878                          * from the top. */
879                         break;
880                 }
881         }
882         assert(0);
883 out_ret:
884         if (qio_flags & QIO_CAN_ERR_SLEEP)
885                 poperror();
886         return ret;
887 }
888
889 /*
890  *  get next block from a queue, return null if nothing there
891  */
892 struct block *qget(struct queue *q)
893 {
894         /* since len == SIZE_MAX, we should never need to do a mem alloc */
895         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
896 }
897
898 /* Throw away the next 'len' bytes in the queue returning the number actually
899  * discarded.
900  *
901  * If the bytes are in the queue, then they must be discarded.  The only time to
902  * return less than len is if the q itself has less than len bytes.
903  *
904  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
905  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
906 size_t qdiscard(struct queue *q, size_t len)
907 {
908         struct block *blist;
909         size_t removed_amt;
910         size_t sofar = 0;
911
912         /* This is racy.  There could be multiple qdiscarders or other
913          * consumers, where the consumption could be interleaved. */
914         while (qlen(q) && len) {
915                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
916                 removed_amt = freeblist(blist);
917                 sofar += removed_amt;
918                 len -= removed_amt;
919         }
920         return sofar;
921 }
922
923 ssize_t qpass(struct queue *q, struct block *b)
924 {
925         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
926 }
927
928 ssize_t qpassnolim(struct queue *q, struct block *b)
929 {
930         return __qbwrite(q, b, 0);
931 }
932
933 /*
934  *  if the allocated space is way out of line with the used
935  *  space, reallocate to a smaller block
936  */
937 struct block *packblock(struct block *bp)
938 {
939         struct block **l, *nbp;
940         int n;
941
942         if (bp->extra_len)
943                 return bp;
944         for (l = &bp; *l; l = &(*l)->next) {
945                 nbp = *l;
946                 n = BLEN(nbp);
947                 if ((n << 2) < BALLOC(nbp)) {
948                         *l = block_alloc(n, MEM_WAIT);
949                         memmove((*l)->wp, nbp->rp, n);
950                         (*l)->wp += n;
951                         (*l)->next = nbp->next;
952                         freeb(nbp);
953                 }
954         }
955
956         return bp;
957 }
958
959 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
960  * body_rp, for up to len.  Returns the len consumed.
961  *
962  * The base is 'b', so that we can kfree it later.  This currently ties us to
963  * using kfree for the release method for all extra_data.
964  *
965  * It is possible to have a body size that is 0, if there is no offset, and
966  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
967 static size_t point_to_body(struct block *b, uint8_t *body_rp,
968                             struct block *newb, unsigned int newb_idx,
969                             size_t len)
970 {
971         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
972
973         assert(newb_idx < newb->nr_extra_bufs);
974
975         kmalloc_incref(b);
976         ebd->base = (uintptr_t)b;
977         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
978         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
979         assert((int)ebd->len >= 0);
980         newb->extra_len += ebd->len;
981         return ebd->len;
982 }
983
984 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
985  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
986  * consumed.
987  *
988  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
989 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
990                            struct block *newb, unsigned int newb_idx,
991                            size_t len)
992 {
993         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
994         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
995
996         assert(b_idx < b->nr_extra_bufs);
997         assert(newb_idx < newb->nr_extra_bufs);
998
999         kmalloc_incref((void*)b_ebd->base);
1000         n_ebd->base = b_ebd->base;
1001         n_ebd->off = b_ebd->off + b_off;
1002         n_ebd->len = MIN(b_ebd->len - b_off, len);
1003         newb->extra_len += n_ebd->len;
1004         return n_ebd->len;
1005 }
1006
1007 /* given a string of blocks, sets up the new block's extra_data such that it
1008  * *points* to the contents of the blist [offset, len + offset).  This does not
1009  * make a separate copy of the contents of the blist.
1010  *
1011  * returns 0 on success.  the only failure is if the extra_data array was too
1012  * small, so this returns a positive integer saying how big the extra_data needs
1013  * to be.
1014  *
1015  * callers are responsible for protecting the list structure. */
1016 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1017                             uint32_t offset)
1018 {
1019         struct block *b, *first;
1020         unsigned int nr_bufs = 0;
1021         unsigned int b_idx, newb_idx = 0;
1022         uint8_t *first_main_body = 0;
1023         ssize_t sofar = 0;
1024
1025         /* find the first block; keep offset relative to the latest b in the
1026          * list */
1027         for (b = blist; b; b = b->next) {
1028                 if (BLEN(b) > offset)
1029                         break;
1030                 offset -= BLEN(b);
1031         }
1032         /* qcopy semantics: if you asked for an offset outside the block list,
1033          * you get an empty block back */
1034         if (!b)
1035                 return 0;
1036         first = b;
1037         sofar -= offset; /* don't count the remaining offset in the first b */
1038         /* upper bound for how many buffers we'll need in newb */
1039         for (/* b is set*/; b; b = b->next) {
1040                 nr_bufs += BHLEN(b) ? 1 : 0;
1041                 /* still assuming nr_extra == nr_valid */
1042                 nr_bufs += b->nr_extra_bufs;
1043                 sofar += BLEN(b);
1044                 if (sofar > len)
1045                         break;
1046         }
1047         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1048         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1049                 /* caller will need to alloc these, then re-call us */
1050                 return nr_bufs;
1051         }
1052         for (b = first; b && len; b = b->next) {
1053                 b_idx = 0;
1054                 if (offset) {
1055                         if (offset < BHLEN(b)) {
1056                                 /* off is in the main body */
1057                                 len -= point_to_body(b, b->rp + offset, newb,
1058                                                      newb_idx, len);
1059                                 newb_idx++;
1060                         } else {
1061                                 /* off is in one of the buffers (or just past
1062                                  * the last one).  we're not going to point to
1063                                  * b's main body at all. */
1064                                 offset -= BHLEN(b);
1065                                 assert(b->extra_data);
1066                                 /* assuming these extrabufs are packed, or at
1067                                  * least that len isn't gibberish */
1068                                 while (b->extra_data[b_idx].len <= offset) {
1069                                         offset -= b->extra_data[b_idx].len;
1070                                         b_idx++;
1071                                 }
1072                                 /* now offset is set to our offset in the
1073                                  * b_idx'th buf */
1074                                 len -= point_to_buf(b, b_idx, offset, newb,
1075                                                     newb_idx, len);
1076                                 newb_idx++;
1077                                 b_idx++;
1078                         }
1079                         offset = 0;
1080                 } else {
1081                         if (BHLEN(b)) {
1082                                 len -= point_to_body(b, b->rp, newb, newb_idx,
1083                                                      len);
1084                                 newb_idx++;
1085                         }
1086                 }
1087                 /* knock out all remaining bufs.  we only did one point_to_ op
1088                  * by now, and any point_to_ could be our last if it consumed
1089                  * all of len. */
1090                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1091                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1092                         newb_idx++;
1093                 }
1094         }
1095         return 0;
1096 }
1097
1098 struct block *blist_clone(struct block *blist, int header_len, int len,
1099                           uint32_t offset)
1100 {
1101         int ret;
1102         struct block *newb = block_alloc(header_len, MEM_WAIT);
1103
1104         do {
1105                 ret = __blist_clone_to(blist, newb, len, offset);
1106                 if (ret)
1107                         block_add_extd(newb, ret, MEM_WAIT);
1108         } while (ret);
1109         return newb;
1110 }
1111
1112 /* given a queue, makes a single block with header_len reserved space in the
1113  * block main body, and the contents of [offset, len + offset) pointed to in the
1114  * new blocks ext_data.  This does not make a copy of the q's contents, though
1115  * you do have a ref count on the memory. */
1116 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1117 {
1118         int ret;
1119         struct block *newb = block_alloc(header_len, MEM_WAIT);
1120         /* the while loop should rarely be used: it would require someone
1121          * concurrently adding to the queue. */
1122         do {
1123                 /* TODO: RCU protect the q list (b->next) (need read lock) */
1124                 spin_lock_irqsave(&q->lock);
1125                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1126                 spin_unlock_irqsave(&q->lock);
1127                 if (ret)
1128                         block_add_extd(newb, ret, MEM_WAIT);
1129         } while (ret);
1130         return newb;
1131 }
1132
1133 /*
1134  *  copy from offset in the queue
1135  */
1136 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1137 {
1138         int sofar;
1139         int n;
1140         struct block *b, *nb;
1141         uint8_t *p;
1142
1143         nb = block_alloc(len, MEM_WAIT);
1144
1145         spin_lock_irqsave(&q->lock);
1146
1147         /* go to offset */
1148         b = q->bfirst;
1149         for (sofar = 0;; sofar += n) {
1150                 if (b == NULL) {
1151                         spin_unlock_irqsave(&q->lock);
1152                         return nb;
1153                 }
1154                 n = BLEN(b);
1155                 if (sofar + n > offset) {
1156                         p = b->rp + offset - sofar;
1157                         n -= offset - sofar;
1158                         break;
1159                 }
1160                 QDEBUG checkb(b, "qcopy");
1161                 b = b->next;
1162         }
1163
1164         /* copy bytes from there */
1165         for (sofar = 0; sofar < len;) {
1166                 if (n > len - sofar)
1167                         n = len - sofar;
1168                 PANIC_EXTRA(b);
1169                 memmove(nb->wp, p, n);
1170                 qcopycnt += n;
1171                 sofar += n;
1172                 nb->wp += n;
1173                 b = b->next;
1174                 if (b == NULL)
1175                         break;
1176                 n = BLEN(b);
1177                 p = b->rp;
1178         }
1179         spin_unlock_irqsave(&q->lock);
1180
1181         return nb;
1182 }
1183
1184 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1185 {
1186 #ifdef CONFIG_BLOCK_EXTRAS
1187         return qclone(q, 0, len, offset);
1188 #else
1189         return qcopy_old(q, len, offset);
1190 #endif
1191 }
1192
1193 static void qinit_common(struct queue *q)
1194 {
1195         spinlock_init_irqsave(&q->lock);
1196         rendez_init(&q->rr);
1197         rendez_init(&q->wr);
1198 }
1199
1200 /*
1201  *  called by non-interrupt code
1202  */
1203 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1204 {
1205         struct queue *q;
1206
1207         q = kzmalloc(sizeof(struct queue), 0);
1208         if (q == 0)
1209                 return 0;
1210         qinit_common(q);
1211
1212         q->limit = q->inilim = limit;
1213         q->kick = kick;
1214         q->arg = arg;
1215         q->state = msg;
1216         q->eof = 0;
1217
1218         return q;
1219 }
1220
1221 /* open a queue to be bypassed */
1222 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1223 {
1224         struct queue *q;
1225
1226         q = kzmalloc(sizeof(struct queue), 0);
1227         if (q == 0)
1228                 return 0;
1229         qinit_common(q);
1230
1231         q->limit = 0;
1232         q->arg = arg;
1233         q->bypass = bypass;
1234         q->state = 0;
1235
1236         return q;
1237 }
1238
1239 static int notempty(void *a)
1240 {
1241         struct queue *q = a;
1242
1243         return (q->state & Qclosed) || q->bfirst != 0;
1244 }
1245
1246 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1247  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1248  * was naturally closed.  Throws an error o/w. */
1249 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1250 {
1251         while (1) {
1252                 spin_lock_irqsave(&q->lock);
1253                 if (q->bfirst != NULL)
1254                         return TRUE;
1255                 if (q->state & Qclosed) {
1256                         if (++q->eof > 3) {
1257                                 spin_unlock_irqsave(&q->lock);
1258                                 error(EPIPE,
1259                                       "multiple reads on a closed queue");
1260                         }
1261                         if (q->err[0]) {
1262                                 spin_unlock_irqsave(&q->lock);
1263                                 error(EPIPE, q->err);
1264                         }
1265                         return FALSE;
1266                 }
1267                 if (qio_flags & QIO_NON_BLOCK) {
1268                         spin_unlock_irqsave(&q->lock);
1269                         error(EAGAIN, "queue empty");
1270                 }
1271                 spin_unlock_irqsave(&q->lock);
1272                 /* As with the producer side, we check for a condition while
1273                  * holding the q->lock, decide to sleep, then unlock.  It's like
1274                  * the "check, signal, check again" pattern, but we do it
1275                  * conditionally.  Both sides agree synchronously to do it, and
1276                  * those decisions are made while holding q->lock.  I think this
1277                  * is OK.
1278                  *
1279                  * The invariant is that no reader sleeps when the queue has
1280                  * data.  While holding the rendez lock, if we see there's no
1281                  * data, we'll sleep.  Since we saw there was no data, the next
1282                  * writer will see (or already saw) no data, and then the writer
1283                  * decides to rendez_wake, which will grab the rendez lock.  If
1284                  * the writer already did that, then we'll see notempty when we
1285                  * do our check-again. */
1286                 rendez_sleep(&q->rr, notempty, q);
1287         }
1288 }
1289
1290 /*
1291  * add a block list to a queue
1292  * XXX basically the same as enqueue blist, and has no locking!
1293  */
1294 void qaddlist(struct queue *q, struct block *b)
1295 {
1296         /* TODO: q lock? */
1297         /* queue the block */
1298         if (q->bfirst)
1299                 q->blast->next = b;
1300         else
1301                 q->bfirst = b;
1302         q->dlen += blocklen(b);
1303         while (b->next)
1304                 b = b->next;
1305         q->blast = b;
1306 }
1307
1308 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1309 {
1310         size_t copy_amt, retval = 0;
1311         struct extra_bdata *ebd;
1312
1313         copy_amt = MIN(BHLEN(b), amt);
1314         memcpy(to, b->rp, copy_amt);
1315         /* advance the rp, since this block might not be completely consumed and
1316          * future reads need to know where to pick up from */
1317         b->rp += copy_amt;
1318         to += copy_amt;
1319         amt -= copy_amt;
1320         retval += copy_amt;
1321         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1322                 ebd = &b->extra_data[i];
1323                 /* skip empty entires.  if we track this in the struct block, we
1324                  * can just start the for loop early */
1325                 if (!ebd->base || !ebd->len)
1326                         continue;
1327                 copy_amt = MIN(ebd->len, amt);
1328                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1329                 /* we're actually consuming the entries, just like how we
1330                  * advance rp up above, and might only consume part of one. */
1331                 ebd->len -= copy_amt;
1332                 ebd->off += copy_amt;
1333                 b->extra_len -= copy_amt;
1334                 if (!ebd->len) {
1335                         /* we don't actually have to decref here.  it's also
1336                          * done in freeb().  this is the earliest we can free.
1337                          */
1338                         kfree((void*)ebd->base);
1339                         ebd->base = ebd->off = 0;
1340                 }
1341                 to += copy_amt;
1342                 amt -= copy_amt;
1343                 retval += copy_amt;
1344         }
1345         return retval;
1346 }
1347
1348 /*
1349  *  copy the contents of a string of blocks into
1350  *  memory.  emptied blocks are freed.  return
1351  *  pointer to first unconsumed block.
1352  */
1353 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1354 {
1355         int i;
1356         struct block *next;
1357
1358         /* could be slicker here, since read_from_block is smart */
1359         for (; b != NULL; b = next) {
1360                 i = BLEN(b);
1361                 if (i > n) {
1362                         /* partial block, consume some */
1363                         read_from_block(b, p, n);
1364                         return b;
1365                 }
1366                 /* full block, consume all and move on */
1367                 i = read_from_block(b, p, i);
1368                 n -= i;
1369                 p += i;
1370                 next = b->next;
1371                 freeb(b);
1372         }
1373         return NULL;
1374 }
1375
1376 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1377  * actual amount copied. */
1378 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1379 {
1380         size_t sofar = 0;
1381         struct block *next;
1382
1383         do {
1384                 assert(va);
1385                 assert(b->rp);
1386                 sofar += read_from_block(b, va + sofar, len - sofar);
1387                 if (BLEN(b) && b->next)
1388                         panic("Failed to drain entire block (Qmsg?) but had a next!");
1389                 next = b->next;
1390                 freeb(b);
1391                 b = next;
1392         } while (b);
1393         return sofar;
1394 }
1395
1396 /*
1397  *  copy the contents of memory into a string of blocks.
1398  *  return NULL on error.
1399  */
1400 struct block *mem2bl(uint8_t * p, int len)
1401 {
1402         ERRSTACK(1);
1403         int n;
1404         struct block *b, *first, **l;
1405
1406         first = NULL;
1407         l = &first;
1408         if (waserror()) {
1409                 freeblist(first);
1410                 nexterror();
1411         }
1412         do {
1413                 n = len;
1414                 if (n > Maxatomic)
1415                         n = Maxatomic;
1416
1417                 *l = b = block_alloc(n, MEM_WAIT);
1418                 /* TODO consider extra_data */
1419                 memmove(b->wp, p, n);
1420                 b->wp += n;
1421                 p += n;
1422                 len -= n;
1423                 l = &b->next;
1424         } while (len > 0);
1425         poperror();
1426
1427         return first;
1428 }
1429
1430 /*
1431  *  put a block back to the front of the queue
1432  *  called with q ilocked
1433  */
1434 void qputback(struct queue *q, struct block *b)
1435 {
1436         b->next = q->bfirst;
1437         if (q->bfirst == NULL)
1438                 q->blast = b;
1439         q->bfirst = b;
1440         q->dlen += BLEN(b);
1441         /* qputback seems to undo a read, so we can undo the accounting too. */
1442         q->bytes_read -= BLEN(b);
1443 }
1444
1445 /*
1446  *  get next block from a queue (up to a limit)
1447  *
1448  */
1449 struct block *qbread(struct queue *q, size_t len)
1450 {
1451         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
1452                         MEM_WAIT);
1453 }
1454
1455 struct block *qbread_nonblock(struct queue *q, size_t len)
1456 {
1457         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1458                         QIO_NON_BLOCK, MEM_WAIT);
1459 }
1460
1461 /* read up to len from a queue into vp. */
1462 size_t qread(struct queue *q, void *va, size_t len)
1463 {
1464         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1465
1466         if (!blist)
1467                 return 0;
1468         return read_all_blocks(blist, va, len);
1469 }
1470
1471 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1472 {
1473         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
1474                                        QIO_NON_BLOCK, MEM_WAIT);
1475
1476         if (!blist)
1477                 return 0;
1478         return read_all_blocks(blist, va, len);
1479 }
1480
1481 /* This is the rendez wake condition for writers. */
1482 static int qwriter_should_wake(void *a)
1483 {
1484         struct queue *q = a;
1485
1486         return qwritable(q) || (q->state & Qclosed);
1487 }
1488
1489 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1490 static size_t enqueue_blist(struct queue *q, struct block *b)
1491 {
1492         size_t dlen;
1493
1494         if (q->bfirst)
1495                 q->blast->next = b;
1496         else
1497                 q->bfirst = b;
1498         dlen = BLEN(b);
1499         while (b->next) {
1500                 b = b->next;
1501                 dlen += BLEN(b);
1502         }
1503         q->blast = b;
1504         q->dlen += dlen;
1505         return dlen;
1506 }
1507
1508 /* Adds block (which can be a list of blocks) to the queue, subject to
1509  * qio_flags.  Returns the length written on success or -1 on non-throwable
1510  * error.  Adjust qio_flags to control the value-added features!. */
1511 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1512 {
1513         ssize_t ret;
1514         bool was_unreadable;
1515
1516         if (q->bypass) {
1517                 ret = blocklen(b);
1518                 (*q->bypass) (q->arg, b);
1519                 return ret;
1520         }
1521         spin_lock_irqsave(&q->lock);
1522         was_unreadable = q->dlen == 0;
1523         if (q->state & Qclosed) {
1524                 spin_unlock_irqsave(&q->lock);
1525                 freeblist(b);
1526                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1527                         return -1;
1528                 if (q->err[0])
1529                         error(EPIPE, q->err);
1530                 else
1531                         error(EPIPE, "connection closed");
1532         }
1533         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1534                 /* drop overflow takes priority over regular non-blocking */
1535                 if ((qio_flags & QIO_DROP_OVERFLOW)
1536                     || (q->state & Qdropoverflow)) {
1537                         spin_unlock_irqsave(&q->lock);
1538                         freeb(b);
1539                         return -1;
1540                 }
1541                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
1542                  * nice and catch it. */
1543                 if ((qio_flags & QIO_CAN_ERR_SLEEP)
1544                     && (qio_flags & QIO_NON_BLOCK)) {
1545                         spin_unlock_irqsave(&q->lock);
1546                         freeb(b);
1547                         error(EAGAIN, "queue full");
1548                 }
1549         }
1550         ret = enqueue_blist(q, b);
1551         QDEBUG checkb(b, "__qbwrite");
1552         spin_unlock_irqsave(&q->lock);
1553         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1554          * wakeup, meaning that actual users either want a kick or have
1555          * qreaders. */
1556         if (q->kick && (was_unreadable || (q->state & Qkick)))
1557                 q->kick(q->arg);
1558         if (was_unreadable) {
1559                 /* Unlike the read side, there's no double-check to make sure
1560                  * the queue transitioned across an edge.  We know we added
1561                  * something, so that's enough.  We wake if the queue was empty.
1562                  * Both sides are the same, in that the condition for which we
1563                  * do the rendez_wakeup() is the same as the condition done for
1564                  * the rendez_sleep(). */
1565                 rendez_wakeup(&q->rr);
1566                 qwake_cb(q, FDTAP_FILT_READABLE);
1567         }
1568         /*
1569          *  flow control, wait for queue to get below the limit
1570          *  before allowing the process to continue and queue
1571          *  more.  We do this here so that postnote can only
1572          *  interrupt us after the data has been queued.  This
1573          *  means that things like 9p flushes and ssl messages
1574          *  will not be disrupted by software interrupts.
1575          *
1576          *  Note - this is moderately dangerous since a process
1577          *  that keeps getting interrupted and rewriting will
1578          *  queue infinite crud.
1579          */
1580         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1581             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1582                 /* This is a racy peek at the q status.  If we accidentally
1583                  * block, our rendez will return.  The rendez's peak
1584                  * (qwriter_should_wake) is also racy w.r.t.  the q's spinlock
1585                  * (that lock protects writes, but not reads).
1586                  *
1587                  * Here's the deal: when holding the rendez lock, if we see the
1588                  * sleep condition, the consumer will wake us.  The condition
1589                  * will only ever be changed by the next qbread() (consumer,
1590                  * changes q->dlen).  That code will do a rendez wake, which
1591                  * will spin on the rendez lock, meaning it won't procede until
1592                  * we either see the new state (and return) or put ourselves on
1593                  * the rendez, and wake up.
1594                  *
1595                  * The pattern is one side writes mem, then signals.  Our side
1596                  * checks the signal, then reads the mem.  The goal is to not
1597                  * miss seeing the signal AND missing the memory write.  In this
1598                  * specific case, the signal is actually synchronous (the rendez
1599                  * lock) and not basic shared memory.
1600                  *
1601                  * Oh, and we spin in case we woke early and someone else filled
1602                  * the queue, mesa-style. */
1603                 while (!qwriter_should_wake(q))
1604                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1605         }
1606         return ret;
1607 }
1608
1609 /*
1610  *  add a block to a queue obeying flow control
1611  */
1612 ssize_t qbwrite(struct queue *q, struct block *b)
1613 {
1614         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1615 }
1616
1617 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1618 {
1619         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1620 }
1621
1622 ssize_t qibwrite(struct queue *q, struct block *b)
1623 {
1624         return __qbwrite(q, b, 0);
1625 }
1626
1627 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1628  * block on success, 0 on failure. */
1629 static struct block *build_block(void *from, size_t len, int mem_flags)
1630 {
1631         struct block *b;
1632         void *ext_buf;
1633
1634         /* If len is small, we don't need to bother with the extra_data.  But
1635          * until the whole stack can handle extd blocks, we'll use them
1636          * unconditionally.  */
1637 #ifdef CONFIG_BLOCK_EXTRAS
1638         /* allocb builds in 128 bytes of header space to all blocks, but this is
1639          * only available via padblock (to the left).  we also need some space
1640          * for pullupblock for some basic headers (like icmp) that get written
1641          * in directly */
1642         b = block_alloc(64, mem_flags);
1643         if (!b)
1644                 return 0;
1645         ext_buf = kmalloc(len, mem_flags);
1646         if (!ext_buf) {
1647                 kfree(b);
1648                 return 0;
1649         }
1650         memcpy(ext_buf, from, len);
1651         if (block_add_extd(b, 1, mem_flags)) {
1652                 kfree(ext_buf);
1653                 kfree(b);
1654                 return 0;
1655         }
1656         b->extra_data[0].base = (uintptr_t)ext_buf;
1657         b->extra_data[0].off = 0;
1658         b->extra_data[0].len = len;
1659         b->extra_len += len;
1660 #else
1661         b = block_alloc(len, mem_flags);
1662         if (!b)
1663                 return 0;
1664         memmove(b->wp, from, len);
1665         b->wp += len;
1666 #endif
1667         return b;
1668 }
1669
1670 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1671                         int qio_flags)
1672 {
1673         ERRSTACK(1);
1674         size_t n;
1675         volatile size_t sofar = 0;      /* volatile for the waserror */
1676         struct block *b;
1677         uint8_t *p = vp;
1678         void *ext_buf;
1679
1680         /* Only some callers can throw.  Others might be in a context where
1681          * waserror isn't safe. */
1682         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1683                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
1684                  * after some data has been sent should be treated as a partial
1685                  * write. */
1686                 if (sofar)
1687                         goto out_ok;
1688                 nexterror();
1689         }
1690         do {
1691                 n = len - sofar;
1692                 /* This is 64K, the max amount per single block.  Still a good
1693                  * value? */
1694                 if (n > Maxatomic)
1695                         n = Maxatomic;
1696                 b = build_block(p + sofar, n, mem_flags);
1697                 if (!b)
1698                         break;
1699                 if (__qbwrite(q, b, qio_flags) < 0)
1700                         break;
1701                 sofar += n;
1702         } while ((sofar < len) && (q->state & Qmsg) == 0);
1703 out_ok:
1704         if (qio_flags & QIO_CAN_ERR_SLEEP)
1705                 poperror();
1706         return sofar;
1707 }
1708
1709 ssize_t qwrite(struct queue *q, void *vp, int len)
1710 {
1711         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1712 }
1713
1714 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1715 {
1716         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1717                                               QIO_NON_BLOCK);
1718 }
1719
1720 ssize_t qiwrite(struct queue *q, void *vp, int len)
1721 {
1722         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1723 }
1724
1725 /*
1726  *  be extremely careful when calling this,
1727  *  as there is no reference accounting
1728  */
1729 void qfree(struct queue *q)
1730 {
1731         qclose(q);
1732         kfree(q);
1733 }
1734
1735 /*
1736  *  Mark a queue as closed.  No further IO is permitted.
1737  *  All blocks are released.
1738  */
1739 void qclose(struct queue *q)
1740 {
1741         struct block *bfirst;
1742
1743         if (q == NULL)
1744                 return;
1745
1746         /* mark it */
1747         spin_lock_irqsave(&q->lock);
1748         q->state |= Qclosed;
1749         q->state &= ~Qdropoverflow;
1750         q->err[0] = 0;
1751         bfirst = q->bfirst;
1752         q->bfirst = 0;
1753         q->dlen = 0;
1754         spin_unlock_irqsave(&q->lock);
1755
1756         /* free queued blocks */
1757         freeblist(bfirst);
1758
1759         /* wake up readers/writers */
1760         rendez_wakeup(&q->rr);
1761         rendez_wakeup(&q->wr);
1762         qwake_cb(q, FDTAP_FILT_HANGUP);
1763 }
1764
1765 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1766  *
1767  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1768  * there is no message, which is what also happens during a natural qclose(),
1769  * those waiters will simply return 0.  qwriters will always error() on a
1770  * closed/hungup queue. */
1771 void qhangup(struct queue *q, char *msg)
1772 {
1773         /* mark it */
1774         spin_lock_irqsave(&q->lock);
1775         q->state |= Qclosed;
1776         if (msg == 0 || *msg == 0)
1777                 q->err[0] = 0;
1778         else
1779                 strlcpy(q->err, msg, ERRMAX);
1780         spin_unlock_irqsave(&q->lock);
1781
1782         /* wake up readers/writers */
1783         rendez_wakeup(&q->rr);
1784         rendez_wakeup(&q->wr);
1785         qwake_cb(q, FDTAP_FILT_HANGUP);
1786 }
1787
1788 /*
1789  *  return non-zero if the q is hungup
1790  */
1791 int qisclosed(struct queue *q)
1792 {
1793         return q->state & Qclosed;
1794 }
1795
1796 /*
1797  *  mark a queue as no longer hung up.  resets the wake_cb.
1798  */
1799 void qreopen(struct queue *q)
1800 {
1801         spin_lock_irqsave(&q->lock);
1802         q->state &= ~Qclosed;
1803         q->eof = 0;
1804         q->limit = q->inilim;
1805         q->wake_cb = 0;
1806         q->wake_data = 0;
1807         spin_unlock_irqsave(&q->lock);
1808 }
1809
1810 /*
1811  *  return bytes queued
1812  */
1813 int qlen(struct queue *q)
1814 {
1815         return q->dlen;
1816 }
1817
1818 size_t q_bytes_read(struct queue *q)
1819 {
1820         return q->bytes_read;
1821 }
1822
1823 /*
1824  * return space remaining before flow control
1825  *
1826  *  This used to be
1827  *  q->len < q->limit/2
1828  *  but it slows down tcp too much for certain write sizes.
1829  *  I really don't understand it completely.  It may be
1830  *  due to the queue draining so fast that the transmission
1831  *  stalls waiting for the app to produce more data.  - presotto
1832  *
1833  *  q->len was the amount of bytes, which is no longer used.  we now use
1834  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1835  */
1836 int qwindow(struct queue *q)
1837 {
1838         int l;
1839
1840         l = q->limit - q->dlen;
1841         if (l < 0)
1842                 l = 0;
1843         return l;
1844 }
1845
1846 /*
1847  *  return true if we can read without blocking
1848  */
1849 int qcanread(struct queue *q)
1850 {
1851         return q->bfirst != 0;
1852 }
1853
1854 /*
1855  *  change queue limit
1856  */
1857 void qsetlimit(struct queue *q, size_t limit)
1858 {
1859         bool was_writable = qwritable(q);
1860
1861         q->limit = limit;
1862         if (!was_writable && qwritable(q)) {
1863                 rendez_wakeup(&q->wr);
1864                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1865         }
1866 }
1867
1868 size_t qgetlimit(struct queue *q)
1869 {
1870         return q->limit;
1871 }
1872
1873 /*
1874  *  set whether writes drop overflowing blocks, or if we sleep
1875  */
1876 void qdropoverflow(struct queue *q, bool onoff)
1877 {
1878         spin_lock_irqsave(&q->lock);
1879         if (onoff)
1880                 q->state |= Qdropoverflow;
1881         else
1882                 q->state &= ~Qdropoverflow;
1883         spin_unlock_irqsave(&q->lock);
1884 }
1885
1886 /* Be careful: this can affect concurrent reads/writes and code that might have
1887  * built-in expectations of the q's type. */
1888 void q_toggle_qmsg(struct queue *q, bool onoff)
1889 {
1890         spin_lock_irqsave(&q->lock);
1891         if (onoff)
1892                 q->state |= Qmsg;
1893         else
1894                 q->state &= ~Qmsg;
1895         spin_unlock_irqsave(&q->lock);
1896 }
1897
1898 /* Be careful: this can affect concurrent reads/writes and code that might have
1899  * built-in expectations of the q's type. */
1900 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1901 {
1902         spin_lock_irqsave(&q->lock);
1903         if (onoff)
1904                 q->state |= Qcoalesce;
1905         else
1906                 q->state &= ~Qcoalesce;
1907         spin_unlock_irqsave(&q->lock);
1908 }
1909
1910 /*
1911  *  flush the output queue
1912  */
1913 void qflush(struct queue *q)
1914 {
1915         struct block *bfirst;
1916
1917         /* mark it */
1918         spin_lock_irqsave(&q->lock);
1919         bfirst = q->bfirst;
1920         q->bfirst = 0;
1921         q->dlen = 0;
1922         spin_unlock_irqsave(&q->lock);
1923
1924         /* free queued blocks */
1925         freeblist(bfirst);
1926
1927         /* wake up writers */
1928         rendez_wakeup(&q->wr);
1929         qwake_cb(q, FDTAP_FILT_WRITABLE);
1930 }
1931
1932 int qfull(struct queue *q)
1933 {
1934         return !qwritable(q);
1935 }
1936
1937 int qstate(struct queue *q)
1938 {
1939         return q->state;
1940 }
1941
1942 void qdump(struct queue *q)
1943 {
1944         if (q)
1945                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1946                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1947 }
1948
1949 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1950  * marks the type of wakeup event (flags from FDTAP).
1951  *
1952  * There's no sync protection.  If you change the CB while the qio is running,
1953  * you might get a CB with the data or func from a previous set_wake_cb.  You
1954  * should set this once per queue and forget it.
1955  *
1956  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1957  * just make sure that the func(data) pair are valid until the queue is freed or
1958  * reopened. */
1959 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1960 {
1961         q->wake_data = data;
1962         wmb();  /* if we see func, we'll also see the data for it */
1963         q->wake_cb = func;
1964 }
1965
1966 /* Helper for detecting whether we'll block on a read at this instant. */
1967 bool qreadable(struct queue *q)
1968 {
1969         return qlen(q) > 0;
1970 }
1971
1972 /* Helper for detecting whether we'll block on a write at this instant. */
1973 bool qwritable(struct queue *q)
1974 {
1975         return !q->limit || qwindow(q) > 0;
1976 }