Implement TSO
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define WARN_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 warn_once("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int noblock;                            /* true if writes return immediately when q full */
50         int eof;                                        /* number of eofs read by user */
51
52         void (*kick) (void *);          /* restart output */
53         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
54         void *arg;                                      /* argument to kick */
55
56         qlock_t rlock;                          /* mutex for reading processes */
57         struct rendez rr;                       /* process waiting to read */
58         qlock_t wlock;                          /* mutex for writing processes */
59         struct rendez wr;                       /* process waiting to write */
60
61         char err[ERRMAX];
62 };
63
64 enum {
65         Maxatomic = 64 * 1024,
66 };
67
68 unsigned int qiomaxatomic = Maxatomic;
69
70 void ixsummary(void)
71 {
72         debugging ^= 1;
73         iallocsummary();
74         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
75                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
76         printd("consume %lu, produce %lu, qcopy %lu\n",
77                    consumecnt, producecnt, qcopycnt);
78 }
79
80 /*
81  *  free a list of blocks
82  */
83 void freeblist(struct block *b)
84 {
85         struct block *next;
86
87         for (; b != 0; b = next) {
88                 next = b->next;
89                 b->next = 0;
90                 freeb(b);
91         }
92 }
93
94 /*
95  *  pad a block to the front (or the back if size is negative)
96  */
97 struct block *padblock(struct block *bp, int size)
98 {
99         int n;
100         struct block *nbp;
101         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
102         uint16_t checksum_start = bp->checksum_start;
103         uint16_t checksum_offset = bp->checksum_offset;
104         uint16_t mss = bp->mss;
105
106         QDEBUG checkb(bp, "padblock 1");
107         if (size >= 0) {
108                 if (bp->rp - bp->base >= size) {
109                         bp->checksum_start += size;
110                         bp->rp -= size;
111                         return bp;
112                 }
113
114                 WARN_EXTRA(bp);
115                 if (bp->next)
116                         panic("padblock %p", getcallerpc(&bp));
117                 n = BLEN(bp);
118                 padblockcnt++;
119                 nbp = allocb(size + n);
120                 nbp->rp += size;
121                 nbp->wp = nbp->rp;
122                 memmove(nbp->wp, bp->rp, n);
123                 nbp->wp += n;
124                 freeb(bp);
125                 nbp->rp -= size;
126         } else {
127                 size = -size;
128
129                 WARN_EXTRA(bp);
130
131                 if (bp->next)
132                         panic("padblock %p", getcallerpc(&bp));
133
134                 if (bp->lim - bp->wp >= size)
135                         return bp;
136
137                 n = BLEN(bp);
138                 padblockcnt++;
139                 nbp = allocb(size + n);
140                 memmove(nbp->wp, bp->rp, n);
141                 nbp->wp += n;
142                 freeb(bp);
143         }
144         if (bcksum) {
145                 nbp->flag |= bcksum;
146                 nbp->checksum_start = checksum_start;
147                 nbp->checksum_offset = checksum_offset;
148                 nbp->mss = mss;
149         }
150         QDEBUG checkb(nbp, "padblock 1");
151         return nbp;
152 }
153
154 /*
155  *  return count of bytes in a string of blocks
156  */
157 int blocklen(struct block *bp)
158 {
159         int len;
160
161         len = 0;
162         while (bp) {
163                 len += BLEN(bp);
164                 bp = bp->next;
165         }
166         return len;
167 }
168
169 /*
170  * return count of space in blocks
171  */
172 int blockalloclen(struct block *bp)
173 {
174         int len;
175
176         len = 0;
177         while (bp) {
178                 len += BALLOC(bp);
179                 bp = bp->next;
180         }
181         return len;
182 }
183
184 /*
185  *  copy the  string of blocks into
186  *  a single block and free the string
187  */
188 struct block *concatblock(struct block *bp)
189 {
190         int len;
191         struct block *nb, *f;
192
193         if (bp->next == 0)
194                 return bp;
195
196         /* probably use parts of qclone */
197         WARN_EXTRA(bp);
198         nb = allocb(blocklen(bp));
199         for (f = bp; f; f = f->next) {
200                 len = BLEN(f);
201                 memmove(nb->wp, f->rp, len);
202                 nb->wp += len;
203         }
204         concatblockcnt += BLEN(nb);
205         freeblist(bp);
206         QDEBUG checkb(nb, "concatblock 1");
207         return nb;
208 }
209
210 /* Returns a block with the remaining contents of b all in the main body of the
211  * returned block.  Replace old references to b with the returned value (which
212  * may still be 'b', if no change was needed. */
213 struct block *linearizeblock(struct block *b)
214 {
215         struct block *newb;
216         size_t len;
217         struct extra_bdata *ebd;
218
219         if (!b->extra_len)
220                 return b;
221
222         newb = allocb(BLEN(b));
223         len = BHLEN(b);
224         memcpy(newb->wp, b->rp, len);
225         newb->wp += len;
226         len = b->extra_len;
227         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
228                 ebd = &b->extra_data[i];
229                 if (!ebd->base || !ebd->len)
230                         continue;
231                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
232                 newb->wp += ebd->len;
233                 len -= ebd->len;
234         }
235         /* TODO: any other flags that need copied over? */
236         if (b->flag & BCKSUM_FLAGS) {
237                 newb->flag |= (b->flag & BCKSUM_FLAGS);
238                 newb->checksum_start = b->checksum_start;
239                 newb->checksum_offset = b->checksum_offset;
240                 newb->mss = b->mss;
241         }
242         freeb(b);
243         return newb;
244 }
245
246 /*
247  *  make sure the first block has at least n bytes in its main body
248  */
249 struct block *pullupblock(struct block *bp, int n)
250 {
251         int i, len, seglen;
252         struct block *nbp;
253         struct extra_bdata *ebd;
254
255         /*
256          *  this should almost always be true, it's
257          *  just to avoid every caller checking.
258          */
259         if (BHLEN(bp) >= n)
260                 return bp;
261
262          /* a start at explicit main-body / header management */
263         if (bp->extra_len) {
264                 if (n > bp->lim - bp->rp) {
265                         /* would need to realloc a new block and copy everything over. */
266                         panic("can't pullup, no place to put it\n");
267                 }
268                 len = n - BHLEN(bp);
269                 if (len > bp->extra_len)
270                         panic("pullup more than extra (%d, %d, %d)\n",
271                               n, BHLEN(bp), bp->extra_len);
272                 checkb(bp, "before pullup");
273                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
274                         ebd = &bp->extra_data[i];
275                         if (!ebd->base || !ebd->len)
276                                 continue;
277                         seglen = MIN(ebd->len, len);
278                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
279                         bp->wp += seglen;
280                         len -= seglen;
281                         ebd->len -= seglen;
282                         ebd->off += seglen;
283                         bp->extra_len -= seglen;
284                         if (ebd->len == 0) {
285                                 kfree((void *)ebd->base);
286                                 ebd->len = 0;
287                                 ebd->off = 0;
288                                 ebd->base = 0;
289                         }
290                 }
291                 /* maybe just call pullupblock recursively here */
292                 if (len)
293                         panic("pullup %d bytes overdrawn\n", len);
294                 checkb(bp, "after pullup");
295                 return bp;
296         }
297
298         /*
299          *  if not enough room in the first block,
300          *  add another to the front of the list.
301          */
302         if (bp->lim - bp->rp < n) {
303                 nbp = allocb(n);
304                 nbp->next = bp;
305                 bp = nbp;
306         }
307
308         /*
309          *  copy bytes from the trailing blocks into the first
310          */
311         n -= BLEN(bp);
312         while ((nbp = bp->next)) {
313                 i = BLEN(nbp);
314                 if (i > n) {
315                         memmove(bp->wp, nbp->rp, n);
316                         pullupblockcnt++;
317                         bp->wp += n;
318                         nbp->rp += n;
319                         QDEBUG checkb(bp, "pullupblock 1");
320                         return bp;
321                 } else {
322                         memmove(bp->wp, nbp->rp, i);
323                         pullupblockcnt++;
324                         bp->wp += i;
325                         bp->next = nbp->next;
326                         nbp->next = 0;
327                         freeb(nbp);
328                         n -= i;
329                         if (n == 0) {
330                                 QDEBUG checkb(bp, "pullupblock 2");
331                                 return bp;
332                         }
333                 }
334         }
335         freeb(bp);
336         return 0;
337 }
338
339 /*
340  *  make sure the first block has at least n bytes in its main body
341  */
342 struct block *pullupqueue(struct queue *q, int n)
343 {
344         struct block *b;
345
346         /* TODO: lock to protect the queue links? */
347         if ((BHLEN(q->bfirst) >= n))
348                 return q->bfirst;
349         q->bfirst = pullupblock(q->bfirst, n);
350         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
351         q->blast = b;
352         return q->bfirst;
353 }
354
355 /*
356  *  trim to len bytes starting at offset
357  */
358 struct block *trimblock(struct block *bp, int offset, int len)
359 {
360         uint32_t l;
361         struct block *nb, *startb;
362
363         QDEBUG checkb(bp, "trimblock 1");
364         if (blocklen(bp) < offset + len) {
365                 freeblist(bp);
366                 return NULL;
367         }
368
369         while ((l = BLEN(bp)) < offset) {
370                 offset -= l;
371                 nb = bp->next;
372                 bp->next = NULL;
373                 freeb(bp);
374                 bp = nb;
375         }
376
377         WARN_EXTRA(bp);
378         startb = bp;
379         bp->rp += offset;
380
381         while ((l = BLEN(bp)) < len) {
382                 len -= l;
383                 bp = bp->next;
384         }
385
386         bp->wp -= (BLEN(bp) - len);
387
388         if (bp->next) {
389                 freeblist(bp->next);
390                 bp->next = NULL;
391         }
392
393         return startb;
394 }
395
396 /*
397  *  copy 'count' bytes into a new block
398  */
399 struct block *copyblock(struct block *bp, int count)
400 {
401         int l;
402         struct block *nbp;
403
404         QDEBUG checkb(bp, "copyblock 0");
405         nbp = allocb(count);
406         if (bp->flag & BCKSUM_FLAGS) {
407                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
408                 nbp->checksum_start = bp->checksum_start;
409                 nbp->checksum_offset = bp->checksum_offset;
410                 nbp->mss = bp->mss;
411         }
412         WARN_EXTRA(bp);
413         for (; count > 0 && bp != 0; bp = bp->next) {
414                 l = BLEN(bp);
415                 if (l > count)
416                         l = count;
417                 memmove(nbp->wp, bp->rp, l);
418                 nbp->wp += l;
419                 count -= l;
420         }
421         if (count > 0) {
422                 memset(nbp->wp, 0, count);
423                 nbp->wp += count;
424         }
425         copyblockcnt++;
426         QDEBUG checkb(nbp, "copyblock 1");
427
428         return nbp;
429 }
430
431 struct block *adjustblock(struct block *bp, int len)
432 {
433         int n;
434         struct block *nbp;
435
436         if (len < 0) {
437                 freeb(bp);
438                 return NULL;
439         }
440
441         WARN_EXTRA(bp);
442         if (bp->rp + len > bp->lim) {
443                 nbp = copyblock(bp, len);
444                 freeblist(bp);
445                 QDEBUG checkb(nbp, "adjustblock 1");
446
447                 return nbp;
448         }
449
450         n = BLEN(bp);
451         if (len > n)
452                 memset(bp->wp, 0, len - n);
453         bp->wp = bp->rp + len;
454         QDEBUG checkb(bp, "adjustblock 2");
455
456         return bp;
457 }
458
459 /*
460  *  throw away up to count bytes from a
461  *  list of blocks.  Return count of bytes
462  *  thrown away.
463  */
464 int pullblock(struct block **bph, int count)
465 {
466         struct block *bp;
467         int n, bytes;
468
469         bytes = 0;
470         if (bph == NULL)
471                 return 0;
472
473         while (*bph != NULL && count != 0) {
474                 bp = *bph;
475         WARN_EXTRA(bp);
476
477                 n = BLEN(bp);
478                 if (count < n)
479                         n = count;
480                 bytes += n;
481                 count -= n;
482                 bp->rp += n;
483                 QDEBUG checkb(bp, "pullblock ");
484                 if (BLEN(bp) == 0) {
485                         *bph = bp->next;
486                         bp->next = NULL;
487                         freeb(bp);
488                 }
489         }
490         return bytes;
491 }
492
493 /*
494  *  get next block from a queue, return null if nothing there
495  */
496 struct block *qget(struct queue *q)
497 {
498         int dowakeup;
499         struct block *b;
500
501         /* sync with qwrite */
502         spin_lock_irqsave(&q->lock);
503
504         b = q->bfirst;
505         if (b == NULL) {
506                 q->state |= Qstarve;
507                 spin_unlock_irqsave(&q->lock);
508                 return NULL;
509         }
510         q->bfirst = b->next;
511         b->next = 0;
512         q->len -= BALLOC(b);
513         q->dlen -= BLEN(b);
514         QDEBUG checkb(b, "qget");
515
516         /* if writer flow controlled, restart */
517         if ((q->state & Qflow) && q->len < q->limit / 2) {
518                 q->state &= ~Qflow;
519                 dowakeup = 1;
520         } else
521                 dowakeup = 0;
522
523         spin_unlock_irqsave(&q->lock);
524
525         if (dowakeup)
526                 rendez_wakeup(&q->wr);
527
528         return b;
529 }
530
531 /*
532  *  throw away the next 'len' bytes in the queue
533  * returning the number actually discarded
534  */
535 int qdiscard(struct queue *q, int len)
536 {
537         struct block *b;
538         int dowakeup, n, sofar, body_amt, extra_amt;
539         struct extra_bdata *ebd;
540
541         spin_lock_irqsave(&q->lock);
542         for (sofar = 0; sofar < len; sofar += n) {
543                 b = q->bfirst;
544                 if (b == NULL)
545                         break;
546                 QDEBUG checkb(b, "qdiscard");
547                 n = BLEN(b);
548                 if (n <= len - sofar) {
549                         q->bfirst = b->next;
550                         b->next = 0;
551                         q->len -= BALLOC(b);
552                         q->dlen -= BLEN(b);
553                         freeb(b);
554                 } else {
555                         n = len - sofar;
556                         q->dlen -= n;
557                         /* partial block removal */
558                         body_amt = MIN(BHLEN(b), n);
559                         b->rp += body_amt;
560                         extra_amt = n - body_amt;
561                         /* reduce q->len by the amount we remove from the extras.  The
562                          * header will always be accounted for above, during block removal.
563                          * */
564                         q->len -= extra_amt;
565                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
566                                 ebd = &b->extra_data[i];
567                                 if (!ebd->base || !ebd->len)
568                                         continue;
569                                 if (extra_amt >= ebd->len) {
570                                         /* remove the entire entry, note the kfree release */
571                                         b->extra_len -= ebd->len;
572                                         extra_amt -= ebd->len;
573                                         kfree((void*)ebd->base);
574                                         ebd->base = ebd->off = ebd->len = 0;
575                                         continue;
576                                 }
577                                 ebd->off += extra_amt;
578                                 ebd->len -= extra_amt;
579                                 b->extra_len -= extra_amt;
580                                 extra_amt = 0;
581                         }
582                 }
583         }
584
585         /*
586          *  if writer flow controlled, restart
587          *
588          *  This used to be
589          *  q->len < q->limit/2
590          *  but it slows down tcp too much for certain write sizes.
591          *  I really don't understand it completely.  It may be
592          *  due to the queue draining so fast that the transmission
593          *  stalls waiting for the app to produce more data.  - presotto
594          */
595         if ((q->state & Qflow) && q->len < q->limit) {
596                 q->state &= ~Qflow;
597                 dowakeup = 1;
598         } else
599                 dowakeup = 0;
600
601         spin_unlock_irqsave(&q->lock);
602
603         if (dowakeup)
604                 rendez_wakeup(&q->wr);
605
606         return sofar;
607 }
608
609 /*
610  *  Interrupt level copy out of a queue, return # bytes copied.
611  */
612 int qconsume(struct queue *q, void *vp, int len)
613 {
614         struct block *b;
615         int n, dowakeup;
616         uint8_t *p = vp;
617         struct block *tofree = NULL;
618
619         /* sync with qwrite */
620         spin_lock_irqsave(&q->lock);
621
622         for (;;) {
623                 b = q->bfirst;
624                 if (b == 0) {
625                         q->state |= Qstarve;
626                         spin_unlock_irqsave(&q->lock);
627                         return -1;
628                 }
629                 QDEBUG checkb(b, "qconsume 1");
630
631                 n = BLEN(b);
632                 if (n > 0)
633                         break;
634                 q->bfirst = b->next;
635                 q->len -= BALLOC(b);
636
637                 /* remember to free this */
638                 b->next = tofree;
639                 tofree = b;
640         };
641
642         WARN_EXTRA(b);
643         if (n < len)
644                 len = n;
645         memmove(p, b->rp, len);
646         consumecnt += n;
647         b->rp += len;
648         q->dlen -= len;
649
650         /* discard the block if we're done with it */
651         if ((q->state & Qmsg) || len == n) {
652                 q->bfirst = b->next;
653                 b->next = 0;
654                 q->len -= BALLOC(b);
655                 q->dlen -= BLEN(b);
656
657                 /* remember to free this */
658                 b->next = tofree;
659                 tofree = b;
660         }
661
662         /* if writer flow controlled, restart */
663         if ((q->state & Qflow) && q->len < q->limit / 2) {
664                 q->state &= ~Qflow;
665                 dowakeup = 1;
666         } else
667                 dowakeup = 0;
668
669         spin_unlock_irqsave(&q->lock);
670
671         if (dowakeup)
672                 rendez_wakeup(&q->wr);
673
674         if (tofree != NULL)
675                 freeblist(tofree);
676
677         return len;
678 }
679
680 int qpass(struct queue *q, struct block *b)
681 {
682         int dlen, len, dowakeup;
683
684         /* sync with qread */
685         dowakeup = 0;
686         spin_lock_irqsave(&q->lock);
687         if (q->len >= q->limit) {
688                 freeblist(b);
689                 spin_unlock_irqsave(&q->lock);
690                 return -1;
691         }
692         if (q->state & Qclosed) {
693                 len = blocklen(b);
694                 freeblist(b);
695                 spin_unlock_irqsave(&q->lock);
696                 return len;
697         }
698
699         /* add buffer to queue */
700         if (q->bfirst)
701                 q->blast->next = b;
702         else
703                 q->bfirst = b;
704         len = BALLOC(b);
705         dlen = BLEN(b);
706         QDEBUG checkb(b, "qpass");
707         while (b->next) {
708                 b = b->next;
709                 QDEBUG checkb(b, "qpass");
710                 len += BALLOC(b);
711                 dlen += BLEN(b);
712         }
713         q->blast = b;
714         q->len += len;
715         q->dlen += dlen;
716
717         if (q->len >= q->limit / 2)
718                 q->state |= Qflow;
719
720         if (q->state & Qstarve) {
721                 q->state &= ~Qstarve;
722                 dowakeup = 1;
723         }
724         spin_unlock_irqsave(&q->lock);
725
726         if (dowakeup)
727                 rendez_wakeup(&q->rr);
728
729         return len;
730 }
731
732 int qpassnolim(struct queue *q, struct block *b)
733 {
734         int dlen, len, dowakeup;
735
736         /* sync with qread */
737         dowakeup = 0;
738         spin_lock_irqsave(&q->lock);
739
740         if (q->state & Qclosed) {
741                 freeblist(b);
742                 spin_unlock_irqsave(&q->lock);
743                 return BALLOC(b);
744         }
745
746         /* add buffer to queue */
747         if (q->bfirst)
748                 q->blast->next = b;
749         else
750                 q->bfirst = b;
751         len = BALLOC(b);
752         dlen = BLEN(b);
753         QDEBUG checkb(b, "qpass");
754         while (b->next) {
755                 b = b->next;
756                 QDEBUG checkb(b, "qpass");
757                 len += BALLOC(b);
758                 dlen += BLEN(b);
759         }
760         q->blast = b;
761         q->len += len;
762         q->dlen += dlen;
763
764         if (q->len >= q->limit / 2)
765                 q->state |= Qflow;
766
767         if (q->state & Qstarve) {
768                 q->state &= ~Qstarve;
769                 dowakeup = 1;
770         }
771         spin_unlock_irqsave(&q->lock);
772
773         if (dowakeup)
774                 rendez_wakeup(&q->rr);
775
776         return len;
777 }
778
779 /*
780  *  if the allocated space is way out of line with the used
781  *  space, reallocate to a smaller block
782  */
783 struct block *packblock(struct block *bp)
784 {
785         struct block **l, *nbp;
786         int n;
787
788         WARN_EXTRA(bp);
789         for (l = &bp; *l; l = &(*l)->next) {
790                 nbp = *l;
791                 n = BLEN(nbp);
792                 if ((n << 2) < BALLOC(nbp)) {
793                         *l = allocb(n);
794                         memmove((*l)->wp, nbp->rp, n);
795                         (*l)->wp += n;
796                         (*l)->next = nbp->next;
797                         freeb(nbp);
798                 }
799         }
800
801         return bp;
802 }
803
804 int qproduce(struct queue *q, void *vp, int len)
805 {
806         struct block *b;
807         int dowakeup;
808         uint8_t *p = vp;
809
810         /* sync with qread */
811         dowakeup = 0;
812         spin_lock_irqsave(&q->lock);
813
814         /* no waiting receivers, room in buffer? */
815         if (q->len >= q->limit) {
816                 q->state |= Qflow;
817                 spin_unlock_irqsave(&q->lock);
818                 return -1;
819         }
820
821         /* save in buffer */
822         /* use Qcoalesce here to save storage */
823         // TODO: Consider removing the Qcoalesce flag and force a coalescing
824         // strategy by default.
825         b = q->blast;
826         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
827                 || b->lim - b->wp < len) {
828                 /* need a new block */
829                 b = iallocb(len);
830                 if (b == 0) {
831                         spin_unlock_irqsave(&q->lock);
832                         return 0;
833                 }
834                 if (q->bfirst)
835                         q->blast->next = b;
836                 else
837                         q->bfirst = b;
838                 q->blast = b;
839                 /* b->next = 0; done by iallocb() */
840                 q->len += BALLOC(b);
841         }
842         WARN_EXTRA(b);
843         memmove(b->wp, p, len);
844         producecnt += len;
845         b->wp += len;
846         q->dlen += len;
847         QDEBUG checkb(b, "qproduce");
848
849         if (q->state & Qstarve) {
850                 q->state &= ~Qstarve;
851                 dowakeup = 1;
852         }
853
854         if (q->len >= q->limit)
855                 q->state |= Qflow;
856         spin_unlock_irqsave(&q->lock);
857
858         if (dowakeup)
859                 rendez_wakeup(&q->rr);
860
861         return len;
862 }
863
864 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
865  * body_rp, for up to len.  Returns the len consumed. 
866  *
867  * The base is 'b', so that we can kfree it later.  This currently ties us to
868  * using kfree for the release method for all extra_data.
869  *
870  * It is possible to have a body size that is 0, if there is no offset, and
871  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
872 static size_t point_to_body(struct block *b, uint8_t *body_rp,
873                             struct block *newb, unsigned int newb_idx,
874                             size_t len)
875 {
876         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
877
878         assert(newb_idx < newb->nr_extra_bufs);
879
880         kmalloc_incref(b);
881         ebd->base = (uintptr_t)b;
882         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
883         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
884         assert((int)ebd->len >= 0);
885         newb->extra_len += ebd->len;
886         return ebd->len;
887 }
888
889 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
890  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
891  * consumed.
892  *
893  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
894 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
895                            struct block *newb, unsigned int newb_idx,
896                            size_t len)
897 {
898         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
899         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
900
901         assert(b_idx < b->nr_extra_bufs);
902         assert(newb_idx < newb->nr_extra_bufs);
903
904         kmalloc_incref((void*)b_ebd->base);
905         n_ebd->base = b_ebd->base;
906         n_ebd->off = b_ebd->off + b_off;
907         n_ebd->len = MIN(b_ebd->len - b_off, len);
908         newb->extra_len += n_ebd->len;
909         return n_ebd->len;
910 }
911
912 /* given a string of blocks, fills the new block's extra_data  with the contents
913  * of the blist [offset, len + offset)
914  *
915  * returns 0 on success.  the only failure is if the extra_data array was too
916  * small, so this returns a positive integer saying how big the extra_data needs
917  * to be.
918  *
919  * callers are responsible for protecting the list structure. */
920 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
921                             uint32_t offset)
922 {
923         struct block *b, *first;
924         unsigned int nr_bufs = 0;
925         unsigned int b_idx, newb_idx = 0;
926         uint8_t *first_main_body = 0;
927
928         /* find the first block; keep offset relative to the latest b in the list */
929         for (b = blist; b; b = b->next) {
930                 if (BLEN(b) > offset)
931                         break;
932                 offset -= BLEN(b);
933         }
934         /* qcopy semantics: if you asked for an offset outside the block list, you
935          * get an empty block back */
936         if (!b)
937                 return 0;
938         first = b;
939         /* upper bound for how many buffers we'll need in newb */
940         for (/* b is set*/; b; b = b->next) {
941                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
942         }
943         /* we might be holding a spinlock here, so we won't wait for kmalloc */
944         block_add_extd(newb, nr_bufs, 0);
945         if (newb->nr_extra_bufs < nr_bufs) {
946                 /* caller will need to alloc these, then re-call us */
947                 return nr_bufs;
948         }
949         for (b = first; b && len; b = b->next) {
950                 b_idx = 0;
951                 if (offset) {
952                         if (offset < BHLEN(b)) {
953                                 /* off is in the main body */
954                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
955                                 newb_idx++;
956                         } else {
957                                 /* off is in one of the buffers (or just past the last one).
958                                  * we're not going to point to b's main body at all. */
959                                 offset -= BHLEN(b);
960                                 assert(b->extra_data);
961                                 /* assuming these extrabufs are packed, or at least that len
962                                  * isn't gibberish */
963                                 while (b->extra_data[b_idx].len <= offset) {
964                                         offset -= b->extra_data[b_idx].len;
965                                         b_idx++;
966                                 }
967                                 /* now offset is set to our offset in the b_idx'th buf */
968                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
969                                 newb_idx++;
970                                 b_idx++;
971                         }
972                         offset = 0;
973                 } else {
974                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
975                         newb_idx++;
976                 }
977                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
978                  * and any point_to_ could be our last if it consumed all of len. */
979                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
980                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
981                         newb_idx++;
982                 }
983         }
984         return 0;
985 }
986
987 struct block *blist_clone(struct block *blist, int header_len, int len,
988                           uint32_t offset)
989 {
990         int ret;
991         struct block *newb = allocb(header_len);
992         do {
993                 ret = __blist_clone_to(blist, newb, len, offset);
994                 if (ret)
995                         block_add_extd(newb, ret, KMALLOC_WAIT);
996         } while (ret);
997         return newb;
998 }
999
1000 /* given a queue, makes a single block with header_len reserved space in the
1001  * block main body, and the contents of [offset, len + offset) pointed to in the
1002  * new blocks ext_data. */
1003 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1004 {
1005         int ret;
1006         struct block *newb = allocb(header_len);
1007         /* the while loop should rarely be used: it would require someone
1008          * concurrently adding to the queue. */
1009         do {
1010                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1011                 spin_lock_irqsave(&q->lock);
1012                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1013                 spin_unlock_irqsave(&q->lock);
1014                 if (ret)
1015                         block_add_extd(newb, ret, KMALLOC_WAIT);
1016         } while (ret);
1017         return newb;
1018 }
1019
1020 /*
1021  *  copy from offset in the queue
1022  */
1023 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1024 {
1025         int sofar;
1026         int n;
1027         struct block *b, *nb;
1028         uint8_t *p;
1029
1030         nb = allocb(len);
1031
1032         spin_lock_irqsave(&q->lock);
1033
1034         /* go to offset */
1035         b = q->bfirst;
1036         for (sofar = 0;; sofar += n) {
1037                 if (b == NULL) {
1038                         spin_unlock_irqsave(&q->lock);
1039                         return nb;
1040                 }
1041                 n = BLEN(b);
1042                 if (sofar + n > offset) {
1043                         p = b->rp + offset - sofar;
1044                         n -= offset - sofar;
1045                         break;
1046                 }
1047                 QDEBUG checkb(b, "qcopy");
1048                 b = b->next;
1049         }
1050
1051         /* copy bytes from there */
1052         for (sofar = 0; sofar < len;) {
1053                 if (n > len - sofar)
1054                         n = len - sofar;
1055                 WARN_EXTRA(b);
1056                 memmove(nb->wp, p, n);
1057                 qcopycnt += n;
1058                 sofar += n;
1059                 nb->wp += n;
1060                 b = b->next;
1061                 if (b == NULL)
1062                         break;
1063                 n = BLEN(b);
1064                 p = b->rp;
1065         }
1066         spin_unlock_irqsave(&q->lock);
1067
1068         return nb;
1069 }
1070
1071 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1072 {
1073 #ifdef CONFIG_BLOCK_EXTRAS
1074         return qclone(q, 0, len, offset);
1075 #else
1076         return qcopy_old(q, len, offset);
1077 #endif
1078 }
1079
1080 static void qinit_common(struct queue *q)
1081 {
1082         spinlock_init_irqsave(&q->lock);
1083         qlock_init(&q->rlock);
1084         qlock_init(&q->wlock);
1085         rendez_init(&q->rr);
1086         rendez_init(&q->wr);
1087 }
1088
1089 /*
1090  *  called by non-interrupt code
1091  */
1092 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1093 {
1094         struct queue *q;
1095
1096         q = kzmalloc(sizeof(struct queue), 0);
1097         if (q == 0)
1098                 return 0;
1099         qinit_common(q);
1100
1101         q->limit = q->inilim = limit;
1102         q->kick = kick;
1103         q->arg = arg;
1104         q->state = msg;
1105         q->state |= Qstarve;
1106         q->eof = 0;
1107         q->noblock = 0;
1108
1109         return q;
1110 }
1111
1112 /* open a queue to be bypassed */
1113 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1114 {
1115         struct queue *q;
1116
1117         q = kzmalloc(sizeof(struct queue), 0);
1118         if (q == 0)
1119                 return 0;
1120         qinit_common(q);
1121
1122         q->limit = 0;
1123         q->arg = arg;
1124         q->bypass = bypass;
1125         q->state = 0;
1126
1127         return q;
1128 }
1129
1130 static int notempty(void *a)
1131 {
1132         struct queue *q = a;
1133
1134         return (q->state & Qclosed) || q->bfirst != 0;
1135 }
1136
1137 /* wait for the queue to be non-empty or closed.
1138  *
1139  * called with q ilocked.  rendez may error out, back through the caller, with
1140  * the irqsave lock unlocked.  */
1141 static int qwait(struct queue *q)
1142 {
1143         /* wait for data */
1144         for (;;) {
1145                 if (q->bfirst != NULL)
1146                         break;
1147
1148                 if (q->state & Qclosed) {
1149                         if (++q->eof > 3)
1150                                 return -1;
1151                         if (*q->err && strcmp(q->err, Ehungup) != 0)
1152                                 return -1;
1153                         return 0;
1154                 }
1155
1156                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1157                 spin_unlock_irqsave(&q->lock);
1158                 /* may throw an error() */
1159                 rendez_sleep(&q->rr, notempty, q);
1160                 spin_lock_irqsave(&q->lock);
1161         }
1162         return 1;
1163 }
1164
1165 /*
1166  * add a block list to a queue
1167  */
1168 void qaddlist(struct queue *q, struct block *b)
1169 {
1170         /* TODO: q lock? */
1171         /* queue the block */
1172         if (q->bfirst)
1173                 q->blast->next = b;
1174         else
1175                 q->bfirst = b;
1176         q->len += blockalloclen(b);
1177         q->dlen += blocklen(b);
1178         while (b->next)
1179                 b = b->next;
1180         q->blast = b;
1181 }
1182
1183 /*
1184  *  called with q ilocked
1185  */
1186 struct block *qremove(struct queue *q)
1187 {
1188         struct block *b;
1189
1190         b = q->bfirst;
1191         if (b == NULL)
1192                 return NULL;
1193         q->bfirst = b->next;
1194         b->next = NULL;
1195         q->dlen -= BLEN(b);
1196         q->len -= BALLOC(b);
1197         QDEBUG checkb(b, "qremove");
1198         return b;
1199 }
1200
1201 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1202 {
1203         size_t copy_amt, retval = 0;
1204         struct extra_bdata *ebd;
1205         
1206         copy_amt = MIN(BHLEN(b), amt);
1207         memcpy(to, b->rp, copy_amt);
1208         /* advance the rp, since this block not be completely consumed and future
1209          * reads need to know where to pick up from */
1210         b->rp += copy_amt;
1211         to += copy_amt;
1212         amt -= copy_amt;
1213         retval += copy_amt;
1214         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1215                 ebd = &b->extra_data[i];
1216                 /* skip empty entires.  if we track this in the struct block, we can
1217                  * just start the for loop early */
1218                 if (!ebd->base || !ebd->len)
1219                         continue;
1220                 copy_amt = MIN(ebd->len, amt);
1221                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1222                 /* we're actually consuming the entries, just like how we advance rp up
1223                  * above, and might only consume part of one. */
1224                 ebd->len -= copy_amt;
1225                 ebd->off += copy_amt;
1226                 b->extra_len -= copy_amt;
1227                 if (!ebd->len) {
1228                         /* we don't actually have to decref here.  it's also done in
1229                          * freeb().  this is the earliest we can free. */
1230                         kfree((void*)ebd->base);
1231                         ebd->base = ebd->off = 0;
1232                 }
1233                 to += copy_amt;
1234                 amt -= copy_amt;
1235                 retval += copy_amt;
1236         }
1237         return retval;
1238 }
1239
1240 /*
1241  *  copy the contents of a string of blocks into
1242  *  memory.  emptied blocks are freed.  return
1243  *  pointer to first unconsumed block.
1244  */
1245 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1246 {
1247         int i;
1248         struct block *next;
1249
1250         /* could be slicker here, since read_from_block is smart */
1251         for (; b != NULL; b = next) {
1252                 i = BLEN(b);
1253                 if (i > n) {
1254                         /* partial block, consume some */
1255                         read_from_block(b, p, n);
1256                         return b;
1257                 }
1258                 /* full block, consume all and move on */
1259                 i = read_from_block(b, p, i);
1260                 n -= i;
1261                 p += i;
1262                 next = b->next;
1263                 freeb(b);
1264         }
1265         return NULL;
1266 }
1267
1268 /*
1269  *  copy the contents of memory into a string of blocks.
1270  *  return NULL on error.
1271  */
1272 struct block *mem2bl(uint8_t * p, int len)
1273 {
1274         ERRSTACK(1);
1275         int n;
1276         struct block *b, *first, **l;
1277
1278         first = NULL;
1279         l = &first;
1280         if (waserror()) {
1281                 freeblist(first);
1282                 nexterror();
1283         }
1284         do {
1285                 n = len;
1286                 if (n > Maxatomic)
1287                         n = Maxatomic;
1288
1289                 *l = b = allocb(n);
1290                 /* TODO consider extra_data */
1291                 memmove(b->wp, p, n);
1292                 b->wp += n;
1293                 p += n;
1294                 len -= n;
1295                 l = &b->next;
1296         } while (len > 0);
1297         poperror();
1298
1299         return first;
1300 }
1301
1302 /*
1303  *  put a block back to the front of the queue
1304  *  called with q ilocked
1305  */
1306 void qputback(struct queue *q, struct block *b)
1307 {
1308         b->next = q->bfirst;
1309         if (q->bfirst == NULL)
1310                 q->blast = b;
1311         q->bfirst = b;
1312         q->len += BALLOC(b);
1313         q->dlen += BLEN(b);
1314 }
1315
1316 /*
1317  *  flow control, get producer going again
1318  *  called with q ilocked
1319  */
1320 static void qwakeup_iunlock(struct queue *q)
1321 {
1322         int dowakeup = 0;
1323
1324         /* if writer flow controlled, restart */
1325         if ((q->state & Qflow) && q->len < q->limit / 2) {
1326                 q->state &= ~Qflow;
1327                 dowakeup = 1;
1328         }
1329
1330         spin_unlock_irqsave(&q->lock);
1331
1332         /* wakeup flow controlled writers */
1333         if (dowakeup) {
1334                 if (q->kick)
1335                         q->kick(q->arg);
1336                 rendez_wakeup(&q->wr);
1337         }
1338 }
1339
1340 /*
1341  *  get next block from a queue (up to a limit)
1342  */
1343 struct block *qbread(struct queue *q, int len)
1344 {
1345         ERRSTACK(1);
1346         struct block *b, *nb;
1347         int n;
1348
1349         qlock(&q->rlock);
1350         if (waserror()) {
1351                 qunlock(&q->rlock);
1352                 nexterror();
1353         }
1354
1355         spin_lock_irqsave(&q->lock);
1356         switch (qwait(q)) {
1357                 case 0:
1358                         /* queue closed */
1359                         spin_unlock_irqsave(&q->lock);
1360                         qunlock(&q->rlock);
1361                         poperror();
1362                         return NULL;
1363                 case -1:
1364                         /* multiple reads on a closed queue */
1365                         spin_unlock_irqsave(&q->lock);
1366                         error(q->err);
1367         }
1368
1369         /* if we get here, there's at least one block in the queue */
1370         b = qremove(q);
1371         n = BLEN(b);
1372
1373         /* split block if it's too big and this is not a message queue */
1374         nb = b;
1375         if (n > len) {
1376                 WARN_EXTRA(b);
1377                 if ((q->state & Qmsg) == 0) {
1378                         n -= len;
1379                         b = allocb(n);
1380                         memmove(b->wp, nb->rp + len, n);
1381                         b->wp += n;
1382                         qputback(q, b);
1383                 }
1384                 nb->wp = nb->rp + len;
1385         }
1386
1387         /* restart producer */
1388         qwakeup_iunlock(q);
1389
1390         poperror();
1391         qunlock(&q->rlock);
1392         return nb;
1393 }
1394
1395 /*
1396  *  read a queue.  if no data is queued, post a struct block
1397  *  and wait on its Rendez.
1398  */
1399 long qread(struct queue *q, void *vp, int len)
1400 {
1401         ERRSTACK(1);
1402         struct block *b, *first, **l;
1403         int m, n;
1404
1405         qlock(&q->rlock);
1406         if (waserror()) {
1407                 qunlock(&q->rlock);
1408                 nexterror();
1409         }
1410
1411         spin_lock_irqsave(&q->lock);
1412 again:
1413         switch (qwait(q)) {
1414                 case 0:
1415                         /* queue closed */
1416                         spin_unlock_irqsave(&q->lock);
1417                         qunlock(&q->rlock);
1418                         poperror();
1419                         return 0;
1420                 case -1:
1421                         /* multiple reads on a closed queue */
1422                         spin_unlock_irqsave(&q->lock);
1423                         error(q->err);
1424         }
1425
1426         /* if we get here, there's at least one block in the queue */
1427         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1428         // strategy by default.
1429         if (q->state & Qcoalesce) {
1430                 /* when coalescing, 0 length blocks just go away */
1431                 b = q->bfirst;
1432                 if (BLEN(b) <= 0) {
1433                         freeb(qremove(q));
1434                         goto again;
1435                 }
1436
1437                 /*  grab the first block plus as many
1438                  *  following blocks as will completely
1439                  *  fit in the read.
1440                  */
1441                 n = 0;
1442                 l = &first;
1443                 m = BLEN(b);
1444                 for (;;) {
1445                         *l = qremove(q);
1446                         l = &b->next;
1447                         n += m;
1448
1449                         b = q->bfirst;
1450                         if (b == NULL)
1451                                 break;
1452                         m = BLEN(b);
1453                         if (n + m > len)
1454                                 break;
1455                 }
1456         } else {
1457                 first = qremove(q);
1458                 n = BLEN(first);
1459         }
1460
1461         /* copy to user space outside of the ilock */
1462         spin_unlock_irqsave(&q->lock);
1463         b = bl2mem(vp, first, len);
1464         spin_lock_irqsave(&q->lock);
1465
1466         /* take care of any left over partial block */
1467         if (b != NULL) {
1468                 n -= BLEN(b);
1469                 if (q->state & Qmsg)
1470                         freeb(b);
1471                 else
1472                         qputback(q, b);
1473         }
1474
1475         /* restart producer */
1476         qwakeup_iunlock(q);
1477
1478         poperror();
1479         qunlock(&q->rlock);
1480         return n;
1481 }
1482
1483 static int qnotfull(void *a)
1484 {
1485         struct queue *q = a;
1486
1487         return q->len < q->limit || (q->state & Qclosed);
1488 }
1489
1490 uint32_t noblockcnt;
1491
1492 /*
1493  *  add a block to a queue obeying flow control
1494  */
1495 long qbwrite(struct queue *q, struct block *b)
1496 {
1497         ERRSTACK(1);
1498         int n, dowakeup;
1499         volatile bool should_free_b = TRUE;
1500
1501         n = BLEN(b);
1502
1503         if (q->bypass) {
1504                 (*q->bypass) (q->arg, b);
1505                 return n;
1506         }
1507
1508         dowakeup = 0;
1509         qlock(&q->wlock);
1510         if (waserror()) {
1511                 if (b != NULL && should_free_b)
1512                         freeb(b);
1513                 qunlock(&q->wlock);
1514                 nexterror();
1515         }
1516
1517         spin_lock_irqsave(&q->lock);
1518
1519         /* give up if the queue is closed */
1520         if (q->state & Qclosed) {
1521                 spin_unlock_irqsave(&q->lock);
1522                 error(q->err);
1523         }
1524
1525         /* if nonblocking, don't queue over the limit */
1526         if (q->len >= q->limit) {
1527                 if (q->noblock) {
1528                         spin_unlock_irqsave(&q->lock);
1529                         freeb(b);
1530                         noblockcnt += n;
1531                         qunlock(&q->wlock);
1532                         poperror();
1533                         return n;
1534                 }
1535         }
1536
1537         /* queue the block */
1538         should_free_b = FALSE;
1539         if (q->bfirst)
1540                 q->blast->next = b;
1541         else
1542                 q->bfirst = b;
1543         q->blast = b;
1544         b->next = 0;
1545         q->len += BALLOC(b);
1546         q->dlen += n;
1547         QDEBUG checkb(b, "qbwrite");
1548         b = NULL;
1549
1550         /* make sure other end gets awakened */
1551         if (q->state & Qstarve) {
1552                 q->state &= ~Qstarve;
1553                 dowakeup = 1;
1554         }
1555         spin_unlock_irqsave(&q->lock);
1556
1557         /*  get output going again */
1558         if (q->kick && (dowakeup || (q->state & Qkick)))
1559                 q->kick(q->arg);
1560
1561         /* wakeup anyone consuming at the other end */
1562         if (dowakeup)
1563                 rendez_wakeup(&q->rr);
1564
1565         /*
1566          *  flow control, wait for queue to get below the limit
1567          *  before allowing the process to continue and queue
1568          *  more.  We do this here so that postnote can only
1569          *  interrupt us after the data has been queued.  This
1570          *  means that things like 9p flushes and ssl messages
1571          *  will not be disrupted by software interrupts.
1572          *
1573          *  Note - this is moderately dangerous since a process
1574          *  that keeps getting interrupted and rewriting will
1575          *  queue infinite crud.
1576          */
1577         for (;;) {
1578                 if (q->noblock || qnotfull(q))
1579                         break;
1580
1581                 spin_lock_irqsave(&q->lock);
1582                 q->state |= Qflow;
1583                 spin_unlock_irqsave(&q->lock);
1584                 rendez_sleep(&q->wr, qnotfull, q);
1585         }
1586
1587         qunlock(&q->wlock);
1588         poperror();
1589         return n;
1590 }
1591
1592 long qibwrite(struct queue *q, struct block *b)
1593 {
1594         int n, dowakeup;
1595
1596         dowakeup = 0;
1597
1598         n = BLEN(b);
1599
1600         spin_lock_irqsave(&q->lock);
1601
1602         QDEBUG checkb(b, "qibwrite");
1603         if (q->bfirst)
1604                 q->blast->next = b;
1605         else
1606                 q->bfirst = b;
1607         q->blast = b;
1608         q->len += BALLOC(b);
1609         q->dlen += n;
1610
1611         if (q->state & Qstarve) {
1612                 q->state &= ~Qstarve;
1613                 dowakeup = 1;
1614         }
1615
1616         spin_unlock_irqsave(&q->lock);
1617
1618         if (dowakeup) {
1619                 if (q->kick)
1620                         q->kick(q->arg);
1621                 rendez_wakeup(&q->rr);
1622         }
1623
1624         return n;
1625 }
1626
1627 /*
1628  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1629  */
1630 int qwrite(struct queue *q, void *vp, int len)
1631 {
1632         int n, sofar;
1633         struct block *b;
1634         uint8_t *p = vp;
1635         void *ext_buf;
1636
1637         QDEBUG if (!islo())
1638                  printd("qwrite hi %p\n", getcallerpc(&q));
1639
1640         sofar = 0;
1641         do {
1642                 n = len - sofar;
1643                 /* This is 64K, the max amount per single block.  Still a good value? */
1644                 if (n > Maxatomic)
1645                         n = Maxatomic;
1646
1647                 /* If n is small, we don't need to bother with the extra_data.  But
1648                  * until the whole stack can handle extd blocks, we'll use them
1649                  * unconditionally. */
1650 #ifdef CONFIG_BLOCK_EXTRAS
1651                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1652                  * only available via padblock (to the left).  we also need some space
1653                  * for pullupblock for some basic headers (like icmp) that get written
1654                  * in directly */
1655                 b = allocb(64);
1656                 ext_buf = kmalloc(n, 0);
1657                 memcpy(ext_buf, p + sofar, n);
1658                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1659                 b->extra_data[0].base = (uintptr_t)ext_buf;
1660                 b->extra_data[0].off = 0;
1661                 b->extra_data[0].len = n;
1662                 b->extra_len += n;
1663 #else
1664                 b = allocb(n);
1665                 memmove(b->wp, p + sofar, n);
1666                 b->wp += n;
1667 #endif
1668                         
1669                 qbwrite(q, b);
1670
1671                 sofar += n;
1672         } while (sofar < len && (q->state & Qmsg) == 0);
1673
1674         return len;
1675 }
1676
1677 /*
1678  *  used by print() to write to a queue.  Since we may be splhi or not in
1679  *  a process, don't qlock.
1680  */
1681 int qiwrite(struct queue *q, void *vp, int len)
1682 {
1683         int n, sofar, dowakeup;
1684         struct block *b;
1685         uint8_t *p = vp;
1686
1687         dowakeup = 0;
1688
1689         sofar = 0;
1690         do {
1691                 n = len - sofar;
1692                 if (n > Maxatomic)
1693                         n = Maxatomic;
1694
1695                 b = iallocb(n);
1696                 if (b == NULL)
1697                         break;
1698                 /* TODO consider extra_data */
1699                 memmove(b->wp, p + sofar, n);
1700                 /* this adjusts BLEN to be n, or at least it should */
1701                 b->wp += n;
1702                 assert(n == BLEN(b));
1703                 qibwrite(q, b);
1704
1705                 sofar += n;
1706         } while (sofar < len && (q->state & Qmsg) == 0);
1707
1708         return sofar;
1709 }
1710
1711 /*
1712  *  be extremely careful when calling this,
1713  *  as there is no reference accounting
1714  */
1715 void qfree(struct queue *q)
1716 {
1717         qclose(q);
1718         kfree(q);
1719 }
1720
1721 /*
1722  *  Mark a queue as closed.  No further IO is permitted.
1723  *  All blocks are released.
1724  */
1725 void qclose(struct queue *q)
1726 {
1727         struct block *bfirst;
1728
1729         if (q == NULL)
1730                 return;
1731
1732         /* mark it */
1733         spin_lock_irqsave(&q->lock);
1734         q->state |= Qclosed;
1735         q->state &= ~(Qflow | Qstarve);
1736         strncpy(q->err, Ehungup, sizeof(q->err));
1737         bfirst = q->bfirst;
1738         q->bfirst = 0;
1739         q->len = 0;
1740         q->dlen = 0;
1741         q->noblock = 0;
1742         spin_unlock_irqsave(&q->lock);
1743
1744         /* free queued blocks */
1745         freeblist(bfirst);
1746
1747         /* wake up readers/writers */
1748         rendez_wakeup(&q->rr);
1749         rendez_wakeup(&q->wr);
1750 }
1751
1752 /*
1753  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1754  *  blocks.
1755  */
1756 void qhangup(struct queue *q, char *msg)
1757 {
1758         /* mark it */
1759         spin_lock_irqsave(&q->lock);
1760         q->state |= Qclosed;
1761         if (msg == 0 || *msg == 0)
1762                 strncpy(q->err, Ehungup, sizeof(q->err));
1763         else
1764                 strncpy(q->err, msg, ERRMAX - 1);
1765         spin_unlock_irqsave(&q->lock);
1766
1767         /* wake up readers/writers */
1768         rendez_wakeup(&q->rr);
1769         rendez_wakeup(&q->wr);
1770 }
1771
1772 /*
1773  *  return non-zero if the q is hungup
1774  */
1775 int qisclosed(struct queue *q)
1776 {
1777         return q->state & Qclosed;
1778 }
1779
1780 /*
1781  *  mark a queue as no longer hung up
1782  */
1783 void qreopen(struct queue *q)
1784 {
1785         spin_lock_irqsave(&q->lock);
1786         q->state &= ~Qclosed;
1787         q->state |= Qstarve;
1788         q->eof = 0;
1789         q->limit = q->inilim;
1790         spin_unlock_irqsave(&q->lock);
1791 }
1792
1793 /*
1794  *  return bytes queued
1795  */
1796 int qlen(struct queue *q)
1797 {
1798         return q->dlen;
1799 }
1800
1801 /*
1802  * return space remaining before flow control
1803  */
1804 int qwindow(struct queue *q)
1805 {
1806         int l;
1807
1808         l = q->limit - q->len;
1809         if (l < 0)
1810                 l = 0;
1811         return l;
1812 }
1813
1814 /*
1815  *  return true if we can read without blocking
1816  */
1817 int qcanread(struct queue *q)
1818 {
1819         return q->bfirst != 0;
1820 }
1821
1822 /*
1823  *  change queue limit
1824  */
1825 void qsetlimit(struct queue *q, int limit)
1826 {
1827         q->limit = limit;
1828 }
1829
1830 /*
1831  *  set blocking/nonblocking
1832  */
1833 void qnoblock(struct queue *q, int onoff)
1834 {
1835         q->noblock = onoff;
1836 }
1837
1838 /*
1839  *  flush the output queue
1840  */
1841 void qflush(struct queue *q)
1842 {
1843         struct block *bfirst;
1844
1845         /* mark it */
1846         spin_lock_irqsave(&q->lock);
1847         bfirst = q->bfirst;
1848         q->bfirst = 0;
1849         q->len = 0;
1850         q->dlen = 0;
1851         spin_unlock_irqsave(&q->lock);
1852
1853         /* free queued blocks */
1854         freeblist(bfirst);
1855
1856         /* wake up readers/writers */
1857         rendez_wakeup(&q->wr);
1858 }
1859
1860 int qfull(struct queue *q)
1861 {
1862         return q->state & Qflow;
1863 }
1864
1865 int qstate(struct queue *q)
1866 {
1867         return q->state;
1868 }
1869
1870 void qdump(struct queue *q)
1871 {
1872         if (q)
1873                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1874                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1875 }