qio: Clean up locking
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95 };
96
97 unsigned int qiomaxatomic = Maxatomic;
98
99 /* Helper: fires a wake callback, sending 'filter' */
100 static void qwake_cb(struct queue *q, int filter)
101 {
102         if (q->wake_cb)
103                 q->wake_cb(q, q->wake_data, filter);
104 }
105
106 void ixsummary(void)
107 {
108         debugging ^= 1;
109         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
110                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
111         printd("consume %lu, produce %lu, qcopy %lu\n",
112                    consumecnt, producecnt, qcopycnt);
113 }
114
115 /*
116  *  free a list of blocks
117  */
118 void freeblist(struct block *b)
119 {
120         struct block *next;
121
122         for (; b != 0; b = next) {
123                 next = b->next;
124                 b->next = 0;
125                 freeb(b);
126         }
127 }
128
129 /*
130  *  pad a block to the front (or the back if size is negative)
131  */
132 struct block *padblock(struct block *bp, int size)
133 {
134         int n;
135         struct block *nbp;
136         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
137         uint16_t checksum_start = bp->checksum_start;
138         uint16_t checksum_offset = bp->checksum_offset;
139         uint16_t mss = bp->mss;
140
141         QDEBUG checkb(bp, "padblock 1");
142         if (size >= 0) {
143                 if (bp->rp - bp->base >= size) {
144                         bp->checksum_start += size;
145                         bp->rp -= size;
146                         return bp;
147                 }
148
149                 PANIC_EXTRA(bp);
150                 if (bp->next)
151                         panic("padblock %p", getcallerpc(&bp));
152                 n = BLEN(bp);
153                 padblockcnt++;
154                 nbp = block_alloc(size + n, MEM_WAIT);
155                 nbp->rp += size;
156                 nbp->wp = nbp->rp;
157                 memmove(nbp->wp, bp->rp, n);
158                 nbp->wp += n;
159                 freeb(bp);
160                 nbp->rp -= size;
161         } else {
162                 size = -size;
163
164                 PANIC_EXTRA(bp);
165
166                 if (bp->next)
167                         panic("padblock %p", getcallerpc(&bp));
168
169                 if (bp->lim - bp->wp >= size)
170                         return bp;
171
172                 n = BLEN(bp);
173                 padblockcnt++;
174                 nbp = block_alloc(size + n, MEM_WAIT);
175                 memmove(nbp->wp, bp->rp, n);
176                 nbp->wp += n;
177                 freeb(bp);
178         }
179         if (bcksum) {
180                 nbp->flag |= bcksum;
181                 nbp->checksum_start = checksum_start;
182                 nbp->checksum_offset = checksum_offset;
183                 nbp->mss = mss;
184         }
185         QDEBUG checkb(nbp, "padblock 1");
186         return nbp;
187 }
188
189 /*
190  *  return count of bytes in a string of blocks
191  */
192 int blocklen(struct block *bp)
193 {
194         int len;
195
196         len = 0;
197         while (bp) {
198                 len += BLEN(bp);
199                 bp = bp->next;
200         }
201         return len;
202 }
203
204 /*
205  * return count of space in blocks
206  */
207 int blockalloclen(struct block *bp)
208 {
209         int len;
210
211         len = 0;
212         while (bp) {
213                 len += BALLOC(bp);
214                 bp = bp->next;
215         }
216         return len;
217 }
218
219 /*
220  *  copy the  string of blocks into
221  *  a single block and free the string
222  */
223 struct block *concatblock(struct block *bp)
224 {
225         int len;
226         struct block *nb, *f;
227
228         if (bp->next == 0)
229                 return bp;
230
231         /* probably use parts of qclone */
232         PANIC_EXTRA(bp);
233         nb = block_alloc(blocklen(bp), MEM_WAIT);
234         for (f = bp; f; f = f->next) {
235                 len = BLEN(f);
236                 memmove(nb->wp, f->rp, len);
237                 nb->wp += len;
238         }
239         concatblockcnt += BLEN(nb);
240         freeblist(bp);
241         QDEBUG checkb(nb, "concatblock 1");
242         return nb;
243 }
244
245 /* Returns a block with the remaining contents of b all in the main body of the
246  * returned block.  Replace old references to b with the returned value (which
247  * may still be 'b', if no change was needed. */
248 struct block *linearizeblock(struct block *b)
249 {
250         struct block *newb;
251         size_t len;
252         struct extra_bdata *ebd;
253
254         if (!b->extra_len)
255                 return b;
256
257         newb = block_alloc(BLEN(b), MEM_WAIT);
258         len = BHLEN(b);
259         memcpy(newb->wp, b->rp, len);
260         newb->wp += len;
261         len = b->extra_len;
262         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
263                 ebd = &b->extra_data[i];
264                 if (!ebd->base || !ebd->len)
265                         continue;
266                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
267                 newb->wp += ebd->len;
268                 len -= ebd->len;
269         }
270         /* TODO: any other flags that need copied over? */
271         if (b->flag & BCKSUM_FLAGS) {
272                 newb->flag |= (b->flag & BCKSUM_FLAGS);
273                 newb->checksum_start = b->checksum_start;
274                 newb->checksum_offset = b->checksum_offset;
275                 newb->mss = b->mss;
276         }
277         freeb(b);
278         return newb;
279 }
280
281 /*
282  *  make sure the first block has at least n bytes in its main body
283  */
284 struct block *pullupblock(struct block *bp, int n)
285 {
286         int i, len, seglen;
287         struct block *nbp;
288         struct extra_bdata *ebd;
289
290         /*
291          *  this should almost always be true, it's
292          *  just to avoid every caller checking.
293          */
294         if (BHLEN(bp) >= n)
295                 return bp;
296
297          /* a start at explicit main-body / header management */
298         if (bp->extra_len) {
299                 if (n > bp->lim - bp->rp) {
300                         /* would need to realloc a new block and copy everything over. */
301                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
302                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
303                 }
304                 len = n - BHLEN(bp);
305                 if (len > bp->extra_len)
306                         panic("pullup more than extra (%d, %d, %d)\n",
307                               n, BHLEN(bp), bp->extra_len);
308                 checkb(bp, "before pullup");
309                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
310                         ebd = &bp->extra_data[i];
311                         if (!ebd->base || !ebd->len)
312                                 continue;
313                         seglen = MIN(ebd->len, len);
314                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
315                         bp->wp += seglen;
316                         len -= seglen;
317                         ebd->len -= seglen;
318                         ebd->off += seglen;
319                         bp->extra_len -= seglen;
320                         if (ebd->len == 0) {
321                                 kfree((void *)ebd->base);
322                                 ebd->off = 0;
323                                 ebd->base = 0;
324                         }
325                 }
326                 /* maybe just call pullupblock recursively here */
327                 if (len)
328                         panic("pullup %d bytes overdrawn\n", len);
329                 checkb(bp, "after pullup");
330                 return bp;
331         }
332
333         /*
334          *  if not enough room in the first block,
335          *  add another to the front of the list.
336          */
337         if (bp->lim - bp->rp < n) {
338                 nbp = block_alloc(n, MEM_WAIT);
339                 nbp->next = bp;
340                 bp = nbp;
341         }
342
343         /*
344          *  copy bytes from the trailing blocks into the first
345          */
346         n -= BLEN(bp);
347         while ((nbp = bp->next)) {
348                 i = BLEN(nbp);
349                 if (i > n) {
350                         memmove(bp->wp, nbp->rp, n);
351                         pullupblockcnt++;
352                         bp->wp += n;
353                         nbp->rp += n;
354                         QDEBUG checkb(bp, "pullupblock 1");
355                         return bp;
356                 } else {
357                         memmove(bp->wp, nbp->rp, i);
358                         pullupblockcnt++;
359                         bp->wp += i;
360                         bp->next = nbp->next;
361                         nbp->next = 0;
362                         freeb(nbp);
363                         n -= i;
364                         if (n == 0) {
365                                 QDEBUG checkb(bp, "pullupblock 2");
366                                 return bp;
367                         }
368                 }
369         }
370         freeb(bp);
371         return 0;
372 }
373
374 /*
375  *  make sure the first block has at least n bytes in its main body
376  */
377 struct block *pullupqueue(struct queue *q, int n)
378 {
379         struct block *b;
380
381         /* TODO: lock to protect the queue links? */
382         if ((BHLEN(q->bfirst) >= n))
383                 return q->bfirst;
384         q->bfirst = pullupblock(q->bfirst, n);
385         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
386         q->blast = b;
387         return q->bfirst;
388 }
389
390 /* throw away count bytes from the front of
391  * block's extradata.  Returns count of bytes
392  * thrown away
393  */
394
395 static int pullext(struct block *bp, int count)
396 {
397         struct extra_bdata *ed;
398         int i, rem, bytes = 0;
399
400         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
401                 ed = &bp->extra_data[i];
402                 rem = MIN(count, ed->len);
403                 bp->extra_len -= rem;
404                 count -= rem;
405                 bytes += rem;
406                 ed->off += rem;
407                 ed->len -= rem;
408                 if (ed->len == 0) {
409                         kfree((void *)ed->base);
410                         ed->base = 0;
411                         ed->off = 0;
412                 }
413         }
414         return bytes;
415 }
416
417 /* throw away count bytes from the end of a
418  * block's extradata.  Returns count of bytes
419  * thrown away
420  */
421
422 static int dropext(struct block *bp, int count)
423 {
424         struct extra_bdata *ed;
425         int i, rem, bytes = 0;
426
427         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
428                 ed = &bp->extra_data[i];
429                 rem = MIN(count, ed->len);
430                 bp->extra_len -= rem;
431                 count -= rem;
432                 bytes += rem;
433                 ed->len -= rem;
434                 if (ed->len == 0) {
435                         kfree((void *)ed->base);
436                         ed->base = 0;
437                         ed->off = 0;
438                 }
439         }
440         return bytes;
441 }
442
443 /*
444  *  throw away up to count bytes from a
445  *  list of blocks.  Return count of bytes
446  *  thrown away.
447  */
448 static int _pullblock(struct block **bph, int count, int free)
449 {
450         struct block *bp;
451         int n, bytes;
452
453         bytes = 0;
454         if (bph == NULL)
455                 return 0;
456
457         while (*bph != NULL && count != 0) {
458                 bp = *bph;
459
460                 n = MIN(BHLEN(bp), count);
461                 bytes += n;
462                 count -= n;
463                 bp->rp += n;
464                 n = pullext(bp, count);
465                 bytes += n;
466                 count -= n;
467                 QDEBUG checkb(bp, "pullblock ");
468                 if (BLEN(bp) == 0 && (free || count)) {
469                         *bph = bp->next;
470                         bp->next = NULL;
471                         freeb(bp);
472                 }
473         }
474         return bytes;
475 }
476
477 int pullblock(struct block **bph, int count)
478 {
479         return _pullblock(bph, count, 1);
480 }
481
482 /*
483  *  trim to len bytes starting at offset
484  */
485 struct block *trimblock(struct block *bp, int offset, int len)
486 {
487         uint32_t l, trim;
488         int olen = len;
489
490         QDEBUG checkb(bp, "trimblock 1");
491         if (blocklen(bp) < offset + len) {
492                 freeblist(bp);
493                 return NULL;
494         }
495
496         l =_pullblock(&bp, offset, 0);
497         if (bp == NULL)
498                 return NULL;
499         if (l != offset) {
500                 freeblist(bp);
501                 return NULL;
502         }
503
504         while ((l = BLEN(bp)) < len) {
505                 len -= l;
506                 bp = bp->next;
507         }
508
509         trim = BLEN(bp) - len;
510         trim -= dropext(bp, trim);
511         bp->wp -= trim;
512
513         if (bp->next) {
514                 freeblist(bp->next);
515                 bp->next = NULL;
516         }
517         return bp;
518 }
519
520 /*
521  *  copy 'count' bytes into a new block
522  */
523 struct block *copyblock(struct block *bp, int count)
524 {
525         int l;
526         struct block *nbp;
527
528         QDEBUG checkb(bp, "copyblock 0");
529         nbp = block_alloc(count, MEM_WAIT);
530         if (bp->flag & BCKSUM_FLAGS) {
531                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
532                 nbp->checksum_start = bp->checksum_start;
533                 nbp->checksum_offset = bp->checksum_offset;
534                 nbp->mss = bp->mss;
535         }
536         PANIC_EXTRA(bp);
537         for (; count > 0 && bp != 0; bp = bp->next) {
538                 l = BLEN(bp);
539                 if (l > count)
540                         l = count;
541                 memmove(nbp->wp, bp->rp, l);
542                 nbp->wp += l;
543                 count -= l;
544         }
545         if (count > 0) {
546                 memset(nbp->wp, 0, count);
547                 nbp->wp += count;
548         }
549         copyblockcnt++;
550         QDEBUG checkb(nbp, "copyblock 1");
551
552         return nbp;
553 }
554
555 /* Adjust block @bp so that its size is exactly @len.
556  * If the size is increased, fill in the new contents with zeros.
557  * If the size is decreased, discard some of the old contents at the tail. */
558 struct block *adjustblock(struct block *bp, int len)
559 {
560         struct extra_bdata *ebd;
561         int i;
562
563         if (len < 0) {
564                 freeb(bp);
565                 return NULL;
566         }
567
568         if (len == BLEN(bp))
569                 return bp;
570
571         /* Shrink within block main body. */
572         if (len <= BHLEN(bp)) {
573                 free_block_extra(bp);
574                 bp->wp = bp->rp + len;
575                 QDEBUG checkb(bp, "adjustblock 1");
576                 return bp;
577         }
578
579         /* Need to grow. */
580         if (len > BLEN(bp)) {
581                 /* Grow within block main body. */
582                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
583                         memset(bp->wp, 0, len - BLEN(bp));
584                         bp->wp = bp->rp + len;
585                         QDEBUG checkb(bp, "adjustblock 2");
586                         return bp;
587                 }
588                 /* Grow with extra data buffers. */
589                 block_append_extra(bp, len - BLEN(bp), MEM_WAIT);
590                 QDEBUG checkb(bp, "adjustblock 3");
591                 return bp;
592         }
593
594         /* Shrink extra data buffers.
595          * len is how much of ebd we need to keep.
596          * extra_len is re-accumulated. */
597         assert(bp->extra_len > 0);
598         len -= BHLEN(bp);
599         bp->extra_len = 0;
600         for (i = 0; i < bp->nr_extra_bufs; i++) {
601                 ebd = &bp->extra_data[i];
602                 if (len <= ebd->len)
603                         break;
604                 len -= ebd->len;
605                 bp->extra_len += ebd->len;
606         }
607         /* If len becomes zero, extra_data[i] should be freed. */
608         if (len > 0) {
609                 ebd = &bp->extra_data[i];
610                 ebd->len = len;
611                 bp->extra_len += ebd->len;
612                 i++;
613         }
614         for (; i < bp->nr_extra_bufs; i++) {
615                 ebd = &bp->extra_data[i];
616                 if (ebd->base)
617                         kfree((void*)ebd->base);
618                 ebd->base = ebd->off = ebd->len = 0;
619         }
620         QDEBUG checkb(bp, "adjustblock 4");
621         return bp;
622 }
623
624
625 /*
626  *  get next block from a queue, return null if nothing there
627  */
628 struct block *qget(struct queue *q)
629 {
630         int dowakeup;
631         struct block *b;
632
633         /* sync with qwrite */
634         spin_lock_irqsave(&q->lock);
635
636         b = q->bfirst;
637         if (b == NULL) {
638                 q->state |= Qstarve;
639                 spin_unlock_irqsave(&q->lock);
640                 return NULL;
641         }
642         q->bfirst = b->next;
643         b->next = 0;
644         q->len -= BALLOC(b);
645         q->dlen -= BLEN(b);
646         QDEBUG checkb(b, "qget");
647
648         /* if writer flow controlled, restart */
649         if ((q->state & Qflow) && q->len < q->limit / 2) {
650                 q->state &= ~Qflow;
651                 dowakeup = 1;
652         } else
653                 dowakeup = 0;
654
655         spin_unlock_irqsave(&q->lock);
656
657         if (dowakeup)
658                 rendez_wakeup(&q->wr);
659         qwake_cb(q, FDTAP_FILT_WRITABLE);
660
661         return b;
662 }
663
664 /*
665  *  throw away the next 'len' bytes in the queue
666  * returning the number actually discarded
667  */
668 int qdiscard(struct queue *q, int len)
669 {
670         struct block *b;
671         int dowakeup, n, sofar, body_amt, extra_amt;
672         struct extra_bdata *ebd;
673
674         spin_lock_irqsave(&q->lock);
675         for (sofar = 0; sofar < len; sofar += n) {
676                 b = q->bfirst;
677                 if (b == NULL)
678                         break;
679                 QDEBUG checkb(b, "qdiscard");
680                 n = BLEN(b);
681                 if (n <= len - sofar) {
682                         q->bfirst = b->next;
683                         b->next = 0;
684                         q->len -= BALLOC(b);
685                         q->dlen -= BLEN(b);
686                         freeb(b);
687                 } else {
688                         n = len - sofar;
689                         q->dlen -= n;
690                         /* partial block removal */
691                         body_amt = MIN(BHLEN(b), n);
692                         b->rp += body_amt;
693                         extra_amt = n - body_amt;
694                         /* reduce q->len by the amount we remove from the extras.  The
695                          * header will always be accounted for above, during block removal.
696                          * */
697                         q->len -= extra_amt;
698                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
699                                 ebd = &b->extra_data[i];
700                                 if (!ebd->base || !ebd->len)
701                                         continue;
702                                 if (extra_amt >= ebd->len) {
703                                         /* remove the entire entry, note the kfree release */
704                                         b->extra_len -= ebd->len;
705                                         extra_amt -= ebd->len;
706                                         kfree((void*)ebd->base);
707                                         ebd->base = ebd->off = ebd->len = 0;
708                                         continue;
709                                 }
710                                 ebd->off += extra_amt;
711                                 ebd->len -= extra_amt;
712                                 b->extra_len -= extra_amt;
713                                 extra_amt = 0;
714                         }
715                 }
716         }
717
718         /*
719          *  if writer flow controlled, restart
720          *
721          *  This used to be
722          *  q->len < q->limit/2
723          *  but it slows down tcp too much for certain write sizes.
724          *  I really don't understand it completely.  It may be
725          *  due to the queue draining so fast that the transmission
726          *  stalls waiting for the app to produce more data.  - presotto
727          */
728         if ((q->state & Qflow) && q->len < q->limit) {
729                 q->state &= ~Qflow;
730                 dowakeup = 1;
731         } else
732                 dowakeup = 0;
733
734         spin_unlock_irqsave(&q->lock);
735
736         if (dowakeup)
737                 rendez_wakeup(&q->wr);
738         qwake_cb(q, FDTAP_FILT_WRITABLE);
739
740         return sofar;
741 }
742
743 /*
744  *  Interrupt level copy out of a queue, return # bytes copied.
745  */
746 int qconsume(struct queue *q, void *vp, int len)
747 {
748         struct block *b;
749         int n, dowakeup;
750         uint8_t *p = vp;
751         struct block *tofree = NULL;
752
753         /* sync with qwrite */
754         spin_lock_irqsave(&q->lock);
755
756         for (;;) {
757                 b = q->bfirst;
758                 if (b == 0) {
759                         q->state |= Qstarve;
760                         spin_unlock_irqsave(&q->lock);
761                         return -1;
762                 }
763                 QDEBUG checkb(b, "qconsume 1");
764
765                 n = BLEN(b);
766                 if (n > 0)
767                         break;
768                 q->bfirst = b->next;
769                 q->len -= BALLOC(b);
770
771                 /* remember to free this */
772                 b->next = tofree;
773                 tofree = b;
774         };
775
776         PANIC_EXTRA(b);
777         if (n < len)
778                 len = n;
779         memmove(p, b->rp, len);
780         consumecnt += n;
781         b->rp += len;
782         q->dlen -= len;
783
784         /* discard the block if we're done with it */
785         if ((q->state & Qmsg) || len == n) {
786                 q->bfirst = b->next;
787                 b->next = 0;
788                 q->len -= BALLOC(b);
789                 q->dlen -= BLEN(b);
790
791                 /* remember to free this */
792                 b->next = tofree;
793                 tofree = b;
794         }
795
796         /* if writer flow controlled, restart */
797         if ((q->state & Qflow) && q->len < q->limit / 2) {
798                 q->state &= ~Qflow;
799                 dowakeup = 1;
800         } else
801                 dowakeup = 0;
802
803         spin_unlock_irqsave(&q->lock);
804
805         if (dowakeup)
806                 rendez_wakeup(&q->wr);
807         qwake_cb(q, FDTAP_FILT_WRITABLE);
808
809         if (tofree != NULL)
810                 freeblist(tofree);
811
812         return len;
813 }
814
815 int qpass(struct queue *q, struct block *b)
816 {
817         int dlen, len, dowakeup;
818         bool was_empty;
819
820         /* sync with qread */
821         dowakeup = 0;
822         spin_lock_irqsave(&q->lock);
823         was_empty = q->len == 0;
824         if (q->len >= q->limit) {
825                 freeblist(b);
826                 spin_unlock_irqsave(&q->lock);
827                 return -1;
828         }
829         if (q->state & Qclosed) {
830                 len = blocklen(b);
831                 freeblist(b);
832                 spin_unlock_irqsave(&q->lock);
833                 return len;
834         }
835
836         /* add buffer to queue */
837         if (q->bfirst)
838                 q->blast->next = b;
839         else
840                 q->bfirst = b;
841         len = BALLOC(b);
842         dlen = BLEN(b);
843         QDEBUG checkb(b, "qpass");
844         while (b->next) {
845                 b = b->next;
846                 QDEBUG checkb(b, "qpass");
847                 len += BALLOC(b);
848                 dlen += BLEN(b);
849         }
850         q->blast = b;
851         q->len += len;
852         q->dlen += dlen;
853
854         if (q->len >= q->limit / 2)
855                 q->state |= Qflow;
856
857         if (q->state & Qstarve) {
858                 q->state &= ~Qstarve;
859                 dowakeup = 1;
860         }
861         spin_unlock_irqsave(&q->lock);
862
863         if (dowakeup)
864                 rendez_wakeup(&q->rr);
865         if (was_empty)
866                 qwake_cb(q, FDTAP_FILT_READABLE);
867
868         return len;
869 }
870
871 int qpassnolim(struct queue *q, struct block *b)
872 {
873         int dlen, len, dowakeup;
874         bool was_empty;
875
876         /* sync with qread */
877         dowakeup = 0;
878         spin_lock_irqsave(&q->lock);
879         was_empty = q->len == 0;
880
881         if (q->state & Qclosed) {
882                 freeblist(b);
883                 spin_unlock_irqsave(&q->lock);
884                 return BALLOC(b);
885         }
886
887         /* add buffer to queue */
888         if (q->bfirst)
889                 q->blast->next = b;
890         else
891                 q->bfirst = b;
892         len = BALLOC(b);
893         dlen = BLEN(b);
894         QDEBUG checkb(b, "qpass");
895         while (b->next) {
896                 b = b->next;
897                 QDEBUG checkb(b, "qpass");
898                 len += BALLOC(b);
899                 dlen += BLEN(b);
900         }
901         q->blast = b;
902         q->len += len;
903         q->dlen += dlen;
904
905         if (q->len >= q->limit / 2)
906                 q->state |= Qflow;
907
908         if (q->state & Qstarve) {
909                 q->state &= ~Qstarve;
910                 dowakeup = 1;
911         }
912         spin_unlock_irqsave(&q->lock);
913
914         if (dowakeup)
915                 rendez_wakeup(&q->rr);
916         if (was_empty)
917                 qwake_cb(q, FDTAP_FILT_READABLE);
918
919         return len;
920 }
921
922 /*
923  *  if the allocated space is way out of line with the used
924  *  space, reallocate to a smaller block
925  */
926 struct block *packblock(struct block *bp)
927 {
928         struct block **l, *nbp;
929         int n;
930
931         if (bp->extra_len)
932                 return bp;
933         for (l = &bp; *l; l = &(*l)->next) {
934                 nbp = *l;
935                 n = BLEN(nbp);
936                 if ((n << 2) < BALLOC(nbp)) {
937                         *l = block_alloc(n, MEM_WAIT);
938                         memmove((*l)->wp, nbp->rp, n);
939                         (*l)->wp += n;
940                         (*l)->next = nbp->next;
941                         freeb(nbp);
942                 }
943         }
944
945         return bp;
946 }
947
948 int qproduce(struct queue *q, void *vp, int len)
949 {
950         struct block *b;
951         int dowakeup;
952         uint8_t *p = vp;
953         bool was_empty;
954
955         /* sync with qread */
956         dowakeup = 0;
957         spin_lock_irqsave(&q->lock);
958         was_empty = q->len == 0;
959
960         /* no waiting receivers, room in buffer? */
961         if (q->len >= q->limit) {
962                 q->state |= Qflow;
963                 spin_unlock_irqsave(&q->lock);
964                 return -1;
965         }
966
967         /* save in buffer */
968         /* use Qcoalesce here to save storage */
969         // TODO: Consider removing the Qcoalesce flag and force a coalescing
970         // strategy by default.
971         b = q->blast;
972         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
973                 || b->lim - b->wp < len) {
974                 /* need a new block */
975                 b = block_alloc(len, MEM_ATOMIC);
976                 if (b == 0) {
977                         spin_unlock_irqsave(&q->lock);
978                         return 0;
979                 }
980                 if (q->bfirst)
981                         q->blast->next = b;
982                 else
983                         q->bfirst = b;
984                 q->blast = b;
985                 /* b->next = 0; done by iallocb() */
986                 q->len += BALLOC(b);
987         }
988         PANIC_EXTRA(b);
989         memmove(b->wp, p, len);
990         producecnt += len;
991         b->wp += len;
992         q->dlen += len;
993         QDEBUG checkb(b, "qproduce");
994
995         if (q->state & Qstarve) {
996                 q->state &= ~Qstarve;
997                 dowakeup = 1;
998         }
999
1000         if (q->len >= q->limit)
1001                 q->state |= Qflow;
1002         spin_unlock_irqsave(&q->lock);
1003
1004         if (dowakeup)
1005                 rendez_wakeup(&q->rr);
1006         if (was_empty)
1007                 qwake_cb(q, FDTAP_FILT_READABLE);
1008
1009         return len;
1010 }
1011
1012 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
1013  * body_rp, for up to len.  Returns the len consumed. 
1014  *
1015  * The base is 'b', so that we can kfree it later.  This currently ties us to
1016  * using kfree for the release method for all extra_data.
1017  *
1018  * It is possible to have a body size that is 0, if there is no offset, and
1019  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
1020 static size_t point_to_body(struct block *b, uint8_t *body_rp,
1021                             struct block *newb, unsigned int newb_idx,
1022                             size_t len)
1023 {
1024         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
1025
1026         assert(newb_idx < newb->nr_extra_bufs);
1027
1028         kmalloc_incref(b);
1029         ebd->base = (uintptr_t)b;
1030         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
1031         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
1032         assert((int)ebd->len >= 0);
1033         newb->extra_len += ebd->len;
1034         return ebd->len;
1035 }
1036
1037 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1038  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1039  * consumed.
1040  *
1041  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1042 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1043                            struct block *newb, unsigned int newb_idx,
1044                            size_t len)
1045 {
1046         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1047         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1048
1049         assert(b_idx < b->nr_extra_bufs);
1050         assert(newb_idx < newb->nr_extra_bufs);
1051
1052         kmalloc_incref((void*)b_ebd->base);
1053         n_ebd->base = b_ebd->base;
1054         n_ebd->off = b_ebd->off + b_off;
1055         n_ebd->len = MIN(b_ebd->len - b_off, len);
1056         newb->extra_len += n_ebd->len;
1057         return n_ebd->len;
1058 }
1059
1060 /* given a string of blocks, fills the new block's extra_data  with the contents
1061  * of the blist [offset, len + offset)
1062  *
1063  * returns 0 on success.  the only failure is if the extra_data array was too
1064  * small, so this returns a positive integer saying how big the extra_data needs
1065  * to be.
1066  *
1067  * callers are responsible for protecting the list structure. */
1068 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1069                             uint32_t offset)
1070 {
1071         struct block *b, *first;
1072         unsigned int nr_bufs = 0;
1073         unsigned int b_idx, newb_idx = 0;
1074         uint8_t *first_main_body = 0;
1075
1076         /* find the first block; keep offset relative to the latest b in the list */
1077         for (b = blist; b; b = b->next) {
1078                 if (BLEN(b) > offset)
1079                         break;
1080                 offset -= BLEN(b);
1081         }
1082         /* qcopy semantics: if you asked for an offset outside the block list, you
1083          * get an empty block back */
1084         if (!b)
1085                 return 0;
1086         first = b;
1087         /* upper bound for how many buffers we'll need in newb */
1088         for (/* b is set*/; b; b = b->next) {
1089                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1090         }
1091         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1092         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1093                 /* caller will need to alloc these, then re-call us */
1094                 return nr_bufs;
1095         }
1096         for (b = first; b && len; b = b->next) {
1097                 b_idx = 0;
1098                 if (offset) {
1099                         if (offset < BHLEN(b)) {
1100                                 /* off is in the main body */
1101                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1102                                 newb_idx++;
1103                         } else {
1104                                 /* off is in one of the buffers (or just past the last one).
1105                                  * we're not going to point to b's main body at all. */
1106                                 offset -= BHLEN(b);
1107                                 assert(b->extra_data);
1108                                 /* assuming these extrabufs are packed, or at least that len
1109                                  * isn't gibberish */
1110                                 while (b->extra_data[b_idx].len <= offset) {
1111                                         offset -= b->extra_data[b_idx].len;
1112                                         b_idx++;
1113                                 }
1114                                 /* now offset is set to our offset in the b_idx'th buf */
1115                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1116                                 newb_idx++;
1117                                 b_idx++;
1118                         }
1119                         offset = 0;
1120                 } else {
1121                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1122                         newb_idx++;
1123                 }
1124                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1125                  * and any point_to_ could be our last if it consumed all of len. */
1126                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1127                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1128                         newb_idx++;
1129                 }
1130         }
1131         return 0;
1132 }
1133
1134 struct block *blist_clone(struct block *blist, int header_len, int len,
1135                           uint32_t offset)
1136 {
1137         int ret;
1138         struct block *newb = block_alloc(header_len, MEM_WAIT);
1139         do {
1140                 ret = __blist_clone_to(blist, newb, len, offset);
1141                 if (ret)
1142                         block_add_extd(newb, ret, MEM_WAIT);
1143         } while (ret);
1144         return newb;
1145 }
1146
1147 /* given a queue, makes a single block with header_len reserved space in the
1148  * block main body, and the contents of [offset, len + offset) pointed to in the
1149  * new blocks ext_data. */
1150 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1151 {
1152         int ret;
1153         struct block *newb = block_alloc(header_len, MEM_WAIT);
1154         /* the while loop should rarely be used: it would require someone
1155          * concurrently adding to the queue. */
1156         do {
1157                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1158                 spin_lock_irqsave(&q->lock);
1159                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1160                 spin_unlock_irqsave(&q->lock);
1161                 if (ret)
1162                         block_add_extd(newb, ret, MEM_WAIT);
1163         } while (ret);
1164         return newb;
1165 }
1166
1167 /*
1168  *  copy from offset in the queue
1169  */
1170 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1171 {
1172         int sofar;
1173         int n;
1174         struct block *b, *nb;
1175         uint8_t *p;
1176
1177         nb = block_alloc(len, MEM_WAIT);
1178
1179         spin_lock_irqsave(&q->lock);
1180
1181         /* go to offset */
1182         b = q->bfirst;
1183         for (sofar = 0;; sofar += n) {
1184                 if (b == NULL) {
1185                         spin_unlock_irqsave(&q->lock);
1186                         return nb;
1187                 }
1188                 n = BLEN(b);
1189                 if (sofar + n > offset) {
1190                         p = b->rp + offset - sofar;
1191                         n -= offset - sofar;
1192                         break;
1193                 }
1194                 QDEBUG checkb(b, "qcopy");
1195                 b = b->next;
1196         }
1197
1198         /* copy bytes from there */
1199         for (sofar = 0; sofar < len;) {
1200                 if (n > len - sofar)
1201                         n = len - sofar;
1202                 PANIC_EXTRA(b);
1203                 memmove(nb->wp, p, n);
1204                 qcopycnt += n;
1205                 sofar += n;
1206                 nb->wp += n;
1207                 b = b->next;
1208                 if (b == NULL)
1209                         break;
1210                 n = BLEN(b);
1211                 p = b->rp;
1212         }
1213         spin_unlock_irqsave(&q->lock);
1214
1215         return nb;
1216 }
1217
1218 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1219 {
1220 #ifdef CONFIG_BLOCK_EXTRAS
1221         return qclone(q, 0, len, offset);
1222 #else
1223         return qcopy_old(q, len, offset);
1224 #endif
1225 }
1226
1227 static void qinit_common(struct queue *q)
1228 {
1229         spinlock_init_irqsave(&q->lock);
1230         rendez_init(&q->rr);
1231         rendez_init(&q->wr);
1232 }
1233
1234 /*
1235  *  called by non-interrupt code
1236  */
1237 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1238 {
1239         struct queue *q;
1240
1241         q = kzmalloc(sizeof(struct queue), 0);
1242         if (q == 0)
1243                 return 0;
1244         qinit_common(q);
1245
1246         q->limit = q->inilim = limit;
1247         q->kick = kick;
1248         q->arg = arg;
1249         q->state = msg;
1250         q->state |= Qstarve;
1251         q->eof = 0;
1252
1253         return q;
1254 }
1255
1256 /* open a queue to be bypassed */
1257 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1258 {
1259         struct queue *q;
1260
1261         q = kzmalloc(sizeof(struct queue), 0);
1262         if (q == 0)
1263                 return 0;
1264         qinit_common(q);
1265
1266         q->limit = 0;
1267         q->arg = arg;
1268         q->bypass = bypass;
1269         q->state = 0;
1270
1271         return q;
1272 }
1273
1274 static int notempty(void *a)
1275 {
1276         struct queue *q = a;
1277
1278         return (q->state & Qclosed) || q->bfirst != 0;
1279 }
1280
1281 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1282  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1283  * was naturally closed.  Throws an error o/w. */
1284 static bool qwait_and_ilock(struct queue *q)
1285 {
1286         while (1) {
1287                 spin_lock_irqsave(&q->lock);
1288                 if (q->bfirst != NULL)
1289                         return TRUE;
1290                 if (q->state & Qclosed) {
1291                         if (++q->eof > 3) {
1292                                 spin_unlock_irqsave(&q->lock);
1293                                 error(EFAIL, "multiple reads on a closed queue");
1294                         }
1295                         if (q->err[0]) {
1296                                 spin_unlock_irqsave(&q->lock);
1297                                 error(EFAIL, q->err);
1298                         }
1299                         return FALSE;
1300                 }
1301                 /* We set Qstarve regardless of whether we are non-blocking or not.
1302                  * Qstarve tracks the edge detection of the queue being empty. */
1303                 q->state |= Qstarve;
1304                 if (q->state & Qnonblock) {
1305                         spin_unlock_irqsave(&q->lock);
1306                         error(EAGAIN, "queue empty");
1307                 }
1308                 spin_unlock_irqsave(&q->lock);
1309                 /* may throw an error() */
1310                 rendez_sleep(&q->rr, notempty, q);
1311         }
1312 }
1313
1314 /*
1315  * add a block list to a queue
1316  */
1317 void qaddlist(struct queue *q, struct block *b)
1318 {
1319         /* TODO: q lock? */
1320         /* queue the block */
1321         if (q->bfirst)
1322                 q->blast->next = b;
1323         else
1324                 q->bfirst = b;
1325         q->len += blockalloclen(b);
1326         q->dlen += blocklen(b);
1327         while (b->next)
1328                 b = b->next;
1329         q->blast = b;
1330 }
1331
1332 /*
1333  *  called with q ilocked
1334  */
1335 struct block *qremove(struct queue *q)
1336 {
1337         struct block *b;
1338
1339         b = q->bfirst;
1340         if (b == NULL)
1341                 return NULL;
1342         q->bfirst = b->next;
1343         b->next = NULL;
1344         q->dlen -= BLEN(b);
1345         q->len -= BALLOC(b);
1346         QDEBUG checkb(b, "qremove");
1347         return b;
1348 }
1349
1350 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1351 {
1352         size_t copy_amt, retval = 0;
1353         struct extra_bdata *ebd;
1354         
1355         copy_amt = MIN(BHLEN(b), amt);
1356         memcpy(to, b->rp, copy_amt);
1357         /* advance the rp, since this block not be completely consumed and future
1358          * reads need to know where to pick up from */
1359         b->rp += copy_amt;
1360         to += copy_amt;
1361         amt -= copy_amt;
1362         retval += copy_amt;
1363         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1364                 ebd = &b->extra_data[i];
1365                 /* skip empty entires.  if we track this in the struct block, we can
1366                  * just start the for loop early */
1367                 if (!ebd->base || !ebd->len)
1368                         continue;
1369                 copy_amt = MIN(ebd->len, amt);
1370                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1371                 /* we're actually consuming the entries, just like how we advance rp up
1372                  * above, and might only consume part of one. */
1373                 ebd->len -= copy_amt;
1374                 ebd->off += copy_amt;
1375                 b->extra_len -= copy_amt;
1376                 if (!ebd->len) {
1377                         /* we don't actually have to decref here.  it's also done in
1378                          * freeb().  this is the earliest we can free. */
1379                         kfree((void*)ebd->base);
1380                         ebd->base = ebd->off = 0;
1381                 }
1382                 to += copy_amt;
1383                 amt -= copy_amt;
1384                 retval += copy_amt;
1385         }
1386         return retval;
1387 }
1388
1389 /*
1390  *  copy the contents of a string of blocks into
1391  *  memory.  emptied blocks are freed.  return
1392  *  pointer to first unconsumed block.
1393  */
1394 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1395 {
1396         int i;
1397         struct block *next;
1398
1399         /* could be slicker here, since read_from_block is smart */
1400         for (; b != NULL; b = next) {
1401                 i = BLEN(b);
1402                 if (i > n) {
1403                         /* partial block, consume some */
1404                         read_from_block(b, p, n);
1405                         return b;
1406                 }
1407                 /* full block, consume all and move on */
1408                 i = read_from_block(b, p, i);
1409                 n -= i;
1410                 p += i;
1411                 next = b->next;
1412                 freeb(b);
1413         }
1414         return NULL;
1415 }
1416
1417 /*
1418  *  copy the contents of memory into a string of blocks.
1419  *  return NULL on error.
1420  */
1421 struct block *mem2bl(uint8_t * p, int len)
1422 {
1423         ERRSTACK(1);
1424         int n;
1425         struct block *b, *first, **l;
1426
1427         first = NULL;
1428         l = &first;
1429         if (waserror()) {
1430                 freeblist(first);
1431                 nexterror();
1432         }
1433         do {
1434                 n = len;
1435                 if (n > Maxatomic)
1436                         n = Maxatomic;
1437
1438                 *l = b = block_alloc(n, MEM_WAIT);
1439                 /* TODO consider extra_data */
1440                 memmove(b->wp, p, n);
1441                 b->wp += n;
1442                 p += n;
1443                 len -= n;
1444                 l = &b->next;
1445         } while (len > 0);
1446         poperror();
1447
1448         return first;
1449 }
1450
1451 /*
1452  *  put a block back to the front of the queue
1453  *  called with q ilocked
1454  */
1455 void qputback(struct queue *q, struct block *b)
1456 {
1457         b->next = q->bfirst;
1458         if (q->bfirst == NULL)
1459                 q->blast = b;
1460         q->bfirst = b;
1461         q->len += BALLOC(b);
1462         q->dlen += BLEN(b);
1463 }
1464
1465 /*
1466  *  flow control, get producer going again
1467  *  called with q ilocked
1468  */
1469 static void qwakeup_iunlock(struct queue *q)
1470 {
1471         int dowakeup = 0;
1472
1473         /* if writer flow controlled, restart */
1474         if ((q->state & Qflow) && q->len < q->limit / 2) {
1475                 q->state &= ~Qflow;
1476                 dowakeup = 1;
1477         }
1478
1479         spin_unlock_irqsave(&q->lock);
1480
1481         /* wakeup flow controlled writers */
1482         if (dowakeup) {
1483                 if (q->kick)
1484                         q->kick(q->arg);
1485                 rendez_wakeup(&q->wr);
1486         }
1487         qwake_cb(q, FDTAP_FILT_WRITABLE);
1488 }
1489
1490 /*
1491  *  get next block from a queue (up to a limit)
1492  */
1493 struct block *qbread(struct queue *q, int len)
1494 {
1495         struct block *b, *nb;
1496         int n;
1497
1498         if (!qwait_and_ilock(q)) {
1499                 /* queue closed */
1500                 spin_unlock_irqsave(&q->lock);
1501                 return NULL;
1502         }
1503
1504         /* if we get here, there's at least one block in the queue */
1505         b = qremove(q);
1506         n = BLEN(b);
1507
1508         /* split block if it's too big and this is not a message queue */
1509         nb = b;
1510         if (n > len) {
1511                 PANIC_EXTRA(b);
1512                 if ((q->state & Qmsg) == 0) {
1513                         n -= len;
1514                         b = block_alloc(n, MEM_WAIT);
1515                         memmove(b->wp, nb->rp + len, n);
1516                         b->wp += n;
1517                         qputback(q, b);
1518                 }
1519                 nb->wp = nb->rp + len;
1520         }
1521
1522         /* restart producer */
1523         qwakeup_iunlock(q);
1524
1525         return nb;
1526 }
1527
1528 /*
1529  *  read a queue.  if no data is queued, post a struct block
1530  *  and wait on its Rendez.
1531  */
1532 long qread(struct queue *q, void *vp, int len)
1533 {
1534         struct block *b, *first, **l;
1535         int m, n;
1536
1537 again:
1538         if (!qwait_and_ilock(q)) {
1539                 /* queue closed */
1540                 spin_unlock_irqsave(&q->lock);
1541                 return 0;
1542         }
1543
1544         /* if we get here, there's at least one block in the queue */
1545         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1546         // strategy by default.
1547         if (q->state & Qcoalesce) {
1548                 /* when coalescing, 0 length blocks just go away */
1549                 b = q->bfirst;
1550                 if (BLEN(b) <= 0) {
1551                         freeb(qremove(q));
1552                         spin_unlock_irqsave(&q->lock);
1553                         goto again;
1554                 }
1555
1556                 /*  grab the first block plus as many
1557                  *  following blocks as will completely
1558                  *  fit in the read.
1559                  */
1560                 n = 0;
1561                 l = &first;
1562                 m = BLEN(b);
1563                 for (;;) {
1564                         *l = qremove(q);
1565                         l = &b->next;
1566                         n += m;
1567
1568                         b = q->bfirst;
1569                         if (b == NULL)
1570                                 break;
1571                         m = BLEN(b);
1572                         if (n + m > len)
1573                                 break;
1574                 }
1575         } else {
1576                 first = qremove(q);
1577                 n = BLEN(first);
1578         }
1579         b = bl2mem(vp, first, len);
1580         /* take care of any left over partial block */
1581         if (b != NULL) {
1582                 n -= BLEN(b);
1583                 if (q->state & Qmsg)
1584                         freeb(b);
1585                 else
1586                         qputback(q, b);
1587         }
1588
1589         /* restart producer */
1590         qwakeup_iunlock(q);
1591
1592         return n;
1593 }
1594
1595 static int qnotfull(void *a)
1596 {
1597         struct queue *q = a;
1598
1599         return q->len < q->limit || (q->state & Qclosed);
1600 }
1601
1602 uint32_t dropcnt;
1603
1604 /*
1605  *  add a block to a queue obeying flow control
1606  */
1607 long qbwrite(struct queue *q, struct block *b)
1608 {
1609         int n, dowakeup;
1610         bool was_empty;
1611
1612         n = BLEN(b);
1613
1614         if (q->bypass) {
1615                 (*q->bypass) (q->arg, b);
1616                 return n;
1617         }
1618
1619         dowakeup = 0;
1620
1621         spin_lock_irqsave(&q->lock);
1622         was_empty = q->len == 0;
1623
1624         /* give up if the queue is closed */
1625         if (q->state & Qclosed) {
1626                 spin_unlock_irqsave(&q->lock);
1627                 freeb(b);
1628                 if (q->err[0])
1629                         error(EFAIL, q->err);
1630                 else
1631                         error(EFAIL, "connection closed");
1632         }
1633
1634         /* if nonblocking, don't queue over the limit */
1635         if (q->len >= q->limit) {
1636                 /* drop overflow takes priority over regular non-blocking */
1637                 if (q->state & Qdropoverflow) {
1638                         spin_unlock_irqsave(&q->lock);
1639                         freeb(b);
1640                         dropcnt += n;
1641                         return n;
1642                 }
1643                 if (q->state & Qnonblock) {
1644                         spin_unlock_irqsave(&q->lock);
1645                         freeb(b);
1646                         error(EAGAIN, "queue full");
1647                 }
1648         }
1649
1650         /* queue the block */
1651         if (q->bfirst)
1652                 q->blast->next = b;
1653         else
1654                 q->bfirst = b;
1655         q->blast = b;
1656         b->next = 0;
1657         q->len += BALLOC(b);
1658         q->dlen += n;
1659         QDEBUG checkb(b, "qbwrite");
1660         b = NULL;
1661
1662         /* make sure other end gets awakened */
1663         if (q->state & Qstarve) {
1664                 q->state &= ~Qstarve;
1665                 dowakeup = 1;
1666         }
1667         spin_unlock_irqsave(&q->lock);
1668
1669         /*  get output going again */
1670         if (q->kick && (dowakeup || (q->state & Qkick)))
1671                 q->kick(q->arg);
1672
1673         /* wakeup anyone consuming at the other end */
1674         if (dowakeup)
1675                 rendez_wakeup(&q->rr);
1676         if (was_empty)
1677                 qwake_cb(q, FDTAP_FILT_READABLE);
1678
1679         /*
1680          *  flow control, wait for queue to get below the limit
1681          *  before allowing the process to continue and queue
1682          *  more.  We do this here so that postnote can only
1683          *  interrupt us after the data has been queued.  This
1684          *  means that things like 9p flushes and ssl messages
1685          *  will not be disrupted by software interrupts.
1686          *
1687          *  Note - this is moderately dangerous since a process
1688          *  that keeps getting interrupted and rewriting will
1689          *  queue infinite crud.
1690          */
1691         for (;;) {
1692                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1693                         break;
1694
1695                 spin_lock_irqsave(&q->lock);
1696                 q->state |= Qflow;
1697                 spin_unlock_irqsave(&q->lock);
1698                 rendez_sleep(&q->wr, qnotfull, q);
1699         }
1700
1701         return n;
1702 }
1703
1704 long qibwrite(struct queue *q, struct block *b)
1705 {
1706         int n, dowakeup;
1707         bool was_empty;
1708
1709         dowakeup = 0;
1710
1711         n = BLEN(b);
1712
1713         spin_lock_irqsave(&q->lock);
1714         was_empty = q->len == 0;
1715
1716         QDEBUG checkb(b, "qibwrite");
1717         if (q->bfirst)
1718                 q->blast->next = b;
1719         else
1720                 q->bfirst = b;
1721         q->blast = b;
1722         q->len += BALLOC(b);
1723         q->dlen += n;
1724
1725         if (q->state & Qstarve) {
1726                 q->state &= ~Qstarve;
1727                 dowakeup = 1;
1728         }
1729
1730         spin_unlock_irqsave(&q->lock);
1731
1732         if (dowakeup) {
1733                 if (q->kick)
1734                         q->kick(q->arg);
1735                 rendez_wakeup(&q->rr);
1736         }
1737         if (was_empty)
1738                 qwake_cb(q, FDTAP_FILT_READABLE);
1739
1740         return n;
1741 }
1742
1743 /*
1744  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1745  */
1746 int qwrite(struct queue *q, void *vp, int len)
1747 {
1748         int n, sofar;
1749         struct block *b;
1750         uint8_t *p = vp;
1751         void *ext_buf;
1752
1753         QDEBUG if (!islo())
1754                  printd("qwrite hi %p\n", getcallerpc(&q));
1755
1756         sofar = 0;
1757         do {
1758                 n = len - sofar;
1759                 /* This is 64K, the max amount per single block.  Still a good value? */
1760                 if (n > Maxatomic)
1761                         n = Maxatomic;
1762
1763                 /* If n is small, we don't need to bother with the extra_data.  But
1764                  * until the whole stack can handle extd blocks, we'll use them
1765                  * unconditionally. */
1766 #ifdef CONFIG_BLOCK_EXTRAS
1767                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1768                  * only available via padblock (to the left).  we also need some space
1769                  * for pullupblock for some basic headers (like icmp) that get written
1770                  * in directly */
1771                 b = block_alloc(64, MEM_WAIT);
1772                 ext_buf = kmalloc(n, 0);
1773                 memcpy(ext_buf, p + sofar, n);
1774                 block_add_extd(b, 1, MEM_WAIT); /* returns 0 on success */
1775                 b->extra_data[0].base = (uintptr_t)ext_buf;
1776                 b->extra_data[0].off = 0;
1777                 b->extra_data[0].len = n;
1778                 b->extra_len += n;
1779 #else
1780                 b = block_alloc(n, MEM_WAIT);
1781                 memmove(b->wp, p + sofar, n);
1782                 b->wp += n;
1783 #endif
1784                         
1785                 qbwrite(q, b);
1786
1787                 sofar += n;
1788         } while (sofar < len && (q->state & Qmsg) == 0);
1789
1790         return len;
1791 }
1792
1793 /*
1794  *  used by print() to write to a queue.  Since we may be splhi or not in
1795  *  a process, don't qlock.
1796  */
1797 int qiwrite(struct queue *q, void *vp, int len)
1798 {
1799         int n, sofar;
1800         struct block *b;
1801         uint8_t *p = vp;
1802
1803         sofar = 0;
1804         do {
1805                 n = len - sofar;
1806                 if (n > Maxatomic)
1807                         n = Maxatomic;
1808
1809                 b = block_alloc(n, MEM_ATOMIC);
1810                 if (b == NULL)
1811                         break;
1812                 /* TODO consider extra_data */
1813                 memmove(b->wp, p + sofar, n);
1814                 /* this adjusts BLEN to be n, or at least it should */
1815                 b->wp += n;
1816                 assert(n == BLEN(b));
1817                 qibwrite(q, b);
1818
1819                 sofar += n;
1820         } while (sofar < len && (q->state & Qmsg) == 0);
1821
1822         return sofar;
1823 }
1824
1825 /*
1826  *  be extremely careful when calling this,
1827  *  as there is no reference accounting
1828  */
1829 void qfree(struct queue *q)
1830 {
1831         qclose(q);
1832         kfree(q);
1833 }
1834
1835 /*
1836  *  Mark a queue as closed.  No further IO is permitted.
1837  *  All blocks are released.
1838  */
1839 void qclose(struct queue *q)
1840 {
1841         struct block *bfirst;
1842
1843         if (q == NULL)
1844                 return;
1845
1846         /* mark it */
1847         spin_lock_irqsave(&q->lock);
1848         q->state |= Qclosed;
1849         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1850         q->err[0] = 0;
1851         bfirst = q->bfirst;
1852         q->bfirst = 0;
1853         q->len = 0;
1854         q->dlen = 0;
1855         spin_unlock_irqsave(&q->lock);
1856
1857         /* free queued blocks */
1858         freeblist(bfirst);
1859
1860         /* wake up readers/writers */
1861         rendez_wakeup(&q->rr);
1862         rendez_wakeup(&q->wr);
1863         qwake_cb(q, FDTAP_FILT_HANGUP);
1864 }
1865
1866 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1867  *
1868  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1869  * there is no message, which is what also happens during a natural qclose(),
1870  * those waiters will simply return 0.  qwriters will always error() on a
1871  * closed/hungup queue. */
1872 void qhangup(struct queue *q, char *msg)
1873 {
1874         /* mark it */
1875         spin_lock_irqsave(&q->lock);
1876         q->state |= Qclosed;
1877         if (msg == 0 || *msg == 0)
1878                 q->err[0] = 0;
1879         else
1880                 strlcpy(q->err, msg, ERRMAX);
1881         spin_unlock_irqsave(&q->lock);
1882
1883         /* wake up readers/writers */
1884         rendez_wakeup(&q->rr);
1885         rendez_wakeup(&q->wr);
1886         qwake_cb(q, FDTAP_FILT_HANGUP);
1887 }
1888
1889 /*
1890  *  return non-zero if the q is hungup
1891  */
1892 int qisclosed(struct queue *q)
1893 {
1894         return q->state & Qclosed;
1895 }
1896
1897 /*
1898  *  mark a queue as no longer hung up.  resets the wake_cb.
1899  */
1900 void qreopen(struct queue *q)
1901 {
1902         spin_lock_irqsave(&q->lock);
1903         q->state &= ~Qclosed;
1904         q->state |= Qstarve;
1905         q->eof = 0;
1906         q->limit = q->inilim;
1907         q->wake_cb = 0;
1908         q->wake_data = 0;
1909         spin_unlock_irqsave(&q->lock);
1910 }
1911
1912 /*
1913  *  return bytes queued
1914  */
1915 int qlen(struct queue *q)
1916 {
1917         return q->dlen;
1918 }
1919
1920 /*
1921  * return space remaining before flow control
1922  */
1923 int qwindow(struct queue *q)
1924 {
1925         int l;
1926
1927         l = q->limit - q->len;
1928         if (l < 0)
1929                 l = 0;
1930         return l;
1931 }
1932
1933 /*
1934  *  return true if we can read without blocking
1935  */
1936 int qcanread(struct queue *q)
1937 {
1938         return q->bfirst != 0;
1939 }
1940
1941 /*
1942  *  change queue limit
1943  */
1944 void qsetlimit(struct queue *q, int limit)
1945 {
1946         q->limit = limit;
1947 }
1948
1949 /*
1950  *  set whether writes drop overflowing blocks, or if we sleep
1951  */
1952 void qdropoverflow(struct queue *q, bool onoff)
1953 {
1954         if (onoff)
1955                 q->state |= Qdropoverflow;
1956         else
1957                 q->state &= ~Qdropoverflow;
1958 }
1959
1960 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1961 void qnonblock(struct queue *q, bool onoff)
1962 {
1963         if (onoff)
1964                 q->state |= Qnonblock;
1965         else
1966                 q->state &= ~Qnonblock;
1967 }
1968
1969 /*
1970  *  flush the output queue
1971  */
1972 void qflush(struct queue *q)
1973 {
1974         struct block *bfirst;
1975
1976         /* mark it */
1977         spin_lock_irqsave(&q->lock);
1978         bfirst = q->bfirst;
1979         q->bfirst = 0;
1980         q->len = 0;
1981         q->dlen = 0;
1982         spin_unlock_irqsave(&q->lock);
1983
1984         /* free queued blocks */
1985         freeblist(bfirst);
1986
1987         /* wake up writers */
1988         rendez_wakeup(&q->wr);
1989         qwake_cb(q, FDTAP_FILT_WRITABLE);
1990 }
1991
1992 int qfull(struct queue *q)
1993 {
1994         return q->len >= q->limit;
1995 }
1996
1997 int qstate(struct queue *q)
1998 {
1999         return q->state;
2000 }
2001
2002 void qdump(struct queue *q)
2003 {
2004         if (q)
2005                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
2006                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
2007 }
2008
2009 /* On certain wakeup events, qio will call func(q, data, filter), where filter
2010  * marks the type of wakeup event (flags from FDTAP).
2011  *
2012  * There's no sync protection.  If you change the CB while the qio is running,
2013  * you might get a CB with the data or func from a previous set_wake_cb.  You
2014  * should set this once per queue and forget it.
2015  *
2016  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
2017  * just make sure that the func(data) pair are valid until the queue is freed or
2018  * reopened. */
2019 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
2020 {
2021         q->wake_data = data;
2022         wmb();  /* if we see func, we'll also see the data for it */
2023         q->wake_cb = func;
2024 }