Report readablity/writablility via 9p stat
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100 };
101
102 unsigned int qiomaxatomic = Maxatomic;
103
104 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
105 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
106                               int mem_flags);
107 static bool qwait_and_ilock(struct queue *q, int qio_flags);
108 static void qwakeup_iunlock(struct queue *q);
109
110 /* Helper: fires a wake callback, sending 'filter' */
111 static void qwake_cb(struct queue *q, int filter)
112 {
113         if (q->wake_cb)
114                 q->wake_cb(q, q->wake_data, filter);
115 }
116
117 void ixsummary(void)
118 {
119         debugging ^= 1;
120         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
121                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
122         printd("consume %lu, produce %lu, qcopy %lu\n",
123                    consumecnt, producecnt, qcopycnt);
124 }
125
126 /*
127  *  pad a block to the front (or the back if size is negative)
128  */
129 struct block *padblock(struct block *bp, int size)
130 {
131         int n;
132         struct block *nbp;
133         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
134         uint16_t checksum_start = bp->checksum_start;
135         uint16_t checksum_offset = bp->checksum_offset;
136         uint16_t mss = bp->mss;
137
138         QDEBUG checkb(bp, "padblock 1");
139         if (size >= 0) {
140                 if (bp->rp - bp->base >= size) {
141                         bp->checksum_start += size;
142                         bp->rp -= size;
143                         return bp;
144                 }
145
146                 PANIC_EXTRA(bp);
147                 if (bp->next)
148                         panic("padblock %p", getcallerpc(&bp));
149                 n = BLEN(bp);
150                 padblockcnt++;
151                 nbp = block_alloc(size + n, MEM_WAIT);
152                 nbp->rp += size;
153                 nbp->wp = nbp->rp;
154                 memmove(nbp->wp, bp->rp, n);
155                 nbp->wp += n;
156                 freeb(bp);
157                 nbp->rp -= size;
158         } else {
159                 size = -size;
160
161                 PANIC_EXTRA(bp);
162
163                 if (bp->next)
164                         panic("padblock %p", getcallerpc(&bp));
165
166                 if (bp->lim - bp->wp >= size)
167                         return bp;
168
169                 n = BLEN(bp);
170                 padblockcnt++;
171                 nbp = block_alloc(size + n, MEM_WAIT);
172                 memmove(nbp->wp, bp->rp, n);
173                 nbp->wp += n;
174                 freeb(bp);
175         }
176         if (bcksum) {
177                 nbp->flag |= bcksum;
178                 nbp->checksum_start = checksum_start;
179                 nbp->checksum_offset = checksum_offset;
180                 nbp->mss = mss;
181         }
182         QDEBUG checkb(nbp, "padblock 1");
183         return nbp;
184 }
185
186 /*
187  *  return count of bytes in a string of blocks
188  */
189 int blocklen(struct block *bp)
190 {
191         int len;
192
193         len = 0;
194         while (bp) {
195                 len += BLEN(bp);
196                 bp = bp->next;
197         }
198         return len;
199 }
200
201 /*
202  * return count of space in blocks
203  */
204 int blockalloclen(struct block *bp)
205 {
206         int len;
207
208         len = 0;
209         while (bp) {
210                 len += BALLOC(bp);
211                 bp = bp->next;
212         }
213         return len;
214 }
215
216 /*
217  *  copy the  string of blocks into
218  *  a single block and free the string
219  */
220 struct block *concatblock(struct block *bp)
221 {
222         int len;
223         struct block *nb, *f;
224
225         if (bp->next == 0)
226                 return bp;
227
228         /* probably use parts of qclone */
229         PANIC_EXTRA(bp);
230         nb = block_alloc(blocklen(bp), MEM_WAIT);
231         for (f = bp; f; f = f->next) {
232                 len = BLEN(f);
233                 memmove(nb->wp, f->rp, len);
234                 nb->wp += len;
235         }
236         concatblockcnt += BLEN(nb);
237         freeblist(bp);
238         QDEBUG checkb(nb, "concatblock 1");
239         return nb;
240 }
241
242 /* Returns a block with the remaining contents of b all in the main body of the
243  * returned block.  Replace old references to b with the returned value (which
244  * may still be 'b', if no change was needed. */
245 struct block *linearizeblock(struct block *b)
246 {
247         struct block *newb;
248         size_t len;
249         struct extra_bdata *ebd;
250
251         if (!b->extra_len)
252                 return b;
253
254         newb = block_alloc(BLEN(b), MEM_WAIT);
255         len = BHLEN(b);
256         memcpy(newb->wp, b->rp, len);
257         newb->wp += len;
258         len = b->extra_len;
259         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
260                 ebd = &b->extra_data[i];
261                 if (!ebd->base || !ebd->len)
262                         continue;
263                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
264                 newb->wp += ebd->len;
265                 len -= ebd->len;
266         }
267         /* TODO: any other flags that need copied over? */
268         if (b->flag & BCKSUM_FLAGS) {
269                 newb->flag |= (b->flag & BCKSUM_FLAGS);
270                 newb->checksum_start = b->checksum_start;
271                 newb->checksum_offset = b->checksum_offset;
272                 newb->mss = b->mss;
273         }
274         freeb(b);
275         return newb;
276 }
277
278 /*
279  *  make sure the first block has at least n bytes in its main body
280  */
281 struct block *pullupblock(struct block *bp, int n)
282 {
283         int i, len, seglen;
284         struct block *nbp;
285         struct extra_bdata *ebd;
286
287         /*
288          *  this should almost always be true, it's
289          *  just to avoid every caller checking.
290          */
291         if (BHLEN(bp) >= n)
292                 return bp;
293
294          /* a start at explicit main-body / header management */
295         if (bp->extra_len) {
296                 if (n > bp->lim - bp->rp) {
297                         /* would need to realloc a new block and copy everything over. */
298                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
299                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
300                 }
301                 len = n - BHLEN(bp);
302                 if (len > bp->extra_len)
303                         panic("pullup more than extra (%d, %d, %d)\n",
304                               n, BHLEN(bp), bp->extra_len);
305                 QDEBUG checkb(bp, "before pullup");
306                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
307                         ebd = &bp->extra_data[i];
308                         if (!ebd->base || !ebd->len)
309                                 continue;
310                         seglen = MIN(ebd->len, len);
311                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
312                         bp->wp += seglen;
313                         len -= seglen;
314                         ebd->len -= seglen;
315                         ebd->off += seglen;
316                         bp->extra_len -= seglen;
317                         if (ebd->len == 0) {
318                                 kfree((void *)ebd->base);
319                                 ebd->off = 0;
320                                 ebd->base = 0;
321                         }
322                 }
323                 /* maybe just call pullupblock recursively here */
324                 if (len)
325                         panic("pullup %d bytes overdrawn\n", len);
326                 QDEBUG checkb(bp, "after pullup");
327                 return bp;
328         }
329
330         /*
331          *  if not enough room in the first block,
332          *  add another to the front of the list.
333          */
334         if (bp->lim - bp->rp < n) {
335                 nbp = block_alloc(n, MEM_WAIT);
336                 nbp->next = bp;
337                 bp = nbp;
338         }
339
340         /*
341          *  copy bytes from the trailing blocks into the first
342          */
343         n -= BLEN(bp);
344         while ((nbp = bp->next)) {
345                 i = BLEN(nbp);
346                 if (i > n) {
347                         memmove(bp->wp, nbp->rp, n);
348                         pullupblockcnt++;
349                         bp->wp += n;
350                         nbp->rp += n;
351                         QDEBUG checkb(bp, "pullupblock 1");
352                         return bp;
353                 } else {
354                         memmove(bp->wp, nbp->rp, i);
355                         pullupblockcnt++;
356                         bp->wp += i;
357                         bp->next = nbp->next;
358                         nbp->next = 0;
359                         freeb(nbp);
360                         n -= i;
361                         if (n == 0) {
362                                 QDEBUG checkb(bp, "pullupblock 2");
363                                 return bp;
364                         }
365                 }
366         }
367         freeb(bp);
368         return 0;
369 }
370
371 /*
372  *  make sure the first block has at least n bytes in its main body
373  */
374 struct block *pullupqueue(struct queue *q, int n)
375 {
376         struct block *b;
377
378         /* TODO: lock to protect the queue links? */
379         if ((BHLEN(q->bfirst) >= n))
380                 return q->bfirst;
381         q->bfirst = pullupblock(q->bfirst, n);
382         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
383         q->blast = b;
384         return q->bfirst;
385 }
386
387 /* throw away count bytes from the front of
388  * block's extradata.  Returns count of bytes
389  * thrown away
390  */
391
392 static int pullext(struct block *bp, int count)
393 {
394         struct extra_bdata *ed;
395         int i, rem, bytes = 0;
396
397         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
398                 ed = &bp->extra_data[i];
399                 rem = MIN(count, ed->len);
400                 bp->extra_len -= rem;
401                 count -= rem;
402                 bytes += rem;
403                 ed->off += rem;
404                 ed->len -= rem;
405                 if (ed->len == 0) {
406                         kfree((void *)ed->base);
407                         ed->base = 0;
408                         ed->off = 0;
409                 }
410         }
411         return bytes;
412 }
413
414 /* throw away count bytes from the end of a
415  * block's extradata.  Returns count of bytes
416  * thrown away
417  */
418
419 static int dropext(struct block *bp, int count)
420 {
421         struct extra_bdata *ed;
422         int i, rem, bytes = 0;
423
424         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
425                 ed = &bp->extra_data[i];
426                 rem = MIN(count, ed->len);
427                 bp->extra_len -= rem;
428                 count -= rem;
429                 bytes += rem;
430                 ed->len -= rem;
431                 if (ed->len == 0) {
432                         kfree((void *)ed->base);
433                         ed->base = 0;
434                         ed->off = 0;
435                 }
436         }
437         return bytes;
438 }
439
440 /*
441  *  throw away up to count bytes from a
442  *  list of blocks.  Return count of bytes
443  *  thrown away.
444  */
445 static int _pullblock(struct block **bph, int count, int free)
446 {
447         struct block *bp;
448         int n, bytes;
449
450         bytes = 0;
451         if (bph == NULL)
452                 return 0;
453
454         while (*bph != NULL && count != 0) {
455                 bp = *bph;
456
457                 n = MIN(BHLEN(bp), count);
458                 bytes += n;
459                 count -= n;
460                 bp->rp += n;
461                 n = pullext(bp, count);
462                 bytes += n;
463                 count -= n;
464                 QDEBUG checkb(bp, "pullblock ");
465                 if (BLEN(bp) == 0 && (free || count)) {
466                         *bph = bp->next;
467                         bp->next = NULL;
468                         freeb(bp);
469                 }
470         }
471         return bytes;
472 }
473
474 int pullblock(struct block **bph, int count)
475 {
476         return _pullblock(bph, count, 1);
477 }
478
479 /*
480  *  trim to len bytes starting at offset
481  */
482 struct block *trimblock(struct block *bp, int offset, int len)
483 {
484         uint32_t l, trim;
485         int olen = len;
486
487         QDEBUG checkb(bp, "trimblock 1");
488         if (blocklen(bp) < offset + len) {
489                 freeblist(bp);
490                 return NULL;
491         }
492
493         l =_pullblock(&bp, offset, 0);
494         if (bp == NULL)
495                 return NULL;
496         if (l != offset) {
497                 freeblist(bp);
498                 return NULL;
499         }
500
501         while ((l = BLEN(bp)) < len) {
502                 len -= l;
503                 bp = bp->next;
504         }
505
506         trim = BLEN(bp) - len;
507         trim -= dropext(bp, trim);
508         bp->wp -= trim;
509
510         if (bp->next) {
511                 freeblist(bp->next);
512                 bp->next = NULL;
513         }
514         return bp;
515 }
516
517 /*
518  *  copy 'count' bytes into a new block
519  */
520 struct block *copyblock(struct block *bp, int count)
521 {
522         int l;
523         struct block *nbp;
524
525         QDEBUG checkb(bp, "copyblock 0");
526         nbp = block_alloc(count, MEM_WAIT);
527         if (bp->flag & BCKSUM_FLAGS) {
528                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
529                 nbp->checksum_start = bp->checksum_start;
530                 nbp->checksum_offset = bp->checksum_offset;
531                 nbp->mss = bp->mss;
532         }
533         PANIC_EXTRA(bp);
534         for (; count > 0 && bp != 0; bp = bp->next) {
535                 l = BLEN(bp);
536                 if (l > count)
537                         l = count;
538                 memmove(nbp->wp, bp->rp, l);
539                 nbp->wp += l;
540                 count -= l;
541         }
542         if (count > 0) {
543                 memset(nbp->wp, 0, count);
544                 nbp->wp += count;
545         }
546         copyblockcnt++;
547         QDEBUG checkb(nbp, "copyblock 1");
548
549         return nbp;
550 }
551
552 /* Adjust block @bp so that its size is exactly @len.
553  * If the size is increased, fill in the new contents with zeros.
554  * If the size is decreased, discard some of the old contents at the tail. */
555 struct block *adjustblock(struct block *bp, int len)
556 {
557         struct extra_bdata *ebd;
558         void *buf;
559         int i;
560
561         if (len < 0) {
562                 freeb(bp);
563                 return NULL;
564         }
565
566         if (len == BLEN(bp))
567                 return bp;
568
569         /* Shrink within block main body. */
570         if (len <= BHLEN(bp)) {
571                 free_block_extra(bp);
572                 bp->wp = bp->rp + len;
573                 QDEBUG checkb(bp, "adjustblock 1");
574                 return bp;
575         }
576
577         /* Need to grow. */
578         if (len > BLEN(bp)) {
579                 /* Grow within block main body. */
580                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
581                         memset(bp->wp, 0, len - BLEN(bp));
582                         bp->wp = bp->rp + len;
583                         QDEBUG checkb(bp, "adjustblock 2");
584                         return bp;
585                 }
586                 /* Grow with extra data buffers. */
587                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
588                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
589                 QDEBUG checkb(bp, "adjustblock 3");
590                 return bp;
591         }
592
593         /* Shrink extra data buffers.
594          * len is how much of ebd we need to keep.
595          * extra_len is re-accumulated. */
596         assert(bp->extra_len > 0);
597         len -= BHLEN(bp);
598         bp->extra_len = 0;
599         for (i = 0; i < bp->nr_extra_bufs; i++) {
600                 ebd = &bp->extra_data[i];
601                 if (len <= ebd->len)
602                         break;
603                 len -= ebd->len;
604                 bp->extra_len += ebd->len;
605         }
606         /* If len becomes zero, extra_data[i] should be freed. */
607         if (len > 0) {
608                 ebd = &bp->extra_data[i];
609                 ebd->len = len;
610                 bp->extra_len += ebd->len;
611                 i++;
612         }
613         for (; i < bp->nr_extra_bufs; i++) {
614                 ebd = &bp->extra_data[i];
615                 if (ebd->base)
616                         kfree((void*)ebd->base);
617                 ebd->base = ebd->off = ebd->len = 0;
618         }
619         QDEBUG checkb(bp, "adjustblock 4");
620         return bp;
621 }
622
623 /* Helper: removes and returns the first block from q */
624 static struct block *pop_first_block(struct queue *q)
625 {
626         struct block *b = q->bfirst;
627
628         q->len -= BALLOC(b);
629         q->dlen -= BLEN(b);
630         q->bfirst = b->next;
631         b->next = 0;
632         return b;
633 }
634
635 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
636 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
637 {
638         copy_amt = MIN(to->lim - to->wp, copy_amt);
639         memcpy(to->wp, from, copy_amt);
640         to->wp += copy_amt;
641         return copy_amt;
642 }
643
644 /* Accounting helper.  Block b in q lost amt extra_data */
645 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
646 {
647         b->extra_len -= amt;
648         q->len -= amt;
649         q->dlen -= amt;
650 }
651
652 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
653  * fixed in 'from', so we move its contents and zero it out in 'from'.
654  *
655  * Returns the length moved (0 on failure). */
656 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
657                        struct block *from, struct queue *from_q)
658 {
659         size_t ret = ebd->len;
660
661         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
662                 return 0;
663         block_and_q_lost_extra(from, from_q, ebd->len);
664         ebd->base = ebd->len = ebd->off = 0;
665         return ret;
666 }
667
668 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
669  * return with less than len, but greater than 0, even if there is more
670  * available in q.
671  *
672  * At any moment that we have copied anything and things are tricky, we can just
673  * return.  The trickiness comes from a bunch of variables: is the main body
674  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
675  * to @to's main body, but only if we haven't used it yet. */
676 static size_t copy_from_first_block(struct queue *q, struct block *to,
677                                     size_t len)
678 {
679         struct block *from = q->bfirst;
680         size_t copy_amt, amt;
681         struct extra_bdata *ebd;
682
683         assert(len < BLEN(from));       /* sanity */
684         /* Try to extract from the main body */
685         copy_amt = MIN(BHLEN(from), len);
686         if (copy_amt) {
687                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
688                 from->rp += copy_amt;
689                 /* We only change dlen, (data len), not q->len, since the q still has
690                  * the same block memory allocation (no kfrees happened) */
691                 q->dlen -= copy_amt;
692         }
693         /* Try to extract the remainder from the extra data */
694         len -= copy_amt;
695         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
696                 ebd = &from->extra_data[i];
697                 if (!ebd->base || !ebd->len)
698                         continue;
699                 if (len >= ebd->len) {
700                         amt = move_ebd(ebd, to, from, q);
701                         if (!amt) {
702                                 /* our internal alloc could have failed.   this ebd is now the
703                                  * last one we'll consider.  let's handle it separately and put
704                                  * it in the main body. */
705                                 if (copy_amt)
706                                         return copy_amt;
707                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
708                                                               ebd->len);
709                                 block_and_q_lost_extra(from, q, copy_amt);
710                                 break;
711                         }
712                         len -= amt;
713                         copy_amt += amt;
714                         continue;
715                 } else {
716                         /* If we're here, we reached our final ebd, which we'll need to
717                          * split to get anything from it. */
718                         if (copy_amt)
719                                 return copy_amt;
720                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
721                                                       len);
722                         ebd->off += copy_amt;
723                         ebd->len -= copy_amt;
724                         block_and_q_lost_extra(from, q, copy_amt);
725                         break;
726                 }
727         }
728         if (len)
729                 assert(copy_amt);       /* sanity */
730         return copy_amt;
731 }
732
733 /* Return codes for __qbread and __try_qbread. */
734 enum {
735         QBR_OK,
736         QBR_FAIL,
737         QBR_SPARE,      /* we need a spare block */
738         QBR_AGAIN,      /* do it again, we are coalescing blocks */
739 };
740
741 /* Helper and back-end for __qbread: extracts and returns a list of blocks
742  * containing up to len bytes.  It may contain less than len even if q has more
743  * data.
744  *
745  * Returns a code interpreted by __qbread, and the returned blist in ret. */
746 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
747                         struct block **real_ret, struct block *spare)
748 {
749         struct block *ret, *ret_last, *first;
750         size_t blen;
751
752         if (qio_flags & QIO_CAN_ERR_SLEEP) {
753                 if (!qwait_and_ilock(q, qio_flags)) {
754                         spin_unlock_irqsave(&q->lock);
755                         return QBR_FAIL;
756                 }
757                 /* we qwaited and still hold the lock, so the q is not empty */
758                 first = q->bfirst;
759         } else {
760                 spin_lock_irqsave(&q->lock);
761                 first = q->bfirst;
762                 if (!first) {
763                         spin_unlock_irqsave(&q->lock);
764                         return QBR_FAIL;
765                 }
766         }
767         blen = BLEN(first);
768         if ((q->state & Qcoalesce) && (blen == 0)) {
769                 freeb(pop_first_block(q));
770                 spin_unlock_irqsave(&q->lock);
771                 /* Need to retry to make sure we have a first block */
772                 return QBR_AGAIN;
773         }
774         /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
775          * regardless of len.  We'll do the same, and just return the minimum: the
776          * first block.  I'd be happy to remove this. */
777         if (q->state & Qmsg) {
778                 ret = pop_first_block(q);
779                 goto out_ok;
780         }
781         /* Let's get at least something first - makes the code easier.  This way,
782          * we'll only ever split the block once. */
783         if (blen <= len) {
784                 ret = pop_first_block(q);
785                 len -= blen;
786         } else {
787                 /* need to split the block.  we won't actually take the first block out
788                  * of the queue - we're just extracting a little bit. */
789                 if (!spare) {
790                         /* We have nothing and need a spare block.  Retry! */
791                         spin_unlock_irqsave(&q->lock);
792                         return QBR_SPARE;
793                 }
794                 copy_from_first_block(q, spare, len);
795                 ret = spare;
796                 goto out_ok;
797         }
798         /* At this point, we just grabbed the first block.  We can try to grab some
799          * more, up to len (if they want). */
800         if (qio_flags & QIO_JUST_ONE_BLOCK)
801                 goto out_ok;
802         ret_last = ret;
803         while (q->bfirst && (len > 0)) {
804                 blen = BLEN(q->bfirst);
805                 if ((q->state & Qcoalesce) && (blen == 0)) {
806                         /* remove the intermediate 0 blocks */
807                         freeb(pop_first_block(q));
808                         continue;
809                 }
810                 if (blen > len) {
811                         /* We could try to split the block, but that's a huge pain.  For
812                          * instance, we might need to move the main body of b into an
813                          * extra_data of ret_last.  lots of ways for that to fail, and lots
814                          * of cases to consider.  Easier to just bail out.  This is why I
815                          * did the first block above: we don't need to worry about this. */
816                          break;
817                 }
818                 ret_last->next = pop_first_block(q);
819                 ret_last = ret_last->next;
820                 len -= blen;
821         }
822 out_ok:
823         qwakeup_iunlock(q);
824         *real_ret = ret;
825         return QBR_OK;
826 }
827
828 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
829  * containing up to len bytes.  It may contain less than len even if q has more
830  * data.
831  *
832  * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
833  *
834  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
835  * could get a zero length block back. */
836 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
837                               int mem_flags)
838 {
839         struct block *ret = 0;
840         struct block *spare = 0;
841
842         while (1) {
843                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
844                 case QBR_OK:
845                 case QBR_FAIL:
846                         if (spare && (ret != spare))
847                                 freeb(spare);
848                         return ret;
849                 case QBR_SPARE:
850                         assert(!spare);
851                         /* Due to some nastiness, we need a fresh block so we can read out
852                          * anything from the queue.  'len' seems like a reasonable amount.
853                          * Maybe we can get away with less. */
854                         spare = block_alloc(len, mem_flags);
855                         if (!spare)
856                                 return 0;
857                         break;
858                 case QBR_AGAIN:
859                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
860                          * try again.  We bounce out of __try so we can perform the "is
861                          * there a block" logic again from the top. */
862                         break;
863                 }
864         }
865 }
866
867 /*
868  *  get next block from a queue, return null if nothing there
869  */
870 struct block *qget(struct queue *q)
871 {
872         /* since len == SIZE_MAX, we should never need to do a mem alloc */
873         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
874 }
875
876 /* Throw away the next 'len' bytes in the queue returning the number actually
877  * discarded.
878  *
879  * If the bytes are in the queue, then they must be discarded.  The only time to
880  * return less than len is if the q itself has less than len bytes.  */
881 size_t qdiscard(struct queue *q, size_t len)
882 {
883         struct block *blist;
884         size_t removed_amt;
885         size_t sofar = 0;
886
887         /* This is racy.  There could be multiple qdiscarders or other consumers,
888          * where the consumption could be interleaved. */
889         while (qlen(q) && len) {
890                 blist = __qbread(q, len, 0, MEM_WAIT);
891                 removed_amt = freeblist(blist);
892                 sofar += removed_amt;
893                 len -= removed_amt;
894         }
895         return sofar;
896 }
897
898 ssize_t qpass(struct queue *q, struct block *b)
899 {
900         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
901 }
902
903 ssize_t qpassnolim(struct queue *q, struct block *b)
904 {
905         return __qbwrite(q, b, 0);
906 }
907
908 /*
909  *  if the allocated space is way out of line with the used
910  *  space, reallocate to a smaller block
911  */
912 struct block *packblock(struct block *bp)
913 {
914         struct block **l, *nbp;
915         int n;
916
917         if (bp->extra_len)
918                 return bp;
919         for (l = &bp; *l; l = &(*l)->next) {
920                 nbp = *l;
921                 n = BLEN(nbp);
922                 if ((n << 2) < BALLOC(nbp)) {
923                         *l = block_alloc(n, MEM_WAIT);
924                         memmove((*l)->wp, nbp->rp, n);
925                         (*l)->wp += n;
926                         (*l)->next = nbp->next;
927                         freeb(nbp);
928                 }
929         }
930
931         return bp;
932 }
933
934 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
935  * body_rp, for up to len.  Returns the len consumed. 
936  *
937  * The base is 'b', so that we can kfree it later.  This currently ties us to
938  * using kfree for the release method for all extra_data.
939  *
940  * It is possible to have a body size that is 0, if there is no offset, and
941  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
942 static size_t point_to_body(struct block *b, uint8_t *body_rp,
943                             struct block *newb, unsigned int newb_idx,
944                             size_t len)
945 {
946         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
947
948         assert(newb_idx < newb->nr_extra_bufs);
949
950         kmalloc_incref(b);
951         ebd->base = (uintptr_t)b;
952         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
953         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
954         assert((int)ebd->len >= 0);
955         newb->extra_len += ebd->len;
956         return ebd->len;
957 }
958
959 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
960  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
961  * consumed.
962  *
963  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
964 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
965                            struct block *newb, unsigned int newb_idx,
966                            size_t len)
967 {
968         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
969         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
970
971         assert(b_idx < b->nr_extra_bufs);
972         assert(newb_idx < newb->nr_extra_bufs);
973
974         kmalloc_incref((void*)b_ebd->base);
975         n_ebd->base = b_ebd->base;
976         n_ebd->off = b_ebd->off + b_off;
977         n_ebd->len = MIN(b_ebd->len - b_off, len);
978         newb->extra_len += n_ebd->len;
979         return n_ebd->len;
980 }
981
982 /* given a string of blocks, fills the new block's extra_data  with the contents
983  * of the blist [offset, len + offset)
984  *
985  * returns 0 on success.  the only failure is if the extra_data array was too
986  * small, so this returns a positive integer saying how big the extra_data needs
987  * to be.
988  *
989  * callers are responsible for protecting the list structure. */
990 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
991                             uint32_t offset)
992 {
993         struct block *b, *first;
994         unsigned int nr_bufs = 0;
995         unsigned int b_idx, newb_idx = 0;
996         uint8_t *first_main_body = 0;
997
998         /* find the first block; keep offset relative to the latest b in the list */
999         for (b = blist; b; b = b->next) {
1000                 if (BLEN(b) > offset)
1001                         break;
1002                 offset -= BLEN(b);
1003         }
1004         /* qcopy semantics: if you asked for an offset outside the block list, you
1005          * get an empty block back */
1006         if (!b)
1007                 return 0;
1008         first = b;
1009         /* upper bound for how many buffers we'll need in newb */
1010         for (/* b is set*/; b; b = b->next) {
1011                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1012         }
1013         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1014         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1015                 /* caller will need to alloc these, then re-call us */
1016                 return nr_bufs;
1017         }
1018         for (b = first; b && len; b = b->next) {
1019                 b_idx = 0;
1020                 if (offset) {
1021                         if (offset < BHLEN(b)) {
1022                                 /* off is in the main body */
1023                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1024                                 newb_idx++;
1025                         } else {
1026                                 /* off is in one of the buffers (or just past the last one).
1027                                  * we're not going to point to b's main body at all. */
1028                                 offset -= BHLEN(b);
1029                                 assert(b->extra_data);
1030                                 /* assuming these extrabufs are packed, or at least that len
1031                                  * isn't gibberish */
1032                                 while (b->extra_data[b_idx].len <= offset) {
1033                                         offset -= b->extra_data[b_idx].len;
1034                                         b_idx++;
1035                                 }
1036                                 /* now offset is set to our offset in the b_idx'th buf */
1037                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1038                                 newb_idx++;
1039                                 b_idx++;
1040                         }
1041                         offset = 0;
1042                 } else {
1043                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1044                         newb_idx++;
1045                 }
1046                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1047                  * and any point_to_ could be our last if it consumed all of len. */
1048                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1049                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1050                         newb_idx++;
1051                 }
1052         }
1053         return 0;
1054 }
1055
1056 struct block *blist_clone(struct block *blist, int header_len, int len,
1057                           uint32_t offset)
1058 {
1059         int ret;
1060         struct block *newb = block_alloc(header_len, MEM_WAIT);
1061         do {
1062                 ret = __blist_clone_to(blist, newb, len, offset);
1063                 if (ret)
1064                         block_add_extd(newb, ret, MEM_WAIT);
1065         } while (ret);
1066         return newb;
1067 }
1068
1069 /* given a queue, makes a single block with header_len reserved space in the
1070  * block main body, and the contents of [offset, len + offset) pointed to in the
1071  * new blocks ext_data. */
1072 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1073 {
1074         int ret;
1075         struct block *newb = block_alloc(header_len, MEM_WAIT);
1076         /* the while loop should rarely be used: it would require someone
1077          * concurrently adding to the queue. */
1078         do {
1079                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1080                 spin_lock_irqsave(&q->lock);
1081                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1082                 spin_unlock_irqsave(&q->lock);
1083                 if (ret)
1084                         block_add_extd(newb, ret, MEM_WAIT);
1085         } while (ret);
1086         return newb;
1087 }
1088
1089 /*
1090  *  copy from offset in the queue
1091  */
1092 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1093 {
1094         int sofar;
1095         int n;
1096         struct block *b, *nb;
1097         uint8_t *p;
1098
1099         nb = block_alloc(len, MEM_WAIT);
1100
1101         spin_lock_irqsave(&q->lock);
1102
1103         /* go to offset */
1104         b = q->bfirst;
1105         for (sofar = 0;; sofar += n) {
1106                 if (b == NULL) {
1107                         spin_unlock_irqsave(&q->lock);
1108                         return nb;
1109                 }
1110                 n = BLEN(b);
1111                 if (sofar + n > offset) {
1112                         p = b->rp + offset - sofar;
1113                         n -= offset - sofar;
1114                         break;
1115                 }
1116                 QDEBUG checkb(b, "qcopy");
1117                 b = b->next;
1118         }
1119
1120         /* copy bytes from there */
1121         for (sofar = 0; sofar < len;) {
1122                 if (n > len - sofar)
1123                         n = len - sofar;
1124                 PANIC_EXTRA(b);
1125                 memmove(nb->wp, p, n);
1126                 qcopycnt += n;
1127                 sofar += n;
1128                 nb->wp += n;
1129                 b = b->next;
1130                 if (b == NULL)
1131                         break;
1132                 n = BLEN(b);
1133                 p = b->rp;
1134         }
1135         spin_unlock_irqsave(&q->lock);
1136
1137         return nb;
1138 }
1139
1140 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1141 {
1142 #ifdef CONFIG_BLOCK_EXTRAS
1143         return qclone(q, 0, len, offset);
1144 #else
1145         return qcopy_old(q, len, offset);
1146 #endif
1147 }
1148
1149 static void qinit_common(struct queue *q)
1150 {
1151         spinlock_init_irqsave(&q->lock);
1152         rendez_init(&q->rr);
1153         rendez_init(&q->wr);
1154 }
1155
1156 /*
1157  *  called by non-interrupt code
1158  */
1159 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1160 {
1161         struct queue *q;
1162
1163         q = kzmalloc(sizeof(struct queue), 0);
1164         if (q == 0)
1165                 return 0;
1166         qinit_common(q);
1167
1168         q->limit = q->inilim = limit;
1169         q->kick = kick;
1170         q->arg = arg;
1171         q->state = msg;
1172         q->state |= Qstarve;
1173         q->eof = 0;
1174
1175         return q;
1176 }
1177
1178 /* open a queue to be bypassed */
1179 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1180 {
1181         struct queue *q;
1182
1183         q = kzmalloc(sizeof(struct queue), 0);
1184         if (q == 0)
1185                 return 0;
1186         qinit_common(q);
1187
1188         q->limit = 0;
1189         q->arg = arg;
1190         q->bypass = bypass;
1191         q->state = 0;
1192
1193         return q;
1194 }
1195
1196 static int notempty(void *a)
1197 {
1198         struct queue *q = a;
1199
1200         return (q->state & Qclosed) || q->bfirst != 0;
1201 }
1202
1203 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1204  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1205  * was naturally closed.  Throws an error o/w. */
1206 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1207 {
1208         while (1) {
1209                 spin_lock_irqsave(&q->lock);
1210                 if (q->bfirst != NULL)
1211                         return TRUE;
1212                 if (q->state & Qclosed) {
1213                         if (++q->eof > 3) {
1214                                 spin_unlock_irqsave(&q->lock);
1215                                 error(EPIPE, "multiple reads on a closed queue");
1216                         }
1217                         if (q->err[0]) {
1218                                 spin_unlock_irqsave(&q->lock);
1219                                 error(EPIPE, q->err);
1220                         }
1221                         return FALSE;
1222                 }
1223                 /* We set Qstarve regardless of whether we are non-blocking or not.
1224                  * Qstarve tracks the edge detection of the queue being empty. */
1225                 q->state |= Qstarve;
1226                 if (qio_flags & QIO_NON_BLOCK) {
1227                         spin_unlock_irqsave(&q->lock);
1228                         error(EAGAIN, "queue empty");
1229                 }
1230                 spin_unlock_irqsave(&q->lock);
1231                 /* may throw an error() */
1232                 rendez_sleep(&q->rr, notempty, q);
1233         }
1234 }
1235
1236 /*
1237  * add a block list to a queue
1238  */
1239 void qaddlist(struct queue *q, struct block *b)
1240 {
1241         /* TODO: q lock? */
1242         /* queue the block */
1243         if (q->bfirst)
1244                 q->blast->next = b;
1245         else
1246                 q->bfirst = b;
1247         q->len += blockalloclen(b);
1248         q->dlen += blocklen(b);
1249         while (b->next)
1250                 b = b->next;
1251         q->blast = b;
1252 }
1253
1254 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1255 {
1256         size_t copy_amt, retval = 0;
1257         struct extra_bdata *ebd;
1258         
1259         copy_amt = MIN(BHLEN(b), amt);
1260         memcpy(to, b->rp, copy_amt);
1261         /* advance the rp, since this block not be completely consumed and future
1262          * reads need to know where to pick up from */
1263         b->rp += copy_amt;
1264         to += copy_amt;
1265         amt -= copy_amt;
1266         retval += copy_amt;
1267         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1268                 ebd = &b->extra_data[i];
1269                 /* skip empty entires.  if we track this in the struct block, we can
1270                  * just start the for loop early */
1271                 if (!ebd->base || !ebd->len)
1272                         continue;
1273                 copy_amt = MIN(ebd->len, amt);
1274                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1275                 /* we're actually consuming the entries, just like how we advance rp up
1276                  * above, and might only consume part of one. */
1277                 ebd->len -= copy_amt;
1278                 ebd->off += copy_amt;
1279                 b->extra_len -= copy_amt;
1280                 if (!ebd->len) {
1281                         /* we don't actually have to decref here.  it's also done in
1282                          * freeb().  this is the earliest we can free. */
1283                         kfree((void*)ebd->base);
1284                         ebd->base = ebd->off = 0;
1285                 }
1286                 to += copy_amt;
1287                 amt -= copy_amt;
1288                 retval += copy_amt;
1289         }
1290         return retval;
1291 }
1292
1293 /*
1294  *  copy the contents of a string of blocks into
1295  *  memory.  emptied blocks are freed.  return
1296  *  pointer to first unconsumed block.
1297  */
1298 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1299 {
1300         int i;
1301         struct block *next;
1302
1303         /* could be slicker here, since read_from_block is smart */
1304         for (; b != NULL; b = next) {
1305                 i = BLEN(b);
1306                 if (i > n) {
1307                         /* partial block, consume some */
1308                         read_from_block(b, p, n);
1309                         return b;
1310                 }
1311                 /* full block, consume all and move on */
1312                 i = read_from_block(b, p, i);
1313                 n -= i;
1314                 p += i;
1315                 next = b->next;
1316                 freeb(b);
1317         }
1318         return NULL;
1319 }
1320
1321 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1322  * actual amount copied. */
1323 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1324 {
1325         size_t sofar = 0;
1326         struct block *next;
1327
1328         do {
1329                 /* We should be draining every block completely. */
1330                 assert(BLEN(b) <= len - sofar);
1331                 sofar += read_from_block(b, va + sofar, len - sofar);
1332                 next = b->next;
1333                 freeb(b);
1334                 b = next;
1335         } while (b);
1336         return sofar;
1337 }
1338
1339 /*
1340  *  copy the contents of memory into a string of blocks.
1341  *  return NULL on error.
1342  */
1343 struct block *mem2bl(uint8_t * p, int len)
1344 {
1345         ERRSTACK(1);
1346         int n;
1347         struct block *b, *first, **l;
1348
1349         first = NULL;
1350         l = &first;
1351         if (waserror()) {
1352                 freeblist(first);
1353                 nexterror();
1354         }
1355         do {
1356                 n = len;
1357                 if (n > Maxatomic)
1358                         n = Maxatomic;
1359
1360                 *l = b = block_alloc(n, MEM_WAIT);
1361                 /* TODO consider extra_data */
1362                 memmove(b->wp, p, n);
1363                 b->wp += n;
1364                 p += n;
1365                 len -= n;
1366                 l = &b->next;
1367         } while (len > 0);
1368         poperror();
1369
1370         return first;
1371 }
1372
1373 /*
1374  *  put a block back to the front of the queue
1375  *  called with q ilocked
1376  */
1377 void qputback(struct queue *q, struct block *b)
1378 {
1379         b->next = q->bfirst;
1380         if (q->bfirst == NULL)
1381                 q->blast = b;
1382         q->bfirst = b;
1383         q->len += BALLOC(b);
1384         q->dlen += BLEN(b);
1385 }
1386
1387 /*
1388  *  flow control, get producer going again
1389  *  called with q ilocked
1390  */
1391 static void qwakeup_iunlock(struct queue *q)
1392 {
1393         int dowakeup = 0;
1394
1395         /*
1396          *  if writer flow controlled, restart
1397          *
1398          *  This used to be
1399          *  q->len < q->limit/2
1400          *  but it slows down tcp too much for certain write sizes.
1401          *  I really don't understand it completely.  It may be
1402          *  due to the queue draining so fast that the transmission
1403          *  stalls waiting for the app to produce more data.  - presotto
1404          */
1405         if ((q->state & Qflow) && q->len < q->limit) {
1406                 q->state &= ~Qflow;
1407                 dowakeup = 1;
1408         }
1409
1410         spin_unlock_irqsave(&q->lock);
1411
1412         /* wakeup flow controlled writers */
1413         if (dowakeup) {
1414                 if (q->kick)
1415                         q->kick(q->arg);
1416                 rendez_wakeup(&q->wr);
1417         }
1418         qwake_cb(q, FDTAP_FILT_WRITABLE);
1419 }
1420
1421 /*
1422  *  get next block from a queue (up to a limit)
1423  *
1424  */
1425 struct block *qbread(struct queue *q, size_t len)
1426 {
1427         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1428 }
1429
1430 struct block *qbread_nonblock(struct queue *q, size_t len)
1431 {
1432         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1433                         QIO_NON_BLOCK, MEM_WAIT);
1434 }
1435
1436 /* read up to len from a queue into vp. */
1437 size_t qread(struct queue *q, void *va, size_t len)
1438 {
1439         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1440
1441         if (!blist)
1442                 return 0;
1443         return read_all_blocks(blist, va, len);
1444 }
1445
1446 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1447 {
1448         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1449                                        MEM_WAIT);
1450
1451         if (!blist)
1452                 return 0;
1453         return read_all_blocks(blist, va, len);
1454 }
1455
1456 static int qnotfull(void *a)
1457 {
1458         struct queue *q = a;
1459
1460         return q->len < q->limit || (q->state & Qclosed);
1461 }
1462
1463 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1464 static size_t enqueue_blist(struct queue *q, struct block *b)
1465 {
1466         size_t len, dlen;
1467
1468         if (q->bfirst)
1469                 q->blast->next = b;
1470         else
1471                 q->bfirst = b;
1472         len = BALLOC(b);
1473         dlen = BLEN(b);
1474         while (b->next) {
1475                 b = b->next;
1476                 len += BALLOC(b);
1477                 dlen += BLEN(b);
1478         }
1479         q->blast = b;
1480         q->len += len;
1481         q->dlen += dlen;
1482         return dlen;
1483 }
1484
1485 /* Adds block (which can be a list of blocks) to the queue, subject to
1486  * qio_flags.  Returns the length written on success or -1 on non-throwable
1487  * error.  Adjust qio_flags to control the value-added features!. */
1488 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1489 {
1490         ssize_t ret;
1491         bool dowakeup = FALSE;
1492         bool was_empty;
1493
1494         if (q->bypass) {
1495                 ret = blocklen(b);
1496                 (*q->bypass) (q->arg, b);
1497                 return ret;
1498         }
1499         spin_lock_irqsave(&q->lock);
1500         was_empty = q->len == 0;
1501         if (q->state & Qclosed) {
1502                 spin_unlock_irqsave(&q->lock);
1503                 freeblist(b);
1504                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1505                         return -1;
1506                 if (q->err[0])
1507                         error(EPIPE, q->err);
1508                 else
1509                         error(EPIPE, "connection closed");
1510         }
1511         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1512                 /* drop overflow takes priority over regular non-blocking */
1513                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1514                         spin_unlock_irqsave(&q->lock);
1515                         freeb(b);
1516                         return -1;
1517                 }
1518                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1519                  * and catch it. */
1520                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1521                         spin_unlock_irqsave(&q->lock);
1522                         freeb(b);
1523                         error(EAGAIN, "queue full");
1524                 }
1525         }
1526         ret = enqueue_blist(q, b);
1527         QDEBUG checkb(b, "__qbwrite");
1528         /* make sure other end gets awakened */
1529         if (q->state & Qstarve) {
1530                 q->state &= ~Qstarve;
1531                 dowakeup = TRUE;
1532         }
1533         spin_unlock_irqsave(&q->lock);
1534         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1535          * wakeup, meaning that actual users either want a kick or have qreaders. */
1536         if (q->kick && (dowakeup || (q->state & Qkick)))
1537                 q->kick(q->arg);
1538         if (dowakeup)
1539                 rendez_wakeup(&q->rr);
1540         if (was_empty)
1541                 qwake_cb(q, FDTAP_FILT_READABLE);
1542         /*
1543          *  flow control, wait for queue to get below the limit
1544          *  before allowing the process to continue and queue
1545          *  more.  We do this here so that postnote can only
1546          *  interrupt us after the data has been queued.  This
1547          *  means that things like 9p flushes and ssl messages
1548          *  will not be disrupted by software interrupts.
1549          *
1550          *  Note - this is moderately dangerous since a process
1551          *  that keeps getting interrupted and rewriting will
1552          *  queue infinite crud.
1553          */
1554         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1555             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1556                 /* This is a racy peek at the q status.  If we accidentally block, we
1557                  * set Qflow, so someone should wake us.  If we accidentally don't
1558                  * block, we just returned to the user and let them slip a block past
1559                  * flow control. */
1560                 while (!qnotfull(q)) {
1561                         spin_lock_irqsave(&q->lock);
1562                         q->state |= Qflow;
1563                         spin_unlock_irqsave(&q->lock);
1564                         rendez_sleep(&q->wr, qnotfull, q);
1565                 }
1566         }
1567         return ret;
1568 }
1569
1570 /*
1571  *  add a block to a queue obeying flow control
1572  */
1573 ssize_t qbwrite(struct queue *q, struct block *b)
1574 {
1575         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1576 }
1577
1578 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1579 {
1580         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1581 }
1582
1583 ssize_t qibwrite(struct queue *q, struct block *b)
1584 {
1585         return __qbwrite(q, b, 0);
1586 }
1587
1588 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1589  * block on success, 0 on failure. */
1590 static struct block *build_block(void *from, size_t len, int mem_flags)
1591 {
1592         struct block *b;
1593         void *ext_buf;
1594
1595         /* If len is small, we don't need to bother with the extra_data.  But until
1596          * the whole stack can handle extd blocks, we'll use them unconditionally.
1597          * */
1598 #ifdef CONFIG_BLOCK_EXTRAS
1599         /* allocb builds in 128 bytes of header space to all blocks, but this is
1600          * only available via padblock (to the left).  we also need some space
1601          * for pullupblock for some basic headers (like icmp) that get written
1602          * in directly */
1603         b = block_alloc(64, mem_flags);
1604         if (!b)
1605                 return 0;
1606         ext_buf = kmalloc(len, mem_flags);
1607         if (!ext_buf) {
1608                 kfree(b);
1609                 return 0;
1610         }
1611         memcpy(ext_buf, from, len);
1612         if (block_add_extd(b, 1, mem_flags)) {
1613                 kfree(ext_buf);
1614                 kfree(b);
1615                 return 0;
1616         }
1617         b->extra_data[0].base = (uintptr_t)ext_buf;
1618         b->extra_data[0].off = 0;
1619         b->extra_data[0].len = len;
1620         b->extra_len += len;
1621 #else
1622         b = block_alloc(n, mem_flags);
1623         if (!b)
1624                 return 0;
1625         memmove(b->wp, from, len);
1626         b->wp += len;
1627 #endif
1628         return b;
1629 }
1630
1631 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1632                         int qio_flags)
1633 {
1634         size_t n, sofar;
1635         struct block *b;
1636         uint8_t *p = vp;
1637         void *ext_buf;
1638
1639         sofar = 0;
1640         do {
1641                 n = len - sofar;
1642                 /* This is 64K, the max amount per single block.  Still a good value? */
1643                 if (n > Maxatomic)
1644                         n = Maxatomic;
1645                 b = build_block(p + sofar, n, mem_flags);
1646                 if (!b)
1647                         break;
1648                 if (__qbwrite(q, b, qio_flags) < 0)
1649                         break;
1650                 sofar += n;
1651         } while ((sofar < len) && (q->state & Qmsg) == 0);
1652         return sofar;
1653 }
1654
1655 ssize_t qwrite(struct queue *q, void *vp, int len)
1656 {
1657         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1658 }
1659
1660 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1661 {
1662         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1663                                               QIO_NON_BLOCK);
1664 }
1665
1666 ssize_t qiwrite(struct queue *q, void *vp, int len)
1667 {
1668         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1669 }
1670
1671 /*
1672  *  be extremely careful when calling this,
1673  *  as there is no reference accounting
1674  */
1675 void qfree(struct queue *q)
1676 {
1677         qclose(q);
1678         kfree(q);
1679 }
1680
1681 /*
1682  *  Mark a queue as closed.  No further IO is permitted.
1683  *  All blocks are released.
1684  */
1685 void qclose(struct queue *q)
1686 {
1687         struct block *bfirst;
1688
1689         if (q == NULL)
1690                 return;
1691
1692         /* mark it */
1693         spin_lock_irqsave(&q->lock);
1694         q->state |= Qclosed;
1695         q->state &= ~(Qflow | Qstarve | Qdropoverflow);
1696         q->err[0] = 0;
1697         bfirst = q->bfirst;
1698         q->bfirst = 0;
1699         q->len = 0;
1700         q->dlen = 0;
1701         spin_unlock_irqsave(&q->lock);
1702
1703         /* free queued blocks */
1704         freeblist(bfirst);
1705
1706         /* wake up readers/writers */
1707         rendez_wakeup(&q->rr);
1708         rendez_wakeup(&q->wr);
1709         qwake_cb(q, FDTAP_FILT_HANGUP);
1710 }
1711
1712 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1713  *
1714  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1715  * there is no message, which is what also happens during a natural qclose(),
1716  * those waiters will simply return 0.  qwriters will always error() on a
1717  * closed/hungup queue. */
1718 void qhangup(struct queue *q, char *msg)
1719 {
1720         /* mark it */
1721         spin_lock_irqsave(&q->lock);
1722         q->state |= Qclosed;
1723         if (msg == 0 || *msg == 0)
1724                 q->err[0] = 0;
1725         else
1726                 strlcpy(q->err, msg, ERRMAX);
1727         spin_unlock_irqsave(&q->lock);
1728
1729         /* wake up readers/writers */
1730         rendez_wakeup(&q->rr);
1731         rendez_wakeup(&q->wr);
1732         qwake_cb(q, FDTAP_FILT_HANGUP);
1733 }
1734
1735 /*
1736  *  return non-zero if the q is hungup
1737  */
1738 int qisclosed(struct queue *q)
1739 {
1740         return q->state & Qclosed;
1741 }
1742
1743 /*
1744  *  mark a queue as no longer hung up.  resets the wake_cb.
1745  */
1746 void qreopen(struct queue *q)
1747 {
1748         spin_lock_irqsave(&q->lock);
1749         q->state &= ~Qclosed;
1750         q->state |= Qstarve;
1751         q->eof = 0;
1752         q->limit = q->inilim;
1753         q->wake_cb = 0;
1754         q->wake_data = 0;
1755         spin_unlock_irqsave(&q->lock);
1756 }
1757
1758 /*
1759  *  return bytes queued
1760  */
1761 int qlen(struct queue *q)
1762 {
1763         return q->dlen;
1764 }
1765
1766 /*
1767  * return space remaining before flow control
1768  */
1769 int qwindow(struct queue *q)
1770 {
1771         int l;
1772
1773         l = q->limit - q->len;
1774         if (l < 0)
1775                 l = 0;
1776         return l;
1777 }
1778
1779 /*
1780  *  return true if we can read without blocking
1781  */
1782 int qcanread(struct queue *q)
1783 {
1784         return q->bfirst != 0;
1785 }
1786
1787 /*
1788  *  change queue limit
1789  */
1790 void qsetlimit(struct queue *q, int limit)
1791 {
1792         q->limit = limit;
1793 }
1794
1795 /*
1796  *  set whether writes drop overflowing blocks, or if we sleep
1797  */
1798 void qdropoverflow(struct queue *q, bool onoff)
1799 {
1800         if (onoff)
1801                 q->state |= Qdropoverflow;
1802         else
1803                 q->state &= ~Qdropoverflow;
1804 }
1805
1806 /*
1807  *  flush the output queue
1808  */
1809 void qflush(struct queue *q)
1810 {
1811         struct block *bfirst;
1812
1813         /* mark it */
1814         spin_lock_irqsave(&q->lock);
1815         bfirst = q->bfirst;
1816         q->bfirst = 0;
1817         q->len = 0;
1818         q->dlen = 0;
1819         spin_unlock_irqsave(&q->lock);
1820
1821         /* free queued blocks */
1822         freeblist(bfirst);
1823
1824         /* wake up writers */
1825         rendez_wakeup(&q->wr);
1826         qwake_cb(q, FDTAP_FILT_WRITABLE);
1827 }
1828
1829 int qfull(struct queue *q)
1830 {
1831         return q->len >= q->limit;
1832 }
1833
1834 int qstate(struct queue *q)
1835 {
1836         return q->state;
1837 }
1838
1839 void qdump(struct queue *q)
1840 {
1841         if (q)
1842                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1843                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1844 }
1845
1846 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1847  * marks the type of wakeup event (flags from FDTAP).
1848  *
1849  * There's no sync protection.  If you change the CB while the qio is running,
1850  * you might get a CB with the data or func from a previous set_wake_cb.  You
1851  * should set this once per queue and forget it.
1852  *
1853  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1854  * just make sure that the func(data) pair are valid until the queue is freed or
1855  * reopened. */
1856 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1857 {
1858         q->wake_data = data;
1859         wmb();  /* if we see func, we'll also see the data for it */
1860         q->wake_cb = func;
1861 }
1862
1863 /* Helper for detecting whether we'll block on a read at this instant. */
1864 bool qreadable(struct queue *q)
1865 {
1866         return qlen(q) > 0;
1867 }
1868
1869 /* Helper for detecting whether we'll block on a write at this instant. */
1870 bool qwritable(struct queue *q)
1871 {
1872         return qwindow(q) > 0;
1873 }