fae6b9a9e8eff84bcfe34cca8a06afaf3ebb6515
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         qlock_t rlock;                          /* mutex for reading processes */
86         struct rendez rr;                       /* process waiting to read */
87         qlock_t wlock;                          /* mutex for writing processes */
88         struct rendez wr;                       /* process waiting to write */
89         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
90         void *wake_data;
91
92         char err[ERRMAX];
93 };
94
95 enum {
96         Maxatomic = 64 * 1024,
97 };
98
99 unsigned int qiomaxatomic = Maxatomic;
100
101 /* Helper: fires a wake callback, sending 'filter' */
102 static void qwake_cb(struct queue *q, int filter)
103 {
104         if (q->wake_cb)
105                 q->wake_cb(q, q->wake_data, filter);
106 }
107
108 void ixsummary(void)
109 {
110         debugging ^= 1;
111         iallocsummary();
112         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
113                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
114         printd("consume %lu, produce %lu, qcopy %lu\n",
115                    consumecnt, producecnt, qcopycnt);
116 }
117
118 /*
119  *  free a list of blocks
120  */
121 void freeblist(struct block *b)
122 {
123         struct block *next;
124
125         for (; b != 0; b = next) {
126                 next = b->next;
127                 b->next = 0;
128                 freeb(b);
129         }
130 }
131
132 /*
133  *  pad a block to the front (or the back if size is negative)
134  */
135 struct block *padblock(struct block *bp, int size)
136 {
137         int n;
138         struct block *nbp;
139         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
140         uint16_t checksum_start = bp->checksum_start;
141         uint16_t checksum_offset = bp->checksum_offset;
142         uint16_t mss = bp->mss;
143
144         QDEBUG checkb(bp, "padblock 1");
145         if (size >= 0) {
146                 if (bp->rp - bp->base >= size) {
147                         bp->checksum_start += size;
148                         bp->rp -= size;
149                         return bp;
150                 }
151
152                 PANIC_EXTRA(bp);
153                 if (bp->next)
154                         panic("padblock %p", getcallerpc(&bp));
155                 n = BLEN(bp);
156                 padblockcnt++;
157                 nbp = allocb(size + n);
158                 nbp->rp += size;
159                 nbp->wp = nbp->rp;
160                 memmove(nbp->wp, bp->rp, n);
161                 nbp->wp += n;
162                 freeb(bp);
163                 nbp->rp -= size;
164         } else {
165                 size = -size;
166
167                 PANIC_EXTRA(bp);
168
169                 if (bp->next)
170                         panic("padblock %p", getcallerpc(&bp));
171
172                 if (bp->lim - bp->wp >= size)
173                         return bp;
174
175                 n = BLEN(bp);
176                 padblockcnt++;
177                 nbp = allocb(size + n);
178                 memmove(nbp->wp, bp->rp, n);
179                 nbp->wp += n;
180                 freeb(bp);
181         }
182         if (bcksum) {
183                 nbp->flag |= bcksum;
184                 nbp->checksum_start = checksum_start;
185                 nbp->checksum_offset = checksum_offset;
186                 nbp->mss = mss;
187         }
188         QDEBUG checkb(nbp, "padblock 1");
189         return nbp;
190 }
191
192 /*
193  *  return count of bytes in a string of blocks
194  */
195 int blocklen(struct block *bp)
196 {
197         int len;
198
199         len = 0;
200         while (bp) {
201                 len += BLEN(bp);
202                 bp = bp->next;
203         }
204         return len;
205 }
206
207 /*
208  * return count of space in blocks
209  */
210 int blockalloclen(struct block *bp)
211 {
212         int len;
213
214         len = 0;
215         while (bp) {
216                 len += BALLOC(bp);
217                 bp = bp->next;
218         }
219         return len;
220 }
221
222 /*
223  *  copy the  string of blocks into
224  *  a single block and free the string
225  */
226 struct block *concatblock(struct block *bp)
227 {
228         int len;
229         struct block *nb, *f;
230
231         if (bp->next == 0)
232                 return bp;
233
234         /* probably use parts of qclone */
235         PANIC_EXTRA(bp);
236         nb = allocb(blocklen(bp));
237         for (f = bp; f; f = f->next) {
238                 len = BLEN(f);
239                 memmove(nb->wp, f->rp, len);
240                 nb->wp += len;
241         }
242         concatblockcnt += BLEN(nb);
243         freeblist(bp);
244         QDEBUG checkb(nb, "concatblock 1");
245         return nb;
246 }
247
248 /* Returns a block with the remaining contents of b all in the main body of the
249  * returned block.  Replace old references to b with the returned value (which
250  * may still be 'b', if no change was needed. */
251 struct block *linearizeblock(struct block *b)
252 {
253         struct block *newb;
254         size_t len;
255         struct extra_bdata *ebd;
256
257         if (!b->extra_len)
258                 return b;
259
260         newb = allocb(BLEN(b));
261         len = BHLEN(b);
262         memcpy(newb->wp, b->rp, len);
263         newb->wp += len;
264         len = b->extra_len;
265         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
266                 ebd = &b->extra_data[i];
267                 if (!ebd->base || !ebd->len)
268                         continue;
269                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
270                 newb->wp += ebd->len;
271                 len -= ebd->len;
272         }
273         /* TODO: any other flags that need copied over? */
274         if (b->flag & BCKSUM_FLAGS) {
275                 newb->flag |= (b->flag & BCKSUM_FLAGS);
276                 newb->checksum_start = b->checksum_start;
277                 newb->checksum_offset = b->checksum_offset;
278                 newb->mss = b->mss;
279         }
280         freeb(b);
281         return newb;
282 }
283
284 /*
285  *  make sure the first block has at least n bytes in its main body
286  */
287 struct block *pullupblock(struct block *bp, int n)
288 {
289         int i, len, seglen;
290         struct block *nbp;
291         struct extra_bdata *ebd;
292
293         /*
294          *  this should almost always be true, it's
295          *  just to avoid every caller checking.
296          */
297         if (BHLEN(bp) >= n)
298                 return bp;
299
300          /* a start at explicit main-body / header management */
301         if (bp->extra_len) {
302                 if (n > bp->lim - bp->rp) {
303                         /* would need to realloc a new block and copy everything over. */
304                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
305                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
306                 }
307                 len = n - BHLEN(bp);
308                 if (len > bp->extra_len)
309                         panic("pullup more than extra (%d, %d, %d)\n",
310                               n, BHLEN(bp), bp->extra_len);
311                 checkb(bp, "before pullup");
312                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
313                         ebd = &bp->extra_data[i];
314                         if (!ebd->base || !ebd->len)
315                                 continue;
316                         seglen = MIN(ebd->len, len);
317                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
318                         bp->wp += seglen;
319                         len -= seglen;
320                         ebd->len -= seglen;
321                         ebd->off += seglen;
322                         bp->extra_len -= seglen;
323                         if (ebd->len == 0) {
324                                 kfree((void *)ebd->base);
325                                 ebd->off = 0;
326                                 ebd->base = 0;
327                         }
328                 }
329                 /* maybe just call pullupblock recursively here */
330                 if (len)
331                         panic("pullup %d bytes overdrawn\n", len);
332                 checkb(bp, "after pullup");
333                 return bp;
334         }
335
336         /*
337          *  if not enough room in the first block,
338          *  add another to the front of the list.
339          */
340         if (bp->lim - bp->rp < n) {
341                 nbp = allocb(n);
342                 nbp->next = bp;
343                 bp = nbp;
344         }
345
346         /*
347          *  copy bytes from the trailing blocks into the first
348          */
349         n -= BLEN(bp);
350         while ((nbp = bp->next)) {
351                 i = BLEN(nbp);
352                 if (i > n) {
353                         memmove(bp->wp, nbp->rp, n);
354                         pullupblockcnt++;
355                         bp->wp += n;
356                         nbp->rp += n;
357                         QDEBUG checkb(bp, "pullupblock 1");
358                         return bp;
359                 } else {
360                         memmove(bp->wp, nbp->rp, i);
361                         pullupblockcnt++;
362                         bp->wp += i;
363                         bp->next = nbp->next;
364                         nbp->next = 0;
365                         freeb(nbp);
366                         n -= i;
367                         if (n == 0) {
368                                 QDEBUG checkb(bp, "pullupblock 2");
369                                 return bp;
370                         }
371                 }
372         }
373         freeb(bp);
374         return 0;
375 }
376
377 /*
378  *  make sure the first block has at least n bytes in its main body
379  */
380 struct block *pullupqueue(struct queue *q, int n)
381 {
382         struct block *b;
383
384         /* TODO: lock to protect the queue links? */
385         if ((BHLEN(q->bfirst) >= n))
386                 return q->bfirst;
387         q->bfirst = pullupblock(q->bfirst, n);
388         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
389         q->blast = b;
390         return q->bfirst;
391 }
392
393 /* throw away count bytes from the front of
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int pullext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->off += rem;
410                 ed->len -= rem;
411                 if (ed->len == 0) {
412                         kfree((void *)ed->base);
413                         ed->base = 0;
414                         ed->off = 0;
415                 }
416         }
417         return bytes;
418 }
419
420 /* throw away count bytes from the end of a
421  * block's extradata.  Returns count of bytes
422  * thrown away
423  */
424
425 static int dropext(struct block *bp, int count)
426 {
427         struct extra_bdata *ed;
428         int i, rem, bytes = 0;
429
430         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
431                 ed = &bp->extra_data[i];
432                 rem = MIN(count, ed->len);
433                 bp->extra_len -= rem;
434                 count -= rem;
435                 bytes += rem;
436                 ed->len -= rem;
437                 if (ed->len == 0) {
438                         kfree((void *)ed->base);
439                         ed->base = 0;
440                         ed->off = 0;
441                 }
442         }
443         return bytes;
444 }
445
446 /*
447  *  throw away up to count bytes from a
448  *  list of blocks.  Return count of bytes
449  *  thrown away.
450  */
451 static int _pullblock(struct block **bph, int count, int free)
452 {
453         struct block *bp;
454         int n, bytes;
455
456         bytes = 0;
457         if (bph == NULL)
458                 return 0;
459
460         while (*bph != NULL && count != 0) {
461                 bp = *bph;
462
463                 n = MIN(BHLEN(bp), count);
464                 bytes += n;
465                 count -= n;
466                 bp->rp += n;
467                 n = pullext(bp, count);
468                 bytes += n;
469                 count -= n;
470                 QDEBUG checkb(bp, "pullblock ");
471                 if (BLEN(bp) == 0 && (free || count)) {
472                         *bph = bp->next;
473                         bp->next = NULL;
474                         freeb(bp);
475                 }
476         }
477         return bytes;
478 }
479
480 int pullblock(struct block **bph, int count)
481 {
482         return _pullblock(bph, count, 1);
483 }
484
485 /*
486  *  trim to len bytes starting at offset
487  */
488 struct block *trimblock(struct block *bp, int offset, int len)
489 {
490         uint32_t l, trim;
491         int olen = len;
492
493         QDEBUG checkb(bp, "trimblock 1");
494         if (blocklen(bp) < offset + len) {
495                 freeblist(bp);
496                 return NULL;
497         }
498
499         l =_pullblock(&bp, offset, 0);
500         if (bp == NULL)
501                 return NULL;
502         if (l != offset) {
503                 freeblist(bp);
504                 return NULL;
505         }
506
507         while ((l = BLEN(bp)) < len) {
508                 len -= l;
509                 bp = bp->next;
510         }
511
512         trim = BLEN(bp) - len;
513         trim -= dropext(bp, trim);
514         bp->wp -= trim;
515
516         if (bp->next) {
517                 freeblist(bp->next);
518                 bp->next = NULL;
519         }
520         return bp;
521 }
522
523 /*
524  *  copy 'count' bytes into a new block
525  */
526 struct block *copyblock(struct block *bp, int count)
527 {
528         int l;
529         struct block *nbp;
530
531         QDEBUG checkb(bp, "copyblock 0");
532         nbp = allocb(count);
533         if (bp->flag & BCKSUM_FLAGS) {
534                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
535                 nbp->checksum_start = bp->checksum_start;
536                 nbp->checksum_offset = bp->checksum_offset;
537                 nbp->mss = bp->mss;
538         }
539         PANIC_EXTRA(bp);
540         for (; count > 0 && bp != 0; bp = bp->next) {
541                 l = BLEN(bp);
542                 if (l > count)
543                         l = count;
544                 memmove(nbp->wp, bp->rp, l);
545                 nbp->wp += l;
546                 count -= l;
547         }
548         if (count > 0) {
549                 memset(nbp->wp, 0, count);
550                 nbp->wp += count;
551         }
552         copyblockcnt++;
553         QDEBUG checkb(nbp, "copyblock 1");
554
555         return nbp;
556 }
557
558 /* Adjust block @bp so that its size is exactly @len.
559  * If the size is increased, fill in the new contents with zeros.
560  * If the size is decreased, discard some of the old contents at the tail. */
561 struct block *adjustblock(struct block *bp, int len)
562 {
563         struct extra_bdata *ebd;
564         int i;
565
566         if (len < 0) {
567                 freeb(bp);
568                 return NULL;
569         }
570
571         if (len == BLEN(bp))
572                 return bp;
573
574         /* Shrink within block main body. */
575         if (len <= BHLEN(bp)) {
576                 free_block_extra(bp);
577                 bp->wp = bp->rp + len;
578                 QDEBUG checkb(bp, "adjustblock 1");
579                 return bp;
580         }
581
582         /* Need to grow. */
583         if (len > BLEN(bp)) {
584                 /* Grow within block main body. */
585                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
586                         memset(bp->wp, 0, len - BLEN(bp));
587                         bp->wp = bp->rp + len;
588                         QDEBUG checkb(bp, "adjustblock 2");
589                         return bp;
590                 }
591                 /* Grow with extra data buffers. */
592                 block_append_extra(bp, len - BLEN(bp), KMALLOC_WAIT);
593                 QDEBUG checkb(bp, "adjustblock 3");
594                 return bp;
595         }
596
597         /* Shrink extra data buffers.
598          * len is how much of ebd we need to keep.
599          * extra_len is re-accumulated. */
600         assert(bp->extra_len > 0);
601         len -= BHLEN(bp);
602         bp->extra_len = 0;
603         for (i = 0; i < bp->nr_extra_bufs; i++) {
604                 ebd = &bp->extra_data[i];
605                 if (len <= ebd->len)
606                         break;
607                 len -= ebd->len;
608                 bp->extra_len += ebd->len;
609         }
610         /* If len becomes zero, extra_data[i] should be freed. */
611         if (len > 0) {
612                 ebd = &bp->extra_data[i];
613                 ebd->len = len;
614                 bp->extra_len += ebd->len;
615                 i++;
616         }
617         for (; i < bp->nr_extra_bufs; i++) {
618                 ebd = &bp->extra_data[i];
619                 if (ebd->base)
620                         kfree((void*)ebd->base);
621                 ebd->base = ebd->off = ebd->len = 0;
622         }
623         QDEBUG checkb(bp, "adjustblock 4");
624         return bp;
625 }
626
627
628 /*
629  *  get next block from a queue, return null if nothing there
630  */
631 struct block *qget(struct queue *q)
632 {
633         int dowakeup;
634         struct block *b;
635
636         /* sync with qwrite */
637         spin_lock_irqsave(&q->lock);
638
639         b = q->bfirst;
640         if (b == NULL) {
641                 q->state |= Qstarve;
642                 spin_unlock_irqsave(&q->lock);
643                 return NULL;
644         }
645         q->bfirst = b->next;
646         b->next = 0;
647         q->len -= BALLOC(b);
648         q->dlen -= BLEN(b);
649         QDEBUG checkb(b, "qget");
650
651         /* if writer flow controlled, restart */
652         if ((q->state & Qflow) && q->len < q->limit / 2) {
653                 q->state &= ~Qflow;
654                 dowakeup = 1;
655         } else
656                 dowakeup = 0;
657
658         spin_unlock_irqsave(&q->lock);
659
660         if (dowakeup)
661                 rendez_wakeup(&q->wr);
662         qwake_cb(q, FDTAP_FILT_WRITABLE);
663
664         return b;
665 }
666
667 /*
668  *  throw away the next 'len' bytes in the queue
669  * returning the number actually discarded
670  */
671 int qdiscard(struct queue *q, int len)
672 {
673         struct block *b;
674         int dowakeup, n, sofar, body_amt, extra_amt;
675         struct extra_bdata *ebd;
676
677         spin_lock_irqsave(&q->lock);
678         for (sofar = 0; sofar < len; sofar += n) {
679                 b = q->bfirst;
680                 if (b == NULL)
681                         break;
682                 QDEBUG checkb(b, "qdiscard");
683                 n = BLEN(b);
684                 if (n <= len - sofar) {
685                         q->bfirst = b->next;
686                         b->next = 0;
687                         q->len -= BALLOC(b);
688                         q->dlen -= BLEN(b);
689                         freeb(b);
690                 } else {
691                         n = len - sofar;
692                         q->dlen -= n;
693                         /* partial block removal */
694                         body_amt = MIN(BHLEN(b), n);
695                         b->rp += body_amt;
696                         extra_amt = n - body_amt;
697                         /* reduce q->len by the amount we remove from the extras.  The
698                          * header will always be accounted for above, during block removal.
699                          * */
700                         q->len -= extra_amt;
701                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
702                                 ebd = &b->extra_data[i];
703                                 if (!ebd->base || !ebd->len)
704                                         continue;
705                                 if (extra_amt >= ebd->len) {
706                                         /* remove the entire entry, note the kfree release */
707                                         b->extra_len -= ebd->len;
708                                         extra_amt -= ebd->len;
709                                         kfree((void*)ebd->base);
710                                         ebd->base = ebd->off = ebd->len = 0;
711                                         continue;
712                                 }
713                                 ebd->off += extra_amt;
714                                 ebd->len -= extra_amt;
715                                 b->extra_len -= extra_amt;
716                                 extra_amt = 0;
717                         }
718                 }
719         }
720
721         /*
722          *  if writer flow controlled, restart
723          *
724          *  This used to be
725          *  q->len < q->limit/2
726          *  but it slows down tcp too much for certain write sizes.
727          *  I really don't understand it completely.  It may be
728          *  due to the queue draining so fast that the transmission
729          *  stalls waiting for the app to produce more data.  - presotto
730          */
731         if ((q->state & Qflow) && q->len < q->limit) {
732                 q->state &= ~Qflow;
733                 dowakeup = 1;
734         } else
735                 dowakeup = 0;
736
737         spin_unlock_irqsave(&q->lock);
738
739         if (dowakeup)
740                 rendez_wakeup(&q->wr);
741         qwake_cb(q, FDTAP_FILT_WRITABLE);
742
743         return sofar;
744 }
745
746 /*
747  *  Interrupt level copy out of a queue, return # bytes copied.
748  */
749 int qconsume(struct queue *q, void *vp, int len)
750 {
751         struct block *b;
752         int n, dowakeup;
753         uint8_t *p = vp;
754         struct block *tofree = NULL;
755
756         /* sync with qwrite */
757         spin_lock_irqsave(&q->lock);
758
759         for (;;) {
760                 b = q->bfirst;
761                 if (b == 0) {
762                         q->state |= Qstarve;
763                         spin_unlock_irqsave(&q->lock);
764                         return -1;
765                 }
766                 QDEBUG checkb(b, "qconsume 1");
767
768                 n = BLEN(b);
769                 if (n > 0)
770                         break;
771                 q->bfirst = b->next;
772                 q->len -= BALLOC(b);
773
774                 /* remember to free this */
775                 b->next = tofree;
776                 tofree = b;
777         };
778
779         PANIC_EXTRA(b);
780         if (n < len)
781                 len = n;
782         memmove(p, b->rp, len);
783         consumecnt += n;
784         b->rp += len;
785         q->dlen -= len;
786
787         /* discard the block if we're done with it */
788         if ((q->state & Qmsg) || len == n) {
789                 q->bfirst = b->next;
790                 b->next = 0;
791                 q->len -= BALLOC(b);
792                 q->dlen -= BLEN(b);
793
794                 /* remember to free this */
795                 b->next = tofree;
796                 tofree = b;
797         }
798
799         /* if writer flow controlled, restart */
800         if ((q->state & Qflow) && q->len < q->limit / 2) {
801                 q->state &= ~Qflow;
802                 dowakeup = 1;
803         } else
804                 dowakeup = 0;
805
806         spin_unlock_irqsave(&q->lock);
807
808         if (dowakeup)
809                 rendez_wakeup(&q->wr);
810         qwake_cb(q, FDTAP_FILT_WRITABLE);
811
812         if (tofree != NULL)
813                 freeblist(tofree);
814
815         return len;
816 }
817
818 int qpass(struct queue *q, struct block *b)
819 {
820         int dlen, len, dowakeup;
821
822         /* sync with qread */
823         dowakeup = 0;
824         spin_lock_irqsave(&q->lock);
825         if (q->len >= q->limit) {
826                 freeblist(b);
827                 spin_unlock_irqsave(&q->lock);
828                 return -1;
829         }
830         if (q->state & Qclosed) {
831                 len = blocklen(b);
832                 freeblist(b);
833                 spin_unlock_irqsave(&q->lock);
834                 return len;
835         }
836
837         /* add buffer to queue */
838         if (q->bfirst)
839                 q->blast->next = b;
840         else
841                 q->bfirst = b;
842         len = BALLOC(b);
843         dlen = BLEN(b);
844         QDEBUG checkb(b, "qpass");
845         while (b->next) {
846                 b = b->next;
847                 QDEBUG checkb(b, "qpass");
848                 len += BALLOC(b);
849                 dlen += BLEN(b);
850         }
851         q->blast = b;
852         q->len += len;
853         q->dlen += dlen;
854
855         if (q->len >= q->limit / 2)
856                 q->state |= Qflow;
857
858         if (q->state & Qstarve) {
859                 q->state &= ~Qstarve;
860                 dowakeup = 1;
861         }
862         spin_unlock_irqsave(&q->lock);
863
864         if (dowakeup) {
865                 rendez_wakeup(&q->rr);
866                 qwake_cb(q, FDTAP_FILT_READABLE);
867         }
868
869         return len;
870 }
871
872 int qpassnolim(struct queue *q, struct block *b)
873 {
874         int dlen, len, dowakeup;
875
876         /* sync with qread */
877         dowakeup = 0;
878         spin_lock_irqsave(&q->lock);
879
880         if (q->state & Qclosed) {
881                 freeblist(b);
882                 spin_unlock_irqsave(&q->lock);
883                 return BALLOC(b);
884         }
885
886         /* add buffer to queue */
887         if (q->bfirst)
888                 q->blast->next = b;
889         else
890                 q->bfirst = b;
891         len = BALLOC(b);
892         dlen = BLEN(b);
893         QDEBUG checkb(b, "qpass");
894         while (b->next) {
895                 b = b->next;
896                 QDEBUG checkb(b, "qpass");
897                 len += BALLOC(b);
898                 dlen += BLEN(b);
899         }
900         q->blast = b;
901         q->len += len;
902         q->dlen += dlen;
903
904         if (q->len >= q->limit / 2)
905                 q->state |= Qflow;
906
907         if (q->state & Qstarve) {
908                 q->state &= ~Qstarve;
909                 dowakeup = 1;
910         }
911         spin_unlock_irqsave(&q->lock);
912
913         if (dowakeup) {
914                 rendez_wakeup(&q->rr);
915                 qwake_cb(q, FDTAP_FILT_READABLE);
916         }
917
918         return len;
919 }
920
921 /*
922  *  if the allocated space is way out of line with the used
923  *  space, reallocate to a smaller block
924  */
925 struct block *packblock(struct block *bp)
926 {
927         struct block **l, *nbp;
928         int n;
929
930         if (bp->extra_len)
931                 return bp;
932         for (l = &bp; *l; l = &(*l)->next) {
933                 nbp = *l;
934                 n = BLEN(nbp);
935                 if ((n << 2) < BALLOC(nbp)) {
936                         *l = allocb(n);
937                         memmove((*l)->wp, nbp->rp, n);
938                         (*l)->wp += n;
939                         (*l)->next = nbp->next;
940                         freeb(nbp);
941                 }
942         }
943
944         return bp;
945 }
946
947 int qproduce(struct queue *q, void *vp, int len)
948 {
949         struct block *b;
950         int dowakeup;
951         uint8_t *p = vp;
952
953         /* sync with qread */
954         dowakeup = 0;
955         spin_lock_irqsave(&q->lock);
956
957         /* no waiting receivers, room in buffer? */
958         if (q->len >= q->limit) {
959                 q->state |= Qflow;
960                 spin_unlock_irqsave(&q->lock);
961                 return -1;
962         }
963
964         /* save in buffer */
965         /* use Qcoalesce here to save storage */
966         // TODO: Consider removing the Qcoalesce flag and force a coalescing
967         // strategy by default.
968         b = q->blast;
969         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
970                 || b->lim - b->wp < len) {
971                 /* need a new block */
972                 b = iallocb(len);
973                 if (b == 0) {
974                         spin_unlock_irqsave(&q->lock);
975                         return 0;
976                 }
977                 if (q->bfirst)
978                         q->blast->next = b;
979                 else
980                         q->bfirst = b;
981                 q->blast = b;
982                 /* b->next = 0; done by iallocb() */
983                 q->len += BALLOC(b);
984         }
985         PANIC_EXTRA(b);
986         memmove(b->wp, p, len);
987         producecnt += len;
988         b->wp += len;
989         q->dlen += len;
990         QDEBUG checkb(b, "qproduce");
991
992         if (q->state & Qstarve) {
993                 q->state &= ~Qstarve;
994                 dowakeup = 1;
995         }
996
997         if (q->len >= q->limit)
998                 q->state |= Qflow;
999         spin_unlock_irqsave(&q->lock);
1000
1001         if (dowakeup) {
1002                 rendez_wakeup(&q->rr);
1003                 qwake_cb(q, FDTAP_FILT_READABLE);
1004         }
1005
1006         return len;
1007 }
1008
1009 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
1010  * body_rp, for up to len.  Returns the len consumed. 
1011  *
1012  * The base is 'b', so that we can kfree it later.  This currently ties us to
1013  * using kfree for the release method for all extra_data.
1014  *
1015  * It is possible to have a body size that is 0, if there is no offset, and
1016  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
1017 static size_t point_to_body(struct block *b, uint8_t *body_rp,
1018                             struct block *newb, unsigned int newb_idx,
1019                             size_t len)
1020 {
1021         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
1022
1023         assert(newb_idx < newb->nr_extra_bufs);
1024
1025         kmalloc_incref(b);
1026         ebd->base = (uintptr_t)b;
1027         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
1028         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
1029         assert((int)ebd->len >= 0);
1030         newb->extra_len += ebd->len;
1031         return ebd->len;
1032 }
1033
1034 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1035  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1036  * consumed.
1037  *
1038  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1039 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1040                            struct block *newb, unsigned int newb_idx,
1041                            size_t len)
1042 {
1043         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1044         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1045
1046         assert(b_idx < b->nr_extra_bufs);
1047         assert(newb_idx < newb->nr_extra_bufs);
1048
1049         kmalloc_incref((void*)b_ebd->base);
1050         n_ebd->base = b_ebd->base;
1051         n_ebd->off = b_ebd->off + b_off;
1052         n_ebd->len = MIN(b_ebd->len - b_off, len);
1053         newb->extra_len += n_ebd->len;
1054         return n_ebd->len;
1055 }
1056
1057 /* given a string of blocks, fills the new block's extra_data  with the contents
1058  * of the blist [offset, len + offset)
1059  *
1060  * returns 0 on success.  the only failure is if the extra_data array was too
1061  * small, so this returns a positive integer saying how big the extra_data needs
1062  * to be.
1063  *
1064  * callers are responsible for protecting the list structure. */
1065 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1066                             uint32_t offset)
1067 {
1068         struct block *b, *first;
1069         unsigned int nr_bufs = 0;
1070         unsigned int b_idx, newb_idx = 0;
1071         uint8_t *first_main_body = 0;
1072
1073         /* find the first block; keep offset relative to the latest b in the list */
1074         for (b = blist; b; b = b->next) {
1075                 if (BLEN(b) > offset)
1076                         break;
1077                 offset -= BLEN(b);
1078         }
1079         /* qcopy semantics: if you asked for an offset outside the block list, you
1080          * get an empty block back */
1081         if (!b)
1082                 return 0;
1083         first = b;
1084         /* upper bound for how many buffers we'll need in newb */
1085         for (/* b is set*/; b; b = b->next) {
1086                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1087         }
1088         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1089         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1090                 /* caller will need to alloc these, then re-call us */
1091                 return nr_bufs;
1092         }
1093         for (b = first; b && len; b = b->next) {
1094                 b_idx = 0;
1095                 if (offset) {
1096                         if (offset < BHLEN(b)) {
1097                                 /* off is in the main body */
1098                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1099                                 newb_idx++;
1100                         } else {
1101                                 /* off is in one of the buffers (or just past the last one).
1102                                  * we're not going to point to b's main body at all. */
1103                                 offset -= BHLEN(b);
1104                                 assert(b->extra_data);
1105                                 /* assuming these extrabufs are packed, or at least that len
1106                                  * isn't gibberish */
1107                                 while (b->extra_data[b_idx].len <= offset) {
1108                                         offset -= b->extra_data[b_idx].len;
1109                                         b_idx++;
1110                                 }
1111                                 /* now offset is set to our offset in the b_idx'th buf */
1112                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1113                                 newb_idx++;
1114                                 b_idx++;
1115                         }
1116                         offset = 0;
1117                 } else {
1118                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1119                         newb_idx++;
1120                 }
1121                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1122                  * and any point_to_ could be our last if it consumed all of len. */
1123                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1124                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1125                         newb_idx++;
1126                 }
1127         }
1128         return 0;
1129 }
1130
1131 struct block *blist_clone(struct block *blist, int header_len, int len,
1132                           uint32_t offset)
1133 {
1134         int ret;
1135         struct block *newb = allocb(header_len);
1136         do {
1137                 ret = __blist_clone_to(blist, newb, len, offset);
1138                 if (ret)
1139                         block_add_extd(newb, ret, KMALLOC_WAIT);
1140         } while (ret);
1141         return newb;
1142 }
1143
1144 /* given a queue, makes a single block with header_len reserved space in the
1145  * block main body, and the contents of [offset, len + offset) pointed to in the
1146  * new blocks ext_data. */
1147 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1148 {
1149         int ret;
1150         struct block *newb = allocb(header_len);
1151         /* the while loop should rarely be used: it would require someone
1152          * concurrently adding to the queue. */
1153         do {
1154                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1155                 spin_lock_irqsave(&q->lock);
1156                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1157                 spin_unlock_irqsave(&q->lock);
1158                 if (ret)
1159                         block_add_extd(newb, ret, KMALLOC_WAIT);
1160         } while (ret);
1161         return newb;
1162 }
1163
1164 /*
1165  *  copy from offset in the queue
1166  */
1167 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1168 {
1169         int sofar;
1170         int n;
1171         struct block *b, *nb;
1172         uint8_t *p;
1173
1174         nb = allocb(len);
1175
1176         spin_lock_irqsave(&q->lock);
1177
1178         /* go to offset */
1179         b = q->bfirst;
1180         for (sofar = 0;; sofar += n) {
1181                 if (b == NULL) {
1182                         spin_unlock_irqsave(&q->lock);
1183                         return nb;
1184                 }
1185                 n = BLEN(b);
1186                 if (sofar + n > offset) {
1187                         p = b->rp + offset - sofar;
1188                         n -= offset - sofar;
1189                         break;
1190                 }
1191                 QDEBUG checkb(b, "qcopy");
1192                 b = b->next;
1193         }
1194
1195         /* copy bytes from there */
1196         for (sofar = 0; sofar < len;) {
1197                 if (n > len - sofar)
1198                         n = len - sofar;
1199                 PANIC_EXTRA(b);
1200                 memmove(nb->wp, p, n);
1201                 qcopycnt += n;
1202                 sofar += n;
1203                 nb->wp += n;
1204                 b = b->next;
1205                 if (b == NULL)
1206                         break;
1207                 n = BLEN(b);
1208                 p = b->rp;
1209         }
1210         spin_unlock_irqsave(&q->lock);
1211
1212         return nb;
1213 }
1214
1215 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1216 {
1217 #ifdef CONFIG_BLOCK_EXTRAS
1218         return qclone(q, 0, len, offset);
1219 #else
1220         return qcopy_old(q, len, offset);
1221 #endif
1222 }
1223
1224 static void qinit_common(struct queue *q)
1225 {
1226         spinlock_init_irqsave(&q->lock);
1227         qlock_init(&q->rlock);
1228         qlock_init(&q->wlock);
1229         rendez_init(&q->rr);
1230         rendez_init(&q->wr);
1231 }
1232
1233 /*
1234  *  called by non-interrupt code
1235  */
1236 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1237 {
1238         struct queue *q;
1239
1240         q = kzmalloc(sizeof(struct queue), 0);
1241         if (q == 0)
1242                 return 0;
1243         qinit_common(q);
1244
1245         q->limit = q->inilim = limit;
1246         q->kick = kick;
1247         q->arg = arg;
1248         q->state = msg;
1249         q->state |= Qstarve;
1250         q->eof = 0;
1251
1252         return q;
1253 }
1254
1255 /* open a queue to be bypassed */
1256 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1257 {
1258         struct queue *q;
1259
1260         q = kzmalloc(sizeof(struct queue), 0);
1261         if (q == 0)
1262                 return 0;
1263         qinit_common(q);
1264
1265         q->limit = 0;
1266         q->arg = arg;
1267         q->bypass = bypass;
1268         q->state = 0;
1269
1270         return q;
1271 }
1272
1273 static int notempty(void *a)
1274 {
1275         struct queue *q = a;
1276
1277         return (q->state & Qclosed) || q->bfirst != 0;
1278 }
1279
1280 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1281  * wait, FALSE on Qclose (without error)
1282  *
1283  * Called with q ilocked.  May error out, back through the caller, with
1284  * the irqsave lock unlocked.  */
1285 static bool qwait(struct queue *q)
1286 {
1287         /* wait for data */
1288         for (;;) {
1289                 if (q->bfirst != NULL)
1290                         break;
1291
1292                 if (q->state & Qclosed) {
1293                         if (++q->eof > 3) {
1294                                 spin_unlock_irqsave(&q->lock);
1295                                 error(EFAIL, "multiple reads on a closed queue");
1296                         }
1297                         if (q->err[0]) {
1298                                 spin_unlock_irqsave(&q->lock);
1299                                 error(EFAIL, q->err);
1300                         }
1301                         return FALSE;
1302                 }
1303                 /* We set Qstarve regardless of whether we are non-blocking or not.
1304                  * Qstarve tracks the edge detection of the queue being empty. */
1305                 q->state |= Qstarve;
1306                 if (q->state & Qnonblock) {
1307                         spin_unlock_irqsave(&q->lock);
1308                         error(EAGAIN, "queue empty");
1309                 }
1310                 spin_unlock_irqsave(&q->lock);
1311                 /* may throw an error() */
1312                 rendez_sleep(&q->rr, notempty, q);
1313                 spin_lock_irqsave(&q->lock);
1314         }
1315         return TRUE;
1316 }
1317
1318 /*
1319  * add a block list to a queue
1320  */
1321 void qaddlist(struct queue *q, struct block *b)
1322 {
1323         /* TODO: q lock? */
1324         /* queue the block */
1325         if (q->bfirst)
1326                 q->blast->next = b;
1327         else
1328                 q->bfirst = b;
1329         q->len += blockalloclen(b);
1330         q->dlen += blocklen(b);
1331         while (b->next)
1332                 b = b->next;
1333         q->blast = b;
1334 }
1335
1336 /*
1337  *  called with q ilocked
1338  */
1339 struct block *qremove(struct queue *q)
1340 {
1341         struct block *b;
1342
1343         b = q->bfirst;
1344         if (b == NULL)
1345                 return NULL;
1346         q->bfirst = b->next;
1347         b->next = NULL;
1348         q->dlen -= BLEN(b);
1349         q->len -= BALLOC(b);
1350         QDEBUG checkb(b, "qremove");
1351         return b;
1352 }
1353
1354 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1355 {
1356         size_t copy_amt, retval = 0;
1357         struct extra_bdata *ebd;
1358         
1359         copy_amt = MIN(BHLEN(b), amt);
1360         memcpy(to, b->rp, copy_amt);
1361         /* advance the rp, since this block not be completely consumed and future
1362          * reads need to know where to pick up from */
1363         b->rp += copy_amt;
1364         to += copy_amt;
1365         amt -= copy_amt;
1366         retval += copy_amt;
1367         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1368                 ebd = &b->extra_data[i];
1369                 /* skip empty entires.  if we track this in the struct block, we can
1370                  * just start the for loop early */
1371                 if (!ebd->base || !ebd->len)
1372                         continue;
1373                 copy_amt = MIN(ebd->len, amt);
1374                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1375                 /* we're actually consuming the entries, just like how we advance rp up
1376                  * above, and might only consume part of one. */
1377                 ebd->len -= copy_amt;
1378                 ebd->off += copy_amt;
1379                 b->extra_len -= copy_amt;
1380                 if (!ebd->len) {
1381                         /* we don't actually have to decref here.  it's also done in
1382                          * freeb().  this is the earliest we can free. */
1383                         kfree((void*)ebd->base);
1384                         ebd->base = ebd->off = 0;
1385                 }
1386                 to += copy_amt;
1387                 amt -= copy_amt;
1388                 retval += copy_amt;
1389         }
1390         return retval;
1391 }
1392
1393 /*
1394  *  copy the contents of a string of blocks into
1395  *  memory.  emptied blocks are freed.  return
1396  *  pointer to first unconsumed block.
1397  */
1398 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1399 {
1400         int i;
1401         struct block *next;
1402
1403         /* could be slicker here, since read_from_block is smart */
1404         for (; b != NULL; b = next) {
1405                 i = BLEN(b);
1406                 if (i > n) {
1407                         /* partial block, consume some */
1408                         read_from_block(b, p, n);
1409                         return b;
1410                 }
1411                 /* full block, consume all and move on */
1412                 i = read_from_block(b, p, i);
1413                 n -= i;
1414                 p += i;
1415                 next = b->next;
1416                 freeb(b);
1417         }
1418         return NULL;
1419 }
1420
1421 /*
1422  *  copy the contents of memory into a string of blocks.
1423  *  return NULL on error.
1424  */
1425 struct block *mem2bl(uint8_t * p, int len)
1426 {
1427         ERRSTACK(1);
1428         int n;
1429         struct block *b, *first, **l;
1430
1431         first = NULL;
1432         l = &first;
1433         if (waserror()) {
1434                 freeblist(first);
1435                 nexterror();
1436         }
1437         do {
1438                 n = len;
1439                 if (n > Maxatomic)
1440                         n = Maxatomic;
1441
1442                 *l = b = allocb(n);
1443                 /* TODO consider extra_data */
1444                 memmove(b->wp, p, n);
1445                 b->wp += n;
1446                 p += n;
1447                 len -= n;
1448                 l = &b->next;
1449         } while (len > 0);
1450         poperror();
1451
1452         return first;
1453 }
1454
1455 /*
1456  *  put a block back to the front of the queue
1457  *  called with q ilocked
1458  */
1459 void qputback(struct queue *q, struct block *b)
1460 {
1461         b->next = q->bfirst;
1462         if (q->bfirst == NULL)
1463                 q->blast = b;
1464         q->bfirst = b;
1465         q->len += BALLOC(b);
1466         q->dlen += BLEN(b);
1467 }
1468
1469 /*
1470  *  flow control, get producer going again
1471  *  called with q ilocked
1472  */
1473 static void qwakeup_iunlock(struct queue *q)
1474 {
1475         int dowakeup = 0;
1476
1477         /* if writer flow controlled, restart */
1478         if ((q->state & Qflow) && q->len < q->limit / 2) {
1479                 q->state &= ~Qflow;
1480                 dowakeup = 1;
1481         }
1482
1483         spin_unlock_irqsave(&q->lock);
1484
1485         /* wakeup flow controlled writers */
1486         if (dowakeup) {
1487                 if (q->kick)
1488                         q->kick(q->arg);
1489                 rendez_wakeup(&q->wr);
1490         }
1491         qwake_cb(q, FDTAP_FILT_WRITABLE);
1492 }
1493
1494 /*
1495  *  get next block from a queue (up to a limit)
1496  */
1497 struct block *qbread(struct queue *q, int len)
1498 {
1499         ERRSTACK(1);
1500         struct block *b, *nb;
1501         int n;
1502
1503         qlock(&q->rlock);
1504         if (waserror()) {
1505                 qunlock(&q->rlock);
1506                 nexterror();
1507         }
1508
1509         spin_lock_irqsave(&q->lock);
1510         if (!qwait(q)) {
1511                 /* queue closed */
1512                 spin_unlock_irqsave(&q->lock);
1513                 qunlock(&q->rlock);
1514                 poperror();
1515                 return NULL;
1516         }
1517
1518         /* if we get here, there's at least one block in the queue */
1519         b = qremove(q);
1520         n = BLEN(b);
1521
1522         /* split block if it's too big and this is not a message queue */
1523         nb = b;
1524         if (n > len) {
1525                 PANIC_EXTRA(b);
1526                 if ((q->state & Qmsg) == 0) {
1527                         n -= len;
1528                         b = allocb(n);
1529                         memmove(b->wp, nb->rp + len, n);
1530                         b->wp += n;
1531                         qputback(q, b);
1532                 }
1533                 nb->wp = nb->rp + len;
1534         }
1535
1536         /* restart producer */
1537         qwakeup_iunlock(q);
1538
1539         poperror();
1540         qunlock(&q->rlock);
1541         return nb;
1542 }
1543
1544 /*
1545  *  read a queue.  if no data is queued, post a struct block
1546  *  and wait on its Rendez.
1547  */
1548 long qread(struct queue *q, void *vp, int len)
1549 {
1550         ERRSTACK(1);
1551         struct block *b, *first, **l;
1552         int m, n;
1553
1554         qlock(&q->rlock);
1555         if (waserror()) {
1556                 qunlock(&q->rlock);
1557                 nexterror();
1558         }
1559
1560         spin_lock_irqsave(&q->lock);
1561 again:
1562         if (!qwait(q)) {
1563                 /* queue closed */
1564                 spin_unlock_irqsave(&q->lock);
1565                 qunlock(&q->rlock);
1566                 poperror();
1567                 return 0;
1568         }
1569
1570         /* if we get here, there's at least one block in the queue */
1571         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1572         // strategy by default.
1573         if (q->state & Qcoalesce) {
1574                 /* when coalescing, 0 length blocks just go away */
1575                 b = q->bfirst;
1576                 if (BLEN(b) <= 0) {
1577                         freeb(qremove(q));
1578                         goto again;
1579                 }
1580
1581                 /*  grab the first block plus as many
1582                  *  following blocks as will completely
1583                  *  fit in the read.
1584                  */
1585                 n = 0;
1586                 l = &first;
1587                 m = BLEN(b);
1588                 for (;;) {
1589                         *l = qremove(q);
1590                         l = &b->next;
1591                         n += m;
1592
1593                         b = q->bfirst;
1594                         if (b == NULL)
1595                                 break;
1596                         m = BLEN(b);
1597                         if (n + m > len)
1598                                 break;
1599                 }
1600         } else {
1601                 first = qremove(q);
1602                 n = BLEN(first);
1603         }
1604
1605         /* copy to user space outside of the ilock */
1606         spin_unlock_irqsave(&q->lock);
1607         b = bl2mem(vp, first, len);
1608         spin_lock_irqsave(&q->lock);
1609
1610         /* take care of any left over partial block */
1611         if (b != NULL) {
1612                 n -= BLEN(b);
1613                 if (q->state & Qmsg)
1614                         freeb(b);
1615                 else
1616                         qputback(q, b);
1617         }
1618
1619         /* restart producer */
1620         qwakeup_iunlock(q);
1621
1622         poperror();
1623         qunlock(&q->rlock);
1624         return n;
1625 }
1626
1627 static int qnotfull(void *a)
1628 {
1629         struct queue *q = a;
1630
1631         return q->len < q->limit || (q->state & Qclosed);
1632 }
1633
1634 uint32_t dropcnt;
1635
1636 /*
1637  *  add a block to a queue obeying flow control
1638  */
1639 long qbwrite(struct queue *q, struct block *b)
1640 {
1641         ERRSTACK(1);
1642         int n, dowakeup;
1643         volatile bool should_free_b = TRUE;
1644
1645         n = BLEN(b);
1646
1647         if (q->bypass) {
1648                 (*q->bypass) (q->arg, b);
1649                 return n;
1650         }
1651
1652         dowakeup = 0;
1653         qlock(&q->wlock);
1654         if (waserror()) {
1655                 if (b != NULL && should_free_b)
1656                         freeb(b);
1657                 qunlock(&q->wlock);
1658                 nexterror();
1659         }
1660
1661         spin_lock_irqsave(&q->lock);
1662
1663         /* give up if the queue is closed */
1664         if (q->state & Qclosed) {
1665                 spin_unlock_irqsave(&q->lock);
1666                 if (q->err[0])
1667                         error(EFAIL, q->err);
1668                 else
1669                         error(EFAIL, "connection closed");
1670         }
1671
1672         /* if nonblocking, don't queue over the limit */
1673         if (q->len >= q->limit) {
1674                 /* drop overflow takes priority over regular non-blocking */
1675                 if (q->state & Qdropoverflow) {
1676                         spin_unlock_irqsave(&q->lock);
1677                         freeb(b);
1678                         dropcnt += n;
1679                         qunlock(&q->wlock);
1680                         poperror();
1681                         return n;
1682                 }
1683                 if (q->state & Qnonblock) {
1684                         spin_unlock_irqsave(&q->lock);
1685                         freeb(b);
1686                         error(EAGAIN, "queue full");
1687                 }
1688         }
1689
1690         /* queue the block */
1691         should_free_b = FALSE;
1692         if (q->bfirst)
1693                 q->blast->next = b;
1694         else
1695                 q->bfirst = b;
1696         q->blast = b;
1697         b->next = 0;
1698         q->len += BALLOC(b);
1699         q->dlen += n;
1700         QDEBUG checkb(b, "qbwrite");
1701         b = NULL;
1702
1703         /* make sure other end gets awakened */
1704         if (q->state & Qstarve) {
1705                 q->state &= ~Qstarve;
1706                 dowakeup = 1;
1707         }
1708         spin_unlock_irqsave(&q->lock);
1709
1710         /*  get output going again */
1711         if (q->kick && (dowakeup || (q->state & Qkick)))
1712                 q->kick(q->arg);
1713
1714         /* wakeup anyone consuming at the other end */
1715         if (dowakeup) {
1716                 rendez_wakeup(&q->rr);
1717                 qwake_cb(q, FDTAP_FILT_READABLE);
1718         }
1719
1720         /*
1721          *  flow control, wait for queue to get below the limit
1722          *  before allowing the process to continue and queue
1723          *  more.  We do this here so that postnote can only
1724          *  interrupt us after the data has been queued.  This
1725          *  means that things like 9p flushes and ssl messages
1726          *  will not be disrupted by software interrupts.
1727          *
1728          *  Note - this is moderately dangerous since a process
1729          *  that keeps getting interrupted and rewriting will
1730          *  queue infinite crud.
1731          */
1732         for (;;) {
1733                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1734                         break;
1735
1736                 spin_lock_irqsave(&q->lock);
1737                 q->state |= Qflow;
1738                 spin_unlock_irqsave(&q->lock);
1739                 rendez_sleep(&q->wr, qnotfull, q);
1740         }
1741
1742         qunlock(&q->wlock);
1743         poperror();
1744         return n;
1745 }
1746
1747 long qibwrite(struct queue *q, struct block *b)
1748 {
1749         int n, dowakeup;
1750
1751         dowakeup = 0;
1752
1753         n = BLEN(b);
1754
1755         spin_lock_irqsave(&q->lock);
1756
1757         QDEBUG checkb(b, "qibwrite");
1758         if (q->bfirst)
1759                 q->blast->next = b;
1760         else
1761                 q->bfirst = b;
1762         q->blast = b;
1763         q->len += BALLOC(b);
1764         q->dlen += n;
1765
1766         if (q->state & Qstarve) {
1767                 q->state &= ~Qstarve;
1768                 dowakeup = 1;
1769         }
1770
1771         spin_unlock_irqsave(&q->lock);
1772
1773         if (dowakeup) {
1774                 if (q->kick)
1775                         q->kick(q->arg);
1776                 rendez_wakeup(&q->rr);
1777                 qwake_cb(q, FDTAP_FILT_READABLE);
1778         }
1779
1780         return n;
1781 }
1782
1783 /*
1784  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1785  */
1786 int qwrite(struct queue *q, void *vp, int len)
1787 {
1788         int n, sofar;
1789         struct block *b;
1790         uint8_t *p = vp;
1791         void *ext_buf;
1792
1793         QDEBUG if (!islo())
1794                  printd("qwrite hi %p\n", getcallerpc(&q));
1795
1796         sofar = 0;
1797         do {
1798                 n = len - sofar;
1799                 /* This is 64K, the max amount per single block.  Still a good value? */
1800                 if (n > Maxatomic)
1801                         n = Maxatomic;
1802
1803                 /* If n is small, we don't need to bother with the extra_data.  But
1804                  * until the whole stack can handle extd blocks, we'll use them
1805                  * unconditionally. */
1806 #ifdef CONFIG_BLOCK_EXTRAS
1807                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1808                  * only available via padblock (to the left).  we also need some space
1809                  * for pullupblock for some basic headers (like icmp) that get written
1810                  * in directly */
1811                 b = allocb(64);
1812                 ext_buf = kmalloc(n, 0);
1813                 memcpy(ext_buf, p + sofar, n);
1814                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1815                 b->extra_data[0].base = (uintptr_t)ext_buf;
1816                 b->extra_data[0].off = 0;
1817                 b->extra_data[0].len = n;
1818                 b->extra_len += n;
1819 #else
1820                 b = allocb(n);
1821                 memmove(b->wp, p + sofar, n);
1822                 b->wp += n;
1823 #endif
1824                         
1825                 qbwrite(q, b);
1826
1827                 sofar += n;
1828         } while (sofar < len && (q->state & Qmsg) == 0);
1829
1830         return len;
1831 }
1832
1833 /*
1834  *  used by print() to write to a queue.  Since we may be splhi or not in
1835  *  a process, don't qlock.
1836  */
1837 int qiwrite(struct queue *q, void *vp, int len)
1838 {
1839         int n, sofar;
1840         struct block *b;
1841         uint8_t *p = vp;
1842
1843         sofar = 0;
1844         do {
1845                 n = len - sofar;
1846                 if (n > Maxatomic)
1847                         n = Maxatomic;
1848
1849                 b = iallocb(n);
1850                 if (b == NULL)
1851                         break;
1852                 /* TODO consider extra_data */
1853                 memmove(b->wp, p + sofar, n);
1854                 /* this adjusts BLEN to be n, or at least it should */
1855                 b->wp += n;
1856                 assert(n == BLEN(b));
1857                 qibwrite(q, b);
1858
1859                 sofar += n;
1860         } while (sofar < len && (q->state & Qmsg) == 0);
1861
1862         return sofar;
1863 }
1864
1865 /*
1866  *  be extremely careful when calling this,
1867  *  as there is no reference accounting
1868  */
1869 void qfree(struct queue *q)
1870 {
1871         qclose(q);
1872         kfree(q);
1873 }
1874
1875 /*
1876  *  Mark a queue as closed.  No further IO is permitted.
1877  *  All blocks are released.
1878  */
1879 void qclose(struct queue *q)
1880 {
1881         struct block *bfirst;
1882
1883         if (q == NULL)
1884                 return;
1885
1886         /* mark it */
1887         spin_lock_irqsave(&q->lock);
1888         q->state |= Qclosed;
1889         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1890         q->err[0] = 0;
1891         bfirst = q->bfirst;
1892         q->bfirst = 0;
1893         q->len = 0;
1894         q->dlen = 0;
1895         spin_unlock_irqsave(&q->lock);
1896
1897         /* free queued blocks */
1898         freeblist(bfirst);
1899
1900         /* wake up readers/writers */
1901         rendez_wakeup(&q->rr);
1902         rendez_wakeup(&q->wr);
1903         qwake_cb(q, FDTAP_FILT_HANGUP);
1904 }
1905
1906 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1907  *
1908  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1909  * there is no message, which is what also happens during a natural qclose(),
1910  * those waiters will simply return 0.  qwriters will always error() on a
1911  * closed/hungup queue. */
1912 void qhangup(struct queue *q, char *msg)
1913 {
1914         /* mark it */
1915         spin_lock_irqsave(&q->lock);
1916         q->state |= Qclosed;
1917         if (msg == 0 || *msg == 0)
1918                 q->err[0] = 0;
1919         else
1920                 strlcpy(q->err, msg, ERRMAX);
1921         spin_unlock_irqsave(&q->lock);
1922
1923         /* wake up readers/writers */
1924         rendez_wakeup(&q->rr);
1925         rendez_wakeup(&q->wr);
1926         qwake_cb(q, FDTAP_FILT_HANGUP);
1927 }
1928
1929 /*
1930  *  return non-zero if the q is hungup
1931  */
1932 int qisclosed(struct queue *q)
1933 {
1934         return q->state & Qclosed;
1935 }
1936
1937 /*
1938  *  mark a queue as no longer hung up.  resets the wake_cb.
1939  */
1940 void qreopen(struct queue *q)
1941 {
1942         spin_lock_irqsave(&q->lock);
1943         q->state &= ~Qclosed;
1944         q->state |= Qstarve;
1945         q->eof = 0;
1946         q->limit = q->inilim;
1947         q->wake_cb = 0;
1948         q->wake_data = 0;
1949         spin_unlock_irqsave(&q->lock);
1950 }
1951
1952 /*
1953  *  return bytes queued
1954  */
1955 int qlen(struct queue *q)
1956 {
1957         return q->dlen;
1958 }
1959
1960 /*
1961  * return space remaining before flow control
1962  */
1963 int qwindow(struct queue *q)
1964 {
1965         int l;
1966
1967         l = q->limit - q->len;
1968         if (l < 0)
1969                 l = 0;
1970         return l;
1971 }
1972
1973 /*
1974  *  return true if we can read without blocking
1975  */
1976 int qcanread(struct queue *q)
1977 {
1978         return q->bfirst != 0;
1979 }
1980
1981 /*
1982  *  change queue limit
1983  */
1984 void qsetlimit(struct queue *q, int limit)
1985 {
1986         q->limit = limit;
1987 }
1988
1989 /*
1990  *  set whether writes drop overflowing blocks, or if we sleep
1991  */
1992 void qdropoverflow(struct queue *q, bool onoff)
1993 {
1994         if (onoff)
1995                 q->state |= Qdropoverflow;
1996         else
1997                 q->state &= ~Qdropoverflow;
1998 }
1999
2000 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
2001 void qnonblock(struct queue *q, bool onoff)
2002 {
2003         if (onoff)
2004                 q->state |= Qnonblock;
2005         else
2006                 q->state &= ~Qnonblock;
2007 }
2008
2009 /*
2010  *  flush the output queue
2011  */
2012 void qflush(struct queue *q)
2013 {
2014         struct block *bfirst;
2015
2016         /* mark it */
2017         spin_lock_irqsave(&q->lock);
2018         bfirst = q->bfirst;
2019         q->bfirst = 0;
2020         q->len = 0;
2021         q->dlen = 0;
2022         spin_unlock_irqsave(&q->lock);
2023
2024         /* free queued blocks */
2025         freeblist(bfirst);
2026
2027         /* wake up writers */
2028         rendez_wakeup(&q->wr);
2029         qwake_cb(q, FDTAP_FILT_WRITABLE);
2030 }
2031
2032 int qfull(struct queue *q)
2033 {
2034         return q->len >= q->limit;
2035 }
2036
2037 int qstate(struct queue *q)
2038 {
2039         return q->state;
2040 }
2041
2042 void qdump(struct queue *q)
2043 {
2044         if (q)
2045                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
2046                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
2047 }
2048
2049 /* On certain wakeup events, qio will call func(q, data, filter), where filter
2050  * marks the type of wakeup event (flags from FDTAP).
2051  *
2052  * There's no sync protection.  If you change the CB while the qio is running,
2053  * you might get a CB with the data or func from a previous set_wake_cb.  You
2054  * should set this once per queue and forget it.
2055  *
2056  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
2057  * just make sure that the func(data) pair are valid until the queue is freed or
2058  * reopened. */
2059 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
2060 {
2061         q->wake_data = data;
2062         wmb();  /* if we see func, we'll also see the data for it */
2063         q->wake_cb = func;
2064 }