5e827d86f57282036e88efa26ed59795c3ae8081
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define PANIC_EXTRA(b)                                                  \
17 {                                                                       \
18         if ((b)->extra_len) {                                           \
19                 printblock(b);                                          \
20                 backtrace();                                            \
21                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
22         }                                                               \
23 }
24
25 static uint32_t padblockcnt;
26 static uint32_t concatblockcnt;
27 static uint32_t pullupblockcnt;
28 static uint32_t copyblockcnt;
29 static uint32_t consumecnt;
30 static uint32_t producecnt;
31 static uint32_t qcopycnt;
32
33 static int debugging;
34
35 #define QDEBUG  if(0)
36
37 /*
38  *  IO queues
39  */
40
41 struct queue {
42         spinlock_t lock;;
43
44         struct block *bfirst;           /* buffer */
45         struct block *blast;
46
47         int len;                                        /* bytes allocated to queue */
48         int dlen;                                       /* data bytes in queue */
49         int limit;                                      /* max bytes in queue */
50         int inilim;                             /* initial limit */
51         int state;
52         int eof;                                        /* number of eofs read by user */
53
54         void (*kick) (void *);          /* restart output */
55         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
56         void *arg;                                      /* argument to kick */
57
58         qlock_t rlock;                          /* mutex for reading processes */
59         struct rendez rr;                       /* process waiting to read */
60         qlock_t wlock;                          /* mutex for writing processes */
61         struct rendez wr;                       /* process waiting to write */
62         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
63         void *wake_data;
64
65         char err[ERRMAX];
66 };
67
68 enum {
69         Maxatomic = 64 * 1024,
70 };
71
72 unsigned int qiomaxatomic = Maxatomic;
73
74 /* Helper: fires a wake callback, sending 'filter' */
75 static void qwake_cb(struct queue *q, int filter)
76 {
77         if (q->wake_cb)
78                 q->wake_cb(q, q->wake_data, filter);
79 }
80
81 void ixsummary(void)
82 {
83         debugging ^= 1;
84         iallocsummary();
85         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
86                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
87         printd("consume %lu, produce %lu, qcopy %lu\n",
88                    consumecnt, producecnt, qcopycnt);
89 }
90
91 /*
92  *  free a list of blocks
93  */
94 void freeblist(struct block *b)
95 {
96         struct block *next;
97
98         for (; b != 0; b = next) {
99                 next = b->next;
100                 b->next = 0;
101                 freeb(b);
102         }
103 }
104
105 /*
106  *  pad a block to the front (or the back if size is negative)
107  */
108 struct block *padblock(struct block *bp, int size)
109 {
110         int n;
111         struct block *nbp;
112         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
113         uint16_t checksum_start = bp->checksum_start;
114         uint16_t checksum_offset = bp->checksum_offset;
115         uint16_t mss = bp->mss;
116
117         QDEBUG checkb(bp, "padblock 1");
118         if (size >= 0) {
119                 if (bp->rp - bp->base >= size) {
120                         bp->checksum_start += size;
121                         bp->rp -= size;
122                         return bp;
123                 }
124
125                 PANIC_EXTRA(bp);
126                 if (bp->next)
127                         panic("padblock %p", getcallerpc(&bp));
128                 n = BLEN(bp);
129                 padblockcnt++;
130                 nbp = allocb(size + n);
131                 nbp->rp += size;
132                 nbp->wp = nbp->rp;
133                 memmove(nbp->wp, bp->rp, n);
134                 nbp->wp += n;
135                 freeb(bp);
136                 nbp->rp -= size;
137         } else {
138                 size = -size;
139
140                 PANIC_EXTRA(bp);
141
142                 if (bp->next)
143                         panic("padblock %p", getcallerpc(&bp));
144
145                 if (bp->lim - bp->wp >= size)
146                         return bp;
147
148                 n = BLEN(bp);
149                 padblockcnt++;
150                 nbp = allocb(size + n);
151                 memmove(nbp->wp, bp->rp, n);
152                 nbp->wp += n;
153                 freeb(bp);
154         }
155         if (bcksum) {
156                 nbp->flag |= bcksum;
157                 nbp->checksum_start = checksum_start;
158                 nbp->checksum_offset = checksum_offset;
159                 nbp->mss = mss;
160         }
161         QDEBUG checkb(nbp, "padblock 1");
162         return nbp;
163 }
164
165 /*
166  *  return count of bytes in a string of blocks
167  */
168 int blocklen(struct block *bp)
169 {
170         int len;
171
172         len = 0;
173         while (bp) {
174                 len += BLEN(bp);
175                 bp = bp->next;
176         }
177         return len;
178 }
179
180 /*
181  * return count of space in blocks
182  */
183 int blockalloclen(struct block *bp)
184 {
185         int len;
186
187         len = 0;
188         while (bp) {
189                 len += BALLOC(bp);
190                 bp = bp->next;
191         }
192         return len;
193 }
194
195 /*
196  *  copy the  string of blocks into
197  *  a single block and free the string
198  */
199 struct block *concatblock(struct block *bp)
200 {
201         int len;
202         struct block *nb, *f;
203
204         if (bp->next == 0)
205                 return bp;
206
207         /* probably use parts of qclone */
208         PANIC_EXTRA(bp);
209         nb = allocb(blocklen(bp));
210         for (f = bp; f; f = f->next) {
211                 len = BLEN(f);
212                 memmove(nb->wp, f->rp, len);
213                 nb->wp += len;
214         }
215         concatblockcnt += BLEN(nb);
216         freeblist(bp);
217         QDEBUG checkb(nb, "concatblock 1");
218         return nb;
219 }
220
221 /* Returns a block with the remaining contents of b all in the main body of the
222  * returned block.  Replace old references to b with the returned value (which
223  * may still be 'b', if no change was needed. */
224 struct block *linearizeblock(struct block *b)
225 {
226         struct block *newb;
227         size_t len;
228         struct extra_bdata *ebd;
229
230         if (!b->extra_len)
231                 return b;
232
233         newb = allocb(BLEN(b));
234         len = BHLEN(b);
235         memcpy(newb->wp, b->rp, len);
236         newb->wp += len;
237         len = b->extra_len;
238         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
239                 ebd = &b->extra_data[i];
240                 if (!ebd->base || !ebd->len)
241                         continue;
242                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
243                 newb->wp += ebd->len;
244                 len -= ebd->len;
245         }
246         /* TODO: any other flags that need copied over? */
247         if (b->flag & BCKSUM_FLAGS) {
248                 newb->flag |= (b->flag & BCKSUM_FLAGS);
249                 newb->checksum_start = b->checksum_start;
250                 newb->checksum_offset = b->checksum_offset;
251                 newb->mss = b->mss;
252         }
253         freeb(b);
254         return newb;
255 }
256
257 /*
258  *  make sure the first block has at least n bytes in its main body
259  */
260 struct block *pullupblock(struct block *bp, int n)
261 {
262         int i, len, seglen;
263         struct block *nbp;
264         struct extra_bdata *ebd;
265
266         /*
267          *  this should almost always be true, it's
268          *  just to avoid every caller checking.
269          */
270         if (BHLEN(bp) >= n)
271                 return bp;
272
273          /* a start at explicit main-body / header management */
274         if (bp->extra_len) {
275                 if (n > bp->lim - bp->rp) {
276                         /* would need to realloc a new block and copy everything over. */
277                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
278                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
279                 }
280                 len = n - BHLEN(bp);
281                 if (len > bp->extra_len)
282                         panic("pullup more than extra (%d, %d, %d)\n",
283                               n, BHLEN(bp), bp->extra_len);
284                 checkb(bp, "before pullup");
285                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
286                         ebd = &bp->extra_data[i];
287                         if (!ebd->base || !ebd->len)
288                                 continue;
289                         seglen = MIN(ebd->len, len);
290                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
291                         bp->wp += seglen;
292                         len -= seglen;
293                         ebd->len -= seglen;
294                         ebd->off += seglen;
295                         bp->extra_len -= seglen;
296                         if (ebd->len == 0) {
297                                 kfree((void *)ebd->base);
298                                 ebd->off = 0;
299                                 ebd->base = 0;
300                         }
301                 }
302                 /* maybe just call pullupblock recursively here */
303                 if (len)
304                         panic("pullup %d bytes overdrawn\n", len);
305                 checkb(bp, "after pullup");
306                 return bp;
307         }
308
309         /*
310          *  if not enough room in the first block,
311          *  add another to the front of the list.
312          */
313         if (bp->lim - bp->rp < n) {
314                 nbp = allocb(n);
315                 nbp->next = bp;
316                 bp = nbp;
317         }
318
319         /*
320          *  copy bytes from the trailing blocks into the first
321          */
322         n -= BLEN(bp);
323         while ((nbp = bp->next)) {
324                 i = BLEN(nbp);
325                 if (i > n) {
326                         memmove(bp->wp, nbp->rp, n);
327                         pullupblockcnt++;
328                         bp->wp += n;
329                         nbp->rp += n;
330                         QDEBUG checkb(bp, "pullupblock 1");
331                         return bp;
332                 } else {
333                         memmove(bp->wp, nbp->rp, i);
334                         pullupblockcnt++;
335                         bp->wp += i;
336                         bp->next = nbp->next;
337                         nbp->next = 0;
338                         freeb(nbp);
339                         n -= i;
340                         if (n == 0) {
341                                 QDEBUG checkb(bp, "pullupblock 2");
342                                 return bp;
343                         }
344                 }
345         }
346         freeb(bp);
347         return 0;
348 }
349
350 /*
351  *  make sure the first block has at least n bytes in its main body
352  */
353 struct block *pullupqueue(struct queue *q, int n)
354 {
355         struct block *b;
356
357         /* TODO: lock to protect the queue links? */
358         if ((BHLEN(q->bfirst) >= n))
359                 return q->bfirst;
360         q->bfirst = pullupblock(q->bfirst, n);
361         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
362         q->blast = b;
363         return q->bfirst;
364 }
365
366 /* throw away count bytes from the front of
367  * block's extradata.  Returns count of bytes
368  * thrown away
369  */
370
371 static int pullext(struct block *bp, int count)
372 {
373         struct extra_bdata *ed;
374         int i, rem, bytes = 0;
375
376         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
377                 ed = &bp->extra_data[i];
378                 rem = MIN(count, ed->len);
379                 bp->extra_len -= rem;
380                 count -= rem;
381                 bytes += rem;
382                 ed->off += rem;
383                 ed->len -= rem;
384                 if (ed->len == 0) {
385                         kfree((void *)ed->base);
386                         ed->base = 0;
387                         ed->off = 0;
388                 }
389         }
390         return bytes;
391 }
392
393 /* throw away count bytes from the end of a
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int dropext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->len -= rem;
410                 if (ed->len == 0) {
411                         kfree((void *)ed->base);
412                         ed->base = 0;
413                         ed->off = 0;
414                 }
415         }
416         return bytes;
417 }
418
419 /*
420  *  throw away up to count bytes from a
421  *  list of blocks.  Return count of bytes
422  *  thrown away.
423  */
424 static int _pullblock(struct block **bph, int count, int free)
425 {
426         struct block *bp;
427         int n, bytes;
428
429         bytes = 0;
430         if (bph == NULL)
431                 return 0;
432
433         while (*bph != NULL && count != 0) {
434                 bp = *bph;
435
436                 n = MIN(BHLEN(bp), count);
437                 bytes += n;
438                 count -= n;
439                 bp->rp += n;
440                 n = pullext(bp, count);
441                 bytes += n;
442                 count -= n;
443                 QDEBUG checkb(bp, "pullblock ");
444                 if (BLEN(bp) == 0 && (free || count)) {
445                         *bph = bp->next;
446                         bp->next = NULL;
447                         freeb(bp);
448                 }
449         }
450         return bytes;
451 }
452
453 int pullblock(struct block **bph, int count)
454 {
455         return _pullblock(bph, count, 1);
456 }
457
458 /*
459  *  trim to len bytes starting at offset
460  */
461 struct block *trimblock(struct block *bp, int offset, int len)
462 {
463         uint32_t l, trim;
464         int olen = len;
465
466         QDEBUG checkb(bp, "trimblock 1");
467         if (blocklen(bp) < offset + len) {
468                 freeblist(bp);
469                 return NULL;
470         }
471
472         l =_pullblock(&bp, offset, 0);
473         if (bp == NULL)
474                 return NULL;
475         if (l != offset) {
476                 freeblist(bp);
477                 return NULL;
478         }
479
480         while ((l = BLEN(bp)) < len) {
481                 len -= l;
482                 bp = bp->next;
483         }
484
485         trim = BLEN(bp) - len;
486         trim -= dropext(bp, trim);
487         bp->wp -= trim;
488
489         if (bp->next) {
490                 freeblist(bp->next);
491                 bp->next = NULL;
492         }
493         return bp;
494 }
495
496 /*
497  *  copy 'count' bytes into a new block
498  */
499 struct block *copyblock(struct block *bp, int count)
500 {
501         int l;
502         struct block *nbp;
503
504         QDEBUG checkb(bp, "copyblock 0");
505         nbp = allocb(count);
506         if (bp->flag & BCKSUM_FLAGS) {
507                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
508                 nbp->checksum_start = bp->checksum_start;
509                 nbp->checksum_offset = bp->checksum_offset;
510                 nbp->mss = bp->mss;
511         }
512         PANIC_EXTRA(bp);
513         for (; count > 0 && bp != 0; bp = bp->next) {
514                 l = BLEN(bp);
515                 if (l > count)
516                         l = count;
517                 memmove(nbp->wp, bp->rp, l);
518                 nbp->wp += l;
519                 count -= l;
520         }
521         if (count > 0) {
522                 memset(nbp->wp, 0, count);
523                 nbp->wp += count;
524         }
525         copyblockcnt++;
526         QDEBUG checkb(nbp, "copyblock 1");
527
528         return nbp;
529 }
530
531 /* Adjust block @bp so that its size is exactly @len.
532  * If the size is increased, fill in the new contents with zeros.
533  * If the size is decreased, discard some of the old contents at the tail. */
534 struct block *adjustblock(struct block *bp, int len)
535 {
536         struct extra_bdata *ebd;
537         int i;
538
539         if (len < 0) {
540                 freeb(bp);
541                 return NULL;
542         }
543
544         if (len == BLEN(bp))
545                 return bp;
546
547         /* Shrink within block main body. */
548         if (len <= BHLEN(bp)) {
549                 free_block_extra(bp);
550                 bp->wp = bp->rp + len;
551                 QDEBUG checkb(bp, "adjustblock 1");
552                 return bp;
553         }
554
555         /* Need to grow. */
556         if (len > BLEN(bp)) {
557                 /* Grow within block main body. */
558                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
559                         memset(bp->wp, 0, len - BLEN(bp));
560                         bp->wp = bp->rp + len;
561                         QDEBUG checkb(bp, "adjustblock 2");
562                         return bp;
563                 }
564                 /* Grow with extra data buffers. */
565                 block_append_extra(bp, len - BLEN(bp), KMALLOC_WAIT);
566                 QDEBUG checkb(bp, "adjustblock 3");
567                 return bp;
568         }
569
570         /* Shrink extra data buffers.
571          * len is how much of ebd we need to keep.
572          * extra_len is re-accumulated. */
573         assert(bp->extra_len > 0);
574         len -= BHLEN(bp);
575         bp->extra_len = 0;
576         for (i = 0; i < bp->nr_extra_bufs; i++) {
577                 ebd = &bp->extra_data[i];
578                 if (len <= ebd->len)
579                         break;
580                 len -= ebd->len;
581                 bp->extra_len += ebd->len;
582         }
583         /* If len becomes zero, extra_data[i] should be freed. */
584         if (len > 0) {
585                 ebd = &bp->extra_data[i];
586                 ebd->len = len;
587                 bp->extra_len += ebd->len;
588                 i++;
589         }
590         for (; i < bp->nr_extra_bufs; i++) {
591                 ebd = &bp->extra_data[i];
592                 if (ebd->base)
593                         kfree((void*)ebd->base);
594                 ebd->base = ebd->off = ebd->len = 0;
595         }
596         QDEBUG checkb(bp, "adjustblock 4");
597         return bp;
598 }
599
600
601 /*
602  *  get next block from a queue, return null if nothing there
603  */
604 struct block *qget(struct queue *q)
605 {
606         int dowakeup;
607         struct block *b;
608
609         /* sync with qwrite */
610         spin_lock_irqsave(&q->lock);
611
612         b = q->bfirst;
613         if (b == NULL) {
614                 q->state |= Qstarve;
615                 spin_unlock_irqsave(&q->lock);
616                 return NULL;
617         }
618         q->bfirst = b->next;
619         b->next = 0;
620         q->len -= BALLOC(b);
621         q->dlen -= BLEN(b);
622         QDEBUG checkb(b, "qget");
623
624         /* if writer flow controlled, restart */
625         if ((q->state & Qflow) && q->len < q->limit / 2) {
626                 q->state &= ~Qflow;
627                 dowakeup = 1;
628         } else
629                 dowakeup = 0;
630
631         spin_unlock_irqsave(&q->lock);
632
633         if (dowakeup) {
634                 rendez_wakeup(&q->wr);
635                 /* We only send the writable event on wakeup, which is edge triggered */
636                 qwake_cb(q, FDTAP_FILT_WRITABLE);
637         }
638
639         return b;
640 }
641
642 /*
643  *  throw away the next 'len' bytes in the queue
644  * returning the number actually discarded
645  */
646 int qdiscard(struct queue *q, int len)
647 {
648         struct block *b;
649         int dowakeup, n, sofar, body_amt, extra_amt;
650         struct extra_bdata *ebd;
651
652         spin_lock_irqsave(&q->lock);
653         for (sofar = 0; sofar < len; sofar += n) {
654                 b = q->bfirst;
655                 if (b == NULL)
656                         break;
657                 QDEBUG checkb(b, "qdiscard");
658                 n = BLEN(b);
659                 if (n <= len - sofar) {
660                         q->bfirst = b->next;
661                         b->next = 0;
662                         q->len -= BALLOC(b);
663                         q->dlen -= BLEN(b);
664                         freeb(b);
665                 } else {
666                         n = len - sofar;
667                         q->dlen -= n;
668                         /* partial block removal */
669                         body_amt = MIN(BHLEN(b), n);
670                         b->rp += body_amt;
671                         extra_amt = n - body_amt;
672                         /* reduce q->len by the amount we remove from the extras.  The
673                          * header will always be accounted for above, during block removal.
674                          * */
675                         q->len -= extra_amt;
676                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
677                                 ebd = &b->extra_data[i];
678                                 if (!ebd->base || !ebd->len)
679                                         continue;
680                                 if (extra_amt >= ebd->len) {
681                                         /* remove the entire entry, note the kfree release */
682                                         b->extra_len -= ebd->len;
683                                         extra_amt -= ebd->len;
684                                         kfree((void*)ebd->base);
685                                         ebd->base = ebd->off = ebd->len = 0;
686                                         continue;
687                                 }
688                                 ebd->off += extra_amt;
689                                 ebd->len -= extra_amt;
690                                 b->extra_len -= extra_amt;
691                                 extra_amt = 0;
692                         }
693                 }
694         }
695
696         /*
697          *  if writer flow controlled, restart
698          *
699          *  This used to be
700          *  q->len < q->limit/2
701          *  but it slows down tcp too much for certain write sizes.
702          *  I really don't understand it completely.  It may be
703          *  due to the queue draining so fast that the transmission
704          *  stalls waiting for the app to produce more data.  - presotto
705          */
706         if ((q->state & Qflow) && q->len < q->limit) {
707                 q->state &= ~Qflow;
708                 dowakeup = 1;
709         } else
710                 dowakeup = 0;
711
712         spin_unlock_irqsave(&q->lock);
713
714         if (dowakeup) {
715                 rendez_wakeup(&q->wr);
716                 qwake_cb(q, FDTAP_FILT_WRITABLE);
717         }
718
719         return sofar;
720 }
721
722 /*
723  *  Interrupt level copy out of a queue, return # bytes copied.
724  */
725 int qconsume(struct queue *q, void *vp, int len)
726 {
727         struct block *b;
728         int n, dowakeup;
729         uint8_t *p = vp;
730         struct block *tofree = NULL;
731
732         /* sync with qwrite */
733         spin_lock_irqsave(&q->lock);
734
735         for (;;) {
736                 b = q->bfirst;
737                 if (b == 0) {
738                         q->state |= Qstarve;
739                         spin_unlock_irqsave(&q->lock);
740                         return -1;
741                 }
742                 QDEBUG checkb(b, "qconsume 1");
743
744                 n = BLEN(b);
745                 if (n > 0)
746                         break;
747                 q->bfirst = b->next;
748                 q->len -= BALLOC(b);
749
750                 /* remember to free this */
751                 b->next = tofree;
752                 tofree = b;
753         };
754
755         PANIC_EXTRA(b);
756         if (n < len)
757                 len = n;
758         memmove(p, b->rp, len);
759         consumecnt += n;
760         b->rp += len;
761         q->dlen -= len;
762
763         /* discard the block if we're done with it */
764         if ((q->state & Qmsg) || len == n) {
765                 q->bfirst = b->next;
766                 b->next = 0;
767                 q->len -= BALLOC(b);
768                 q->dlen -= BLEN(b);
769
770                 /* remember to free this */
771                 b->next = tofree;
772                 tofree = b;
773         }
774
775         /* if writer flow controlled, restart */
776         if ((q->state & Qflow) && q->len < q->limit / 2) {
777                 q->state &= ~Qflow;
778                 dowakeup = 1;
779         } else
780                 dowakeup = 0;
781
782         spin_unlock_irqsave(&q->lock);
783
784         if (dowakeup) {
785                 rendez_wakeup(&q->wr);
786                 qwake_cb(q, FDTAP_FILT_WRITABLE);
787         }
788
789         if (tofree != NULL)
790                 freeblist(tofree);
791
792         return len;
793 }
794
795 int qpass(struct queue *q, struct block *b)
796 {
797         int dlen, len, dowakeup;
798
799         /* sync with qread */
800         dowakeup = 0;
801         spin_lock_irqsave(&q->lock);
802         if (q->len >= q->limit) {
803                 freeblist(b);
804                 spin_unlock_irqsave(&q->lock);
805                 return -1;
806         }
807         if (q->state & Qclosed) {
808                 len = blocklen(b);
809                 freeblist(b);
810                 spin_unlock_irqsave(&q->lock);
811                 return len;
812         }
813
814         /* add buffer to queue */
815         if (q->bfirst)
816                 q->blast->next = b;
817         else
818                 q->bfirst = b;
819         len = BALLOC(b);
820         dlen = BLEN(b);
821         QDEBUG checkb(b, "qpass");
822         while (b->next) {
823                 b = b->next;
824                 QDEBUG checkb(b, "qpass");
825                 len += BALLOC(b);
826                 dlen += BLEN(b);
827         }
828         q->blast = b;
829         q->len += len;
830         q->dlen += dlen;
831
832         if (q->len >= q->limit / 2)
833                 q->state |= Qflow;
834
835         if (q->state & Qstarve) {
836                 q->state &= ~Qstarve;
837                 dowakeup = 1;
838         }
839         spin_unlock_irqsave(&q->lock);
840
841         if (dowakeup) {
842                 rendez_wakeup(&q->rr);
843                 qwake_cb(q, FDTAP_FILT_READABLE);
844         }
845
846         return len;
847 }
848
849 int qpassnolim(struct queue *q, struct block *b)
850 {
851         int dlen, len, dowakeup;
852
853         /* sync with qread */
854         dowakeup = 0;
855         spin_lock_irqsave(&q->lock);
856
857         if (q->state & Qclosed) {
858                 freeblist(b);
859                 spin_unlock_irqsave(&q->lock);
860                 return BALLOC(b);
861         }
862
863         /* add buffer to queue */
864         if (q->bfirst)
865                 q->blast->next = b;
866         else
867                 q->bfirst = b;
868         len = BALLOC(b);
869         dlen = BLEN(b);
870         QDEBUG checkb(b, "qpass");
871         while (b->next) {
872                 b = b->next;
873                 QDEBUG checkb(b, "qpass");
874                 len += BALLOC(b);
875                 dlen += BLEN(b);
876         }
877         q->blast = b;
878         q->len += len;
879         q->dlen += dlen;
880
881         if (q->len >= q->limit / 2)
882                 q->state |= Qflow;
883
884         if (q->state & Qstarve) {
885                 q->state &= ~Qstarve;
886                 dowakeup = 1;
887         }
888         spin_unlock_irqsave(&q->lock);
889
890         if (dowakeup) {
891                 rendez_wakeup(&q->rr);
892                 qwake_cb(q, FDTAP_FILT_READABLE);
893         }
894
895         return len;
896 }
897
898 /*
899  *  if the allocated space is way out of line with the used
900  *  space, reallocate to a smaller block
901  */
902 struct block *packblock(struct block *bp)
903 {
904         struct block **l, *nbp;
905         int n;
906
907         if (bp->extra_len)
908                 return bp;
909         for (l = &bp; *l; l = &(*l)->next) {
910                 nbp = *l;
911                 n = BLEN(nbp);
912                 if ((n << 2) < BALLOC(nbp)) {
913                         *l = allocb(n);
914                         memmove((*l)->wp, nbp->rp, n);
915                         (*l)->wp += n;
916                         (*l)->next = nbp->next;
917                         freeb(nbp);
918                 }
919         }
920
921         return bp;
922 }
923
924 int qproduce(struct queue *q, void *vp, int len)
925 {
926         struct block *b;
927         int dowakeup;
928         uint8_t *p = vp;
929
930         /* sync with qread */
931         dowakeup = 0;
932         spin_lock_irqsave(&q->lock);
933
934         /* no waiting receivers, room in buffer? */
935         if (q->len >= q->limit) {
936                 q->state |= Qflow;
937                 spin_unlock_irqsave(&q->lock);
938                 return -1;
939         }
940
941         /* save in buffer */
942         /* use Qcoalesce here to save storage */
943         // TODO: Consider removing the Qcoalesce flag and force a coalescing
944         // strategy by default.
945         b = q->blast;
946         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
947                 || b->lim - b->wp < len) {
948                 /* need a new block */
949                 b = iallocb(len);
950                 if (b == 0) {
951                         spin_unlock_irqsave(&q->lock);
952                         return 0;
953                 }
954                 if (q->bfirst)
955                         q->blast->next = b;
956                 else
957                         q->bfirst = b;
958                 q->blast = b;
959                 /* b->next = 0; done by iallocb() */
960                 q->len += BALLOC(b);
961         }
962         PANIC_EXTRA(b);
963         memmove(b->wp, p, len);
964         producecnt += len;
965         b->wp += len;
966         q->dlen += len;
967         QDEBUG checkb(b, "qproduce");
968
969         if (q->state & Qstarve) {
970                 q->state &= ~Qstarve;
971                 dowakeup = 1;
972         }
973
974         if (q->len >= q->limit)
975                 q->state |= Qflow;
976         spin_unlock_irqsave(&q->lock);
977
978         if (dowakeup) {
979                 rendez_wakeup(&q->rr);
980                 qwake_cb(q, FDTAP_FILT_READABLE);
981         }
982
983         return len;
984 }
985
986 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
987  * body_rp, for up to len.  Returns the len consumed. 
988  *
989  * The base is 'b', so that we can kfree it later.  This currently ties us to
990  * using kfree for the release method for all extra_data.
991  *
992  * It is possible to have a body size that is 0, if there is no offset, and
993  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
994 static size_t point_to_body(struct block *b, uint8_t *body_rp,
995                             struct block *newb, unsigned int newb_idx,
996                             size_t len)
997 {
998         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
999
1000         assert(newb_idx < newb->nr_extra_bufs);
1001
1002         kmalloc_incref(b);
1003         ebd->base = (uintptr_t)b;
1004         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
1005         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
1006         assert((int)ebd->len >= 0);
1007         newb->extra_len += ebd->len;
1008         return ebd->len;
1009 }
1010
1011 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
1012  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
1013  * consumed.
1014  *
1015  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
1016 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
1017                            struct block *newb, unsigned int newb_idx,
1018                            size_t len)
1019 {
1020         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
1021         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
1022
1023         assert(b_idx < b->nr_extra_bufs);
1024         assert(newb_idx < newb->nr_extra_bufs);
1025
1026         kmalloc_incref((void*)b_ebd->base);
1027         n_ebd->base = b_ebd->base;
1028         n_ebd->off = b_ebd->off + b_off;
1029         n_ebd->len = MIN(b_ebd->len - b_off, len);
1030         newb->extra_len += n_ebd->len;
1031         return n_ebd->len;
1032 }
1033
1034 /* given a string of blocks, fills the new block's extra_data  with the contents
1035  * of the blist [offset, len + offset)
1036  *
1037  * returns 0 on success.  the only failure is if the extra_data array was too
1038  * small, so this returns a positive integer saying how big the extra_data needs
1039  * to be.
1040  *
1041  * callers are responsible for protecting the list structure. */
1042 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1043                             uint32_t offset)
1044 {
1045         struct block *b, *first;
1046         unsigned int nr_bufs = 0;
1047         unsigned int b_idx, newb_idx = 0;
1048         uint8_t *first_main_body = 0;
1049
1050         /* find the first block; keep offset relative to the latest b in the list */
1051         for (b = blist; b; b = b->next) {
1052                 if (BLEN(b) > offset)
1053                         break;
1054                 offset -= BLEN(b);
1055         }
1056         /* qcopy semantics: if you asked for an offset outside the block list, you
1057          * get an empty block back */
1058         if (!b)
1059                 return 0;
1060         first = b;
1061         /* upper bound for how many buffers we'll need in newb */
1062         for (/* b is set*/; b; b = b->next) {
1063                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1064         }
1065         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1066         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1067                 /* caller will need to alloc these, then re-call us */
1068                 return nr_bufs;
1069         }
1070         for (b = first; b && len; b = b->next) {
1071                 b_idx = 0;
1072                 if (offset) {
1073                         if (offset < BHLEN(b)) {
1074                                 /* off is in the main body */
1075                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1076                                 newb_idx++;
1077                         } else {
1078                                 /* off is in one of the buffers (or just past the last one).
1079                                  * we're not going to point to b's main body at all. */
1080                                 offset -= BHLEN(b);
1081                                 assert(b->extra_data);
1082                                 /* assuming these extrabufs are packed, or at least that len
1083                                  * isn't gibberish */
1084                                 while (b->extra_data[b_idx].len <= offset) {
1085                                         offset -= b->extra_data[b_idx].len;
1086                                         b_idx++;
1087                                 }
1088                                 /* now offset is set to our offset in the b_idx'th buf */
1089                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1090                                 newb_idx++;
1091                                 b_idx++;
1092                         }
1093                         offset = 0;
1094                 } else {
1095                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1096                         newb_idx++;
1097                 }
1098                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1099                  * and any point_to_ could be our last if it consumed all of len. */
1100                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1101                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1102                         newb_idx++;
1103                 }
1104         }
1105         return 0;
1106 }
1107
1108 struct block *blist_clone(struct block *blist, int header_len, int len,
1109                           uint32_t offset)
1110 {
1111         int ret;
1112         struct block *newb = allocb(header_len);
1113         do {
1114                 ret = __blist_clone_to(blist, newb, len, offset);
1115                 if (ret)
1116                         block_add_extd(newb, ret, KMALLOC_WAIT);
1117         } while (ret);
1118         return newb;
1119 }
1120
1121 /* given a queue, makes a single block with header_len reserved space in the
1122  * block main body, and the contents of [offset, len + offset) pointed to in the
1123  * new blocks ext_data. */
1124 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1125 {
1126         int ret;
1127         struct block *newb = allocb(header_len);
1128         /* the while loop should rarely be used: it would require someone
1129          * concurrently adding to the queue. */
1130         do {
1131                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1132                 spin_lock_irqsave(&q->lock);
1133                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1134                 spin_unlock_irqsave(&q->lock);
1135                 if (ret)
1136                         block_add_extd(newb, ret, KMALLOC_WAIT);
1137         } while (ret);
1138         return newb;
1139 }
1140
1141 /*
1142  *  copy from offset in the queue
1143  */
1144 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1145 {
1146         int sofar;
1147         int n;
1148         struct block *b, *nb;
1149         uint8_t *p;
1150
1151         nb = allocb(len);
1152
1153         spin_lock_irqsave(&q->lock);
1154
1155         /* go to offset */
1156         b = q->bfirst;
1157         for (sofar = 0;; sofar += n) {
1158                 if (b == NULL) {
1159                         spin_unlock_irqsave(&q->lock);
1160                         return nb;
1161                 }
1162                 n = BLEN(b);
1163                 if (sofar + n > offset) {
1164                         p = b->rp + offset - sofar;
1165                         n -= offset - sofar;
1166                         break;
1167                 }
1168                 QDEBUG checkb(b, "qcopy");
1169                 b = b->next;
1170         }
1171
1172         /* copy bytes from there */
1173         for (sofar = 0; sofar < len;) {
1174                 if (n > len - sofar)
1175                         n = len - sofar;
1176                 PANIC_EXTRA(b);
1177                 memmove(nb->wp, p, n);
1178                 qcopycnt += n;
1179                 sofar += n;
1180                 nb->wp += n;
1181                 b = b->next;
1182                 if (b == NULL)
1183                         break;
1184                 n = BLEN(b);
1185                 p = b->rp;
1186         }
1187         spin_unlock_irqsave(&q->lock);
1188
1189         return nb;
1190 }
1191
1192 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1193 {
1194 #ifdef CONFIG_BLOCK_EXTRAS
1195         return qclone(q, 0, len, offset);
1196 #else
1197         return qcopy_old(q, len, offset);
1198 #endif
1199 }
1200
1201 static void qinit_common(struct queue *q)
1202 {
1203         spinlock_init_irqsave(&q->lock);
1204         qlock_init(&q->rlock);
1205         qlock_init(&q->wlock);
1206         rendez_init(&q->rr);
1207         rendez_init(&q->wr);
1208 }
1209
1210 /*
1211  *  called by non-interrupt code
1212  */
1213 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1214 {
1215         struct queue *q;
1216
1217         q = kzmalloc(sizeof(struct queue), 0);
1218         if (q == 0)
1219                 return 0;
1220         qinit_common(q);
1221
1222         q->limit = q->inilim = limit;
1223         q->kick = kick;
1224         q->arg = arg;
1225         q->state = msg;
1226         q->state |= Qstarve;
1227         q->eof = 0;
1228
1229         return q;
1230 }
1231
1232 /* open a queue to be bypassed */
1233 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1234 {
1235         struct queue *q;
1236
1237         q = kzmalloc(sizeof(struct queue), 0);
1238         if (q == 0)
1239                 return 0;
1240         qinit_common(q);
1241
1242         q->limit = 0;
1243         q->arg = arg;
1244         q->bypass = bypass;
1245         q->state = 0;
1246
1247         return q;
1248 }
1249
1250 static int notempty(void *a)
1251 {
1252         struct queue *q = a;
1253
1254         return (q->state & Qclosed) || q->bfirst != 0;
1255 }
1256
1257 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1258  * wait, FALSE on Qclose (without error)
1259  *
1260  * Called with q ilocked.  May error out, back through the caller, with
1261  * the irqsave lock unlocked.  */
1262 static bool qwait(struct queue *q)
1263 {
1264         /* wait for data */
1265         for (;;) {
1266                 if (q->bfirst != NULL)
1267                         break;
1268
1269                 if (q->state & Qclosed) {
1270                         if (++q->eof > 3) {
1271                                 spin_unlock_irqsave(&q->lock);
1272                                 error("multiple reads on a closed queue");
1273                         }
1274                         if (*q->err && strcmp(q->err, Ehungup) != 0) {
1275                                 spin_unlock_irqsave(&q->lock);
1276                                 error(q->err);
1277                         }
1278                         return FALSE;
1279                 }
1280                 /* We set Qstarve regardless of whether we are non-blocking or not.
1281                  * Qstarve tracks the edge detection of the queue being empty. */
1282                 q->state |= Qstarve;
1283                 if (q->state & Qnonblock) {
1284                         spin_unlock_irqsave(&q->lock);
1285                         set_errno(EAGAIN);
1286                         error("queue empty");
1287                 }
1288                 spin_unlock_irqsave(&q->lock);
1289                 /* may throw an error() */
1290                 rendez_sleep(&q->rr, notempty, q);
1291                 spin_lock_irqsave(&q->lock);
1292         }
1293         return TRUE;
1294 }
1295
1296 /*
1297  * add a block list to a queue
1298  */
1299 void qaddlist(struct queue *q, struct block *b)
1300 {
1301         /* TODO: q lock? */
1302         /* queue the block */
1303         if (q->bfirst)
1304                 q->blast->next = b;
1305         else
1306                 q->bfirst = b;
1307         q->len += blockalloclen(b);
1308         q->dlen += blocklen(b);
1309         while (b->next)
1310                 b = b->next;
1311         q->blast = b;
1312 }
1313
1314 /*
1315  *  called with q ilocked
1316  */
1317 struct block *qremove(struct queue *q)
1318 {
1319         struct block *b;
1320
1321         b = q->bfirst;
1322         if (b == NULL)
1323                 return NULL;
1324         q->bfirst = b->next;
1325         b->next = NULL;
1326         q->dlen -= BLEN(b);
1327         q->len -= BALLOC(b);
1328         QDEBUG checkb(b, "qremove");
1329         return b;
1330 }
1331
1332 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1333 {
1334         size_t copy_amt, retval = 0;
1335         struct extra_bdata *ebd;
1336         
1337         copy_amt = MIN(BHLEN(b), amt);
1338         memcpy(to, b->rp, copy_amt);
1339         /* advance the rp, since this block not be completely consumed and future
1340          * reads need to know where to pick up from */
1341         b->rp += copy_amt;
1342         to += copy_amt;
1343         amt -= copy_amt;
1344         retval += copy_amt;
1345         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1346                 ebd = &b->extra_data[i];
1347                 /* skip empty entires.  if we track this in the struct block, we can
1348                  * just start the for loop early */
1349                 if (!ebd->base || !ebd->len)
1350                         continue;
1351                 copy_amt = MIN(ebd->len, amt);
1352                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1353                 /* we're actually consuming the entries, just like how we advance rp up
1354                  * above, and might only consume part of one. */
1355                 ebd->len -= copy_amt;
1356                 ebd->off += copy_amt;
1357                 b->extra_len -= copy_amt;
1358                 if (!ebd->len) {
1359                         /* we don't actually have to decref here.  it's also done in
1360                          * freeb().  this is the earliest we can free. */
1361                         kfree((void*)ebd->base);
1362                         ebd->base = ebd->off = 0;
1363                 }
1364                 to += copy_amt;
1365                 amt -= copy_amt;
1366                 retval += copy_amt;
1367         }
1368         return retval;
1369 }
1370
1371 /*
1372  *  copy the contents of a string of blocks into
1373  *  memory.  emptied blocks are freed.  return
1374  *  pointer to first unconsumed block.
1375  */
1376 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1377 {
1378         int i;
1379         struct block *next;
1380
1381         /* could be slicker here, since read_from_block is smart */
1382         for (; b != NULL; b = next) {
1383                 i = BLEN(b);
1384                 if (i > n) {
1385                         /* partial block, consume some */
1386                         read_from_block(b, p, n);
1387                         return b;
1388                 }
1389                 /* full block, consume all and move on */
1390                 i = read_from_block(b, p, i);
1391                 n -= i;
1392                 p += i;
1393                 next = b->next;
1394                 freeb(b);
1395         }
1396         return NULL;
1397 }
1398
1399 /*
1400  *  copy the contents of memory into a string of blocks.
1401  *  return NULL on error.
1402  */
1403 struct block *mem2bl(uint8_t * p, int len)
1404 {
1405         ERRSTACK(1);
1406         int n;
1407         struct block *b, *first, **l;
1408
1409         first = NULL;
1410         l = &first;
1411         if (waserror()) {
1412                 freeblist(first);
1413                 nexterror();
1414         }
1415         do {
1416                 n = len;
1417                 if (n > Maxatomic)
1418                         n = Maxatomic;
1419
1420                 *l = b = allocb(n);
1421                 /* TODO consider extra_data */
1422                 memmove(b->wp, p, n);
1423                 b->wp += n;
1424                 p += n;
1425                 len -= n;
1426                 l = &b->next;
1427         } while (len > 0);
1428         poperror();
1429
1430         return first;
1431 }
1432
1433 /*
1434  *  put a block back to the front of the queue
1435  *  called with q ilocked
1436  */
1437 void qputback(struct queue *q, struct block *b)
1438 {
1439         b->next = q->bfirst;
1440         if (q->bfirst == NULL)
1441                 q->blast = b;
1442         q->bfirst = b;
1443         q->len += BALLOC(b);
1444         q->dlen += BLEN(b);
1445 }
1446
1447 /*
1448  *  flow control, get producer going again
1449  *  called with q ilocked
1450  */
1451 static void qwakeup_iunlock(struct queue *q)
1452 {
1453         int dowakeup = 0;
1454
1455         /* if writer flow controlled, restart */
1456         if ((q->state & Qflow) && q->len < q->limit / 2) {
1457                 q->state &= ~Qflow;
1458                 dowakeup = 1;
1459         }
1460
1461         spin_unlock_irqsave(&q->lock);
1462
1463         /* wakeup flow controlled writers */
1464         if (dowakeup) {
1465                 if (q->kick)
1466                         q->kick(q->arg);
1467                 rendez_wakeup(&q->wr);
1468                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1469         }
1470 }
1471
1472 /*
1473  *  get next block from a queue (up to a limit)
1474  */
1475 struct block *qbread(struct queue *q, int len)
1476 {
1477         ERRSTACK(1);
1478         struct block *b, *nb;
1479         int n;
1480
1481         qlock(&q->rlock);
1482         if (waserror()) {
1483                 qunlock(&q->rlock);
1484                 nexterror();
1485         }
1486
1487         spin_lock_irqsave(&q->lock);
1488         if (!qwait(q)) {
1489                 /* queue closed */
1490                 spin_unlock_irqsave(&q->lock);
1491                 qunlock(&q->rlock);
1492                 poperror();
1493                 return NULL;
1494         }
1495
1496         /* if we get here, there's at least one block in the queue */
1497         b = qremove(q);
1498         n = BLEN(b);
1499
1500         /* split block if it's too big and this is not a message queue */
1501         nb = b;
1502         if (n > len) {
1503                 PANIC_EXTRA(b);
1504                 if ((q->state & Qmsg) == 0) {
1505                         n -= len;
1506                         b = allocb(n);
1507                         memmove(b->wp, nb->rp + len, n);
1508                         b->wp += n;
1509                         qputback(q, b);
1510                 }
1511                 nb->wp = nb->rp + len;
1512         }
1513
1514         /* restart producer */
1515         qwakeup_iunlock(q);
1516
1517         poperror();
1518         qunlock(&q->rlock);
1519         return nb;
1520 }
1521
1522 /*
1523  *  read a queue.  if no data is queued, post a struct block
1524  *  and wait on its Rendez.
1525  */
1526 long qread(struct queue *q, void *vp, int len)
1527 {
1528         ERRSTACK(1);
1529         struct block *b, *first, **l;
1530         int m, n;
1531
1532         qlock(&q->rlock);
1533         if (waserror()) {
1534                 qunlock(&q->rlock);
1535                 nexterror();
1536         }
1537
1538         spin_lock_irqsave(&q->lock);
1539 again:
1540         if (!qwait(q)) {
1541                 /* queue closed */
1542                 spin_unlock_irqsave(&q->lock);
1543                 qunlock(&q->rlock);
1544                 poperror();
1545                 return 0;
1546         }
1547
1548         /* if we get here, there's at least one block in the queue */
1549         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1550         // strategy by default.
1551         if (q->state & Qcoalesce) {
1552                 /* when coalescing, 0 length blocks just go away */
1553                 b = q->bfirst;
1554                 if (BLEN(b) <= 0) {
1555                         freeb(qremove(q));
1556                         goto again;
1557                 }
1558
1559                 /*  grab the first block plus as many
1560                  *  following blocks as will completely
1561                  *  fit in the read.
1562                  */
1563                 n = 0;
1564                 l = &first;
1565                 m = BLEN(b);
1566                 for (;;) {
1567                         *l = qremove(q);
1568                         l = &b->next;
1569                         n += m;
1570
1571                         b = q->bfirst;
1572                         if (b == NULL)
1573                                 break;
1574                         m = BLEN(b);
1575                         if (n + m > len)
1576                                 break;
1577                 }
1578         } else {
1579                 first = qremove(q);
1580                 n = BLEN(first);
1581         }
1582
1583         /* copy to user space outside of the ilock */
1584         spin_unlock_irqsave(&q->lock);
1585         b = bl2mem(vp, first, len);
1586         spin_lock_irqsave(&q->lock);
1587
1588         /* take care of any left over partial block */
1589         if (b != NULL) {
1590                 n -= BLEN(b);
1591                 if (q->state & Qmsg)
1592                         freeb(b);
1593                 else
1594                         qputback(q, b);
1595         }
1596
1597         /* restart producer */
1598         qwakeup_iunlock(q);
1599
1600         poperror();
1601         qunlock(&q->rlock);
1602         return n;
1603 }
1604
1605 static int qnotfull(void *a)
1606 {
1607         struct queue *q = a;
1608
1609         return q->len < q->limit || (q->state & Qclosed);
1610 }
1611
1612 uint32_t dropcnt;
1613
1614 /*
1615  *  add a block to a queue obeying flow control
1616  */
1617 long qbwrite(struct queue *q, struct block *b)
1618 {
1619         ERRSTACK(1);
1620         int n, dowakeup;
1621         volatile bool should_free_b = TRUE;
1622
1623         n = BLEN(b);
1624
1625         if (q->bypass) {
1626                 (*q->bypass) (q->arg, b);
1627                 return n;
1628         }
1629
1630         dowakeup = 0;
1631         qlock(&q->wlock);
1632         if (waserror()) {
1633                 if (b != NULL && should_free_b)
1634                         freeb(b);
1635                 qunlock(&q->wlock);
1636                 nexterror();
1637         }
1638
1639         spin_lock_irqsave(&q->lock);
1640
1641         /* give up if the queue is closed */
1642         if (q->state & Qclosed) {
1643                 spin_unlock_irqsave(&q->lock);
1644                 error(q->err);
1645         }
1646
1647         /* if nonblocking, don't queue over the limit */
1648         if (q->len >= q->limit) {
1649                 /* drop overflow takes priority over regular non-blocking */
1650                 if (q->state & Qdropoverflow) {
1651                         spin_unlock_irqsave(&q->lock);
1652                         freeb(b);
1653                         dropcnt += n;
1654                         qunlock(&q->wlock);
1655                         poperror();
1656                         return n;
1657                 }
1658                 if (q->state & Qnonblock) {
1659                         spin_unlock_irqsave(&q->lock);
1660                         freeb(b);
1661                         set_errno(EAGAIN);
1662                         error("queue full");
1663                 }
1664         }
1665
1666         /* queue the block */
1667         should_free_b = FALSE;
1668         if (q->bfirst)
1669                 q->blast->next = b;
1670         else
1671                 q->bfirst = b;
1672         q->blast = b;
1673         b->next = 0;
1674         q->len += BALLOC(b);
1675         q->dlen += n;
1676         QDEBUG checkb(b, "qbwrite");
1677         b = NULL;
1678
1679         /* make sure other end gets awakened */
1680         if (q->state & Qstarve) {
1681                 q->state &= ~Qstarve;
1682                 dowakeup = 1;
1683         }
1684         spin_unlock_irqsave(&q->lock);
1685
1686         /*  get output going again */
1687         if (q->kick && (dowakeup || (q->state & Qkick)))
1688                 q->kick(q->arg);
1689
1690         /* wakeup anyone consuming at the other end */
1691         if (dowakeup) {
1692                 rendez_wakeup(&q->rr);
1693                 qwake_cb(q, FDTAP_FILT_READABLE);
1694         }
1695
1696         /*
1697          *  flow control, wait for queue to get below the limit
1698          *  before allowing the process to continue and queue
1699          *  more.  We do this here so that postnote can only
1700          *  interrupt us after the data has been queued.  This
1701          *  means that things like 9p flushes and ssl messages
1702          *  will not be disrupted by software interrupts.
1703          *
1704          *  Note - this is moderately dangerous since a process
1705          *  that keeps getting interrupted and rewriting will
1706          *  queue infinite crud.
1707          */
1708         for (;;) {
1709                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1710                         break;
1711
1712                 spin_lock_irqsave(&q->lock);
1713                 q->state |= Qflow;
1714                 spin_unlock_irqsave(&q->lock);
1715                 rendez_sleep(&q->wr, qnotfull, q);
1716         }
1717
1718         qunlock(&q->wlock);
1719         poperror();
1720         return n;
1721 }
1722
1723 long qibwrite(struct queue *q, struct block *b)
1724 {
1725         int n, dowakeup;
1726
1727         dowakeup = 0;
1728
1729         n = BLEN(b);
1730
1731         spin_lock_irqsave(&q->lock);
1732
1733         QDEBUG checkb(b, "qibwrite");
1734         if (q->bfirst)
1735                 q->blast->next = b;
1736         else
1737                 q->bfirst = b;
1738         q->blast = b;
1739         q->len += BALLOC(b);
1740         q->dlen += n;
1741
1742         if (q->state & Qstarve) {
1743                 q->state &= ~Qstarve;
1744                 dowakeup = 1;
1745         }
1746
1747         spin_unlock_irqsave(&q->lock);
1748
1749         if (dowakeup) {
1750                 if (q->kick)
1751                         q->kick(q->arg);
1752                 rendez_wakeup(&q->rr);
1753                 qwake_cb(q, FDTAP_FILT_READABLE);
1754         }
1755
1756         return n;
1757 }
1758
1759 /*
1760  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1761  */
1762 int qwrite(struct queue *q, void *vp, int len)
1763 {
1764         int n, sofar;
1765         struct block *b;
1766         uint8_t *p = vp;
1767         void *ext_buf;
1768
1769         QDEBUG if (!islo())
1770                  printd("qwrite hi %p\n", getcallerpc(&q));
1771
1772         sofar = 0;
1773         do {
1774                 n = len - sofar;
1775                 /* This is 64K, the max amount per single block.  Still a good value? */
1776                 if (n > Maxatomic)
1777                         n = Maxatomic;
1778
1779                 /* If n is small, we don't need to bother with the extra_data.  But
1780                  * until the whole stack can handle extd blocks, we'll use them
1781                  * unconditionally. */
1782 #ifdef CONFIG_BLOCK_EXTRAS
1783                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1784                  * only available via padblock (to the left).  we also need some space
1785                  * for pullupblock for some basic headers (like icmp) that get written
1786                  * in directly */
1787                 b = allocb(64);
1788                 ext_buf = kmalloc(n, 0);
1789                 memcpy(ext_buf, p + sofar, n);
1790                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1791                 b->extra_data[0].base = (uintptr_t)ext_buf;
1792                 b->extra_data[0].off = 0;
1793                 b->extra_data[0].len = n;
1794                 b->extra_len += n;
1795 #else
1796                 b = allocb(n);
1797                 memmove(b->wp, p + sofar, n);
1798                 b->wp += n;
1799 #endif
1800                         
1801                 qbwrite(q, b);
1802
1803                 sofar += n;
1804         } while (sofar < len && (q->state & Qmsg) == 0);
1805
1806         return len;
1807 }
1808
1809 /*
1810  *  used by print() to write to a queue.  Since we may be splhi or not in
1811  *  a process, don't qlock.
1812  */
1813 int qiwrite(struct queue *q, void *vp, int len)
1814 {
1815         int n, sofar, dowakeup;
1816         struct block *b;
1817         uint8_t *p = vp;
1818
1819         dowakeup = 0;
1820
1821         sofar = 0;
1822         do {
1823                 n = len - sofar;
1824                 if (n > Maxatomic)
1825                         n = Maxatomic;
1826
1827                 b = iallocb(n);
1828                 if (b == NULL)
1829                         break;
1830                 /* TODO consider extra_data */
1831                 memmove(b->wp, p + sofar, n);
1832                 /* this adjusts BLEN to be n, or at least it should */
1833                 b->wp += n;
1834                 assert(n == BLEN(b));
1835                 qibwrite(q, b);
1836
1837                 sofar += n;
1838         } while (sofar < len && (q->state & Qmsg) == 0);
1839
1840         return sofar;
1841 }
1842
1843 /*
1844  *  be extremely careful when calling this,
1845  *  as there is no reference accounting
1846  */
1847 void qfree(struct queue *q)
1848 {
1849         qclose(q);
1850         kfree(q);
1851 }
1852
1853 /*
1854  *  Mark a queue as closed.  No further IO is permitted.
1855  *  All blocks are released.
1856  */
1857 void qclose(struct queue *q)
1858 {
1859         struct block *bfirst;
1860
1861         if (q == NULL)
1862                 return;
1863
1864         /* mark it */
1865         spin_lock_irqsave(&q->lock);
1866         q->state |= Qclosed;
1867         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1868         strncpy(q->err, Ehungup, sizeof(q->err));
1869         bfirst = q->bfirst;
1870         q->bfirst = 0;
1871         q->len = 0;
1872         q->dlen = 0;
1873         spin_unlock_irqsave(&q->lock);
1874
1875         /* free queued blocks */
1876         freeblist(bfirst);
1877
1878         /* wake up readers/writers */
1879         rendez_wakeup(&q->rr);
1880         rendez_wakeup(&q->wr);
1881         qwake_cb(q, FDTAP_FILT_HANGUP);
1882 }
1883
1884 /*
1885  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1886  *  blocks.
1887  */
1888 void qhangup(struct queue *q, char *msg)
1889 {
1890         /* mark it */
1891         spin_lock_irqsave(&q->lock);
1892         q->state |= Qclosed;
1893         if (msg == 0 || *msg == 0)
1894                 strncpy(q->err, Ehungup, sizeof(q->err));
1895         else
1896                 strncpy(q->err, msg, ERRMAX - 1);
1897         spin_unlock_irqsave(&q->lock);
1898
1899         /* wake up readers/writers */
1900         rendez_wakeup(&q->rr);
1901         rendez_wakeup(&q->wr);
1902         qwake_cb(q, FDTAP_FILT_HANGUP);
1903 }
1904
1905 /*
1906  *  return non-zero if the q is hungup
1907  */
1908 int qisclosed(struct queue *q)
1909 {
1910         return q->state & Qclosed;
1911 }
1912
1913 /*
1914  *  mark a queue as no longer hung up.  resets the wake_cb.
1915  */
1916 void qreopen(struct queue *q)
1917 {
1918         spin_lock_irqsave(&q->lock);
1919         q->state &= ~Qclosed;
1920         q->state |= Qstarve;
1921         q->eof = 0;
1922         q->limit = q->inilim;
1923         q->wake_cb = 0;
1924         q->wake_data = 0;
1925         spin_unlock_irqsave(&q->lock);
1926 }
1927
1928 /*
1929  *  return bytes queued
1930  */
1931 int qlen(struct queue *q)
1932 {
1933         return q->dlen;
1934 }
1935
1936 /*
1937  * return space remaining before flow control
1938  */
1939 int qwindow(struct queue *q)
1940 {
1941         int l;
1942
1943         l = q->limit - q->len;
1944         if (l < 0)
1945                 l = 0;
1946         return l;
1947 }
1948
1949 /*
1950  *  return true if we can read without blocking
1951  */
1952 int qcanread(struct queue *q)
1953 {
1954         return q->bfirst != 0;
1955 }
1956
1957 /*
1958  *  change queue limit
1959  */
1960 void qsetlimit(struct queue *q, int limit)
1961 {
1962         q->limit = limit;
1963 }
1964
1965 /*
1966  *  set whether writes drop overflowing blocks, or if we sleep
1967  */
1968 void qdropoverflow(struct queue *q, bool onoff)
1969 {
1970         if (onoff)
1971                 q->state |= Qdropoverflow;
1972         else
1973                 q->state &= ~Qdropoverflow;
1974 }
1975
1976 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1977 void qnonblock(struct queue *q, bool onoff)
1978 {
1979         if (onoff)
1980                 q->state |= Qnonblock;
1981         else
1982                 q->state &= ~Qnonblock;
1983 }
1984
1985 /*
1986  *  flush the output queue
1987  */
1988 void qflush(struct queue *q)
1989 {
1990         struct block *bfirst;
1991
1992         /* mark it */
1993         spin_lock_irqsave(&q->lock);
1994         bfirst = q->bfirst;
1995         q->bfirst = 0;
1996         q->len = 0;
1997         q->dlen = 0;
1998         spin_unlock_irqsave(&q->lock);
1999
2000         /* free queued blocks */
2001         freeblist(bfirst);
2002
2003         /* wake up writers */
2004         rendez_wakeup(&q->wr);
2005         qwake_cb(q, FDTAP_FILT_WRITABLE);
2006 }
2007
2008 int qfull(struct queue *q)
2009 {
2010         return q->state & Qflow;
2011 }
2012
2013 int qstate(struct queue *q)
2014 {
2015         return q->state;
2016 }
2017
2018 void qdump(struct queue *q)
2019 {
2020         if (q)
2021                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
2022                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
2023 }
2024
2025 /* On certain wakeup events, qio will call func(q, data, filter), where filter
2026  * marks the type of wakeup event (flags from FDTAP).
2027  *
2028  * There's no sync protection.  If you change the CB while the qio is running,
2029  * you might get a CB with the data or func from a previous set_wake_cb.  You
2030  * should set this once per queue and forget it.
2031  *
2032  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
2033  * just make sure that the func(data) pair are valid until the queue is freed or
2034  * reopened. */
2035 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
2036 {
2037         q->wake_data = data;
2038         wmb();  /* if we see func, we'll also see the data for it */
2039         q->wake_cb = func;
2040 }