Print block content and backtrace in PANIC_EXTRA
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define PANIC_EXTRA(b)                                                  \
17 {                                                                       \
18         if ((b)->extra_len) {                                           \
19                 printblock(b);                                          \
20                 backtrace();                                            \
21                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
22         }                                                               \
23 }
24
25 static uint32_t padblockcnt;
26 static uint32_t concatblockcnt;
27 static uint32_t pullupblockcnt;
28 static uint32_t copyblockcnt;
29 static uint32_t consumecnt;
30 static uint32_t producecnt;
31 static uint32_t qcopycnt;
32
33 static int debugging;
34
35 #define QDEBUG  if(0)
36
37 /*
38  *  IO queues
39  */
40
41 struct queue {
42         spinlock_t lock;;
43
44         struct block *bfirst;           /* buffer */
45         struct block *blast;
46
47         int len;                                        /* bytes allocated to queue */
48         int dlen;                                       /* data bytes in queue */
49         int limit;                                      /* max bytes in queue */
50         int inilim;                             /* initial limit */
51         int state;
52         int eof;                                        /* number of eofs read by user */
53
54         void (*kick) (void *);          /* restart output */
55         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
56         void *arg;                                      /* argument to kick */
57
58         qlock_t rlock;                          /* mutex for reading processes */
59         struct rendez rr;                       /* process waiting to read */
60         qlock_t wlock;                          /* mutex for writing processes */
61         struct rendez wr;                       /* process waiting to write */
62         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
63         void *wake_data;
64
65         char err[ERRMAX];
66 };
67
68 enum {
69         Maxatomic = 64 * 1024,
70 };
71
72 unsigned int qiomaxatomic = Maxatomic;
73
74 /* Helper: fires a wake callback, sending 'filter' */
75 static void qwake_cb(struct queue *q, int filter)
76 {
77         if (q->wake_cb)
78                 q->wake_cb(q, q->wake_data, filter);
79 }
80
81 void ixsummary(void)
82 {
83         debugging ^= 1;
84         iallocsummary();
85         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
86                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
87         printd("consume %lu, produce %lu, qcopy %lu\n",
88                    consumecnt, producecnt, qcopycnt);
89 }
90
91 /*
92  *  free a list of blocks
93  */
94 void freeblist(struct block *b)
95 {
96         struct block *next;
97
98         for (; b != 0; b = next) {
99                 next = b->next;
100                 b->next = 0;
101                 freeb(b);
102         }
103 }
104
105 /*
106  *  pad a block to the front (or the back if size is negative)
107  */
108 struct block *padblock(struct block *bp, int size)
109 {
110         int n;
111         struct block *nbp;
112         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
113         uint16_t checksum_start = bp->checksum_start;
114         uint16_t checksum_offset = bp->checksum_offset;
115         uint16_t mss = bp->mss;
116
117         QDEBUG checkb(bp, "padblock 1");
118         if (size >= 0) {
119                 if (bp->rp - bp->base >= size) {
120                         bp->checksum_start += size;
121                         bp->rp -= size;
122                         return bp;
123                 }
124
125                 PANIC_EXTRA(bp);
126                 if (bp->next)
127                         panic("padblock %p", getcallerpc(&bp));
128                 n = BLEN(bp);
129                 padblockcnt++;
130                 nbp = allocb(size + n);
131                 nbp->rp += size;
132                 nbp->wp = nbp->rp;
133                 memmove(nbp->wp, bp->rp, n);
134                 nbp->wp += n;
135                 freeb(bp);
136                 nbp->rp -= size;
137         } else {
138                 size = -size;
139
140                 PANIC_EXTRA(bp);
141
142                 if (bp->next)
143                         panic("padblock %p", getcallerpc(&bp));
144
145                 if (bp->lim - bp->wp >= size)
146                         return bp;
147
148                 n = BLEN(bp);
149                 padblockcnt++;
150                 nbp = allocb(size + n);
151                 memmove(nbp->wp, bp->rp, n);
152                 nbp->wp += n;
153                 freeb(bp);
154         }
155         if (bcksum) {
156                 nbp->flag |= bcksum;
157                 nbp->checksum_start = checksum_start;
158                 nbp->checksum_offset = checksum_offset;
159                 nbp->mss = mss;
160         }
161         QDEBUG checkb(nbp, "padblock 1");
162         return nbp;
163 }
164
165 /*
166  *  return count of bytes in a string of blocks
167  */
168 int blocklen(struct block *bp)
169 {
170         int len;
171
172         len = 0;
173         while (bp) {
174                 len += BLEN(bp);
175                 bp = bp->next;
176         }
177         return len;
178 }
179
180 /*
181  * return count of space in blocks
182  */
183 int blockalloclen(struct block *bp)
184 {
185         int len;
186
187         len = 0;
188         while (bp) {
189                 len += BALLOC(bp);
190                 bp = bp->next;
191         }
192         return len;
193 }
194
195 /*
196  *  copy the  string of blocks into
197  *  a single block and free the string
198  */
199 struct block *concatblock(struct block *bp)
200 {
201         int len;
202         struct block *nb, *f;
203
204         if (bp->next == 0)
205                 return bp;
206
207         /* probably use parts of qclone */
208         PANIC_EXTRA(bp);
209         nb = allocb(blocklen(bp));
210         for (f = bp; f; f = f->next) {
211                 len = BLEN(f);
212                 memmove(nb->wp, f->rp, len);
213                 nb->wp += len;
214         }
215         concatblockcnt += BLEN(nb);
216         freeblist(bp);
217         QDEBUG checkb(nb, "concatblock 1");
218         return nb;
219 }
220
221 /* Returns a block with the remaining contents of b all in the main body of the
222  * returned block.  Replace old references to b with the returned value (which
223  * may still be 'b', if no change was needed. */
224 struct block *linearizeblock(struct block *b)
225 {
226         struct block *newb;
227         size_t len;
228         struct extra_bdata *ebd;
229
230         if (!b->extra_len)
231                 return b;
232
233         newb = allocb(BLEN(b));
234         len = BHLEN(b);
235         memcpy(newb->wp, b->rp, len);
236         newb->wp += len;
237         len = b->extra_len;
238         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
239                 ebd = &b->extra_data[i];
240                 if (!ebd->base || !ebd->len)
241                         continue;
242                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
243                 newb->wp += ebd->len;
244                 len -= ebd->len;
245         }
246         /* TODO: any other flags that need copied over? */
247         if (b->flag & BCKSUM_FLAGS) {
248                 newb->flag |= (b->flag & BCKSUM_FLAGS);
249                 newb->checksum_start = b->checksum_start;
250                 newb->checksum_offset = b->checksum_offset;
251                 newb->mss = b->mss;
252         }
253         freeb(b);
254         return newb;
255 }
256
257 /*
258  *  make sure the first block has at least n bytes in its main body
259  */
260 struct block *pullupblock(struct block *bp, int n)
261 {
262         int i, len, seglen;
263         struct block *nbp;
264         struct extra_bdata *ebd;
265
266         /*
267          *  this should almost always be true, it's
268          *  just to avoid every caller checking.
269          */
270         if (BHLEN(bp) >= n)
271                 return bp;
272
273          /* a start at explicit main-body / header management */
274         if (bp->extra_len) {
275                 if (n > bp->lim - bp->rp) {
276                         /* would need to realloc a new block and copy everything over. */
277                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
278                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
279                 }
280                 len = n - BHLEN(bp);
281                 if (len > bp->extra_len)
282                         panic("pullup more than extra (%d, %d, %d)\n",
283                               n, BHLEN(bp), bp->extra_len);
284                 checkb(bp, "before pullup");
285                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
286                         ebd = &bp->extra_data[i];
287                         if (!ebd->base || !ebd->len)
288                                 continue;
289                         seglen = MIN(ebd->len, len);
290                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
291                         bp->wp += seglen;
292                         len -= seglen;
293                         ebd->len -= seglen;
294                         ebd->off += seglen;
295                         bp->extra_len -= seglen;
296                         if (ebd->len == 0) {
297                                 kfree((void *)ebd->base);
298                                 ebd->off = 0;
299                                 ebd->base = 0;
300                         }
301                 }
302                 /* maybe just call pullupblock recursively here */
303                 if (len)
304                         panic("pullup %d bytes overdrawn\n", len);
305                 checkb(bp, "after pullup");
306                 return bp;
307         }
308
309         /*
310          *  if not enough room in the first block,
311          *  add another to the front of the list.
312          */
313         if (bp->lim - bp->rp < n) {
314                 nbp = allocb(n);
315                 nbp->next = bp;
316                 bp = nbp;
317         }
318
319         /*
320          *  copy bytes from the trailing blocks into the first
321          */
322         n -= BLEN(bp);
323         while ((nbp = bp->next)) {
324                 i = BLEN(nbp);
325                 if (i > n) {
326                         memmove(bp->wp, nbp->rp, n);
327                         pullupblockcnt++;
328                         bp->wp += n;
329                         nbp->rp += n;
330                         QDEBUG checkb(bp, "pullupblock 1");
331                         return bp;
332                 } else {
333                         memmove(bp->wp, nbp->rp, i);
334                         pullupblockcnt++;
335                         bp->wp += i;
336                         bp->next = nbp->next;
337                         nbp->next = 0;
338                         freeb(nbp);
339                         n -= i;
340                         if (n == 0) {
341                                 QDEBUG checkb(bp, "pullupblock 2");
342                                 return bp;
343                         }
344                 }
345         }
346         freeb(bp);
347         return 0;
348 }
349
350 /*
351  *  make sure the first block has at least n bytes in its main body
352  */
353 struct block *pullupqueue(struct queue *q, int n)
354 {
355         struct block *b;
356
357         /* TODO: lock to protect the queue links? */
358         if ((BHLEN(q->bfirst) >= n))
359                 return q->bfirst;
360         q->bfirst = pullupblock(q->bfirst, n);
361         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
362         q->blast = b;
363         return q->bfirst;
364 }
365
366 /* throw away count bytes from the front of
367  * block's extradata.  Returns count of bytes
368  * thrown away
369  */
370
371 static int pullext(struct block *bp, int count)
372 {
373         struct extra_bdata *ed;
374         int i, rem, bytes = 0;
375
376         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
377                 ed = &bp->extra_data[i];
378                 rem = MIN(count, ed->len);
379                 bp->extra_len -= rem;
380                 count -= rem;
381                 bytes += rem;
382                 ed->off += rem;
383                 ed->len -= rem;
384                 if (ed->len == 0) {
385                         kfree((void *)ed->base);
386                         ed->base = 0;
387                         ed->off = 0;
388                 }
389         }
390         return bytes;
391 }
392
393 /* throw away count bytes from the end of a
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int dropext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->len -= rem;
410                 if (ed->len == 0) {
411                         kfree((void *)ed->base);
412                         ed->base = 0;
413                         ed->off = 0;
414                 }
415         }
416         return bytes;
417 }
418
419 /*
420  *  throw away up to count bytes from a
421  *  list of blocks.  Return count of bytes
422  *  thrown away.
423  */
424 static int _pullblock(struct block **bph, int count, int free)
425 {
426         struct block *bp;
427         int n, bytes;
428
429         bytes = 0;
430         if (bph == NULL)
431                 return 0;
432
433         while (*bph != NULL && count != 0) {
434                 bp = *bph;
435
436                 n = MIN(BHLEN(bp), count);
437                 bytes += n;
438                 count -= n;
439                 bp->rp += n;
440                 n = pullext(bp, count);
441                 bytes += n;
442                 count -= n;
443                 QDEBUG checkb(bp, "pullblock ");
444                 if (BLEN(bp) == 0 && (free || count)) {
445                         *bph = bp->next;
446                         bp->next = NULL;
447                         freeb(bp);
448                 }
449         }
450         return bytes;
451 }
452
453 int pullblock(struct block **bph, int count)
454 {
455         return _pullblock(bph, count, 1);
456 }
457
458 /*
459  *  trim to len bytes starting at offset
460  */
461 struct block *trimblock(struct block *bp, int offset, int len)
462 {
463         uint32_t l, trim;
464         int olen = len;
465
466         QDEBUG checkb(bp, "trimblock 1");
467         if (blocklen(bp) < offset + len) {
468                 freeblist(bp);
469                 return NULL;
470         }
471
472         l =_pullblock(&bp, offset, 0);
473         if (bp == NULL)
474                 return NULL;
475         if (l != offset) {
476                 freeblist(bp);
477                 return NULL;
478         }
479
480         while ((l = BLEN(bp)) < len) {
481                 len -= l;
482                 bp = bp->next;
483         }
484
485         trim = BLEN(bp) - len;
486         trim -= dropext(bp, trim);
487         bp->wp -= trim;
488
489         if (bp->next) {
490                 freeblist(bp->next);
491                 bp->next = NULL;
492         }
493         return bp;
494 }
495
496 /*
497  *  copy 'count' bytes into a new block
498  */
499 struct block *copyblock(struct block *bp, int count)
500 {
501         int l;
502         struct block *nbp;
503
504         QDEBUG checkb(bp, "copyblock 0");
505         nbp = allocb(count);
506         if (bp->flag & BCKSUM_FLAGS) {
507                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
508                 nbp->checksum_start = bp->checksum_start;
509                 nbp->checksum_offset = bp->checksum_offset;
510                 nbp->mss = bp->mss;
511         }
512         PANIC_EXTRA(bp);
513         for (; count > 0 && bp != 0; bp = bp->next) {
514                 l = BLEN(bp);
515                 if (l > count)
516                         l = count;
517                 memmove(nbp->wp, bp->rp, l);
518                 nbp->wp += l;
519                 count -= l;
520         }
521         if (count > 0) {
522                 memset(nbp->wp, 0, count);
523                 nbp->wp += count;
524         }
525         copyblockcnt++;
526         QDEBUG checkb(nbp, "copyblock 1");
527
528         return nbp;
529 }
530
531 struct block *adjustblock(struct block *bp, int len)
532 {
533         int n;
534         struct block *nbp;
535
536         if (len < 0) {
537                 freeb(bp);
538                 return NULL;
539         }
540
541         PANIC_EXTRA(bp);
542         if (bp->rp + len > bp->lim) {
543                 nbp = copyblock(bp, len);
544                 freeblist(bp);
545                 QDEBUG checkb(nbp, "adjustblock 1");
546
547                 return nbp;
548         }
549
550         n = BLEN(bp);
551         if (len > n)
552                 memset(bp->wp, 0, len - n);
553         bp->wp = bp->rp + len;
554         QDEBUG checkb(bp, "adjustblock 2");
555
556         return bp;
557 }
558
559
560 /*
561  *  get next block from a queue, return null if nothing there
562  */
563 struct block *qget(struct queue *q)
564 {
565         int dowakeup;
566         struct block *b;
567
568         /* sync with qwrite */
569         spin_lock_irqsave(&q->lock);
570
571         b = q->bfirst;
572         if (b == NULL) {
573                 q->state |= Qstarve;
574                 spin_unlock_irqsave(&q->lock);
575                 return NULL;
576         }
577         q->bfirst = b->next;
578         b->next = 0;
579         q->len -= BALLOC(b);
580         q->dlen -= BLEN(b);
581         QDEBUG checkb(b, "qget");
582
583         /* if writer flow controlled, restart */
584         if ((q->state & Qflow) && q->len < q->limit / 2) {
585                 q->state &= ~Qflow;
586                 dowakeup = 1;
587         } else
588                 dowakeup = 0;
589
590         spin_unlock_irqsave(&q->lock);
591
592         if (dowakeup) {
593                 rendez_wakeup(&q->wr);
594                 /* We only send the writable event on wakeup, which is edge triggered */
595                 qwake_cb(q, FDTAP_FILT_WRITABLE);
596         }
597
598         return b;
599 }
600
601 /*
602  *  throw away the next 'len' bytes in the queue
603  * returning the number actually discarded
604  */
605 int qdiscard(struct queue *q, int len)
606 {
607         struct block *b;
608         int dowakeup, n, sofar, body_amt, extra_amt;
609         struct extra_bdata *ebd;
610
611         spin_lock_irqsave(&q->lock);
612         for (sofar = 0; sofar < len; sofar += n) {
613                 b = q->bfirst;
614                 if (b == NULL)
615                         break;
616                 QDEBUG checkb(b, "qdiscard");
617                 n = BLEN(b);
618                 if (n <= len - sofar) {
619                         q->bfirst = b->next;
620                         b->next = 0;
621                         q->len -= BALLOC(b);
622                         q->dlen -= BLEN(b);
623                         freeb(b);
624                 } else {
625                         n = len - sofar;
626                         q->dlen -= n;
627                         /* partial block removal */
628                         body_amt = MIN(BHLEN(b), n);
629                         b->rp += body_amt;
630                         extra_amt = n - body_amt;
631                         /* reduce q->len by the amount we remove from the extras.  The
632                          * header will always be accounted for above, during block removal.
633                          * */
634                         q->len -= extra_amt;
635                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
636                                 ebd = &b->extra_data[i];
637                                 if (!ebd->base || !ebd->len)
638                                         continue;
639                                 if (extra_amt >= ebd->len) {
640                                         /* remove the entire entry, note the kfree release */
641                                         b->extra_len -= ebd->len;
642                                         extra_amt -= ebd->len;
643                                         kfree((void*)ebd->base);
644                                         ebd->base = ebd->off = ebd->len = 0;
645                                         continue;
646                                 }
647                                 ebd->off += extra_amt;
648                                 ebd->len -= extra_amt;
649                                 b->extra_len -= extra_amt;
650                                 extra_amt = 0;
651                         }
652                 }
653         }
654
655         /*
656          *  if writer flow controlled, restart
657          *
658          *  This used to be
659          *  q->len < q->limit/2
660          *  but it slows down tcp too much for certain write sizes.
661          *  I really don't understand it completely.  It may be
662          *  due to the queue draining so fast that the transmission
663          *  stalls waiting for the app to produce more data.  - presotto
664          */
665         if ((q->state & Qflow) && q->len < q->limit) {
666                 q->state &= ~Qflow;
667                 dowakeup = 1;
668         } else
669                 dowakeup = 0;
670
671         spin_unlock_irqsave(&q->lock);
672
673         if (dowakeup) {
674                 rendez_wakeup(&q->wr);
675                 qwake_cb(q, FDTAP_FILT_WRITABLE);
676         }
677
678         return sofar;
679 }
680
681 /*
682  *  Interrupt level copy out of a queue, return # bytes copied.
683  */
684 int qconsume(struct queue *q, void *vp, int len)
685 {
686         struct block *b;
687         int n, dowakeup;
688         uint8_t *p = vp;
689         struct block *tofree = NULL;
690
691         /* sync with qwrite */
692         spin_lock_irqsave(&q->lock);
693
694         for (;;) {
695                 b = q->bfirst;
696                 if (b == 0) {
697                         q->state |= Qstarve;
698                         spin_unlock_irqsave(&q->lock);
699                         return -1;
700                 }
701                 QDEBUG checkb(b, "qconsume 1");
702
703                 n = BLEN(b);
704                 if (n > 0)
705                         break;
706                 q->bfirst = b->next;
707                 q->len -= BALLOC(b);
708
709                 /* remember to free this */
710                 b->next = tofree;
711                 tofree = b;
712         };
713
714         PANIC_EXTRA(b);
715         if (n < len)
716                 len = n;
717         memmove(p, b->rp, len);
718         consumecnt += n;
719         b->rp += len;
720         q->dlen -= len;
721
722         /* discard the block if we're done with it */
723         if ((q->state & Qmsg) || len == n) {
724                 q->bfirst = b->next;
725                 b->next = 0;
726                 q->len -= BALLOC(b);
727                 q->dlen -= BLEN(b);
728
729                 /* remember to free this */
730                 b->next = tofree;
731                 tofree = b;
732         }
733
734         /* if writer flow controlled, restart */
735         if ((q->state & Qflow) && q->len < q->limit / 2) {
736                 q->state &= ~Qflow;
737                 dowakeup = 1;
738         } else
739                 dowakeup = 0;
740
741         spin_unlock_irqsave(&q->lock);
742
743         if (dowakeup) {
744                 rendez_wakeup(&q->wr);
745                 qwake_cb(q, FDTAP_FILT_WRITABLE);
746         }
747
748         if (tofree != NULL)
749                 freeblist(tofree);
750
751         return len;
752 }
753
754 int qpass(struct queue *q, struct block *b)
755 {
756         int dlen, len, dowakeup;
757
758         /* sync with qread */
759         dowakeup = 0;
760         spin_lock_irqsave(&q->lock);
761         if (q->len >= q->limit) {
762                 freeblist(b);
763                 spin_unlock_irqsave(&q->lock);
764                 return -1;
765         }
766         if (q->state & Qclosed) {
767                 len = blocklen(b);
768                 freeblist(b);
769                 spin_unlock_irqsave(&q->lock);
770                 return len;
771         }
772
773         /* add buffer to queue */
774         if (q->bfirst)
775                 q->blast->next = b;
776         else
777                 q->bfirst = b;
778         len = BALLOC(b);
779         dlen = BLEN(b);
780         QDEBUG checkb(b, "qpass");
781         while (b->next) {
782                 b = b->next;
783                 QDEBUG checkb(b, "qpass");
784                 len += BALLOC(b);
785                 dlen += BLEN(b);
786         }
787         q->blast = b;
788         q->len += len;
789         q->dlen += dlen;
790
791         if (q->len >= q->limit / 2)
792                 q->state |= Qflow;
793
794         if (q->state & Qstarve) {
795                 q->state &= ~Qstarve;
796                 dowakeup = 1;
797         }
798         spin_unlock_irqsave(&q->lock);
799
800         if (dowakeup) {
801                 rendez_wakeup(&q->rr);
802                 qwake_cb(q, FDTAP_FILT_READABLE);
803         }
804
805         return len;
806 }
807
808 int qpassnolim(struct queue *q, struct block *b)
809 {
810         int dlen, len, dowakeup;
811
812         /* sync with qread */
813         dowakeup = 0;
814         spin_lock_irqsave(&q->lock);
815
816         if (q->state & Qclosed) {
817                 freeblist(b);
818                 spin_unlock_irqsave(&q->lock);
819                 return BALLOC(b);
820         }
821
822         /* add buffer to queue */
823         if (q->bfirst)
824                 q->blast->next = b;
825         else
826                 q->bfirst = b;
827         len = BALLOC(b);
828         dlen = BLEN(b);
829         QDEBUG checkb(b, "qpass");
830         while (b->next) {
831                 b = b->next;
832                 QDEBUG checkb(b, "qpass");
833                 len += BALLOC(b);
834                 dlen += BLEN(b);
835         }
836         q->blast = b;
837         q->len += len;
838         q->dlen += dlen;
839
840         if (q->len >= q->limit / 2)
841                 q->state |= Qflow;
842
843         if (q->state & Qstarve) {
844                 q->state &= ~Qstarve;
845                 dowakeup = 1;
846         }
847         spin_unlock_irqsave(&q->lock);
848
849         if (dowakeup) {
850                 rendez_wakeup(&q->rr);
851                 qwake_cb(q, FDTAP_FILT_READABLE);
852         }
853
854         return len;
855 }
856
857 /*
858  *  if the allocated space is way out of line with the used
859  *  space, reallocate to a smaller block
860  */
861 struct block *packblock(struct block *bp)
862 {
863         struct block **l, *nbp;
864         int n;
865
866         if (bp->extra_len)
867                 return bp;
868         for (l = &bp; *l; l = &(*l)->next) {
869                 nbp = *l;
870                 n = BLEN(nbp);
871                 if ((n << 2) < BALLOC(nbp)) {
872                         *l = allocb(n);
873                         memmove((*l)->wp, nbp->rp, n);
874                         (*l)->wp += n;
875                         (*l)->next = nbp->next;
876                         freeb(nbp);
877                 }
878         }
879
880         return bp;
881 }
882
883 int qproduce(struct queue *q, void *vp, int len)
884 {
885         struct block *b;
886         int dowakeup;
887         uint8_t *p = vp;
888
889         /* sync with qread */
890         dowakeup = 0;
891         spin_lock_irqsave(&q->lock);
892
893         /* no waiting receivers, room in buffer? */
894         if (q->len >= q->limit) {
895                 q->state |= Qflow;
896                 spin_unlock_irqsave(&q->lock);
897                 return -1;
898         }
899
900         /* save in buffer */
901         /* use Qcoalesce here to save storage */
902         // TODO: Consider removing the Qcoalesce flag and force a coalescing
903         // strategy by default.
904         b = q->blast;
905         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
906                 || b->lim - b->wp < len) {
907                 /* need a new block */
908                 b = iallocb(len);
909                 if (b == 0) {
910                         spin_unlock_irqsave(&q->lock);
911                         return 0;
912                 }
913                 if (q->bfirst)
914                         q->blast->next = b;
915                 else
916                         q->bfirst = b;
917                 q->blast = b;
918                 /* b->next = 0; done by iallocb() */
919                 q->len += BALLOC(b);
920         }
921         PANIC_EXTRA(b);
922         memmove(b->wp, p, len);
923         producecnt += len;
924         b->wp += len;
925         q->dlen += len;
926         QDEBUG checkb(b, "qproduce");
927
928         if (q->state & Qstarve) {
929                 q->state &= ~Qstarve;
930                 dowakeup = 1;
931         }
932
933         if (q->len >= q->limit)
934                 q->state |= Qflow;
935         spin_unlock_irqsave(&q->lock);
936
937         if (dowakeup) {
938                 rendez_wakeup(&q->rr);
939                 qwake_cb(q, FDTAP_FILT_READABLE);
940         }
941
942         return len;
943 }
944
945 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
946  * body_rp, for up to len.  Returns the len consumed. 
947  *
948  * The base is 'b', so that we can kfree it later.  This currently ties us to
949  * using kfree for the release method for all extra_data.
950  *
951  * It is possible to have a body size that is 0, if there is no offset, and
952  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
953 static size_t point_to_body(struct block *b, uint8_t *body_rp,
954                             struct block *newb, unsigned int newb_idx,
955                             size_t len)
956 {
957         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
958
959         assert(newb_idx < newb->nr_extra_bufs);
960
961         kmalloc_incref(b);
962         ebd->base = (uintptr_t)b;
963         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
964         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
965         assert((int)ebd->len >= 0);
966         newb->extra_len += ebd->len;
967         return ebd->len;
968 }
969
970 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
971  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
972  * consumed.
973  *
974  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
975 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
976                            struct block *newb, unsigned int newb_idx,
977                            size_t len)
978 {
979         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
980         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
981
982         assert(b_idx < b->nr_extra_bufs);
983         assert(newb_idx < newb->nr_extra_bufs);
984
985         kmalloc_incref((void*)b_ebd->base);
986         n_ebd->base = b_ebd->base;
987         n_ebd->off = b_ebd->off + b_off;
988         n_ebd->len = MIN(b_ebd->len - b_off, len);
989         newb->extra_len += n_ebd->len;
990         return n_ebd->len;
991 }
992
993 /* given a string of blocks, fills the new block's extra_data  with the contents
994  * of the blist [offset, len + offset)
995  *
996  * returns 0 on success.  the only failure is if the extra_data array was too
997  * small, so this returns a positive integer saying how big the extra_data needs
998  * to be.
999  *
1000  * callers are responsible for protecting the list structure. */
1001 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1002                             uint32_t offset)
1003 {
1004         struct block *b, *first;
1005         unsigned int nr_bufs = 0;
1006         unsigned int b_idx, newb_idx = 0;
1007         uint8_t *first_main_body = 0;
1008
1009         /* find the first block; keep offset relative to the latest b in the list */
1010         for (b = blist; b; b = b->next) {
1011                 if (BLEN(b) > offset)
1012                         break;
1013                 offset -= BLEN(b);
1014         }
1015         /* qcopy semantics: if you asked for an offset outside the block list, you
1016          * get an empty block back */
1017         if (!b)
1018                 return 0;
1019         first = b;
1020         /* upper bound for how many buffers we'll need in newb */
1021         for (/* b is set*/; b; b = b->next) {
1022                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1023         }
1024         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1025         block_add_extd(newb, nr_bufs, 0);
1026         if (newb->nr_extra_bufs < nr_bufs) {
1027                 /* caller will need to alloc these, then re-call us */
1028                 return nr_bufs;
1029         }
1030         for (b = first; b && len; b = b->next) {
1031                 b_idx = 0;
1032                 if (offset) {
1033                         if (offset < BHLEN(b)) {
1034                                 /* off is in the main body */
1035                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1036                                 newb_idx++;
1037                         } else {
1038                                 /* off is in one of the buffers (or just past the last one).
1039                                  * we're not going to point to b's main body at all. */
1040                                 offset -= BHLEN(b);
1041                                 assert(b->extra_data);
1042                                 /* assuming these extrabufs are packed, or at least that len
1043                                  * isn't gibberish */
1044                                 while (b->extra_data[b_idx].len <= offset) {
1045                                         offset -= b->extra_data[b_idx].len;
1046                                         b_idx++;
1047                                 }
1048                                 /* now offset is set to our offset in the b_idx'th buf */
1049                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1050                                 newb_idx++;
1051                                 b_idx++;
1052                         }
1053                         offset = 0;
1054                 } else {
1055                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1056                         newb_idx++;
1057                 }
1058                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1059                  * and any point_to_ could be our last if it consumed all of len. */
1060                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1061                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1062                         newb_idx++;
1063                 }
1064         }
1065         return 0;
1066 }
1067
1068 struct block *blist_clone(struct block *blist, int header_len, int len,
1069                           uint32_t offset)
1070 {
1071         int ret;
1072         struct block *newb = allocb(header_len);
1073         do {
1074                 ret = __blist_clone_to(blist, newb, len, offset);
1075                 if (ret)
1076                         block_add_extd(newb, ret, KMALLOC_WAIT);
1077         } while (ret);
1078         return newb;
1079 }
1080
1081 /* given a queue, makes a single block with header_len reserved space in the
1082  * block main body, and the contents of [offset, len + offset) pointed to in the
1083  * new blocks ext_data. */
1084 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1085 {
1086         int ret;
1087         struct block *newb = allocb(header_len);
1088         /* the while loop should rarely be used: it would require someone
1089          * concurrently adding to the queue. */
1090         do {
1091                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1092                 spin_lock_irqsave(&q->lock);
1093                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1094                 spin_unlock_irqsave(&q->lock);
1095                 if (ret)
1096                         block_add_extd(newb, ret, KMALLOC_WAIT);
1097         } while (ret);
1098         return newb;
1099 }
1100
1101 /*
1102  *  copy from offset in the queue
1103  */
1104 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1105 {
1106         int sofar;
1107         int n;
1108         struct block *b, *nb;
1109         uint8_t *p;
1110
1111         nb = allocb(len);
1112
1113         spin_lock_irqsave(&q->lock);
1114
1115         /* go to offset */
1116         b = q->bfirst;
1117         for (sofar = 0;; sofar += n) {
1118                 if (b == NULL) {
1119                         spin_unlock_irqsave(&q->lock);
1120                         return nb;
1121                 }
1122                 n = BLEN(b);
1123                 if (sofar + n > offset) {
1124                         p = b->rp + offset - sofar;
1125                         n -= offset - sofar;
1126                         break;
1127                 }
1128                 QDEBUG checkb(b, "qcopy");
1129                 b = b->next;
1130         }
1131
1132         /* copy bytes from there */
1133         for (sofar = 0; sofar < len;) {
1134                 if (n > len - sofar)
1135                         n = len - sofar;
1136                 PANIC_EXTRA(b);
1137                 memmove(nb->wp, p, n);
1138                 qcopycnt += n;
1139                 sofar += n;
1140                 nb->wp += n;
1141                 b = b->next;
1142                 if (b == NULL)
1143                         break;
1144                 n = BLEN(b);
1145                 p = b->rp;
1146         }
1147         spin_unlock_irqsave(&q->lock);
1148
1149         return nb;
1150 }
1151
1152 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1153 {
1154 #ifdef CONFIG_BLOCK_EXTRAS
1155         return qclone(q, 0, len, offset);
1156 #else
1157         return qcopy_old(q, len, offset);
1158 #endif
1159 }
1160
1161 static void qinit_common(struct queue *q)
1162 {
1163         spinlock_init_irqsave(&q->lock);
1164         qlock_init(&q->rlock);
1165         qlock_init(&q->wlock);
1166         rendez_init(&q->rr);
1167         rendez_init(&q->wr);
1168 }
1169
1170 /*
1171  *  called by non-interrupt code
1172  */
1173 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1174 {
1175         struct queue *q;
1176
1177         q = kzmalloc(sizeof(struct queue), 0);
1178         if (q == 0)
1179                 return 0;
1180         qinit_common(q);
1181
1182         q->limit = q->inilim = limit;
1183         q->kick = kick;
1184         q->arg = arg;
1185         q->state = msg;
1186         q->state |= Qstarve;
1187         q->eof = 0;
1188
1189         return q;
1190 }
1191
1192 /* open a queue to be bypassed */
1193 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1194 {
1195         struct queue *q;
1196
1197         q = kzmalloc(sizeof(struct queue), 0);
1198         if (q == 0)
1199                 return 0;
1200         qinit_common(q);
1201
1202         q->limit = 0;
1203         q->arg = arg;
1204         q->bypass = bypass;
1205         q->state = 0;
1206
1207         return q;
1208 }
1209
1210 static int notempty(void *a)
1211 {
1212         struct queue *q = a;
1213
1214         return (q->state & Qclosed) || q->bfirst != 0;
1215 }
1216
1217 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1218  * wait, FALSE on Qclose (without error)
1219  *
1220  * Called with q ilocked.  May error out, back through the caller, with
1221  * the irqsave lock unlocked.  */
1222 static bool qwait(struct queue *q)
1223 {
1224         /* wait for data */
1225         for (;;) {
1226                 if (q->bfirst != NULL)
1227                         break;
1228
1229                 if (q->state & Qclosed) {
1230                         if (++q->eof > 3) {
1231                                 spin_unlock_irqsave(&q->lock);
1232                                 error("multiple reads on a closed queue");
1233                         }
1234                         if (*q->err && strcmp(q->err, Ehungup) != 0) {
1235                                 spin_unlock_irqsave(&q->lock);
1236                                 error(q->err);
1237                         }
1238                         return FALSE;
1239                 }
1240                 /* We set Qstarve regardless of whether we are non-blocking or not.
1241                  * Qstarve tracks the edge detection of the queue being empty. */
1242                 q->state |= Qstarve;
1243                 if (q->state & Qnonblock) {
1244                         spin_unlock_irqsave(&q->lock);
1245                         set_errno(EAGAIN);
1246                         error("queue empty");
1247                 }
1248                 spin_unlock_irqsave(&q->lock);
1249                 /* may throw an error() */
1250                 rendez_sleep(&q->rr, notempty, q);
1251                 spin_lock_irqsave(&q->lock);
1252         }
1253         return TRUE;
1254 }
1255
1256 /*
1257  * add a block list to a queue
1258  */
1259 void qaddlist(struct queue *q, struct block *b)
1260 {
1261         /* TODO: q lock? */
1262         /* queue the block */
1263         if (q->bfirst)
1264                 q->blast->next = b;
1265         else
1266                 q->bfirst = b;
1267         q->len += blockalloclen(b);
1268         q->dlen += blocklen(b);
1269         while (b->next)
1270                 b = b->next;
1271         q->blast = b;
1272 }
1273
1274 /*
1275  *  called with q ilocked
1276  */
1277 struct block *qremove(struct queue *q)
1278 {
1279         struct block *b;
1280
1281         b = q->bfirst;
1282         if (b == NULL)
1283                 return NULL;
1284         q->bfirst = b->next;
1285         b->next = NULL;
1286         q->dlen -= BLEN(b);
1287         q->len -= BALLOC(b);
1288         QDEBUG checkb(b, "qremove");
1289         return b;
1290 }
1291
1292 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1293 {
1294         size_t copy_amt, retval = 0;
1295         struct extra_bdata *ebd;
1296         
1297         copy_amt = MIN(BHLEN(b), amt);
1298         memcpy(to, b->rp, copy_amt);
1299         /* advance the rp, since this block not be completely consumed and future
1300          * reads need to know where to pick up from */
1301         b->rp += copy_amt;
1302         to += copy_amt;
1303         amt -= copy_amt;
1304         retval += copy_amt;
1305         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1306                 ebd = &b->extra_data[i];
1307                 /* skip empty entires.  if we track this in the struct block, we can
1308                  * just start the for loop early */
1309                 if (!ebd->base || !ebd->len)
1310                         continue;
1311                 copy_amt = MIN(ebd->len, amt);
1312                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1313                 /* we're actually consuming the entries, just like how we advance rp up
1314                  * above, and might only consume part of one. */
1315                 ebd->len -= copy_amt;
1316                 ebd->off += copy_amt;
1317                 b->extra_len -= copy_amt;
1318                 if (!ebd->len) {
1319                         /* we don't actually have to decref here.  it's also done in
1320                          * freeb().  this is the earliest we can free. */
1321                         kfree((void*)ebd->base);
1322                         ebd->base = ebd->off = 0;
1323                 }
1324                 to += copy_amt;
1325                 amt -= copy_amt;
1326                 retval += copy_amt;
1327         }
1328         return retval;
1329 }
1330
1331 /*
1332  *  copy the contents of a string of blocks into
1333  *  memory.  emptied blocks are freed.  return
1334  *  pointer to first unconsumed block.
1335  */
1336 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1337 {
1338         int i;
1339         struct block *next;
1340
1341         /* could be slicker here, since read_from_block is smart */
1342         for (; b != NULL; b = next) {
1343                 i = BLEN(b);
1344                 if (i > n) {
1345                         /* partial block, consume some */
1346                         read_from_block(b, p, n);
1347                         return b;
1348                 }
1349                 /* full block, consume all and move on */
1350                 i = read_from_block(b, p, i);
1351                 n -= i;
1352                 p += i;
1353                 next = b->next;
1354                 freeb(b);
1355         }
1356         return NULL;
1357 }
1358
1359 /*
1360  *  copy the contents of memory into a string of blocks.
1361  *  return NULL on error.
1362  */
1363 struct block *mem2bl(uint8_t * p, int len)
1364 {
1365         ERRSTACK(1);
1366         int n;
1367         struct block *b, *first, **l;
1368
1369         first = NULL;
1370         l = &first;
1371         if (waserror()) {
1372                 freeblist(first);
1373                 nexterror();
1374         }
1375         do {
1376                 n = len;
1377                 if (n > Maxatomic)
1378                         n = Maxatomic;
1379
1380                 *l = b = allocb(n);
1381                 /* TODO consider extra_data */
1382                 memmove(b->wp, p, n);
1383                 b->wp += n;
1384                 p += n;
1385                 len -= n;
1386                 l = &b->next;
1387         } while (len > 0);
1388         poperror();
1389
1390         return first;
1391 }
1392
1393 /*
1394  *  put a block back to the front of the queue
1395  *  called with q ilocked
1396  */
1397 void qputback(struct queue *q, struct block *b)
1398 {
1399         b->next = q->bfirst;
1400         if (q->bfirst == NULL)
1401                 q->blast = b;
1402         q->bfirst = b;
1403         q->len += BALLOC(b);
1404         q->dlen += BLEN(b);
1405 }
1406
1407 /*
1408  *  flow control, get producer going again
1409  *  called with q ilocked
1410  */
1411 static void qwakeup_iunlock(struct queue *q)
1412 {
1413         int dowakeup = 0;
1414
1415         /* if writer flow controlled, restart */
1416         if ((q->state & Qflow) && q->len < q->limit / 2) {
1417                 q->state &= ~Qflow;
1418                 dowakeup = 1;
1419         }
1420
1421         spin_unlock_irqsave(&q->lock);
1422
1423         /* wakeup flow controlled writers */
1424         if (dowakeup) {
1425                 if (q->kick)
1426                         q->kick(q->arg);
1427                 rendez_wakeup(&q->wr);
1428                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1429         }
1430 }
1431
1432 /*
1433  *  get next block from a queue (up to a limit)
1434  */
1435 struct block *qbread(struct queue *q, int len)
1436 {
1437         ERRSTACK(1);
1438         struct block *b, *nb;
1439         int n;
1440
1441         qlock(&q->rlock);
1442         if (waserror()) {
1443                 qunlock(&q->rlock);
1444                 nexterror();
1445         }
1446
1447         spin_lock_irqsave(&q->lock);
1448         if (!qwait(q)) {
1449                 /* queue closed */
1450                 spin_unlock_irqsave(&q->lock);
1451                 qunlock(&q->rlock);
1452                 poperror();
1453                 return NULL;
1454         }
1455
1456         /* if we get here, there's at least one block in the queue */
1457         b = qremove(q);
1458         n = BLEN(b);
1459
1460         /* split block if it's too big and this is not a message queue */
1461         nb = b;
1462         if (n > len) {
1463                 PANIC_EXTRA(b);
1464                 if ((q->state & Qmsg) == 0) {
1465                         n -= len;
1466                         b = allocb(n);
1467                         memmove(b->wp, nb->rp + len, n);
1468                         b->wp += n;
1469                         qputback(q, b);
1470                 }
1471                 nb->wp = nb->rp + len;
1472         }
1473
1474         /* restart producer */
1475         qwakeup_iunlock(q);
1476
1477         poperror();
1478         qunlock(&q->rlock);
1479         return nb;
1480 }
1481
1482 /*
1483  *  read a queue.  if no data is queued, post a struct block
1484  *  and wait on its Rendez.
1485  */
1486 long qread(struct queue *q, void *vp, int len)
1487 {
1488         ERRSTACK(1);
1489         struct block *b, *first, **l;
1490         int m, n;
1491
1492         qlock(&q->rlock);
1493         if (waserror()) {
1494                 qunlock(&q->rlock);
1495                 nexterror();
1496         }
1497
1498         spin_lock_irqsave(&q->lock);
1499 again:
1500         if (!qwait(q)) {
1501                 /* queue closed */
1502                 spin_unlock_irqsave(&q->lock);
1503                 qunlock(&q->rlock);
1504                 poperror();
1505                 return 0;
1506         }
1507
1508         /* if we get here, there's at least one block in the queue */
1509         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1510         // strategy by default.
1511         if (q->state & Qcoalesce) {
1512                 /* when coalescing, 0 length blocks just go away */
1513                 b = q->bfirst;
1514                 if (BLEN(b) <= 0) {
1515                         freeb(qremove(q));
1516                         goto again;
1517                 }
1518
1519                 /*  grab the first block plus as many
1520                  *  following blocks as will completely
1521                  *  fit in the read.
1522                  */
1523                 n = 0;
1524                 l = &first;
1525                 m = BLEN(b);
1526                 for (;;) {
1527                         *l = qremove(q);
1528                         l = &b->next;
1529                         n += m;
1530
1531                         b = q->bfirst;
1532                         if (b == NULL)
1533                                 break;
1534                         m = BLEN(b);
1535                         if (n + m > len)
1536                                 break;
1537                 }
1538         } else {
1539                 first = qremove(q);
1540                 n = BLEN(first);
1541         }
1542
1543         /* copy to user space outside of the ilock */
1544         spin_unlock_irqsave(&q->lock);
1545         b = bl2mem(vp, first, len);
1546         spin_lock_irqsave(&q->lock);
1547
1548         /* take care of any left over partial block */
1549         if (b != NULL) {
1550                 n -= BLEN(b);
1551                 if (q->state & Qmsg)
1552                         freeb(b);
1553                 else
1554                         qputback(q, b);
1555         }
1556
1557         /* restart producer */
1558         qwakeup_iunlock(q);
1559
1560         poperror();
1561         qunlock(&q->rlock);
1562         return n;
1563 }
1564
1565 static int qnotfull(void *a)
1566 {
1567         struct queue *q = a;
1568
1569         return q->len < q->limit || (q->state & Qclosed);
1570 }
1571
1572 uint32_t dropcnt;
1573
1574 /*
1575  *  add a block to a queue obeying flow control
1576  */
1577 long qbwrite(struct queue *q, struct block *b)
1578 {
1579         ERRSTACK(1);
1580         int n, dowakeup;
1581         volatile bool should_free_b = TRUE;
1582
1583         n = BLEN(b);
1584
1585         if (q->bypass) {
1586                 (*q->bypass) (q->arg, b);
1587                 return n;
1588         }
1589
1590         dowakeup = 0;
1591         qlock(&q->wlock);
1592         if (waserror()) {
1593                 if (b != NULL && should_free_b)
1594                         freeb(b);
1595                 qunlock(&q->wlock);
1596                 nexterror();
1597         }
1598
1599         spin_lock_irqsave(&q->lock);
1600
1601         /* give up if the queue is closed */
1602         if (q->state & Qclosed) {
1603                 spin_unlock_irqsave(&q->lock);
1604                 error(q->err);
1605         }
1606
1607         /* if nonblocking, don't queue over the limit */
1608         if (q->len >= q->limit) {
1609                 /* drop overflow takes priority over regular non-blocking */
1610                 if (q->state & Qdropoverflow) {
1611                         spin_unlock_irqsave(&q->lock);
1612                         freeb(b);
1613                         dropcnt += n;
1614                         qunlock(&q->wlock);
1615                         poperror();
1616                         return n;
1617                 }
1618                 if (q->state & Qnonblock) {
1619                         spin_unlock_irqsave(&q->lock);
1620                         freeb(b);
1621                         set_errno(EAGAIN);
1622                         error("queue full");
1623                 }
1624         }
1625
1626         /* queue the block */
1627         should_free_b = FALSE;
1628         if (q->bfirst)
1629                 q->blast->next = b;
1630         else
1631                 q->bfirst = b;
1632         q->blast = b;
1633         b->next = 0;
1634         q->len += BALLOC(b);
1635         q->dlen += n;
1636         QDEBUG checkb(b, "qbwrite");
1637         b = NULL;
1638
1639         /* make sure other end gets awakened */
1640         if (q->state & Qstarve) {
1641                 q->state &= ~Qstarve;
1642                 dowakeup = 1;
1643         }
1644         spin_unlock_irqsave(&q->lock);
1645
1646         /*  get output going again */
1647         if (q->kick && (dowakeup || (q->state & Qkick)))
1648                 q->kick(q->arg);
1649
1650         /* wakeup anyone consuming at the other end */
1651         if (dowakeup) {
1652                 rendez_wakeup(&q->rr);
1653                 qwake_cb(q, FDTAP_FILT_READABLE);
1654         }
1655
1656         /*
1657          *  flow control, wait for queue to get below the limit
1658          *  before allowing the process to continue and queue
1659          *  more.  We do this here so that postnote can only
1660          *  interrupt us after the data has been queued.  This
1661          *  means that things like 9p flushes and ssl messages
1662          *  will not be disrupted by software interrupts.
1663          *
1664          *  Note - this is moderately dangerous since a process
1665          *  that keeps getting interrupted and rewriting will
1666          *  queue infinite crud.
1667          */
1668         for (;;) {
1669                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1670                         break;
1671
1672                 spin_lock_irqsave(&q->lock);
1673                 q->state |= Qflow;
1674                 spin_unlock_irqsave(&q->lock);
1675                 rendez_sleep(&q->wr, qnotfull, q);
1676         }
1677
1678         qunlock(&q->wlock);
1679         poperror();
1680         return n;
1681 }
1682
1683 long qibwrite(struct queue *q, struct block *b)
1684 {
1685         int n, dowakeup;
1686
1687         dowakeup = 0;
1688
1689         n = BLEN(b);
1690
1691         spin_lock_irqsave(&q->lock);
1692
1693         QDEBUG checkb(b, "qibwrite");
1694         if (q->bfirst)
1695                 q->blast->next = b;
1696         else
1697                 q->bfirst = b;
1698         q->blast = b;
1699         q->len += BALLOC(b);
1700         q->dlen += n;
1701
1702         if (q->state & Qstarve) {
1703                 q->state &= ~Qstarve;
1704                 dowakeup = 1;
1705         }
1706
1707         spin_unlock_irqsave(&q->lock);
1708
1709         if (dowakeup) {
1710                 if (q->kick)
1711                         q->kick(q->arg);
1712                 rendez_wakeup(&q->rr);
1713                 qwake_cb(q, FDTAP_FILT_READABLE);
1714         }
1715
1716         return n;
1717 }
1718
1719 /*
1720  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1721  */
1722 int qwrite(struct queue *q, void *vp, int len)
1723 {
1724         int n, sofar;
1725         struct block *b;
1726         uint8_t *p = vp;
1727         void *ext_buf;
1728
1729         QDEBUG if (!islo())
1730                  printd("qwrite hi %p\n", getcallerpc(&q));
1731
1732         sofar = 0;
1733         do {
1734                 n = len - sofar;
1735                 /* This is 64K, the max amount per single block.  Still a good value? */
1736                 if (n > Maxatomic)
1737                         n = Maxatomic;
1738
1739                 /* If n is small, we don't need to bother with the extra_data.  But
1740                  * until the whole stack can handle extd blocks, we'll use them
1741                  * unconditionally. */
1742 #ifdef CONFIG_BLOCK_EXTRAS
1743                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1744                  * only available via padblock (to the left).  we also need some space
1745                  * for pullupblock for some basic headers (like icmp) that get written
1746                  * in directly */
1747                 b = allocb(64);
1748                 ext_buf = kmalloc(n, 0);
1749                 memcpy(ext_buf, p + sofar, n);
1750                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1751                 b->extra_data[0].base = (uintptr_t)ext_buf;
1752                 b->extra_data[0].off = 0;
1753                 b->extra_data[0].len = n;
1754                 b->extra_len += n;
1755 #else
1756                 b = allocb(n);
1757                 memmove(b->wp, p + sofar, n);
1758                 b->wp += n;
1759 #endif
1760                         
1761                 qbwrite(q, b);
1762
1763                 sofar += n;
1764         } while (sofar < len && (q->state & Qmsg) == 0);
1765
1766         return len;
1767 }
1768
1769 /*
1770  *  used by print() to write to a queue.  Since we may be splhi or not in
1771  *  a process, don't qlock.
1772  */
1773 int qiwrite(struct queue *q, void *vp, int len)
1774 {
1775         int n, sofar, dowakeup;
1776         struct block *b;
1777         uint8_t *p = vp;
1778
1779         dowakeup = 0;
1780
1781         sofar = 0;
1782         do {
1783                 n = len - sofar;
1784                 if (n > Maxatomic)
1785                         n = Maxatomic;
1786
1787                 b = iallocb(n);
1788                 if (b == NULL)
1789                         break;
1790                 /* TODO consider extra_data */
1791                 memmove(b->wp, p + sofar, n);
1792                 /* this adjusts BLEN to be n, or at least it should */
1793                 b->wp += n;
1794                 assert(n == BLEN(b));
1795                 qibwrite(q, b);
1796
1797                 sofar += n;
1798         } while (sofar < len && (q->state & Qmsg) == 0);
1799
1800         return sofar;
1801 }
1802
1803 /*
1804  *  be extremely careful when calling this,
1805  *  as there is no reference accounting
1806  */
1807 void qfree(struct queue *q)
1808 {
1809         qclose(q);
1810         kfree(q);
1811 }
1812
1813 /*
1814  *  Mark a queue as closed.  No further IO is permitted.
1815  *  All blocks are released.
1816  */
1817 void qclose(struct queue *q)
1818 {
1819         struct block *bfirst;
1820
1821         if (q == NULL)
1822                 return;
1823
1824         /* mark it */
1825         spin_lock_irqsave(&q->lock);
1826         q->state |= Qclosed;
1827         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1828         strncpy(q->err, Ehungup, sizeof(q->err));
1829         bfirst = q->bfirst;
1830         q->bfirst = 0;
1831         q->len = 0;
1832         q->dlen = 0;
1833         spin_unlock_irqsave(&q->lock);
1834
1835         /* free queued blocks */
1836         freeblist(bfirst);
1837
1838         /* wake up readers/writers */
1839         rendez_wakeup(&q->rr);
1840         rendez_wakeup(&q->wr);
1841         qwake_cb(q, FDTAP_FILT_HANGUP);
1842 }
1843
1844 /*
1845  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1846  *  blocks.
1847  */
1848 void qhangup(struct queue *q, char *msg)
1849 {
1850         /* mark it */
1851         spin_lock_irqsave(&q->lock);
1852         q->state |= Qclosed;
1853         if (msg == 0 || *msg == 0)
1854                 strncpy(q->err, Ehungup, sizeof(q->err));
1855         else
1856                 strncpy(q->err, msg, ERRMAX - 1);
1857         spin_unlock_irqsave(&q->lock);
1858
1859         /* wake up readers/writers */
1860         rendez_wakeup(&q->rr);
1861         rendez_wakeup(&q->wr);
1862         qwake_cb(q, FDTAP_FILT_HANGUP);
1863 }
1864
1865 /*
1866  *  return non-zero if the q is hungup
1867  */
1868 int qisclosed(struct queue *q)
1869 {
1870         return q->state & Qclosed;
1871 }
1872
1873 /*
1874  *  mark a queue as no longer hung up.  resets the wake_cb.
1875  */
1876 void qreopen(struct queue *q)
1877 {
1878         spin_lock_irqsave(&q->lock);
1879         q->state &= ~Qclosed;
1880         q->state |= Qstarve;
1881         q->eof = 0;
1882         q->limit = q->inilim;
1883         q->wake_cb = 0;
1884         q->wake_data = 0;
1885         spin_unlock_irqsave(&q->lock);
1886 }
1887
1888 /*
1889  *  return bytes queued
1890  */
1891 int qlen(struct queue *q)
1892 {
1893         return q->dlen;
1894 }
1895
1896 /*
1897  * return space remaining before flow control
1898  */
1899 int qwindow(struct queue *q)
1900 {
1901         int l;
1902
1903         l = q->limit - q->len;
1904         if (l < 0)
1905                 l = 0;
1906         return l;
1907 }
1908
1909 /*
1910  *  return true if we can read without blocking
1911  */
1912 int qcanread(struct queue *q)
1913 {
1914         return q->bfirst != 0;
1915 }
1916
1917 /*
1918  *  change queue limit
1919  */
1920 void qsetlimit(struct queue *q, int limit)
1921 {
1922         q->limit = limit;
1923 }
1924
1925 /*
1926  *  set whether writes drop overflowing blocks, or if we sleep
1927  */
1928 void qdropoverflow(struct queue *q, bool onoff)
1929 {
1930         if (onoff)
1931                 q->state |= Qdropoverflow;
1932         else
1933                 q->state &= ~Qdropoverflow;
1934 }
1935
1936 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1937 void qnonblock(struct queue *q, bool onoff)
1938 {
1939         if (onoff)
1940                 q->state |= Qnonblock;
1941         else
1942                 q->state &= ~Qnonblock;
1943 }
1944
1945 /*
1946  *  flush the output queue
1947  */
1948 void qflush(struct queue *q)
1949 {
1950         struct block *bfirst;
1951
1952         /* mark it */
1953         spin_lock_irqsave(&q->lock);
1954         bfirst = q->bfirst;
1955         q->bfirst = 0;
1956         q->len = 0;
1957         q->dlen = 0;
1958         spin_unlock_irqsave(&q->lock);
1959
1960         /* free queued blocks */
1961         freeblist(bfirst);
1962
1963         /* wake up writers */
1964         rendez_wakeup(&q->wr);
1965         qwake_cb(q, FDTAP_FILT_WRITABLE);
1966 }
1967
1968 int qfull(struct queue *q)
1969 {
1970         return q->state & Qflow;
1971 }
1972
1973 int qstate(struct queue *q)
1974 {
1975         return q->state;
1976 }
1977
1978 void qdump(struct queue *q)
1979 {
1980         if (q)
1981                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1982                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1983 }
1984
1985 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1986  * marks the type of wakeup event (flags from FDTAP).
1987  *
1988  * There's no sync protection.  If you change the CB while the qio is running,
1989  * you might get a CB with the data or func from a previous set_wake_cb.  You
1990  * should set this once per queue and forget it.
1991  *
1992  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1993  * just make sure that the func(data) pair are valid until the queue is freed or
1994  * reopened. */
1995 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1996 {
1997         q->wake_data = data;
1998         wmb();  /* if we see func, we'll also see the data for it */
1999         q->wake_cb = func;
2000 }