qio: Add non-blocking queues
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define PANIC_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 panic("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int eof;                                        /* number of eofs read by user */
50
51         void (*kick) (void *);          /* restart output */
52         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
53         void *arg;                                      /* argument to kick */
54
55         qlock_t rlock;                          /* mutex for reading processes */
56         struct rendez rr;                       /* process waiting to read */
57         qlock_t wlock;                          /* mutex for writing processes */
58         struct rendez wr;                       /* process waiting to write */
59
60         char err[ERRMAX];
61 };
62
63 enum {
64         Maxatomic = 64 * 1024,
65 };
66
67 unsigned int qiomaxatomic = Maxatomic;
68
69 void ixsummary(void)
70 {
71         debugging ^= 1;
72         iallocsummary();
73         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
74                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
75         printd("consume %lu, produce %lu, qcopy %lu\n",
76                    consumecnt, producecnt, qcopycnt);
77 }
78
79 /*
80  *  free a list of blocks
81  */
82 void freeblist(struct block *b)
83 {
84         struct block *next;
85
86         for (; b != 0; b = next) {
87                 next = b->next;
88                 b->next = 0;
89                 freeb(b);
90         }
91 }
92
93 /*
94  *  pad a block to the front (or the back if size is negative)
95  */
96 struct block *padblock(struct block *bp, int size)
97 {
98         int n;
99         struct block *nbp;
100         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
101         uint16_t checksum_start = bp->checksum_start;
102         uint16_t checksum_offset = bp->checksum_offset;
103         uint16_t mss = bp->mss;
104
105         QDEBUG checkb(bp, "padblock 1");
106         if (size >= 0) {
107                 if (bp->rp - bp->base >= size) {
108                         bp->checksum_start += size;
109                         bp->rp -= size;
110                         return bp;
111                 }
112
113                 PANIC_EXTRA(bp);
114                 if (bp->next)
115                         panic("padblock %p", getcallerpc(&bp));
116                 n = BLEN(bp);
117                 padblockcnt++;
118                 nbp = allocb(size + n);
119                 nbp->rp += size;
120                 nbp->wp = nbp->rp;
121                 memmove(nbp->wp, bp->rp, n);
122                 nbp->wp += n;
123                 freeb(bp);
124                 nbp->rp -= size;
125         } else {
126                 size = -size;
127
128                 PANIC_EXTRA(bp);
129
130                 if (bp->next)
131                         panic("padblock %p", getcallerpc(&bp));
132
133                 if (bp->lim - bp->wp >= size)
134                         return bp;
135
136                 n = BLEN(bp);
137                 padblockcnt++;
138                 nbp = allocb(size + n);
139                 memmove(nbp->wp, bp->rp, n);
140                 nbp->wp += n;
141                 freeb(bp);
142         }
143         if (bcksum) {
144                 nbp->flag |= bcksum;
145                 nbp->checksum_start = checksum_start;
146                 nbp->checksum_offset = checksum_offset;
147                 nbp->mss = mss;
148         }
149         QDEBUG checkb(nbp, "padblock 1");
150         return nbp;
151 }
152
153 /*
154  *  return count of bytes in a string of blocks
155  */
156 int blocklen(struct block *bp)
157 {
158         int len;
159
160         len = 0;
161         while (bp) {
162                 len += BLEN(bp);
163                 bp = bp->next;
164         }
165         return len;
166 }
167
168 /*
169  * return count of space in blocks
170  */
171 int blockalloclen(struct block *bp)
172 {
173         int len;
174
175         len = 0;
176         while (bp) {
177                 len += BALLOC(bp);
178                 bp = bp->next;
179         }
180         return len;
181 }
182
183 /*
184  *  copy the  string of blocks into
185  *  a single block and free the string
186  */
187 struct block *concatblock(struct block *bp)
188 {
189         int len;
190         struct block *nb, *f;
191
192         if (bp->next == 0)
193                 return bp;
194
195         /* probably use parts of qclone */
196         PANIC_EXTRA(bp);
197         nb = allocb(blocklen(bp));
198         for (f = bp; f; f = f->next) {
199                 len = BLEN(f);
200                 memmove(nb->wp, f->rp, len);
201                 nb->wp += len;
202         }
203         concatblockcnt += BLEN(nb);
204         freeblist(bp);
205         QDEBUG checkb(nb, "concatblock 1");
206         return nb;
207 }
208
209 /* Returns a block with the remaining contents of b all in the main body of the
210  * returned block.  Replace old references to b with the returned value (which
211  * may still be 'b', if no change was needed. */
212 struct block *linearizeblock(struct block *b)
213 {
214         struct block *newb;
215         size_t len;
216         struct extra_bdata *ebd;
217
218         if (!b->extra_len)
219                 return b;
220
221         newb = allocb(BLEN(b));
222         len = BHLEN(b);
223         memcpy(newb->wp, b->rp, len);
224         newb->wp += len;
225         len = b->extra_len;
226         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
227                 ebd = &b->extra_data[i];
228                 if (!ebd->base || !ebd->len)
229                         continue;
230                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
231                 newb->wp += ebd->len;
232                 len -= ebd->len;
233         }
234         /* TODO: any other flags that need copied over? */
235         if (b->flag & BCKSUM_FLAGS) {
236                 newb->flag |= (b->flag & BCKSUM_FLAGS);
237                 newb->checksum_start = b->checksum_start;
238                 newb->checksum_offset = b->checksum_offset;
239                 newb->mss = b->mss;
240         }
241         freeb(b);
242         return newb;
243 }
244
245 /*
246  *  make sure the first block has at least n bytes in its main body
247  */
248 struct block *pullupblock(struct block *bp, int n)
249 {
250         int i, len, seglen;
251         struct block *nbp;
252         struct extra_bdata *ebd;
253
254         /*
255          *  this should almost always be true, it's
256          *  just to avoid every caller checking.
257          */
258         if (BHLEN(bp) >= n)
259                 return bp;
260
261          /* a start at explicit main-body / header management */
262         if (bp->extra_len) {
263                 if (n > bp->lim - bp->rp) {
264                         /* would need to realloc a new block and copy everything over. */
265                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
266                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
267                 }
268                 len = n - BHLEN(bp);
269                 if (len > bp->extra_len)
270                         panic("pullup more than extra (%d, %d, %d)\n",
271                               n, BHLEN(bp), bp->extra_len);
272                 checkb(bp, "before pullup");
273                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
274                         ebd = &bp->extra_data[i];
275                         if (!ebd->base || !ebd->len)
276                                 continue;
277                         seglen = MIN(ebd->len, len);
278                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
279                         bp->wp += seglen;
280                         len -= seglen;
281                         ebd->len -= seglen;
282                         ebd->off += seglen;
283                         bp->extra_len -= seglen;
284                         if (ebd->len == 0) {
285                                 kfree((void *)ebd->base);
286                                 ebd->off = 0;
287                                 ebd->base = 0;
288                         }
289                 }
290                 /* maybe just call pullupblock recursively here */
291                 if (len)
292                         panic("pullup %d bytes overdrawn\n", len);
293                 checkb(bp, "after pullup");
294                 return bp;
295         }
296
297         /*
298          *  if not enough room in the first block,
299          *  add another to the front of the list.
300          */
301         if (bp->lim - bp->rp < n) {
302                 nbp = allocb(n);
303                 nbp->next = bp;
304                 bp = nbp;
305         }
306
307         /*
308          *  copy bytes from the trailing blocks into the first
309          */
310         n -= BLEN(bp);
311         while ((nbp = bp->next)) {
312                 i = BLEN(nbp);
313                 if (i > n) {
314                         memmove(bp->wp, nbp->rp, n);
315                         pullupblockcnt++;
316                         bp->wp += n;
317                         nbp->rp += n;
318                         QDEBUG checkb(bp, "pullupblock 1");
319                         return bp;
320                 } else {
321                         memmove(bp->wp, nbp->rp, i);
322                         pullupblockcnt++;
323                         bp->wp += i;
324                         bp->next = nbp->next;
325                         nbp->next = 0;
326                         freeb(nbp);
327                         n -= i;
328                         if (n == 0) {
329                                 QDEBUG checkb(bp, "pullupblock 2");
330                                 return bp;
331                         }
332                 }
333         }
334         freeb(bp);
335         return 0;
336 }
337
338 /*
339  *  make sure the first block has at least n bytes in its main body
340  */
341 struct block *pullupqueue(struct queue *q, int n)
342 {
343         struct block *b;
344
345         /* TODO: lock to protect the queue links? */
346         if ((BHLEN(q->bfirst) >= n))
347                 return q->bfirst;
348         q->bfirst = pullupblock(q->bfirst, n);
349         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
350         q->blast = b;
351         return q->bfirst;
352 }
353
354 /* throw away count bytes from the front of
355  * block's extradata.  Returns count of bytes
356  * thrown away
357  */
358
359 static int pullext(struct block *bp, int count)
360 {
361         struct extra_bdata *ed;
362         int i, rem, bytes = 0;
363
364         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
365                 ed = &bp->extra_data[i];
366                 rem = MIN(count, ed->len);
367                 bp->extra_len -= rem;
368                 count -= rem;
369                 bytes += rem;
370                 ed->off += rem;
371                 ed->len -= rem;
372                 if (ed->len == 0) {
373                         kfree((void *)ed->base);
374                         ed->base = 0;
375                         ed->off = 0;
376                 }
377         }
378         return bytes;
379 }
380
381 /* throw away count bytes from the end of a
382  * block's extradata.  Returns count of bytes
383  * thrown away
384  */
385
386 static int dropext(struct block *bp, int count)
387 {
388         struct extra_bdata *ed;
389         int i, rem, bytes = 0;
390
391         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
392                 ed = &bp->extra_data[i];
393                 rem = MIN(count, ed->len);
394                 bp->extra_len -= rem;
395                 count -= rem;
396                 bytes += rem;
397                 ed->len -= rem;
398                 if (ed->len == 0) {
399                         kfree((void *)ed->base);
400                         ed->base = 0;
401                         ed->off = 0;
402                 }
403         }
404         return bytes;
405 }
406
407 /*
408  *  throw away up to count bytes from a
409  *  list of blocks.  Return count of bytes
410  *  thrown away.
411  */
412 static int _pullblock(struct block **bph, int count, int free)
413 {
414         struct block *bp;
415         int n, bytes;
416
417         bytes = 0;
418         if (bph == NULL)
419                 return 0;
420
421         while (*bph != NULL && count != 0) {
422                 bp = *bph;
423
424                 n = MIN(BHLEN(bp), count);
425                 bytes += n;
426                 count -= n;
427                 bp->rp += n;
428                 n = pullext(bp, count);
429                 bytes += n;
430                 count -= n;
431                 QDEBUG checkb(bp, "pullblock ");
432                 if (BLEN(bp) == 0 && (free || count)) {
433                         *bph = bp->next;
434                         bp->next = NULL;
435                         freeb(bp);
436                 }
437         }
438         return bytes;
439 }
440
441 int pullblock(struct block **bph, int count)
442 {
443         return _pullblock(bph, count, 1);
444 }
445
446 /*
447  *  trim to len bytes starting at offset
448  */
449 struct block *trimblock(struct block *bp, int offset, int len)
450 {
451         uint32_t l, trim;
452         int olen = len;
453
454         QDEBUG checkb(bp, "trimblock 1");
455         if (blocklen(bp) < offset + len) {
456                 freeblist(bp);
457                 return NULL;
458         }
459
460         l =_pullblock(&bp, offset, 0);
461         if (bp == NULL)
462                 return NULL;
463         if (l != offset) {
464                 freeblist(bp);
465                 return NULL;
466         }
467
468         while ((l = BLEN(bp)) < len) {
469                 len -= l;
470                 bp = bp->next;
471         }
472
473         trim = BLEN(bp) - len;
474         trim -= dropext(bp, trim);
475         bp->wp -= trim;
476
477         if (bp->next) {
478                 freeblist(bp->next);
479                 bp->next = NULL;
480         }
481         return bp;
482 }
483
484 /*
485  *  copy 'count' bytes into a new block
486  */
487 struct block *copyblock(struct block *bp, int count)
488 {
489         int l;
490         struct block *nbp;
491
492         QDEBUG checkb(bp, "copyblock 0");
493         nbp = allocb(count);
494         if (bp->flag & BCKSUM_FLAGS) {
495                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
496                 nbp->checksum_start = bp->checksum_start;
497                 nbp->checksum_offset = bp->checksum_offset;
498                 nbp->mss = bp->mss;
499         }
500         PANIC_EXTRA(bp);
501         for (; count > 0 && bp != 0; bp = bp->next) {
502                 l = BLEN(bp);
503                 if (l > count)
504                         l = count;
505                 memmove(nbp->wp, bp->rp, l);
506                 nbp->wp += l;
507                 count -= l;
508         }
509         if (count > 0) {
510                 memset(nbp->wp, 0, count);
511                 nbp->wp += count;
512         }
513         copyblockcnt++;
514         QDEBUG checkb(nbp, "copyblock 1");
515
516         return nbp;
517 }
518
519 struct block *adjustblock(struct block *bp, int len)
520 {
521         int n;
522         struct block *nbp;
523
524         if (len < 0) {
525                 freeb(bp);
526                 return NULL;
527         }
528
529         PANIC_EXTRA(bp);
530         if (bp->rp + len > bp->lim) {
531                 nbp = copyblock(bp, len);
532                 freeblist(bp);
533                 QDEBUG checkb(nbp, "adjustblock 1");
534
535                 return nbp;
536         }
537
538         n = BLEN(bp);
539         if (len > n)
540                 memset(bp->wp, 0, len - n);
541         bp->wp = bp->rp + len;
542         QDEBUG checkb(bp, "adjustblock 2");
543
544         return bp;
545 }
546
547
548 /*
549  *  get next block from a queue, return null if nothing there
550  */
551 struct block *qget(struct queue *q)
552 {
553         int dowakeup;
554         struct block *b;
555
556         /* sync with qwrite */
557         spin_lock_irqsave(&q->lock);
558
559         b = q->bfirst;
560         if (b == NULL) {
561                 q->state |= Qstarve;
562                 spin_unlock_irqsave(&q->lock);
563                 return NULL;
564         }
565         q->bfirst = b->next;
566         b->next = 0;
567         q->len -= BALLOC(b);
568         q->dlen -= BLEN(b);
569         QDEBUG checkb(b, "qget");
570
571         /* if writer flow controlled, restart */
572         if ((q->state & Qflow) && q->len < q->limit / 2) {
573                 q->state &= ~Qflow;
574                 dowakeup = 1;
575         } else
576                 dowakeup = 0;
577
578         spin_unlock_irqsave(&q->lock);
579
580         if (dowakeup)
581                 rendez_wakeup(&q->wr);
582
583         return b;
584 }
585
586 /*
587  *  throw away the next 'len' bytes in the queue
588  * returning the number actually discarded
589  */
590 int qdiscard(struct queue *q, int len)
591 {
592         struct block *b;
593         int dowakeup, n, sofar, body_amt, extra_amt;
594         struct extra_bdata *ebd;
595
596         spin_lock_irqsave(&q->lock);
597         for (sofar = 0; sofar < len; sofar += n) {
598                 b = q->bfirst;
599                 if (b == NULL)
600                         break;
601                 QDEBUG checkb(b, "qdiscard");
602                 n = BLEN(b);
603                 if (n <= len - sofar) {
604                         q->bfirst = b->next;
605                         b->next = 0;
606                         q->len -= BALLOC(b);
607                         q->dlen -= BLEN(b);
608                         freeb(b);
609                 } else {
610                         n = len - sofar;
611                         q->dlen -= n;
612                         /* partial block removal */
613                         body_amt = MIN(BHLEN(b), n);
614                         b->rp += body_amt;
615                         extra_amt = n - body_amt;
616                         /* reduce q->len by the amount we remove from the extras.  The
617                          * header will always be accounted for above, during block removal.
618                          * */
619                         q->len -= extra_amt;
620                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
621                                 ebd = &b->extra_data[i];
622                                 if (!ebd->base || !ebd->len)
623                                         continue;
624                                 if (extra_amt >= ebd->len) {
625                                         /* remove the entire entry, note the kfree release */
626                                         b->extra_len -= ebd->len;
627                                         extra_amt -= ebd->len;
628                                         kfree((void*)ebd->base);
629                                         ebd->base = ebd->off = ebd->len = 0;
630                                         continue;
631                                 }
632                                 ebd->off += extra_amt;
633                                 ebd->len -= extra_amt;
634                                 b->extra_len -= extra_amt;
635                                 extra_amt = 0;
636                         }
637                 }
638         }
639
640         /*
641          *  if writer flow controlled, restart
642          *
643          *  This used to be
644          *  q->len < q->limit/2
645          *  but it slows down tcp too much for certain write sizes.
646          *  I really don't understand it completely.  It may be
647          *  due to the queue draining so fast that the transmission
648          *  stalls waiting for the app to produce more data.  - presotto
649          */
650         if ((q->state & Qflow) && q->len < q->limit) {
651                 q->state &= ~Qflow;
652                 dowakeup = 1;
653         } else
654                 dowakeup = 0;
655
656         spin_unlock_irqsave(&q->lock);
657
658         if (dowakeup)
659                 rendez_wakeup(&q->wr);
660
661         return sofar;
662 }
663
664 /*
665  *  Interrupt level copy out of a queue, return # bytes copied.
666  */
667 int qconsume(struct queue *q, void *vp, int len)
668 {
669         struct block *b;
670         int n, dowakeup;
671         uint8_t *p = vp;
672         struct block *tofree = NULL;
673
674         /* sync with qwrite */
675         spin_lock_irqsave(&q->lock);
676
677         for (;;) {
678                 b = q->bfirst;
679                 if (b == 0) {
680                         q->state |= Qstarve;
681                         spin_unlock_irqsave(&q->lock);
682                         return -1;
683                 }
684                 QDEBUG checkb(b, "qconsume 1");
685
686                 n = BLEN(b);
687                 if (n > 0)
688                         break;
689                 q->bfirst = b->next;
690                 q->len -= BALLOC(b);
691
692                 /* remember to free this */
693                 b->next = tofree;
694                 tofree = b;
695         };
696
697         PANIC_EXTRA(b);
698         if (n < len)
699                 len = n;
700         memmove(p, b->rp, len);
701         consumecnt += n;
702         b->rp += len;
703         q->dlen -= len;
704
705         /* discard the block if we're done with it */
706         if ((q->state & Qmsg) || len == n) {
707                 q->bfirst = b->next;
708                 b->next = 0;
709                 q->len -= BALLOC(b);
710                 q->dlen -= BLEN(b);
711
712                 /* remember to free this */
713                 b->next = tofree;
714                 tofree = b;
715         }
716
717         /* if writer flow controlled, restart */
718         if ((q->state & Qflow) && q->len < q->limit / 2) {
719                 q->state &= ~Qflow;
720                 dowakeup = 1;
721         } else
722                 dowakeup = 0;
723
724         spin_unlock_irqsave(&q->lock);
725
726         if (dowakeup)
727                 rendez_wakeup(&q->wr);
728
729         if (tofree != NULL)
730                 freeblist(tofree);
731
732         return len;
733 }
734
735 int qpass(struct queue *q, struct block *b)
736 {
737         int dlen, len, dowakeup;
738
739         /* sync with qread */
740         dowakeup = 0;
741         spin_lock_irqsave(&q->lock);
742         if (q->len >= q->limit) {
743                 freeblist(b);
744                 spin_unlock_irqsave(&q->lock);
745                 return -1;
746         }
747         if (q->state & Qclosed) {
748                 len = blocklen(b);
749                 freeblist(b);
750                 spin_unlock_irqsave(&q->lock);
751                 return len;
752         }
753
754         /* add buffer to queue */
755         if (q->bfirst)
756                 q->blast->next = b;
757         else
758                 q->bfirst = b;
759         len = BALLOC(b);
760         dlen = BLEN(b);
761         QDEBUG checkb(b, "qpass");
762         while (b->next) {
763                 b = b->next;
764                 QDEBUG checkb(b, "qpass");
765                 len += BALLOC(b);
766                 dlen += BLEN(b);
767         }
768         q->blast = b;
769         q->len += len;
770         q->dlen += dlen;
771
772         if (q->len >= q->limit / 2)
773                 q->state |= Qflow;
774
775         if (q->state & Qstarve) {
776                 q->state &= ~Qstarve;
777                 dowakeup = 1;
778         }
779         spin_unlock_irqsave(&q->lock);
780
781         if (dowakeup)
782                 rendez_wakeup(&q->rr);
783
784         return len;
785 }
786
787 int qpassnolim(struct queue *q, struct block *b)
788 {
789         int dlen, len, dowakeup;
790
791         /* sync with qread */
792         dowakeup = 0;
793         spin_lock_irqsave(&q->lock);
794
795         if (q->state & Qclosed) {
796                 freeblist(b);
797                 spin_unlock_irqsave(&q->lock);
798                 return BALLOC(b);
799         }
800
801         /* add buffer to queue */
802         if (q->bfirst)
803                 q->blast->next = b;
804         else
805                 q->bfirst = b;
806         len = BALLOC(b);
807         dlen = BLEN(b);
808         QDEBUG checkb(b, "qpass");
809         while (b->next) {
810                 b = b->next;
811                 QDEBUG checkb(b, "qpass");
812                 len += BALLOC(b);
813                 dlen += BLEN(b);
814         }
815         q->blast = b;
816         q->len += len;
817         q->dlen += dlen;
818
819         if (q->len >= q->limit / 2)
820                 q->state |= Qflow;
821
822         if (q->state & Qstarve) {
823                 q->state &= ~Qstarve;
824                 dowakeup = 1;
825         }
826         spin_unlock_irqsave(&q->lock);
827
828         if (dowakeup)
829                 rendez_wakeup(&q->rr);
830
831         return len;
832 }
833
834 /*
835  *  if the allocated space is way out of line with the used
836  *  space, reallocate to a smaller block
837  */
838 struct block *packblock(struct block *bp)
839 {
840         struct block **l, *nbp;
841         int n;
842
843         if (bp->extra_len)
844                 return bp;
845         for (l = &bp; *l; l = &(*l)->next) {
846                 nbp = *l;
847                 n = BLEN(nbp);
848                 if ((n << 2) < BALLOC(nbp)) {
849                         *l = allocb(n);
850                         memmove((*l)->wp, nbp->rp, n);
851                         (*l)->wp += n;
852                         (*l)->next = nbp->next;
853                         freeb(nbp);
854                 }
855         }
856
857         return bp;
858 }
859
860 int qproduce(struct queue *q, void *vp, int len)
861 {
862         struct block *b;
863         int dowakeup;
864         uint8_t *p = vp;
865
866         /* sync with qread */
867         dowakeup = 0;
868         spin_lock_irqsave(&q->lock);
869
870         /* no waiting receivers, room in buffer? */
871         if (q->len >= q->limit) {
872                 q->state |= Qflow;
873                 spin_unlock_irqsave(&q->lock);
874                 return -1;
875         }
876
877         /* save in buffer */
878         /* use Qcoalesce here to save storage */
879         // TODO: Consider removing the Qcoalesce flag and force a coalescing
880         // strategy by default.
881         b = q->blast;
882         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
883                 || b->lim - b->wp < len) {
884                 /* need a new block */
885                 b = iallocb(len);
886                 if (b == 0) {
887                         spin_unlock_irqsave(&q->lock);
888                         return 0;
889                 }
890                 if (q->bfirst)
891                         q->blast->next = b;
892                 else
893                         q->bfirst = b;
894                 q->blast = b;
895                 /* b->next = 0; done by iallocb() */
896                 q->len += BALLOC(b);
897         }
898         PANIC_EXTRA(b);
899         memmove(b->wp, p, len);
900         producecnt += len;
901         b->wp += len;
902         q->dlen += len;
903         QDEBUG checkb(b, "qproduce");
904
905         if (q->state & Qstarve) {
906                 q->state &= ~Qstarve;
907                 dowakeup = 1;
908         }
909
910         if (q->len >= q->limit)
911                 q->state |= Qflow;
912         spin_unlock_irqsave(&q->lock);
913
914         if (dowakeup)
915                 rendez_wakeup(&q->rr);
916
917         return len;
918 }
919
920 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
921  * body_rp, for up to len.  Returns the len consumed. 
922  *
923  * The base is 'b', so that we can kfree it later.  This currently ties us to
924  * using kfree for the release method for all extra_data.
925  *
926  * It is possible to have a body size that is 0, if there is no offset, and
927  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
928 static size_t point_to_body(struct block *b, uint8_t *body_rp,
929                             struct block *newb, unsigned int newb_idx,
930                             size_t len)
931 {
932         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
933
934         assert(newb_idx < newb->nr_extra_bufs);
935
936         kmalloc_incref(b);
937         ebd->base = (uintptr_t)b;
938         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
939         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
940         assert((int)ebd->len >= 0);
941         newb->extra_len += ebd->len;
942         return ebd->len;
943 }
944
945 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
946  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
947  * consumed.
948  *
949  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
950 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
951                            struct block *newb, unsigned int newb_idx,
952                            size_t len)
953 {
954         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
955         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
956
957         assert(b_idx < b->nr_extra_bufs);
958         assert(newb_idx < newb->nr_extra_bufs);
959
960         kmalloc_incref((void*)b_ebd->base);
961         n_ebd->base = b_ebd->base;
962         n_ebd->off = b_ebd->off + b_off;
963         n_ebd->len = MIN(b_ebd->len - b_off, len);
964         newb->extra_len += n_ebd->len;
965         return n_ebd->len;
966 }
967
968 /* given a string of blocks, fills the new block's extra_data  with the contents
969  * of the blist [offset, len + offset)
970  *
971  * returns 0 on success.  the only failure is if the extra_data array was too
972  * small, so this returns a positive integer saying how big the extra_data needs
973  * to be.
974  *
975  * callers are responsible for protecting the list structure. */
976 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
977                             uint32_t offset)
978 {
979         struct block *b, *first;
980         unsigned int nr_bufs = 0;
981         unsigned int b_idx, newb_idx = 0;
982         uint8_t *first_main_body = 0;
983
984         /* find the first block; keep offset relative to the latest b in the list */
985         for (b = blist; b; b = b->next) {
986                 if (BLEN(b) > offset)
987                         break;
988                 offset -= BLEN(b);
989         }
990         /* qcopy semantics: if you asked for an offset outside the block list, you
991          * get an empty block back */
992         if (!b)
993                 return 0;
994         first = b;
995         /* upper bound for how many buffers we'll need in newb */
996         for (/* b is set*/; b; b = b->next) {
997                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
998         }
999         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1000         block_add_extd(newb, nr_bufs, 0);
1001         if (newb->nr_extra_bufs < nr_bufs) {
1002                 /* caller will need to alloc these, then re-call us */
1003                 return nr_bufs;
1004         }
1005         for (b = first; b && len; b = b->next) {
1006                 b_idx = 0;
1007                 if (offset) {
1008                         if (offset < BHLEN(b)) {
1009                                 /* off is in the main body */
1010                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1011                                 newb_idx++;
1012                         } else {
1013                                 /* off is in one of the buffers (or just past the last one).
1014                                  * we're not going to point to b's main body at all. */
1015                                 offset -= BHLEN(b);
1016                                 assert(b->extra_data);
1017                                 /* assuming these extrabufs are packed, or at least that len
1018                                  * isn't gibberish */
1019                                 while (b->extra_data[b_idx].len <= offset) {
1020                                         offset -= b->extra_data[b_idx].len;
1021                                         b_idx++;
1022                                 }
1023                                 /* now offset is set to our offset in the b_idx'th buf */
1024                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1025                                 newb_idx++;
1026                                 b_idx++;
1027                         }
1028                         offset = 0;
1029                 } else {
1030                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1031                         newb_idx++;
1032                 }
1033                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1034                  * and any point_to_ could be our last if it consumed all of len. */
1035                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1036                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1037                         newb_idx++;
1038                 }
1039         }
1040         return 0;
1041 }
1042
1043 struct block *blist_clone(struct block *blist, int header_len, int len,
1044                           uint32_t offset)
1045 {
1046         int ret;
1047         struct block *newb = allocb(header_len);
1048         do {
1049                 ret = __blist_clone_to(blist, newb, len, offset);
1050                 if (ret)
1051                         block_add_extd(newb, ret, KMALLOC_WAIT);
1052         } while (ret);
1053         return newb;
1054 }
1055
1056 /* given a queue, makes a single block with header_len reserved space in the
1057  * block main body, and the contents of [offset, len + offset) pointed to in the
1058  * new blocks ext_data. */
1059 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1060 {
1061         int ret;
1062         struct block *newb = allocb(header_len);
1063         /* the while loop should rarely be used: it would require someone
1064          * concurrently adding to the queue. */
1065         do {
1066                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1067                 spin_lock_irqsave(&q->lock);
1068                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1069                 spin_unlock_irqsave(&q->lock);
1070                 if (ret)
1071                         block_add_extd(newb, ret, KMALLOC_WAIT);
1072         } while (ret);
1073         return newb;
1074 }
1075
1076 /*
1077  *  copy from offset in the queue
1078  */
1079 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1080 {
1081         int sofar;
1082         int n;
1083         struct block *b, *nb;
1084         uint8_t *p;
1085
1086         nb = allocb(len);
1087
1088         spin_lock_irqsave(&q->lock);
1089
1090         /* go to offset */
1091         b = q->bfirst;
1092         for (sofar = 0;; sofar += n) {
1093                 if (b == NULL) {
1094                         spin_unlock_irqsave(&q->lock);
1095                         return nb;
1096                 }
1097                 n = BLEN(b);
1098                 if (sofar + n > offset) {
1099                         p = b->rp + offset - sofar;
1100                         n -= offset - sofar;
1101                         break;
1102                 }
1103                 QDEBUG checkb(b, "qcopy");
1104                 b = b->next;
1105         }
1106
1107         /* copy bytes from there */
1108         for (sofar = 0; sofar < len;) {
1109                 if (n > len - sofar)
1110                         n = len - sofar;
1111                 PANIC_EXTRA(b);
1112                 memmove(nb->wp, p, n);
1113                 qcopycnt += n;
1114                 sofar += n;
1115                 nb->wp += n;
1116                 b = b->next;
1117                 if (b == NULL)
1118                         break;
1119                 n = BLEN(b);
1120                 p = b->rp;
1121         }
1122         spin_unlock_irqsave(&q->lock);
1123
1124         return nb;
1125 }
1126
1127 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1128 {
1129 #ifdef CONFIG_BLOCK_EXTRAS
1130         return qclone(q, 0, len, offset);
1131 #else
1132         return qcopy_old(q, len, offset);
1133 #endif
1134 }
1135
1136 static void qinit_common(struct queue *q)
1137 {
1138         spinlock_init_irqsave(&q->lock);
1139         qlock_init(&q->rlock);
1140         qlock_init(&q->wlock);
1141         rendez_init(&q->rr);
1142         rendez_init(&q->wr);
1143 }
1144
1145 /*
1146  *  called by non-interrupt code
1147  */
1148 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1149 {
1150         struct queue *q;
1151
1152         q = kzmalloc(sizeof(struct queue), 0);
1153         if (q == 0)
1154                 return 0;
1155         qinit_common(q);
1156
1157         q->limit = q->inilim = limit;
1158         q->kick = kick;
1159         q->arg = arg;
1160         q->state = msg;
1161         q->state |= Qstarve;
1162         q->eof = 0;
1163
1164         return q;
1165 }
1166
1167 /* open a queue to be bypassed */
1168 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1169 {
1170         struct queue *q;
1171
1172         q = kzmalloc(sizeof(struct queue), 0);
1173         if (q == 0)
1174                 return 0;
1175         qinit_common(q);
1176
1177         q->limit = 0;
1178         q->arg = arg;
1179         q->bypass = bypass;
1180         q->state = 0;
1181
1182         return q;
1183 }
1184
1185 static int notempty(void *a)
1186 {
1187         struct queue *q = a;
1188
1189         return (q->state & Qclosed) || q->bfirst != 0;
1190 }
1191
1192 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1193  * wait, FALSE on Qclose (without error)
1194  *
1195  * Called with q ilocked.  May error out, back through the caller, with
1196  * the irqsave lock unlocked.  */
1197 static bool qwait(struct queue *q)
1198 {
1199         /* wait for data */
1200         for (;;) {
1201                 if (q->bfirst != NULL)
1202                         break;
1203
1204                 if (q->state & Qclosed) {
1205                         if (++q->eof > 3) {
1206                                 spin_unlock_irqsave(&q->lock);
1207                                 error("multiple reads on a closed queue");
1208                         }
1209                         if (*q->err && strcmp(q->err, Ehungup) != 0) {
1210                                 spin_unlock_irqsave(&q->lock);
1211                                 error(q->err);
1212                         }
1213                         return FALSE;
1214                 }
1215                 if (q->state & Qnonblock) {
1216                         spin_unlock_irqsave(&q->lock);
1217                         set_errno(EAGAIN);
1218                         error("queue empty");
1219                 }
1220                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1221                 spin_unlock_irqsave(&q->lock);
1222                 /* may throw an error() */
1223                 rendez_sleep(&q->rr, notempty, q);
1224                 spin_lock_irqsave(&q->lock);
1225         }
1226         return TRUE;
1227 }
1228
1229 /*
1230  * add a block list to a queue
1231  */
1232 void qaddlist(struct queue *q, struct block *b)
1233 {
1234         /* TODO: q lock? */
1235         /* queue the block */
1236         if (q->bfirst)
1237                 q->blast->next = b;
1238         else
1239                 q->bfirst = b;
1240         q->len += blockalloclen(b);
1241         q->dlen += blocklen(b);
1242         while (b->next)
1243                 b = b->next;
1244         q->blast = b;
1245 }
1246
1247 /*
1248  *  called with q ilocked
1249  */
1250 struct block *qremove(struct queue *q)
1251 {
1252         struct block *b;
1253
1254         b = q->bfirst;
1255         if (b == NULL)
1256                 return NULL;
1257         q->bfirst = b->next;
1258         b->next = NULL;
1259         q->dlen -= BLEN(b);
1260         q->len -= BALLOC(b);
1261         QDEBUG checkb(b, "qremove");
1262         return b;
1263 }
1264
1265 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1266 {
1267         size_t copy_amt, retval = 0;
1268         struct extra_bdata *ebd;
1269         
1270         copy_amt = MIN(BHLEN(b), amt);
1271         memcpy(to, b->rp, copy_amt);
1272         /* advance the rp, since this block not be completely consumed and future
1273          * reads need to know where to pick up from */
1274         b->rp += copy_amt;
1275         to += copy_amt;
1276         amt -= copy_amt;
1277         retval += copy_amt;
1278         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1279                 ebd = &b->extra_data[i];
1280                 /* skip empty entires.  if we track this in the struct block, we can
1281                  * just start the for loop early */
1282                 if (!ebd->base || !ebd->len)
1283                         continue;
1284                 copy_amt = MIN(ebd->len, amt);
1285                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1286                 /* we're actually consuming the entries, just like how we advance rp up
1287                  * above, and might only consume part of one. */
1288                 ebd->len -= copy_amt;
1289                 ebd->off += copy_amt;
1290                 b->extra_len -= copy_amt;
1291                 if (!ebd->len) {
1292                         /* we don't actually have to decref here.  it's also done in
1293                          * freeb().  this is the earliest we can free. */
1294                         kfree((void*)ebd->base);
1295                         ebd->base = ebd->off = 0;
1296                 }
1297                 to += copy_amt;
1298                 amt -= copy_amt;
1299                 retval += copy_amt;
1300         }
1301         return retval;
1302 }
1303
1304 /*
1305  *  copy the contents of a string of blocks into
1306  *  memory.  emptied blocks are freed.  return
1307  *  pointer to first unconsumed block.
1308  */
1309 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1310 {
1311         int i;
1312         struct block *next;
1313
1314         /* could be slicker here, since read_from_block is smart */
1315         for (; b != NULL; b = next) {
1316                 i = BLEN(b);
1317                 if (i > n) {
1318                         /* partial block, consume some */
1319                         read_from_block(b, p, n);
1320                         return b;
1321                 }
1322                 /* full block, consume all and move on */
1323                 i = read_from_block(b, p, i);
1324                 n -= i;
1325                 p += i;
1326                 next = b->next;
1327                 freeb(b);
1328         }
1329         return NULL;
1330 }
1331
1332 /*
1333  *  copy the contents of memory into a string of blocks.
1334  *  return NULL on error.
1335  */
1336 struct block *mem2bl(uint8_t * p, int len)
1337 {
1338         ERRSTACK(1);
1339         int n;
1340         struct block *b, *first, **l;
1341
1342         first = NULL;
1343         l = &first;
1344         if (waserror()) {
1345                 freeblist(first);
1346                 nexterror();
1347         }
1348         do {
1349                 n = len;
1350                 if (n > Maxatomic)
1351                         n = Maxatomic;
1352
1353                 *l = b = allocb(n);
1354                 /* TODO consider extra_data */
1355                 memmove(b->wp, p, n);
1356                 b->wp += n;
1357                 p += n;
1358                 len -= n;
1359                 l = &b->next;
1360         } while (len > 0);
1361         poperror();
1362
1363         return first;
1364 }
1365
1366 /*
1367  *  put a block back to the front of the queue
1368  *  called with q ilocked
1369  */
1370 void qputback(struct queue *q, struct block *b)
1371 {
1372         b->next = q->bfirst;
1373         if (q->bfirst == NULL)
1374                 q->blast = b;
1375         q->bfirst = b;
1376         q->len += BALLOC(b);
1377         q->dlen += BLEN(b);
1378 }
1379
1380 /*
1381  *  flow control, get producer going again
1382  *  called with q ilocked
1383  */
1384 static void qwakeup_iunlock(struct queue *q)
1385 {
1386         int dowakeup = 0;
1387
1388         /* if writer flow controlled, restart */
1389         if ((q->state & Qflow) && q->len < q->limit / 2) {
1390                 q->state &= ~Qflow;
1391                 dowakeup = 1;
1392         }
1393
1394         spin_unlock_irqsave(&q->lock);
1395
1396         /* wakeup flow controlled writers */
1397         if (dowakeup) {
1398                 if (q->kick)
1399                         q->kick(q->arg);
1400                 rendez_wakeup(&q->wr);
1401         }
1402 }
1403
1404 /*
1405  *  get next block from a queue (up to a limit)
1406  */
1407 struct block *qbread(struct queue *q, int len)
1408 {
1409         ERRSTACK(1);
1410         struct block *b, *nb;
1411         int n;
1412
1413         qlock(&q->rlock);
1414         if (waserror()) {
1415                 qunlock(&q->rlock);
1416                 nexterror();
1417         }
1418
1419         spin_lock_irqsave(&q->lock);
1420         if (!qwait(q)) {
1421                 /* queue closed */
1422                 spin_unlock_irqsave(&q->lock);
1423                 qunlock(&q->rlock);
1424                 poperror();
1425                 return NULL;
1426         }
1427
1428         /* if we get here, there's at least one block in the queue */
1429         b = qremove(q);
1430         n = BLEN(b);
1431
1432         /* split block if it's too big and this is not a message queue */
1433         nb = b;
1434         if (n > len) {
1435                 PANIC_EXTRA(b);
1436                 if ((q->state & Qmsg) == 0) {
1437                         n -= len;
1438                         b = allocb(n);
1439                         memmove(b->wp, nb->rp + len, n);
1440                         b->wp += n;
1441                         qputback(q, b);
1442                 }
1443                 nb->wp = nb->rp + len;
1444         }
1445
1446         /* restart producer */
1447         qwakeup_iunlock(q);
1448
1449         poperror();
1450         qunlock(&q->rlock);
1451         return nb;
1452 }
1453
1454 /*
1455  *  read a queue.  if no data is queued, post a struct block
1456  *  and wait on its Rendez.
1457  */
1458 long qread(struct queue *q, void *vp, int len)
1459 {
1460         ERRSTACK(1);
1461         struct block *b, *first, **l;
1462         int m, n;
1463
1464         qlock(&q->rlock);
1465         if (waserror()) {
1466                 qunlock(&q->rlock);
1467                 nexterror();
1468         }
1469
1470         spin_lock_irqsave(&q->lock);
1471 again:
1472         if (!qwait(q)) {
1473                 /* queue closed */
1474                 spin_unlock_irqsave(&q->lock);
1475                 qunlock(&q->rlock);
1476                 poperror();
1477                 return 0;
1478         }
1479
1480         /* if we get here, there's at least one block in the queue */
1481         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1482         // strategy by default.
1483         if (q->state & Qcoalesce) {
1484                 /* when coalescing, 0 length blocks just go away */
1485                 b = q->bfirst;
1486                 if (BLEN(b) <= 0) {
1487                         freeb(qremove(q));
1488                         goto again;
1489                 }
1490
1491                 /*  grab the first block plus as many
1492                  *  following blocks as will completely
1493                  *  fit in the read.
1494                  */
1495                 n = 0;
1496                 l = &first;
1497                 m = BLEN(b);
1498                 for (;;) {
1499                         *l = qremove(q);
1500                         l = &b->next;
1501                         n += m;
1502
1503                         b = q->bfirst;
1504                         if (b == NULL)
1505                                 break;
1506                         m = BLEN(b);
1507                         if (n + m > len)
1508                                 break;
1509                 }
1510         } else {
1511                 first = qremove(q);
1512                 n = BLEN(first);
1513         }
1514
1515         /* copy to user space outside of the ilock */
1516         spin_unlock_irqsave(&q->lock);
1517         b = bl2mem(vp, first, len);
1518         spin_lock_irqsave(&q->lock);
1519
1520         /* take care of any left over partial block */
1521         if (b != NULL) {
1522                 n -= BLEN(b);
1523                 if (q->state & Qmsg)
1524                         freeb(b);
1525                 else
1526                         qputback(q, b);
1527         }
1528
1529         /* restart producer */
1530         qwakeup_iunlock(q);
1531
1532         poperror();
1533         qunlock(&q->rlock);
1534         return n;
1535 }
1536
1537 static int qnotfull(void *a)
1538 {
1539         struct queue *q = a;
1540
1541         return q->len < q->limit || (q->state & Qclosed);
1542 }
1543
1544 uint32_t dropcnt;
1545
1546 /*
1547  *  add a block to a queue obeying flow control
1548  */
1549 long qbwrite(struct queue *q, struct block *b)
1550 {
1551         ERRSTACK(1);
1552         int n, dowakeup;
1553         volatile bool should_free_b = TRUE;
1554
1555         n = BLEN(b);
1556
1557         if (q->bypass) {
1558                 (*q->bypass) (q->arg, b);
1559                 return n;
1560         }
1561
1562         dowakeup = 0;
1563         qlock(&q->wlock);
1564         if (waserror()) {
1565                 if (b != NULL && should_free_b)
1566                         freeb(b);
1567                 qunlock(&q->wlock);
1568                 nexterror();
1569         }
1570
1571         spin_lock_irqsave(&q->lock);
1572
1573         /* give up if the queue is closed */
1574         if (q->state & Qclosed) {
1575                 spin_unlock_irqsave(&q->lock);
1576                 error(q->err);
1577         }
1578
1579         /* if nonblocking, don't queue over the limit */
1580         if (q->len >= q->limit) {
1581                 /* drop overflow takes priority over regular non-blocking */
1582                 if (q->state & Qdropoverflow) {
1583                         spin_unlock_irqsave(&q->lock);
1584                         freeb(b);
1585                         dropcnt += n;
1586                         qunlock(&q->wlock);
1587                         poperror();
1588                         return n;
1589                 }
1590                 if (q->state & Qnonblock) {
1591                         spin_unlock_irqsave(&q->lock);
1592                         freeb(b);
1593                         set_errno(EAGAIN);
1594                         error("queue full");
1595                 }
1596         }
1597
1598         /* queue the block */
1599         should_free_b = FALSE;
1600         if (q->bfirst)
1601                 q->blast->next = b;
1602         else
1603                 q->bfirst = b;
1604         q->blast = b;
1605         b->next = 0;
1606         q->len += BALLOC(b);
1607         q->dlen += n;
1608         QDEBUG checkb(b, "qbwrite");
1609         b = NULL;
1610
1611         /* make sure other end gets awakened */
1612         if (q->state & Qstarve) {
1613                 q->state &= ~Qstarve;
1614                 dowakeup = 1;
1615         }
1616         spin_unlock_irqsave(&q->lock);
1617
1618         /*  get output going again */
1619         if (q->kick && (dowakeup || (q->state & Qkick)))
1620                 q->kick(q->arg);
1621
1622         /* wakeup anyone consuming at the other end */
1623         if (dowakeup)
1624                 rendez_wakeup(&q->rr);
1625
1626         /*
1627          *  flow control, wait for queue to get below the limit
1628          *  before allowing the process to continue and queue
1629          *  more.  We do this here so that postnote can only
1630          *  interrupt us after the data has been queued.  This
1631          *  means that things like 9p flushes and ssl messages
1632          *  will not be disrupted by software interrupts.
1633          *
1634          *  Note - this is moderately dangerous since a process
1635          *  that keeps getting interrupted and rewriting will
1636          *  queue infinite crud.
1637          */
1638         for (;;) {
1639                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1640                         break;
1641
1642                 spin_lock_irqsave(&q->lock);
1643                 q->state |= Qflow;
1644                 spin_unlock_irqsave(&q->lock);
1645                 rendez_sleep(&q->wr, qnotfull, q);
1646         }
1647
1648         qunlock(&q->wlock);
1649         poperror();
1650         return n;
1651 }
1652
1653 long qibwrite(struct queue *q, struct block *b)
1654 {
1655         int n, dowakeup;
1656
1657         dowakeup = 0;
1658
1659         n = BLEN(b);
1660
1661         spin_lock_irqsave(&q->lock);
1662
1663         QDEBUG checkb(b, "qibwrite");
1664         if (q->bfirst)
1665                 q->blast->next = b;
1666         else
1667                 q->bfirst = b;
1668         q->blast = b;
1669         q->len += BALLOC(b);
1670         q->dlen += n;
1671
1672         if (q->state & Qstarve) {
1673                 q->state &= ~Qstarve;
1674                 dowakeup = 1;
1675         }
1676
1677         spin_unlock_irqsave(&q->lock);
1678
1679         if (dowakeup) {
1680                 if (q->kick)
1681                         q->kick(q->arg);
1682                 rendez_wakeup(&q->rr);
1683         }
1684
1685         return n;
1686 }
1687
1688 /*
1689  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1690  */
1691 int qwrite(struct queue *q, void *vp, int len)
1692 {
1693         int n, sofar;
1694         struct block *b;
1695         uint8_t *p = vp;
1696         void *ext_buf;
1697
1698         QDEBUG if (!islo())
1699                  printd("qwrite hi %p\n", getcallerpc(&q));
1700
1701         sofar = 0;
1702         do {
1703                 n = len - sofar;
1704                 /* This is 64K, the max amount per single block.  Still a good value? */
1705                 if (n > Maxatomic)
1706                         n = Maxatomic;
1707
1708                 /* If n is small, we don't need to bother with the extra_data.  But
1709                  * until the whole stack can handle extd blocks, we'll use them
1710                  * unconditionally. */
1711 #ifdef CONFIG_BLOCK_EXTRAS
1712                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1713                  * only available via padblock (to the left).  we also need some space
1714                  * for pullupblock for some basic headers (like icmp) that get written
1715                  * in directly */
1716                 b = allocb(64);
1717                 ext_buf = kmalloc(n, 0);
1718                 memcpy(ext_buf, p + sofar, n);
1719                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1720                 b->extra_data[0].base = (uintptr_t)ext_buf;
1721                 b->extra_data[0].off = 0;
1722                 b->extra_data[0].len = n;
1723                 b->extra_len += n;
1724 #else
1725                 b = allocb(n);
1726                 memmove(b->wp, p + sofar, n);
1727                 b->wp += n;
1728 #endif
1729                         
1730                 qbwrite(q, b);
1731
1732                 sofar += n;
1733         } while (sofar < len && (q->state & Qmsg) == 0);
1734
1735         return len;
1736 }
1737
1738 /*
1739  *  used by print() to write to a queue.  Since we may be splhi or not in
1740  *  a process, don't qlock.
1741  */
1742 int qiwrite(struct queue *q, void *vp, int len)
1743 {
1744         int n, sofar, dowakeup;
1745         struct block *b;
1746         uint8_t *p = vp;
1747
1748         dowakeup = 0;
1749
1750         sofar = 0;
1751         do {
1752                 n = len - sofar;
1753                 if (n > Maxatomic)
1754                         n = Maxatomic;
1755
1756                 b = iallocb(n);
1757                 if (b == NULL)
1758                         break;
1759                 /* TODO consider extra_data */
1760                 memmove(b->wp, p + sofar, n);
1761                 /* this adjusts BLEN to be n, or at least it should */
1762                 b->wp += n;
1763                 assert(n == BLEN(b));
1764                 qibwrite(q, b);
1765
1766                 sofar += n;
1767         } while (sofar < len && (q->state & Qmsg) == 0);
1768
1769         return sofar;
1770 }
1771
1772 /*
1773  *  be extremely careful when calling this,
1774  *  as there is no reference accounting
1775  */
1776 void qfree(struct queue *q)
1777 {
1778         qclose(q);
1779         kfree(q);
1780 }
1781
1782 /*
1783  *  Mark a queue as closed.  No further IO is permitted.
1784  *  All blocks are released.
1785  */
1786 void qclose(struct queue *q)
1787 {
1788         struct block *bfirst;
1789
1790         if (q == NULL)
1791                 return;
1792
1793         /* mark it */
1794         spin_lock_irqsave(&q->lock);
1795         q->state |= Qclosed;
1796         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1797         strncpy(q->err, Ehungup, sizeof(q->err));
1798         bfirst = q->bfirst;
1799         q->bfirst = 0;
1800         q->len = 0;
1801         q->dlen = 0;
1802         spin_unlock_irqsave(&q->lock);
1803
1804         /* free queued blocks */
1805         freeblist(bfirst);
1806
1807         /* wake up readers/writers */
1808         rendez_wakeup(&q->rr);
1809         rendez_wakeup(&q->wr);
1810 }
1811
1812 /*
1813  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1814  *  blocks.
1815  */
1816 void qhangup(struct queue *q, char *msg)
1817 {
1818         /* mark it */
1819         spin_lock_irqsave(&q->lock);
1820         q->state |= Qclosed;
1821         if (msg == 0 || *msg == 0)
1822                 strncpy(q->err, Ehungup, sizeof(q->err));
1823         else
1824                 strncpy(q->err, msg, ERRMAX - 1);
1825         spin_unlock_irqsave(&q->lock);
1826
1827         /* wake up readers/writers */
1828         rendez_wakeup(&q->rr);
1829         rendez_wakeup(&q->wr);
1830 }
1831
1832 /*
1833  *  return non-zero if the q is hungup
1834  */
1835 int qisclosed(struct queue *q)
1836 {
1837         return q->state & Qclosed;
1838 }
1839
1840 /*
1841  *  mark a queue as no longer hung up
1842  */
1843 void qreopen(struct queue *q)
1844 {
1845         spin_lock_irqsave(&q->lock);
1846         q->state &= ~Qclosed;
1847         q->state |= Qstarve;
1848         q->eof = 0;
1849         q->limit = q->inilim;
1850         spin_unlock_irqsave(&q->lock);
1851 }
1852
1853 /*
1854  *  return bytes queued
1855  */
1856 int qlen(struct queue *q)
1857 {
1858         return q->dlen;
1859 }
1860
1861 /*
1862  * return space remaining before flow control
1863  */
1864 int qwindow(struct queue *q)
1865 {
1866         int l;
1867
1868         l = q->limit - q->len;
1869         if (l < 0)
1870                 l = 0;
1871         return l;
1872 }
1873
1874 /*
1875  *  return true if we can read without blocking
1876  */
1877 int qcanread(struct queue *q)
1878 {
1879         return q->bfirst != 0;
1880 }
1881
1882 /*
1883  *  change queue limit
1884  */
1885 void qsetlimit(struct queue *q, int limit)
1886 {
1887         q->limit = limit;
1888 }
1889
1890 /*
1891  *  set whether writes drop overflowing blocks, or if we sleep
1892  */
1893 void qdropoverflow(struct queue *q, bool onoff)
1894 {
1895         if (onoff)
1896                 q->state |= Qdropoverflow;
1897         else
1898                 q->state &= ~Qdropoverflow;
1899 }
1900
1901 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1902 void qnonblock(struct queue *q, bool onoff)
1903 {
1904         if (onoff)
1905                 q->state |= Qnonblock;
1906         else
1907                 q->state &= ~Qnonblock;
1908 }
1909
1910 /*
1911  *  flush the output queue
1912  */
1913 void qflush(struct queue *q)
1914 {
1915         struct block *bfirst;
1916
1917         /* mark it */
1918         spin_lock_irqsave(&q->lock);
1919         bfirst = q->bfirst;
1920         q->bfirst = 0;
1921         q->len = 0;
1922         q->dlen = 0;
1923         spin_unlock_irqsave(&q->lock);
1924
1925         /* free queued blocks */
1926         freeblist(bfirst);
1927
1928         /* wake up readers/writers */
1929         rendez_wakeup(&q->wr);
1930 }
1931
1932 int qfull(struct queue *q)
1933 {
1934         return q->state & Qflow;
1935 }
1936
1937 int qstate(struct queue *q)
1938 {
1939         return q->state;
1940 }
1941
1942 void qdump(struct queue *q)
1943 {
1944         if (q)
1945                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1946                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1947 }