perf: Update documentation
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100 };
101
102 unsigned int qiomaxatomic = Maxatomic;
103
104 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
105 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
106                               int mem_flags);
107 static bool qwait_and_ilock(struct queue *q, int qio_flags);
108 static void qwakeup_iunlock(struct queue *q);
109
110 /* Helper: fires a wake callback, sending 'filter' */
111 static void qwake_cb(struct queue *q, int filter)
112 {
113         if (q->wake_cb)
114                 q->wake_cb(q, q->wake_data, filter);
115 }
116
117 void ixsummary(void)
118 {
119         debugging ^= 1;
120         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
121                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
122         printd("consume %lu, produce %lu, qcopy %lu\n",
123                    consumecnt, producecnt, qcopycnt);
124 }
125
126 /*
127  *  pad a block to the front (or the back if size is negative)
128  */
129 struct block *padblock(struct block *bp, int size)
130 {
131         int n;
132         struct block *nbp;
133         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
134         uint16_t checksum_start = bp->checksum_start;
135         uint16_t checksum_offset = bp->checksum_offset;
136         uint16_t mss = bp->mss;
137
138         QDEBUG checkb(bp, "padblock 1");
139         if (size >= 0) {
140                 if (bp->rp - bp->base >= size) {
141                         bp->checksum_start += size;
142                         bp->rp -= size;
143                         return bp;
144                 }
145
146                 PANIC_EXTRA(bp);
147                 if (bp->next)
148                         panic("padblock %p", getcallerpc(&bp));
149                 n = BLEN(bp);
150                 padblockcnt++;
151                 nbp = block_alloc(size + n, MEM_WAIT);
152                 nbp->rp += size;
153                 nbp->wp = nbp->rp;
154                 memmove(nbp->wp, bp->rp, n);
155                 nbp->wp += n;
156                 freeb(bp);
157                 nbp->rp -= size;
158         } else {
159                 size = -size;
160
161                 PANIC_EXTRA(bp);
162
163                 if (bp->next)
164                         panic("padblock %p", getcallerpc(&bp));
165
166                 if (bp->lim - bp->wp >= size)
167                         return bp;
168
169                 n = BLEN(bp);
170                 padblockcnt++;
171                 nbp = block_alloc(size + n, MEM_WAIT);
172                 memmove(nbp->wp, bp->rp, n);
173                 nbp->wp += n;
174                 freeb(bp);
175         }
176         if (bcksum) {
177                 nbp->flag |= bcksum;
178                 nbp->checksum_start = checksum_start;
179                 nbp->checksum_offset = checksum_offset;
180                 nbp->mss = mss;
181         }
182         QDEBUG checkb(nbp, "padblock 1");
183         return nbp;
184 }
185
186 /*
187  *  return count of bytes in a string of blocks
188  */
189 int blocklen(struct block *bp)
190 {
191         int len;
192
193         len = 0;
194         while (bp) {
195                 len += BLEN(bp);
196                 bp = bp->next;
197         }
198         return len;
199 }
200
201 /*
202  * return count of space in blocks
203  */
204 int blockalloclen(struct block *bp)
205 {
206         int len;
207
208         len = 0;
209         while (bp) {
210                 len += BALLOC(bp);
211                 bp = bp->next;
212         }
213         return len;
214 }
215
216 /*
217  *  copy the  string of blocks into
218  *  a single block and free the string
219  */
220 struct block *concatblock(struct block *bp)
221 {
222         int len;
223         struct block *nb, *f;
224
225         if (bp->next == 0)
226                 return bp;
227
228         /* probably use parts of qclone */
229         PANIC_EXTRA(bp);
230         nb = block_alloc(blocklen(bp), MEM_WAIT);
231         for (f = bp; f; f = f->next) {
232                 len = BLEN(f);
233                 memmove(nb->wp, f->rp, len);
234                 nb->wp += len;
235         }
236         concatblockcnt += BLEN(nb);
237         freeblist(bp);
238         QDEBUG checkb(nb, "concatblock 1");
239         return nb;
240 }
241
242 /* Returns a block with the remaining contents of b all in the main body of the
243  * returned block.  Replace old references to b with the returned value (which
244  * may still be 'b', if no change was needed. */
245 struct block *linearizeblock(struct block *b)
246 {
247         struct block *newb;
248         size_t len;
249         struct extra_bdata *ebd;
250
251         if (!b->extra_len)
252                 return b;
253
254         newb = block_alloc(BLEN(b), MEM_WAIT);
255         len = BHLEN(b);
256         memcpy(newb->wp, b->rp, len);
257         newb->wp += len;
258         len = b->extra_len;
259         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
260                 ebd = &b->extra_data[i];
261                 if (!ebd->base || !ebd->len)
262                         continue;
263                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
264                 newb->wp += ebd->len;
265                 len -= ebd->len;
266         }
267         /* TODO: any other flags that need copied over? */
268         if (b->flag & BCKSUM_FLAGS) {
269                 newb->flag |= (b->flag & BCKSUM_FLAGS);
270                 newb->checksum_start = b->checksum_start;
271                 newb->checksum_offset = b->checksum_offset;
272                 newb->mss = b->mss;
273         }
274         freeb(b);
275         return newb;
276 }
277
278 /*
279  *  make sure the first block has at least n bytes in its main body
280  */
281 struct block *pullupblock(struct block *bp, int n)
282 {
283         int i, len, seglen;
284         struct block *nbp;
285         struct extra_bdata *ebd;
286
287         /*
288          *  this should almost always be true, it's
289          *  just to avoid every caller checking.
290          */
291         if (BHLEN(bp) >= n)
292                 return bp;
293
294          /* a start at explicit main-body / header management */
295         if (bp->extra_len) {
296                 if (n > bp->lim - bp->rp) {
297                         /* would need to realloc a new block and copy everything over. */
298                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
299                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
300                 }
301                 len = n - BHLEN(bp);
302                 if (len > bp->extra_len)
303                         panic("pullup more than extra (%d, %d, %d)\n",
304                               n, BHLEN(bp), bp->extra_len);
305                 QDEBUG checkb(bp, "before pullup");
306                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
307                         ebd = &bp->extra_data[i];
308                         if (!ebd->base || !ebd->len)
309                                 continue;
310                         seglen = MIN(ebd->len, len);
311                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
312                         bp->wp += seglen;
313                         len -= seglen;
314                         ebd->len -= seglen;
315                         ebd->off += seglen;
316                         bp->extra_len -= seglen;
317                         if (ebd->len == 0) {
318                                 kfree((void *)ebd->base);
319                                 ebd->off = 0;
320                                 ebd->base = 0;
321                         }
322                 }
323                 /* maybe just call pullupblock recursively here */
324                 if (len)
325                         panic("pullup %d bytes overdrawn\n", len);
326                 QDEBUG checkb(bp, "after pullup");
327                 return bp;
328         }
329
330         /*
331          *  if not enough room in the first block,
332          *  add another to the front of the list.
333          */
334         if (bp->lim - bp->rp < n) {
335                 nbp = block_alloc(n, MEM_WAIT);
336                 nbp->next = bp;
337                 bp = nbp;
338         }
339
340         /*
341          *  copy bytes from the trailing blocks into the first
342          */
343         n -= BLEN(bp);
344         while ((nbp = bp->next)) {
345                 i = BLEN(nbp);
346                 if (i > n) {
347                         memmove(bp->wp, nbp->rp, n);
348                         pullupblockcnt++;
349                         bp->wp += n;
350                         nbp->rp += n;
351                         QDEBUG checkb(bp, "pullupblock 1");
352                         return bp;
353                 } else {
354                         memmove(bp->wp, nbp->rp, i);
355                         pullupblockcnt++;
356                         bp->wp += i;
357                         bp->next = nbp->next;
358                         nbp->next = 0;
359                         freeb(nbp);
360                         n -= i;
361                         if (n == 0) {
362                                 QDEBUG checkb(bp, "pullupblock 2");
363                                 return bp;
364                         }
365                 }
366         }
367         freeb(bp);
368         return 0;
369 }
370
371 /*
372  *  make sure the first block has at least n bytes in its main body
373  */
374 struct block *pullupqueue(struct queue *q, int n)
375 {
376         struct block *b;
377
378         /* TODO: lock to protect the queue links? */
379         if ((BHLEN(q->bfirst) >= n))
380                 return q->bfirst;
381         q->bfirst = pullupblock(q->bfirst, n);
382         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
383         q->blast = b;
384         return q->bfirst;
385 }
386
387 /* throw away count bytes from the front of
388  * block's extradata.  Returns count of bytes
389  * thrown away
390  */
391
392 static int pullext(struct block *bp, int count)
393 {
394         struct extra_bdata *ed;
395         int i, rem, bytes = 0;
396
397         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
398                 ed = &bp->extra_data[i];
399                 rem = MIN(count, ed->len);
400                 bp->extra_len -= rem;
401                 count -= rem;
402                 bytes += rem;
403                 ed->off += rem;
404                 ed->len -= rem;
405                 if (ed->len == 0) {
406                         kfree((void *)ed->base);
407                         ed->base = 0;
408                         ed->off = 0;
409                 }
410         }
411         return bytes;
412 }
413
414 /* throw away count bytes from the end of a
415  * block's extradata.  Returns count of bytes
416  * thrown away
417  */
418
419 static int dropext(struct block *bp, int count)
420 {
421         struct extra_bdata *ed;
422         int i, rem, bytes = 0;
423
424         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
425                 ed = &bp->extra_data[i];
426                 rem = MIN(count, ed->len);
427                 bp->extra_len -= rem;
428                 count -= rem;
429                 bytes += rem;
430                 ed->len -= rem;
431                 if (ed->len == 0) {
432                         kfree((void *)ed->base);
433                         ed->base = 0;
434                         ed->off = 0;
435                 }
436         }
437         return bytes;
438 }
439
440 /*
441  *  throw away up to count bytes from a
442  *  list of blocks.  Return count of bytes
443  *  thrown away.
444  */
445 static int _pullblock(struct block **bph, int count, int free)
446 {
447         struct block *bp;
448         int n, bytes;
449
450         bytes = 0;
451         if (bph == NULL)
452                 return 0;
453
454         while (*bph != NULL && count != 0) {
455                 bp = *bph;
456
457                 n = MIN(BHLEN(bp), count);
458                 bytes += n;
459                 count -= n;
460                 bp->rp += n;
461                 n = pullext(bp, count);
462                 bytes += n;
463                 count -= n;
464                 QDEBUG checkb(bp, "pullblock ");
465                 if (BLEN(bp) == 0 && (free || count)) {
466                         *bph = bp->next;
467                         bp->next = NULL;
468                         freeb(bp);
469                 }
470         }
471         return bytes;
472 }
473
474 int pullblock(struct block **bph, int count)
475 {
476         return _pullblock(bph, count, 1);
477 }
478
479 /*
480  *  trim to len bytes starting at offset
481  */
482 struct block *trimblock(struct block *bp, int offset, int len)
483 {
484         uint32_t l, trim;
485         int olen = len;
486
487         QDEBUG checkb(bp, "trimblock 1");
488         if (blocklen(bp) < offset + len) {
489                 freeblist(bp);
490                 return NULL;
491         }
492
493         l =_pullblock(&bp, offset, 0);
494         if (bp == NULL)
495                 return NULL;
496         if (l != offset) {
497                 freeblist(bp);
498                 return NULL;
499         }
500
501         while ((l = BLEN(bp)) < len) {
502                 len -= l;
503                 bp = bp->next;
504         }
505
506         trim = BLEN(bp) - len;
507         trim -= dropext(bp, trim);
508         bp->wp -= trim;
509
510         if (bp->next) {
511                 freeblist(bp->next);
512                 bp->next = NULL;
513         }
514         return bp;
515 }
516
517 /*
518  *  copy 'count' bytes into a new block
519  */
520 struct block *copyblock(struct block *bp, int count)
521 {
522         int l;
523         struct block *nbp;
524
525         QDEBUG checkb(bp, "copyblock 0");
526         nbp = block_alloc(count, MEM_WAIT);
527         if (bp->flag & BCKSUM_FLAGS) {
528                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
529                 nbp->checksum_start = bp->checksum_start;
530                 nbp->checksum_offset = bp->checksum_offset;
531                 nbp->mss = bp->mss;
532         }
533         PANIC_EXTRA(bp);
534         for (; count > 0 && bp != 0; bp = bp->next) {
535                 l = BLEN(bp);
536                 if (l > count)
537                         l = count;
538                 memmove(nbp->wp, bp->rp, l);
539                 nbp->wp += l;
540                 count -= l;
541         }
542         if (count > 0) {
543                 memset(nbp->wp, 0, count);
544                 nbp->wp += count;
545         }
546         copyblockcnt++;
547         QDEBUG checkb(nbp, "copyblock 1");
548
549         return nbp;
550 }
551
552 /* Adjust block @bp so that its size is exactly @len.
553  * If the size is increased, fill in the new contents with zeros.
554  * If the size is decreased, discard some of the old contents at the tail. */
555 struct block *adjustblock(struct block *bp, int len)
556 {
557         struct extra_bdata *ebd;
558         void *buf;
559         int i;
560
561         if (len < 0) {
562                 freeb(bp);
563                 return NULL;
564         }
565
566         if (len == BLEN(bp))
567                 return bp;
568
569         /* Shrink within block main body. */
570         if (len <= BHLEN(bp)) {
571                 free_block_extra(bp);
572                 bp->wp = bp->rp + len;
573                 QDEBUG checkb(bp, "adjustblock 1");
574                 return bp;
575         }
576
577         /* Need to grow. */
578         if (len > BLEN(bp)) {
579                 /* Grow within block main body. */
580                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
581                         memset(bp->wp, 0, len - BLEN(bp));
582                         bp->wp = bp->rp + len;
583                         QDEBUG checkb(bp, "adjustblock 2");
584                         return bp;
585                 }
586                 /* Grow with extra data buffers. */
587                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
588                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
589                 QDEBUG checkb(bp, "adjustblock 3");
590                 return bp;
591         }
592
593         /* Shrink extra data buffers.
594          * len is how much of ebd we need to keep.
595          * extra_len is re-accumulated. */
596         assert(bp->extra_len > 0);
597         len -= BHLEN(bp);
598         bp->extra_len = 0;
599         for (i = 0; i < bp->nr_extra_bufs; i++) {
600                 ebd = &bp->extra_data[i];
601                 if (len <= ebd->len)
602                         break;
603                 len -= ebd->len;
604                 bp->extra_len += ebd->len;
605         }
606         /* If len becomes zero, extra_data[i] should be freed. */
607         if (len > 0) {
608                 ebd = &bp->extra_data[i];
609                 ebd->len = len;
610                 bp->extra_len += ebd->len;
611                 i++;
612         }
613         for (; i < bp->nr_extra_bufs; i++) {
614                 ebd = &bp->extra_data[i];
615                 if (ebd->base)
616                         kfree((void*)ebd->base);
617                 ebd->base = ebd->off = ebd->len = 0;
618         }
619         QDEBUG checkb(bp, "adjustblock 4");
620         return bp;
621 }
622
623 /* Helper: removes and returns the first block from q */
624 static struct block *pop_first_block(struct queue *q)
625 {
626         struct block *b = q->bfirst;
627
628         q->len -= BALLOC(b);  // XXX all usages of q->len with extra_data are fucked
629         q->dlen -= BLEN(b);
630         q->bfirst = b->next;
631         b->next = 0;
632         return b;
633 }
634
635 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
636 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
637 {
638         copy_amt = MIN(to->lim - to->wp, copy_amt);
639         memcpy(to->wp, from, copy_amt);
640         to->wp += copy_amt;
641         return copy_amt;
642 }
643
644 /* Accounting helper.  Block b in q lost amt extra_data */
645 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
646 {
647         b->extra_len -= amt;
648         q->len -= amt;
649         q->dlen -= amt;
650 }
651
652 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
653  * fixed in 'from', so we move its contents and zero it out in 'from'.
654  *
655  * Returns the length moved (0 on failure). */
656 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
657                        struct block *from, struct queue *from_q)
658 {
659         size_t ret = ebd->len;
660
661         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
662                 return 0;
663         block_and_q_lost_extra(from, from_q, ebd->len);
664         ebd->base = ebd->len = ebd->off = 0;
665         return ret;
666 }
667
668 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
669  * return with less than len, but greater than 0, even if there is more
670  * available in q.
671  *
672  * At any moment that we have copied anything and things are tricky, we can just
673  * return.  The trickiness comes from a bunch of variables: is the main body
674  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
675  * to @to's main body, but only if we haven't used it yet. */
676 static size_t copy_from_first_block(struct queue *q, struct block *to,
677                                     size_t len)
678 {
679         struct block *from = q->bfirst;
680         size_t copy_amt, amt;
681         struct extra_bdata *ebd;
682
683         assert(len < BLEN(from));       /* sanity */
684         /* Try to extract from the main body */
685         copy_amt = MIN(BHLEN(from), len);
686         if (copy_amt) {
687                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
688                 from->rp += copy_amt;
689                 /* We only change dlen, (data len), not q->len, since the q still has
690                  * the same block memory allocation (no kfrees happened) */
691                 q->dlen -= copy_amt;
692         }
693         /* Try to extract the remainder from the extra data */
694         len -= copy_amt;
695         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
696                 ebd = &from->extra_data[i];
697                 if (!ebd->base || !ebd->len)
698                         continue;
699                 if (len >= ebd->len) {
700                         amt = move_ebd(ebd, to, from, q);
701                         if (!amt) {
702                                 /* our internal alloc could have failed.   this ebd is now the
703                                  * last one we'll consider.  let's handle it separately and put
704                                  * it in the main body. */
705                                 if (copy_amt)
706                                         return copy_amt;
707                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
708                                                               ebd->len);
709                                 block_and_q_lost_extra(from, q, copy_amt);
710                                 break;
711                         }
712                         len -= amt;
713                         copy_amt += amt;
714                         continue;
715                 } else {
716                         /* If we're here, we reached our final ebd, which we'll need to
717                          * split to get anything from it. */
718                         if (copy_amt)
719                                 return copy_amt;
720                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
721                                                       len);
722                         ebd->off += copy_amt;
723                         ebd->len -= copy_amt;
724                         block_and_q_lost_extra(from, q, copy_amt);
725                         break;
726                 }
727         }
728         if (len)
729                 assert(copy_amt);       /* sanity */
730         return copy_amt;
731 }
732
733 /* Return codes for __qbread and __try_qbread. */
734 enum {
735         QBR_OK,
736         QBR_FAIL,
737         QBR_SPARE,      /* we need a spare block */
738         QBR_AGAIN,      /* do it again, we are coalescing blocks */
739 };
740
741 /* Helper and back-end for __qbread: extracts and returns a list of blocks
742  * containing up to len bytes.  It may contain less than len even if q has more
743  * data.
744  *
745  * Returns a code interpreted by __qbread, and the returned blist in ret. */
746 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
747                         struct block **real_ret, struct block *spare)
748 {
749         struct block *ret, *ret_last, *first;
750         size_t blen;
751
752         if (qio_flags & QIO_CAN_ERR_SLEEP) {
753                 if (!qwait_and_ilock(q, qio_flags)) {
754                         spin_unlock_irqsave(&q->lock);
755                         return QBR_FAIL;
756                 }
757                 /* we qwaited and still hold the lock, so the q is not empty */
758                 first = q->bfirst;
759         } else {
760                 spin_lock_irqsave(&q->lock);
761                 first = q->bfirst;
762                 if (!first) {
763                         spin_unlock_irqsave(&q->lock);
764                         return QBR_FAIL;
765                 }
766         }
767         blen = BLEN(first);
768         if ((q->state & Qcoalesce) && (blen == 0)) {
769                 freeb(pop_first_block(q));
770                 spin_unlock_irqsave(&q->lock);
771                 /* Need to retry to make sure we have a first block */
772                 return QBR_AGAIN;
773         }
774         /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
775          * regardless of len.  We'll do the same, and just return the minimum: the
776          * first block.  I'd be happy to remove this. */
777         if (q->state & Qmsg) {
778                 ret = pop_first_block(q);
779                 goto out_ok;
780         }
781         /* Let's get at least something first - makes the code easier.  This way,
782          * we'll only ever split the block once. */
783         if (blen <= len) {
784                 ret = pop_first_block(q);
785                 len -= blen;
786         } else {
787                 /* need to split the block.  we won't actually take the first block out
788                  * of the queue - we're just extracting a little bit. */
789                 if (!spare) {
790                         /* We have nothing and need a spare block.  Retry! */
791                         spin_unlock_irqsave(&q->lock);
792                         return QBR_SPARE;
793                 }
794                 copy_from_first_block(q, spare, len);
795                 ret = spare;
796                 goto out_ok;
797         }
798         /* At this point, we just grabbed the first block.  We can try to grab some
799          * more, up to len (if they want). */
800         if (qio_flags & QIO_JUST_ONE_BLOCK)
801                 goto out_ok;
802         ret_last = ret;
803         while (q->bfirst && (len > 0)) {
804                 blen = BLEN(q->bfirst);
805                 if ((q->state & Qcoalesce) && (blen == 0)) {
806                         /* remove the intermediate 0 blocks */
807                         freeb(pop_first_block(q));
808                         continue;
809                 }
810                 if (blen > len) {
811                         /* We could try to split the block, but that's a huge pain.  For
812                          * instance, we might need to move the main body of b into an
813                          * extra_data of ret_last.  lots of ways for that to fail, and lots
814                          * of cases to consider.  Easier to just bail out.  This is why I
815                          * did the first block above: we don't need to worry about this. */
816                          break;
817                 }
818                 ret_last->next = pop_first_block(q);
819                 ret_last = ret_last->next;
820                 len -= blen;
821         }
822 out_ok:
823         qwakeup_iunlock(q);
824         *real_ret = ret;
825         return QBR_OK;
826 }
827
828 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
829  * containing up to len bytes.  It may contain less than len even if q has more
830  * data.
831  *
832  * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
833  *
834  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
835  * could get a zero length block back. */
836 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
837                               int mem_flags)
838 {
839         struct block *ret = 0;
840         struct block *spare = 0;
841
842         while (1) {
843                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
844                 case QBR_OK:
845                 case QBR_FAIL:
846                         if (spare && (ret != spare))
847                                 freeb(spare);
848                         return ret;
849                 case QBR_SPARE:
850                         assert(!spare);
851                         /* Due to some nastiness, we need a fresh block so we can read out
852                          * anything from the queue.  'len' seems like a reasonable amount.
853                          * Maybe we can get away with less. */
854                         spare = block_alloc(len, mem_flags);
855                         if (!spare)
856                                 return 0;
857                         break;
858                 case QBR_AGAIN:
859                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
860                          * try again.  We bounce out of __try so we can perform the "is
861                          * there a block" logic again from the top. */
862                         break;
863                 }
864         }
865 }
866
867 /*
868  *  get next block from a queue, return null if nothing there
869  */
870 struct block *qget(struct queue *q)
871 {
872         /* since len == SIZE_MAX, we should never need to do a mem alloc */
873         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
874 }
875
876 /* Throw away the next 'len' bytes in the queue returning the number actually
877  * discarded.
878  *
879  * If the bytes are in the queue, then they must be discarded.  The only time to
880  * return less than len is if the q itself has less than len bytes.  */
881 size_t qdiscard(struct queue *q, size_t len)
882 {
883         struct block *blist;
884         size_t removed_amt;
885         size_t sofar = 0;
886
887         /* This is racy.  There could be multiple qdiscarders or other consumers,
888          * where the consumption could be interleaved. */
889         while (qlen(q) && len) {
890                 blist = __qbread(q, len, 0, MEM_WAIT);
891                 removed_amt = freeblist(blist);
892                 sofar += removed_amt;
893                 len -= removed_amt;
894         }
895         return sofar;
896 }
897
898 ssize_t qpass(struct queue *q, struct block *b)
899 {
900         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
901 }
902
903 ssize_t qpassnolim(struct queue *q, struct block *b)
904 {
905         return __qbwrite(q, b, 0);
906 }
907
908 /*
909  *  if the allocated space is way out of line with the used
910  *  space, reallocate to a smaller block
911  */
912 struct block *packblock(struct block *bp)
913 {
914         struct block **l, *nbp;
915         int n;
916
917         if (bp->extra_len)
918                 return bp;
919         for (l = &bp; *l; l = &(*l)->next) {
920                 nbp = *l;
921                 n = BLEN(nbp);
922                 if ((n << 2) < BALLOC(nbp)) {
923                         *l = block_alloc(n, MEM_WAIT);
924                         memmove((*l)->wp, nbp->rp, n);
925                         (*l)->wp += n;
926                         (*l)->next = nbp->next;
927                         freeb(nbp);
928                 }
929         }
930
931         return bp;
932 }
933
934 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
935  * body_rp, for up to len.  Returns the len consumed.
936  *
937  * The base is 'b', so that we can kfree it later.  This currently ties us to
938  * using kfree for the release method for all extra_data.
939  *
940  * It is possible to have a body size that is 0, if there is no offset, and
941  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
942 static size_t point_to_body(struct block *b, uint8_t *body_rp,
943                             struct block *newb, unsigned int newb_idx,
944                             size_t len)
945 {
946         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
947
948         assert(newb_idx < newb->nr_extra_bufs);
949
950         kmalloc_incref(b);
951         ebd->base = (uintptr_t)b;
952         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
953         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
954         assert((int)ebd->len >= 0);
955         newb->extra_len += ebd->len;
956         return ebd->len;
957 }
958
959 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
960  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
961  * consumed.
962  *
963  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
964 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
965                            struct block *newb, unsigned int newb_idx,
966                            size_t len)
967 {
968         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
969         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
970
971         assert(b_idx < b->nr_extra_bufs);
972         assert(newb_idx < newb->nr_extra_bufs);
973
974         kmalloc_incref((void*)b_ebd->base);
975         n_ebd->base = b_ebd->base;
976         n_ebd->off = b_ebd->off + b_off;
977         n_ebd->len = MIN(b_ebd->len - b_off, len);
978         newb->extra_len += n_ebd->len;
979         return n_ebd->len;
980 }
981
982 /* given a string of blocks, fills the new block's extra_data  with the contents
983  * of the blist [offset, len + offset)
984  *
985  * returns 0 on success.  the only failure is if the extra_data array was too
986  * small, so this returns a positive integer saying how big the extra_data needs
987  * to be.
988  *
989  * callers are responsible for protecting the list structure. */
990 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
991                             uint32_t offset)
992 {
993         struct block *b, *first;
994         unsigned int nr_bufs = 0;
995         unsigned int b_idx, newb_idx = 0;
996         uint8_t *first_main_body = 0;
997
998         /* find the first block; keep offset relative to the latest b in the list */
999         for (b = blist; b; b = b->next) {
1000                 if (BLEN(b) > offset)
1001                         break;
1002                 offset -= BLEN(b);
1003         }
1004         /* qcopy semantics: if you asked for an offset outside the block list, you
1005          * get an empty block back */
1006         if (!b)
1007                 return 0;
1008         first = b;
1009         /* upper bound for how many buffers we'll need in newb */
1010         for (/* b is set*/; b; b = b->next) {
1011                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1012         }
1013         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1014         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1015                 /* caller will need to alloc these, then re-call us */
1016                 return nr_bufs;
1017         }
1018         for (b = first; b && len; b = b->next) {
1019                 b_idx = 0;
1020                 if (offset) {
1021                         if (offset < BHLEN(b)) {
1022                                 /* off is in the main body */
1023                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1024                                 newb_idx++;
1025                         } else {
1026                                 /* off is in one of the buffers (or just past the last one).
1027                                  * we're not going to point to b's main body at all. */
1028                                 offset -= BHLEN(b);
1029                                 assert(b->extra_data);
1030                                 /* assuming these extrabufs are packed, or at least that len
1031                                  * isn't gibberish */
1032                                 while (b->extra_data[b_idx].len <= offset) {
1033                                         offset -= b->extra_data[b_idx].len;
1034                                         b_idx++;
1035                                 }
1036                                 /* now offset is set to our offset in the b_idx'th buf */
1037                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1038                                 newb_idx++;
1039                                 b_idx++;
1040                         }
1041                         offset = 0;
1042                 } else {
1043                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1044                         newb_idx++;
1045                 }
1046                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1047                  * and any point_to_ could be our last if it consumed all of len. */
1048                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1049                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1050                         newb_idx++;
1051                 }
1052         }
1053         return 0;
1054 }
1055
1056 struct block *blist_clone(struct block *blist, int header_len, int len,
1057                           uint32_t offset)
1058 {
1059         int ret;
1060         struct block *newb = block_alloc(header_len, MEM_WAIT);
1061         do {
1062                 ret = __blist_clone_to(blist, newb, len, offset);
1063                 if (ret)
1064                         block_add_extd(newb, ret, MEM_WAIT);
1065         } while (ret);
1066         return newb;
1067 }
1068
1069 /* given a queue, makes a single block with header_len reserved space in the
1070  * block main body, and the contents of [offset, len + offset) pointed to in the
1071  * new blocks ext_data. */
1072 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1073 {
1074         int ret;
1075         struct block *newb = block_alloc(header_len, MEM_WAIT);
1076         /* the while loop should rarely be used: it would require someone
1077          * concurrently adding to the queue. */
1078         do {
1079                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1080                 spin_lock_irqsave(&q->lock);
1081                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1082                 spin_unlock_irqsave(&q->lock);
1083                 if (ret)
1084                         block_add_extd(newb, ret, MEM_WAIT);
1085         } while (ret);
1086         return newb;
1087 }
1088
1089 /*
1090  *  copy from offset in the queue
1091  */
1092 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1093 {
1094         int sofar;
1095         int n;
1096         struct block *b, *nb;
1097         uint8_t *p;
1098
1099         nb = block_alloc(len, MEM_WAIT);
1100
1101         spin_lock_irqsave(&q->lock);
1102
1103         /* go to offset */
1104         b = q->bfirst;
1105         for (sofar = 0;; sofar += n) {
1106                 if (b == NULL) {
1107                         spin_unlock_irqsave(&q->lock);
1108                         return nb;
1109                 }
1110                 n = BLEN(b);
1111                 if (sofar + n > offset) {
1112                         p = b->rp + offset - sofar;
1113                         n -= offset - sofar;
1114                         break;
1115                 }
1116                 QDEBUG checkb(b, "qcopy");
1117                 b = b->next;
1118         }
1119
1120         /* copy bytes from there */
1121         for (sofar = 0; sofar < len;) {
1122                 if (n > len - sofar)
1123                         n = len - sofar;
1124                 PANIC_EXTRA(b);
1125                 memmove(nb->wp, p, n);
1126                 qcopycnt += n;
1127                 sofar += n;
1128                 nb->wp += n;
1129                 b = b->next;
1130                 if (b == NULL)
1131                         break;
1132                 n = BLEN(b);
1133                 p = b->rp;
1134         }
1135         spin_unlock_irqsave(&q->lock);
1136
1137         return nb;
1138 }
1139
1140 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1141 {
1142 #ifdef CONFIG_BLOCK_EXTRAS
1143         return qclone(q, 0, len, offset);
1144 #else
1145         return qcopy_old(q, len, offset);
1146 #endif
1147 }
1148
1149 static void qinit_common(struct queue *q)
1150 {
1151         spinlock_init_irqsave(&q->lock);
1152         rendez_init(&q->rr);
1153         rendez_init(&q->wr);
1154 }
1155
1156 /*
1157  *  called by non-interrupt code
1158  */
1159 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1160 {
1161         struct queue *q;
1162
1163         q = kzmalloc(sizeof(struct queue), 0);
1164         if (q == 0)
1165                 return 0;
1166         qinit_common(q);
1167
1168         q->limit = q->inilim = limit;
1169         q->kick = kick;
1170         q->arg = arg;
1171         q->state = msg;
1172         q->state |= Qstarve;
1173         q->eof = 0;
1174
1175         return q;
1176 }
1177
1178 /* open a queue to be bypassed */
1179 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1180 {
1181         struct queue *q;
1182
1183         q = kzmalloc(sizeof(struct queue), 0);
1184         if (q == 0)
1185                 return 0;
1186         qinit_common(q);
1187
1188         q->limit = 0;
1189         q->arg = arg;
1190         q->bypass = bypass;
1191         q->state = 0;
1192
1193         return q;
1194 }
1195
1196 static int notempty(void *a)
1197 {
1198         struct queue *q = a;
1199
1200         return (q->state & Qclosed) || q->bfirst != 0;
1201 }
1202
1203 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1204  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1205  * was naturally closed.  Throws an error o/w. */
1206 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1207 {
1208         while (1) {
1209                 spin_lock_irqsave(&q->lock);
1210                 if (q->bfirst != NULL)
1211                         return TRUE;
1212                 if (q->state & Qclosed) {
1213                         if (++q->eof > 3) {
1214                                 spin_unlock_irqsave(&q->lock);
1215                                 error(EPIPE, "multiple reads on a closed queue");
1216                         }
1217                         if (q->err[0]) {
1218                                 spin_unlock_irqsave(&q->lock);
1219                                 error(EPIPE, q->err);
1220                         }
1221                         return FALSE;
1222                 }
1223                 /* We set Qstarve regardless of whether we are non-blocking or not.
1224                  * Qstarve tracks the edge detection of the queue being empty. */
1225                 q->state |= Qstarve;
1226                 if (qio_flags & QIO_NON_BLOCK) {
1227                         spin_unlock_irqsave(&q->lock);
1228                         error(EAGAIN, "queue empty");
1229                 }
1230                 spin_unlock_irqsave(&q->lock);
1231                 /* may throw an error() */
1232                 rendez_sleep(&q->rr, notempty, q);
1233         }
1234 }
1235
1236 /*
1237  * add a block list to a queue
1238  * XXX basically the same as enqueue blist, and has no locking!
1239  */
1240 void qaddlist(struct queue *q, struct block *b)
1241 {
1242         /* TODO: q lock? */
1243         /* queue the block */
1244         if (q->bfirst)
1245                 q->blast->next = b;
1246         else
1247                 q->bfirst = b;
1248         q->len += blockalloclen(b);
1249         q->dlen += blocklen(b);
1250         while (b->next)
1251                 b = b->next;
1252         q->blast = b;
1253 }
1254
1255 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1256 {
1257         size_t copy_amt, retval = 0;
1258         struct extra_bdata *ebd;
1259
1260         copy_amt = MIN(BHLEN(b), amt);
1261         memcpy(to, b->rp, copy_amt);
1262         /* advance the rp, since this block not be completely consumed and future
1263          * reads need to know where to pick up from */
1264         b->rp += copy_amt;
1265         to += copy_amt;
1266         amt -= copy_amt;
1267         retval += copy_amt;
1268         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1269                 ebd = &b->extra_data[i];
1270                 /* skip empty entires.  if we track this in the struct block, we can
1271                  * just start the for loop early */
1272                 if (!ebd->base || !ebd->len)
1273                         continue;
1274                 copy_amt = MIN(ebd->len, amt);
1275                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1276                 /* we're actually consuming the entries, just like how we advance rp up
1277                  * above, and might only consume part of one. */
1278                 ebd->len -= copy_amt;
1279                 ebd->off += copy_amt;
1280                 b->extra_len -= copy_amt;
1281                 if (!ebd->len) {
1282                         /* we don't actually have to decref here.  it's also done in
1283                          * freeb().  this is the earliest we can free. */
1284                         kfree((void*)ebd->base);
1285                         ebd->base = ebd->off = 0;
1286                 }
1287                 to += copy_amt;
1288                 amt -= copy_amt;
1289                 retval += copy_amt;
1290         }
1291         return retval;
1292 }
1293
1294 /*
1295  *  copy the contents of a string of blocks into
1296  *  memory.  emptied blocks are freed.  return
1297  *  pointer to first unconsumed block.
1298  */
1299 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1300 {
1301         int i;
1302         struct block *next;
1303
1304         /* could be slicker here, since read_from_block is smart */
1305         for (; b != NULL; b = next) {
1306                 i = BLEN(b);
1307                 if (i > n) {
1308                         /* partial block, consume some */
1309                         read_from_block(b, p, n);
1310                         return b;
1311                 }
1312                 /* full block, consume all and move on */
1313                 i = read_from_block(b, p, i);
1314                 n -= i;
1315                 p += i;
1316                 next = b->next;
1317                 freeb(b);
1318         }
1319         return NULL;
1320 }
1321
1322 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1323  * actual amount copied. */
1324 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1325 {
1326         size_t sofar = 0;
1327         struct block *next;
1328
1329         do {
1330                 /* We should be draining every block completely. */
1331                 assert(BLEN(b) <= len - sofar);
1332                 sofar += read_from_block(b, va + sofar, len - sofar);
1333                 next = b->next;
1334                 freeb(b);
1335                 b = next;
1336         } while (b);
1337         return sofar;
1338 }
1339
1340 /*
1341  *  copy the contents of memory into a string of blocks.
1342  *  return NULL on error.
1343  */
1344 struct block *mem2bl(uint8_t * p, int len)
1345 {
1346         ERRSTACK(1);
1347         int n;
1348         struct block *b, *first, **l;
1349
1350         first = NULL;
1351         l = &first;
1352         if (waserror()) {
1353                 freeblist(first);
1354                 nexterror();
1355         }
1356         do {
1357                 n = len;
1358                 if (n > Maxatomic)
1359                         n = Maxatomic;
1360
1361                 *l = b = block_alloc(n, MEM_WAIT);
1362                 /* TODO consider extra_data */
1363                 memmove(b->wp, p, n);
1364                 b->wp += n;
1365                 p += n;
1366                 len -= n;
1367                 l = &b->next;
1368         } while (len > 0);
1369         poperror();
1370
1371         return first;
1372 }
1373
1374 /*
1375  *  put a block back to the front of the queue
1376  *  called with q ilocked
1377  */
1378 void qputback(struct queue *q, struct block *b)
1379 {
1380         b->next = q->bfirst;
1381         if (q->bfirst == NULL)
1382                 q->blast = b;
1383         q->bfirst = b;
1384         q->len += BALLOC(b);
1385         q->dlen += BLEN(b);
1386 }
1387
1388 /*
1389  *  flow control, get producer going again
1390  *  called with q ilocked
1391  */
1392 static void qwakeup_iunlock(struct queue *q)
1393 {
1394         int dowakeup = 0;
1395
1396         /*
1397          *  if writer flow controlled, restart
1398          *
1399          *  This used to be
1400          *  q->len < q->limit/2
1401          *  but it slows down tcp too much for certain write sizes.
1402          *  I really don't understand it completely.  It may be
1403          *  due to the queue draining so fast that the transmission
1404          *  stalls waiting for the app to produce more data.  - presotto
1405          */
1406         if ((q->state & Qflow) && q->len < q->limit) {
1407                 q->state &= ~Qflow;
1408                 dowakeup = 1;
1409         }
1410
1411         spin_unlock_irqsave(&q->lock);
1412
1413         /* wakeup flow controlled writers */
1414         if (dowakeup) {
1415                 if (q->kick)
1416                         q->kick(q->arg);
1417                 rendez_wakeup(&q->wr);
1418         }
1419         qwake_cb(q, FDTAP_FILT_WRITABLE);
1420 }
1421
1422 /*
1423  *  get next block from a queue (up to a limit)
1424  *
1425  */
1426 struct block *qbread(struct queue *q, size_t len)
1427 {
1428         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1429 }
1430
1431 struct block *qbread_nonblock(struct queue *q, size_t len)
1432 {
1433         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1434                         QIO_NON_BLOCK, MEM_WAIT);
1435 }
1436
1437 /* read up to len from a queue into vp. */
1438 size_t qread(struct queue *q, void *va, size_t len)
1439 {
1440         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1441
1442         if (!blist)
1443                 return 0;
1444         return read_all_blocks(blist, va, len);
1445 }
1446
1447 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1448 {
1449         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1450                                        MEM_WAIT);
1451
1452         if (!blist)
1453                 return 0;
1454         return read_all_blocks(blist, va, len);
1455 }
1456
1457 static int qnotfull(void *a)
1458 {
1459         struct queue *q = a;
1460
1461         return q->len < q->limit || (q->state & Qclosed);
1462 }
1463
1464 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1465 static size_t enqueue_blist(struct queue *q, struct block *b)
1466 {
1467         size_t len, dlen;
1468
1469         if (q->bfirst)
1470                 q->blast->next = b;
1471         else
1472                 q->bfirst = b;
1473         len = BALLOC(b);
1474         dlen = BLEN(b);
1475         while (b->next) {
1476                 b = b->next;
1477                 len += BALLOC(b);
1478                 dlen += BLEN(b);
1479         }
1480         q->blast = b;
1481         q->len += len;
1482         q->dlen += dlen;
1483         return dlen;
1484 }
1485
1486 /* Adds block (which can be a list of blocks) to the queue, subject to
1487  * qio_flags.  Returns the length written on success or -1 on non-throwable
1488  * error.  Adjust qio_flags to control the value-added features!. */
1489 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1490 {
1491         ssize_t ret;
1492         bool dowakeup = FALSE;
1493         bool was_empty;
1494
1495         if (q->bypass) {
1496                 ret = blocklen(b);
1497                 (*q->bypass) (q->arg, b);
1498                 return ret;
1499         }
1500         spin_lock_irqsave(&q->lock);
1501         was_empty = q->len == 0;
1502         if (q->state & Qclosed) {
1503                 spin_unlock_irqsave(&q->lock);
1504                 freeblist(b);
1505                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1506                         return -1;
1507                 if (q->err[0])
1508                         error(EPIPE, q->err);
1509                 else
1510                         error(EPIPE, "connection closed");
1511         }
1512         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1513                 /* drop overflow takes priority over regular non-blocking */
1514                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1515                         spin_unlock_irqsave(&q->lock);
1516                         freeb(b);
1517                         return -1;
1518                 }
1519                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1520                  * and catch it. */
1521                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1522                         spin_unlock_irqsave(&q->lock);
1523                         freeb(b);
1524                         error(EAGAIN, "queue full");
1525                 }
1526         }
1527         ret = enqueue_blist(q, b);
1528         QDEBUG checkb(b, "__qbwrite");
1529         /* make sure other end gets awakened */
1530         if (q->state & Qstarve) {
1531                 q->state &= ~Qstarve;
1532                 dowakeup = TRUE;
1533         }
1534         spin_unlock_irqsave(&q->lock);
1535         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1536          * wakeup, meaning that actual users either want a kick or have qreaders. */
1537         if (q->kick && (dowakeup || (q->state & Qkick)))
1538                 q->kick(q->arg);
1539         if (dowakeup)
1540                 rendez_wakeup(&q->rr);
1541         if (was_empty)
1542                 qwake_cb(q, FDTAP_FILT_READABLE);
1543         /*
1544          *  flow control, wait for queue to get below the limit
1545          *  before allowing the process to continue and queue
1546          *  more.  We do this here so that postnote can only
1547          *  interrupt us after the data has been queued.  This
1548          *  means that things like 9p flushes and ssl messages
1549          *  will not be disrupted by software interrupts.
1550          *
1551          *  Note - this is moderately dangerous since a process
1552          *  that keeps getting interrupted and rewriting will
1553          *  queue infinite crud.
1554          */
1555         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1556             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1557                 /* This is a racy peek at the q status.  If we accidentally block, we
1558                  * set Qflow, so someone should wake us.  If we accidentally don't
1559                  * block, we just returned to the user and let them slip a block past
1560                  * flow control. */
1561                 while (!qnotfull(q)) {
1562                         spin_lock_irqsave(&q->lock);
1563                         q->state |= Qflow;
1564                         spin_unlock_irqsave(&q->lock);
1565                         rendez_sleep(&q->wr, qnotfull, q);
1566                 }
1567         }
1568         return ret;
1569 }
1570
1571 /*
1572  *  add a block to a queue obeying flow control
1573  */
1574 ssize_t qbwrite(struct queue *q, struct block *b)
1575 {
1576         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1577 }
1578
1579 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1580 {
1581         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1582 }
1583
1584 ssize_t qibwrite(struct queue *q, struct block *b)
1585 {
1586         return __qbwrite(q, b, 0);
1587 }
1588
1589 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1590  * block on success, 0 on failure. */
1591 static struct block *build_block(void *from, size_t len, int mem_flags)
1592 {
1593         struct block *b;
1594         void *ext_buf;
1595
1596         /* If len is small, we don't need to bother with the extra_data.  But until
1597          * the whole stack can handle extd blocks, we'll use them unconditionally.
1598          * */
1599 #ifdef CONFIG_BLOCK_EXTRAS
1600         /* allocb builds in 128 bytes of header space to all blocks, but this is
1601          * only available via padblock (to the left).  we also need some space
1602          * for pullupblock for some basic headers (like icmp) that get written
1603          * in directly */
1604         b = block_alloc(64, mem_flags);
1605         if (!b)
1606                 return 0;
1607         ext_buf = kmalloc(len, mem_flags);
1608         if (!ext_buf) {
1609                 kfree(b);
1610                 return 0;
1611         }
1612         memcpy(ext_buf, from, len);
1613         if (block_add_extd(b, 1, mem_flags)) {
1614                 kfree(ext_buf);
1615                 kfree(b);
1616                 return 0;
1617         }
1618         b->extra_data[0].base = (uintptr_t)ext_buf;
1619         b->extra_data[0].off = 0;
1620         b->extra_data[0].len = len;
1621         b->extra_len += len;
1622 #else
1623         b = block_alloc(n, mem_flags);
1624         if (!b)
1625                 return 0;
1626         memmove(b->wp, from, len);
1627         b->wp += len;
1628 #endif
1629         return b;
1630 }
1631
1632 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1633                         int qio_flags)
1634 {
1635         size_t n, sofar;
1636         struct block *b;
1637         uint8_t *p = vp;
1638         void *ext_buf;
1639
1640         sofar = 0;
1641         do {
1642                 n = len - sofar;
1643                 /* This is 64K, the max amount per single block.  Still a good value? */
1644                 if (n > Maxatomic)
1645                         n = Maxatomic;
1646                 b = build_block(p + sofar, n, mem_flags);
1647                 if (!b)
1648                         break;
1649                 if (__qbwrite(q, b, qio_flags) < 0)
1650                         break;
1651                 sofar += n;
1652         } while ((sofar < len) && (q->state & Qmsg) == 0);
1653         return sofar;
1654 }
1655
1656 ssize_t qwrite(struct queue *q, void *vp, int len)
1657 {
1658         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1659 }
1660
1661 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1662 {
1663         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1664                                               QIO_NON_BLOCK);
1665 }
1666
1667 ssize_t qiwrite(struct queue *q, void *vp, int len)
1668 {
1669         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1670 }
1671
1672 /*
1673  *  be extremely careful when calling this,
1674  *  as there is no reference accounting
1675  */
1676 void qfree(struct queue *q)
1677 {
1678         qclose(q);
1679         kfree(q);
1680 }
1681
1682 /*
1683  *  Mark a queue as closed.  No further IO is permitted.
1684  *  All blocks are released.
1685  */
1686 void qclose(struct queue *q)
1687 {
1688         struct block *bfirst;
1689
1690         if (q == NULL)
1691                 return;
1692
1693         /* mark it */
1694         spin_lock_irqsave(&q->lock);
1695         q->state |= Qclosed;
1696         q->state &= ~(Qflow | Qstarve | Qdropoverflow);
1697         q->err[0] = 0;
1698         bfirst = q->bfirst;
1699         q->bfirst = 0;
1700         q->len = 0;
1701         q->dlen = 0;
1702         spin_unlock_irqsave(&q->lock);
1703
1704         /* free queued blocks */
1705         freeblist(bfirst);
1706
1707         /* wake up readers/writers */
1708         rendez_wakeup(&q->rr);
1709         rendez_wakeup(&q->wr);
1710         qwake_cb(q, FDTAP_FILT_HANGUP);
1711 }
1712
1713 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1714  *
1715  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1716  * there is no message, which is what also happens during a natural qclose(),
1717  * those waiters will simply return 0.  qwriters will always error() on a
1718  * closed/hungup queue. */
1719 void qhangup(struct queue *q, char *msg)
1720 {
1721         /* mark it */
1722         spin_lock_irqsave(&q->lock);
1723         q->state |= Qclosed;
1724         if (msg == 0 || *msg == 0)
1725                 q->err[0] = 0;
1726         else
1727                 strlcpy(q->err, msg, ERRMAX);
1728         spin_unlock_irqsave(&q->lock);
1729
1730         /* wake up readers/writers */
1731         rendez_wakeup(&q->rr);
1732         rendez_wakeup(&q->wr);
1733         qwake_cb(q, FDTAP_FILT_HANGUP);
1734 }
1735
1736 /*
1737  *  return non-zero if the q is hungup
1738  */
1739 int qisclosed(struct queue *q)
1740 {
1741         return q->state & Qclosed;
1742 }
1743
1744 /*
1745  *  mark a queue as no longer hung up.  resets the wake_cb.
1746  */
1747 void qreopen(struct queue *q)
1748 {
1749         spin_lock_irqsave(&q->lock);
1750         q->state &= ~Qclosed;
1751         q->state |= Qstarve;
1752         q->eof = 0;
1753         q->limit = q->inilim;
1754         q->wake_cb = 0;
1755         q->wake_data = 0;
1756         spin_unlock_irqsave(&q->lock);
1757 }
1758
1759 /*
1760  *  return bytes queued
1761  */
1762 int qlen(struct queue *q)
1763 {
1764         return q->dlen;
1765 }
1766
1767 /*
1768  * return space remaining before flow control
1769  */
1770 int qwindow(struct queue *q)
1771 {
1772         int l;
1773
1774         l = q->limit - q->len;
1775         if (l < 0)
1776                 l = 0;
1777         return l;
1778 }
1779
1780 /*
1781  *  return true if we can read without blocking
1782  */
1783 int qcanread(struct queue *q)
1784 {
1785         return q->bfirst != 0;
1786 }
1787
1788 /*
1789  *  change queue limit
1790  */
1791 void qsetlimit(struct queue *q, int limit)
1792 {
1793         q->limit = limit;
1794 }
1795
1796 /*
1797  *  set whether writes drop overflowing blocks, or if we sleep
1798  */
1799 void qdropoverflow(struct queue *q, bool onoff)
1800 {
1801         if (onoff)
1802                 q->state |= Qdropoverflow;
1803         else
1804                 q->state &= ~Qdropoverflow;
1805 }
1806
1807 /*
1808  *  flush the output queue
1809  */
1810 void qflush(struct queue *q)
1811 {
1812         struct block *bfirst;
1813
1814         /* mark it */
1815         spin_lock_irqsave(&q->lock);
1816         bfirst = q->bfirst;
1817         q->bfirst = 0;
1818         q->len = 0;
1819         q->dlen = 0;
1820         spin_unlock_irqsave(&q->lock);
1821
1822         /* free queued blocks */
1823         freeblist(bfirst);
1824
1825         /* wake up writers */
1826         rendez_wakeup(&q->wr);
1827         qwake_cb(q, FDTAP_FILT_WRITABLE);
1828 }
1829
1830 int qfull(struct queue *q)
1831 {
1832         return q->len >= q->limit;
1833 }
1834
1835 int qstate(struct queue *q)
1836 {
1837         return q->state;
1838 }
1839
1840 void qdump(struct queue *q)
1841 {
1842         if (q)
1843                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1844                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1845 }
1846
1847 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1848  * marks the type of wakeup event (flags from FDTAP).
1849  *
1850  * There's no sync protection.  If you change the CB while the qio is running,
1851  * you might get a CB with the data or func from a previous set_wake_cb.  You
1852  * should set this once per queue and forget it.
1853  *
1854  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1855  * just make sure that the func(data) pair are valid until the queue is freed or
1856  * reopened. */
1857 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1858 {
1859         q->wake_data = data;
1860         wmb();  /* if we see func, we'll also see the data for it */
1861         q->wake_cb = func;
1862 }
1863
1864 /* Helper for detecting whether we'll block on a read at this instant. */
1865 bool qreadable(struct queue *q)
1866 {
1867         return qlen(q) > 0;
1868 }
1869
1870 /* Helper for detecting whether we'll block on a write at this instant. */
1871 bool qwritable(struct queue *q)
1872 {
1873         return qwindow(q) > 0;
1874 }