Use block_add_extd retval to detect success or error
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define PANIC_EXTRA(b)                                                  \
17 {                                                                       \
18         if ((b)->extra_len) {                                           \
19                 printblock(b);                                          \
20                 backtrace();                                            \
21                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
22         }                                                               \
23 }
24
25 static uint32_t padblockcnt;
26 static uint32_t concatblockcnt;
27 static uint32_t pullupblockcnt;
28 static uint32_t copyblockcnt;
29 static uint32_t consumecnt;
30 static uint32_t producecnt;
31 static uint32_t qcopycnt;
32
33 static int debugging;
34
35 #define QDEBUG  if(0)
36
37 /*
38  *  IO queues
39  */
40
41 struct queue {
42         spinlock_t lock;;
43
44         struct block *bfirst;           /* buffer */
45         struct block *blast;
46
47         int len;                                        /* bytes allocated to queue */
48         int dlen;                                       /* data bytes in queue */
49         int limit;                                      /* max bytes in queue */
50         int inilim;                             /* initial limit */
51         int state;
52         int eof;                                        /* number of eofs read by user */
53
54         void (*kick) (void *);          /* restart output */
55         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
56         void *arg;                                      /* argument to kick */
57
58         qlock_t rlock;                          /* mutex for reading processes */
59         struct rendez rr;                       /* process waiting to read */
60         qlock_t wlock;                          /* mutex for writing processes */
61         struct rendez wr;                       /* process waiting to write */
62         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
63         void *wake_data;
64
65         char err[ERRMAX];
66 };
67
68 enum {
69         Maxatomic = 64 * 1024,
70 };
71
72 unsigned int qiomaxatomic = Maxatomic;
73
74 /* Helper: fires a wake callback, sending 'filter' */
75 static void qwake_cb(struct queue *q, int filter)
76 {
77         if (q->wake_cb)
78                 q->wake_cb(q, q->wake_data, filter);
79 }
80
81 void ixsummary(void)
82 {
83         debugging ^= 1;
84         iallocsummary();
85         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
86                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
87         printd("consume %lu, produce %lu, qcopy %lu\n",
88                    consumecnt, producecnt, qcopycnt);
89 }
90
91 /*
92  *  free a list of blocks
93  */
94 void freeblist(struct block *b)
95 {
96         struct block *next;
97
98         for (; b != 0; b = next) {
99                 next = b->next;
100                 b->next = 0;
101                 freeb(b);
102         }
103 }
104
105 /*
106  *  pad a block to the front (or the back if size is negative)
107  */
108 struct block *padblock(struct block *bp, int size)
109 {
110         int n;
111         struct block *nbp;
112         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
113         uint16_t checksum_start = bp->checksum_start;
114         uint16_t checksum_offset = bp->checksum_offset;
115         uint16_t mss = bp->mss;
116
117         QDEBUG checkb(bp, "padblock 1");
118         if (size >= 0) {
119                 if (bp->rp - bp->base >= size) {
120                         bp->checksum_start += size;
121                         bp->rp -= size;
122                         return bp;
123                 }
124
125                 PANIC_EXTRA(bp);
126                 if (bp->next)
127                         panic("padblock %p", getcallerpc(&bp));
128                 n = BLEN(bp);
129                 padblockcnt++;
130                 nbp = allocb(size + n);
131                 nbp->rp += size;
132                 nbp->wp = nbp->rp;
133                 memmove(nbp->wp, bp->rp, n);
134                 nbp->wp += n;
135                 freeb(bp);
136                 nbp->rp -= size;
137         } else {
138                 size = -size;
139
140                 PANIC_EXTRA(bp);
141
142                 if (bp->next)
143                         panic("padblock %p", getcallerpc(&bp));
144
145                 if (bp->lim - bp->wp >= size)
146                         return bp;
147
148                 n = BLEN(bp);
149                 padblockcnt++;
150                 nbp = allocb(size + n);
151                 memmove(nbp->wp, bp->rp, n);
152                 nbp->wp += n;
153                 freeb(bp);
154         }
155         if (bcksum) {
156                 nbp->flag |= bcksum;
157                 nbp->checksum_start = checksum_start;
158                 nbp->checksum_offset = checksum_offset;
159                 nbp->mss = mss;
160         }
161         QDEBUG checkb(nbp, "padblock 1");
162         return nbp;
163 }
164
165 /*
166  *  return count of bytes in a string of blocks
167  */
168 int blocklen(struct block *bp)
169 {
170         int len;
171
172         len = 0;
173         while (bp) {
174                 len += BLEN(bp);
175                 bp = bp->next;
176         }
177         return len;
178 }
179
180 /*
181  * return count of space in blocks
182  */
183 int blockalloclen(struct block *bp)
184 {
185         int len;
186
187         len = 0;
188         while (bp) {
189                 len += BALLOC(bp);
190                 bp = bp->next;
191         }
192         return len;
193 }
194
195 /*
196  *  copy the  string of blocks into
197  *  a single block and free the string
198  */
199 struct block *concatblock(struct block *bp)
200 {
201         int len;
202         struct block *nb, *f;
203
204         if (bp->next == 0)
205                 return bp;
206
207         /* probably use parts of qclone */
208         PANIC_EXTRA(bp);
209         nb = allocb(blocklen(bp));
210         for (f = bp; f; f = f->next) {
211                 len = BLEN(f);
212                 memmove(nb->wp, f->rp, len);
213                 nb->wp += len;
214         }
215         concatblockcnt += BLEN(nb);
216         freeblist(bp);
217         QDEBUG checkb(nb, "concatblock 1");
218         return nb;
219 }
220
221 /* Returns a block with the remaining contents of b all in the main body of the
222  * returned block.  Replace old references to b with the returned value (which
223  * may still be 'b', if no change was needed. */
224 struct block *linearizeblock(struct block *b)
225 {
226         struct block *newb;
227         size_t len;
228         struct extra_bdata *ebd;
229
230         if (!b->extra_len)
231                 return b;
232
233         newb = allocb(BLEN(b));
234         len = BHLEN(b);
235         memcpy(newb->wp, b->rp, len);
236         newb->wp += len;
237         len = b->extra_len;
238         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
239                 ebd = &b->extra_data[i];
240                 if (!ebd->base || !ebd->len)
241                         continue;
242                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
243                 newb->wp += ebd->len;
244                 len -= ebd->len;
245         }
246         /* TODO: any other flags that need copied over? */
247         if (b->flag & BCKSUM_FLAGS) {
248                 newb->flag |= (b->flag & BCKSUM_FLAGS);
249                 newb->checksum_start = b->checksum_start;
250                 newb->checksum_offset = b->checksum_offset;
251                 newb->mss = b->mss;
252         }
253         freeb(b);
254         return newb;
255 }
256
257 /*
258  *  make sure the first block has at least n bytes in its main body
259  */
260 struct block *pullupblock(struct block *bp, int n)
261 {
262         int i, len, seglen;
263         struct block *nbp;
264         struct extra_bdata *ebd;
265
266         /*
267          *  this should almost always be true, it's
268          *  just to avoid every caller checking.
269          */
270         if (BHLEN(bp) >= n)
271                 return bp;
272
273          /* a start at explicit main-body / header management */
274         if (bp->extra_len) {
275                 if (n > bp->lim - bp->rp) {
276                         /* would need to realloc a new block and copy everything over. */
277                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
278                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
279                 }
280                 len = n - BHLEN(bp);
281                 if (len > bp->extra_len)
282                         panic("pullup more than extra (%d, %d, %d)\n",
283                               n, BHLEN(bp), bp->extra_len);
284                 checkb(bp, "before pullup");
285                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
286                         ebd = &bp->extra_data[i];
287                         if (!ebd->base || !ebd->len)
288                                 continue;
289                         seglen = MIN(ebd->len, len);
290                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
291                         bp->wp += seglen;
292                         len -= seglen;
293                         ebd->len -= seglen;
294                         ebd->off += seglen;
295                         bp->extra_len -= seglen;
296                         if (ebd->len == 0) {
297                                 kfree((void *)ebd->base);
298                                 ebd->off = 0;
299                                 ebd->base = 0;
300                         }
301                 }
302                 /* maybe just call pullupblock recursively here */
303                 if (len)
304                         panic("pullup %d bytes overdrawn\n", len);
305                 checkb(bp, "after pullup");
306                 return bp;
307         }
308
309         /*
310          *  if not enough room in the first block,
311          *  add another to the front of the list.
312          */
313         if (bp->lim - bp->rp < n) {
314                 nbp = allocb(n);
315                 nbp->next = bp;
316                 bp = nbp;
317         }
318
319         /*
320          *  copy bytes from the trailing blocks into the first
321          */
322         n -= BLEN(bp);
323         while ((nbp = bp->next)) {
324                 i = BLEN(nbp);
325                 if (i > n) {
326                         memmove(bp->wp, nbp->rp, n);
327                         pullupblockcnt++;
328                         bp->wp += n;
329                         nbp->rp += n;
330                         QDEBUG checkb(bp, "pullupblock 1");
331                         return bp;
332                 } else {
333                         memmove(bp->wp, nbp->rp, i);
334                         pullupblockcnt++;
335                         bp->wp += i;
336                         bp->next = nbp->next;
337                         nbp->next = 0;
338                         freeb(nbp);
339                         n -= i;
340                         if (n == 0) {
341                                 QDEBUG checkb(bp, "pullupblock 2");
342                                 return bp;
343                         }
344                 }
345         }
346         freeb(bp);
347         return 0;
348 }
349
350 /*
351  *  make sure the first block has at least n bytes in its main body
352  */
353 struct block *pullupqueue(struct queue *q, int n)
354 {
355         struct block *b;
356
357         /* TODO: lock to protect the queue links? */
358         if ((BHLEN(q->bfirst) >= n))
359                 return q->bfirst;
360         q->bfirst = pullupblock(q->bfirst, n);
361         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
362         q->blast = b;
363         return q->bfirst;
364 }
365
366 /* throw away count bytes from the front of
367  * block's extradata.  Returns count of bytes
368  * thrown away
369  */
370
371 static int pullext(struct block *bp, int count)
372 {
373         struct extra_bdata *ed;
374         int i, rem, bytes = 0;
375
376         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
377                 ed = &bp->extra_data[i];
378                 rem = MIN(count, ed->len);
379                 bp->extra_len -= rem;
380                 count -= rem;
381                 bytes += rem;
382                 ed->off += rem;
383                 ed->len -= rem;
384                 if (ed->len == 0) {
385                         kfree((void *)ed->base);
386                         ed->base = 0;
387                         ed->off = 0;
388                 }
389         }
390         return bytes;
391 }
392
393 /* throw away count bytes from the end of a
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int dropext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->len -= rem;
410                 if (ed->len == 0) {
411                         kfree((void *)ed->base);
412                         ed->base = 0;
413                         ed->off = 0;
414                 }
415         }
416         return bytes;
417 }
418
419 /*
420  *  throw away up to count bytes from a
421  *  list of blocks.  Return count of bytes
422  *  thrown away.
423  */
424 static int _pullblock(struct block **bph, int count, int free)
425 {
426         struct block *bp;
427         int n, bytes;
428
429         bytes = 0;
430         if (bph == NULL)
431                 return 0;
432
433         while (*bph != NULL && count != 0) {
434                 bp = *bph;
435
436                 n = MIN(BHLEN(bp), count);
437                 bytes += n;
438                 count -= n;
439                 bp->rp += n;
440                 n = pullext(bp, count);
441                 bytes += n;
442                 count -= n;
443                 QDEBUG checkb(bp, "pullblock ");
444                 if (BLEN(bp) == 0 && (free || count)) {
445                         *bph = bp->next;
446                         bp->next = NULL;
447                         freeb(bp);
448                 }
449         }
450         return bytes;
451 }
452
453 int pullblock(struct block **bph, int count)
454 {
455         return _pullblock(bph, count, 1);
456 }
457
458 /*
459  *  trim to len bytes starting at offset
460  */
461 struct block *trimblock(struct block *bp, int offset, int len)
462 {
463         uint32_t l, trim;
464         int olen = len;
465
466         QDEBUG checkb(bp, "trimblock 1");
467         if (blocklen(bp) < offset + len) {
468                 freeblist(bp);
469                 return NULL;
470         }
471
472         l =_pullblock(&bp, offset, 0);
473         if (bp == NULL)
474                 return NULL;
475         if (l != offset) {
476                 freeblist(bp);
477                 return NULL;
478         }
479
480         while ((l = BLEN(bp)) < len) {
481                 len -= l;
482                 bp = bp->next;
483         }
484
485         trim = BLEN(bp) - len;
486         trim -= dropext(bp, trim);
487         bp->wp -= trim;
488
489         if (bp->next) {
490                 freeblist(bp->next);
491                 bp->next = NULL;
492         }
493         return bp;
494 }
495
496 /*
497  *  copy 'count' bytes into a new block
498  */
499 struct block *copyblock(struct block *bp, int count)
500 {
501         int l;
502         struct block *nbp;
503
504         QDEBUG checkb(bp, "copyblock 0");
505         nbp = allocb(count);
506         if (bp->flag & BCKSUM_FLAGS) {
507                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
508                 nbp->checksum_start = bp->checksum_start;
509                 nbp->checksum_offset = bp->checksum_offset;
510                 nbp->mss = bp->mss;
511         }
512         PANIC_EXTRA(bp);
513         for (; count > 0 && bp != 0; bp = bp->next) {
514                 l = BLEN(bp);
515                 if (l > count)
516                         l = count;
517                 memmove(nbp->wp, bp->rp, l);
518                 nbp->wp += l;
519                 count -= l;
520         }
521         if (count > 0) {
522                 memset(nbp->wp, 0, count);
523                 nbp->wp += count;
524         }
525         copyblockcnt++;
526         QDEBUG checkb(nbp, "copyblock 1");
527
528         return nbp;
529 }
530
531 struct block *adjustblock(struct block *bp, int len)
532 {
533         int n;
534         struct block *nbp;
535
536         if (len < 0) {
537                 freeb(bp);
538                 return NULL;
539         }
540
541         PANIC_EXTRA(bp);
542         if (bp->rp + len > bp->lim) {
543                 nbp = copyblock(bp, len);
544                 freeblist(bp);
545                 QDEBUG checkb(nbp, "adjustblock 1");
546
547                 return nbp;
548         }
549
550         n = BLEN(bp);
551         if (len > n)
552                 memset(bp->wp, 0, len - n);
553         bp->wp = bp->rp + len;
554         QDEBUG checkb(bp, "adjustblock 2");
555
556         return bp;
557 }
558
559
560 /*
561  *  get next block from a queue, return null if nothing there
562  */
563 struct block *qget(struct queue *q)
564 {
565         int dowakeup;
566         struct block *b;
567
568         /* sync with qwrite */
569         spin_lock_irqsave(&q->lock);
570
571         b = q->bfirst;
572         if (b == NULL) {
573                 q->state |= Qstarve;
574                 spin_unlock_irqsave(&q->lock);
575                 return NULL;
576         }
577         q->bfirst = b->next;
578         b->next = 0;
579         q->len -= BALLOC(b);
580         q->dlen -= BLEN(b);
581         QDEBUG checkb(b, "qget");
582
583         /* if writer flow controlled, restart */
584         if ((q->state & Qflow) && q->len < q->limit / 2) {
585                 q->state &= ~Qflow;
586                 dowakeup = 1;
587         } else
588                 dowakeup = 0;
589
590         spin_unlock_irqsave(&q->lock);
591
592         if (dowakeup) {
593                 rendez_wakeup(&q->wr);
594                 /* We only send the writable event on wakeup, which is edge triggered */
595                 qwake_cb(q, FDTAP_FILT_WRITABLE);
596         }
597
598         return b;
599 }
600
601 /*
602  *  throw away the next 'len' bytes in the queue
603  * returning the number actually discarded
604  */
605 int qdiscard(struct queue *q, int len)
606 {
607         struct block *b;
608         int dowakeup, n, sofar, body_amt, extra_amt;
609         struct extra_bdata *ebd;
610
611         spin_lock_irqsave(&q->lock);
612         for (sofar = 0; sofar < len; sofar += n) {
613                 b = q->bfirst;
614                 if (b == NULL)
615                         break;
616                 QDEBUG checkb(b, "qdiscard");
617                 n = BLEN(b);
618                 if (n <= len - sofar) {
619                         q->bfirst = b->next;
620                         b->next = 0;
621                         q->len -= BALLOC(b);
622                         q->dlen -= BLEN(b);
623                         freeb(b);
624                 } else {
625                         n = len - sofar;
626                         q->dlen -= n;
627                         /* partial block removal */
628                         body_amt = MIN(BHLEN(b), n);
629                         b->rp += body_amt;
630                         extra_amt = n - body_amt;
631                         /* reduce q->len by the amount we remove from the extras.  The
632                          * header will always be accounted for above, during block removal.
633                          * */
634                         q->len -= extra_amt;
635                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
636                                 ebd = &b->extra_data[i];
637                                 if (!ebd->base || !ebd->len)
638                                         continue;
639                                 if (extra_amt >= ebd->len) {
640                                         /* remove the entire entry, note the kfree release */
641                                         b->extra_len -= ebd->len;
642                                         extra_amt -= ebd->len;
643                                         kfree((void*)ebd->base);
644                                         ebd->base = ebd->off = ebd->len = 0;
645                                         continue;
646                                 }
647                                 ebd->off += extra_amt;
648                                 ebd->len -= extra_amt;
649                                 b->extra_len -= extra_amt;
650                                 extra_amt = 0;
651                         }
652                 }
653         }
654
655         /*
656          *  if writer flow controlled, restart
657          *
658          *  This used to be
659          *  q->len < q->limit/2
660          *  but it slows down tcp too much for certain write sizes.
661          *  I really don't understand it completely.  It may be
662          *  due to the queue draining so fast that the transmission
663          *  stalls waiting for the app to produce more data.  - presotto
664          */
665         if ((q->state & Qflow) && q->len < q->limit) {
666                 q->state &= ~Qflow;
667                 dowakeup = 1;
668         } else
669                 dowakeup = 0;
670
671         spin_unlock_irqsave(&q->lock);
672
673         if (dowakeup) {
674                 rendez_wakeup(&q->wr);
675                 qwake_cb(q, FDTAP_FILT_WRITABLE);
676         }
677
678         return sofar;
679 }
680
681 /*
682  *  Interrupt level copy out of a queue, return # bytes copied.
683  */
684 int qconsume(struct queue *q, void *vp, int len)
685 {
686         struct block *b;
687         int n, dowakeup;
688         uint8_t *p = vp;
689         struct block *tofree = NULL;
690
691         /* sync with qwrite */
692         spin_lock_irqsave(&q->lock);
693
694         for (;;) {
695                 b = q->bfirst;
696                 if (b == 0) {
697                         q->state |= Qstarve;
698                         spin_unlock_irqsave(&q->lock);
699                         return -1;
700                 }
701                 QDEBUG checkb(b, "qconsume 1");
702
703                 n = BLEN(b);
704                 if (n > 0)
705                         break;
706                 q->bfirst = b->next;
707                 q->len -= BALLOC(b);
708
709                 /* remember to free this */
710                 b->next = tofree;
711                 tofree = b;
712         };
713
714         PANIC_EXTRA(b);
715         if (n < len)
716                 len = n;
717         memmove(p, b->rp, len);
718         consumecnt += n;
719         b->rp += len;
720         q->dlen -= len;
721
722         /* discard the block if we're done with it */
723         if ((q->state & Qmsg) || len == n) {
724                 q->bfirst = b->next;
725                 b->next = 0;
726                 q->len -= BALLOC(b);
727                 q->dlen -= BLEN(b);
728
729                 /* remember to free this */
730                 b->next = tofree;
731                 tofree = b;
732         }
733
734         /* if writer flow controlled, restart */
735         if ((q->state & Qflow) && q->len < q->limit / 2) {
736                 q->state &= ~Qflow;
737                 dowakeup = 1;
738         } else
739                 dowakeup = 0;
740
741         spin_unlock_irqsave(&q->lock);
742
743         if (dowakeup) {
744                 rendez_wakeup(&q->wr);
745                 qwake_cb(q, FDTAP_FILT_WRITABLE);
746         }
747
748         if (tofree != NULL)
749                 freeblist(tofree);
750
751         return len;
752 }
753
754 int qpass(struct queue *q, struct block *b)
755 {
756         int dlen, len, dowakeup;
757
758         /* sync with qread */
759         dowakeup = 0;
760         spin_lock_irqsave(&q->lock);
761         if (q->len >= q->limit) {
762                 freeblist(b);
763                 spin_unlock_irqsave(&q->lock);
764                 return -1;
765         }
766         if (q->state & Qclosed) {
767                 len = blocklen(b);
768                 freeblist(b);
769                 spin_unlock_irqsave(&q->lock);
770                 return len;
771         }
772
773         /* add buffer to queue */
774         if (q->bfirst)
775                 q->blast->next = b;
776         else
777                 q->bfirst = b;
778         len = BALLOC(b);
779         dlen = BLEN(b);
780         QDEBUG checkb(b, "qpass");
781         while (b->next) {
782                 b = b->next;
783                 QDEBUG checkb(b, "qpass");
784                 len += BALLOC(b);
785                 dlen += BLEN(b);
786         }
787         q->blast = b;
788         q->len += len;
789         q->dlen += dlen;
790
791         if (q->len >= q->limit / 2)
792                 q->state |= Qflow;
793
794         if (q->state & Qstarve) {
795                 q->state &= ~Qstarve;
796                 dowakeup = 1;
797         }
798         spin_unlock_irqsave(&q->lock);
799
800         if (dowakeup) {
801                 rendez_wakeup(&q->rr);
802                 qwake_cb(q, FDTAP_FILT_READABLE);
803         }
804
805         return len;
806 }
807
808 int qpassnolim(struct queue *q, struct block *b)
809 {
810         int dlen, len, dowakeup;
811
812         /* sync with qread */
813         dowakeup = 0;
814         spin_lock_irqsave(&q->lock);
815
816         if (q->state & Qclosed) {
817                 freeblist(b);
818                 spin_unlock_irqsave(&q->lock);
819                 return BALLOC(b);
820         }
821
822         /* add buffer to queue */
823         if (q->bfirst)
824                 q->blast->next = b;
825         else
826                 q->bfirst = b;
827         len = BALLOC(b);
828         dlen = BLEN(b);
829         QDEBUG checkb(b, "qpass");
830         while (b->next) {
831                 b = b->next;
832                 QDEBUG checkb(b, "qpass");
833                 len += BALLOC(b);
834                 dlen += BLEN(b);
835         }
836         q->blast = b;
837         q->len += len;
838         q->dlen += dlen;
839
840         if (q->len >= q->limit / 2)
841                 q->state |= Qflow;
842
843         if (q->state & Qstarve) {
844                 q->state &= ~Qstarve;
845                 dowakeup = 1;
846         }
847         spin_unlock_irqsave(&q->lock);
848
849         if (dowakeup) {
850                 rendez_wakeup(&q->rr);
851                 qwake_cb(q, FDTAP_FILT_READABLE);
852         }
853
854         return len;
855 }
856
857 /*
858  *  if the allocated space is way out of line with the used
859  *  space, reallocate to a smaller block
860  */
861 struct block *packblock(struct block *bp)
862 {
863         struct block **l, *nbp;
864         int n;
865
866         if (bp->extra_len)
867                 return bp;
868         for (l = &bp; *l; l = &(*l)->next) {
869                 nbp = *l;
870                 n = BLEN(nbp);
871                 if ((n << 2) < BALLOC(nbp)) {
872                         *l = allocb(n);
873                         memmove((*l)->wp, nbp->rp, n);
874                         (*l)->wp += n;
875                         (*l)->next = nbp->next;
876                         freeb(nbp);
877                 }
878         }
879
880         return bp;
881 }
882
883 int qproduce(struct queue *q, void *vp, int len)
884 {
885         struct block *b;
886         int dowakeup;
887         uint8_t *p = vp;
888
889         /* sync with qread */
890         dowakeup = 0;
891         spin_lock_irqsave(&q->lock);
892
893         /* no waiting receivers, room in buffer? */
894         if (q->len >= q->limit) {
895                 q->state |= Qflow;
896                 spin_unlock_irqsave(&q->lock);
897                 return -1;
898         }
899
900         /* save in buffer */
901         /* use Qcoalesce here to save storage */
902         // TODO: Consider removing the Qcoalesce flag and force a coalescing
903         // strategy by default.
904         b = q->blast;
905         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
906                 || b->lim - b->wp < len) {
907                 /* need a new block */
908                 b = iallocb(len);
909                 if (b == 0) {
910                         spin_unlock_irqsave(&q->lock);
911                         return 0;
912                 }
913                 if (q->bfirst)
914                         q->blast->next = b;
915                 else
916                         q->bfirst = b;
917                 q->blast = b;
918                 /* b->next = 0; done by iallocb() */
919                 q->len += BALLOC(b);
920         }
921         PANIC_EXTRA(b);
922         memmove(b->wp, p, len);
923         producecnt += len;
924         b->wp += len;
925         q->dlen += len;
926         QDEBUG checkb(b, "qproduce");
927
928         if (q->state & Qstarve) {
929                 q->state &= ~Qstarve;
930                 dowakeup = 1;
931         }
932
933         if (q->len >= q->limit)
934                 q->state |= Qflow;
935         spin_unlock_irqsave(&q->lock);
936
937         if (dowakeup) {
938                 rendez_wakeup(&q->rr);
939                 qwake_cb(q, FDTAP_FILT_READABLE);
940         }
941
942         return len;
943 }
944
945 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
946  * body_rp, for up to len.  Returns the len consumed. 
947  *
948  * The base is 'b', so that we can kfree it later.  This currently ties us to
949  * using kfree for the release method for all extra_data.
950  *
951  * It is possible to have a body size that is 0, if there is no offset, and
952  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
953 static size_t point_to_body(struct block *b, uint8_t *body_rp,
954                             struct block *newb, unsigned int newb_idx,
955                             size_t len)
956 {
957         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
958
959         assert(newb_idx < newb->nr_extra_bufs);
960
961         kmalloc_incref(b);
962         ebd->base = (uintptr_t)b;
963         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
964         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
965         assert((int)ebd->len >= 0);
966         newb->extra_len += ebd->len;
967         return ebd->len;
968 }
969
970 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
971  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
972  * consumed.
973  *
974  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
975 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
976                            struct block *newb, unsigned int newb_idx,
977                            size_t len)
978 {
979         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
980         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
981
982         assert(b_idx < b->nr_extra_bufs);
983         assert(newb_idx < newb->nr_extra_bufs);
984
985         kmalloc_incref((void*)b_ebd->base);
986         n_ebd->base = b_ebd->base;
987         n_ebd->off = b_ebd->off + b_off;
988         n_ebd->len = MIN(b_ebd->len - b_off, len);
989         newb->extra_len += n_ebd->len;
990         return n_ebd->len;
991 }
992
993 /* given a string of blocks, fills the new block's extra_data  with the contents
994  * of the blist [offset, len + offset)
995  *
996  * returns 0 on success.  the only failure is if the extra_data array was too
997  * small, so this returns a positive integer saying how big the extra_data needs
998  * to be.
999  *
1000  * callers are responsible for protecting the list structure. */
1001 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1002                             uint32_t offset)
1003 {
1004         struct block *b, *first;
1005         unsigned int nr_bufs = 0;
1006         unsigned int b_idx, newb_idx = 0;
1007         uint8_t *first_main_body = 0;
1008
1009         /* find the first block; keep offset relative to the latest b in the list */
1010         for (b = blist; b; b = b->next) {
1011                 if (BLEN(b) > offset)
1012                         break;
1013                 offset -= BLEN(b);
1014         }
1015         /* qcopy semantics: if you asked for an offset outside the block list, you
1016          * get an empty block back */
1017         if (!b)
1018                 return 0;
1019         first = b;
1020         /* upper bound for how many buffers we'll need in newb */
1021         for (/* b is set*/; b; b = b->next) {
1022                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1023         }
1024         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1025         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1026                 /* caller will need to alloc these, then re-call us */
1027                 return nr_bufs;
1028         }
1029         for (b = first; b && len; b = b->next) {
1030                 b_idx = 0;
1031                 if (offset) {
1032                         if (offset < BHLEN(b)) {
1033                                 /* off is in the main body */
1034                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1035                                 newb_idx++;
1036                         } else {
1037                                 /* off is in one of the buffers (or just past the last one).
1038                                  * we're not going to point to b's main body at all. */
1039                                 offset -= BHLEN(b);
1040                                 assert(b->extra_data);
1041                                 /* assuming these extrabufs are packed, or at least that len
1042                                  * isn't gibberish */
1043                                 while (b->extra_data[b_idx].len <= offset) {
1044                                         offset -= b->extra_data[b_idx].len;
1045                                         b_idx++;
1046                                 }
1047                                 /* now offset is set to our offset in the b_idx'th buf */
1048                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1049                                 newb_idx++;
1050                                 b_idx++;
1051                         }
1052                         offset = 0;
1053                 } else {
1054                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1055                         newb_idx++;
1056                 }
1057                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1058                  * and any point_to_ could be our last if it consumed all of len. */
1059                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1060                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1061                         newb_idx++;
1062                 }
1063         }
1064         return 0;
1065 }
1066
1067 struct block *blist_clone(struct block *blist, int header_len, int len,
1068                           uint32_t offset)
1069 {
1070         int ret;
1071         struct block *newb = allocb(header_len);
1072         do {
1073                 ret = __blist_clone_to(blist, newb, len, offset);
1074                 if (ret)
1075                         block_add_extd(newb, ret, KMALLOC_WAIT);
1076         } while (ret);
1077         return newb;
1078 }
1079
1080 /* given a queue, makes a single block with header_len reserved space in the
1081  * block main body, and the contents of [offset, len + offset) pointed to in the
1082  * new blocks ext_data. */
1083 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1084 {
1085         int ret;
1086         struct block *newb = allocb(header_len);
1087         /* the while loop should rarely be used: it would require someone
1088          * concurrently adding to the queue. */
1089         do {
1090                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1091                 spin_lock_irqsave(&q->lock);
1092                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1093                 spin_unlock_irqsave(&q->lock);
1094                 if (ret)
1095                         block_add_extd(newb, ret, KMALLOC_WAIT);
1096         } while (ret);
1097         return newb;
1098 }
1099
1100 /*
1101  *  copy from offset in the queue
1102  */
1103 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1104 {
1105         int sofar;
1106         int n;
1107         struct block *b, *nb;
1108         uint8_t *p;
1109
1110         nb = allocb(len);
1111
1112         spin_lock_irqsave(&q->lock);
1113
1114         /* go to offset */
1115         b = q->bfirst;
1116         for (sofar = 0;; sofar += n) {
1117                 if (b == NULL) {
1118                         spin_unlock_irqsave(&q->lock);
1119                         return nb;
1120                 }
1121                 n = BLEN(b);
1122                 if (sofar + n > offset) {
1123                         p = b->rp + offset - sofar;
1124                         n -= offset - sofar;
1125                         break;
1126                 }
1127                 QDEBUG checkb(b, "qcopy");
1128                 b = b->next;
1129         }
1130
1131         /* copy bytes from there */
1132         for (sofar = 0; sofar < len;) {
1133                 if (n > len - sofar)
1134                         n = len - sofar;
1135                 PANIC_EXTRA(b);
1136                 memmove(nb->wp, p, n);
1137                 qcopycnt += n;
1138                 sofar += n;
1139                 nb->wp += n;
1140                 b = b->next;
1141                 if (b == NULL)
1142                         break;
1143                 n = BLEN(b);
1144                 p = b->rp;
1145         }
1146         spin_unlock_irqsave(&q->lock);
1147
1148         return nb;
1149 }
1150
1151 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1152 {
1153 #ifdef CONFIG_BLOCK_EXTRAS
1154         return qclone(q, 0, len, offset);
1155 #else
1156         return qcopy_old(q, len, offset);
1157 #endif
1158 }
1159
1160 static void qinit_common(struct queue *q)
1161 {
1162         spinlock_init_irqsave(&q->lock);
1163         qlock_init(&q->rlock);
1164         qlock_init(&q->wlock);
1165         rendez_init(&q->rr);
1166         rendez_init(&q->wr);
1167 }
1168
1169 /*
1170  *  called by non-interrupt code
1171  */
1172 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1173 {
1174         struct queue *q;
1175
1176         q = kzmalloc(sizeof(struct queue), 0);
1177         if (q == 0)
1178                 return 0;
1179         qinit_common(q);
1180
1181         q->limit = q->inilim = limit;
1182         q->kick = kick;
1183         q->arg = arg;
1184         q->state = msg;
1185         q->state |= Qstarve;
1186         q->eof = 0;
1187
1188         return q;
1189 }
1190
1191 /* open a queue to be bypassed */
1192 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1193 {
1194         struct queue *q;
1195
1196         q = kzmalloc(sizeof(struct queue), 0);
1197         if (q == 0)
1198                 return 0;
1199         qinit_common(q);
1200
1201         q->limit = 0;
1202         q->arg = arg;
1203         q->bypass = bypass;
1204         q->state = 0;
1205
1206         return q;
1207 }
1208
1209 static int notempty(void *a)
1210 {
1211         struct queue *q = a;
1212
1213         return (q->state & Qclosed) || q->bfirst != 0;
1214 }
1215
1216 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1217  * wait, FALSE on Qclose (without error)
1218  *
1219  * Called with q ilocked.  May error out, back through the caller, with
1220  * the irqsave lock unlocked.  */
1221 static bool qwait(struct queue *q)
1222 {
1223         /* wait for data */
1224         for (;;) {
1225                 if (q->bfirst != NULL)
1226                         break;
1227
1228                 if (q->state & Qclosed) {
1229                         if (++q->eof > 3) {
1230                                 spin_unlock_irqsave(&q->lock);
1231                                 error("multiple reads on a closed queue");
1232                         }
1233                         if (*q->err && strcmp(q->err, Ehungup) != 0) {
1234                                 spin_unlock_irqsave(&q->lock);
1235                                 error(q->err);
1236                         }
1237                         return FALSE;
1238                 }
1239                 /* We set Qstarve regardless of whether we are non-blocking or not.
1240                  * Qstarve tracks the edge detection of the queue being empty. */
1241                 q->state |= Qstarve;
1242                 if (q->state & Qnonblock) {
1243                         spin_unlock_irqsave(&q->lock);
1244                         set_errno(EAGAIN);
1245                         error("queue empty");
1246                 }
1247                 spin_unlock_irqsave(&q->lock);
1248                 /* may throw an error() */
1249                 rendez_sleep(&q->rr, notempty, q);
1250                 spin_lock_irqsave(&q->lock);
1251         }
1252         return TRUE;
1253 }
1254
1255 /*
1256  * add a block list to a queue
1257  */
1258 void qaddlist(struct queue *q, struct block *b)
1259 {
1260         /* TODO: q lock? */
1261         /* queue the block */
1262         if (q->bfirst)
1263                 q->blast->next = b;
1264         else
1265                 q->bfirst = b;
1266         q->len += blockalloclen(b);
1267         q->dlen += blocklen(b);
1268         while (b->next)
1269                 b = b->next;
1270         q->blast = b;
1271 }
1272
1273 /*
1274  *  called with q ilocked
1275  */
1276 struct block *qremove(struct queue *q)
1277 {
1278         struct block *b;
1279
1280         b = q->bfirst;
1281         if (b == NULL)
1282                 return NULL;
1283         q->bfirst = b->next;
1284         b->next = NULL;
1285         q->dlen -= BLEN(b);
1286         q->len -= BALLOC(b);
1287         QDEBUG checkb(b, "qremove");
1288         return b;
1289 }
1290
1291 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1292 {
1293         size_t copy_amt, retval = 0;
1294         struct extra_bdata *ebd;
1295         
1296         copy_amt = MIN(BHLEN(b), amt);
1297         memcpy(to, b->rp, copy_amt);
1298         /* advance the rp, since this block not be completely consumed and future
1299          * reads need to know where to pick up from */
1300         b->rp += copy_amt;
1301         to += copy_amt;
1302         amt -= copy_amt;
1303         retval += copy_amt;
1304         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1305                 ebd = &b->extra_data[i];
1306                 /* skip empty entires.  if we track this in the struct block, we can
1307                  * just start the for loop early */
1308                 if (!ebd->base || !ebd->len)
1309                         continue;
1310                 copy_amt = MIN(ebd->len, amt);
1311                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1312                 /* we're actually consuming the entries, just like how we advance rp up
1313                  * above, and might only consume part of one. */
1314                 ebd->len -= copy_amt;
1315                 ebd->off += copy_amt;
1316                 b->extra_len -= copy_amt;
1317                 if (!ebd->len) {
1318                         /* we don't actually have to decref here.  it's also done in
1319                          * freeb().  this is the earliest we can free. */
1320                         kfree((void*)ebd->base);
1321                         ebd->base = ebd->off = 0;
1322                 }
1323                 to += copy_amt;
1324                 amt -= copy_amt;
1325                 retval += copy_amt;
1326         }
1327         return retval;
1328 }
1329
1330 /*
1331  *  copy the contents of a string of blocks into
1332  *  memory.  emptied blocks are freed.  return
1333  *  pointer to first unconsumed block.
1334  */
1335 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1336 {
1337         int i;
1338         struct block *next;
1339
1340         /* could be slicker here, since read_from_block is smart */
1341         for (; b != NULL; b = next) {
1342                 i = BLEN(b);
1343                 if (i > n) {
1344                         /* partial block, consume some */
1345                         read_from_block(b, p, n);
1346                         return b;
1347                 }
1348                 /* full block, consume all and move on */
1349                 i = read_from_block(b, p, i);
1350                 n -= i;
1351                 p += i;
1352                 next = b->next;
1353                 freeb(b);
1354         }
1355         return NULL;
1356 }
1357
1358 /*
1359  *  copy the contents of memory into a string of blocks.
1360  *  return NULL on error.
1361  */
1362 struct block *mem2bl(uint8_t * p, int len)
1363 {
1364         ERRSTACK(1);
1365         int n;
1366         struct block *b, *first, **l;
1367
1368         first = NULL;
1369         l = &first;
1370         if (waserror()) {
1371                 freeblist(first);
1372                 nexterror();
1373         }
1374         do {
1375                 n = len;
1376                 if (n > Maxatomic)
1377                         n = Maxatomic;
1378
1379                 *l = b = allocb(n);
1380                 /* TODO consider extra_data */
1381                 memmove(b->wp, p, n);
1382                 b->wp += n;
1383                 p += n;
1384                 len -= n;
1385                 l = &b->next;
1386         } while (len > 0);
1387         poperror();
1388
1389         return first;
1390 }
1391
1392 /*
1393  *  put a block back to the front of the queue
1394  *  called with q ilocked
1395  */
1396 void qputback(struct queue *q, struct block *b)
1397 {
1398         b->next = q->bfirst;
1399         if (q->bfirst == NULL)
1400                 q->blast = b;
1401         q->bfirst = b;
1402         q->len += BALLOC(b);
1403         q->dlen += BLEN(b);
1404 }
1405
1406 /*
1407  *  flow control, get producer going again
1408  *  called with q ilocked
1409  */
1410 static void qwakeup_iunlock(struct queue *q)
1411 {
1412         int dowakeup = 0;
1413
1414         /* if writer flow controlled, restart */
1415         if ((q->state & Qflow) && q->len < q->limit / 2) {
1416                 q->state &= ~Qflow;
1417                 dowakeup = 1;
1418         }
1419
1420         spin_unlock_irqsave(&q->lock);
1421
1422         /* wakeup flow controlled writers */
1423         if (dowakeup) {
1424                 if (q->kick)
1425                         q->kick(q->arg);
1426                 rendez_wakeup(&q->wr);
1427                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1428         }
1429 }
1430
1431 /*
1432  *  get next block from a queue (up to a limit)
1433  */
1434 struct block *qbread(struct queue *q, int len)
1435 {
1436         ERRSTACK(1);
1437         struct block *b, *nb;
1438         int n;
1439
1440         qlock(&q->rlock);
1441         if (waserror()) {
1442                 qunlock(&q->rlock);
1443                 nexterror();
1444         }
1445
1446         spin_lock_irqsave(&q->lock);
1447         if (!qwait(q)) {
1448                 /* queue closed */
1449                 spin_unlock_irqsave(&q->lock);
1450                 qunlock(&q->rlock);
1451                 poperror();
1452                 return NULL;
1453         }
1454
1455         /* if we get here, there's at least one block in the queue */
1456         b = qremove(q);
1457         n = BLEN(b);
1458
1459         /* split block if it's too big and this is not a message queue */
1460         nb = b;
1461         if (n > len) {
1462                 PANIC_EXTRA(b);
1463                 if ((q->state & Qmsg) == 0) {
1464                         n -= len;
1465                         b = allocb(n);
1466                         memmove(b->wp, nb->rp + len, n);
1467                         b->wp += n;
1468                         qputback(q, b);
1469                 }
1470                 nb->wp = nb->rp + len;
1471         }
1472
1473         /* restart producer */
1474         qwakeup_iunlock(q);
1475
1476         poperror();
1477         qunlock(&q->rlock);
1478         return nb;
1479 }
1480
1481 /*
1482  *  read a queue.  if no data is queued, post a struct block
1483  *  and wait on its Rendez.
1484  */
1485 long qread(struct queue *q, void *vp, int len)
1486 {
1487         ERRSTACK(1);
1488         struct block *b, *first, **l;
1489         int m, n;
1490
1491         qlock(&q->rlock);
1492         if (waserror()) {
1493                 qunlock(&q->rlock);
1494                 nexterror();
1495         }
1496
1497         spin_lock_irqsave(&q->lock);
1498 again:
1499         if (!qwait(q)) {
1500                 /* queue closed */
1501                 spin_unlock_irqsave(&q->lock);
1502                 qunlock(&q->rlock);
1503                 poperror();
1504                 return 0;
1505         }
1506
1507         /* if we get here, there's at least one block in the queue */
1508         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1509         // strategy by default.
1510         if (q->state & Qcoalesce) {
1511                 /* when coalescing, 0 length blocks just go away */
1512                 b = q->bfirst;
1513                 if (BLEN(b) <= 0) {
1514                         freeb(qremove(q));
1515                         goto again;
1516                 }
1517
1518                 /*  grab the first block plus as many
1519                  *  following blocks as will completely
1520                  *  fit in the read.
1521                  */
1522                 n = 0;
1523                 l = &first;
1524                 m = BLEN(b);
1525                 for (;;) {
1526                         *l = qremove(q);
1527                         l = &b->next;
1528                         n += m;
1529
1530                         b = q->bfirst;
1531                         if (b == NULL)
1532                                 break;
1533                         m = BLEN(b);
1534                         if (n + m > len)
1535                                 break;
1536                 }
1537         } else {
1538                 first = qremove(q);
1539                 n = BLEN(first);
1540         }
1541
1542         /* copy to user space outside of the ilock */
1543         spin_unlock_irqsave(&q->lock);
1544         b = bl2mem(vp, first, len);
1545         spin_lock_irqsave(&q->lock);
1546
1547         /* take care of any left over partial block */
1548         if (b != NULL) {
1549                 n -= BLEN(b);
1550                 if (q->state & Qmsg)
1551                         freeb(b);
1552                 else
1553                         qputback(q, b);
1554         }
1555
1556         /* restart producer */
1557         qwakeup_iunlock(q);
1558
1559         poperror();
1560         qunlock(&q->rlock);
1561         return n;
1562 }
1563
1564 static int qnotfull(void *a)
1565 {
1566         struct queue *q = a;
1567
1568         return q->len < q->limit || (q->state & Qclosed);
1569 }
1570
1571 uint32_t dropcnt;
1572
1573 /*
1574  *  add a block to a queue obeying flow control
1575  */
1576 long qbwrite(struct queue *q, struct block *b)
1577 {
1578         ERRSTACK(1);
1579         int n, dowakeup;
1580         volatile bool should_free_b = TRUE;
1581
1582         n = BLEN(b);
1583
1584         if (q->bypass) {
1585                 (*q->bypass) (q->arg, b);
1586                 return n;
1587         }
1588
1589         dowakeup = 0;
1590         qlock(&q->wlock);
1591         if (waserror()) {
1592                 if (b != NULL && should_free_b)
1593                         freeb(b);
1594                 qunlock(&q->wlock);
1595                 nexterror();
1596         }
1597
1598         spin_lock_irqsave(&q->lock);
1599
1600         /* give up if the queue is closed */
1601         if (q->state & Qclosed) {
1602                 spin_unlock_irqsave(&q->lock);
1603                 error(q->err);
1604         }
1605
1606         /* if nonblocking, don't queue over the limit */
1607         if (q->len >= q->limit) {
1608                 /* drop overflow takes priority over regular non-blocking */
1609                 if (q->state & Qdropoverflow) {
1610                         spin_unlock_irqsave(&q->lock);
1611                         freeb(b);
1612                         dropcnt += n;
1613                         qunlock(&q->wlock);
1614                         poperror();
1615                         return n;
1616                 }
1617                 if (q->state & Qnonblock) {
1618                         spin_unlock_irqsave(&q->lock);
1619                         freeb(b);
1620                         set_errno(EAGAIN);
1621                         error("queue full");
1622                 }
1623         }
1624
1625         /* queue the block */
1626         should_free_b = FALSE;
1627         if (q->bfirst)
1628                 q->blast->next = b;
1629         else
1630                 q->bfirst = b;
1631         q->blast = b;
1632         b->next = 0;
1633         q->len += BALLOC(b);
1634         q->dlen += n;
1635         QDEBUG checkb(b, "qbwrite");
1636         b = NULL;
1637
1638         /* make sure other end gets awakened */
1639         if (q->state & Qstarve) {
1640                 q->state &= ~Qstarve;
1641                 dowakeup = 1;
1642         }
1643         spin_unlock_irqsave(&q->lock);
1644
1645         /*  get output going again */
1646         if (q->kick && (dowakeup || (q->state & Qkick)))
1647                 q->kick(q->arg);
1648
1649         /* wakeup anyone consuming at the other end */
1650         if (dowakeup) {
1651                 rendez_wakeup(&q->rr);
1652                 qwake_cb(q, FDTAP_FILT_READABLE);
1653         }
1654
1655         /*
1656          *  flow control, wait for queue to get below the limit
1657          *  before allowing the process to continue and queue
1658          *  more.  We do this here so that postnote can only
1659          *  interrupt us after the data has been queued.  This
1660          *  means that things like 9p flushes and ssl messages
1661          *  will not be disrupted by software interrupts.
1662          *
1663          *  Note - this is moderately dangerous since a process
1664          *  that keeps getting interrupted and rewriting will
1665          *  queue infinite crud.
1666          */
1667         for (;;) {
1668                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1669                         break;
1670
1671                 spin_lock_irqsave(&q->lock);
1672                 q->state |= Qflow;
1673                 spin_unlock_irqsave(&q->lock);
1674                 rendez_sleep(&q->wr, qnotfull, q);
1675         }
1676
1677         qunlock(&q->wlock);
1678         poperror();
1679         return n;
1680 }
1681
1682 long qibwrite(struct queue *q, struct block *b)
1683 {
1684         int n, dowakeup;
1685
1686         dowakeup = 0;
1687
1688         n = BLEN(b);
1689
1690         spin_lock_irqsave(&q->lock);
1691
1692         QDEBUG checkb(b, "qibwrite");
1693         if (q->bfirst)
1694                 q->blast->next = b;
1695         else
1696                 q->bfirst = b;
1697         q->blast = b;
1698         q->len += BALLOC(b);
1699         q->dlen += n;
1700
1701         if (q->state & Qstarve) {
1702                 q->state &= ~Qstarve;
1703                 dowakeup = 1;
1704         }
1705
1706         spin_unlock_irqsave(&q->lock);
1707
1708         if (dowakeup) {
1709                 if (q->kick)
1710                         q->kick(q->arg);
1711                 rendez_wakeup(&q->rr);
1712                 qwake_cb(q, FDTAP_FILT_READABLE);
1713         }
1714
1715         return n;
1716 }
1717
1718 /*
1719  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1720  */
1721 int qwrite(struct queue *q, void *vp, int len)
1722 {
1723         int n, sofar;
1724         struct block *b;
1725         uint8_t *p = vp;
1726         void *ext_buf;
1727
1728         QDEBUG if (!islo())
1729                  printd("qwrite hi %p\n", getcallerpc(&q));
1730
1731         sofar = 0;
1732         do {
1733                 n = len - sofar;
1734                 /* This is 64K, the max amount per single block.  Still a good value? */
1735                 if (n > Maxatomic)
1736                         n = Maxatomic;
1737
1738                 /* If n is small, we don't need to bother with the extra_data.  But
1739                  * until the whole stack can handle extd blocks, we'll use them
1740                  * unconditionally. */
1741 #ifdef CONFIG_BLOCK_EXTRAS
1742                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1743                  * only available via padblock (to the left).  we also need some space
1744                  * for pullupblock for some basic headers (like icmp) that get written
1745                  * in directly */
1746                 b = allocb(64);
1747                 ext_buf = kmalloc(n, 0);
1748                 memcpy(ext_buf, p + sofar, n);
1749                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1750                 b->extra_data[0].base = (uintptr_t)ext_buf;
1751                 b->extra_data[0].off = 0;
1752                 b->extra_data[0].len = n;
1753                 b->extra_len += n;
1754 #else
1755                 b = allocb(n);
1756                 memmove(b->wp, p + sofar, n);
1757                 b->wp += n;
1758 #endif
1759                         
1760                 qbwrite(q, b);
1761
1762                 sofar += n;
1763         } while (sofar < len && (q->state & Qmsg) == 0);
1764
1765         return len;
1766 }
1767
1768 /*
1769  *  used by print() to write to a queue.  Since we may be splhi or not in
1770  *  a process, don't qlock.
1771  */
1772 int qiwrite(struct queue *q, void *vp, int len)
1773 {
1774         int n, sofar, dowakeup;
1775         struct block *b;
1776         uint8_t *p = vp;
1777
1778         dowakeup = 0;
1779
1780         sofar = 0;
1781         do {
1782                 n = len - sofar;
1783                 if (n > Maxatomic)
1784                         n = Maxatomic;
1785
1786                 b = iallocb(n);
1787                 if (b == NULL)
1788                         break;
1789                 /* TODO consider extra_data */
1790                 memmove(b->wp, p + sofar, n);
1791                 /* this adjusts BLEN to be n, or at least it should */
1792                 b->wp += n;
1793                 assert(n == BLEN(b));
1794                 qibwrite(q, b);
1795
1796                 sofar += n;
1797         } while (sofar < len && (q->state & Qmsg) == 0);
1798
1799         return sofar;
1800 }
1801
1802 /*
1803  *  be extremely careful when calling this,
1804  *  as there is no reference accounting
1805  */
1806 void qfree(struct queue *q)
1807 {
1808         qclose(q);
1809         kfree(q);
1810 }
1811
1812 /*
1813  *  Mark a queue as closed.  No further IO is permitted.
1814  *  All blocks are released.
1815  */
1816 void qclose(struct queue *q)
1817 {
1818         struct block *bfirst;
1819
1820         if (q == NULL)
1821                 return;
1822
1823         /* mark it */
1824         spin_lock_irqsave(&q->lock);
1825         q->state |= Qclosed;
1826         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1827         strncpy(q->err, Ehungup, sizeof(q->err));
1828         bfirst = q->bfirst;
1829         q->bfirst = 0;
1830         q->len = 0;
1831         q->dlen = 0;
1832         spin_unlock_irqsave(&q->lock);
1833
1834         /* free queued blocks */
1835         freeblist(bfirst);
1836
1837         /* wake up readers/writers */
1838         rendez_wakeup(&q->rr);
1839         rendez_wakeup(&q->wr);
1840         qwake_cb(q, FDTAP_FILT_HANGUP);
1841 }
1842
1843 /*
1844  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1845  *  blocks.
1846  */
1847 void qhangup(struct queue *q, char *msg)
1848 {
1849         /* mark it */
1850         spin_lock_irqsave(&q->lock);
1851         q->state |= Qclosed;
1852         if (msg == 0 || *msg == 0)
1853                 strncpy(q->err, Ehungup, sizeof(q->err));
1854         else
1855                 strncpy(q->err, msg, ERRMAX - 1);
1856         spin_unlock_irqsave(&q->lock);
1857
1858         /* wake up readers/writers */
1859         rendez_wakeup(&q->rr);
1860         rendez_wakeup(&q->wr);
1861         qwake_cb(q, FDTAP_FILT_HANGUP);
1862 }
1863
1864 /*
1865  *  return non-zero if the q is hungup
1866  */
1867 int qisclosed(struct queue *q)
1868 {
1869         return q->state & Qclosed;
1870 }
1871
1872 /*
1873  *  mark a queue as no longer hung up.  resets the wake_cb.
1874  */
1875 void qreopen(struct queue *q)
1876 {
1877         spin_lock_irqsave(&q->lock);
1878         q->state &= ~Qclosed;
1879         q->state |= Qstarve;
1880         q->eof = 0;
1881         q->limit = q->inilim;
1882         q->wake_cb = 0;
1883         q->wake_data = 0;
1884         spin_unlock_irqsave(&q->lock);
1885 }
1886
1887 /*
1888  *  return bytes queued
1889  */
1890 int qlen(struct queue *q)
1891 {
1892         return q->dlen;
1893 }
1894
1895 /*
1896  * return space remaining before flow control
1897  */
1898 int qwindow(struct queue *q)
1899 {
1900         int l;
1901
1902         l = q->limit - q->len;
1903         if (l < 0)
1904                 l = 0;
1905         return l;
1906 }
1907
1908 /*
1909  *  return true if we can read without blocking
1910  */
1911 int qcanread(struct queue *q)
1912 {
1913         return q->bfirst != 0;
1914 }
1915
1916 /*
1917  *  change queue limit
1918  */
1919 void qsetlimit(struct queue *q, int limit)
1920 {
1921         q->limit = limit;
1922 }
1923
1924 /*
1925  *  set whether writes drop overflowing blocks, or if we sleep
1926  */
1927 void qdropoverflow(struct queue *q, bool onoff)
1928 {
1929         if (onoff)
1930                 q->state |= Qdropoverflow;
1931         else
1932                 q->state &= ~Qdropoverflow;
1933 }
1934
1935 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1936 void qnonblock(struct queue *q, bool onoff)
1937 {
1938         if (onoff)
1939                 q->state |= Qnonblock;
1940         else
1941                 q->state &= ~Qnonblock;
1942 }
1943
1944 /*
1945  *  flush the output queue
1946  */
1947 void qflush(struct queue *q)
1948 {
1949         struct block *bfirst;
1950
1951         /* mark it */
1952         spin_lock_irqsave(&q->lock);
1953         bfirst = q->bfirst;
1954         q->bfirst = 0;
1955         q->len = 0;
1956         q->dlen = 0;
1957         spin_unlock_irqsave(&q->lock);
1958
1959         /* free queued blocks */
1960         freeblist(bfirst);
1961
1962         /* wake up writers */
1963         rendez_wakeup(&q->wr);
1964         qwake_cb(q, FDTAP_FILT_WRITABLE);
1965 }
1966
1967 int qfull(struct queue *q)
1968 {
1969         return q->state & Qflow;
1970 }
1971
1972 int qstate(struct queue *q)
1973 {
1974         return q->state;
1975 }
1976
1977 void qdump(struct queue *q)
1978 {
1979         if (q)
1980                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1981                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1982 }
1983
1984 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1985  * marks the type of wakeup event (flags from FDTAP).
1986  *
1987  * There's no sync protection.  If you change the CB while the qio is running,
1988  * you might get a CB with the data or func from a previous set_wake_cb.  You
1989  * should set this once per queue and forget it.
1990  *
1991  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1992  * just make sure that the func(data) pair are valid until the queue is freed or
1993  * reopened. */
1994 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1995 {
1996         q->wake_data = data;
1997         wmb();  /* if we see func, we'll also see the data for it */
1998         q->wake_cb = func;
1999 }