Make freeb() and freeblist() return the old size
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98 };
99
100 unsigned int qiomaxatomic = Maxatomic;
101
102 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
103
104 /* Helper: fires a wake callback, sending 'filter' */
105 static void qwake_cb(struct queue *q, int filter)
106 {
107         if (q->wake_cb)
108                 q->wake_cb(q, q->wake_data, filter);
109 }
110
111 void ixsummary(void)
112 {
113         debugging ^= 1;
114         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
115                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
116         printd("consume %lu, produce %lu, qcopy %lu\n",
117                    consumecnt, producecnt, qcopycnt);
118 }
119
120 /*
121  *  pad a block to the front (or the back if size is negative)
122  */
123 struct block *padblock(struct block *bp, int size)
124 {
125         int n;
126         struct block *nbp;
127         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
128         uint16_t checksum_start = bp->checksum_start;
129         uint16_t checksum_offset = bp->checksum_offset;
130         uint16_t mss = bp->mss;
131
132         QDEBUG checkb(bp, "padblock 1");
133         if (size >= 0) {
134                 if (bp->rp - bp->base >= size) {
135                         bp->checksum_start += size;
136                         bp->rp -= size;
137                         return bp;
138                 }
139
140                 PANIC_EXTRA(bp);
141                 if (bp->next)
142                         panic("padblock %p", getcallerpc(&bp));
143                 n = BLEN(bp);
144                 padblockcnt++;
145                 nbp = block_alloc(size + n, MEM_WAIT);
146                 nbp->rp += size;
147                 nbp->wp = nbp->rp;
148                 memmove(nbp->wp, bp->rp, n);
149                 nbp->wp += n;
150                 freeb(bp);
151                 nbp->rp -= size;
152         } else {
153                 size = -size;
154
155                 PANIC_EXTRA(bp);
156
157                 if (bp->next)
158                         panic("padblock %p", getcallerpc(&bp));
159
160                 if (bp->lim - bp->wp >= size)
161                         return bp;
162
163                 n = BLEN(bp);
164                 padblockcnt++;
165                 nbp = block_alloc(size + n, MEM_WAIT);
166                 memmove(nbp->wp, bp->rp, n);
167                 nbp->wp += n;
168                 freeb(bp);
169         }
170         if (bcksum) {
171                 nbp->flag |= bcksum;
172                 nbp->checksum_start = checksum_start;
173                 nbp->checksum_offset = checksum_offset;
174                 nbp->mss = mss;
175         }
176         QDEBUG checkb(nbp, "padblock 1");
177         return nbp;
178 }
179
180 /*
181  *  return count of bytes in a string of blocks
182  */
183 int blocklen(struct block *bp)
184 {
185         int len;
186
187         len = 0;
188         while (bp) {
189                 len += BLEN(bp);
190                 bp = bp->next;
191         }
192         return len;
193 }
194
195 /*
196  * return count of space in blocks
197  */
198 int blockalloclen(struct block *bp)
199 {
200         int len;
201
202         len = 0;
203         while (bp) {
204                 len += BALLOC(bp);
205                 bp = bp->next;
206         }
207         return len;
208 }
209
210 /*
211  *  copy the  string of blocks into
212  *  a single block and free the string
213  */
214 struct block *concatblock(struct block *bp)
215 {
216         int len;
217         struct block *nb, *f;
218
219         if (bp->next == 0)
220                 return bp;
221
222         /* probably use parts of qclone */
223         PANIC_EXTRA(bp);
224         nb = block_alloc(blocklen(bp), MEM_WAIT);
225         for (f = bp; f; f = f->next) {
226                 len = BLEN(f);
227                 memmove(nb->wp, f->rp, len);
228                 nb->wp += len;
229         }
230         concatblockcnt += BLEN(nb);
231         freeblist(bp);
232         QDEBUG checkb(nb, "concatblock 1");
233         return nb;
234 }
235
236 /* Returns a block with the remaining contents of b all in the main body of the
237  * returned block.  Replace old references to b with the returned value (which
238  * may still be 'b', if no change was needed. */
239 struct block *linearizeblock(struct block *b)
240 {
241         struct block *newb;
242         size_t len;
243         struct extra_bdata *ebd;
244
245         if (!b->extra_len)
246                 return b;
247
248         newb = block_alloc(BLEN(b), MEM_WAIT);
249         len = BHLEN(b);
250         memcpy(newb->wp, b->rp, len);
251         newb->wp += len;
252         len = b->extra_len;
253         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
254                 ebd = &b->extra_data[i];
255                 if (!ebd->base || !ebd->len)
256                         continue;
257                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
258                 newb->wp += ebd->len;
259                 len -= ebd->len;
260         }
261         /* TODO: any other flags that need copied over? */
262         if (b->flag & BCKSUM_FLAGS) {
263                 newb->flag |= (b->flag & BCKSUM_FLAGS);
264                 newb->checksum_start = b->checksum_start;
265                 newb->checksum_offset = b->checksum_offset;
266                 newb->mss = b->mss;
267         }
268         freeb(b);
269         return newb;
270 }
271
272 /*
273  *  make sure the first block has at least n bytes in its main body
274  */
275 struct block *pullupblock(struct block *bp, int n)
276 {
277         int i, len, seglen;
278         struct block *nbp;
279         struct extra_bdata *ebd;
280
281         /*
282          *  this should almost always be true, it's
283          *  just to avoid every caller checking.
284          */
285         if (BHLEN(bp) >= n)
286                 return bp;
287
288          /* a start at explicit main-body / header management */
289         if (bp->extra_len) {
290                 if (n > bp->lim - bp->rp) {
291                         /* would need to realloc a new block and copy everything over. */
292                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
293                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
294                 }
295                 len = n - BHLEN(bp);
296                 if (len > bp->extra_len)
297                         panic("pullup more than extra (%d, %d, %d)\n",
298                               n, BHLEN(bp), bp->extra_len);
299                 checkb(bp, "before pullup");
300                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
301                         ebd = &bp->extra_data[i];
302                         if (!ebd->base || !ebd->len)
303                                 continue;
304                         seglen = MIN(ebd->len, len);
305                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
306                         bp->wp += seglen;
307                         len -= seglen;
308                         ebd->len -= seglen;
309                         ebd->off += seglen;
310                         bp->extra_len -= seglen;
311                         if (ebd->len == 0) {
312                                 kfree((void *)ebd->base);
313                                 ebd->off = 0;
314                                 ebd->base = 0;
315                         }
316                 }
317                 /* maybe just call pullupblock recursively here */
318                 if (len)
319                         panic("pullup %d bytes overdrawn\n", len);
320                 checkb(bp, "after pullup");
321                 return bp;
322         }
323
324         /*
325          *  if not enough room in the first block,
326          *  add another to the front of the list.
327          */
328         if (bp->lim - bp->rp < n) {
329                 nbp = block_alloc(n, MEM_WAIT);
330                 nbp->next = bp;
331                 bp = nbp;
332         }
333
334         /*
335          *  copy bytes from the trailing blocks into the first
336          */
337         n -= BLEN(bp);
338         while ((nbp = bp->next)) {
339                 i = BLEN(nbp);
340                 if (i > n) {
341                         memmove(bp->wp, nbp->rp, n);
342                         pullupblockcnt++;
343                         bp->wp += n;
344                         nbp->rp += n;
345                         QDEBUG checkb(bp, "pullupblock 1");
346                         return bp;
347                 } else {
348                         memmove(bp->wp, nbp->rp, i);
349                         pullupblockcnt++;
350                         bp->wp += i;
351                         bp->next = nbp->next;
352                         nbp->next = 0;
353                         freeb(nbp);
354                         n -= i;
355                         if (n == 0) {
356                                 QDEBUG checkb(bp, "pullupblock 2");
357                                 return bp;
358                         }
359                 }
360         }
361         freeb(bp);
362         return 0;
363 }
364
365 /*
366  *  make sure the first block has at least n bytes in its main body
367  */
368 struct block *pullupqueue(struct queue *q, int n)
369 {
370         struct block *b;
371
372         /* TODO: lock to protect the queue links? */
373         if ((BHLEN(q->bfirst) >= n))
374                 return q->bfirst;
375         q->bfirst = pullupblock(q->bfirst, n);
376         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
377         q->blast = b;
378         return q->bfirst;
379 }
380
381 /* throw away count bytes from the front of
382  * block's extradata.  Returns count of bytes
383  * thrown away
384  */
385
386 static int pullext(struct block *bp, int count)
387 {
388         struct extra_bdata *ed;
389         int i, rem, bytes = 0;
390
391         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
392                 ed = &bp->extra_data[i];
393                 rem = MIN(count, ed->len);
394                 bp->extra_len -= rem;
395                 count -= rem;
396                 bytes += rem;
397                 ed->off += rem;
398                 ed->len -= rem;
399                 if (ed->len == 0) {
400                         kfree((void *)ed->base);
401                         ed->base = 0;
402                         ed->off = 0;
403                 }
404         }
405         return bytes;
406 }
407
408 /* throw away count bytes from the end of a
409  * block's extradata.  Returns count of bytes
410  * thrown away
411  */
412
413 static int dropext(struct block *bp, int count)
414 {
415         struct extra_bdata *ed;
416         int i, rem, bytes = 0;
417
418         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
419                 ed = &bp->extra_data[i];
420                 rem = MIN(count, ed->len);
421                 bp->extra_len -= rem;
422                 count -= rem;
423                 bytes += rem;
424                 ed->len -= rem;
425                 if (ed->len == 0) {
426                         kfree((void *)ed->base);
427                         ed->base = 0;
428                         ed->off = 0;
429                 }
430         }
431         return bytes;
432 }
433
434 /*
435  *  throw away up to count bytes from a
436  *  list of blocks.  Return count of bytes
437  *  thrown away.
438  */
439 static int _pullblock(struct block **bph, int count, int free)
440 {
441         struct block *bp;
442         int n, bytes;
443
444         bytes = 0;
445         if (bph == NULL)
446                 return 0;
447
448         while (*bph != NULL && count != 0) {
449                 bp = *bph;
450
451                 n = MIN(BHLEN(bp), count);
452                 bytes += n;
453                 count -= n;
454                 bp->rp += n;
455                 n = pullext(bp, count);
456                 bytes += n;
457                 count -= n;
458                 QDEBUG checkb(bp, "pullblock ");
459                 if (BLEN(bp) == 0 && (free || count)) {
460                         *bph = bp->next;
461                         bp->next = NULL;
462                         freeb(bp);
463                 }
464         }
465         return bytes;
466 }
467
468 int pullblock(struct block **bph, int count)
469 {
470         return _pullblock(bph, count, 1);
471 }
472
473 /*
474  *  trim to len bytes starting at offset
475  */
476 struct block *trimblock(struct block *bp, int offset, int len)
477 {
478         uint32_t l, trim;
479         int olen = len;
480
481         QDEBUG checkb(bp, "trimblock 1");
482         if (blocklen(bp) < offset + len) {
483                 freeblist(bp);
484                 return NULL;
485         }
486
487         l =_pullblock(&bp, offset, 0);
488         if (bp == NULL)
489                 return NULL;
490         if (l != offset) {
491                 freeblist(bp);
492                 return NULL;
493         }
494
495         while ((l = BLEN(bp)) < len) {
496                 len -= l;
497                 bp = bp->next;
498         }
499
500         trim = BLEN(bp) - len;
501         trim -= dropext(bp, trim);
502         bp->wp -= trim;
503
504         if (bp->next) {
505                 freeblist(bp->next);
506                 bp->next = NULL;
507         }
508         return bp;
509 }
510
511 /*
512  *  copy 'count' bytes into a new block
513  */
514 struct block *copyblock(struct block *bp, int count)
515 {
516         int l;
517         struct block *nbp;
518
519         QDEBUG checkb(bp, "copyblock 0");
520         nbp = block_alloc(count, MEM_WAIT);
521         if (bp->flag & BCKSUM_FLAGS) {
522                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
523                 nbp->checksum_start = bp->checksum_start;
524                 nbp->checksum_offset = bp->checksum_offset;
525                 nbp->mss = bp->mss;
526         }
527         PANIC_EXTRA(bp);
528         for (; count > 0 && bp != 0; bp = bp->next) {
529                 l = BLEN(bp);
530                 if (l > count)
531                         l = count;
532                 memmove(nbp->wp, bp->rp, l);
533                 nbp->wp += l;
534                 count -= l;
535         }
536         if (count > 0) {
537                 memset(nbp->wp, 0, count);
538                 nbp->wp += count;
539         }
540         copyblockcnt++;
541         QDEBUG checkb(nbp, "copyblock 1");
542
543         return nbp;
544 }
545
546 /* Adjust block @bp so that its size is exactly @len.
547  * If the size is increased, fill in the new contents with zeros.
548  * If the size is decreased, discard some of the old contents at the tail. */
549 struct block *adjustblock(struct block *bp, int len)
550 {
551         struct extra_bdata *ebd;
552         void *buf;
553         int i;
554
555         if (len < 0) {
556                 freeb(bp);
557                 return NULL;
558         }
559
560         if (len == BLEN(bp))
561                 return bp;
562
563         /* Shrink within block main body. */
564         if (len <= BHLEN(bp)) {
565                 free_block_extra(bp);
566                 bp->wp = bp->rp + len;
567                 QDEBUG checkb(bp, "adjustblock 1");
568                 return bp;
569         }
570
571         /* Need to grow. */
572         if (len > BLEN(bp)) {
573                 /* Grow within block main body. */
574                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
575                         memset(bp->wp, 0, len - BLEN(bp));
576                         bp->wp = bp->rp + len;
577                         QDEBUG checkb(bp, "adjustblock 2");
578                         return bp;
579                 }
580                 /* Grow with extra data buffers. */
581                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
582                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
583                 QDEBUG checkb(bp, "adjustblock 3");
584                 return bp;
585         }
586
587         /* Shrink extra data buffers.
588          * len is how much of ebd we need to keep.
589          * extra_len is re-accumulated. */
590         assert(bp->extra_len > 0);
591         len -= BHLEN(bp);
592         bp->extra_len = 0;
593         for (i = 0; i < bp->nr_extra_bufs; i++) {
594                 ebd = &bp->extra_data[i];
595                 if (len <= ebd->len)
596                         break;
597                 len -= ebd->len;
598                 bp->extra_len += ebd->len;
599         }
600         /* If len becomes zero, extra_data[i] should be freed. */
601         if (len > 0) {
602                 ebd = &bp->extra_data[i];
603                 ebd->len = len;
604                 bp->extra_len += ebd->len;
605                 i++;
606         }
607         for (; i < bp->nr_extra_bufs; i++) {
608                 ebd = &bp->extra_data[i];
609                 if (ebd->base)
610                         kfree((void*)ebd->base);
611                 ebd->base = ebd->off = ebd->len = 0;
612         }
613         QDEBUG checkb(bp, "adjustblock 4");
614         return bp;
615 }
616
617
618 /*
619  *  get next block from a queue, return null if nothing there
620  */
621 struct block *qget(struct queue *q)
622 {
623         int dowakeup;
624         struct block *b;
625
626         /* sync with qwrite */
627         spin_lock_irqsave(&q->lock);
628
629         b = q->bfirst;
630         if (b == NULL) {
631                 q->state |= Qstarve;
632                 spin_unlock_irqsave(&q->lock);
633                 return NULL;
634         }
635         q->bfirst = b->next;
636         b->next = 0;
637         q->len -= BALLOC(b);
638         q->dlen -= BLEN(b);
639         QDEBUG checkb(b, "qget");
640
641         /* if writer flow controlled, restart */
642         if ((q->state & Qflow) && q->len < q->limit / 2) {
643                 q->state &= ~Qflow;
644                 dowakeup = 1;
645         } else
646                 dowakeup = 0;
647
648         spin_unlock_irqsave(&q->lock);
649
650         if (dowakeup)
651                 rendez_wakeup(&q->wr);
652         qwake_cb(q, FDTAP_FILT_WRITABLE);
653
654         return b;
655 }
656
657 /*
658  *  throw away the next 'len' bytes in the queue
659  * returning the number actually discarded
660  */
661 int qdiscard(struct queue *q, int len)
662 {
663         struct block *b;
664         int dowakeup, n, sofar, body_amt, extra_amt;
665         struct extra_bdata *ebd;
666
667         spin_lock_irqsave(&q->lock);
668         for (sofar = 0; sofar < len; sofar += n) {
669                 b = q->bfirst;
670                 if (b == NULL)
671                         break;
672                 QDEBUG checkb(b, "qdiscard");
673                 n = BLEN(b);
674                 if (n <= len - sofar) {
675                         q->bfirst = b->next;
676                         b->next = 0;
677                         q->len -= BALLOC(b);
678                         q->dlen -= BLEN(b);
679                         freeb(b);
680                 } else {
681                         n = len - sofar;
682                         q->dlen -= n;
683                         /* partial block removal */
684                         body_amt = MIN(BHLEN(b), n);
685                         b->rp += body_amt;
686                         extra_amt = n - body_amt;
687                         /* reduce q->len by the amount we remove from the extras.  The
688                          * header will always be accounted for above, during block removal.
689                          * */
690                         q->len -= extra_amt;
691                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
692                                 ebd = &b->extra_data[i];
693                                 if (!ebd->base || !ebd->len)
694                                         continue;
695                                 if (extra_amt >= ebd->len) {
696                                         /* remove the entire entry, note the kfree release */
697                                         b->extra_len -= ebd->len;
698                                         extra_amt -= ebd->len;
699                                         kfree((void*)ebd->base);
700                                         ebd->base = ebd->off = ebd->len = 0;
701                                         continue;
702                                 }
703                                 ebd->off += extra_amt;
704                                 ebd->len -= extra_amt;
705                                 b->extra_len -= extra_amt;
706                                 extra_amt = 0;
707                         }
708                 }
709         }
710
711         /*
712          *  if writer flow controlled, restart
713          *
714          *  This used to be
715          *  q->len < q->limit/2
716          *  but it slows down tcp too much for certain write sizes.
717          *  I really don't understand it completely.  It may be
718          *  due to the queue draining so fast that the transmission
719          *  stalls waiting for the app to produce more data.  - presotto
720          */
721         if ((q->state & Qflow) && q->len < q->limit) {
722                 q->state &= ~Qflow;
723                 dowakeup = 1;
724         } else
725                 dowakeup = 0;
726
727         spin_unlock_irqsave(&q->lock);
728
729         if (dowakeup)
730                 rendez_wakeup(&q->wr);
731         qwake_cb(q, FDTAP_FILT_WRITABLE);
732
733         return sofar;
734 }
735
736 ssize_t qpass(struct queue *q, struct block *b)
737 {
738         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
739 }
740
741 ssize_t qpassnolim(struct queue *q, struct block *b)
742 {
743         return __qbwrite(q, b, 0);
744 }
745
746 /*
747  *  if the allocated space is way out of line with the used
748  *  space, reallocate to a smaller block
749  */
750 struct block *packblock(struct block *bp)
751 {
752         struct block **l, *nbp;
753         int n;
754
755         if (bp->extra_len)
756                 return bp;
757         for (l = &bp; *l; l = &(*l)->next) {
758                 nbp = *l;
759                 n = BLEN(nbp);
760                 if ((n << 2) < BALLOC(nbp)) {
761                         *l = block_alloc(n, MEM_WAIT);
762                         memmove((*l)->wp, nbp->rp, n);
763                         (*l)->wp += n;
764                         (*l)->next = nbp->next;
765                         freeb(nbp);
766                 }
767         }
768
769         return bp;
770 }
771
772 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
773  * body_rp, for up to len.  Returns the len consumed. 
774  *
775  * The base is 'b', so that we can kfree it later.  This currently ties us to
776  * using kfree for the release method for all extra_data.
777  *
778  * It is possible to have a body size that is 0, if there is no offset, and
779  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
780 static size_t point_to_body(struct block *b, uint8_t *body_rp,
781                             struct block *newb, unsigned int newb_idx,
782                             size_t len)
783 {
784         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
785
786         assert(newb_idx < newb->nr_extra_bufs);
787
788         kmalloc_incref(b);
789         ebd->base = (uintptr_t)b;
790         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
791         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
792         assert((int)ebd->len >= 0);
793         newb->extra_len += ebd->len;
794         return ebd->len;
795 }
796
797 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
798  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
799  * consumed.
800  *
801  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
802 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
803                            struct block *newb, unsigned int newb_idx,
804                            size_t len)
805 {
806         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
807         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
808
809         assert(b_idx < b->nr_extra_bufs);
810         assert(newb_idx < newb->nr_extra_bufs);
811
812         kmalloc_incref((void*)b_ebd->base);
813         n_ebd->base = b_ebd->base;
814         n_ebd->off = b_ebd->off + b_off;
815         n_ebd->len = MIN(b_ebd->len - b_off, len);
816         newb->extra_len += n_ebd->len;
817         return n_ebd->len;
818 }
819
820 /* given a string of blocks, fills the new block's extra_data  with the contents
821  * of the blist [offset, len + offset)
822  *
823  * returns 0 on success.  the only failure is if the extra_data array was too
824  * small, so this returns a positive integer saying how big the extra_data needs
825  * to be.
826  *
827  * callers are responsible for protecting the list structure. */
828 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
829                             uint32_t offset)
830 {
831         struct block *b, *first;
832         unsigned int nr_bufs = 0;
833         unsigned int b_idx, newb_idx = 0;
834         uint8_t *first_main_body = 0;
835
836         /* find the first block; keep offset relative to the latest b in the list */
837         for (b = blist; b; b = b->next) {
838                 if (BLEN(b) > offset)
839                         break;
840                 offset -= BLEN(b);
841         }
842         /* qcopy semantics: if you asked for an offset outside the block list, you
843          * get an empty block back */
844         if (!b)
845                 return 0;
846         first = b;
847         /* upper bound for how many buffers we'll need in newb */
848         for (/* b is set*/; b; b = b->next) {
849                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
850         }
851         /* we might be holding a spinlock here, so we won't wait for kmalloc */
852         if (block_add_extd(newb, nr_bufs, 0) != 0) {
853                 /* caller will need to alloc these, then re-call us */
854                 return nr_bufs;
855         }
856         for (b = first; b && len; b = b->next) {
857                 b_idx = 0;
858                 if (offset) {
859                         if (offset < BHLEN(b)) {
860                                 /* off is in the main body */
861                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
862                                 newb_idx++;
863                         } else {
864                                 /* off is in one of the buffers (or just past the last one).
865                                  * we're not going to point to b's main body at all. */
866                                 offset -= BHLEN(b);
867                                 assert(b->extra_data);
868                                 /* assuming these extrabufs are packed, or at least that len
869                                  * isn't gibberish */
870                                 while (b->extra_data[b_idx].len <= offset) {
871                                         offset -= b->extra_data[b_idx].len;
872                                         b_idx++;
873                                 }
874                                 /* now offset is set to our offset in the b_idx'th buf */
875                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
876                                 newb_idx++;
877                                 b_idx++;
878                         }
879                         offset = 0;
880                 } else {
881                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
882                         newb_idx++;
883                 }
884                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
885                  * and any point_to_ could be our last if it consumed all of len. */
886                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
887                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
888                         newb_idx++;
889                 }
890         }
891         return 0;
892 }
893
894 struct block *blist_clone(struct block *blist, int header_len, int len,
895                           uint32_t offset)
896 {
897         int ret;
898         struct block *newb = block_alloc(header_len, MEM_WAIT);
899         do {
900                 ret = __blist_clone_to(blist, newb, len, offset);
901                 if (ret)
902                         block_add_extd(newb, ret, MEM_WAIT);
903         } while (ret);
904         return newb;
905 }
906
907 /* given a queue, makes a single block with header_len reserved space in the
908  * block main body, and the contents of [offset, len + offset) pointed to in the
909  * new blocks ext_data. */
910 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
911 {
912         int ret;
913         struct block *newb = block_alloc(header_len, MEM_WAIT);
914         /* the while loop should rarely be used: it would require someone
915          * concurrently adding to the queue. */
916         do {
917                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
918                 spin_lock_irqsave(&q->lock);
919                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
920                 spin_unlock_irqsave(&q->lock);
921                 if (ret)
922                         block_add_extd(newb, ret, MEM_WAIT);
923         } while (ret);
924         return newb;
925 }
926
927 /*
928  *  copy from offset in the queue
929  */
930 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
931 {
932         int sofar;
933         int n;
934         struct block *b, *nb;
935         uint8_t *p;
936
937         nb = block_alloc(len, MEM_WAIT);
938
939         spin_lock_irqsave(&q->lock);
940
941         /* go to offset */
942         b = q->bfirst;
943         for (sofar = 0;; sofar += n) {
944                 if (b == NULL) {
945                         spin_unlock_irqsave(&q->lock);
946                         return nb;
947                 }
948                 n = BLEN(b);
949                 if (sofar + n > offset) {
950                         p = b->rp + offset - sofar;
951                         n -= offset - sofar;
952                         break;
953                 }
954                 QDEBUG checkb(b, "qcopy");
955                 b = b->next;
956         }
957
958         /* copy bytes from there */
959         for (sofar = 0; sofar < len;) {
960                 if (n > len - sofar)
961                         n = len - sofar;
962                 PANIC_EXTRA(b);
963                 memmove(nb->wp, p, n);
964                 qcopycnt += n;
965                 sofar += n;
966                 nb->wp += n;
967                 b = b->next;
968                 if (b == NULL)
969                         break;
970                 n = BLEN(b);
971                 p = b->rp;
972         }
973         spin_unlock_irqsave(&q->lock);
974
975         return nb;
976 }
977
978 struct block *qcopy(struct queue *q, int len, uint32_t offset)
979 {
980 #ifdef CONFIG_BLOCK_EXTRAS
981         return qclone(q, 0, len, offset);
982 #else
983         return qcopy_old(q, len, offset);
984 #endif
985 }
986
987 static void qinit_common(struct queue *q)
988 {
989         spinlock_init_irqsave(&q->lock);
990         rendez_init(&q->rr);
991         rendez_init(&q->wr);
992 }
993
994 /*
995  *  called by non-interrupt code
996  */
997 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
998 {
999         struct queue *q;
1000
1001         q = kzmalloc(sizeof(struct queue), 0);
1002         if (q == 0)
1003                 return 0;
1004         qinit_common(q);
1005
1006         q->limit = q->inilim = limit;
1007         q->kick = kick;
1008         q->arg = arg;
1009         q->state = msg;
1010         q->state |= Qstarve;
1011         q->eof = 0;
1012
1013         return q;
1014 }
1015
1016 /* open a queue to be bypassed */
1017 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1018 {
1019         struct queue *q;
1020
1021         q = kzmalloc(sizeof(struct queue), 0);
1022         if (q == 0)
1023                 return 0;
1024         qinit_common(q);
1025
1026         q->limit = 0;
1027         q->arg = arg;
1028         q->bypass = bypass;
1029         q->state = 0;
1030
1031         return q;
1032 }
1033
1034 static int notempty(void *a)
1035 {
1036         struct queue *q = a;
1037
1038         return (q->state & Qclosed) || q->bfirst != 0;
1039 }
1040
1041 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1042  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1043  * was naturally closed.  Throws an error o/w. */
1044 static bool qwait_and_ilock(struct queue *q)
1045 {
1046         while (1) {
1047                 spin_lock_irqsave(&q->lock);
1048                 if (q->bfirst != NULL)
1049                         return TRUE;
1050                 if (q->state & Qclosed) {
1051                         if (++q->eof > 3) {
1052                                 spin_unlock_irqsave(&q->lock);
1053                                 error(EFAIL, "multiple reads on a closed queue");
1054                         }
1055                         if (q->err[0]) {
1056                                 spin_unlock_irqsave(&q->lock);
1057                                 error(EFAIL, q->err);
1058                         }
1059                         return FALSE;
1060                 }
1061                 /* We set Qstarve regardless of whether we are non-blocking or not.
1062                  * Qstarve tracks the edge detection of the queue being empty. */
1063                 q->state |= Qstarve;
1064                 if (q->state & Qnonblock) {
1065                         spin_unlock_irqsave(&q->lock);
1066                         error(EAGAIN, "queue empty");
1067                 }
1068                 spin_unlock_irqsave(&q->lock);
1069                 /* may throw an error() */
1070                 rendez_sleep(&q->rr, notempty, q);
1071         }
1072 }
1073
1074 /*
1075  * add a block list to a queue
1076  */
1077 void qaddlist(struct queue *q, struct block *b)
1078 {
1079         /* TODO: q lock? */
1080         /* queue the block */
1081         if (q->bfirst)
1082                 q->blast->next = b;
1083         else
1084                 q->bfirst = b;
1085         q->len += blockalloclen(b);
1086         q->dlen += blocklen(b);
1087         while (b->next)
1088                 b = b->next;
1089         q->blast = b;
1090 }
1091
1092 /*
1093  *  called with q ilocked
1094  */
1095 static struct block *qremove(struct queue *q)
1096 {
1097         struct block *b;
1098
1099         b = q->bfirst;
1100         if (b == NULL)
1101                 return NULL;
1102         q->bfirst = b->next;
1103         b->next = NULL;
1104         q->dlen -= BLEN(b);
1105         q->len -= BALLOC(b);
1106         QDEBUG checkb(b, "qremove");
1107         return b;
1108 }
1109
1110 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1111 {
1112         size_t copy_amt, retval = 0;
1113         struct extra_bdata *ebd;
1114         
1115         copy_amt = MIN(BHLEN(b), amt);
1116         memcpy(to, b->rp, copy_amt);
1117         /* advance the rp, since this block not be completely consumed and future
1118          * reads need to know where to pick up from */
1119         b->rp += copy_amt;
1120         to += copy_amt;
1121         amt -= copy_amt;
1122         retval += copy_amt;
1123         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1124                 ebd = &b->extra_data[i];
1125                 /* skip empty entires.  if we track this in the struct block, we can
1126                  * just start the for loop early */
1127                 if (!ebd->base || !ebd->len)
1128                         continue;
1129                 copy_amt = MIN(ebd->len, amt);
1130                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1131                 /* we're actually consuming the entries, just like how we advance rp up
1132                  * above, and might only consume part of one. */
1133                 ebd->len -= copy_amt;
1134                 ebd->off += copy_amt;
1135                 b->extra_len -= copy_amt;
1136                 if (!ebd->len) {
1137                         /* we don't actually have to decref here.  it's also done in
1138                          * freeb().  this is the earliest we can free. */
1139                         kfree((void*)ebd->base);
1140                         ebd->base = ebd->off = 0;
1141                 }
1142                 to += copy_amt;
1143                 amt -= copy_amt;
1144                 retval += copy_amt;
1145         }
1146         return retval;
1147 }
1148
1149 /*
1150  *  copy the contents of a string of blocks into
1151  *  memory.  emptied blocks are freed.  return
1152  *  pointer to first unconsumed block.
1153  */
1154 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1155 {
1156         int i;
1157         struct block *next;
1158
1159         /* could be slicker here, since read_from_block is smart */
1160         for (; b != NULL; b = next) {
1161                 i = BLEN(b);
1162                 if (i > n) {
1163                         /* partial block, consume some */
1164                         read_from_block(b, p, n);
1165                         return b;
1166                 }
1167                 /* full block, consume all and move on */
1168                 i = read_from_block(b, p, i);
1169                 n -= i;
1170                 p += i;
1171                 next = b->next;
1172                 freeb(b);
1173         }
1174         return NULL;
1175 }
1176
1177 /*
1178  *  copy the contents of memory into a string of blocks.
1179  *  return NULL on error.
1180  */
1181 struct block *mem2bl(uint8_t * p, int len)
1182 {
1183         ERRSTACK(1);
1184         int n;
1185         struct block *b, *first, **l;
1186
1187         first = NULL;
1188         l = &first;
1189         if (waserror()) {
1190                 freeblist(first);
1191                 nexterror();
1192         }
1193         do {
1194                 n = len;
1195                 if (n > Maxatomic)
1196                         n = Maxatomic;
1197
1198                 *l = b = block_alloc(n, MEM_WAIT);
1199                 /* TODO consider extra_data */
1200                 memmove(b->wp, p, n);
1201                 b->wp += n;
1202                 p += n;
1203                 len -= n;
1204                 l = &b->next;
1205         } while (len > 0);
1206         poperror();
1207
1208         return first;
1209 }
1210
1211 /*
1212  *  put a block back to the front of the queue
1213  *  called with q ilocked
1214  */
1215 void qputback(struct queue *q, struct block *b)
1216 {
1217         b->next = q->bfirst;
1218         if (q->bfirst == NULL)
1219                 q->blast = b;
1220         q->bfirst = b;
1221         q->len += BALLOC(b);
1222         q->dlen += BLEN(b);
1223 }
1224
1225 /*
1226  *  flow control, get producer going again
1227  *  called with q ilocked
1228  */
1229 static void qwakeup_iunlock(struct queue *q)
1230 {
1231         int dowakeup = 0;
1232
1233         /* if writer flow controlled, restart */
1234         if ((q->state & Qflow) && q->len < q->limit / 2) {
1235                 q->state &= ~Qflow;
1236                 dowakeup = 1;
1237         }
1238
1239         spin_unlock_irqsave(&q->lock);
1240
1241         /* wakeup flow controlled writers */
1242         if (dowakeup) {
1243                 if (q->kick)
1244                         q->kick(q->arg);
1245                 rendez_wakeup(&q->wr);
1246         }
1247         qwake_cb(q, FDTAP_FILT_WRITABLE);
1248 }
1249
1250 /*
1251  *  get next block from a queue (up to a limit)
1252  */
1253 struct block *qbread(struct queue *q, int len)
1254 {
1255         struct block *b, *nb;
1256         int n;
1257
1258         if (!qwait_and_ilock(q)) {
1259                 /* queue closed */
1260                 spin_unlock_irqsave(&q->lock);
1261                 return NULL;
1262         }
1263
1264         /* if we get here, there's at least one block in the queue */
1265         b = qremove(q);
1266         n = BLEN(b);
1267
1268         /* split block if it's too big and this is not a message queue */
1269         nb = b;
1270         if (n > len) {
1271                 PANIC_EXTRA(b);
1272                 if ((q->state & Qmsg) == 0) {
1273                         n -= len;
1274                         b = block_alloc(n, MEM_WAIT);
1275                         memmove(b->wp, nb->rp + len, n);
1276                         b->wp += n;
1277                         qputback(q, b);
1278                 }
1279                 nb->wp = nb->rp + len;
1280         }
1281
1282         /* restart producer */
1283         qwakeup_iunlock(q);
1284
1285         return nb;
1286 }
1287
1288 /*
1289  *  read a queue.  if no data is queued, post a struct block
1290  *  and wait on its Rendez.
1291  */
1292 long qread(struct queue *q, void *vp, int len)
1293 {
1294         struct block *b, *first, **l;
1295         int m, n;
1296
1297 again:
1298         if (!qwait_and_ilock(q)) {
1299                 /* queue closed */
1300                 spin_unlock_irqsave(&q->lock);
1301                 return 0;
1302         }
1303
1304         /* if we get here, there's at least one block in the queue */
1305         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1306         // strategy by default.
1307         if (q->state & Qcoalesce) {
1308                 /* when coalescing, 0 length blocks just go away */
1309                 b = q->bfirst;
1310                 if (BLEN(b) <= 0) {
1311                         freeb(qremove(q));
1312                         spin_unlock_irqsave(&q->lock);
1313                         goto again;
1314                 }
1315
1316                 /*  grab the first block plus as many
1317                  *  following blocks as will completely
1318                  *  fit in the read.
1319                  */
1320                 n = 0;
1321                 l = &first;
1322                 m = BLEN(b);
1323                 for (;;) {
1324                         *l = qremove(q);
1325                         l = &b->next;
1326                         n += m;
1327
1328                         b = q->bfirst;
1329                         if (b == NULL)
1330                                 break;
1331                         m = BLEN(b);
1332                         if (n + m > len)
1333                                 break;
1334                 }
1335         } else {
1336                 first = qremove(q);
1337                 n = BLEN(first);
1338         }
1339         b = bl2mem(vp, first, len);
1340         /* take care of any left over partial block */
1341         if (b != NULL) {
1342                 n -= BLEN(b);
1343                 if (q->state & Qmsg)
1344                         freeb(b);
1345                 else
1346                         qputback(q, b);
1347         }
1348
1349         /* restart producer */
1350         qwakeup_iunlock(q);
1351
1352         return n;
1353 }
1354
1355 static int qnotfull(void *a)
1356 {
1357         struct queue *q = a;
1358
1359         return q->len < q->limit || (q->state & Qclosed);
1360 }
1361
1362 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1363 static size_t enqueue_blist(struct queue *q, struct block *b)
1364 {
1365         size_t len, dlen;
1366
1367         if (q->bfirst)
1368                 q->blast->next = b;
1369         else
1370                 q->bfirst = b;
1371         len = BALLOC(b);
1372         dlen = BLEN(b);
1373         while (b->next) {
1374                 b = b->next;
1375                 len += BALLOC(b);
1376                 dlen += BLEN(b);
1377         }
1378         q->blast = b;
1379         q->len += len;
1380         q->dlen += dlen;
1381         return dlen;
1382 }
1383
1384 /* Adds block (which can be a list of blocks) to the queue, subject to
1385  * qio_flags.  Returns the length written on success or -1 on non-throwable
1386  * error.  Adjust qio_flags to control the value-added features!. */
1387 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1388 {
1389         ssize_t ret;
1390         bool dowakeup = FALSE;
1391         bool was_empty;
1392
1393         if (q->bypass) {
1394                 (*q->bypass) (q->arg, b);
1395                 return blocklen(b);
1396         }
1397         spin_lock_irqsave(&q->lock);
1398         was_empty = q->len == 0;
1399         if (q->state & Qclosed) {
1400                 spin_unlock_irqsave(&q->lock);
1401                 freeblist(b);
1402                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1403                         return -1;
1404                 if (q->err[0])
1405                         error(EFAIL, q->err);
1406                 else
1407                         error(EFAIL, "connection closed");
1408         }
1409         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1410                 /* drop overflow takes priority over regular non-blocking */
1411                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1412                         spin_unlock_irqsave(&q->lock);
1413                         freeb(b);
1414                         return -1;
1415                 }
1416                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (q->state & Qnonblock)) {
1417                         spin_unlock_irqsave(&q->lock);
1418                         freeb(b);
1419                         error(EAGAIN, "queue full");
1420                 }
1421         }
1422         ret = enqueue_blist(q, b);
1423         QDEBUG checkb(b, "__qbwrite");
1424         /* make sure other end gets awakened */
1425         if (q->state & Qstarve) {
1426                 q->state &= ~Qstarve;
1427                 dowakeup = TRUE;
1428         }
1429         spin_unlock_irqsave(&q->lock);
1430         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1431          * wakeup, meaning that actual users either want a kick or have qreaders. */
1432         if (q->kick && (dowakeup || (q->state & Qkick)))
1433                 q->kick(q->arg);
1434         if (dowakeup)
1435                 rendez_wakeup(&q->rr);
1436         if (was_empty)
1437                 qwake_cb(q, FDTAP_FILT_READABLE);
1438         /*
1439          *  flow control, wait for queue to get below the limit
1440          *  before allowing the process to continue and queue
1441          *  more.  We do this here so that postnote can only
1442          *  interrupt us after the data has been queued.  This
1443          *  means that things like 9p flushes and ssl messages
1444          *  will not be disrupted by software interrupts.
1445          *
1446          *  Note - this is moderately dangerous since a process
1447          *  that keeps getting interrupted and rewriting will
1448          *  queue infinite crud.
1449          */
1450         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1451             !(q->state & (Qdropoverflow | Qnonblock))) {
1452                 /* This is a racy peek at the q status.  If we accidentally block, we
1453                  * set Qflow, so someone should wake us.  If we accidentally don't
1454                  * block, we just returned to the user and let them slip a block past
1455                  * flow control. */
1456                 while (!qnotfull(q)) {
1457                         spin_lock_irqsave(&q->lock);
1458                         q->state |= Qflow;
1459                         spin_unlock_irqsave(&q->lock);
1460                         rendez_sleep(&q->wr, qnotfull, q);
1461                 }
1462         }
1463         return ret;
1464 }
1465
1466 /*
1467  *  add a block to a queue obeying flow control
1468  */
1469 ssize_t qbwrite(struct queue *q, struct block *b)
1470 {
1471         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1472 }
1473
1474 ssize_t qibwrite(struct queue *q, struct block *b)
1475 {
1476         return __qbwrite(q, b, 0);
1477 }
1478
1479 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1480  * block on success, 0 on failure. */
1481 static struct block *build_block(void *from, size_t len, int mem_flags)
1482 {
1483         struct block *b;
1484         void *ext_buf;
1485
1486         /* If len is small, we don't need to bother with the extra_data.  But until
1487          * the whole stack can handle extd blocks, we'll use them unconditionally.
1488          * */
1489 #ifdef CONFIG_BLOCK_EXTRAS
1490         /* allocb builds in 128 bytes of header space to all blocks, but this is
1491          * only available via padblock (to the left).  we also need some space
1492          * for pullupblock for some basic headers (like icmp) that get written
1493          * in directly */
1494         b = block_alloc(64, mem_flags);
1495         if (!b)
1496                 return 0;
1497         ext_buf = kmalloc(len, mem_flags);
1498         if (!ext_buf) {
1499                 kfree(b);
1500                 return 0;
1501         }
1502         memcpy(ext_buf, from, len);
1503         if (block_add_extd(b, 1, mem_flags)) {
1504                 kfree(ext_buf);
1505                 kfree(b);
1506                 return 0;
1507         }
1508         b->extra_data[0].base = (uintptr_t)ext_buf;
1509         b->extra_data[0].off = 0;
1510         b->extra_data[0].len = len;
1511         b->extra_len += len;
1512 #else
1513         b = block_alloc(n, mem_flags);
1514         if (!b)
1515                 return 0;
1516         memmove(b->wp, from, len);
1517         b->wp += len;
1518 #endif
1519         return b;
1520 }
1521
1522 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1523                         int qio_flags)
1524 {
1525         size_t n, sofar;
1526         struct block *b;
1527         uint8_t *p = vp;
1528         void *ext_buf;
1529
1530         sofar = 0;
1531         do {
1532                 n = len - sofar;
1533                 /* This is 64K, the max amount per single block.  Still a good value? */
1534                 if (n > Maxatomic)
1535                         n = Maxatomic;
1536                 b = build_block(p + sofar, n, mem_flags);
1537                 if (!b)
1538                         break;
1539                 if (__qbwrite(q, b, qio_flags) < 0)
1540                         break;
1541                 sofar += n;
1542         } while ((sofar < len) && (q->state & Qmsg) == 0);
1543         return sofar;
1544 }
1545
1546 ssize_t qwrite(struct queue *q, void *vp, int len)
1547 {
1548         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1549 }
1550
1551 ssize_t qiwrite(struct queue *q, void *vp, int len)
1552 {
1553         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1554 }
1555
1556 /*
1557  *  be extremely careful when calling this,
1558  *  as there is no reference accounting
1559  */
1560 void qfree(struct queue *q)
1561 {
1562         qclose(q);
1563         kfree(q);
1564 }
1565
1566 /*
1567  *  Mark a queue as closed.  No further IO is permitted.
1568  *  All blocks are released.
1569  */
1570 void qclose(struct queue *q)
1571 {
1572         struct block *bfirst;
1573
1574         if (q == NULL)
1575                 return;
1576
1577         /* mark it */
1578         spin_lock_irqsave(&q->lock);
1579         q->state |= Qclosed;
1580         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1581         q->err[0] = 0;
1582         bfirst = q->bfirst;
1583         q->bfirst = 0;
1584         q->len = 0;
1585         q->dlen = 0;
1586         spin_unlock_irqsave(&q->lock);
1587
1588         /* free queued blocks */
1589         freeblist(bfirst);
1590
1591         /* wake up readers/writers */
1592         rendez_wakeup(&q->rr);
1593         rendez_wakeup(&q->wr);
1594         qwake_cb(q, FDTAP_FILT_HANGUP);
1595 }
1596
1597 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1598  *
1599  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1600  * there is no message, which is what also happens during a natural qclose(),
1601  * those waiters will simply return 0.  qwriters will always error() on a
1602  * closed/hungup queue. */
1603 void qhangup(struct queue *q, char *msg)
1604 {
1605         /* mark it */
1606         spin_lock_irqsave(&q->lock);
1607         q->state |= Qclosed;
1608         if (msg == 0 || *msg == 0)
1609                 q->err[0] = 0;
1610         else
1611                 strlcpy(q->err, msg, ERRMAX);
1612         spin_unlock_irqsave(&q->lock);
1613
1614         /* wake up readers/writers */
1615         rendez_wakeup(&q->rr);
1616         rendez_wakeup(&q->wr);
1617         qwake_cb(q, FDTAP_FILT_HANGUP);
1618 }
1619
1620 /*
1621  *  return non-zero if the q is hungup
1622  */
1623 int qisclosed(struct queue *q)
1624 {
1625         return q->state & Qclosed;
1626 }
1627
1628 /*
1629  *  mark a queue as no longer hung up.  resets the wake_cb.
1630  */
1631 void qreopen(struct queue *q)
1632 {
1633         spin_lock_irqsave(&q->lock);
1634         q->state &= ~Qclosed;
1635         q->state |= Qstarve;
1636         q->eof = 0;
1637         q->limit = q->inilim;
1638         q->wake_cb = 0;
1639         q->wake_data = 0;
1640         spin_unlock_irqsave(&q->lock);
1641 }
1642
1643 /*
1644  *  return bytes queued
1645  */
1646 int qlen(struct queue *q)
1647 {
1648         return q->dlen;
1649 }
1650
1651 /*
1652  * return space remaining before flow control
1653  */
1654 int qwindow(struct queue *q)
1655 {
1656         int l;
1657
1658         l = q->limit - q->len;
1659         if (l < 0)
1660                 l = 0;
1661         return l;
1662 }
1663
1664 /*
1665  *  return true if we can read without blocking
1666  */
1667 int qcanread(struct queue *q)
1668 {
1669         return q->bfirst != 0;
1670 }
1671
1672 /*
1673  *  change queue limit
1674  */
1675 void qsetlimit(struct queue *q, int limit)
1676 {
1677         q->limit = limit;
1678 }
1679
1680 /*
1681  *  set whether writes drop overflowing blocks, or if we sleep
1682  */
1683 void qdropoverflow(struct queue *q, bool onoff)
1684 {
1685         if (onoff)
1686                 q->state |= Qdropoverflow;
1687         else
1688                 q->state &= ~Qdropoverflow;
1689 }
1690
1691 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1692 void qnonblock(struct queue *q, bool onoff)
1693 {
1694         if (onoff)
1695                 q->state |= Qnonblock;
1696         else
1697                 q->state &= ~Qnonblock;
1698 }
1699
1700 /*
1701  *  flush the output queue
1702  */
1703 void qflush(struct queue *q)
1704 {
1705         struct block *bfirst;
1706
1707         /* mark it */
1708         spin_lock_irqsave(&q->lock);
1709         bfirst = q->bfirst;
1710         q->bfirst = 0;
1711         q->len = 0;
1712         q->dlen = 0;
1713         spin_unlock_irqsave(&q->lock);
1714
1715         /* free queued blocks */
1716         freeblist(bfirst);
1717
1718         /* wake up writers */
1719         rendez_wakeup(&q->wr);
1720         qwake_cb(q, FDTAP_FILT_WRITABLE);
1721 }
1722
1723 int qfull(struct queue *q)
1724 {
1725         return q->len >= q->limit;
1726 }
1727
1728 int qstate(struct queue *q)
1729 {
1730         return q->state;
1731 }
1732
1733 void qdump(struct queue *q)
1734 {
1735         if (q)
1736                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1737                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1738 }
1739
1740 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1741  * marks the type of wakeup event (flags from FDTAP).
1742  *
1743  * There's no sync protection.  If you change the CB while the qio is running,
1744  * you might get a CB with the data or func from a previous set_wake_cb.  You
1745  * should set this once per queue and forget it.
1746  *
1747  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1748  * just make sure that the func(data) pair are valid until the queue is freed or
1749  * reopened. */
1750 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1751 {
1752         q->wake_data = data;
1753         wmb();  /* if we see func, we'll also see the data for it */
1754         q->wake_cb = func;
1755 }