qio: Fire read taps on actual edges
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         qlock_t rlock;                          /* mutex for reading processes */
86         struct rendez rr;                       /* process waiting to read */
87         qlock_t wlock;                          /* mutex for writing processes */
88         struct rendez wr;                       /* process waiting to write */
89         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
90         void *wake_data;
91
92         char err[ERRMAX];
93 };
94
95 enum {
96         Maxatomic = 64 * 1024,
97 };
98
99 unsigned int qiomaxatomic = Maxatomic;
100
101 /* Helper: fires a wake callback, sending 'filter' */
102 static void qwake_cb(struct queue *q, int filter)
103 {
104         if (q->wake_cb)
105                 q->wake_cb(q, q->wake_data, filter);
106 }
107
108 void ixsummary(void)
109 {
110         debugging ^= 1;
111         iallocsummary();
112         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
113                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
114         printd("consume %lu, produce %lu, qcopy %lu\n",
115                    consumecnt, producecnt, qcopycnt);
116 }
117
118 /*
119  *  free a list of blocks
120  */
121 void freeblist(struct block *b)
122 {
123         struct block *next;
124
125         for (; b != 0; b = next) {
126                 next = b->next;
127                 b->next = 0;
128                 freeb(b);
129         }
130 }
131
132 /*
133  *  pad a block to the front (or the back if size is negative)
134  */
135 struct block *padblock(struct block *bp, int size)
136 {
137         int n;
138         struct block *nbp;
139         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
140         uint16_t checksum_start = bp->checksum_start;
141         uint16_t checksum_offset = bp->checksum_offset;
142         uint16_t mss = bp->mss;
143
144         QDEBUG checkb(bp, "padblock 1");
145         if (size >= 0) {
146                 if (bp->rp - bp->base >= size) {
147                         bp->checksum_start += size;
148                         bp->rp -= size;
149                         return bp;
150                 }
151
152                 PANIC_EXTRA(bp);
153                 if (bp->next)
154                         panic("padblock %p", getcallerpc(&bp));
155                 n = BLEN(bp);
156                 padblockcnt++;
157                 nbp = allocb(size + n);
158                 nbp->rp += size;
159                 nbp->wp = nbp->rp;
160                 memmove(nbp->wp, bp->rp, n);
161                 nbp->wp += n;
162                 freeb(bp);
163                 nbp->rp -= size;
164         } else {
165                 size = -size;
166
167                 PANIC_EXTRA(bp);
168
169                 if (bp->next)
170                         panic("padblock %p", getcallerpc(&bp));
171
172                 if (bp->lim - bp->wp >= size)
173                         return bp;
174
175                 n = BLEN(bp);
176                 padblockcnt++;
177                 nbp = allocb(size + n);
178                 memmove(nbp->wp, bp->rp, n);
179                 nbp->wp += n;
180                 freeb(bp);
181         }
182         if (bcksum) {
183                 nbp->flag |= bcksum;
184                 nbp->checksum_start = checksum_start;
185                 nbp->checksum_offset = checksum_offset;
186                 nbp->mss = mss;
187         }
188         QDEBUG checkb(nbp, "padblock 1");
189         return nbp;
190 }
191
192 /*
193  *  return count of bytes in a string of blocks
194  */
195 int blocklen(struct block *bp)
196 {
197         int len;
198
199         len = 0;
200         while (bp) {
201                 len += BLEN(bp);
202                 bp = bp->next;
203         }
204         return len;
205 }
206
207 /*
208  * return count of space in blocks
209  */
210 int blockalloclen(struct block *bp)
211 {
212         int len;
213
214         len = 0;
215         while (bp) {
216                 len += BALLOC(bp);
217                 bp = bp->next;
218         }
219         return len;
220 }
221
222 /*
223  *  copy the  string of blocks into
224  *  a single block and free the string
225  */
226 struct block *concatblock(struct block *bp)
227 {
228         int len;
229         struct block *nb, *f;
230
231         if (bp->next == 0)
232                 return bp;
233
234         /* probably use parts of qclone */
235         PANIC_EXTRA(bp);
236         nb = allocb(blocklen(bp));
237         for (f = bp; f; f = f->next) {
238                 len = BLEN(f);
239                 memmove(nb->wp, f->rp, len);
240                 nb->wp += len;
241         }
242         concatblockcnt += BLEN(nb);
243         freeblist(bp);
244         QDEBUG checkb(nb, "concatblock 1");
245         return nb;
246 }
247
248 /* Returns a block with the remaining contents of b all in the main body of the
249  * returned block.  Replace old references to b with the returned value (which
250  * may still be 'b', if no change was needed. */
251 struct block *linearizeblock(struct block *b)
252 {
253         struct block *newb;
254         size_t len;
255         struct extra_bdata *ebd;
256
257         if (!b->extra_len)
258                 return b;
259
260         newb = allocb(BLEN(b));
261         len = BHLEN(b);
262         memcpy(newb->wp, b->rp, len);
263         newb->wp += len;
264         len = b->extra_len;
265         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
266                 ebd = &b->extra_data[i];
267                 if (!ebd->base || !ebd->len)
268                         continue;
269                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
270                 newb->wp += ebd->len;
271                 len -= ebd->len;
272         }
273         /* TODO: any other flags that need copied over? */
274         if (b->flag & BCKSUM_FLAGS) {
275                 newb->flag |= (b->flag & BCKSUM_FLAGS);
276                 newb->checksum_start = b->checksum_start;
277                 newb->checksum_offset = b->checksum_offset;
278                 newb->mss = b->mss;
279         }
280         freeb(b);
281         return newb;
282 }
283
284 /*
285  *  make sure the first block has at least n bytes in its main body
286  */
287 struct block *pullupblock(struct block *bp, int n)
288 {
289         int i, len, seglen;
290         struct block *nbp;
291         struct extra_bdata *ebd;
292
293         /*
294          *  this should almost always be true, it's
295          *  just to avoid every caller checking.
296          */
297         if (BHLEN(bp) >= n)
298                 return bp;
299
300          /* a start at explicit main-body / header management */
301         if (bp->extra_len) {
302                 if (n > bp->lim - bp->rp) {
303                         /* would need to realloc a new block and copy everything over. */
304                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
305                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
306                 }
307                 len = n - BHLEN(bp);
308                 if (len > bp->extra_len)
309                         panic("pullup more than extra (%d, %d, %d)\n",
310                               n, BHLEN(bp), bp->extra_len);
311                 checkb(bp, "before pullup");
312                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
313                         ebd = &bp->extra_data[i];
314                         if (!ebd->base || !ebd->len)
315                                 continue;
316                         seglen = MIN(ebd->len, len);
317                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
318                         bp->wp += seglen;
319                         len -= seglen;
320                         ebd->len -= seglen;
321                         ebd->off += seglen;
322                         bp->extra_len -= seglen;
323                         if (ebd->len == 0) {
324                                 kfree((void *)ebd->base);
325                                 ebd->off = 0;
326                                 ebd->base = 0;
327                         }
328                 }
329                 /* maybe just call pullupblock recursively here */
330                 if (len)
331                         panic("pullup %d bytes overdrawn\n", len);
332                 checkb(bp, "after pullup");
333                 return bp;
334         }
335
336         /*
337          *  if not enough room in the first block,
338          *  add another to the front of the list.
339          */
340         if (bp->lim - bp->rp < n) {
341                 nbp = allocb(n);
342                 nbp->next = bp;
343                 bp = nbp;
344         }
345
346         /*
347          *  copy bytes from the trailing blocks into the first
348          */
349         n -= BLEN(bp);
350         while ((nbp = bp->next)) {
351                 i = BLEN(nbp);
352                 if (i > n) {
353                         memmove(bp->wp, nbp->rp, n);
354                         pullupblockcnt++;
355                         bp->wp += n;
356                         nbp->rp += n;
357                         QDEBUG checkb(bp, "pullupblock 1");
358                         return bp;
359                 } else {
360                         memmove(bp->wp, nbp->rp, i);
361                         pullupblockcnt++;
362                         bp->wp += i;
363                         bp->next = nbp->next;
364                         nbp->next = 0;
365                         freeb(nbp);
366                         n -= i;
367                         if (n == 0) {
368                                 QDEBUG checkb(bp, "pullupblock 2");
369                                 return bp;
370                         }
371                 }
372         }
373         freeb(bp);
374         return 0;
375 }
376
377 /*
378  *  make sure the first block has at least n bytes in its main body
379  */
380 struct block *pullupqueue(struct queue *q, int n)
381 {
382         struct block *b;
383
384         /* TODO: lock to protect the queue links? */
385         if ((BHLEN(q->bfirst) >= n))
386                 return q->bfirst;
387         q->bfirst = pullupblock(q->bfirst, n);
388         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
389         q->blast = b;
390         return q->bfirst;
391 }
392
393 /* throw away count bytes from the front of
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int pullext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->off += rem;
410                 ed->len -= rem;
411                 if (ed->len == 0) {
412                         kfree((void *)ed->base);
413                         ed->base = 0;
414                         ed->off = 0;
415                 }
416         }
417         return bytes;
418 }
419
420 /* throw away count bytes from the end of a
421  * block's extradata.  Returns count of bytes
422  * thrown away
423  */
424
425 static int dropext(struct block *bp, int count)
426 {
427         struct extra_bdata *ed;
428         int i, rem, bytes = 0;
429
430         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
431                 ed = &bp->extra_data[i];
432                 rem = MIN(count, ed->len);
433                 bp->extra_len -= rem;
434                 count -= rem;
435                 bytes += rem;
436                 ed->len -= rem;
437                 if (ed->len == 0) {
438                         kfree((void *)ed->base);
439                         ed->base = 0;
440                         ed->off = 0;
441                 }
442         }
443         return bytes;
444 }
445
446 /*
447  *  throw away up to count bytes from a
448  *  list of blocks.  Return count of bytes
449  *  thrown away.
450  */
451 static int _pullblock(struct block **bph, int count, int free)
452 {
453         struct block *bp;
454         int n, bytes;
455
456         bytes = 0;
457         if (bph == NULL)
458                 return 0;
459
460         while (*bph != NULL && count != 0) {
461                 bp = *bph;
462
463                 n = MIN(BHLEN(bp), count);
464                 bytes += n;
465                 count -= n;
466                 bp->rp += n;
467                 n = pullext(bp, count);
468                 bytes += n;
469                 count -= n;
470                 QDEBUG checkb(bp, "pullblock ");
471                 if (BLEN(bp) == 0 && (free || count)) {
472                         *bph = bp->next;
473                         bp->next = NULL;
474                         freeb(bp);
475                 }
476         }
477         return bytes;
478 }
479
480 int pullblock(struct block **bph, int count)
481 {
482         return _pullblock(bph, count, 1);
483 }
484
485 /*
486  *  trim to len bytes starting at offset
487  */
488 struct block *trimblock(struct block *bp, int offset, int len)
489 {
490         uint32_t l, trim;
491         int olen = len;
492
493         QDEBUG checkb(bp, "trimblock 1");
494         if (blocklen(bp) < offset + len) {
495                 freeblist(bp);
496                 return NULL;
497         }
498
499         l =_pullblock(&bp, offset, 0);
500         if (bp == NULL)
501                 return NULL;
502         if (l != offset) {
503                 freeblist(bp);
504                 return NULL;
505         }
506
507         while ((l = BLEN(bp)) < len) {
508                 len -= l;
509                 bp = bp->next;
510         }
511
512         trim = BLEN(bp) - len;
513         trim -= dropext(bp, trim);
514         bp->wp -= trim;
515
516         if (bp->next) {
517                 freeblist(bp->next);
518                 bp->next = NULL;
519         }
520         return bp;
521 }
522
523 /*
524  *  copy 'count' bytes into a new block
525  */
526 struct block *copyblock(struct block *bp, int count)
527 {
528         int l;
529         struct block *nbp;
530
531         QDEBUG checkb(bp, "copyblock 0");
532         nbp = allocb(count);
533         if (bp->flag & BCKSUM_FLAGS) {
534                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
535                 nbp->checksum_start = bp->checksum_start;
536                 nbp->checksum_offset = bp->checksum_offset;
537                 nbp->mss = bp->mss;
538         }
539         PANIC_EXTRA(bp);
540         for (; count > 0 && bp != 0; bp = bp->next) {
541                 l = BLEN(bp);
542                 if (l > count)
543                         l = count;
544                 memmove(nbp->wp, bp->rp, l);
545                 nbp->wp += l;
546                 count -= l;
547         }
548         if (count > 0) {
549                 memset(nbp->wp, 0, count);
550                 nbp->wp += count;
551         }
552         copyblockcnt++;
553         QDEBUG checkb(nbp, "copyblock 1");
554
555         return nbp;
556 }
557
558 /* Adjust block @bp so that its size is exactly @len.
559  * If the size is increased, fill in the new contents with zeros.
560  * If the size is decreased, discard some of the old contents at the tail. */
561 struct block *adjustblock(struct block *bp, int len)
562 {
563         struct extra_bdata *ebd;
564         int i;
565
566         if (len < 0) {
567                 freeb(bp);
568                 return NULL;
569         }
570
571         if (len == BLEN(bp))
572                 return bp;
573
574         /* Shrink within block main body. */
575         if (len <= BHLEN(bp)) {
576                 free_block_extra(bp);
577                 bp->wp = bp->rp + len;
578                 QDEBUG checkb(bp, "adjustblock 1");
579                 return bp;
580         }
581
582         /* Need to grow. */
583         if (len > BLEN(bp)) {
584                 /* Grow within block main body. */
585                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
586                         memset(bp->wp, 0, len - BLEN(bp));
587                         bp->wp = bp->rp + len;
588                         QDEBUG checkb(bp, "adjustblock 2");
589                         return bp;
590                 }
591                 /* Grow with extra data buffers. */
592                 block_append_extra(bp, len - BLEN(bp), KMALLOC_WAIT);
593                 QDEBUG checkb(bp, "adjustblock 3");
594                 return bp;
595         }
596
597         /* Shrink extra data buffers.
598          * len is how much of ebd we need to keep.
599          * extra_len is re-accumulated. */
600         assert(bp->extra_len > 0);
601         len -= BHLEN(bp);
602         bp->extra_len = 0;
603         for (i = 0; i < bp->nr_extra_bufs; i++) {
604                 ebd = &bp->extra_data[i];
605                 if (len <= ebd->len)
606                         break;
607                 len -= ebd->len;
608                 bp->extra_len += ebd->len;
609         }
610         /* If len becomes zero, extra_data[i] should be freed. */
611         if (len > 0) {
612                 ebd = &bp->extra_data[i];
613                 ebd->len = len;
614                 bp->extra_len += ebd->len;
615                 i++;
616         }
617         for (; i < bp->nr_extra_bufs; i++) {
618                 ebd = &bp->extra_data[i];
619                 if (ebd->base)
620                         kfree((void*)ebd->base);
621                 ebd->base = ebd->off = ebd->len = 0;
622         }
623         QDEBUG checkb(bp, "adjustblock 4");
624         return bp;
625 }
626
627
628 /*
629  *  get next block from a queue, return null if nothing there
630  */
631 struct block *qget(struct queue *q)
632 {
633         int dowakeup;
634         struct block *b;
635
636         /* sync with qwrite */
637         spin_lock_irqsave(&q->lock);
638
639         b = q->bfirst;
640         if (b == NULL) {
641                 q->state |= Qstarve;
642                 spin_unlock_irqsave(&q->lock);
643                 return NULL;
644         }
645         q->bfirst = b->next;
646         b->next = 0;
647         q->len -= BALLOC(b);
648         q->dlen -= BLEN(b);
649         QDEBUG checkb(b, "qget");
650
651         /* if writer flow controlled, restart */
652         if ((q->state & Qflow) && q->len < q->limit / 2) {
653                 q->state &= ~Qflow;
654                 dowakeup = 1;
655         } else
656                 dowakeup = 0;
657
658         spin_unlock_irqsave(&q->lock);
659
660         if (dowakeup)
661                 rendez_wakeup(&q->wr);
662         qwake_cb(q, FDTAP_FILT_WRITABLE);
663
664         return b;
665 }
666
667 /*
668  *  throw away the next 'len' bytes in the queue
669  * returning the number actually discarded
670  */
671 int qdiscard(struct queue *q, int len)
672 {
673         struct block *b;
674         int dowakeup, n, sofar, body_amt, extra_amt;
675         struct extra_bdata *ebd;
676
677         spin_lock_irqsave(&q->lock);
678         for (sofar = 0; sofar < len; sofar += n) {
679                 b = q->bfirst;
680                 if (b == NULL)
681                         break;
682                 QDEBUG checkb(b, "qdiscard");
683                 n = BLEN(b);
684                 if (n <= len - sofar) {
685                         q->bfirst = b->next;
686                         b->next = 0;
687                         q->len -= BALLOC(b);
688                         q->dlen -= BLEN(b);
689                         freeb(b);
690                 } else {
691                         n = len - sofar;
692                         q->dlen -= n;
693                         /* partial block removal */
694                         body_amt = MIN(BHLEN(b), n);
695                         b->rp += body_amt;
696                         extra_amt = n - body_amt;
697                         /* reduce q->len by the amount we remove from the extras.  The
698                          * header will always be accounted for above, during block removal.
699                          * */
700                         q->len -= extra_amt;
701                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
702                                 ebd = &b->extra_data[i];
703                                 if (!ebd->base || !ebd->len)
704                                         continue;
705                                 if (extra_amt >= ebd->len) {
706                                         /* remove the entire entry, note the kfree release */
707                                         b->extra_len -= ebd->len;
708                                         extra_amt -= ebd->len;
709                                         kfree((void*)ebd->base);
710                                         ebd->base = ebd->off = ebd->len = 0;
711                                         continue;
712                                 }
713                                 ebd->off += extra_amt;
714                                 ebd->len -= extra_amt;
715                                 b->extra_len -= extra_amt;
716                                 extra_amt = 0;
717                         }
718                 }
719         }
720
721         /*
722          *  if writer flow controlled, restart
723          *
724          *  This used to be
725          *  q->len < q->limit/2
726          *  but it slows down tcp too much for certain write sizes.
727          *  I really don't understand it completely.  It may be
728          *  due to the queue draining so fast that the transmission
729          *  stalls waiting for the app to produce more data.  - presotto
730          */
731         if ((q->state & Qflow) && q->len < q->limit) {
732                 q->state &= ~Qflow;
733                 dowakeup = 1;
734         } else
735                 dowakeup = 0;
736
737         spin_unlock_irqsave(&q->lock);
738
739         if (dowakeup)
740                 rendez_wakeup(&q->wr);
741         qwake_cb(q, FDTAP_FILT_WRITABLE);
742
743         return sofar;
744 }
745
746 /*
747  *  Interrupt level copy out of a queue, return # bytes copied.
748  */
749 int qconsume(struct queue *q, void *vp, int len)
750 {
751         struct block *b;
752         int n, dowakeup;
753         uint8_t *p = vp;
754         struct block *tofree = NULL;
755
756         /* sync with qwrite */
757         spin_lock_irqsave(&q->lock);
758
759         for (;;) {
760                 b = q->bfirst;
761                 if (b == 0) {
762                         q->state |= Qstarve;
763                         spin_unlock_irqsave(&q->lock);
764                         return -1;
765                 }
766                 QDEBUG checkb(b, "qconsume 1");
767
768                 n = BLEN(b);
769                 if (n > 0)
770                         break;
771                 q->bfirst = b->next;
772                 q->len -= BALLOC(b);
773
774                 /* remember to free this */
775                 b->next = tofree;
776                 tofree = b;
777         };
778
779         PANIC_EXTRA(b);
780         if (n < len)
781                 len = n;
782         memmove(p, b->rp, len);
783         consumecnt += n;
784         b->rp += len;
785         q->dlen -= len;
786
787         /* discard the block if we're done with it */
788         if ((q->state & Qmsg) || len == n) {
789                 q->bfirst = b->next;
790                 b->next = 0;
791                 q->len -= BALLOC(b);
792                 q->dlen -= BLEN(b);
793
794                 /* remember to free this */
795                 b->next = tofree;
796                 tofree = b;
797         }
798
799         /* if writer flow controlled, restart */
800         if ((q->state & Qflow) && q->len < q->limit / 2) {
801                 q->state &= ~Qflow;
802                 dowakeup = 1;
803         } else
804                 dowakeup = 0;
805
806         spin_unlock_irqsave(&q->lock);
807
808         if (dowakeup)
809                 rendez_wakeup(&q->wr);
810         qwake_cb(q, FDTAP_FILT_WRITABLE);
811
812         if (tofree != NULL)
813                 freeblist(tofree);
814
815         return len;
816 }
817
818 int qpass(struct queue *q, struct block *b)
819 {
820         int dlen, len, dowakeup;
821         bool was_empty;
822
823         /* sync with qread */
824         dowakeup = 0;
825         spin_lock_irqsave(&q->lock);
826         was_empty = q->len == 0;
827         if (q->len >= q->limit) {
828                 freeblist(b);
829                 spin_unlock_irqsave(&q->lock);
830                 return -1;
831         }
832         if (q->state & Qclosed) {
833                 len = blocklen(b);
834                 freeblist(b);
835                 spin_unlock_irqsave(&q->lock);
836                 return len;
837         }
838
839         /* add buffer to queue */
840         if (q->bfirst)
841                 q->blast->next = b;
842         else
843                 q->bfirst = b;
844         len = BALLOC(b);
845         dlen = BLEN(b);
846         QDEBUG checkb(b, "qpass");
847         while (b->next) {
848                 b = b->next;
849                 QDEBUG checkb(b, "qpass");
850                 len += BALLOC(b);
851                 dlen += BLEN(b);
852         }
853         q->blast = b;
854         q->len += len;
855         q->dlen += dlen;
856
857         if (q->len >= q->limit / 2)
858                 q->state |= Qflow;
859
860         if (q->state & Qstarve) {
861                 q->state &= ~Qstarve;
862                 dowakeup = 1;
863         }
864         spin_unlock_irqsave(&q->lock);
865
866         if (dowakeup)
867                 rendez_wakeup(&q->rr);
868         if (was_empty)
869                 qwake_cb(q, FDTAP_FILT_READABLE);
870
871         return len;
872 }
873
874 int qpassnolim(struct queue *q, struct block *b)
875 {
876         int dlen, len, dowakeup;
877         bool was_empty;
878
879         /* sync with qread */
880         dowakeup = 0;
881         spin_lock_irqsave(&q->lock);
882         was_empty = q->len == 0;
883
884         if (q->state & Qclosed) {
885                 freeblist(b);
886                 spin_unlock_irqsave(&q->lock);
887                 return BALLOC(b);
888         }
889
890         /* add buffer to queue */
891         if (q->bfirst)
892                 q->blast->next = b;
893         else
894                 q->bfirst = b;
895         len = BALLOC(b);
896         dlen = BLEN(b);
897         QDEBUG checkb(b, "qpass");
898         while (b->next) {
899                 b = b->next;
900                 QDEBUG checkb(b, "qpass");
901                 len += BALLOC(b);
902                 dlen += BLEN(b);
903         }
904         q->blast = b;
905         q->len += len;
906         q->dlen += dlen;
907
908         if (q->len >= q->limit / 2)
909                 q->state |= Qflow;
910
911         if (q->state & Qstarve) {
912                 q->state &= ~Qstarve;
913                 dowakeup = 1;
914         }
915         spin_unlock_irqsave(&q->lock);
916
917         if (dowakeup)
918                 rendez_wakeup(&q->rr);
919         if (was_empty)
920                 qwake_cb(q, FDTAP_FILT_READABLE);
921
922         return len;
923 }
924
925 /*
926  *  if the allocated space is way out of line with the used
927  *  space, reallocate to a smaller block
928  */
929 struct block *packblock(struct block *bp)
930 {
931         struct block **l, *nbp;
932         int n;
933
934         if (bp->extra_len)
935                 return bp;
936         for (l = &bp; *l; l = &(*l)->next) {
937                 nbp = *l;
938                 n = BLEN(nbp);
939                 if ((n << 2) < BALLOC(nbp)) {
940                         *l = allocb(n);
941                         memmove((*l)->wp, nbp->rp, n);
942                         (*l)->wp += n;
943                         (*l)->next = nbp->next;
944                         freeb(nbp);
945                 }
946         }
947
948         return bp;
949 }
950
951 int qproduce(struct queue *q, void *vp, int len)
952 {
953         struct block *b;
954         int dowakeup;
955         uint8_t *p = vp;
956         bool was_empty;
957
958         /* sync with qread */
959         dowakeup = 0;
960         spin_lock_irqsave(&q->lock);
961         was_empty = q->len == 0;
962
963         /* no waiting receivers, room in buffer? */
964         if (q->len >= q->limit) {
965                 q->state |= Qflow;
966                 spin_unlock_irqsave(&q->lock);
967                 return -1;
968         }
969
970         /* save in buffer */
971         /* use Qcoalesce here to save storage */
972         // TODO: Consider removing the Qcoalesce flag and force a coalescing
973         // strategy by default.
974         b = q->blast;
975         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
976                 || b->lim - b->wp < len) {
977                 /* need a new block */
978                 b = iallocb(len);
979                 if (b == 0) {
980                         spin_unlock_irqsave(&q->lock);
981                         return 0;
982                 }
983                 if (q->bfirst)
984                         q->blast->next = b;
985                 else
986                         q->bfirst = b;
987                 q->blast = b;
988                 /* b->next = 0; done by iallocb() */
989                 q->len += BALLOC(b);
990         }
991         PANIC_EXTRA(b);
992         memmove(b->wp, p, len);
993         producecnt += len;
994         b->wp += len;
995         q->dlen += len;
996         QDEBUG checkb(b, "qproduce");
997
998         if (q->state & Qstarve) {
999                 q->state &= ~Qstarve;
1000                 dowakeup = 1;
1001         }
1002
1003         if (q->len >= q->limit)
1004                 q->state |= Qflow;
1005         spin_unlock_irqsave(&q->lock);
1006
1007         if (dowakeup)
1008                 rendez_wakeup(&q->rr);
1009         if (was_empty)
1010                 qwake_cb(q, FDTAP_FILT_READABLE);
1011
1012         return len;
1013 }
1014
1015 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
1016  * body_rp, for up to len.  Returns the len consumed. 
1017  *
1018  * The base is 'b', so that we can kfree it later.  This currently ties us to
1019  * using kfree for the release method for all extra_data.
1020  *
1021  * It is possible to have a body size that is 0, if there is no offset, and
1022  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
1023 static size_t point_to_body(struct block *b, uint8_t *body_rp,
1024                             struct block *newb, unsigned int newb_idx,
1025                             size_t len)
1026 {
1027         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
1028
1029         assert(newb_idx < newb->nr_extra_bufs);
1030
1031         kmalloc_incref(b);
1032         ebd->base = (uintptr_t)b;
1033         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
1034         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
1035         assert((int)ebd->len >= 0);
1036         newb->extra_len += ebd->len;
1037         return ebd->len;
1038 }
1039
1040 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1041  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1042  * consumed.
1043  *
1044  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1045 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1046                            struct block *newb, unsigned int newb_idx,
1047                            size_t len)
1048 {
1049         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1050         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1051
1052         assert(b_idx < b->nr_extra_bufs);
1053         assert(newb_idx < newb->nr_extra_bufs);
1054
1055         kmalloc_incref((void*)b_ebd->base);
1056         n_ebd->base = b_ebd->base;
1057         n_ebd->off = b_ebd->off + b_off;
1058         n_ebd->len = MIN(b_ebd->len - b_off, len);
1059         newb->extra_len += n_ebd->len;
1060         return n_ebd->len;
1061 }
1062
1063 /* given a string of blocks, fills the new block's extra_data  with the contents
1064  * of the blist [offset, len + offset)
1065  *
1066  * returns 0 on success.  the only failure is if the extra_data array was too
1067  * small, so this returns a positive integer saying how big the extra_data needs
1068  * to be.
1069  *
1070  * callers are responsible for protecting the list structure. */
1071 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1072                             uint32_t offset)
1073 {
1074         struct block *b, *first;
1075         unsigned int nr_bufs = 0;
1076         unsigned int b_idx, newb_idx = 0;
1077         uint8_t *first_main_body = 0;
1078
1079         /* find the first block; keep offset relative to the latest b in the list */
1080         for (b = blist; b; b = b->next) {
1081                 if (BLEN(b) > offset)
1082                         break;
1083                 offset -= BLEN(b);
1084         }
1085         /* qcopy semantics: if you asked for an offset outside the block list, you
1086          * get an empty block back */
1087         if (!b)
1088                 return 0;
1089         first = b;
1090         /* upper bound for how many buffers we'll need in newb */
1091         for (/* b is set*/; b; b = b->next) {
1092                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1093         }
1094         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1095         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1096                 /* caller will need to alloc these, then re-call us */
1097                 return nr_bufs;
1098         }
1099         for (b = first; b && len; b = b->next) {
1100                 b_idx = 0;
1101                 if (offset) {
1102                         if (offset < BHLEN(b)) {
1103                                 /* off is in the main body */
1104                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1105                                 newb_idx++;
1106                         } else {
1107                                 /* off is in one of the buffers (or just past the last one).
1108                                  * we're not going to point to b's main body at all. */
1109                                 offset -= BHLEN(b);
1110                                 assert(b->extra_data);
1111                                 /* assuming these extrabufs are packed, or at least that len
1112                                  * isn't gibberish */
1113                                 while (b->extra_data[b_idx].len <= offset) {
1114                                         offset -= b->extra_data[b_idx].len;
1115                                         b_idx++;
1116                                 }
1117                                 /* now offset is set to our offset in the b_idx'th buf */
1118                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1119                                 newb_idx++;
1120                                 b_idx++;
1121                         }
1122                         offset = 0;
1123                 } else {
1124                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1125                         newb_idx++;
1126                 }
1127                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1128                  * and any point_to_ could be our last if it consumed all of len. */
1129                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1130                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1131                         newb_idx++;
1132                 }
1133         }
1134         return 0;
1135 }
1136
1137 struct block *blist_clone(struct block *blist, int header_len, int len,
1138                           uint32_t offset)
1139 {
1140         int ret;
1141         struct block *newb = allocb(header_len);
1142         do {
1143                 ret = __blist_clone_to(blist, newb, len, offset);
1144                 if (ret)
1145                         block_add_extd(newb, ret, KMALLOC_WAIT);
1146         } while (ret);
1147         return newb;
1148 }
1149
1150 /* given a queue, makes a single block with header_len reserved space in the
1151  * block main body, and the contents of [offset, len + offset) pointed to in the
1152  * new blocks ext_data. */
1153 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1154 {
1155         int ret;
1156         struct block *newb = allocb(header_len);
1157         /* the while loop should rarely be used: it would require someone
1158          * concurrently adding to the queue. */
1159         do {
1160                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1161                 spin_lock_irqsave(&q->lock);
1162                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1163                 spin_unlock_irqsave(&q->lock);
1164                 if (ret)
1165                         block_add_extd(newb, ret, KMALLOC_WAIT);
1166         } while (ret);
1167         return newb;
1168 }
1169
1170 /*
1171  *  copy from offset in the queue
1172  */
1173 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1174 {
1175         int sofar;
1176         int n;
1177         struct block *b, *nb;
1178         uint8_t *p;
1179
1180         nb = allocb(len);
1181
1182         spin_lock_irqsave(&q->lock);
1183
1184         /* go to offset */
1185         b = q->bfirst;
1186         for (sofar = 0;; sofar += n) {
1187                 if (b == NULL) {
1188                         spin_unlock_irqsave(&q->lock);
1189                         return nb;
1190                 }
1191                 n = BLEN(b);
1192                 if (sofar + n > offset) {
1193                         p = b->rp + offset - sofar;
1194                         n -= offset - sofar;
1195                         break;
1196                 }
1197                 QDEBUG checkb(b, "qcopy");
1198                 b = b->next;
1199         }
1200
1201         /* copy bytes from there */
1202         for (sofar = 0; sofar < len;) {
1203                 if (n > len - sofar)
1204                         n = len - sofar;
1205                 PANIC_EXTRA(b);
1206                 memmove(nb->wp, p, n);
1207                 qcopycnt += n;
1208                 sofar += n;
1209                 nb->wp += n;
1210                 b = b->next;
1211                 if (b == NULL)
1212                         break;
1213                 n = BLEN(b);
1214                 p = b->rp;
1215         }
1216         spin_unlock_irqsave(&q->lock);
1217
1218         return nb;
1219 }
1220
1221 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1222 {
1223 #ifdef CONFIG_BLOCK_EXTRAS
1224         return qclone(q, 0, len, offset);
1225 #else
1226         return qcopy_old(q, len, offset);
1227 #endif
1228 }
1229
1230 static void qinit_common(struct queue *q)
1231 {
1232         spinlock_init_irqsave(&q->lock);
1233         qlock_init(&q->rlock);
1234         qlock_init(&q->wlock);
1235         rendez_init(&q->rr);
1236         rendez_init(&q->wr);
1237 }
1238
1239 /*
1240  *  called by non-interrupt code
1241  */
1242 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1243 {
1244         struct queue *q;
1245
1246         q = kzmalloc(sizeof(struct queue), 0);
1247         if (q == 0)
1248                 return 0;
1249         qinit_common(q);
1250
1251         q->limit = q->inilim = limit;
1252         q->kick = kick;
1253         q->arg = arg;
1254         q->state = msg;
1255         q->state |= Qstarve;
1256         q->eof = 0;
1257
1258         return q;
1259 }
1260
1261 /* open a queue to be bypassed */
1262 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1263 {
1264         struct queue *q;
1265
1266         q = kzmalloc(sizeof(struct queue), 0);
1267         if (q == 0)
1268                 return 0;
1269         qinit_common(q);
1270
1271         q->limit = 0;
1272         q->arg = arg;
1273         q->bypass = bypass;
1274         q->state = 0;
1275
1276         return q;
1277 }
1278
1279 static int notempty(void *a)
1280 {
1281         struct queue *q = a;
1282
1283         return (q->state & Qclosed) || q->bfirst != 0;
1284 }
1285
1286 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1287  * wait, FALSE on Qclose (without error)
1288  *
1289  * Called with q ilocked.  May error out, back through the caller, with
1290  * the irqsave lock unlocked.  */
1291 static bool qwait(struct queue *q)
1292 {
1293         /* wait for data */
1294         for (;;) {
1295                 if (q->bfirst != NULL)
1296                         break;
1297
1298                 if (q->state & Qclosed) {
1299                         if (++q->eof > 3) {
1300                                 spin_unlock_irqsave(&q->lock);
1301                                 error(EFAIL, "multiple reads on a closed queue");
1302                         }
1303                         if (q->err[0]) {
1304                                 spin_unlock_irqsave(&q->lock);
1305                                 error(EFAIL, q->err);
1306                         }
1307                         return FALSE;
1308                 }
1309                 /* We set Qstarve regardless of whether we are non-blocking or not.
1310                  * Qstarve tracks the edge detection of the queue being empty. */
1311                 q->state |= Qstarve;
1312                 if (q->state & Qnonblock) {
1313                         spin_unlock_irqsave(&q->lock);
1314                         error(EAGAIN, "queue empty");
1315                 }
1316                 spin_unlock_irqsave(&q->lock);
1317                 /* may throw an error() */
1318                 rendez_sleep(&q->rr, notempty, q);
1319                 spin_lock_irqsave(&q->lock);
1320         }
1321         return TRUE;
1322 }
1323
1324 /*
1325  * add a block list to a queue
1326  */
1327 void qaddlist(struct queue *q, struct block *b)
1328 {
1329         /* TODO: q lock? */
1330         /* queue the block */
1331         if (q->bfirst)
1332                 q->blast->next = b;
1333         else
1334                 q->bfirst = b;
1335         q->len += blockalloclen(b);
1336         q->dlen += blocklen(b);
1337         while (b->next)
1338                 b = b->next;
1339         q->blast = b;
1340 }
1341
1342 /*
1343  *  called with q ilocked
1344  */
1345 struct block *qremove(struct queue *q)
1346 {
1347         struct block *b;
1348
1349         b = q->bfirst;
1350         if (b == NULL)
1351                 return NULL;
1352         q->bfirst = b->next;
1353         b->next = NULL;
1354         q->dlen -= BLEN(b);
1355         q->len -= BALLOC(b);
1356         QDEBUG checkb(b, "qremove");
1357         return b;
1358 }
1359
1360 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1361 {
1362         size_t copy_amt, retval = 0;
1363         struct extra_bdata *ebd;
1364         
1365         copy_amt = MIN(BHLEN(b), amt);
1366         memcpy(to, b->rp, copy_amt);
1367         /* advance the rp, since this block not be completely consumed and future
1368          * reads need to know where to pick up from */
1369         b->rp += copy_amt;
1370         to += copy_amt;
1371         amt -= copy_amt;
1372         retval += copy_amt;
1373         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1374                 ebd = &b->extra_data[i];
1375                 /* skip empty entires.  if we track this in the struct block, we can
1376                  * just start the for loop early */
1377                 if (!ebd->base || !ebd->len)
1378                         continue;
1379                 copy_amt = MIN(ebd->len, amt);
1380                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1381                 /* we're actually consuming the entries, just like how we advance rp up
1382                  * above, and might only consume part of one. */
1383                 ebd->len -= copy_amt;
1384                 ebd->off += copy_amt;
1385                 b->extra_len -= copy_amt;
1386                 if (!ebd->len) {
1387                         /* we don't actually have to decref here.  it's also done in
1388                          * freeb().  this is the earliest we can free. */
1389                         kfree((void*)ebd->base);
1390                         ebd->base = ebd->off = 0;
1391                 }
1392                 to += copy_amt;
1393                 amt -= copy_amt;
1394                 retval += copy_amt;
1395         }
1396         return retval;
1397 }
1398
1399 /*
1400  *  copy the contents of a string of blocks into
1401  *  memory.  emptied blocks are freed.  return
1402  *  pointer to first unconsumed block.
1403  */
1404 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1405 {
1406         int i;
1407         struct block *next;
1408
1409         /* could be slicker here, since read_from_block is smart */
1410         for (; b != NULL; b = next) {
1411                 i = BLEN(b);
1412                 if (i > n) {
1413                         /* partial block, consume some */
1414                         read_from_block(b, p, n);
1415                         return b;
1416                 }
1417                 /* full block, consume all and move on */
1418                 i = read_from_block(b, p, i);
1419                 n -= i;
1420                 p += i;
1421                 next = b->next;
1422                 freeb(b);
1423         }
1424         return NULL;
1425 }
1426
1427 /*
1428  *  copy the contents of memory into a string of blocks.
1429  *  return NULL on error.
1430  */
1431 struct block *mem2bl(uint8_t * p, int len)
1432 {
1433         ERRSTACK(1);
1434         int n;
1435         struct block *b, *first, **l;
1436
1437         first = NULL;
1438         l = &first;
1439         if (waserror()) {
1440                 freeblist(first);
1441                 nexterror();
1442         }
1443         do {
1444                 n = len;
1445                 if (n > Maxatomic)
1446                         n = Maxatomic;
1447
1448                 *l = b = allocb(n);
1449                 /* TODO consider extra_data */
1450                 memmove(b->wp, p, n);
1451                 b->wp += n;
1452                 p += n;
1453                 len -= n;
1454                 l = &b->next;
1455         } while (len > 0);
1456         poperror();
1457
1458         return first;
1459 }
1460
1461 /*
1462  *  put a block back to the front of the queue
1463  *  called with q ilocked
1464  */
1465 void qputback(struct queue *q, struct block *b)
1466 {
1467         b->next = q->bfirst;
1468         if (q->bfirst == NULL)
1469                 q->blast = b;
1470         q->bfirst = b;
1471         q->len += BALLOC(b);
1472         q->dlen += BLEN(b);
1473 }
1474
1475 /*
1476  *  flow control, get producer going again
1477  *  called with q ilocked
1478  */
1479 static void qwakeup_iunlock(struct queue *q)
1480 {
1481         int dowakeup = 0;
1482
1483         /* if writer flow controlled, restart */
1484         if ((q->state & Qflow) && q->len < q->limit / 2) {
1485                 q->state &= ~Qflow;
1486                 dowakeup = 1;
1487         }
1488
1489         spin_unlock_irqsave(&q->lock);
1490
1491         /* wakeup flow controlled writers */
1492         if (dowakeup) {
1493                 if (q->kick)
1494                         q->kick(q->arg);
1495                 rendez_wakeup(&q->wr);
1496         }
1497         qwake_cb(q, FDTAP_FILT_WRITABLE);
1498 }
1499
1500 /*
1501  *  get next block from a queue (up to a limit)
1502  */
1503 struct block *qbread(struct queue *q, int len)
1504 {
1505         ERRSTACK(1);
1506         struct block *b, *nb;
1507         int n;
1508
1509         qlock(&q->rlock);
1510         if (waserror()) {
1511                 qunlock(&q->rlock);
1512                 nexterror();
1513         }
1514
1515         spin_lock_irqsave(&q->lock);
1516         if (!qwait(q)) {
1517                 /* queue closed */
1518                 spin_unlock_irqsave(&q->lock);
1519                 qunlock(&q->rlock);
1520                 poperror();
1521                 return NULL;
1522         }
1523
1524         /* if we get here, there's at least one block in the queue */
1525         b = qremove(q);
1526         n = BLEN(b);
1527
1528         /* split block if it's too big and this is not a message queue */
1529         nb = b;
1530         if (n > len) {
1531                 PANIC_EXTRA(b);
1532                 if ((q->state & Qmsg) == 0) {
1533                         n -= len;
1534                         b = allocb(n);
1535                         memmove(b->wp, nb->rp + len, n);
1536                         b->wp += n;
1537                         qputback(q, b);
1538                 }
1539                 nb->wp = nb->rp + len;
1540         }
1541
1542         /* restart producer */
1543         qwakeup_iunlock(q);
1544
1545         poperror();
1546         qunlock(&q->rlock);
1547         return nb;
1548 }
1549
1550 /*
1551  *  read a queue.  if no data is queued, post a struct block
1552  *  and wait on its Rendez.
1553  */
1554 long qread(struct queue *q, void *vp, int len)
1555 {
1556         ERRSTACK(1);
1557         struct block *b, *first, **l;
1558         int m, n;
1559
1560         qlock(&q->rlock);
1561         if (waserror()) {
1562                 qunlock(&q->rlock);
1563                 nexterror();
1564         }
1565
1566         spin_lock_irqsave(&q->lock);
1567 again:
1568         if (!qwait(q)) {
1569                 /* queue closed */
1570                 spin_unlock_irqsave(&q->lock);
1571                 qunlock(&q->rlock);
1572                 poperror();
1573                 return 0;
1574         }
1575
1576         /* if we get here, there's at least one block in the queue */
1577         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1578         // strategy by default.
1579         if (q->state & Qcoalesce) {
1580                 /* when coalescing, 0 length blocks just go away */
1581                 b = q->bfirst;
1582                 if (BLEN(b) <= 0) {
1583                         freeb(qremove(q));
1584                         goto again;
1585                 }
1586
1587                 /*  grab the first block plus as many
1588                  *  following blocks as will completely
1589                  *  fit in the read.
1590                  */
1591                 n = 0;
1592                 l = &first;
1593                 m = BLEN(b);
1594                 for (;;) {
1595                         *l = qremove(q);
1596                         l = &b->next;
1597                         n += m;
1598
1599                         b = q->bfirst;
1600                         if (b == NULL)
1601                                 break;
1602                         m = BLEN(b);
1603                         if (n + m > len)
1604                                 break;
1605                 }
1606         } else {
1607                 first = qremove(q);
1608                 n = BLEN(first);
1609         }
1610
1611         /* copy to user space outside of the ilock */
1612         spin_unlock_irqsave(&q->lock);
1613         b = bl2mem(vp, first, len);
1614         spin_lock_irqsave(&q->lock);
1615
1616         /* take care of any left over partial block */
1617         if (b != NULL) {
1618                 n -= BLEN(b);
1619                 if (q->state & Qmsg)
1620                         freeb(b);
1621                 else
1622                         qputback(q, b);
1623         }
1624
1625         /* restart producer */
1626         qwakeup_iunlock(q);
1627
1628         poperror();
1629         qunlock(&q->rlock);
1630         return n;
1631 }
1632
1633 static int qnotfull(void *a)
1634 {
1635         struct queue *q = a;
1636
1637         return q->len < q->limit || (q->state & Qclosed);
1638 }
1639
1640 uint32_t dropcnt;
1641
1642 /*
1643  *  add a block to a queue obeying flow control
1644  */
1645 long qbwrite(struct queue *q, struct block *b)
1646 {
1647         ERRSTACK(1);
1648         int n, dowakeup;
1649         volatile bool should_free_b = TRUE;
1650         bool was_empty;
1651
1652         n = BLEN(b);
1653
1654         if (q->bypass) {
1655                 (*q->bypass) (q->arg, b);
1656                 return n;
1657         }
1658
1659         dowakeup = 0;
1660         qlock(&q->wlock);
1661         if (waserror()) {
1662                 if (b != NULL && should_free_b)
1663                         freeb(b);
1664                 qunlock(&q->wlock);
1665                 nexterror();
1666         }
1667
1668         spin_lock_irqsave(&q->lock);
1669         was_empty = q->len == 0;
1670
1671         /* give up if the queue is closed */
1672         if (q->state & Qclosed) {
1673                 spin_unlock_irqsave(&q->lock);
1674                 if (q->err[0])
1675                         error(EFAIL, q->err);
1676                 else
1677                         error(EFAIL, "connection closed");
1678         }
1679
1680         /* if nonblocking, don't queue over the limit */
1681         if (q->len >= q->limit) {
1682                 /* drop overflow takes priority over regular non-blocking */
1683                 if (q->state & Qdropoverflow) {
1684                         spin_unlock_irqsave(&q->lock);
1685                         freeb(b);
1686                         dropcnt += n;
1687                         qunlock(&q->wlock);
1688                         poperror();
1689                         return n;
1690                 }
1691                 if (q->state & Qnonblock) {
1692                         spin_unlock_irqsave(&q->lock);
1693                         freeb(b);
1694                         error(EAGAIN, "queue full");
1695                 }
1696         }
1697
1698         /* queue the block */
1699         should_free_b = FALSE;
1700         if (q->bfirst)
1701                 q->blast->next = b;
1702         else
1703                 q->bfirst = b;
1704         q->blast = b;
1705         b->next = 0;
1706         q->len += BALLOC(b);
1707         q->dlen += n;
1708         QDEBUG checkb(b, "qbwrite");
1709         b = NULL;
1710
1711         /* make sure other end gets awakened */
1712         if (q->state & Qstarve) {
1713                 q->state &= ~Qstarve;
1714                 dowakeup = 1;
1715         }
1716         spin_unlock_irqsave(&q->lock);
1717
1718         /*  get output going again */
1719         if (q->kick && (dowakeup || (q->state & Qkick)))
1720                 q->kick(q->arg);
1721
1722         /* wakeup anyone consuming at the other end */
1723         if (dowakeup)
1724                 rendez_wakeup(&q->rr);
1725         if (was_empty)
1726                 qwake_cb(q, FDTAP_FILT_READABLE);
1727
1728         /*
1729          *  flow control, wait for queue to get below the limit
1730          *  before allowing the process to continue and queue
1731          *  more.  We do this here so that postnote can only
1732          *  interrupt us after the data has been queued.  This
1733          *  means that things like 9p flushes and ssl messages
1734          *  will not be disrupted by software interrupts.
1735          *
1736          *  Note - this is moderately dangerous since a process
1737          *  that keeps getting interrupted and rewriting will
1738          *  queue infinite crud.
1739          */
1740         for (;;) {
1741                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1742                         break;
1743
1744                 spin_lock_irqsave(&q->lock);
1745                 q->state |= Qflow;
1746                 spin_unlock_irqsave(&q->lock);
1747                 rendez_sleep(&q->wr, qnotfull, q);
1748         }
1749
1750         qunlock(&q->wlock);
1751         poperror();
1752         return n;
1753 }
1754
1755 long qibwrite(struct queue *q, struct block *b)
1756 {
1757         int n, dowakeup;
1758         bool was_empty;
1759
1760         dowakeup = 0;
1761
1762         n = BLEN(b);
1763
1764         spin_lock_irqsave(&q->lock);
1765         was_empty = q->len == 0;
1766
1767         QDEBUG checkb(b, "qibwrite");
1768         if (q->bfirst)
1769                 q->blast->next = b;
1770         else
1771                 q->bfirst = b;
1772         q->blast = b;
1773         q->len += BALLOC(b);
1774         q->dlen += n;
1775
1776         if (q->state & Qstarve) {
1777                 q->state &= ~Qstarve;
1778                 dowakeup = 1;
1779         }
1780
1781         spin_unlock_irqsave(&q->lock);
1782
1783         if (dowakeup) {
1784                 if (q->kick)
1785                         q->kick(q->arg);
1786                 rendez_wakeup(&q->rr);
1787         }
1788         if (was_empty)
1789                 qwake_cb(q, FDTAP_FILT_READABLE);
1790
1791         return n;
1792 }
1793
1794 /*
1795  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1796  */
1797 int qwrite(struct queue *q, void *vp, int len)
1798 {
1799         int n, sofar;
1800         struct block *b;
1801         uint8_t *p = vp;
1802         void *ext_buf;
1803
1804         QDEBUG if (!islo())
1805                  printd("qwrite hi %p\n", getcallerpc(&q));
1806
1807         sofar = 0;
1808         do {
1809                 n = len - sofar;
1810                 /* This is 64K, the max amount per single block.  Still a good value? */
1811                 if (n > Maxatomic)
1812                         n = Maxatomic;
1813
1814                 /* If n is small, we don't need to bother with the extra_data.  But
1815                  * until the whole stack can handle extd blocks, we'll use them
1816                  * unconditionally. */
1817 #ifdef CONFIG_BLOCK_EXTRAS
1818                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1819                  * only available via padblock (to the left).  we also need some space
1820                  * for pullupblock for some basic headers (like icmp) that get written
1821                  * in directly */
1822                 b = allocb(64);
1823                 ext_buf = kmalloc(n, 0);
1824                 memcpy(ext_buf, p + sofar, n);
1825                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1826                 b->extra_data[0].base = (uintptr_t)ext_buf;
1827                 b->extra_data[0].off = 0;
1828                 b->extra_data[0].len = n;
1829                 b->extra_len += n;
1830 #else
1831                 b = allocb(n);
1832                 memmove(b->wp, p + sofar, n);
1833                 b->wp += n;
1834 #endif
1835                         
1836                 qbwrite(q, b);
1837
1838                 sofar += n;
1839         } while (sofar < len && (q->state & Qmsg) == 0);
1840
1841         return len;
1842 }
1843
1844 /*
1845  *  used by print() to write to a queue.  Since we may be splhi or not in
1846  *  a process, don't qlock.
1847  */
1848 int qiwrite(struct queue *q, void *vp, int len)
1849 {
1850         int n, sofar;
1851         struct block *b;
1852         uint8_t *p = vp;
1853
1854         sofar = 0;
1855         do {
1856                 n = len - sofar;
1857                 if (n > Maxatomic)
1858                         n = Maxatomic;
1859
1860                 b = iallocb(n);
1861                 if (b == NULL)
1862                         break;
1863                 /* TODO consider extra_data */
1864                 memmove(b->wp, p + sofar, n);
1865                 /* this adjusts BLEN to be n, or at least it should */
1866                 b->wp += n;
1867                 assert(n == BLEN(b));
1868                 qibwrite(q, b);
1869
1870                 sofar += n;
1871         } while (sofar < len && (q->state & Qmsg) == 0);
1872
1873         return sofar;
1874 }
1875
1876 /*
1877  *  be extremely careful when calling this,
1878  *  as there is no reference accounting
1879  */
1880 void qfree(struct queue *q)
1881 {
1882         qclose(q);
1883         kfree(q);
1884 }
1885
1886 /*
1887  *  Mark a queue as closed.  No further IO is permitted.
1888  *  All blocks are released.
1889  */
1890 void qclose(struct queue *q)
1891 {
1892         struct block *bfirst;
1893
1894         if (q == NULL)
1895                 return;
1896
1897         /* mark it */
1898         spin_lock_irqsave(&q->lock);
1899         q->state |= Qclosed;
1900         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1901         q->err[0] = 0;
1902         bfirst = q->bfirst;
1903         q->bfirst = 0;
1904         q->len = 0;
1905         q->dlen = 0;
1906         spin_unlock_irqsave(&q->lock);
1907
1908         /* free queued blocks */
1909         freeblist(bfirst);
1910
1911         /* wake up readers/writers */
1912         rendez_wakeup(&q->rr);
1913         rendez_wakeup(&q->wr);
1914         qwake_cb(q, FDTAP_FILT_HANGUP);
1915 }
1916
1917 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1918  *
1919  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1920  * there is no message, which is what also happens during a natural qclose(),
1921  * those waiters will simply return 0.  qwriters will always error() on a
1922  * closed/hungup queue. */
1923 void qhangup(struct queue *q, char *msg)
1924 {
1925         /* mark it */
1926         spin_lock_irqsave(&q->lock);
1927         q->state |= Qclosed;
1928         if (msg == 0 || *msg == 0)
1929                 q->err[0] = 0;
1930         else
1931                 strlcpy(q->err, msg, ERRMAX);
1932         spin_unlock_irqsave(&q->lock);
1933
1934         /* wake up readers/writers */
1935         rendez_wakeup(&q->rr);
1936         rendez_wakeup(&q->wr);
1937         qwake_cb(q, FDTAP_FILT_HANGUP);
1938 }
1939
1940 /*
1941  *  return non-zero if the q is hungup
1942  */
1943 int qisclosed(struct queue *q)
1944 {
1945         return q->state & Qclosed;
1946 }
1947
1948 /*
1949  *  mark a queue as no longer hung up.  resets the wake_cb.
1950  */
1951 void qreopen(struct queue *q)
1952 {
1953         spin_lock_irqsave(&q->lock);
1954         q->state &= ~Qclosed;
1955         q->state |= Qstarve;
1956         q->eof = 0;
1957         q->limit = q->inilim;
1958         q->wake_cb = 0;
1959         q->wake_data = 0;
1960         spin_unlock_irqsave(&q->lock);
1961 }
1962
1963 /*
1964  *  return bytes queued
1965  */
1966 int qlen(struct queue *q)
1967 {
1968         return q->dlen;
1969 }
1970
1971 /*
1972  * return space remaining before flow control
1973  */
1974 int qwindow(struct queue *q)
1975 {
1976         int l;
1977
1978         l = q->limit - q->len;
1979         if (l < 0)
1980                 l = 0;
1981         return l;
1982 }
1983
1984 /*
1985  *  return true if we can read without blocking
1986  */
1987 int qcanread(struct queue *q)
1988 {
1989         return q->bfirst != 0;
1990 }
1991
1992 /*
1993  *  change queue limit
1994  */
1995 void qsetlimit(struct queue *q, int limit)
1996 {
1997         q->limit = limit;
1998 }
1999
2000 /*
2001  *  set whether writes drop overflowing blocks, or if we sleep
2002  */
2003 void qdropoverflow(struct queue *q, bool onoff)
2004 {
2005         if (onoff)
2006                 q->state |= Qdropoverflow;
2007         else
2008                 q->state &= ~Qdropoverflow;
2009 }
2010
2011 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
2012 void qnonblock(struct queue *q, bool onoff)
2013 {
2014         if (onoff)
2015                 q->state |= Qnonblock;
2016         else
2017                 q->state &= ~Qnonblock;
2018 }
2019
2020 /*
2021  *  flush the output queue
2022  */
2023 void qflush(struct queue *q)
2024 {
2025         struct block *bfirst;
2026
2027         /* mark it */
2028         spin_lock_irqsave(&q->lock);
2029         bfirst = q->bfirst;
2030         q->bfirst = 0;
2031         q->len = 0;
2032         q->dlen = 0;
2033         spin_unlock_irqsave(&q->lock);
2034
2035         /* free queued blocks */
2036         freeblist(bfirst);
2037
2038         /* wake up writers */
2039         rendez_wakeup(&q->wr);
2040         qwake_cb(q, FDTAP_FILT_WRITABLE);
2041 }
2042
2043 int qfull(struct queue *q)
2044 {
2045         return q->len >= q->limit;
2046 }
2047
2048 int qstate(struct queue *q)
2049 {
2050         return q->state;
2051 }
2052
2053 void qdump(struct queue *q)
2054 {
2055         if (q)
2056                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
2057                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
2058 }
2059
2060 /* On certain wakeup events, qio will call func(q, data, filter), where filter
2061  * marks the type of wakeup event (flags from FDTAP).
2062  *
2063  * There's no sync protection.  If you change the CB while the qio is running,
2064  * you might get a CB with the data or func from a previous set_wake_cb.  You
2065  * should set this once per queue and forget it.
2066  *
2067  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
2068  * just make sure that the func(data) pair are valid until the queue is freed or
2069  * reopened. */
2070 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
2071 {
2072         q->wake_data = data;
2073         wmb();  /* if we see func, we'll also see the data for it */
2074         q->wake_cb = func;
2075 }