qio: Add callbacks for unblocking queues
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define PANIC_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 panic("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int eof;                                        /* number of eofs read by user */
50
51         void (*kick) (void *);          /* restart output */
52         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
53         void *arg;                                      /* argument to kick */
54
55         qlock_t rlock;                          /* mutex for reading processes */
56         struct rendez rr;                       /* process waiting to read */
57         qlock_t wlock;                          /* mutex for writing processes */
58         struct rendez wr;                       /* process waiting to write */
59         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
60         void *wake_data;
61
62         char err[ERRMAX];
63 };
64
65 enum {
66         Maxatomic = 64 * 1024,
67 };
68
69 unsigned int qiomaxatomic = Maxatomic;
70
71 /* Helper: fires a wake callback, sending 'filter' */
72 static void qwake_cb(struct queue *q, int filter)
73 {
74         if (q->wake_cb)
75                 q->wake_cb(q, q->wake_data, filter);
76 }
77
78 void ixsummary(void)
79 {
80         debugging ^= 1;
81         iallocsummary();
82         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
83                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
84         printd("consume %lu, produce %lu, qcopy %lu\n",
85                    consumecnt, producecnt, qcopycnt);
86 }
87
88 /*
89  *  free a list of blocks
90  */
91 void freeblist(struct block *b)
92 {
93         struct block *next;
94
95         for (; b != 0; b = next) {
96                 next = b->next;
97                 b->next = 0;
98                 freeb(b);
99         }
100 }
101
102 /*
103  *  pad a block to the front (or the back if size is negative)
104  */
105 struct block *padblock(struct block *bp, int size)
106 {
107         int n;
108         struct block *nbp;
109         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
110         uint16_t checksum_start = bp->checksum_start;
111         uint16_t checksum_offset = bp->checksum_offset;
112         uint16_t mss = bp->mss;
113
114         QDEBUG checkb(bp, "padblock 1");
115         if (size >= 0) {
116                 if (bp->rp - bp->base >= size) {
117                         bp->checksum_start += size;
118                         bp->rp -= size;
119                         return bp;
120                 }
121
122                 PANIC_EXTRA(bp);
123                 if (bp->next)
124                         panic("padblock %p", getcallerpc(&bp));
125                 n = BLEN(bp);
126                 padblockcnt++;
127                 nbp = allocb(size + n);
128                 nbp->rp += size;
129                 nbp->wp = nbp->rp;
130                 memmove(nbp->wp, bp->rp, n);
131                 nbp->wp += n;
132                 freeb(bp);
133                 nbp->rp -= size;
134         } else {
135                 size = -size;
136
137                 PANIC_EXTRA(bp);
138
139                 if (bp->next)
140                         panic("padblock %p", getcallerpc(&bp));
141
142                 if (bp->lim - bp->wp >= size)
143                         return bp;
144
145                 n = BLEN(bp);
146                 padblockcnt++;
147                 nbp = allocb(size + n);
148                 memmove(nbp->wp, bp->rp, n);
149                 nbp->wp += n;
150                 freeb(bp);
151         }
152         if (bcksum) {
153                 nbp->flag |= bcksum;
154                 nbp->checksum_start = checksum_start;
155                 nbp->checksum_offset = checksum_offset;
156                 nbp->mss = mss;
157         }
158         QDEBUG checkb(nbp, "padblock 1");
159         return nbp;
160 }
161
162 /*
163  *  return count of bytes in a string of blocks
164  */
165 int blocklen(struct block *bp)
166 {
167         int len;
168
169         len = 0;
170         while (bp) {
171                 len += BLEN(bp);
172                 bp = bp->next;
173         }
174         return len;
175 }
176
177 /*
178  * return count of space in blocks
179  */
180 int blockalloclen(struct block *bp)
181 {
182         int len;
183
184         len = 0;
185         while (bp) {
186                 len += BALLOC(bp);
187                 bp = bp->next;
188         }
189         return len;
190 }
191
192 /*
193  *  copy the  string of blocks into
194  *  a single block and free the string
195  */
196 struct block *concatblock(struct block *bp)
197 {
198         int len;
199         struct block *nb, *f;
200
201         if (bp->next == 0)
202                 return bp;
203
204         /* probably use parts of qclone */
205         PANIC_EXTRA(bp);
206         nb = allocb(blocklen(bp));
207         for (f = bp; f; f = f->next) {
208                 len = BLEN(f);
209                 memmove(nb->wp, f->rp, len);
210                 nb->wp += len;
211         }
212         concatblockcnt += BLEN(nb);
213         freeblist(bp);
214         QDEBUG checkb(nb, "concatblock 1");
215         return nb;
216 }
217
218 /* Returns a block with the remaining contents of b all in the main body of the
219  * returned block.  Replace old references to b with the returned value (which
220  * may still be 'b', if no change was needed. */
221 struct block *linearizeblock(struct block *b)
222 {
223         struct block *newb;
224         size_t len;
225         struct extra_bdata *ebd;
226
227         if (!b->extra_len)
228                 return b;
229
230         newb = allocb(BLEN(b));
231         len = BHLEN(b);
232         memcpy(newb->wp, b->rp, len);
233         newb->wp += len;
234         len = b->extra_len;
235         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
236                 ebd = &b->extra_data[i];
237                 if (!ebd->base || !ebd->len)
238                         continue;
239                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
240                 newb->wp += ebd->len;
241                 len -= ebd->len;
242         }
243         /* TODO: any other flags that need copied over? */
244         if (b->flag & BCKSUM_FLAGS) {
245                 newb->flag |= (b->flag & BCKSUM_FLAGS);
246                 newb->checksum_start = b->checksum_start;
247                 newb->checksum_offset = b->checksum_offset;
248                 newb->mss = b->mss;
249         }
250         freeb(b);
251         return newb;
252 }
253
254 /*
255  *  make sure the first block has at least n bytes in its main body
256  */
257 struct block *pullupblock(struct block *bp, int n)
258 {
259         int i, len, seglen;
260         struct block *nbp;
261         struct extra_bdata *ebd;
262
263         /*
264          *  this should almost always be true, it's
265          *  just to avoid every caller checking.
266          */
267         if (BHLEN(bp) >= n)
268                 return bp;
269
270          /* a start at explicit main-body / header management */
271         if (bp->extra_len) {
272                 if (n > bp->lim - bp->rp) {
273                         /* would need to realloc a new block and copy everything over. */
274                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
275                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
276                 }
277                 len = n - BHLEN(bp);
278                 if (len > bp->extra_len)
279                         panic("pullup more than extra (%d, %d, %d)\n",
280                               n, BHLEN(bp), bp->extra_len);
281                 checkb(bp, "before pullup");
282                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
283                         ebd = &bp->extra_data[i];
284                         if (!ebd->base || !ebd->len)
285                                 continue;
286                         seglen = MIN(ebd->len, len);
287                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
288                         bp->wp += seglen;
289                         len -= seglen;
290                         ebd->len -= seglen;
291                         ebd->off += seglen;
292                         bp->extra_len -= seglen;
293                         if (ebd->len == 0) {
294                                 kfree((void *)ebd->base);
295                                 ebd->off = 0;
296                                 ebd->base = 0;
297                         }
298                 }
299                 /* maybe just call pullupblock recursively here */
300                 if (len)
301                         panic("pullup %d bytes overdrawn\n", len);
302                 checkb(bp, "after pullup");
303                 return bp;
304         }
305
306         /*
307          *  if not enough room in the first block,
308          *  add another to the front of the list.
309          */
310         if (bp->lim - bp->rp < n) {
311                 nbp = allocb(n);
312                 nbp->next = bp;
313                 bp = nbp;
314         }
315
316         /*
317          *  copy bytes from the trailing blocks into the first
318          */
319         n -= BLEN(bp);
320         while ((nbp = bp->next)) {
321                 i = BLEN(nbp);
322                 if (i > n) {
323                         memmove(bp->wp, nbp->rp, n);
324                         pullupblockcnt++;
325                         bp->wp += n;
326                         nbp->rp += n;
327                         QDEBUG checkb(bp, "pullupblock 1");
328                         return bp;
329                 } else {
330                         memmove(bp->wp, nbp->rp, i);
331                         pullupblockcnt++;
332                         bp->wp += i;
333                         bp->next = nbp->next;
334                         nbp->next = 0;
335                         freeb(nbp);
336                         n -= i;
337                         if (n == 0) {
338                                 QDEBUG checkb(bp, "pullupblock 2");
339                                 return bp;
340                         }
341                 }
342         }
343         freeb(bp);
344         return 0;
345 }
346
347 /*
348  *  make sure the first block has at least n bytes in its main body
349  */
350 struct block *pullupqueue(struct queue *q, int n)
351 {
352         struct block *b;
353
354         /* TODO: lock to protect the queue links? */
355         if ((BHLEN(q->bfirst) >= n))
356                 return q->bfirst;
357         q->bfirst = pullupblock(q->bfirst, n);
358         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
359         q->blast = b;
360         return q->bfirst;
361 }
362
363 /* throw away count bytes from the front of
364  * block's extradata.  Returns count of bytes
365  * thrown away
366  */
367
368 static int pullext(struct block *bp, int count)
369 {
370         struct extra_bdata *ed;
371         int i, rem, bytes = 0;
372
373         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
374                 ed = &bp->extra_data[i];
375                 rem = MIN(count, ed->len);
376                 bp->extra_len -= rem;
377                 count -= rem;
378                 bytes += rem;
379                 ed->off += rem;
380                 ed->len -= rem;
381                 if (ed->len == 0) {
382                         kfree((void *)ed->base);
383                         ed->base = 0;
384                         ed->off = 0;
385                 }
386         }
387         return bytes;
388 }
389
390 /* throw away count bytes from the end of a
391  * block's extradata.  Returns count of bytes
392  * thrown away
393  */
394
395 static int dropext(struct block *bp, int count)
396 {
397         struct extra_bdata *ed;
398         int i, rem, bytes = 0;
399
400         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
401                 ed = &bp->extra_data[i];
402                 rem = MIN(count, ed->len);
403                 bp->extra_len -= rem;
404                 count -= rem;
405                 bytes += rem;
406                 ed->len -= rem;
407                 if (ed->len == 0) {
408                         kfree((void *)ed->base);
409                         ed->base = 0;
410                         ed->off = 0;
411                 }
412         }
413         return bytes;
414 }
415
416 /*
417  *  throw away up to count bytes from a
418  *  list of blocks.  Return count of bytes
419  *  thrown away.
420  */
421 static int _pullblock(struct block **bph, int count, int free)
422 {
423         struct block *bp;
424         int n, bytes;
425
426         bytes = 0;
427         if (bph == NULL)
428                 return 0;
429
430         while (*bph != NULL && count != 0) {
431                 bp = *bph;
432
433                 n = MIN(BHLEN(bp), count);
434                 bytes += n;
435                 count -= n;
436                 bp->rp += n;
437                 n = pullext(bp, count);
438                 bytes += n;
439                 count -= n;
440                 QDEBUG checkb(bp, "pullblock ");
441                 if (BLEN(bp) == 0 && (free || count)) {
442                         *bph = bp->next;
443                         bp->next = NULL;
444                         freeb(bp);
445                 }
446         }
447         return bytes;
448 }
449
450 int pullblock(struct block **bph, int count)
451 {
452         return _pullblock(bph, count, 1);
453 }
454
455 /*
456  *  trim to len bytes starting at offset
457  */
458 struct block *trimblock(struct block *bp, int offset, int len)
459 {
460         uint32_t l, trim;
461         int olen = len;
462
463         QDEBUG checkb(bp, "trimblock 1");
464         if (blocklen(bp) < offset + len) {
465                 freeblist(bp);
466                 return NULL;
467         }
468
469         l =_pullblock(&bp, offset, 0);
470         if (bp == NULL)
471                 return NULL;
472         if (l != offset) {
473                 freeblist(bp);
474                 return NULL;
475         }
476
477         while ((l = BLEN(bp)) < len) {
478                 len -= l;
479                 bp = bp->next;
480         }
481
482         trim = BLEN(bp) - len;
483         trim -= dropext(bp, trim);
484         bp->wp -= trim;
485
486         if (bp->next) {
487                 freeblist(bp->next);
488                 bp->next = NULL;
489         }
490         return bp;
491 }
492
493 /*
494  *  copy 'count' bytes into a new block
495  */
496 struct block *copyblock(struct block *bp, int count)
497 {
498         int l;
499         struct block *nbp;
500
501         QDEBUG checkb(bp, "copyblock 0");
502         nbp = allocb(count);
503         if (bp->flag & BCKSUM_FLAGS) {
504                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
505                 nbp->checksum_start = bp->checksum_start;
506                 nbp->checksum_offset = bp->checksum_offset;
507                 nbp->mss = bp->mss;
508         }
509         PANIC_EXTRA(bp);
510         for (; count > 0 && bp != 0; bp = bp->next) {
511                 l = BLEN(bp);
512                 if (l > count)
513                         l = count;
514                 memmove(nbp->wp, bp->rp, l);
515                 nbp->wp += l;
516                 count -= l;
517         }
518         if (count > 0) {
519                 memset(nbp->wp, 0, count);
520                 nbp->wp += count;
521         }
522         copyblockcnt++;
523         QDEBUG checkb(nbp, "copyblock 1");
524
525         return nbp;
526 }
527
528 struct block *adjustblock(struct block *bp, int len)
529 {
530         int n;
531         struct block *nbp;
532
533         if (len < 0) {
534                 freeb(bp);
535                 return NULL;
536         }
537
538         PANIC_EXTRA(bp);
539         if (bp->rp + len > bp->lim) {
540                 nbp = copyblock(bp, len);
541                 freeblist(bp);
542                 QDEBUG checkb(nbp, "adjustblock 1");
543
544                 return nbp;
545         }
546
547         n = BLEN(bp);
548         if (len > n)
549                 memset(bp->wp, 0, len - n);
550         bp->wp = bp->rp + len;
551         QDEBUG checkb(bp, "adjustblock 2");
552
553         return bp;
554 }
555
556
557 /*
558  *  get next block from a queue, return null if nothing there
559  */
560 struct block *qget(struct queue *q)
561 {
562         int dowakeup;
563         struct block *b;
564
565         /* sync with qwrite */
566         spin_lock_irqsave(&q->lock);
567
568         b = q->bfirst;
569         if (b == NULL) {
570                 q->state |= Qstarve;
571                 spin_unlock_irqsave(&q->lock);
572                 return NULL;
573         }
574         q->bfirst = b->next;
575         b->next = 0;
576         q->len -= BALLOC(b);
577         q->dlen -= BLEN(b);
578         QDEBUG checkb(b, "qget");
579
580         /* if writer flow controlled, restart */
581         if ((q->state & Qflow) && q->len < q->limit / 2) {
582                 q->state &= ~Qflow;
583                 dowakeup = 1;
584         } else
585                 dowakeup = 0;
586
587         spin_unlock_irqsave(&q->lock);
588
589         if (dowakeup) {
590                 rendez_wakeup(&q->wr);
591                 /* We only send the writable event on wakeup, which is edge triggered */
592                 qwake_cb(q, FDTAP_FILT_WRITABLE);
593         }
594
595         return b;
596 }
597
598 /*
599  *  throw away the next 'len' bytes in the queue
600  * returning the number actually discarded
601  */
602 int qdiscard(struct queue *q, int len)
603 {
604         struct block *b;
605         int dowakeup, n, sofar, body_amt, extra_amt;
606         struct extra_bdata *ebd;
607
608         spin_lock_irqsave(&q->lock);
609         for (sofar = 0; sofar < len; sofar += n) {
610                 b = q->bfirst;
611                 if (b == NULL)
612                         break;
613                 QDEBUG checkb(b, "qdiscard");
614                 n = BLEN(b);
615                 if (n <= len - sofar) {
616                         q->bfirst = b->next;
617                         b->next = 0;
618                         q->len -= BALLOC(b);
619                         q->dlen -= BLEN(b);
620                         freeb(b);
621                 } else {
622                         n = len - sofar;
623                         q->dlen -= n;
624                         /* partial block removal */
625                         body_amt = MIN(BHLEN(b), n);
626                         b->rp += body_amt;
627                         extra_amt = n - body_amt;
628                         /* reduce q->len by the amount we remove from the extras.  The
629                          * header will always be accounted for above, during block removal.
630                          * */
631                         q->len -= extra_amt;
632                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
633                                 ebd = &b->extra_data[i];
634                                 if (!ebd->base || !ebd->len)
635                                         continue;
636                                 if (extra_amt >= ebd->len) {
637                                         /* remove the entire entry, note the kfree release */
638                                         b->extra_len -= ebd->len;
639                                         extra_amt -= ebd->len;
640                                         kfree((void*)ebd->base);
641                                         ebd->base = ebd->off = ebd->len = 0;
642                                         continue;
643                                 }
644                                 ebd->off += extra_amt;
645                                 ebd->len -= extra_amt;
646                                 b->extra_len -= extra_amt;
647                                 extra_amt = 0;
648                         }
649                 }
650         }
651
652         /*
653          *  if writer flow controlled, restart
654          *
655          *  This used to be
656          *  q->len < q->limit/2
657          *  but it slows down tcp too much for certain write sizes.
658          *  I really don't understand it completely.  It may be
659          *  due to the queue draining so fast that the transmission
660          *  stalls waiting for the app to produce more data.  - presotto
661          */
662         if ((q->state & Qflow) && q->len < q->limit) {
663                 q->state &= ~Qflow;
664                 dowakeup = 1;
665         } else
666                 dowakeup = 0;
667
668         spin_unlock_irqsave(&q->lock);
669
670         if (dowakeup) {
671                 rendez_wakeup(&q->wr);
672                 qwake_cb(q, FDTAP_FILT_WRITABLE);
673         }
674
675         return sofar;
676 }
677
678 /*
679  *  Interrupt level copy out of a queue, return # bytes copied.
680  */
681 int qconsume(struct queue *q, void *vp, int len)
682 {
683         struct block *b;
684         int n, dowakeup;
685         uint8_t *p = vp;
686         struct block *tofree = NULL;
687
688         /* sync with qwrite */
689         spin_lock_irqsave(&q->lock);
690
691         for (;;) {
692                 b = q->bfirst;
693                 if (b == 0) {
694                         q->state |= Qstarve;
695                         spin_unlock_irqsave(&q->lock);
696                         return -1;
697                 }
698                 QDEBUG checkb(b, "qconsume 1");
699
700                 n = BLEN(b);
701                 if (n > 0)
702                         break;
703                 q->bfirst = b->next;
704                 q->len -= BALLOC(b);
705
706                 /* remember to free this */
707                 b->next = tofree;
708                 tofree = b;
709         };
710
711         PANIC_EXTRA(b);
712         if (n < len)
713                 len = n;
714         memmove(p, b->rp, len);
715         consumecnt += n;
716         b->rp += len;
717         q->dlen -= len;
718
719         /* discard the block if we're done with it */
720         if ((q->state & Qmsg) || len == n) {
721                 q->bfirst = b->next;
722                 b->next = 0;
723                 q->len -= BALLOC(b);
724                 q->dlen -= BLEN(b);
725
726                 /* remember to free this */
727                 b->next = tofree;
728                 tofree = b;
729         }
730
731         /* if writer flow controlled, restart */
732         if ((q->state & Qflow) && q->len < q->limit / 2) {
733                 q->state &= ~Qflow;
734                 dowakeup = 1;
735         } else
736                 dowakeup = 0;
737
738         spin_unlock_irqsave(&q->lock);
739
740         if (dowakeup) {
741                 rendez_wakeup(&q->wr);
742                 qwake_cb(q, FDTAP_FILT_WRITABLE);
743         }
744
745         if (tofree != NULL)
746                 freeblist(tofree);
747
748         return len;
749 }
750
751 int qpass(struct queue *q, struct block *b)
752 {
753         int dlen, len, dowakeup;
754
755         /* sync with qread */
756         dowakeup = 0;
757         spin_lock_irqsave(&q->lock);
758         if (q->len >= q->limit) {
759                 freeblist(b);
760                 spin_unlock_irqsave(&q->lock);
761                 return -1;
762         }
763         if (q->state & Qclosed) {
764                 len = blocklen(b);
765                 freeblist(b);
766                 spin_unlock_irqsave(&q->lock);
767                 return len;
768         }
769
770         /* add buffer to queue */
771         if (q->bfirst)
772                 q->blast->next = b;
773         else
774                 q->bfirst = b;
775         len = BALLOC(b);
776         dlen = BLEN(b);
777         QDEBUG checkb(b, "qpass");
778         while (b->next) {
779                 b = b->next;
780                 QDEBUG checkb(b, "qpass");
781                 len += BALLOC(b);
782                 dlen += BLEN(b);
783         }
784         q->blast = b;
785         q->len += len;
786         q->dlen += dlen;
787
788         if (q->len >= q->limit / 2)
789                 q->state |= Qflow;
790
791         if (q->state & Qstarve) {
792                 q->state &= ~Qstarve;
793                 dowakeup = 1;
794         }
795         spin_unlock_irqsave(&q->lock);
796
797         if (dowakeup) {
798                 rendez_wakeup(&q->rr);
799                 qwake_cb(q, FDTAP_FILT_READABLE);
800         }
801
802         return len;
803 }
804
805 int qpassnolim(struct queue *q, struct block *b)
806 {
807         int dlen, len, dowakeup;
808
809         /* sync with qread */
810         dowakeup = 0;
811         spin_lock_irqsave(&q->lock);
812
813         if (q->state & Qclosed) {
814                 freeblist(b);
815                 spin_unlock_irqsave(&q->lock);
816                 return BALLOC(b);
817         }
818
819         /* add buffer to queue */
820         if (q->bfirst)
821                 q->blast->next = b;
822         else
823                 q->bfirst = b;
824         len = BALLOC(b);
825         dlen = BLEN(b);
826         QDEBUG checkb(b, "qpass");
827         while (b->next) {
828                 b = b->next;
829                 QDEBUG checkb(b, "qpass");
830                 len += BALLOC(b);
831                 dlen += BLEN(b);
832         }
833         q->blast = b;
834         q->len += len;
835         q->dlen += dlen;
836
837         if (q->len >= q->limit / 2)
838                 q->state |= Qflow;
839
840         if (q->state & Qstarve) {
841                 q->state &= ~Qstarve;
842                 dowakeup = 1;
843         }
844         spin_unlock_irqsave(&q->lock);
845
846         if (dowakeup) {
847                 rendez_wakeup(&q->rr);
848                 qwake_cb(q, FDTAP_FILT_READABLE);
849         }
850
851         return len;
852 }
853
854 /*
855  *  if the allocated space is way out of line with the used
856  *  space, reallocate to a smaller block
857  */
858 struct block *packblock(struct block *bp)
859 {
860         struct block **l, *nbp;
861         int n;
862
863         if (bp->extra_len)
864                 return bp;
865         for (l = &bp; *l; l = &(*l)->next) {
866                 nbp = *l;
867                 n = BLEN(nbp);
868                 if ((n << 2) < BALLOC(nbp)) {
869                         *l = allocb(n);
870                         memmove((*l)->wp, nbp->rp, n);
871                         (*l)->wp += n;
872                         (*l)->next = nbp->next;
873                         freeb(nbp);
874                 }
875         }
876
877         return bp;
878 }
879
880 int qproduce(struct queue *q, void *vp, int len)
881 {
882         struct block *b;
883         int dowakeup;
884         uint8_t *p = vp;
885
886         /* sync with qread */
887         dowakeup = 0;
888         spin_lock_irqsave(&q->lock);
889
890         /* no waiting receivers, room in buffer? */
891         if (q->len >= q->limit) {
892                 q->state |= Qflow;
893                 spin_unlock_irqsave(&q->lock);
894                 return -1;
895         }
896
897         /* save in buffer */
898         /* use Qcoalesce here to save storage */
899         // TODO: Consider removing the Qcoalesce flag and force a coalescing
900         // strategy by default.
901         b = q->blast;
902         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
903                 || b->lim - b->wp < len) {
904                 /* need a new block */
905                 b = iallocb(len);
906                 if (b == 0) {
907                         spin_unlock_irqsave(&q->lock);
908                         return 0;
909                 }
910                 if (q->bfirst)
911                         q->blast->next = b;
912                 else
913                         q->bfirst = b;
914                 q->blast = b;
915                 /* b->next = 0; done by iallocb() */
916                 q->len += BALLOC(b);
917         }
918         PANIC_EXTRA(b);
919         memmove(b->wp, p, len);
920         producecnt += len;
921         b->wp += len;
922         q->dlen += len;
923         QDEBUG checkb(b, "qproduce");
924
925         if (q->state & Qstarve) {
926                 q->state &= ~Qstarve;
927                 dowakeup = 1;
928         }
929
930         if (q->len >= q->limit)
931                 q->state |= Qflow;
932         spin_unlock_irqsave(&q->lock);
933
934         if (dowakeup) {
935                 rendez_wakeup(&q->rr);
936                 qwake_cb(q, FDTAP_FILT_READABLE);
937         }
938
939         return len;
940 }
941
942 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
943  * body_rp, for up to len.  Returns the len consumed. 
944  *
945  * The base is 'b', so that we can kfree it later.  This currently ties us to
946  * using kfree for the release method for all extra_data.
947  *
948  * It is possible to have a body size that is 0, if there is no offset, and
949  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
950 static size_t point_to_body(struct block *b, uint8_t *body_rp,
951                             struct block *newb, unsigned int newb_idx,
952                             size_t len)
953 {
954         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
955
956         assert(newb_idx < newb->nr_extra_bufs);
957
958         kmalloc_incref(b);
959         ebd->base = (uintptr_t)b;
960         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
961         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
962         assert((int)ebd->len >= 0);
963         newb->extra_len += ebd->len;
964         return ebd->len;
965 }
966
967 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
968  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
969  * consumed.
970  *
971  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
972 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
973                            struct block *newb, unsigned int newb_idx,
974                            size_t len)
975 {
976         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
977         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
978
979         assert(b_idx < b->nr_extra_bufs);
980         assert(newb_idx < newb->nr_extra_bufs);
981
982         kmalloc_incref((void*)b_ebd->base);
983         n_ebd->base = b_ebd->base;
984         n_ebd->off = b_ebd->off + b_off;
985         n_ebd->len = MIN(b_ebd->len - b_off, len);
986         newb->extra_len += n_ebd->len;
987         return n_ebd->len;
988 }
989
990 /* given a string of blocks, fills the new block's extra_data  with the contents
991  * of the blist [offset, len + offset)
992  *
993  * returns 0 on success.  the only failure is if the extra_data array was too
994  * small, so this returns a positive integer saying how big the extra_data needs
995  * to be.
996  *
997  * callers are responsible for protecting the list structure. */
998 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
999                             uint32_t offset)
1000 {
1001         struct block *b, *first;
1002         unsigned int nr_bufs = 0;
1003         unsigned int b_idx, newb_idx = 0;
1004         uint8_t *first_main_body = 0;
1005
1006         /* find the first block; keep offset relative to the latest b in the list */
1007         for (b = blist; b; b = b->next) {
1008                 if (BLEN(b) > offset)
1009                         break;
1010                 offset -= BLEN(b);
1011         }
1012         /* qcopy semantics: if you asked for an offset outside the block list, you
1013          * get an empty block back */
1014         if (!b)
1015                 return 0;
1016         first = b;
1017         /* upper bound for how many buffers we'll need in newb */
1018         for (/* b is set*/; b; b = b->next) {
1019                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1020         }
1021         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1022         block_add_extd(newb, nr_bufs, 0);
1023         if (newb->nr_extra_bufs < nr_bufs) {
1024                 /* caller will need to alloc these, then re-call us */
1025                 return nr_bufs;
1026         }
1027         for (b = first; b && len; b = b->next) {
1028                 b_idx = 0;
1029                 if (offset) {
1030                         if (offset < BHLEN(b)) {
1031                                 /* off is in the main body */
1032                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1033                                 newb_idx++;
1034                         } else {
1035                                 /* off is in one of the buffers (or just past the last one).
1036                                  * we're not going to point to b's main body at all. */
1037                                 offset -= BHLEN(b);
1038                                 assert(b->extra_data);
1039                                 /* assuming these extrabufs are packed, or at least that len
1040                                  * isn't gibberish */
1041                                 while (b->extra_data[b_idx].len <= offset) {
1042                                         offset -= b->extra_data[b_idx].len;
1043                                         b_idx++;
1044                                 }
1045                                 /* now offset is set to our offset in the b_idx'th buf */
1046                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1047                                 newb_idx++;
1048                                 b_idx++;
1049                         }
1050                         offset = 0;
1051                 } else {
1052                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1053                         newb_idx++;
1054                 }
1055                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1056                  * and any point_to_ could be our last if it consumed all of len. */
1057                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1058                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1059                         newb_idx++;
1060                 }
1061         }
1062         return 0;
1063 }
1064
1065 struct block *blist_clone(struct block *blist, int header_len, int len,
1066                           uint32_t offset)
1067 {
1068         int ret;
1069         struct block *newb = allocb(header_len);
1070         do {
1071                 ret = __blist_clone_to(blist, newb, len, offset);
1072                 if (ret)
1073                         block_add_extd(newb, ret, KMALLOC_WAIT);
1074         } while (ret);
1075         return newb;
1076 }
1077
1078 /* given a queue, makes a single block with header_len reserved space in the
1079  * block main body, and the contents of [offset, len + offset) pointed to in the
1080  * new blocks ext_data. */
1081 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1082 {
1083         int ret;
1084         struct block *newb = allocb(header_len);
1085         /* the while loop should rarely be used: it would require someone
1086          * concurrently adding to the queue. */
1087         do {
1088                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1089                 spin_lock_irqsave(&q->lock);
1090                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1091                 spin_unlock_irqsave(&q->lock);
1092                 if (ret)
1093                         block_add_extd(newb, ret, KMALLOC_WAIT);
1094         } while (ret);
1095         return newb;
1096 }
1097
1098 /*
1099  *  copy from offset in the queue
1100  */
1101 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1102 {
1103         int sofar;
1104         int n;
1105         struct block *b, *nb;
1106         uint8_t *p;
1107
1108         nb = allocb(len);
1109
1110         spin_lock_irqsave(&q->lock);
1111
1112         /* go to offset */
1113         b = q->bfirst;
1114         for (sofar = 0;; sofar += n) {
1115                 if (b == NULL) {
1116                         spin_unlock_irqsave(&q->lock);
1117                         return nb;
1118                 }
1119                 n = BLEN(b);
1120                 if (sofar + n > offset) {
1121                         p = b->rp + offset - sofar;
1122                         n -= offset - sofar;
1123                         break;
1124                 }
1125                 QDEBUG checkb(b, "qcopy");
1126                 b = b->next;
1127         }
1128
1129         /* copy bytes from there */
1130         for (sofar = 0; sofar < len;) {
1131                 if (n > len - sofar)
1132                         n = len - sofar;
1133                 PANIC_EXTRA(b);
1134                 memmove(nb->wp, p, n);
1135                 qcopycnt += n;
1136                 sofar += n;
1137                 nb->wp += n;
1138                 b = b->next;
1139                 if (b == NULL)
1140                         break;
1141                 n = BLEN(b);
1142                 p = b->rp;
1143         }
1144         spin_unlock_irqsave(&q->lock);
1145
1146         return nb;
1147 }
1148
1149 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1150 {
1151 #ifdef CONFIG_BLOCK_EXTRAS
1152         return qclone(q, 0, len, offset);
1153 #else
1154         return qcopy_old(q, len, offset);
1155 #endif
1156 }
1157
1158 static void qinit_common(struct queue *q)
1159 {
1160         spinlock_init_irqsave(&q->lock);
1161         qlock_init(&q->rlock);
1162         qlock_init(&q->wlock);
1163         rendez_init(&q->rr);
1164         rendez_init(&q->wr);
1165 }
1166
1167 /*
1168  *  called by non-interrupt code
1169  */
1170 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1171 {
1172         struct queue *q;
1173
1174         q = kzmalloc(sizeof(struct queue), 0);
1175         if (q == 0)
1176                 return 0;
1177         qinit_common(q);
1178
1179         q->limit = q->inilim = limit;
1180         q->kick = kick;
1181         q->arg = arg;
1182         q->state = msg;
1183         q->state |= Qstarve;
1184         q->eof = 0;
1185
1186         return q;
1187 }
1188
1189 /* open a queue to be bypassed */
1190 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1191 {
1192         struct queue *q;
1193
1194         q = kzmalloc(sizeof(struct queue), 0);
1195         if (q == 0)
1196                 return 0;
1197         qinit_common(q);
1198
1199         q->limit = 0;
1200         q->arg = arg;
1201         q->bypass = bypass;
1202         q->state = 0;
1203
1204         return q;
1205 }
1206
1207 static int notempty(void *a)
1208 {
1209         struct queue *q = a;
1210
1211         return (q->state & Qclosed) || q->bfirst != 0;
1212 }
1213
1214 /* Wait for the queue to be non-empty or closed.  Returns TRUE for a successful
1215  * wait, FALSE on Qclose (without error)
1216  *
1217  * Called with q ilocked.  May error out, back through the caller, with
1218  * the irqsave lock unlocked.  */
1219 static bool qwait(struct queue *q)
1220 {
1221         /* wait for data */
1222         for (;;) {
1223                 if (q->bfirst != NULL)
1224                         break;
1225
1226                 if (q->state & Qclosed) {
1227                         if (++q->eof > 3) {
1228                                 spin_unlock_irqsave(&q->lock);
1229                                 error("multiple reads on a closed queue");
1230                         }
1231                         if (*q->err && strcmp(q->err, Ehungup) != 0) {
1232                                 spin_unlock_irqsave(&q->lock);
1233                                 error(q->err);
1234                         }
1235                         return FALSE;
1236                 }
1237                 if (q->state & Qnonblock) {
1238                         spin_unlock_irqsave(&q->lock);
1239                         set_errno(EAGAIN);
1240                         error("queue empty");
1241                 }
1242                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1243                 spin_unlock_irqsave(&q->lock);
1244                 /* may throw an error() */
1245                 rendez_sleep(&q->rr, notempty, q);
1246                 spin_lock_irqsave(&q->lock);
1247         }
1248         return TRUE;
1249 }
1250
1251 /*
1252  * add a block list to a queue
1253  */
1254 void qaddlist(struct queue *q, struct block *b)
1255 {
1256         /* TODO: q lock? */
1257         /* queue the block */
1258         if (q->bfirst)
1259                 q->blast->next = b;
1260         else
1261                 q->bfirst = b;
1262         q->len += blockalloclen(b);
1263         q->dlen += blocklen(b);
1264         while (b->next)
1265                 b = b->next;
1266         q->blast = b;
1267 }
1268
1269 /*
1270  *  called with q ilocked
1271  */
1272 struct block *qremove(struct queue *q)
1273 {
1274         struct block *b;
1275
1276         b = q->bfirst;
1277         if (b == NULL)
1278                 return NULL;
1279         q->bfirst = b->next;
1280         b->next = NULL;
1281         q->dlen -= BLEN(b);
1282         q->len -= BALLOC(b);
1283         QDEBUG checkb(b, "qremove");
1284         return b;
1285 }
1286
1287 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1288 {
1289         size_t copy_amt, retval = 0;
1290         struct extra_bdata *ebd;
1291         
1292         copy_amt = MIN(BHLEN(b), amt);
1293         memcpy(to, b->rp, copy_amt);
1294         /* advance the rp, since this block not be completely consumed and future
1295          * reads need to know where to pick up from */
1296         b->rp += copy_amt;
1297         to += copy_amt;
1298         amt -= copy_amt;
1299         retval += copy_amt;
1300         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1301                 ebd = &b->extra_data[i];
1302                 /* skip empty entires.  if we track this in the struct block, we can
1303                  * just start the for loop early */
1304                 if (!ebd->base || !ebd->len)
1305                         continue;
1306                 copy_amt = MIN(ebd->len, amt);
1307                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1308                 /* we're actually consuming the entries, just like how we advance rp up
1309                  * above, and might only consume part of one. */
1310                 ebd->len -= copy_amt;
1311                 ebd->off += copy_amt;
1312                 b->extra_len -= copy_amt;
1313                 if (!ebd->len) {
1314                         /* we don't actually have to decref here.  it's also done in
1315                          * freeb().  this is the earliest we can free. */
1316                         kfree((void*)ebd->base);
1317                         ebd->base = ebd->off = 0;
1318                 }
1319                 to += copy_amt;
1320                 amt -= copy_amt;
1321                 retval += copy_amt;
1322         }
1323         return retval;
1324 }
1325
1326 /*
1327  *  copy the contents of a string of blocks into
1328  *  memory.  emptied blocks are freed.  return
1329  *  pointer to first unconsumed block.
1330  */
1331 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1332 {
1333         int i;
1334         struct block *next;
1335
1336         /* could be slicker here, since read_from_block is smart */
1337         for (; b != NULL; b = next) {
1338                 i = BLEN(b);
1339                 if (i > n) {
1340                         /* partial block, consume some */
1341                         read_from_block(b, p, n);
1342                         return b;
1343                 }
1344                 /* full block, consume all and move on */
1345                 i = read_from_block(b, p, i);
1346                 n -= i;
1347                 p += i;
1348                 next = b->next;
1349                 freeb(b);
1350         }
1351         return NULL;
1352 }
1353
1354 /*
1355  *  copy the contents of memory into a string of blocks.
1356  *  return NULL on error.
1357  */
1358 struct block *mem2bl(uint8_t * p, int len)
1359 {
1360         ERRSTACK(1);
1361         int n;
1362         struct block *b, *first, **l;
1363
1364         first = NULL;
1365         l = &first;
1366         if (waserror()) {
1367                 freeblist(first);
1368                 nexterror();
1369         }
1370         do {
1371                 n = len;
1372                 if (n > Maxatomic)
1373                         n = Maxatomic;
1374
1375                 *l = b = allocb(n);
1376                 /* TODO consider extra_data */
1377                 memmove(b->wp, p, n);
1378                 b->wp += n;
1379                 p += n;
1380                 len -= n;
1381                 l = &b->next;
1382         } while (len > 0);
1383         poperror();
1384
1385         return first;
1386 }
1387
1388 /*
1389  *  put a block back to the front of the queue
1390  *  called with q ilocked
1391  */
1392 void qputback(struct queue *q, struct block *b)
1393 {
1394         b->next = q->bfirst;
1395         if (q->bfirst == NULL)
1396                 q->blast = b;
1397         q->bfirst = b;
1398         q->len += BALLOC(b);
1399         q->dlen += BLEN(b);
1400 }
1401
1402 /*
1403  *  flow control, get producer going again
1404  *  called with q ilocked
1405  */
1406 static void qwakeup_iunlock(struct queue *q)
1407 {
1408         int dowakeup = 0;
1409
1410         /* if writer flow controlled, restart */
1411         if ((q->state & Qflow) && q->len < q->limit / 2) {
1412                 q->state &= ~Qflow;
1413                 dowakeup = 1;
1414         }
1415
1416         spin_unlock_irqsave(&q->lock);
1417
1418         /* wakeup flow controlled writers */
1419         if (dowakeup) {
1420                 if (q->kick)
1421                         q->kick(q->arg);
1422                 rendez_wakeup(&q->wr);
1423                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1424         }
1425 }
1426
1427 /*
1428  *  get next block from a queue (up to a limit)
1429  */
1430 struct block *qbread(struct queue *q, int len)
1431 {
1432         ERRSTACK(1);
1433         struct block *b, *nb;
1434         int n;
1435
1436         qlock(&q->rlock);
1437         if (waserror()) {
1438                 qunlock(&q->rlock);
1439                 nexterror();
1440         }
1441
1442         spin_lock_irqsave(&q->lock);
1443         if (!qwait(q)) {
1444                 /* queue closed */
1445                 spin_unlock_irqsave(&q->lock);
1446                 qunlock(&q->rlock);
1447                 poperror();
1448                 return NULL;
1449         }
1450
1451         /* if we get here, there's at least one block in the queue */
1452         b = qremove(q);
1453         n = BLEN(b);
1454
1455         /* split block if it's too big and this is not a message queue */
1456         nb = b;
1457         if (n > len) {
1458                 PANIC_EXTRA(b);
1459                 if ((q->state & Qmsg) == 0) {
1460                         n -= len;
1461                         b = allocb(n);
1462                         memmove(b->wp, nb->rp + len, n);
1463                         b->wp += n;
1464                         qputback(q, b);
1465                 }
1466                 nb->wp = nb->rp + len;
1467         }
1468
1469         /* restart producer */
1470         qwakeup_iunlock(q);
1471
1472         poperror();
1473         qunlock(&q->rlock);
1474         return nb;
1475 }
1476
1477 /*
1478  *  read a queue.  if no data is queued, post a struct block
1479  *  and wait on its Rendez.
1480  */
1481 long qread(struct queue *q, void *vp, int len)
1482 {
1483         ERRSTACK(1);
1484         struct block *b, *first, **l;
1485         int m, n;
1486
1487         qlock(&q->rlock);
1488         if (waserror()) {
1489                 qunlock(&q->rlock);
1490                 nexterror();
1491         }
1492
1493         spin_lock_irqsave(&q->lock);
1494 again:
1495         if (!qwait(q)) {
1496                 /* queue closed */
1497                 spin_unlock_irqsave(&q->lock);
1498                 qunlock(&q->rlock);
1499                 poperror();
1500                 return 0;
1501         }
1502
1503         /* if we get here, there's at least one block in the queue */
1504         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1505         // strategy by default.
1506         if (q->state & Qcoalesce) {
1507                 /* when coalescing, 0 length blocks just go away */
1508                 b = q->bfirst;
1509                 if (BLEN(b) <= 0) {
1510                         freeb(qremove(q));
1511                         goto again;
1512                 }
1513
1514                 /*  grab the first block plus as many
1515                  *  following blocks as will completely
1516                  *  fit in the read.
1517                  */
1518                 n = 0;
1519                 l = &first;
1520                 m = BLEN(b);
1521                 for (;;) {
1522                         *l = qremove(q);
1523                         l = &b->next;
1524                         n += m;
1525
1526                         b = q->bfirst;
1527                         if (b == NULL)
1528                                 break;
1529                         m = BLEN(b);
1530                         if (n + m > len)
1531                                 break;
1532                 }
1533         } else {
1534                 first = qremove(q);
1535                 n = BLEN(first);
1536         }
1537
1538         /* copy to user space outside of the ilock */
1539         spin_unlock_irqsave(&q->lock);
1540         b = bl2mem(vp, first, len);
1541         spin_lock_irqsave(&q->lock);
1542
1543         /* take care of any left over partial block */
1544         if (b != NULL) {
1545                 n -= BLEN(b);
1546                 if (q->state & Qmsg)
1547                         freeb(b);
1548                 else
1549                         qputback(q, b);
1550         }
1551
1552         /* restart producer */
1553         qwakeup_iunlock(q);
1554
1555         poperror();
1556         qunlock(&q->rlock);
1557         return n;
1558 }
1559
1560 static int qnotfull(void *a)
1561 {
1562         struct queue *q = a;
1563
1564         return q->len < q->limit || (q->state & Qclosed);
1565 }
1566
1567 uint32_t dropcnt;
1568
1569 /*
1570  *  add a block to a queue obeying flow control
1571  */
1572 long qbwrite(struct queue *q, struct block *b)
1573 {
1574         ERRSTACK(1);
1575         int n, dowakeup;
1576         volatile bool should_free_b = TRUE;
1577
1578         n = BLEN(b);
1579
1580         if (q->bypass) {
1581                 (*q->bypass) (q->arg, b);
1582                 return n;
1583         }
1584
1585         dowakeup = 0;
1586         qlock(&q->wlock);
1587         if (waserror()) {
1588                 if (b != NULL && should_free_b)
1589                         freeb(b);
1590                 qunlock(&q->wlock);
1591                 nexterror();
1592         }
1593
1594         spin_lock_irqsave(&q->lock);
1595
1596         /* give up if the queue is closed */
1597         if (q->state & Qclosed) {
1598                 spin_unlock_irqsave(&q->lock);
1599                 error(q->err);
1600         }
1601
1602         /* if nonblocking, don't queue over the limit */
1603         if (q->len >= q->limit) {
1604                 /* drop overflow takes priority over regular non-blocking */
1605                 if (q->state & Qdropoverflow) {
1606                         spin_unlock_irqsave(&q->lock);
1607                         freeb(b);
1608                         dropcnt += n;
1609                         qunlock(&q->wlock);
1610                         poperror();
1611                         return n;
1612                 }
1613                 if (q->state & Qnonblock) {
1614                         spin_unlock_irqsave(&q->lock);
1615                         freeb(b);
1616                         set_errno(EAGAIN);
1617                         error("queue full");
1618                 }
1619         }
1620
1621         /* queue the block */
1622         should_free_b = FALSE;
1623         if (q->bfirst)
1624                 q->blast->next = b;
1625         else
1626                 q->bfirst = b;
1627         q->blast = b;
1628         b->next = 0;
1629         q->len += BALLOC(b);
1630         q->dlen += n;
1631         QDEBUG checkb(b, "qbwrite");
1632         b = NULL;
1633
1634         /* make sure other end gets awakened */
1635         if (q->state & Qstarve) {
1636                 q->state &= ~Qstarve;
1637                 dowakeup = 1;
1638         }
1639         spin_unlock_irqsave(&q->lock);
1640
1641         /*  get output going again */
1642         if (q->kick && (dowakeup || (q->state & Qkick)))
1643                 q->kick(q->arg);
1644
1645         /* wakeup anyone consuming at the other end */
1646         if (dowakeup) {
1647                 rendez_wakeup(&q->rr);
1648                 qwake_cb(q, FDTAP_FILT_READABLE);
1649         }
1650
1651         /*
1652          *  flow control, wait for queue to get below the limit
1653          *  before allowing the process to continue and queue
1654          *  more.  We do this here so that postnote can only
1655          *  interrupt us after the data has been queued.  This
1656          *  means that things like 9p flushes and ssl messages
1657          *  will not be disrupted by software interrupts.
1658          *
1659          *  Note - this is moderately dangerous since a process
1660          *  that keeps getting interrupted and rewriting will
1661          *  queue infinite crud.
1662          */
1663         for (;;) {
1664                 if ((q->state & (Qdropoverflow | Qnonblock)) || qnotfull(q))
1665                         break;
1666
1667                 spin_lock_irqsave(&q->lock);
1668                 q->state |= Qflow;
1669                 spin_unlock_irqsave(&q->lock);
1670                 rendez_sleep(&q->wr, qnotfull, q);
1671         }
1672
1673         qunlock(&q->wlock);
1674         poperror();
1675         return n;
1676 }
1677
1678 long qibwrite(struct queue *q, struct block *b)
1679 {
1680         int n, dowakeup;
1681
1682         dowakeup = 0;
1683
1684         n = BLEN(b);
1685
1686         spin_lock_irqsave(&q->lock);
1687
1688         QDEBUG checkb(b, "qibwrite");
1689         if (q->bfirst)
1690                 q->blast->next = b;
1691         else
1692                 q->bfirst = b;
1693         q->blast = b;
1694         q->len += BALLOC(b);
1695         q->dlen += n;
1696
1697         if (q->state & Qstarve) {
1698                 q->state &= ~Qstarve;
1699                 dowakeup = 1;
1700         }
1701
1702         spin_unlock_irqsave(&q->lock);
1703
1704         if (dowakeup) {
1705                 if (q->kick)
1706                         q->kick(q->arg);
1707                 rendez_wakeup(&q->rr);
1708                 qwake_cb(q, FDTAP_FILT_READABLE);
1709         }
1710
1711         return n;
1712 }
1713
1714 /*
1715  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1716  */
1717 int qwrite(struct queue *q, void *vp, int len)
1718 {
1719         int n, sofar;
1720         struct block *b;
1721         uint8_t *p = vp;
1722         void *ext_buf;
1723
1724         QDEBUG if (!islo())
1725                  printd("qwrite hi %p\n", getcallerpc(&q));
1726
1727         sofar = 0;
1728         do {
1729                 n = len - sofar;
1730                 /* This is 64K, the max amount per single block.  Still a good value? */
1731                 if (n > Maxatomic)
1732                         n = Maxatomic;
1733
1734                 /* If n is small, we don't need to bother with the extra_data.  But
1735                  * until the whole stack can handle extd blocks, we'll use them
1736                  * unconditionally. */
1737 #ifdef CONFIG_BLOCK_EXTRAS
1738                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1739                  * only available via padblock (to the left).  we also need some space
1740                  * for pullupblock for some basic headers (like icmp) that get written
1741                  * in directly */
1742                 b = allocb(64);
1743                 ext_buf = kmalloc(n, 0);
1744                 memcpy(ext_buf, p + sofar, n);
1745                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1746                 b->extra_data[0].base = (uintptr_t)ext_buf;
1747                 b->extra_data[0].off = 0;
1748                 b->extra_data[0].len = n;
1749                 b->extra_len += n;
1750 #else
1751                 b = allocb(n);
1752                 memmove(b->wp, p + sofar, n);
1753                 b->wp += n;
1754 #endif
1755                         
1756                 qbwrite(q, b);
1757
1758                 sofar += n;
1759         } while (sofar < len && (q->state & Qmsg) == 0);
1760
1761         return len;
1762 }
1763
1764 /*
1765  *  used by print() to write to a queue.  Since we may be splhi or not in
1766  *  a process, don't qlock.
1767  */
1768 int qiwrite(struct queue *q, void *vp, int len)
1769 {
1770         int n, sofar, dowakeup;
1771         struct block *b;
1772         uint8_t *p = vp;
1773
1774         dowakeup = 0;
1775
1776         sofar = 0;
1777         do {
1778                 n = len - sofar;
1779                 if (n > Maxatomic)
1780                         n = Maxatomic;
1781
1782                 b = iallocb(n);
1783                 if (b == NULL)
1784                         break;
1785                 /* TODO consider extra_data */
1786                 memmove(b->wp, p + sofar, n);
1787                 /* this adjusts BLEN to be n, or at least it should */
1788                 b->wp += n;
1789                 assert(n == BLEN(b));
1790                 qibwrite(q, b);
1791
1792                 sofar += n;
1793         } while (sofar < len && (q->state & Qmsg) == 0);
1794
1795         return sofar;
1796 }
1797
1798 /*
1799  *  be extremely careful when calling this,
1800  *  as there is no reference accounting
1801  */
1802 void qfree(struct queue *q)
1803 {
1804         qclose(q);
1805         kfree(q);
1806 }
1807
1808 /*
1809  *  Mark a queue as closed.  No further IO is permitted.
1810  *  All blocks are released.
1811  */
1812 void qclose(struct queue *q)
1813 {
1814         struct block *bfirst;
1815
1816         if (q == NULL)
1817                 return;
1818
1819         /* mark it */
1820         spin_lock_irqsave(&q->lock);
1821         q->state |= Qclosed;
1822         q->state &= ~(Qflow | Qstarve | Qdropoverflow | Qnonblock);
1823         strncpy(q->err, Ehungup, sizeof(q->err));
1824         bfirst = q->bfirst;
1825         q->bfirst = 0;
1826         q->len = 0;
1827         q->dlen = 0;
1828         spin_unlock_irqsave(&q->lock);
1829
1830         /* free queued blocks */
1831         freeblist(bfirst);
1832
1833         /* wake up readers/writers */
1834         rendez_wakeup(&q->rr);
1835         rendez_wakeup(&q->wr);
1836         qwake_cb(q, FDTAP_FILT_HANGUP);
1837 }
1838
1839 /*
1840  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1841  *  blocks.
1842  */
1843 void qhangup(struct queue *q, char *msg)
1844 {
1845         /* mark it */
1846         spin_lock_irqsave(&q->lock);
1847         q->state |= Qclosed;
1848         if (msg == 0 || *msg == 0)
1849                 strncpy(q->err, Ehungup, sizeof(q->err));
1850         else
1851                 strncpy(q->err, msg, ERRMAX - 1);
1852         spin_unlock_irqsave(&q->lock);
1853
1854         /* wake up readers/writers */
1855         rendez_wakeup(&q->rr);
1856         rendez_wakeup(&q->wr);
1857         qwake_cb(q, FDTAP_FILT_HANGUP);
1858 }
1859
1860 /*
1861  *  return non-zero if the q is hungup
1862  */
1863 int qisclosed(struct queue *q)
1864 {
1865         return q->state & Qclosed;
1866 }
1867
1868 /*
1869  *  mark a queue as no longer hung up.  resets the wake_cb.
1870  */
1871 void qreopen(struct queue *q)
1872 {
1873         spin_lock_irqsave(&q->lock);
1874         q->state &= ~Qclosed;
1875         q->state |= Qstarve;
1876         q->eof = 0;
1877         q->limit = q->inilim;
1878         q->wake_cb = 0;
1879         q->wake_data = 0;
1880         spin_unlock_irqsave(&q->lock);
1881 }
1882
1883 /*
1884  *  return bytes queued
1885  */
1886 int qlen(struct queue *q)
1887 {
1888         return q->dlen;
1889 }
1890
1891 /*
1892  * return space remaining before flow control
1893  */
1894 int qwindow(struct queue *q)
1895 {
1896         int l;
1897
1898         l = q->limit - q->len;
1899         if (l < 0)
1900                 l = 0;
1901         return l;
1902 }
1903
1904 /*
1905  *  return true if we can read without blocking
1906  */
1907 int qcanread(struct queue *q)
1908 {
1909         return q->bfirst != 0;
1910 }
1911
1912 /*
1913  *  change queue limit
1914  */
1915 void qsetlimit(struct queue *q, int limit)
1916 {
1917         q->limit = limit;
1918 }
1919
1920 /*
1921  *  set whether writes drop overflowing blocks, or if we sleep
1922  */
1923 void qdropoverflow(struct queue *q, bool onoff)
1924 {
1925         if (onoff)
1926                 q->state |= Qdropoverflow;
1927         else
1928                 q->state &= ~Qdropoverflow;
1929 }
1930
1931 /* set whether or not the queue is nonblocking, in the EAGAIN sense. */
1932 void qnonblock(struct queue *q, bool onoff)
1933 {
1934         if (onoff)
1935                 q->state |= Qnonblock;
1936         else
1937                 q->state &= ~Qnonblock;
1938 }
1939
1940 /*
1941  *  flush the output queue
1942  */
1943 void qflush(struct queue *q)
1944 {
1945         struct block *bfirst;
1946
1947         /* mark it */
1948         spin_lock_irqsave(&q->lock);
1949         bfirst = q->bfirst;
1950         q->bfirst = 0;
1951         q->len = 0;
1952         q->dlen = 0;
1953         spin_unlock_irqsave(&q->lock);
1954
1955         /* free queued blocks */
1956         freeblist(bfirst);
1957
1958         /* wake up writers */
1959         rendez_wakeup(&q->wr);
1960         qwake_cb(q, FDTAP_FILT_WRITABLE);
1961 }
1962
1963 int qfull(struct queue *q)
1964 {
1965         return q->state & Qflow;
1966 }
1967
1968 int qstate(struct queue *q)
1969 {
1970         return q->state;
1971 }
1972
1973 void qdump(struct queue *q)
1974 {
1975         if (q)
1976                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1977                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1978 }
1979
1980 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1981  * marks the type of wakeup event (flags from FDTAP).
1982  *
1983  * There's no sync protection.  If you change the CB while the qio is running,
1984  * you might get a CB with the data or func from a previous set_wake_cb.  You
1985  * should set this once per queue and forget it.
1986  *
1987  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1988  * just make sure that the func(data) pair are valid until the queue is freed or
1989  * reopened. */
1990 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1991 {
1992         q->wake_data = data;
1993         wmb();  /* if we see func, we'll also see the data for it */
1994         q->wake_cb = func;
1995 }