Add the Inferno license to files we got from Inferno
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         qlock_t rlock;                          /* mutex for reading processes */
86         struct rendez rr;                       /* process waiting to read */
87         qlock_t wlock;                          /* mutex for writing processes */
88         struct rendez wr;                       /* process waiting to write */
89         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
90         void *wake_data;
91
92         char err[ERRMAX];
93 };
94
95 enum {
96         Maxatomic = 64 * 1024,
97 };
98
99 unsigned int qiomaxatomic = Maxatomic;
100
101 /* Helper: fires a wake callback, sending 'filter' */
102 static void qwake_cb(struct queue *q, int filter)
103 {
104         if (q->wake_cb)
105                 q->wake_cb(q, q->wake_data, filter);
106 }
107
108 void ixsummary(void)
109 {
110         debugging ^= 1;
111         iallocsummary();
112         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
113                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
114         printd("consume %lu, produce %lu, qcopy %lu\n",
115                    consumecnt, producecnt, qcopycnt);
116 }
117
118 /*
119  *  free a list of blocks
120  */
121 void freeblist(struct block *b)
122 {
123         struct block *next;
124
125         for (; b != 0; b = next) {
126                 next = b->next;
127                 b->next = 0;
128                 freeb(b);
129         }
130 }
131
132 /*
133  *  pad a block to the front (or the back if size is negative)
134  */
135 struct block *padblock(struct block *bp, int size)
136 {
137         int n;
138         struct block *nbp;
139         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
140         uint16_t checksum_start = bp->checksum_start;
141         uint16_t checksum_offset = bp->checksum_offset;
142         uint16_t mss = bp->mss;
143
144         QDEBUG checkb(bp, "padblock 1");
145         if (size >= 0) {
146                 if (bp->rp - bp->base >= size) {
147                         bp->checksum_start += size;
148                         bp->rp -= size;
149                         return bp;
150                 }
151
152                 PANIC_EXTRA(bp);
153                 if (bp->next)
154                         panic("padblock %p", getcallerpc(&bp));
155                 n = BLEN(bp);
156                 padblockcnt++;
157                 nbp = allocb(size + n);
158                 nbp->rp += size;
159                 nbp->wp = nbp->rp;
160                 memmove(nbp->wp, bp->rp, n);
161                 nbp->wp += n;
162                 freeb(bp);
163                 nbp->rp -= size;
164         } else {
165                 size = -size;
166
167                 PANIC_EXTRA(bp);
168
169                 if (bp->next)
170                         panic("padblock %p", getcallerpc(&bp));
171
172                 if (bp->lim - bp->wp >= size)
173                         return bp;
174
175                 n = BLEN(bp);
176                 padblockcnt++;
177                 nbp = allocb(size + n);
178                 memmove(nbp->wp, bp->rp, n);
179                 nbp->wp += n;
180                 freeb(bp);
181         }
182         if (bcksum) {
183                 nbp->flag |= bcksum;
184                 nbp->checksum_start = checksum_start;
185                 nbp->checksum_offset = checksum_offset;
186                 nbp->mss = mss;
187         }
188         QDEBUG checkb(nbp, "padblock 1");
189         return nbp;
190 }
191
192 /*
193  *  return count of bytes in a string of blocks
194  */
195 int blocklen(struct block *bp)
196 {
197         int len;
198
199         len = 0;
200         while (bp) {
201                 len += BLEN(bp);
202                 bp = bp->next;
203         }
204         return len;
205 }
206
207 /*
208  * return count of space in blocks
209  */
210 int blockalloclen(struct block *bp)
211 {
212         int len;
213
214         len = 0;
215         while (bp) {
216                 len += BALLOC(bp);
217                 bp = bp->next;
218         }
219         return len;
220 }
221
222 /*
223  *  copy the  string of blocks into
224  *  a single block and free the string
225  */
226 struct block *concatblock(struct block *bp)
227 {
228         int len;
229         struct block *nb, *f;
230
231         if (bp->next == 0)
232                 return bp;
233
234         /* probably use parts of qclone */
235         PANIC_EXTRA(bp);
236         nb = allocb(blocklen(bp));
237         for (f = bp; f; f = f->next) {
238                 len = BLEN(f);
239                 memmove(nb->wp, f->rp, len);
240                 nb->wp += len;
241         }
242         concatblockcnt += BLEN(nb);
243         freeblist(bp);
244         QDEBUG checkb(nb, "concatblock 1");
245         return nb;
246 }
247
248 /* Returns a block with the remaining contents of b all in the main body of the
249  * returned block.  Replace old references to b with the returned value (which
250  * may still be 'b', if no change was needed. */
251 struct block *linearizeblock(struct block *b)
252 {
253         struct block *newb;
254         size_t len;
255         struct extra_bdata *ebd;
256
257         if (!b->extra_len)
258                 return b;
259
260         newb = allocb(BLEN(b));
261         len = BHLEN(b);
262         memcpy(newb->wp, b->rp, len);
263         newb->wp += len;
264         len = b->extra_len;
265         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
266                 ebd = &b->extra_data[i];
267                 if (!ebd->base || !ebd->len)
268                         continue;
269                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
270                 newb->wp += ebd->len;
271                 len -= ebd->len;
272         }
273         /* TODO: any other flags that need copied over? */
274         if (b->flag & BCKSUM_FLAGS) {
275                 newb->flag |= (b->flag & BCKSUM_FLAGS);
276                 newb->checksum_start = b->checksum_start;
277                 newb->checksum_offset = b->checksum_offset;
278                 newb->mss = b->mss;
279         }
280         freeb(b);
281         return newb;
282 }
283
284 /*
285  *  make sure the first block has at least n bytes in its main body
286  */
287 struct block *pullupblock(struct block *bp, int n)
288 {
289         int i, len, seglen;
290         struct block *nbp;
291         struct extra_bdata *ebd;
292
293         /*
294          *  this should almost always be true, it's
295          *  just to avoid every caller checking.
296          */
297         if (BHLEN(bp) >= n)
298                 return bp;
299
300          /* a start at explicit main-body / header management */
301         if (bp->extra_len) {
302                 if (n > bp->lim - bp->rp) {
303                         /* would need to realloc a new block and copy everything over. */
304                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
305                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
306                 }
307                 len = n - BHLEN(bp);
308                 if (len > bp->extra_len)
309                         panic("pullup more than extra (%d, %d, %d)\n",
310                               n, BHLEN(bp), bp->extra_len);
311                 checkb(bp, "before pullup");
312                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
313                         ebd = &bp->extra_data[i];
314                         if (!ebd->base || !ebd->len)
315                                 continue;
316                         seglen = MIN(ebd->len, len);
317                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
318                         bp->wp += seglen;
319                         len -= seglen;
320                         ebd->len -= seglen;
321                         ebd->off += seglen;
322                         bp->extra_len -= seglen;
323                         if (ebd->len == 0) {
324                                 kfree((void *)ebd->base);
325                                 ebd->off = 0;
326                                 ebd->base = 0;
327                         }
328                 }
329                 /* maybe just call pullupblock recursively here */
330                 if (len)
331                         panic("pullup %d bytes overdrawn\n", len);
332                 checkb(bp, "after pullup");
333                 return bp;
334         }
335
336         /*
337          *  if not enough room in the first block,
338          *  add another to the front of the list.
339          */
340         if (bp->lim - bp->rp < n) {
341                 nbp = allocb(n);
342                 nbp->next = bp;
343                 bp = nbp;
344         }
345
346         /*
347          *  copy bytes from the trailing blocks into the first
348          */
349         n -= BLEN(bp);
350         while ((nbp = bp->next)) {
351                 i = BLEN(nbp);
352                 if (i > n) {
353                         memmove(bp->wp, nbp->rp, n);
354                         pullupblockcnt++;
355                         bp->wp += n;
356                         nbp->rp += n;
357                         QDEBUG checkb(bp, "pullupblock 1");
358                         return bp;
359                 } else {
360                         memmove(bp->wp, nbp->rp, i);
361                         pullupblockcnt++;
362                         bp->wp += i;
363                         bp->next = nbp->next;
364                         nbp->next = 0;
365                         freeb(nbp);
366                         n -= i;
367                         if (n == 0) {
368                                 QDEBUG checkb(bp, "pullupblock 2");
369                                 return bp;
370                         }
371                 }
372         }
373         freeb(bp);
374         return 0;
375 }
376
377 /*
378  *  make sure the first block has at least n bytes in its main body
379  */
380 struct block *pullupqueue(struct queue *q, int n)
381 {
382         struct block *b;
383
384         /* TODO: lock to protect the queue links? */
385         if ((BHLEN(q->bfirst) >= n))
386                 return q->bfirst;
387         q->bfirst = pullupblock(q->bfirst, n);
388         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
389         q->blast = b;
390         return q->bfirst;
391 }
392
393 /* throw away count bytes from the front of
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int pullext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->off += rem;
410                 ed->len -= rem;
411                 if (ed->len == 0) {
412                         kfree((void *)ed->base);
413                         ed->base = 0;
414                         ed->off = 0;
415                 }
416         }
417         return bytes;
418 }
419
420 /* throw away count bytes from the end of a
421  * block's extradata.  Returns count of bytes
422  * thrown away
423  */
424
425 static int dropext(struct block *bp, int count)
426 {
427         struct extra_bdata *ed;
428         int i, rem, bytes = 0;
429
430         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
431                 ed = &bp->extra_data[i];
432                 rem = MIN(count, ed->len);
433                 bp->extra_len -= rem;
434                 count -= rem;
435                 bytes += rem;
436                 ed->len -= rem;
437                 if (ed->len == 0) {
438                         kfree((void *)ed->base);
439                         ed->base = 0;
440                         ed->off = 0;
441                 }
442         }
443         return bytes;
444 }
445
446 /*
447  *  throw away up to count bytes from a
448  *  list of blocks.  Return count of bytes
449  *  thrown away.
450  */
451 static int _pullblock(struct block **bph, int count, int free)
452 {
453         struct block *bp;
454         int n, bytes;
455
456         bytes = 0;
457         if (bph == NULL)
458                 return 0;
459
460         while (*bph != NULL && count != 0) {
461                 bp = *bph;
462
463                 n = MIN(BHLEN(bp), count);
464                 bytes += n;
465                 count -= n;
466                 bp->rp += n;
467                 n = pullext(bp, count);
468                 bytes += n;
469                 count -= n;
470                 QDEBUG checkb(bp, "pullblock ");
471                 if (BLEN(bp) == 0 && (free || count)) {
472                         *bph = bp->next;
473                         bp->next = NULL;
474                         freeb(bp);
475                 }
476         }
477         return bytes;
478 }
479
480 int pullblock(struct block **bph, int count)
481 {
482         return _pullblock(bph, count, 1);
483 }
484
485 /*
486  *  trim to len bytes starting at offset
487  */
488 struct block *trimblock(struct block *bp, int offset, int len)
489 {
490         uint32_t l, trim;
491         int olen = len;
492
493         QDEBUG checkb(bp, "trimblock 1");
494         if (blocklen(bp) < offset + len) {
495                 freeblist(bp);
496                 return NULL;
497         }
498
499         l =_pullblock(&bp, offset, 0);
500         if (bp == NULL)
501                 return NULL;
502         if (l != offset) {
503                 freeblist(bp);
504                 return NULL;
505         }
506
507         while ((l = BLEN(bp)) < len) {
508                 len -= l;
509                 bp = bp->next;
510         }
511
512         trim = BLEN(bp) - len;
513         trim -= dropext(bp, trim);
514         bp->wp -= trim;
515
516         if (bp->next) {
517                 freeblist(bp->next);
518                 bp->next = NULL;
519         }
520         return bp;
521 }
522
523 /*
524  *  copy 'count' bytes into a new block
525  */
526 struct block *copyblock(struct block *bp, int count)
527 {
528         int l;
529         struct block *nbp;
530
531         QDEBUG checkb(bp, "copyblock 0");
532         nbp = allocb(count);
533         if (bp->flag & BCKSUM_FLAGS) {
534                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
535                 nbp->checksum_start = bp->checksum_start;
536                 nbp->checksum_offset = bp->checksum_offset;
537                 nbp->mss = bp->mss;
538         }
539         PANIC_EXTRA(bp);
540         for (; count > 0 && bp != 0; bp = bp->next) {
541                 l = BLEN(bp);
542                 if (l > count)
543                         l = count;
544                 memmove(nbp->wp, bp->rp, l);
545                 nbp->wp += l;
546                 count -= l;
547         }
548         if (count > 0) {
549                 memset(nbp->wp, 0, count);
550                 nbp->wp += count;
551         }
552         copyblockcnt++;
553         QDEBUG checkb(nbp, "copyblock 1");
554
555         return nbp;
556 }
557
558 /* Adjust block @bp so that its size is exactly @len.
559  * If the size is increased, fill in the new contents with zeros.
560  * If the size is decreased, discard some of the old contents at the tail. */
561 struct block *adjustblock(struct block *bp, int len)
562 {
563         struct extra_bdata *ebd;
564         int i;
565
566         if (len < 0) {
567                 freeb(bp);
568                 return NULL;
569         }
570
571         if (len == BLEN(bp))
572                 return bp;
573
574         /* Shrink within block main body. */
575         if (len <= BHLEN(bp)) {
576                 free_block_extra(bp);
577                 bp->wp = bp->rp + len;
578                 QDEBUG checkb(bp, "adjustblock 1");
579                 return bp;
580         }
581
582         /* Need to grow. */
583         if (len > BLEN(bp)) {
584                 /* Grow within block main body. */
585                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
586                         memset(bp->wp, 0, len - BLEN(bp));
587                         bp->wp = bp->rp + len;
588                         QDEBUG checkb(bp, "adjustblock 2");
589                         return bp;
590                 }
591                 /* Grow with extra data buffers. */
592                 block_append_extra(bp, len - BLEN(bp), KMALLOC_WAIT);
593                 QDEBUG checkb(bp, "adjustblock 3");
594                 return bp;
595         }
596
597         /* Shrink extra data buffers.
598          * len is how much of ebd we need to keep.
599          * extra_len is re-accumulated. */
600         assert(bp->extra_len > 0);
601         len -= BHLEN(bp);
602         bp->extra_len = 0;
603         for (i = 0; i < bp->nr_extra_bufs; i++) {
604                 ebd = &bp->extra_data[i];
605                 if (len <= ebd->len)
606                         break;
607                 len -= ebd->len;
608                 bp->extra_len += ebd->len;
609         }
610         /* If len becomes zero, extra_data[i] should be freed. */
611         if (len > 0) {
612                 ebd = &bp->extra_data[i];
613                 ebd->len = len;
614                 bp->extra_len += ebd->len;
615                 i++;
616         }
617         for (; i < bp->nr_extra_bufs; i++) {
618                 ebd = &bp->extra_data[i];
619                 if (ebd->base)
620                         kfree((void*)ebd->base);
621                 ebd->base = ebd->off = ebd->len = 0;
622         }
623         QDEBUG checkb(bp, "adjustblock 4");
624         return bp;
625 }
626
627
628 /*
629  *  get next block from a queue, return null if nothing there
630  */
631 struct block *qget(struct queue *q)
632 {
633         int dowakeup;
634         struct block *b;
635
636         /* sync with qwrite */
637         spin_lock_irqsave(&q->lock);
638
639         b = q->bfirst;
640         if (b == NULL) {
641                 q->state |= Qstarve;
642                 spin_unlock_irqsave(&q->lock);
643                 return NULL;
644         }
645         q->bfirst = b->next;
646         b->next = 0;
647         q->len -= BALLOC(b);
648         q->dlen -= BLEN(b);
649         QDEBUG checkb(b, "qget");
650
651         /* if writer flow controlled, restart */
652         if ((q->state & Qflow) && q->len < q->limit / 2) {
653                 q->state &= ~Qflow;
654                 dowakeup = 1;
655         } else
656                 dowakeup = 0;
657
658         spin_unlock_irqsave(&q->lock);
659
660         if (dowakeup) {
661                 rendez_wakeup(&q->wr);
662                 /* We only send the writable event on wakeup, which is edge triggered */
663                 qwake_cb(q, FDTAP_FILT_WRITABLE);
664         }
665
666         return b;
667 }
668
669 /*
670  *  throw away the next 'len' bytes in the queue
671  * returning the number actually discarded
672  */
673 int qdiscard(struct queue *q, int len)
674 {
675         struct block *b;
676         int dowakeup, n, sofar, body_amt, extra_amt;
677         struct extra_bdata *ebd;
678
679         spin_lock_irqsave(&q->lock);
680         for (sofar = 0; sofar < len; sofar += n) {
681                 b = q->bfirst;
682                 if (b == NULL)
683                         break;
684                 QDEBUG checkb(b, "qdiscard");
685                 n = BLEN(b);
686                 if (n <= len - sofar) {
687                         q->bfirst = b->next;
688                         b->next = 0;
689                         q->len -= BALLOC(b);
690                         q->dlen -= BLEN(b);
691                         freeb(b);
692                 } else {
693                         n = len - sofar;
694                         q->dlen -= n;
695                         /* partial block removal */
696                         body_amt = MIN(BHLEN(b), n);
697                         b->rp += body_amt;
698                         extra_amt = n - body_amt;
699                         /* reduce q->len by the amount we remove from the extras.  The
700                          * header will always be accounted for above, during block removal.
701                          * */
702                         q->len -= extra_amt;
703                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
704                                 ebd = &b->extra_data[i];
705                                 if (!ebd->base || !ebd->len)
706                                         continue;
707                                 if (extra_amt >= ebd->len) {
708                                         /* remove the entire entry, note the kfree release */
709                                         b->extra_len -= ebd->len;
710                                         extra_amt -= ebd->len;
711                                         kfree((void*)ebd->base);
712                                         ebd->base = ebd->off = ebd->len = 0;
713                                         continue;
714                                 }
715                                 ebd->off += extra_amt;
716                                 ebd->len -= extra_amt;
717                                 b->extra_len -= extra_amt;
718                                 extra_amt = 0;
719                         }
720                 }
721         }
722
723         /*
724          *  if writer flow controlled, restart
725          *
726          *  This used to be
727          *  q->len < q->limit/2
728          *  but it slows down tcp too much for certain write sizes.
729          *  I really don't understand it completely.  It may be
730          *  due to the queue draining so fast that the transmission
731          *  stalls waiting for the app to produce more data.  - presotto
732          */
733         if ((q->state & Qflow) && q->len < q->limit) {
734                 q->state &= ~Qflow;
735                 dowakeup = 1;
736         } else
737                 dowakeup = 0;
738
739         spin_unlock_irqsave(&q->lock);
740
741         if (dowakeup) {
742                 rendez_wakeup(&q->wr);
743                 qwake_cb(q, FDTAP_FILT_WRITABLE);
744         }
745
746         return sofar;
747 }
748
749 /*
750  *  Interrupt level copy out of a queue, return # bytes copied.
751  */
752 int qconsume(struct queue *q, void *vp, int len)
753 {
754         struct block *b;
755         int n, dowakeup;
756         uint8_t *p = vp;
757         struct block *tofree = NULL;
758
759         /* sync with qwrite */
760         spin_lock_irqsave(&q->lock);
761
762         for (;;) {
763                 b = q->bfirst;
764                 if (b == 0) {
765                         q->state |= Qstarve;
766                         spin_unlock_irqsave(&q->lock);
767                         return -1;
768                 }
769                 QDEBUG checkb(b, "qconsume 1");
770
771                 n = BLEN(b);
772                 if (n > 0)
773                         break;
774                 q->bfirst = b->next;
775                 q->len -= BALLOC(b);
776
777                 /* remember to free this */
778                 b->next = tofree;
779                 tofree = b;
780         };
781
782         PANIC_EXTRA(b);
783         if (n < len)
784                 len = n;
785         memmove(p, b->rp, len);
786         consumecnt += n;
787         b->rp += len;
788         q->dlen -= len;
789
790         /* discard the block if we're done with it */
791         if ((q->state & Qmsg) || len == n) {
792                 q->bfirst = b->next;
793                 b->next = 0;
794                 q->len -= BALLOC(b);
795                 q->dlen -= BLEN(b);
796
797                 /* remember to free this */
798                 b->next = tofree;
799                 tofree = b;
800         }
801
802         /* if writer flow controlled, restart */
803         if ((q->state & Qflow) && q->len < q->limit / 2) {
804                 q->state &= ~Qflow;
805                 dowakeup = 1;
806         } else
807                 dowakeup = 0;
808
809         spin_unlock_irqsave(&q->lock);
810
811         if (dowakeup) {
812                 rendez_wakeup(&q->wr);
813                 qwake_cb(q, FDTAP_FILT_WRITABLE);
814         }
815
816         if (tofree != NULL)
817                 freeblist(tofree);
818
819         return len;
820 }
821
822 int qpass(struct queue *q, struct block *b)
823 {
824         int dlen, len, dowakeup;
825
826         /* sync with qread */
827         dowakeup = 0;
828         spin_lock_irqsave(&q->lock);
829         if (q->len >= q->limit) {
830                 freeblist(b);
831                 spin_unlock_irqsave(&q->lock);
832                 return -1;
833         }
834         if (q->state & Qclosed) {
835                 len = blocklen(b);
836                 freeblist(b);
837                 spin_unlock_irqsave(&q->lock);
838                 return len;
839         }
840
841         /* add buffer to queue */
842         if (q->bfirst)
843                 q->blast->next = b;
844         else
845                 q->bfirst = b;
846         len = BALLOC(b);
847         dlen = BLEN(b);
848         QDEBUG checkb(b, "qpass");
849         while (b->next) {
850                 b = b->next;
851                 QDEBUG checkb(b, "qpass");
852                 len += BALLOC(b);
853                 dlen += BLEN(b);
854         }
855         q->blast = b;
856         q->len += len;
857         q->dlen += dlen;
858
859         if (q->len >= q->limit / 2)
860                 q->state |= Qflow;
861
862         if (q->state & Qstarve) {
863                 q->state &= ~Qstarve;
864                 dowakeup = 1;
865         }
866         spin_unlock_irqsave(&q->lock);
867
868         if (dowakeup) {
869                 rendez_wakeup(&q->rr);
870                 qwake_cb(q, FDTAP_FILT_READABLE);
871         }
872
873         return len;
874 }
875
876 int qpassnolim(struct queue *q, struct block *b)
877 {
878         int dlen, len, dowakeup;
879
880         /* sync with qread */
881         dowakeup = 0;
882         spin_lock_irqsave(&q->lock);
883
884         if (q->state & Qclosed) {
885                 freeblist(b);
886                 spin_unlock_irqsave(&q->lock);
887                 return BALLOC(b);
888         }
889
890         /* add buffer to queue */
891         if (q->bfirst)
892                 q->blast->next = b;
893         else
894                 q->bfirst = b;
895         len = BALLOC(b);
896         dlen = BLEN(b);
897         QDEBUG checkb(b, "qpass");
898         while (b->next) {
899                 b = b->next;
900                 QDEBUG checkb(b, "qpass");
901                 len += BALLOC(b);
902                 dlen += BLEN(b);
903         }
904         q->blast = b;
905         q->len += len;
906         q->dlen += dlen;
907
908         if (q->len >= q->limit / 2)
909                 q->state |= Qflow;
910
911         if (q->state & Qstarve) {
912                 q->state &= ~Qstarve;
913                 dowakeup = 1;
914         }
915         spin_unlock_irqsave(&q->lock);
916
917         if (dowakeup) {
918                 rendez_wakeup(&q->rr);
919                 qwake_cb(q, FDTAP_FILT_READABLE);
920         }
921
922         return len;
923 }
924
925 /*
926  *  if the allocated space is way out of line with the used
927  *  space, reallocate to a smaller block
928  */
929 struct block *packblock(struct block *bp)
930 {
931         struct block **l, *nbp;
932         int n;
933
934         if (bp->extra_len)
935                 return bp;
936         for (l = &bp; *l; l = &(*l)->next) {
937                 nbp = *l;
938                 n = BLEN(nbp);
939                 if ((n << 2) < BALLOC(nbp)) {
940                         *l = allocb(n);
941                         memmove((*l)->wp, nbp->rp, n);
942                         (*l)->wp += n;
943                         (*l)->next = nbp->next;
944                         freeb(nbp);
945                 }
946         }
947
948         return bp;
949 }
950
951 int qproduce(struct queue *q, void *vp, int len)
952 {
953         struct block *b;
954         int dowakeup;
955         uint8_t *p = vp;
956
957         /* sync with qread */
958         dowakeup = 0;
959         spin_lock_irqsave(&q->lock);
960
961         /* no waiting receivers, room in buffer? */
962         if (q->len >= q->limit) {
963                 q->state |= Qflow;
964                 spin_unlock_irqsave(&q->lock);
965                 return -1;
966         }
967
968         /* save in buffer */
969         /* use Qcoalesce here to save storage */
970         // TODO: Consider removing the Qcoalesce flag and force a coalescing
971         // strategy by default.
972         b = q->blast;
973         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
974                 || b->lim - b->wp < len) {
975                 /* need a new block */
976                 b = iallocb(len);
977                 if (b == 0) {
978                         spin_unlock_irqsave(&q->lock);
979                         return 0;
980                 }
981                 if (q->bfirst)
982                         q->blast->next = b;
983                 else
984                         q->bfirst = b;
985                 q->blast = b;
986                 /* b->next = 0; done by iallocb() */
987                 q->len += BALLOC(b);
988         }
989         PANIC_EXTRA(b);
990         memmove(b->wp, p, len);
991         producecnt += len;
992         b->wp += len;
993         q->dlen += len;
994         QDEBUG checkb(b, "qproduce");
995
996         if (q->state & Qstarve) {
997                 q->state &= ~Qstarve;
998                 dowakeup = 1;
999         }
1000
1001         if (q->len >= q->limit)
1002                 q->state |= Qflow;
1003         spin_unlock_irqsave(&q->lock);
1004
1005         if (dowakeup) {
1006                 rendez_wakeup(&q->rr);
1007                 qwake_cb(q, FDTAP_FILT_READABLE);
1008         }
1009
1010         return len;
1011 }
1012
1013 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
1014  * body_rp, for up to len.  Returns the len consumed. 
1015  *
1016  * The base is 'b', so that we can kfree it later.  This currently ties us to
1017  * using kfree for the release method for all extra_data.
1018  *
1019  * It is possible to have a body size that is 0, if there is no offset, and
1020  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
1021 static size_t point_to_body(struct block *b, uint8_t *body_rp,
1022                             struct block *newb, unsigned int newb_idx,
1023                             size_t len)
1024 {
1025         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
1026
1027         assert(newb_idx < newb->nr_extra_bufs);
1028
1029         kmalloc_incref(b);
1030         ebd->base = (uintptr_t)b;
1031         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
1032         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
1033         assert((int)ebd->len >= 0);
1034         newb->extra_len += ebd->len;
1035         return ebd->len;
1036 }
1037
1038 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1039  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1040  * consumed.
1041  *
1042  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1043 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1044                            struct block *newb, unsigned int newb_idx,
1045                            size_t len)
1046 {
1047         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1048         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1049
1050         assert(b_idx < b->nr_extra_bufs);
1051         assert(newb_idx < newb->nr_extra_bufs);
1052
1053         kmalloc_incref((void*)b_ebd->base);
1054         n_ebd->base = b_ebd->base;
1055         n_ebd->off = b_ebd->off + b_off;
1056         n_ebd->len = MIN(b_ebd->len - b_off, len);
1057         newb->extra_len += n_ebd->len;
1058         return n_ebd->len;
1059 }
1060
1061 /* given a string of blocks, fills the new block's extra_data  with the contents
1062  * of the blist [offset, len + offset)
1063  *
1064  * returns 0 on success.  the only failure is if the extra_data array was too
1065  * small, so this returns a positive integer saying how big the extra_data needs
1066  * to be.
1067  *
1068  * callers are responsible for protecting the list structure. */
1069 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1070                             uint32_t offset)
1071 {
1072         struct block *b, *first;
1073         unsigned int nr_bufs = 0;
1074         unsigned int b_idx, newb_idx = 0;
1075         uint8_t *first_main_body = 0;
1076
1077         /* find the first block; keep offset relative to the latest b in the list */
1078         for (b = blist; b; b = b->next) {
1079                 if (BLEN(b) > offset)
1080                         break;
1081                 offset -= BLEN(b);
1082         }
1083         /* qcopy semantics: if you asked for an offset outside the block list, you
1084          * get an empty block back */
1085         if (!b)
1086                 return 0;
1087         first = b;
1088         /* upper bound for how many buffers we'll need in newb */
1089         for (/* b is set*/; b; b = b->next) {
1090                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1091         }
1092         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1093         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1094                 /* caller will need to alloc these, then re-call us */
1095                 return nr_bufs;
1096         }
1097         for (b = first; b && len; b = b->next) {
1098                 b_idx = 0;
1099                 if (offset) {
1100                         if (offset < BHLEN(b)) {
1101                                 /* off is in the main body */
1102                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1103                                 newb_idx++;
1104                         } else {
1105                                 /* off is in one of the buffers (or just past the last one).
1106                                  * we're not going to point to b's main body at all. */
1107                                 offset -= BHLEN(b);
1108                                 assert(b->extra_data);
1109                                 /* assuming these extrabufs are packed, or at least that len
1110                                  * isn't gibberish */
1111                                 while (b->extra_data[b_idx].len <= offset) {
1112                                         offset -= b->extra_data[b_idx].len;
1113                                         b_idx++;
1114                                 }
1115                                 /* now offset is set to our offset in the b_idx'th buf */
1116                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1117                                 newb_idx++;
1118                                 b_idx++;
1119                         }
1120                         offset = 0;
1121                 } else {
1122                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1123                         newb_idx++;
1124                 }
1125                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1126                  * and any point_to_ could be our last if it consumed all of len. */
1127                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1128                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1129                         newb_idx++;
1130                 }
1131         }
1132         return 0;
1133 }
1134
1135 struct block *blist_clone(struct block *blist, int header_len, int len,
1136                           uint32_t offset)
1137 {
1138         int ret;
1139         struct block *newb = allocb(header_len);
1140         do {
1141                 ret = __blist_clone_to(blist, newb, len, offset);
1142                 if (ret)
1143                         block_add_extd(newb, ret, KMALLOC_WAIT);
1144         } while (ret);
1145         return newb;
1146 }
1147
1148 /* given a queue, makes a single block with header_len reserved space in the
1149  * block main body, and the contents of [offset, len + offset) pointed to in the
1150  * new blocks ext_data. */
1151 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1152 {
1153         int ret;
1154         struct block *newb = allocb(header_len);
1155         /* the while loop should rarely be used: it would require someone
1156          * concurrently adding to the queue. */
1157         do {
1158                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1159                 spin_lock_irqsave(&q->lock);
1160                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1161                 spin_unlock_irqsave(&q->lock);
1162                 if (ret)
1163                         block_add_extd(newb, ret, KMALLOC_WAIT);
1164         } while (ret);
1165         return newb;
1166 }
1167
1168 /*
1169  *  copy from offset in the queue
1170  */
1171 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1172 {
1173         int sofar;
1174         int n;
1175         struct block *b, *nb;
1176         uint8_t *p;
1177
1178         nb = allocb(len);
1179
1180         spin_lock_irqsave(&q->lock);
1181
1182         /* go to offset */
1183         b = q->bfirst;
1184         for (sofar = 0;; sofar += n) {
1185                 if (b == NULL) {
1186                         spin_unlock_irqsave(&q->lock);
1187                         return nb;
1188                 }
1189                 n = BLEN(b);
1190                 if (sofar + n > offset) {
1191                         p = b->rp + offset - sofar;
1192                         n -= offset - sofar;
1193                         break;
1194                 }
1195                 QDEBUG checkb(b, "qcopy");
1196                 b = b->next;
1197         }
1198
1199         /* copy bytes from there */
1200         for (sofar = 0; sofar < len;) {
1201                 if (n > len - sofar)
1202                         n = len - sofar;
1203                 PANIC_EXTRA(b);
1204                 memmove(nb->wp, p, n);
1205                 qcopycnt += n;
1206                 sofar += n;
1207                 nb->wp += n;
1208                 b = b->next;
1209                 if (b == NULL)
1210                         break;
1211                 n = BLEN(b);
1212                 p = b->rp;
1213         }
1214         spin_unlock_irqsave(&q->lock);
1215
1216         return nb;
1217 }
1218
1219 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1220 {
1221 #ifdef CONFIG_BLOCK_EXTRAS
1222         return qclone(q, 0, len, offset);
1223 #else
1224         return qcopy_old(q, len, offset);
1225 #endif
1226 }
1227
1228 static void qinit_common(struct queue *q)
1229 {
1230         spinlock_init_irqsave(&q->lock);
1231         qlock_init(&q->rlock);
1232         qlock_init(&q->wlock);
1233         rendez_init(&q->rr);
1234         rendez_init(&q->wr);
1235 }
1236
1237 /*
1238  *  called by non-interrupt code
1239  */
1240 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1241 {
1242         struct queue *q;
1243
1244         q = kzmalloc(sizeof(struct queue), 0);
1245         if (q == 0)
1246                 return 0;
1247         qinit_common(q);
1248
1249         q->limit = q->inilim = limit;
1250         q->kick = kick;
1251         q->arg = arg;
1252         q->state = msg;
1253         q->state |= Qstarve;
1254         q->eof = 0;
1255
1256         return q;
1257 }
1258
1259 /* open a queue to be bypassed */
1260 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1261 {
1262         struct queue *q;
1263
1264         q = kzmalloc(sizeof(struct queue), 0);
1265         if (q == 0)
1266                 return 0;
1267         qinit_common(q);
1268
1269         q->limit = 0;
1270         q->arg = arg;
1271         q->bypass = bypass;
1272         q->state = 0;
1273
1274         return q;
1275 }
1276
1277 static int notempty(void *a)
1278 {
1279         struct queue *q = a;
1280
1281         return (q->state & Qclosed) || q->bfirst != 0;
1282 }
1283
1284 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1285  * wait, FALSE on Qclose (without error)
1286  *
1287  * Called with q ilocked.  May error out, back through the caller, with
1288  * the irqsave lock unlocked.  */
1289 static bool qwait(struct queue *q)
1290 {
1291         /* wait for data */
1292         for (;;) {
1293                 if (q->bfirst != NULL)
1294                         break;
1295
1296                 if (q->state & Qclosed) {
1297                         if (++q->eof > 3) {
1298                                 spin_unlock_irqsave(&q->lock);
1299                                 error(EFAIL, "multiple reads on a closed queue");
1300                         }
1301                         if (*q->err && strcmp(q->err, errno_to_string(ECONNABORTED)) != 0) {
1302                                 spin_unlock_irqsave(&q->lock);
1303                                 error(EFAIL, q->err);
1304                         }
1305                         return FALSE;
1306                 }
1307                 /* We set Qstarve regardless of whether we are non-blocking or not.
1308                  * Qstarve tracks the edge detection of the queue being empty. */
1309                 q->state |= Qstarve;
1310                 if (q->state & Qnonblock) {
1311                         spin_unlock_irqsave(&q->lock);
1312                         error(EAGAIN, "queue empty");
1313                 }
1314                 spin_unlock_irqsave(&q->lock);
1315                 /* may throw an error() */
1316                 rendez_sleep(&q->rr, notempty, q);
1317                 spin_lock_irqsave(&q->lock);
1318         }
1319         return TRUE;
1320 }
1321
1322 /*
1323  * add a block list to a queue
1324  */
1325 void qaddlist(struct queue *q, struct block *b)
1326 {
1327         /* TODO: q lock? */
1328         /* queue the block */
1329         if (q->bfirst)
1330                 q->blast->next = b;
1331         else
1332                 q->bfirst = b;
1333         q->len += blockalloclen(b);
1334         q->dlen += blocklen(b);
1335         while (b->next)
1336                 b = b->next;
1337         q->blast = b;
1338 }
1339
1340 /*
1341  *  called with q ilocked
1342  */
1343 struct block *qremove(struct queue *q)
1344 {
1345         struct block *b;
1346
1347         b = q->bfirst;
1348         if (b == NULL)
1349                 return NULL;
1350         q->bfirst = b->next;
1351         b->next = NULL;
1352         q->dlen -= BLEN(b);
1353         q->len -= BALLOC(b);
1354         QDEBUG checkb(b, "qremove");
1355         return b;
1356 }
1357
1358 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1359 {
1360         size_t copy_amt, retval = 0;
1361         struct extra_bdata *ebd;
1362         
1363         copy_amt = MIN(BHLEN(b), amt);
1364         memcpy(to, b->rp, copy_amt);
1365         /* advance the rp, since this block not be completely consumed and future
1366          * reads need to know where to pick up from */
1367         b->rp += copy_amt;
1368         to += copy_amt;
1369         amt -= copy_amt;
1370         retval += copy_amt;
1371         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1372                 ebd = &b->extra_data[i];
1373                 /* skip empty entires.  if we track this in the struct block, we can
1374                  * just start the for loop early */
1375                 if (!ebd->base || !ebd->len)
1376                         continue;
1377                 copy_amt = MIN(ebd->len, amt);
1378                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1379                 /* we're actually consuming the entries, just like how we advance rp up
1380                  * above, and might only consume part of one. */
1381                 ebd->len -= copy_amt;
1382                 ebd->off += copy_amt;
1383                 b->extra_len -= copy_amt;
1384                 if (!ebd->len) {
1385                         /* we don't actually have to decref here.  it's also done in
1386                          * freeb().  this is the earliest we can free. */
1387                         kfree((void*)ebd->base);
1388                         ebd->base = ebd->off = 0;
1389                 }
1390                 to += copy_amt;
1391                 amt -= copy_amt;
1392                 retval += copy_amt;
1393         }
1394         return retval;
1395 }
1396
1397 /*
1398  *  copy the contents of a string of blocks into
1399  *  memory.  emptied blocks are freed.  return
1400  *  pointer to first unconsumed block.
1401  */
1402 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1403 {
1404         int i;
1405         struct block *next;
1406
1407         /* could be slicker here, since read_from_block is smart */
1408         for (; b != NULL; b = next) {
1409                 i = BLEN(b);
1410                 if (i > n) {
1411                         /* partial block, consume some */
1412                         read_from_block(b, p, n);
1413                         return b;
1414                 }
1415                 /* full block, consume all and move on */
1416                 i = read_from_block(b, p, i);
1417                 n -= i;
1418                 p += i;
1419                 next = b->next;
1420                 freeb(b);
1421         }
1422         return NULL;
1423 }
1424
1425 /*
1426  *  copy the contents of memory into a string of blocks.
1427  *  return NULL on error.
1428  */
1429 struct block *mem2bl(uint8_t * p, int len)
1430 {
1431         ERRSTACK(1);
1432         int n;
1433         struct block *b, *first, **l;
1434
1435         first = NULL;
1436         l = &first;
1437         if (waserror()) {
1438                 freeblist(first);
1439                 nexterror();
1440         }
1441         do {
1442                 n = len;
1443                 if (n > Maxatomic)
1444                         n = Maxatomic;
1445
1446                 *l = b = allocb(n);
1447                 /* TODO consider extra_data */
1448                 memmove(b->wp, p, n);
1449                 b->wp += n;
1450                 p += n;
1451                 len -= n;
1452                 l = &b->next;
1453         } while (len > 0);
1454         poperror();
1455
1456         return first;
1457 }
1458
1459 /*
1460  *  put a block back to the front of the queue
1461  *  called with q ilocked
1462  */
1463 void qputback(struct queue *q, struct block *b)
1464 {
1465         b->next = q->bfirst;
1466         if (q->bfirst == NULL)
1467                 q->blast = b;
1468         q->bfirst = b;
1469         q->len += BALLOC(b);
1470         q->dlen += BLEN(b);
1471 }
1472
1473 /*
1474  *  flow control, get producer going again
1475  *  called with q ilocked
1476  */
1477 static void qwakeup_iunlock(struct queue *q)
1478 {
1479         int dowakeup = 0;
1480
1481         /* if writer flow controlled, restart */
1482         if ((q->state & Qflow) && q->len < q->limit / 2) {
1483                 q->state &= ~Qflow;
1484                 dowakeup = 1;
1485         }
1486
1487         spin_unlock_irqsave(&q->lock);
1488
1489         /* wakeup flow controlled writers */
1490         if (dowakeup) {
1491                 if (q->kick)
1492                         q->kick(q->arg);
1493                 rendez_wakeup(&q->wr);
1494                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1495         }
1496 }
1497
1498 /*
1499  *  get next block from a queue (up to a limit)
1500  */
1501 struct block *qbread(struct queue *q, int len)
1502 {
1503         ERRSTACK(1);
1504         struct block *b, *nb;
1505         int n;
1506
1507         qlock(&q->rlock);
1508         if (waserror()) {
1509                 qunlock(&q->rlock);
1510                 nexterror();
1511         }
1512
1513         spin_lock_irqsave(&q->lock);
1514         if (!qwait(q)) {
1515                 /* queue closed */
1516                 spin_unlock_irqsave(&q->lock);
1517                 qunlock(&q->rlock);
1518                 poperror();
1519                 return NULL;
1520         }
1521
1522         /* if we get here, there's at least one block in the queue */
1523         b = qremove(q);
1524         n = BLEN(b);
1525
1526         /* split block if it's too big and this is not a message queue */
1527         nb = b;
1528         if (n > len) {
1529                 PANIC_EXTRA(b);
1530                 if ((q->state & Qmsg) == 0) {
1531                         n -= len;
1532                         b = allocb(n);
1533                         memmove(b->wp, nb->rp + len, n);
1534                         b->wp += n;
1535                         qputback(q, b);
1536                 }
1537                 nb->wp = nb->rp + len;
1538         }
1539
1540         /* restart producer */
1541         qwakeup_iunlock(q);
1542
1543         poperror();
1544         qunlock(&q->rlock);
1545         return nb;
1546 }
1547
1548 /*
1549  *  read a queue.  if no data is queued, post a struct block
1550  *  and wait on its Rendez.
1551  */
1552 long qread(struct queue *q, void *vp, int len)
1553 {
1554         ERRSTACK(1);
1555         struct block *b, *first, **l;
1556         int m, n;
1557
1558         qlock(&q->rlock);
1559         if (waserror()) {
1560                 qunlock(&q->rlock);
1561                 nexterror();
1562         }
1563
1564         spin_lock_irqsave(&q->lock);
1565 again:
1566         if (!qwait(q)) {
1567                 /* queue closed */
1568                 spin_unlock_irqsave(&q->lock);
1569                 qunlock(&q->rlock);
1570                 poperror();
1571                 return 0;
1572         }
1573
1574         /* if we get here, there's at least one block in the queue */
1575         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1576         // strategy by default.
1577         if (q->state & Qcoalesce) {
1578                 /* when coalescing, 0 length blocks just go away */
1579                 b = q->bfirst;
1580                 if (BLEN(b) <= 0) {
1581                         freeb(qremove(q));
1582                         goto again;
1583                 }
1584
1585                 /*  grab the first block plus as many
1586                  *  following blocks as will completely
1587                  *  fit in the read.
1588                  */
1589                 n = 0;
1590                 l = &first;
1591                 m = BLEN(b);
1592                 for (;;) {
1593                         *l = qremove(q);
1594                         l = &b->next;
1595                         n += m;
1596
1597                         b = q->bfirst;
1598                         if (b == NULL)
1599                                 break;
1600                         m = BLEN(b);
1601                         if (n + m > len)
1602                                 break;
1603                 }
1604         } else {
1605                 first = qremove(q);
1606                 n = BLEN(first);
1607         }
1608
1609         /* copy to user space outside of the ilock */
1610         spin_unlock_irqsave(&q->lock);
1611         b = bl2mem(vp, first, len);
1612         spin_lock_irqsave(&q->lock);
1613
1614         /* take care of any left over partial block */
1615         if (b != NULL) {
1616                 n -= BLEN(b);
1617                 if (q->state & Qmsg)
1618                         freeb(b);
1619                 else
1620                         qputback(q, b);
1621         }
1622
1623         /* restart producer */
1624         qwakeup_iunlock(q);
1625
1626         poperror();
1627         qunlock(&q->rlock);
1628         return n;
1629 }
1630
1631 static int qnotfull(void *a)
1632 {
1633         struct queue *q = a;
1634
1635         return q->len < q->limit || (q->state & Qclosed);
1636 }
1637
1638 uint32_t dropcnt;
1639
1640 /*
1641  *  add a block to a queue obeying flow control
1642  */
1643 long qbwrite(struct queue *q, struct block *b)
1644 {
1645         ERRSTACK(1);
1646         int n, dowakeup;
1647         volatile bool should_free_b = TRUE;
1648
1649         n = BLEN(b);
1650
1651         if (q->bypass) {
1652                 (*q->bypass) (q->arg, b);
1653                 return n;
1654         }
1655
1656         dowakeup = 0;
1657         qlock(&q->wlock);
1658         if (waserror()) {
1659                 if (b != NULL && should_free_b)
1660                         freeb(b);
1661                 qunlock(&q->wlock);
1662                 nexterror();
1663         }
1664
1665         spin_lock_irqsave(&q->lock);
1666
1667         /* give up if the queue is closed */
1668         if (q->state & Qclosed) {
1669                 spin_unlock_irqsave(&q->lock);
1670                 error(EFAIL, q->err);
1671         }
1672
1673         /* if nonblocking, don't queue over the limit */
1674         if (q->len >= q->limit) {
1675                 /* drop overflow takes priority over regular non-blocking */
1676                 if (q->state & Qdropoverflow) {
1677                         spin_unlock_irqsave(&q->lock);
1678                         freeb(b);
1679                         dropcnt += n;
1680                         qunlock(&q->wlock);
1681                         poperror();
1682                         return n;
1683                 }
1684                 if (q->state & Qnonblock) {
1685                         spin_unlock_irqsave(&q->lock);
1686                         freeb(b);
1687                         error(EAGAIN, "queue full");
1688                 }
1689         }
1690
1691         /* queue the block */
1692         should_free_b = FALSE;
1693         if (q->bfirst)
1694                 q->blast->next = b;
1695         else
1696                 q->bfirst = b;
1697         q->blast = b;
1698         b->next = 0;
1699         q->len += BALLOC(b);
1700         q->dlen += n;
1701         QDEBUG checkb(b, "qbwrite");
1702         b = NULL;
1703
1704         /* make sure other end gets awakened */
1705         if (q->state & Qstarve) {
1706                 q->state &= ~Qstarve;
1707                 dowakeup = 1;
1708         }
1709         spin_unlock_irqsave(&q->lock);
1710
1711         /*  get output going again */
1712         if (q->kick && (dowakeup || (q->state & Qkick)))
1713                 q->kick(q->arg);
1714
1715         /* wakeup anyone consuming at the other end */
1716         if (dowakeup) {
1717                 rendez_wakeup(&q->rr);
1718                 qwake_cb(q, FDTAP_FILT_READABLE);
1719         }
1720
1721         /*
1722          *  flow control, wait for queue to get below the limit
1723          *  before allowing the process to continue and queue
1724          *  more.  We do this here so that postnote can only
1725          *  interrupt us after the data has been queued.  This
1726          *  means that things like 9p flushes and ssl messages
1727          *  will not be disrupted by software interrupts.
1728          *
1729          *  Note - this is moderately dangerous since a process
1730          *  that keeps getting interrupted and rewriting will
1731          *  queue infinite crud.
1732          */
1733         for (;;) {
1734                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1735                         break;
1736
1737                 spin_lock_irqsave(&q->lock);
1738                 q->state |= Qflow;
1739                 spin_unlock_irqsave(&q->lock);
1740                 rendez_sleep(&q->wr, qnotfull, q);
1741         }
1742
1743         qunlock(&q->wlock);
1744         poperror();
1745         return n;
1746 }
1747
1748 long qibwrite(struct queue *q, struct block *b)
1749 {
1750         int n, dowakeup;
1751
1752         dowakeup = 0;
1753
1754         n = BLEN(b);
1755
1756         spin_lock_irqsave(&q->lock);
1757
1758         QDEBUG checkb(b, "qibwrite");
1759         if (q->bfirst)
1760                 q->blast->next = b;
1761         else
1762                 q->bfirst = b;
1763         q->blast = b;
1764         q->len += BALLOC(b);
1765         q->dlen += n;
1766
1767         if (q->state & Qstarve) {
1768                 q->state &= ~Qstarve;
1769                 dowakeup = 1;
1770         }
1771
1772         spin_unlock_irqsave(&q->lock);
1773
1774         if (dowakeup) {
1775                 if (q->kick)
1776                         q->kick(q->arg);
1777                 rendez_wakeup(&q->rr);
1778                 qwake_cb(q, FDTAP_FILT_READABLE);
1779         }
1780
1781         return n;
1782 }
1783
1784 /*
1785  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1786  */
1787 int qwrite(struct queue *q, void *vp, int len)
1788 {
1789         int n, sofar;
1790         struct block *b;
1791         uint8_t *p = vp;
1792         void *ext_buf;
1793
1794         QDEBUG if (!islo())
1795                  printd("qwrite hi %p\n", getcallerpc(&q));
1796
1797         sofar = 0;
1798         do {
1799                 n = len - sofar;
1800                 /* This is 64K, the max amount per single block.  Still a good value? */
1801                 if (n > Maxatomic)
1802                         n = Maxatomic;
1803
1804                 /* If n is small, we don't need to bother with the extra_data.  But
1805                  * until the whole stack can handle extd blocks, we'll use them
1806                  * unconditionally. */
1807 #ifdef CONFIG_BLOCK_EXTRAS
1808                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1809                  * only available via padblock (to the left).  we also need some space
1810                  * for pullupblock for some basic headers (like icmp) that get written
1811                  * in directly */
1812                 b = allocb(64);
1813                 ext_buf = kmalloc(n, 0);
1814                 memcpy(ext_buf, p + sofar, n);
1815                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1816                 b->extra_data[0].base = (uintptr_t)ext_buf;
1817                 b->extra_data[0].off = 0;
1818                 b->extra_data[0].len = n;
1819                 b->extra_len += n;
1820 #else
1821                 b = allocb(n);
1822                 memmove(b->wp, p + sofar, n);
1823                 b->wp += n;
1824 #endif
1825                         
1826                 qbwrite(q, b);
1827
1828                 sofar += n;
1829         } while (sofar < len && (q->state & Qmsg) == 0);
1830
1831         return len;
1832 }
1833
1834 /*
1835  *  used by print() to write to a queue.  Since we may be splhi or not in
1836  *  a process, don't qlock.
1837  */
1838 int qiwrite(struct queue *q, void *vp, int len)
1839 {
1840         int n, sofar, dowakeup;
1841         struct block *b;
1842         uint8_t *p = vp;
1843
1844         dowakeup = 0;
1845
1846         sofar = 0;
1847         do {
1848                 n = len - sofar;
1849                 if (n > Maxatomic)
1850                         n = Maxatomic;
1851
1852                 b = iallocb(n);
1853                 if (b == NULL)
1854                         break;
1855                 /* TODO consider extra_data */
1856                 memmove(b->wp, p + sofar, n);
1857                 /* this adjusts BLEN to be n, or at least it should */
1858                 b->wp += n;
1859                 assert(n == BLEN(b));
1860                 qibwrite(q, b);
1861
1862                 sofar += n;
1863         } while (sofar < len && (q->state & Qmsg) == 0);
1864
1865         return sofar;
1866 }
1867
1868 /*
1869  *  be extremely careful when calling this,
1870  *  as there is no reference accounting
1871  */
1872 void qfree(struct queue *q)
1873 {
1874         qclose(q);
1875         kfree(q);
1876 }
1877
1878 /*
1879  *  Mark a queue as closed.  No further IO is permitted.
1880  *  All blocks are released.
1881  */
1882 void qclose(struct queue *q)
1883 {
1884         struct block *bfirst;
1885
1886         if (q == NULL)
1887                 return;
1888
1889         /* mark it */
1890         spin_lock_irqsave(&q->lock);
1891         q->state |= Qclosed;
1892         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1893         strncpy(q->err, errno_to_string(ECONNABORTED), sizeof(q->err));
1894         bfirst = q->bfirst;
1895         q->bfirst = 0;
1896         q->len = 0;
1897         q->dlen = 0;
1898         spin_unlock_irqsave(&q->lock);
1899
1900         /* free queued blocks */
1901         freeblist(bfirst);
1902
1903         /* wake up readers/writers */
1904         rendez_wakeup(&q->rr);
1905         rendez_wakeup(&q->wr);
1906         qwake_cb(q, FDTAP_FILT_HANGUP);
1907 }
1908
1909 /*
1910  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1911  *  blocks.
1912  */
1913 void qhangup(struct queue *q, char *msg)
1914 {
1915         /* mark it */
1916         spin_lock_irqsave(&q->lock);
1917         q->state |= Qclosed;
1918         if (msg == 0 || *msg == 0)
1919                 strncpy(q->err, errno_to_string(ECONNABORTED), sizeof(q->err));
1920         else
1921                 strncpy(q->err, msg, ERRMAX - 1);
1922         spin_unlock_irqsave(&q->lock);
1923
1924         /* wake up readers/writers */
1925         rendez_wakeup(&q->rr);
1926         rendez_wakeup(&q->wr);
1927         qwake_cb(q, FDTAP_FILT_HANGUP);
1928 }
1929
1930 /*
1931  *  return non-zero if the q is hungup
1932  */
1933 int qisclosed(struct queue *q)
1934 {
1935         return q->state & Qclosed;
1936 }
1937
1938 /*
1939  *  mark a queue as no longer hung up.  resets the wake_cb.
1940  */
1941 void qreopen(struct queue *q)
1942 {
1943         spin_lock_irqsave(&q->lock);
1944         q->state &= ~Qclosed;
1945         q->state |= Qstarve;
1946         q->eof = 0;
1947         q->limit = q->inilim;
1948         q->wake_cb = 0;
1949         q->wake_data = 0;
1950         spin_unlock_irqsave(&q->lock);
1951 }
1952
1953 /*
1954  *  return bytes queued
1955  */
1956 int qlen(struct queue *q)
1957 {
1958         return q->dlen;
1959 }
1960
1961 /*
1962  * return space remaining before flow control
1963  */
1964 int qwindow(struct queue *q)
1965 {
1966         int l;
1967
1968         l = q->limit - q->len;
1969         if (l < 0)
1970                 l = 0;
1971         return l;
1972 }
1973
1974 /*
1975  *  return true if we can read without blocking
1976  */
1977 int qcanread(struct queue *q)
1978 {
1979         return q->bfirst != 0;
1980 }
1981
1982 /*
1983  *  change queue limit
1984  */
1985 void qsetlimit(struct queue *q, int limit)
1986 {
1987         q->limit = limit;
1988 }
1989
1990 /*
1991  *  set whether writes drop overflowing blocks, or if we sleep
1992  */
1993 void qdropoverflow(struct queue *q, bool onoff)
1994 {
1995         if (onoff)
1996                 q->state |= Qdropoverflow;
1997         else
1998                 q->state &= ~Qdropoverflow;
1999 }
2000
2001 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
2002 void qnonblock(struct queue *q, bool onoff)
2003 {
2004         if (onoff)
2005                 q->state |= Qnonblock;
2006         else
2007                 q->state &= ~Qnonblock;
2008 }
2009
2010 /*
2011  *  flush the output queue
2012  */
2013 void qflush(struct queue *q)
2014 {
2015         struct block *bfirst;
2016
2017         /* mark it */
2018         spin_lock_irqsave(&q->lock);
2019         bfirst = q->bfirst;
2020         q->bfirst = 0;
2021         q->len = 0;
2022         q->dlen = 0;
2023         spin_unlock_irqsave(&q->lock);
2024
2025         /* free queued blocks */
2026         freeblist(bfirst);
2027
2028         /* wake up writers */
2029         rendez_wakeup(&q->wr);
2030         qwake_cb(q, FDTAP_FILT_WRITABLE);
2031 }
2032
2033 int qfull(struct queue *q)
2034 {
2035         return q->state & Qflow;
2036 }
2037
2038 int qstate(struct queue *q)
2039 {
2040         return q->state;
2041 }
2042
2043 void qdump(struct queue *q)
2044 {
2045         if (q)
2046                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
2047                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
2048 }
2049
2050 /* On certain wakeup events, qio will call func(q, data, filter), where filter
2051  * marks the type of wakeup event (flags from FDTAP).
2052  *
2053  * There's no sync protection.  If you change the CB while the qio is running,
2054  * you might get a CB with the data or func from a previous set_wake_cb.  You
2055  * should set this once per queue and forget it.
2056  *
2057  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
2058  * just make sure that the func(data) pair are valid until the queue is freed or
2059  * reopened. */
2060 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
2061 {
2062         q->wake_data = data;
2063         wmb();  /* if we see func, we'll also see the data for it */
2064         q->wake_cb = func;
2065 }