Pass the buf to block_append_extra()
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98 };
99
100 unsigned int qiomaxatomic = Maxatomic;
101
102 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
103
104 /* Helper: fires a wake callback, sending 'filter' */
105 static void qwake_cb(struct queue *q, int filter)
106 {
107         if (q->wake_cb)
108                 q->wake_cb(q, q->wake_data, filter);
109 }
110
111 void ixsummary(void)
112 {
113         debugging ^= 1;
114         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
115                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
116         printd("consume %lu, produce %lu, qcopy %lu\n",
117                    consumecnt, producecnt, qcopycnt);
118 }
119
120 /*
121  *  free a list of blocks
122  */
123 void freeblist(struct block *b)
124 {
125         struct block *next;
126
127         for (; b != 0; b = next) {
128                 next = b->next;
129                 b->next = 0;
130                 freeb(b);
131         }
132 }
133
134 /*
135  *  pad a block to the front (or the back if size is negative)
136  */
137 struct block *padblock(struct block *bp, int size)
138 {
139         int n;
140         struct block *nbp;
141         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
142         uint16_t checksum_start = bp->checksum_start;
143         uint16_t checksum_offset = bp->checksum_offset;
144         uint16_t mss = bp->mss;
145
146         QDEBUG checkb(bp, "padblock 1");
147         if (size >= 0) {
148                 if (bp->rp - bp->base >= size) {
149                         bp->checksum_start += size;
150                         bp->rp -= size;
151                         return bp;
152                 }
153
154                 PANIC_EXTRA(bp);
155                 if (bp->next)
156                         panic("padblock %p", getcallerpc(&bp));
157                 n = BLEN(bp);
158                 padblockcnt++;
159                 nbp = block_alloc(size + n, MEM_WAIT);
160                 nbp->rp += size;
161                 nbp->wp = nbp->rp;
162                 memmove(nbp->wp, bp->rp, n);
163                 nbp->wp += n;
164                 freeb(bp);
165                 nbp->rp -= size;
166         } else {
167                 size = -size;
168
169                 PANIC_EXTRA(bp);
170
171                 if (bp->next)
172                         panic("padblock %p", getcallerpc(&bp));
173
174                 if (bp->lim - bp->wp >= size)
175                         return bp;
176
177                 n = BLEN(bp);
178                 padblockcnt++;
179                 nbp = block_alloc(size + n, MEM_WAIT);
180                 memmove(nbp->wp, bp->rp, n);
181                 nbp->wp += n;
182                 freeb(bp);
183         }
184         if (bcksum) {
185                 nbp->flag |= bcksum;
186                 nbp->checksum_start = checksum_start;
187                 nbp->checksum_offset = checksum_offset;
188                 nbp->mss = mss;
189         }
190         QDEBUG checkb(nbp, "padblock 1");
191         return nbp;
192 }
193
194 /*
195  *  return count of bytes in a string of blocks
196  */
197 int blocklen(struct block *bp)
198 {
199         int len;
200
201         len = 0;
202         while (bp) {
203                 len += BLEN(bp);
204                 bp = bp->next;
205         }
206         return len;
207 }
208
209 /*
210  * return count of space in blocks
211  */
212 int blockalloclen(struct block *bp)
213 {
214         int len;
215
216         len = 0;
217         while (bp) {
218                 len += BALLOC(bp);
219                 bp = bp->next;
220         }
221         return len;
222 }
223
224 /*
225  *  copy the  string of blocks into
226  *  a single block and free the string
227  */
228 struct block *concatblock(struct block *bp)
229 {
230         int len;
231         struct block *nb, *f;
232
233         if (bp->next == 0)
234                 return bp;
235
236         /* probably use parts of qclone */
237         PANIC_EXTRA(bp);
238         nb = block_alloc(blocklen(bp), MEM_WAIT);
239         for (f = bp; f; f = f->next) {
240                 len = BLEN(f);
241                 memmove(nb->wp, f->rp, len);
242                 nb->wp += len;
243         }
244         concatblockcnt += BLEN(nb);
245         freeblist(bp);
246         QDEBUG checkb(nb, "concatblock 1");
247         return nb;
248 }
249
250 /* Returns a block with the remaining contents of b all in the main body of the
251  * returned block.  Replace old references to b with the returned value (which
252  * may still be 'b', if no change was needed. */
253 struct block *linearizeblock(struct block *b)
254 {
255         struct block *newb;
256         size_t len;
257         struct extra_bdata *ebd;
258
259         if (!b->extra_len)
260                 return b;
261
262         newb = block_alloc(BLEN(b), MEM_WAIT);
263         len = BHLEN(b);
264         memcpy(newb->wp, b->rp, len);
265         newb->wp += len;
266         len = b->extra_len;
267         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
268                 ebd = &b->extra_data[i];
269                 if (!ebd->base || !ebd->len)
270                         continue;
271                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
272                 newb->wp += ebd->len;
273                 len -= ebd->len;
274         }
275         /* TODO: any other flags that need copied over? */
276         if (b->flag & BCKSUM_FLAGS) {
277                 newb->flag |= (b->flag & BCKSUM_FLAGS);
278                 newb->checksum_start = b->checksum_start;
279                 newb->checksum_offset = b->checksum_offset;
280                 newb->mss = b->mss;
281         }
282         freeb(b);
283         return newb;
284 }
285
286 /*
287  *  make sure the first block has at least n bytes in its main body
288  */
289 struct block *pullupblock(struct block *bp, int n)
290 {
291         int i, len, seglen;
292         struct block *nbp;
293         struct extra_bdata *ebd;
294
295         /*
296          *  this should almost always be true, it's
297          *  just to avoid every caller checking.
298          */
299         if (BHLEN(bp) >= n)
300                 return bp;
301
302          /* a start at explicit main-body / header management */
303         if (bp->extra_len) {
304                 if (n > bp->lim - bp->rp) {
305                         /* would need to realloc a new block and copy everything over. */
306                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
307                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
308                 }
309                 len = n - BHLEN(bp);
310                 if (len > bp->extra_len)
311                         panic("pullup more than extra (%d, %d, %d)\n",
312                               n, BHLEN(bp), bp->extra_len);
313                 checkb(bp, "before pullup");
314                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
315                         ebd = &bp->extra_data[i];
316                         if (!ebd->base || !ebd->len)
317                                 continue;
318                         seglen = MIN(ebd->len, len);
319                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
320                         bp->wp += seglen;
321                         len -= seglen;
322                         ebd->len -= seglen;
323                         ebd->off += seglen;
324                         bp->extra_len -= seglen;
325                         if (ebd->len == 0) {
326                                 kfree((void *)ebd->base);
327                                 ebd->off = 0;
328                                 ebd->base = 0;
329                         }
330                 }
331                 /* maybe just call pullupblock recursively here */
332                 if (len)
333                         panic("pullup %d bytes overdrawn\n", len);
334                 checkb(bp, "after pullup");
335                 return bp;
336         }
337
338         /*
339          *  if not enough room in the first block,
340          *  add another to the front of the list.
341          */
342         if (bp->lim - bp->rp < n) {
343                 nbp = block_alloc(n, MEM_WAIT);
344                 nbp->next = bp;
345                 bp = nbp;
346         }
347
348         /*
349          *  copy bytes from the trailing blocks into the first
350          */
351         n -= BLEN(bp);
352         while ((nbp = bp->next)) {
353                 i = BLEN(nbp);
354                 if (i > n) {
355                         memmove(bp->wp, nbp->rp, n);
356                         pullupblockcnt++;
357                         bp->wp += n;
358                         nbp->rp += n;
359                         QDEBUG checkb(bp, "pullupblock 1");
360                         return bp;
361                 } else {
362                         memmove(bp->wp, nbp->rp, i);
363                         pullupblockcnt++;
364                         bp->wp += i;
365                         bp->next = nbp->next;
366                         nbp->next = 0;
367                         freeb(nbp);
368                         n -= i;
369                         if (n == 0) {
370                                 QDEBUG checkb(bp, "pullupblock 2");
371                                 return bp;
372                         }
373                 }
374         }
375         freeb(bp);
376         return 0;
377 }
378
379 /*
380  *  make sure the first block has at least n bytes in its main body
381  */
382 struct block *pullupqueue(struct queue *q, int n)
383 {
384         struct block *b;
385
386         /* TODO: lock to protect the queue links? */
387         if ((BHLEN(q->bfirst) >= n))
388                 return q->bfirst;
389         q->bfirst = pullupblock(q->bfirst, n);
390         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
391         q->blast = b;
392         return q->bfirst;
393 }
394
395 /* throw away count bytes from the front of
396  * block's extradata.  Returns count of bytes
397  * thrown away
398  */
399
400 static int pullext(struct block *bp, int count)
401 {
402         struct extra_bdata *ed;
403         int i, rem, bytes = 0;
404
405         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
406                 ed = &bp->extra_data[i];
407                 rem = MIN(count, ed->len);
408                 bp->extra_len -= rem;
409                 count -= rem;
410                 bytes += rem;
411                 ed->off += rem;
412                 ed->len -= rem;
413                 if (ed->len == 0) {
414                         kfree((void *)ed->base);
415                         ed->base = 0;
416                         ed->off = 0;
417                 }
418         }
419         return bytes;
420 }
421
422 /* throw away count bytes from the end of a
423  * block's extradata.  Returns count of bytes
424  * thrown away
425  */
426
427 static int dropext(struct block *bp, int count)
428 {
429         struct extra_bdata *ed;
430         int i, rem, bytes = 0;
431
432         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
433                 ed = &bp->extra_data[i];
434                 rem = MIN(count, ed->len);
435                 bp->extra_len -= rem;
436                 count -= rem;
437                 bytes += rem;
438                 ed->len -= rem;
439                 if (ed->len == 0) {
440                         kfree((void *)ed->base);
441                         ed->base = 0;
442                         ed->off = 0;
443                 }
444         }
445         return bytes;
446 }
447
448 /*
449  *  throw away up to count bytes from a
450  *  list of blocks.  Return count of bytes
451  *  thrown away.
452  */
453 static int _pullblock(struct block **bph, int count, int free)
454 {
455         struct block *bp;
456         int n, bytes;
457
458         bytes = 0;
459         if (bph == NULL)
460                 return 0;
461
462         while (*bph != NULL && count != 0) {
463                 bp = *bph;
464
465                 n = MIN(BHLEN(bp), count);
466                 bytes += n;
467                 count -= n;
468                 bp->rp += n;
469                 n = pullext(bp, count);
470                 bytes += n;
471                 count -= n;
472                 QDEBUG checkb(bp, "pullblock ");
473                 if (BLEN(bp) == 0 && (free || count)) {
474                         *bph = bp->next;
475                         bp->next = NULL;
476                         freeb(bp);
477                 }
478         }
479         return bytes;
480 }
481
482 int pullblock(struct block **bph, int count)
483 {
484         return _pullblock(bph, count, 1);
485 }
486
487 /*
488  *  trim to len bytes starting at offset
489  */
490 struct block *trimblock(struct block *bp, int offset, int len)
491 {
492         uint32_t l, trim;
493         int olen = len;
494
495         QDEBUG checkb(bp, "trimblock 1");
496         if (blocklen(bp) < offset + len) {
497                 freeblist(bp);
498                 return NULL;
499         }
500
501         l =_pullblock(&bp, offset, 0);
502         if (bp == NULL)
503                 return NULL;
504         if (l != offset) {
505                 freeblist(bp);
506                 return NULL;
507         }
508
509         while ((l = BLEN(bp)) < len) {
510                 len -= l;
511                 bp = bp->next;
512         }
513
514         trim = BLEN(bp) - len;
515         trim -= dropext(bp, trim);
516         bp->wp -= trim;
517
518         if (bp->next) {
519                 freeblist(bp->next);
520                 bp->next = NULL;
521         }
522         return bp;
523 }
524
525 /*
526  *  copy 'count' bytes into a new block
527  */
528 struct block *copyblock(struct block *bp, int count)
529 {
530         int l;
531         struct block *nbp;
532
533         QDEBUG checkb(bp, "copyblock 0");
534         nbp = block_alloc(count, MEM_WAIT);
535         if (bp->flag & BCKSUM_FLAGS) {
536                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
537                 nbp->checksum_start = bp->checksum_start;
538                 nbp->checksum_offset = bp->checksum_offset;
539                 nbp->mss = bp->mss;
540         }
541         PANIC_EXTRA(bp);
542         for (; count > 0 && bp != 0; bp = bp->next) {
543                 l = BLEN(bp);
544                 if (l > count)
545                         l = count;
546                 memmove(nbp->wp, bp->rp, l);
547                 nbp->wp += l;
548                 count -= l;
549         }
550         if (count > 0) {
551                 memset(nbp->wp, 0, count);
552                 nbp->wp += count;
553         }
554         copyblockcnt++;
555         QDEBUG checkb(nbp, "copyblock 1");
556
557         return nbp;
558 }
559
560 /* Adjust block @bp so that its size is exactly @len.
561  * If the size is increased, fill in the new contents with zeros.
562  * If the size is decreased, discard some of the old contents at the tail. */
563 struct block *adjustblock(struct block *bp, int len)
564 {
565         struct extra_bdata *ebd;
566         void *buf;
567         int i;
568
569         if (len < 0) {
570                 freeb(bp);
571                 return NULL;
572         }
573
574         if (len == BLEN(bp))
575                 return bp;
576
577         /* Shrink within block main body. */
578         if (len <= BHLEN(bp)) {
579                 free_block_extra(bp);
580                 bp->wp = bp->rp + len;
581                 QDEBUG checkb(bp, "adjustblock 1");
582                 return bp;
583         }
584
585         /* Need to grow. */
586         if (len > BLEN(bp)) {
587                 /* Grow within block main body. */
588                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
589                         memset(bp->wp, 0, len - BLEN(bp));
590                         bp->wp = bp->rp + len;
591                         QDEBUG checkb(bp, "adjustblock 2");
592                         return bp;
593                 }
594                 /* Grow with extra data buffers. */
595                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
596                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
597                 QDEBUG checkb(bp, "adjustblock 3");
598                 return bp;
599         }
600
601         /* Shrink extra data buffers.
602          * len is how much of ebd we need to keep.
603          * extra_len is re-accumulated. */
604         assert(bp->extra_len > 0);
605         len -= BHLEN(bp);
606         bp->extra_len = 0;
607         for (i = 0; i < bp->nr_extra_bufs; i++) {
608                 ebd = &bp->extra_data[i];
609                 if (len <= ebd->len)
610                         break;
611                 len -= ebd->len;
612                 bp->extra_len += ebd->len;
613         }
614         /* If len becomes zero, extra_data[i] should be freed. */
615         if (len > 0) {
616                 ebd = &bp->extra_data[i];
617                 ebd->len = len;
618                 bp->extra_len += ebd->len;
619                 i++;
620         }
621         for (; i < bp->nr_extra_bufs; i++) {
622                 ebd = &bp->extra_data[i];
623                 if (ebd->base)
624                         kfree((void*)ebd->base);
625                 ebd->base = ebd->off = ebd->len = 0;
626         }
627         QDEBUG checkb(bp, "adjustblock 4");
628         return bp;
629 }
630
631
632 /*
633  *  get next block from a queue, return null if nothing there
634  */
635 struct block *qget(struct queue *q)
636 {
637         int dowakeup;
638         struct block *b;
639
640         /* sync with qwrite */
641         spin_lock_irqsave(&q->lock);
642
643         b = q->bfirst;
644         if (b == NULL) {
645                 q->state |= Qstarve;
646                 spin_unlock_irqsave(&q->lock);
647                 return NULL;
648         }
649         q->bfirst = b->next;
650         b->next = 0;
651         q->len -= BALLOC(b);
652         q->dlen -= BLEN(b);
653         QDEBUG checkb(b, "qget");
654
655         /* if writer flow controlled, restart */
656         if ((q->state & Qflow) && q->len < q->limit / 2) {
657                 q->state &= ~Qflow;
658                 dowakeup = 1;
659         } else
660                 dowakeup = 0;
661
662         spin_unlock_irqsave(&q->lock);
663
664         if (dowakeup)
665                 rendez_wakeup(&q->wr);
666         qwake_cb(q, FDTAP_FILT_WRITABLE);
667
668         return b;
669 }
670
671 /*
672  *  throw away the next 'len' bytes in the queue
673  * returning the number actually discarded
674  */
675 int qdiscard(struct queue *q, int len)
676 {
677         struct block *b;
678         int dowakeup, n, sofar, body_amt, extra_amt;
679         struct extra_bdata *ebd;
680
681         spin_lock_irqsave(&q->lock);
682         for (sofar = 0; sofar < len; sofar += n) {
683                 b = q->bfirst;
684                 if (b == NULL)
685                         break;
686                 QDEBUG checkb(b, "qdiscard");
687                 n = BLEN(b);
688                 if (n <= len - sofar) {
689                         q->bfirst = b->next;
690                         b->next = 0;
691                         q->len -= BALLOC(b);
692                         q->dlen -= BLEN(b);
693                         freeb(b);
694                 } else {
695                         n = len - sofar;
696                         q->dlen -= n;
697                         /* partial block removal */
698                         body_amt = MIN(BHLEN(b), n);
699                         b->rp += body_amt;
700                         extra_amt = n - body_amt;
701                         /* reduce q->len by the amount we remove from the extras.  The
702                          * header will always be accounted for above, during block removal.
703                          * */
704                         q->len -= extra_amt;
705                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
706                                 ebd = &b->extra_data[i];
707                                 if (!ebd->base || !ebd->len)
708                                         continue;
709                                 if (extra_amt >= ebd->len) {
710                                         /* remove the entire entry, note the kfree release */
711                                         b->extra_len -= ebd->len;
712                                         extra_amt -= ebd->len;
713                                         kfree((void*)ebd->base);
714                                         ebd->base = ebd->off = ebd->len = 0;
715                                         continue;
716                                 }
717                                 ebd->off += extra_amt;
718                                 ebd->len -= extra_amt;
719                                 b->extra_len -= extra_amt;
720                                 extra_amt = 0;
721                         }
722                 }
723         }
724
725         /*
726          *  if writer flow controlled, restart
727          *
728          *  This used to be
729          *  q->len < q->limit/2
730          *  but it slows down tcp too much for certain write sizes.
731          *  I really don't understand it completely.  It may be
732          *  due to the queue draining so fast that the transmission
733          *  stalls waiting for the app to produce more data.  - presotto
734          */
735         if ((q->state & Qflow) && q->len < q->limit) {
736                 q->state &= ~Qflow;
737                 dowakeup = 1;
738         } else
739                 dowakeup = 0;
740
741         spin_unlock_irqsave(&q->lock);
742
743         if (dowakeup)
744                 rendez_wakeup(&q->wr);
745         qwake_cb(q, FDTAP_FILT_WRITABLE);
746
747         return sofar;
748 }
749
750 ssize_t qpass(struct queue *q, struct block *b)
751 {
752         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
753 }
754
755 ssize_t qpassnolim(struct queue *q, struct block *b)
756 {
757         return __qbwrite(q, b, 0);
758 }
759
760 /*
761  *  if the allocated space is way out of line with the used
762  *  space, reallocate to a smaller block
763  */
764 struct block *packblock(struct block *bp)
765 {
766         struct block **l, *nbp;
767         int n;
768
769         if (bp->extra_len)
770                 return bp;
771         for (l = &bp; *l; l = &(*l)->next) {
772                 nbp = *l;
773                 n = BLEN(nbp);
774                 if ((n << 2) < BALLOC(nbp)) {
775                         *l = block_alloc(n, MEM_WAIT);
776                         memmove((*l)->wp, nbp->rp, n);
777                         (*l)->wp += n;
778                         (*l)->next = nbp->next;
779                         freeb(nbp);
780                 }
781         }
782
783         return bp;
784 }
785
786 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
787  * body_rp, for up to len.  Returns the len consumed. 
788  *
789  * The base is 'b', so that we can kfree it later.  This currently ties us to
790  * using kfree for the release method for all extra_data.
791  *
792  * It is possible to have a body size that is 0, if there is no offset, and
793  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
794 static size_t point_to_body(struct block *b, uint8_t *body_rp,
795                             struct block *newb, unsigned int newb_idx,
796                             size_t len)
797 {
798         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
799
800         assert(newb_idx < newb->nr_extra_bufs);
801
802         kmalloc_incref(b);
803         ebd->base = (uintptr_t)b;
804         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
805         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
806         assert((int)ebd->len >= 0);
807         newb->extra_len += ebd->len;
808         return ebd->len;
809 }
810
811 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
812  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
813  * consumed.
814  *
815  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
816 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
817                            struct block *newb, unsigned int newb_idx,
818                            size_t len)
819 {
820         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
821         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
822
823         assert(b_idx < b->nr_extra_bufs);
824         assert(newb_idx < newb->nr_extra_bufs);
825
826         kmalloc_incref((void*)b_ebd->base);
827         n_ebd->base = b_ebd->base;
828         n_ebd->off = b_ebd->off + b_off;
829         n_ebd->len = MIN(b_ebd->len - b_off, len);
830         newb->extra_len += n_ebd->len;
831         return n_ebd->len;
832 }
833
834 /* given a string of blocks, fills the new block's extra_data  with the contents
835  * of the blist [offset, len + offset)
836  *
837  * returns 0 on success.  the only failure is if the extra_data array was too
838  * small, so this returns a positive integer saying how big the extra_data needs
839  * to be.
840  *
841  * callers are responsible for protecting the list structure. */
842 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
843                             uint32_t offset)
844 {
845         struct block *b, *first;
846         unsigned int nr_bufs = 0;
847         unsigned int b_idx, newb_idx = 0;
848         uint8_t *first_main_body = 0;
849
850         /* find the first block; keep offset relative to the latest b in the list */
851         for (b = blist; b; b = b->next) {
852                 if (BLEN(b) > offset)
853                         break;
854                 offset -= BLEN(b);
855         }
856         /* qcopy semantics: if you asked for an offset outside the block list, you
857          * get an empty block back */
858         if (!b)
859                 return 0;
860         first = b;
861         /* upper bound for how many buffers we'll need in newb */
862         for (/* b is set*/; b; b = b->next) {
863                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
864         }
865         /* we might be holding a spinlock here, so we won't wait for kmalloc */
866         if (block_add_extd(newb, nr_bufs, 0) != 0) {
867                 /* caller will need to alloc these, then re-call us */
868                 return nr_bufs;
869         }
870         for (b = first; b && len; b = b->next) {
871                 b_idx = 0;
872                 if (offset) {
873                         if (offset < BHLEN(b)) {
874                                 /* off is in the main body */
875                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
876                                 newb_idx++;
877                         } else {
878                                 /* off is in one of the buffers (or just past the last one).
879                                  * we're not going to point to b's main body at all. */
880                                 offset -= BHLEN(b);
881                                 assert(b->extra_data);
882                                 /* assuming these extrabufs are packed, or at least that len
883                                  * isn't gibberish */
884                                 while (b->extra_data[b_idx].len <= offset) {
885                                         offset -= b->extra_data[b_idx].len;
886                                         b_idx++;
887                                 }
888                                 /* now offset is set to our offset in the b_idx'th buf */
889                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
890                                 newb_idx++;
891                                 b_idx++;
892                         }
893                         offset = 0;
894                 } else {
895                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
896                         newb_idx++;
897                 }
898                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
899                  * and any point_to_ could be our last if it consumed all of len. */
900                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
901                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
902                         newb_idx++;
903                 }
904         }
905         return 0;
906 }
907
908 struct block *blist_clone(struct block *blist, int header_len, int len,
909                           uint32_t offset)
910 {
911         int ret;
912         struct block *newb = block_alloc(header_len, MEM_WAIT);
913         do {
914                 ret = __blist_clone_to(blist, newb, len, offset);
915                 if (ret)
916                         block_add_extd(newb, ret, MEM_WAIT);
917         } while (ret);
918         return newb;
919 }
920
921 /* given a queue, makes a single block with header_len reserved space in the
922  * block main body, and the contents of [offset, len + offset) pointed to in the
923  * new blocks ext_data. */
924 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
925 {
926         int ret;
927         struct block *newb = block_alloc(header_len, MEM_WAIT);
928         /* the while loop should rarely be used: it would require someone
929          * concurrently adding to the queue. */
930         do {
931                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
932                 spin_lock_irqsave(&q->lock);
933                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
934                 spin_unlock_irqsave(&q->lock);
935                 if (ret)
936                         block_add_extd(newb, ret, MEM_WAIT);
937         } while (ret);
938         return newb;
939 }
940
941 /*
942  *  copy from offset in the queue
943  */
944 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
945 {
946         int sofar;
947         int n;
948         struct block *b, *nb;
949         uint8_t *p;
950
951         nb = block_alloc(len, MEM_WAIT);
952
953         spin_lock_irqsave(&q->lock);
954
955         /* go to offset */
956         b = q->bfirst;
957         for (sofar = 0;; sofar += n) {
958                 if (b == NULL) {
959                         spin_unlock_irqsave(&q->lock);
960                         return nb;
961                 }
962                 n = BLEN(b);
963                 if (sofar + n > offset) {
964                         p = b->rp + offset - sofar;
965                         n -= offset - sofar;
966                         break;
967                 }
968                 QDEBUG checkb(b, "qcopy");
969                 b = b->next;
970         }
971
972         /* copy bytes from there */
973         for (sofar = 0; sofar < len;) {
974                 if (n > len - sofar)
975                         n = len - sofar;
976                 PANIC_EXTRA(b);
977                 memmove(nb->wp, p, n);
978                 qcopycnt += n;
979                 sofar += n;
980                 nb->wp += n;
981                 b = b->next;
982                 if (b == NULL)
983                         break;
984                 n = BLEN(b);
985                 p = b->rp;
986         }
987         spin_unlock_irqsave(&q->lock);
988
989         return nb;
990 }
991
992 struct block *qcopy(struct queue *q, int len, uint32_t offset)
993 {
994 #ifdef CONFIG_BLOCK_EXTRAS
995         return qclone(q, 0, len, offset);
996 #else
997         return qcopy_old(q, len, offset);
998 #endif
999 }
1000
1001 static void qinit_common(struct queue *q)
1002 {
1003         spinlock_init_irqsave(&q->lock);
1004         rendez_init(&q->rr);
1005         rendez_init(&q->wr);
1006 }
1007
1008 /*
1009  *  called by non-interrupt code
1010  */
1011 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1012 {
1013         struct queue *q;
1014
1015         q = kzmalloc(sizeof(struct queue), 0);
1016         if (q == 0)
1017                 return 0;
1018         qinit_common(q);
1019
1020         q->limit = q->inilim = limit;
1021         q->kick = kick;
1022         q->arg = arg;
1023         q->state = msg;
1024         q->state |= Qstarve;
1025         q->eof = 0;
1026
1027         return q;
1028 }
1029
1030 /* open a queue to be bypassed */
1031 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1032 {
1033         struct queue *q;
1034
1035         q = kzmalloc(sizeof(struct queue), 0);
1036         if (q == 0)
1037                 return 0;
1038         qinit_common(q);
1039
1040         q->limit = 0;
1041         q->arg = arg;
1042         q->bypass = bypass;
1043         q->state = 0;
1044
1045         return q;
1046 }
1047
1048 static int notempty(void *a)
1049 {
1050         struct queue *q = a;
1051
1052         return (q->state & Qclosed) || q->bfirst != 0;
1053 }
1054
1055 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1056  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1057  * was naturally closed.  Throws an error o/w. */
1058 static bool qwait_and_ilock(struct queue *q)
1059 {
1060         while (1) {
1061                 spin_lock_irqsave(&q->lock);
1062                 if (q->bfirst != NULL)
1063                         return TRUE;
1064                 if (q->state & Qclosed) {
1065                         if (++q->eof > 3) {
1066                                 spin_unlock_irqsave(&q->lock);
1067                                 error(EFAIL, "multiple reads on a closed queue");
1068                         }
1069                         if (q->err[0]) {
1070                                 spin_unlock_irqsave(&q->lock);
1071                                 error(EFAIL, q->err);
1072                         }
1073                         return FALSE;
1074                 }
1075                 /* We set Qstarve regardless of whether we are non-blocking or not.
1076                  * Qstarve tracks the edge detection of the queue being empty. */
1077                 q->state |= Qstarve;
1078                 if (q->state & Qnonblock) {
1079                         spin_unlock_irqsave(&q->lock);
1080                         error(EAGAIN, "queue empty");
1081                 }
1082                 spin_unlock_irqsave(&q->lock);
1083                 /* may throw an error() */
1084                 rendez_sleep(&q->rr, notempty, q);
1085         }
1086 }
1087
1088 /*
1089  * add a block list to a queue
1090  */
1091 void qaddlist(struct queue *q, struct block *b)
1092 {
1093         /* TODO: q lock? */
1094         /* queue the block */
1095         if (q->bfirst)
1096                 q->blast->next = b;
1097         else
1098                 q->bfirst = b;
1099         q->len += blockalloclen(b);
1100         q->dlen += blocklen(b);
1101         while (b->next)
1102                 b = b->next;
1103         q->blast = b;
1104 }
1105
1106 /*
1107  *  called with q ilocked
1108  */
1109 static struct block *qremove(struct queue *q)
1110 {
1111         struct block *b;
1112
1113         b = q->bfirst;
1114         if (b == NULL)
1115                 return NULL;
1116         q->bfirst = b->next;
1117         b->next = NULL;
1118         q->dlen -= BLEN(b);
1119         q->len -= BALLOC(b);
1120         QDEBUG checkb(b, "qremove");
1121         return b;
1122 }
1123
1124 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1125 {
1126         size_t copy_amt, retval = 0;
1127         struct extra_bdata *ebd;
1128         
1129         copy_amt = MIN(BHLEN(b), amt);
1130         memcpy(to, b->rp, copy_amt);
1131         /* advance the rp, since this block not be completely consumed and future
1132          * reads need to know where to pick up from */
1133         b->rp += copy_amt;
1134         to += copy_amt;
1135         amt -= copy_amt;
1136         retval += copy_amt;
1137         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1138                 ebd = &b->extra_data[i];
1139                 /* skip empty entires.  if we track this in the struct block, we can
1140                  * just start the for loop early */
1141                 if (!ebd->base || !ebd->len)
1142                         continue;
1143                 copy_amt = MIN(ebd->len, amt);
1144                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1145                 /* we're actually consuming the entries, just like how we advance rp up
1146                  * above, and might only consume part of one. */
1147                 ebd->len -= copy_amt;
1148                 ebd->off += copy_amt;
1149                 b->extra_len -= copy_amt;
1150                 if (!ebd->len) {
1151                         /* we don't actually have to decref here.  it's also done in
1152                          * freeb().  this is the earliest we can free. */
1153                         kfree((void*)ebd->base);
1154                         ebd->base = ebd->off = 0;
1155                 }
1156                 to += copy_amt;
1157                 amt -= copy_amt;
1158                 retval += copy_amt;
1159         }
1160         return retval;
1161 }
1162
1163 /*
1164  *  copy the contents of a string of blocks into
1165  *  memory.  emptied blocks are freed.  return
1166  *  pointer to first unconsumed block.
1167  */
1168 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1169 {
1170         int i;
1171         struct block *next;
1172
1173         /* could be slicker here, since read_from_block is smart */
1174         for (; b != NULL; b = next) {
1175                 i = BLEN(b);
1176                 if (i > n) {
1177                         /* partial block, consume some */
1178                         read_from_block(b, p, n);
1179                         return b;
1180                 }
1181                 /* full block, consume all and move on */
1182                 i = read_from_block(b, p, i);
1183                 n -= i;
1184                 p += i;
1185                 next = b->next;
1186                 freeb(b);
1187         }
1188         return NULL;
1189 }
1190
1191 /*
1192  *  copy the contents of memory into a string of blocks.
1193  *  return NULL on error.
1194  */
1195 struct block *mem2bl(uint8_t * p, int len)
1196 {
1197         ERRSTACK(1);
1198         int n;
1199         struct block *b, *first, **l;
1200
1201         first = NULL;
1202         l = &first;
1203         if (waserror()) {
1204                 freeblist(first);
1205                 nexterror();
1206         }
1207         do {
1208                 n = len;
1209                 if (n > Maxatomic)
1210                         n = Maxatomic;
1211
1212                 *l = b = block_alloc(n, MEM_WAIT);
1213                 /* TODO consider extra_data */
1214                 memmove(b->wp, p, n);
1215                 b->wp += n;
1216                 p += n;
1217                 len -= n;
1218                 l = &b->next;
1219         } while (len > 0);
1220         poperror();
1221
1222         return first;
1223 }
1224
1225 /*
1226  *  put a block back to the front of the queue
1227  *  called with q ilocked
1228  */
1229 void qputback(struct queue *q, struct block *b)
1230 {
1231         b->next = q->bfirst;
1232         if (q->bfirst == NULL)
1233                 q->blast = b;
1234         q->bfirst = b;
1235         q->len += BALLOC(b);
1236         q->dlen += BLEN(b);
1237 }
1238
1239 /*
1240  *  flow control, get producer going again
1241  *  called with q ilocked
1242  */
1243 static void qwakeup_iunlock(struct queue *q)
1244 {
1245         int dowakeup = 0;
1246
1247         /* if writer flow controlled, restart */
1248         if ((q->state & Qflow) && q->len < q->limit / 2) {
1249                 q->state &= ~Qflow;
1250                 dowakeup = 1;
1251         }
1252
1253         spin_unlock_irqsave(&q->lock);
1254
1255         /* wakeup flow controlled writers */
1256         if (dowakeup) {
1257                 if (q->kick)
1258                         q->kick(q->arg);
1259                 rendez_wakeup(&q->wr);
1260         }
1261         qwake_cb(q, FDTAP_FILT_WRITABLE);
1262 }
1263
1264 /*
1265  *  get next block from a queue (up to a limit)
1266  */
1267 struct block *qbread(struct queue *q, int len)
1268 {
1269         struct block *b, *nb;
1270         int n;
1271
1272         if (!qwait_and_ilock(q)) {
1273                 /* queue closed */
1274                 spin_unlock_irqsave(&q->lock);
1275                 return NULL;
1276         }
1277
1278         /* if we get here, there's at least one block in the queue */
1279         b = qremove(q);
1280         n = BLEN(b);
1281
1282         /* split block if it's too big and this is not a message queue */
1283         nb = b;
1284         if (n > len) {
1285                 PANIC_EXTRA(b);
1286                 if ((q->state & Qmsg) == 0) {
1287                         n -= len;
1288                         b = block_alloc(n, MEM_WAIT);
1289                         memmove(b->wp, nb->rp + len, n);
1290                         b->wp += n;
1291                         qputback(q, b);
1292                 }
1293                 nb->wp = nb->rp + len;
1294         }
1295
1296         /* restart producer */
1297         qwakeup_iunlock(q);
1298
1299         return nb;
1300 }
1301
1302 /*
1303  *  read a queue.  if no data is queued, post a struct block
1304  *  and wait on its Rendez.
1305  */
1306 long qread(struct queue *q, void *vp, int len)
1307 {
1308         struct block *b, *first, **l;
1309         int m, n;
1310
1311 again:
1312         if (!qwait_and_ilock(q)) {
1313                 /* queue closed */
1314                 spin_unlock_irqsave(&q->lock);
1315                 return 0;
1316         }
1317
1318         /* if we get here, there's at least one block in the queue */
1319         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1320         // strategy by default.
1321         if (q->state & Qcoalesce) {
1322                 /* when coalescing, 0 length blocks just go away */
1323                 b = q->bfirst;
1324                 if (BLEN(b) <= 0) {
1325                         freeb(qremove(q));
1326                         spin_unlock_irqsave(&q->lock);
1327                         goto again;
1328                 }
1329
1330                 /*  grab the first block plus as many
1331                  *  following blocks as will completely
1332                  *  fit in the read.
1333                  */
1334                 n = 0;
1335                 l = &first;
1336                 m = BLEN(b);
1337                 for (;;) {
1338                         *l = qremove(q);
1339                         l = &b->next;
1340                         n += m;
1341
1342                         b = q->bfirst;
1343                         if (b == NULL)
1344                                 break;
1345                         m = BLEN(b);
1346                         if (n + m > len)
1347                                 break;
1348                 }
1349         } else {
1350                 first = qremove(q);
1351                 n = BLEN(first);
1352         }
1353         b = bl2mem(vp, first, len);
1354         /* take care of any left over partial block */
1355         if (b != NULL) {
1356                 n -= BLEN(b);
1357                 if (q->state & Qmsg)
1358                         freeb(b);
1359                 else
1360                         qputback(q, b);
1361         }
1362
1363         /* restart producer */
1364         qwakeup_iunlock(q);
1365
1366         return n;
1367 }
1368
1369 static int qnotfull(void *a)
1370 {
1371         struct queue *q = a;
1372
1373         return q->len < q->limit || (q->state & Qclosed);
1374 }
1375
1376 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1377 static size_t enqueue_blist(struct queue *q, struct block *b)
1378 {
1379         size_t len, dlen;
1380
1381         if (q->bfirst)
1382                 q->blast->next = b;
1383         else
1384                 q->bfirst = b;
1385         len = BALLOC(b);
1386         dlen = BLEN(b);
1387         while (b->next) {
1388                 b = b->next;
1389                 len += BALLOC(b);
1390                 dlen += BLEN(b);
1391         }
1392         q->blast = b;
1393         q->len += len;
1394         q->dlen += dlen;
1395         return dlen;
1396 }
1397
1398 /* Adds block (which can be a list of blocks) to the queue, subject to
1399  * qio_flags.  Returns the length written on success or -1 on non-throwable
1400  * error.  Adjust qio_flags to control the value-added features!. */
1401 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1402 {
1403         ssize_t ret;
1404         bool dowakeup = FALSE;
1405         bool was_empty;
1406
1407         if (q->bypass) {
1408                 (*q->bypass) (q->arg, b);
1409                 return blocklen(b);
1410         }
1411         spin_lock_irqsave(&q->lock);
1412         was_empty = q->len == 0;
1413         if (q->state & Qclosed) {
1414                 spin_unlock_irqsave(&q->lock);
1415                 freeblist(b);
1416                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1417                         return -1;
1418                 if (q->err[0])
1419                         error(EFAIL, q->err);
1420                 else
1421                         error(EFAIL, "connection closed");
1422         }
1423         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1424                 /* drop overflow takes priority over regular non-blocking */
1425                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1426                         spin_unlock_irqsave(&q->lock);
1427                         freeb(b);
1428                         return -1;
1429                 }
1430                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (q->state & Qnonblock)) {
1431                         spin_unlock_irqsave(&q->lock);
1432                         freeb(b);
1433                         error(EAGAIN, "queue full");
1434                 }
1435         }
1436         ret = enqueue_blist(q, b);
1437         QDEBUG checkb(b, "__qbwrite");
1438         /* make sure other end gets awakened */
1439         if (q->state & Qstarve) {
1440                 q->state &= ~Qstarve;
1441                 dowakeup = TRUE;
1442         }
1443         spin_unlock_irqsave(&q->lock);
1444         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1445          * wakeup, meaning that actual users either want a kick or have qreaders. */
1446         if (q->kick && (dowakeup || (q->state & Qkick)))
1447                 q->kick(q->arg);
1448         if (dowakeup)
1449                 rendez_wakeup(&q->rr);
1450         if (was_empty)
1451                 qwake_cb(q, FDTAP_FILT_READABLE);
1452         /*
1453          *  flow control, wait for queue to get below the limit
1454          *  before allowing the process to continue and queue
1455          *  more.  We do this here so that postnote can only
1456          *  interrupt us after the data has been queued.  This
1457          *  means that things like 9p flushes and ssl messages
1458          *  will not be disrupted by software interrupts.
1459          *
1460          *  Note - this is moderately dangerous since a process
1461          *  that keeps getting interrupted and rewriting will
1462          *  queue infinite crud.
1463          */
1464         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1465             !(q->state & (Qdropoverflow | Qnonblock))) {
1466                 /* This is a racy peek at the q status.  If we accidentally block, we
1467                  * set Qflow, so someone should wake us.  If we accidentally don't
1468                  * block, we just returned to the user and let them slip a block past
1469                  * flow control. */
1470                 while (!qnotfull(q)) {
1471                         spin_lock_irqsave(&q->lock);
1472                         q->state |= Qflow;
1473                         spin_unlock_irqsave(&q->lock);
1474                         rendez_sleep(&q->wr, qnotfull, q);
1475                 }
1476         }
1477         return ret;
1478 }
1479
1480 /*
1481  *  add a block to a queue obeying flow control
1482  */
1483 ssize_t qbwrite(struct queue *q, struct block *b)
1484 {
1485         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1486 }
1487
1488 ssize_t qibwrite(struct queue *q, struct block *b)
1489 {
1490         return __qbwrite(q, b, 0);
1491 }
1492
1493 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1494  * block on success, 0 on failure. */
1495 static struct block *build_block(void *from, size_t len, int mem_flags)
1496 {
1497         struct block *b;
1498         void *ext_buf;
1499
1500         /* If len is small, we don't need to bother with the extra_data.  But until
1501          * the whole stack can handle extd blocks, we'll use them unconditionally.
1502          * */
1503 #ifdef CONFIG_BLOCK_EXTRAS
1504         /* allocb builds in 128 bytes of header space to all blocks, but this is
1505          * only available via padblock (to the left).  we also need some space
1506          * for pullupblock for some basic headers (like icmp) that get written
1507          * in directly */
1508         b = block_alloc(64, mem_flags);
1509         if (!b)
1510                 return 0;
1511         ext_buf = kmalloc(len, mem_flags);
1512         if (!ext_buf) {
1513                 kfree(b);
1514                 return 0;
1515         }
1516         memcpy(ext_buf, from, len);
1517         if (block_add_extd(b, 1, mem_flags)) {
1518                 kfree(ext_buf);
1519                 kfree(b);
1520                 return 0;
1521         }
1522         b->extra_data[0].base = (uintptr_t)ext_buf;
1523         b->extra_data[0].off = 0;
1524         b->extra_data[0].len = len;
1525         b->extra_len += len;
1526 #else
1527         b = block_alloc(n, mem_flags);
1528         if (!b)
1529                 return 0;
1530         memmove(b->wp, from, len);
1531         b->wp += len;
1532 #endif
1533         return b;
1534 }
1535
1536 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1537                         int qio_flags)
1538 {
1539         size_t n, sofar;
1540         struct block *b;
1541         uint8_t *p = vp;
1542         void *ext_buf;
1543
1544         sofar = 0;
1545         do {
1546                 n = len - sofar;
1547                 /* This is 64K, the max amount per single block.  Still a good value? */
1548                 if (n > Maxatomic)
1549                         n = Maxatomic;
1550                 b = build_block(p + sofar, n, mem_flags);
1551                 if (!b)
1552                         break;
1553                 if (__qbwrite(q, b, qio_flags) < 0)
1554                         break;
1555                 sofar += n;
1556         } while ((sofar < len) && (q->state & Qmsg) == 0);
1557         return sofar;
1558 }
1559
1560 ssize_t qwrite(struct queue *q, void *vp, int len)
1561 {
1562         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1563 }
1564
1565 ssize_t qiwrite(struct queue *q, void *vp, int len)
1566 {
1567         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1568 }
1569
1570 /*
1571  *  be extremely careful when calling this,
1572  *  as there is no reference accounting
1573  */
1574 void qfree(struct queue *q)
1575 {
1576         qclose(q);
1577         kfree(q);
1578 }
1579
1580 /*
1581  *  Mark a queue as closed.  No further IO is permitted.
1582  *  All blocks are released.
1583  */
1584 void qclose(struct queue *q)
1585 {
1586         struct block *bfirst;
1587
1588         if (q == NULL)
1589                 return;
1590
1591         /* mark it */
1592         spin_lock_irqsave(&q->lock);
1593         q->state |= Qclosed;
1594         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1595         q->err[0] = 0;
1596         bfirst = q->bfirst;
1597         q->bfirst = 0;
1598         q->len = 0;
1599         q->dlen = 0;
1600         spin_unlock_irqsave(&q->lock);
1601
1602         /* free queued blocks */
1603         freeblist(bfirst);
1604
1605         /* wake up readers/writers */
1606         rendez_wakeup(&q->rr);
1607         rendez_wakeup(&q->wr);
1608         qwake_cb(q, FDTAP_FILT_HANGUP);
1609 }
1610
1611 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1612  *
1613  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1614  * there is no message, which is what also happens during a natural qclose(),
1615  * those waiters will simply return 0.  qwriters will always error() on a
1616  * closed/hungup queue. */
1617 void qhangup(struct queue *q, char *msg)
1618 {
1619         /* mark it */
1620         spin_lock_irqsave(&q->lock);
1621         q->state |= Qclosed;
1622         if (msg == 0 || *msg == 0)
1623                 q->err[0] = 0;
1624         else
1625                 strlcpy(q->err, msg, ERRMAX);
1626         spin_unlock_irqsave(&q->lock);
1627
1628         /* wake up readers/writers */
1629         rendez_wakeup(&q->rr);
1630         rendez_wakeup(&q->wr);
1631         qwake_cb(q, FDTAP_FILT_HANGUP);
1632 }
1633
1634 /*
1635  *  return non-zero if the q is hungup
1636  */
1637 int qisclosed(struct queue *q)
1638 {
1639         return q->state & Qclosed;
1640 }
1641
1642 /*
1643  *  mark a queue as no longer hung up.  resets the wake_cb.
1644  */
1645 void qreopen(struct queue *q)
1646 {
1647         spin_lock_irqsave(&q->lock);
1648         q->state &= ~Qclosed;
1649         q->state |= Qstarve;
1650         q->eof = 0;
1651         q->limit = q->inilim;
1652         q->wake_cb = 0;
1653         q->wake_data = 0;
1654         spin_unlock_irqsave(&q->lock);
1655 }
1656
1657 /*
1658  *  return bytes queued
1659  */
1660 int qlen(struct queue *q)
1661 {
1662         return q->dlen;
1663 }
1664
1665 /*
1666  * return space remaining before flow control
1667  */
1668 int qwindow(struct queue *q)
1669 {
1670         int l;
1671
1672         l = q->limit - q->len;
1673         if (l < 0)
1674                 l = 0;
1675         return l;
1676 }
1677
1678 /*
1679  *  return true if we can read without blocking
1680  */
1681 int qcanread(struct queue *q)
1682 {
1683         return q->bfirst != 0;
1684 }
1685
1686 /*
1687  *  change queue limit
1688  */
1689 void qsetlimit(struct queue *q, int limit)
1690 {
1691         q->limit = limit;
1692 }
1693
1694 /*
1695  *  set whether writes drop overflowing blocks, or if we sleep
1696  */
1697 void qdropoverflow(struct queue *q, bool onoff)
1698 {
1699         if (onoff)
1700                 q->state |= Qdropoverflow;
1701         else
1702                 q->state &= ~Qdropoverflow;
1703 }
1704
1705 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1706 void qnonblock(struct queue *q, bool onoff)
1707 {
1708         if (onoff)
1709                 q->state |= Qnonblock;
1710         else
1711                 q->state &= ~Qnonblock;
1712 }
1713
1714 /*
1715  *  flush the output queue
1716  */
1717 void qflush(struct queue *q)
1718 {
1719         struct block *bfirst;
1720
1721         /* mark it */
1722         spin_lock_irqsave(&q->lock);
1723         bfirst = q->bfirst;
1724         q->bfirst = 0;
1725         q->len = 0;
1726         q->dlen = 0;
1727         spin_unlock_irqsave(&q->lock);
1728
1729         /* free queued blocks */
1730         freeblist(bfirst);
1731
1732         /* wake up writers */
1733         rendez_wakeup(&q->wr);
1734         qwake_cb(q, FDTAP_FILT_WRITABLE);
1735 }
1736
1737 int qfull(struct queue *q)
1738 {
1739         return q->len >= q->limit;
1740 }
1741
1742 int qstate(struct queue *q)
1743 {
1744         return q->state;
1745 }
1746
1747 void qdump(struct queue *q)
1748 {
1749         if (q)
1750                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1751                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1752 }
1753
1754 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1755  * marks the type of wakeup event (flags from FDTAP).
1756  *
1757  * There's no sync protection.  If you change the CB while the qio is running,
1758  * you might get a CB with the data or func from a previous set_wake_cb.  You
1759  * should set this once per queue and forget it.
1760  *
1761  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1762  * just make sure that the func(data) pair are valid until the queue is freed or
1763  * reopened. */
1764 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1765 {
1766         q->wake_data = data;
1767         wmb();  /* if we see func, we'll also see the data for it */
1768         q->wake_cb = func;
1769 }