Block pullup fixes
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define WARN_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 warn_once("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int noblock;                            /* true if writes return immediately when q full */
50         int eof;                                        /* number of eofs read by user */
51
52         void (*kick) (void *);          /* restart output */
53         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
54         void *arg;                                      /* argument to kick */
55
56         qlock_t rlock;                          /* mutex for reading processes */
57         struct rendez rr;                       /* process waiting to read */
58         qlock_t wlock;                          /* mutex for writing processes */
59         struct rendez wr;                       /* process waiting to write */
60
61         char err[ERRMAX];
62 };
63
64 enum {
65         Maxatomic = 64 * 1024,
66 };
67
68 unsigned int qiomaxatomic = Maxatomic;
69
70 void ixsummary(void)
71 {
72         debugging ^= 1;
73         iallocsummary();
74         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
75                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
76         printd("consume %lu, produce %lu, qcopy %lu\n",
77                    consumecnt, producecnt, qcopycnt);
78 }
79
80 /*
81  *  free a list of blocks
82  */
83 void freeblist(struct block *b)
84 {
85         struct block *next;
86
87         for (; b != 0; b = next) {
88                 next = b->next;
89                 b->next = 0;
90                 freeb(b);
91         }
92 }
93
94 /*
95  *  pad a block to the front (or the back if size is negative)
96  */
97 struct block *padblock(struct block *bp, int size)
98 {
99         int n;
100         struct block *nbp;
101         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
102         uint16_t checksum_start = bp->checksum_start;
103         uint16_t checksum_offset = bp->checksum_offset;
104
105         QDEBUG checkb(bp, "padblock 1");
106         if (size >= 0) {
107                 if (bp->rp - bp->base >= size) {
108                         bp->checksum_start += size;
109                         bp->rp -= size;
110                         return bp;
111                 }
112
113                 WARN_EXTRA(bp);
114                 if (bp->next)
115                         panic("padblock %p", getcallerpc(&bp));
116                 n = BLEN(bp);
117                 padblockcnt++;
118                 nbp = allocb(size + n);
119                 nbp->rp += size;
120                 nbp->wp = nbp->rp;
121                 memmove(nbp->wp, bp->rp, n);
122                 nbp->wp += n;
123                 freeb(bp);
124                 nbp->rp -= size;
125         } else {
126                 size = -size;
127
128                 WARN_EXTRA(bp);
129
130                 if (bp->next)
131                         panic("padblock %p", getcallerpc(&bp));
132
133                 if (bp->lim - bp->wp >= size)
134                         return bp;
135
136                 n = BLEN(bp);
137                 padblockcnt++;
138                 nbp = allocb(size + n);
139                 memmove(nbp->wp, bp->rp, n);
140                 nbp->wp += n;
141                 freeb(bp);
142         }
143         if (bcksum) {
144                 nbp->flag |= bcksum;
145                 nbp->checksum_start = checksum_start;
146                 nbp->checksum_offset = checksum_offset;
147         }
148         QDEBUG checkb(nbp, "padblock 1");
149         return nbp;
150 }
151
152 /*
153  *  return count of bytes in a string of blocks
154  */
155 int blocklen(struct block *bp)
156 {
157         int len;
158
159         len = 0;
160         while (bp) {
161                 len += BLEN(bp);
162                 bp = bp->next;
163         }
164         return len;
165 }
166
167 /*
168  * return count of space in blocks
169  */
170 int blockalloclen(struct block *bp)
171 {
172         int len;
173
174         len = 0;
175         while (bp) {
176                 len += BALLOC(bp);
177                 bp = bp->next;
178         }
179         return len;
180 }
181
182 /*
183  *  copy the  string of blocks into
184  *  a single block and free the string
185  */
186 struct block *concatblock(struct block *bp)
187 {
188         int len;
189         struct block *nb, *f;
190
191         if (bp->next == 0)
192                 return bp;
193
194         /* probably use parts of qclone */
195         WARN_EXTRA(bp);
196         nb = allocb(blocklen(bp));
197         for (f = bp; f; f = f->next) {
198                 len = BLEN(f);
199                 memmove(nb->wp, f->rp, len);
200                 nb->wp += len;
201         }
202         concatblockcnt += BLEN(nb);
203         freeblist(bp);
204         QDEBUG checkb(nb, "concatblock 1");
205         return nb;
206 }
207
208 /* Returns a block with the remaining contents of b all in the main body of the
209  * returned block.  Replace old references to b with the returned value (which
210  * may still be 'b', if no change was needed. */
211 struct block *linearizeblock(struct block *b)
212 {
213         struct block *newb;
214         size_t len;
215         struct extra_bdata *ebd;
216
217         if (!b->extra_len)
218                 return b;
219
220         newb = allocb(BLEN(b));
221         len = BHLEN(b);
222         memcpy(newb->wp, b->rp, len);
223         newb->wp += len;
224         len = b->extra_len;
225         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
226                 ebd = &b->extra_data[i];
227                 if (!ebd->base || !ebd->len)
228                         continue;
229                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
230                 newb->wp += ebd->len;
231                 len -= ebd->len;
232         }
233         /* TODO: any other flags that need copied over? */
234         if (b->flag & BCKSUM_FLAGS) {
235                 newb->flag |= (b->flag & BCKSUM_FLAGS);
236                 newb->checksum_start = b->checksum_start;
237                 newb->checksum_offset = b->checksum_offset;
238         }
239         freeb(b);
240         return newb;
241 }
242
243 /*
244  *  make sure the first block has at least n bytes in its main body
245  */
246 struct block *pullupblock(struct block *bp, int n)
247 {
248         int i, len, seglen;
249         struct block *nbp;
250         struct extra_bdata *ebd;
251
252         /*
253          *  this should almost always be true, it's
254          *  just to avoid every caller checking.
255          */
256         if (BHLEN(bp) >= n)
257                 return bp;
258
259          /* a start at explicit main-body / header management */
260         if (bp->extra_len) {
261                 if (n > bp->lim - bp->rp) {
262                         /* would need to realloc a new block and copy everything over. */
263                         panic("can't pullup, no place to put it\n");
264                 }
265                 len = n - BHLEN(bp);
266                 if (len > bp->extra_len)
267                         panic("pullup more than extra (%d, %d, %d)\n",
268                               n, BHLEN(bp), bp->extra_len);
269                 checkb(bp, "before pullup");
270                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
271                         ebd = &bp->extra_data[i];
272                         if (!ebd->base || !ebd->len)
273                                 continue;
274                         seglen = MIN(ebd->len, len);
275                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
276                         bp->wp += seglen;
277                         len -= seglen;
278                         ebd->len -= seglen;
279                         ebd->off += seglen;
280                         bp->extra_len -= seglen;
281                         if (ebd->len == 0) {
282                                 kfree((void *)ebd->base);
283                                 ebd->len = 0;
284                                 ebd->off = 0;
285                                 ebd->base = 0;
286                         }
287                 }
288                 /* maybe just call pullupblock recursively here */
289                 if (len)
290                         panic("pullup %d bytes overdrawn\n", len);
291                 checkb(bp, "after pullup");
292                 return bp;
293         }
294
295         /*
296          *  if not enough room in the first block,
297          *  add another to the front of the list.
298          */
299         if (bp->lim - bp->rp < n) {
300                 nbp = allocb(n);
301                 nbp->next = bp;
302                 bp = nbp;
303         }
304
305         /*
306          *  copy bytes from the trailing blocks into the first
307          */
308         n -= BLEN(bp);
309         while ((nbp = bp->next)) {
310                 i = BLEN(nbp);
311                 if (i > n) {
312                         memmove(bp->wp, nbp->rp, n);
313                         pullupblockcnt++;
314                         bp->wp += n;
315                         nbp->rp += n;
316                         QDEBUG checkb(bp, "pullupblock 1");
317                         return bp;
318                 } else {
319                         memmove(bp->wp, nbp->rp, i);
320                         pullupblockcnt++;
321                         bp->wp += i;
322                         bp->next = nbp->next;
323                         nbp->next = 0;
324                         freeb(nbp);
325                         n -= i;
326                         if (n == 0) {
327                                 QDEBUG checkb(bp, "pullupblock 2");
328                                 return bp;
329                         }
330                 }
331         }
332         freeb(bp);
333         return 0;
334 }
335
336 /*
337  *  make sure the first block has at least n bytes in its main body
338  */
339 struct block *pullupqueue(struct queue *q, int n)
340 {
341         struct block *b;
342
343         /* TODO: lock to protect the queue links? */
344         if ((BHLEN(q->bfirst) >= n))
345                 return q->bfirst;
346         q->bfirst = pullupblock(q->bfirst, n);
347         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
348         q->blast = b;
349         return q->bfirst;
350 }
351
352 /*
353  *  trim to len bytes starting at offset
354  */
355 struct block *trimblock(struct block *bp, int offset, int len)
356 {
357         uint32_t l;
358         struct block *nb, *startb;
359
360         QDEBUG checkb(bp, "trimblock 1");
361         if (blocklen(bp) < offset + len) {
362                 freeblist(bp);
363                 return NULL;
364         }
365
366         while ((l = BLEN(bp)) < offset) {
367                 offset -= l;
368                 nb = bp->next;
369                 bp->next = NULL;
370                 freeb(bp);
371                 bp = nb;
372         }
373
374         WARN_EXTRA(bp);
375         startb = bp;
376         bp->rp += offset;
377
378         while ((l = BLEN(bp)) < len) {
379                 len -= l;
380                 bp = bp->next;
381         }
382
383         bp->wp -= (BLEN(bp) - len);
384
385         if (bp->next) {
386                 freeblist(bp->next);
387                 bp->next = NULL;
388         }
389
390         return startb;
391 }
392
393 /*
394  *  copy 'count' bytes into a new block
395  */
396 struct block *copyblock(struct block *bp, int count)
397 {
398         int l;
399         struct block *nbp;
400
401         QDEBUG checkb(bp, "copyblock 0");
402         nbp = allocb(count);
403         if (bp->flag & BCKSUM_FLAGS) {
404                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
405                 nbp->checksum_start = bp->checksum_start;
406                 nbp->checksum_offset = bp->checksum_offset;
407         }
408         WARN_EXTRA(bp);
409         for (; count > 0 && bp != 0; bp = bp->next) {
410                 l = BLEN(bp);
411                 if (l > count)
412                         l = count;
413                 memmove(nbp->wp, bp->rp, l);
414                 nbp->wp += l;
415                 count -= l;
416         }
417         if (count > 0) {
418                 memset(nbp->wp, 0, count);
419                 nbp->wp += count;
420         }
421         copyblockcnt++;
422         QDEBUG checkb(nbp, "copyblock 1");
423
424         return nbp;
425 }
426
427 struct block *adjustblock(struct block *bp, int len)
428 {
429         int n;
430         struct block *nbp;
431
432         if (len < 0) {
433                 freeb(bp);
434                 return NULL;
435         }
436
437         WARN_EXTRA(bp);
438         if (bp->rp + len > bp->lim) {
439                 nbp = copyblock(bp, len);
440                 freeblist(bp);
441                 QDEBUG checkb(nbp, "adjustblock 1");
442
443                 return nbp;
444         }
445
446         n = BLEN(bp);
447         if (len > n)
448                 memset(bp->wp, 0, len - n);
449         bp->wp = bp->rp + len;
450         QDEBUG checkb(bp, "adjustblock 2");
451
452         return bp;
453 }
454
455 /*
456  *  throw away up to count bytes from a
457  *  list of blocks.  Return count of bytes
458  *  thrown away.
459  */
460 int pullblock(struct block **bph, int count)
461 {
462         struct block *bp;
463         int n, bytes;
464
465         bytes = 0;
466         if (bph == NULL)
467                 return 0;
468
469         while (*bph != NULL && count != 0) {
470                 bp = *bph;
471         WARN_EXTRA(bp);
472
473                 n = BLEN(bp);
474                 if (count < n)
475                         n = count;
476                 bytes += n;
477                 count -= n;
478                 bp->rp += n;
479                 QDEBUG checkb(bp, "pullblock ");
480                 if (BLEN(bp) == 0) {
481                         *bph = bp->next;
482                         bp->next = NULL;
483                         freeb(bp);
484                 }
485         }
486         return bytes;
487 }
488
489 /*
490  *  get next block from a queue, return null if nothing there
491  */
492 struct block *qget(struct queue *q)
493 {
494         int dowakeup;
495         struct block *b;
496
497         /* sync with qwrite */
498         spin_lock_irqsave(&q->lock);
499
500         b = q->bfirst;
501         if (b == NULL) {
502                 q->state |= Qstarve;
503                 spin_unlock_irqsave(&q->lock);
504                 return NULL;
505         }
506         q->bfirst = b->next;
507         b->next = 0;
508         q->len -= BALLOC(b);
509         q->dlen -= BLEN(b);
510         QDEBUG checkb(b, "qget");
511
512         /* if writer flow controlled, restart */
513         if ((q->state & Qflow) && q->len < q->limit / 2) {
514                 q->state &= ~Qflow;
515                 dowakeup = 1;
516         } else
517                 dowakeup = 0;
518
519         spin_unlock_irqsave(&q->lock);
520
521         if (dowakeup)
522                 rendez_wakeup(&q->wr);
523
524         return b;
525 }
526
527 /*
528  *  throw away the next 'len' bytes in the queue
529  * returning the number actually discarded
530  */
531 int qdiscard(struct queue *q, int len)
532 {
533         struct block *b;
534         int dowakeup, n, sofar, body_amt, extra_amt;
535         struct extra_bdata *ebd;
536
537         spin_lock_irqsave(&q->lock);
538         for (sofar = 0; sofar < len; sofar += n) {
539                 b = q->bfirst;
540                 if (b == NULL)
541                         break;
542                 QDEBUG checkb(b, "qdiscard");
543                 n = BLEN(b);
544                 if (n <= len - sofar) {
545                         q->bfirst = b->next;
546                         b->next = 0;
547                         q->len -= BALLOC(b);
548                         q->dlen -= BLEN(b);
549                         freeb(b);
550                 } else {
551                         n = len - sofar;
552                         q->dlen -= n;
553                         /* partial block removal */
554                         body_amt = MIN(BHLEN(b), n);
555                         b->rp += body_amt;
556                         extra_amt = n - body_amt;
557                         /* reduce q->len by the amount we remove from the extras.  The
558                          * header will always be accounted for above, during block removal.
559                          * */
560                         q->len -= extra_amt;
561                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
562                                 ebd = &b->extra_data[i];
563                                 if (!ebd->base || !ebd->len)
564                                         continue;
565                                 if (extra_amt >= ebd->len) {
566                                         /* remove the entire entry, note the kfree release */
567                                         b->extra_len -= ebd->len;
568                                         extra_amt -= ebd->len;
569                                         kfree((void*)ebd->base);
570                                         ebd->base = ebd->off = ebd->len = 0;
571                                         continue;
572                                 }
573                                 ebd->off += extra_amt;
574                                 ebd->len -= extra_amt;
575                                 b->extra_len -= extra_amt;
576                                 extra_amt = 0;
577                         }
578                 }
579         }
580
581         /*
582          *  if writer flow controlled, restart
583          *
584          *  This used to be
585          *  q->len < q->limit/2
586          *  but it slows down tcp too much for certain write sizes.
587          *  I really don't understand it completely.  It may be
588          *  due to the queue draining so fast that the transmission
589          *  stalls waiting for the app to produce more data.  - presotto
590          */
591         if ((q->state & Qflow) && q->len < q->limit) {
592                 q->state &= ~Qflow;
593                 dowakeup = 1;
594         } else
595                 dowakeup = 0;
596
597         spin_unlock_irqsave(&q->lock);
598
599         if (dowakeup)
600                 rendez_wakeup(&q->wr);
601
602         return sofar;
603 }
604
605 /*
606  *  Interrupt level copy out of a queue, return # bytes copied.
607  */
608 int qconsume(struct queue *q, void *vp, int len)
609 {
610         struct block *b;
611         int n, dowakeup;
612         uint8_t *p = vp;
613         struct block *tofree = NULL;
614
615         /* sync with qwrite */
616         spin_lock_irqsave(&q->lock);
617
618         for (;;) {
619                 b = q->bfirst;
620                 if (b == 0) {
621                         q->state |= Qstarve;
622                         spin_unlock_irqsave(&q->lock);
623                         return -1;
624                 }
625                 QDEBUG checkb(b, "qconsume 1");
626
627                 n = BLEN(b);
628                 if (n > 0)
629                         break;
630                 q->bfirst = b->next;
631                 q->len -= BALLOC(b);
632
633                 /* remember to free this */
634                 b->next = tofree;
635                 tofree = b;
636         };
637
638         WARN_EXTRA(b);
639         if (n < len)
640                 len = n;
641         memmove(p, b->rp, len);
642         consumecnt += n;
643         b->rp += len;
644         q->dlen -= len;
645
646         /* discard the block if we're done with it */
647         if ((q->state & Qmsg) || len == n) {
648                 q->bfirst = b->next;
649                 b->next = 0;
650                 q->len -= BALLOC(b);
651                 q->dlen -= BLEN(b);
652
653                 /* remember to free this */
654                 b->next = tofree;
655                 tofree = b;
656         }
657
658         /* if writer flow controlled, restart */
659         if ((q->state & Qflow) && q->len < q->limit / 2) {
660                 q->state &= ~Qflow;
661                 dowakeup = 1;
662         } else
663                 dowakeup = 0;
664
665         spin_unlock_irqsave(&q->lock);
666
667         if (dowakeup)
668                 rendez_wakeup(&q->wr);
669
670         if (tofree != NULL)
671                 freeblist(tofree);
672
673         return len;
674 }
675
676 int qpass(struct queue *q, struct block *b)
677 {
678         int dlen, len, dowakeup;
679
680         /* sync with qread */
681         dowakeup = 0;
682         spin_lock_irqsave(&q->lock);
683         if (q->len >= q->limit) {
684                 freeblist(b);
685                 spin_unlock_irqsave(&q->lock);
686                 return -1;
687         }
688         if (q->state & Qclosed) {
689                 len = blocklen(b);
690                 freeblist(b);
691                 spin_unlock_irqsave(&q->lock);
692                 return len;
693         }
694
695         /* add buffer to queue */
696         if (q->bfirst)
697                 q->blast->next = b;
698         else
699                 q->bfirst = b;
700         len = BALLOC(b);
701         dlen = BLEN(b);
702         QDEBUG checkb(b, "qpass");
703         while (b->next) {
704                 b = b->next;
705                 QDEBUG checkb(b, "qpass");
706                 len += BALLOC(b);
707                 dlen += BLEN(b);
708         }
709         q->blast = b;
710         q->len += len;
711         q->dlen += dlen;
712
713         if (q->len >= q->limit / 2)
714                 q->state |= Qflow;
715
716         if (q->state & Qstarve) {
717                 q->state &= ~Qstarve;
718                 dowakeup = 1;
719         }
720         spin_unlock_irqsave(&q->lock);
721
722         if (dowakeup)
723                 rendez_wakeup(&q->rr);
724
725         return len;
726 }
727
728 int qpassnolim(struct queue *q, struct block *b)
729 {
730         int dlen, len, dowakeup;
731
732         /* sync with qread */
733         dowakeup = 0;
734         spin_lock_irqsave(&q->lock);
735
736         if (q->state & Qclosed) {
737                 freeblist(b);
738                 spin_unlock_irqsave(&q->lock);
739                 return BALLOC(b);
740         }
741
742         /* add buffer to queue */
743         if (q->bfirst)
744                 q->blast->next = b;
745         else
746                 q->bfirst = b;
747         len = BALLOC(b);
748         dlen = BLEN(b);
749         QDEBUG checkb(b, "qpass");
750         while (b->next) {
751                 b = b->next;
752                 QDEBUG checkb(b, "qpass");
753                 len += BALLOC(b);
754                 dlen += BLEN(b);
755         }
756         q->blast = b;
757         q->len += len;
758         q->dlen += dlen;
759
760         if (q->len >= q->limit / 2)
761                 q->state |= Qflow;
762
763         if (q->state & Qstarve) {
764                 q->state &= ~Qstarve;
765                 dowakeup = 1;
766         }
767         spin_unlock_irqsave(&q->lock);
768
769         if (dowakeup)
770                 rendez_wakeup(&q->rr);
771
772         return len;
773 }
774
775 /*
776  *  if the allocated space is way out of line with the used
777  *  space, reallocate to a smaller block
778  */
779 struct block *packblock(struct block *bp)
780 {
781         struct block **l, *nbp;
782         int n;
783
784         WARN_EXTRA(bp);
785         for (l = &bp; *l; l = &(*l)->next) {
786                 nbp = *l;
787                 n = BLEN(nbp);
788                 if ((n << 2) < BALLOC(nbp)) {
789                         *l = allocb(n);
790                         memmove((*l)->wp, nbp->rp, n);
791                         (*l)->wp += n;
792                         (*l)->next = nbp->next;
793                         freeb(nbp);
794                 }
795         }
796
797         return bp;
798 }
799
800 int qproduce(struct queue *q, void *vp, int len)
801 {
802         struct block *b;
803         int dowakeup;
804         uint8_t *p = vp;
805
806         /* sync with qread */
807         dowakeup = 0;
808         spin_lock_irqsave(&q->lock);
809
810         /* no waiting receivers, room in buffer? */
811         if (q->len >= q->limit) {
812                 q->state |= Qflow;
813                 spin_unlock_irqsave(&q->lock);
814                 return -1;
815         }
816
817         /* save in buffer */
818         /* use Qcoalesce here to save storage */
819         // TODO: Consider removing the Qcoalesce flag and force a coalescing
820         // strategy by default.
821         b = q->blast;
822         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
823                 || b->lim - b->wp < len) {
824                 /* need a new block */
825                 b = iallocb(len);
826                 if (b == 0) {
827                         spin_unlock_irqsave(&q->lock);
828                         return 0;
829                 }
830                 if (q->bfirst)
831                         q->blast->next = b;
832                 else
833                         q->bfirst = b;
834                 q->blast = b;
835                 /* b->next = 0; done by iallocb() */
836                 q->len += BALLOC(b);
837         }
838         WARN_EXTRA(b);
839         memmove(b->wp, p, len);
840         producecnt += len;
841         b->wp += len;
842         q->dlen += len;
843         QDEBUG checkb(b, "qproduce");
844
845         if (q->state & Qstarve) {
846                 q->state &= ~Qstarve;
847                 dowakeup = 1;
848         }
849
850         if (q->len >= q->limit)
851                 q->state |= Qflow;
852         spin_unlock_irqsave(&q->lock);
853
854         if (dowakeup)
855                 rendez_wakeup(&q->rr);
856
857         return len;
858 }
859
860 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
861  * body_rp, for up to len.  Returns the len consumed. 
862  *
863  * The base is 'b', so that we can kfree it later.  This currently ties us to
864  * using kfree for the release method for all extra_data.
865  *
866  * It is possible to have a body size that is 0, if there is no offset, and
867  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
868 static size_t point_to_body(struct block *b, uint8_t *body_rp,
869                             struct block *newb, unsigned int newb_idx,
870                             size_t len)
871 {
872         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
873
874         assert(newb_idx < newb->nr_extra_bufs);
875
876         kmalloc_incref(b);
877         ebd->base = (uintptr_t)b;
878         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
879         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
880         assert((int)ebd->len >= 0);
881         newb->extra_len += ebd->len;
882         return ebd->len;
883 }
884
885 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
886  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
887  * consumed.
888  *
889  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
890 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
891                            struct block *newb, unsigned int newb_idx,
892                            size_t len)
893 {
894         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
895         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
896
897         assert(b_idx < b->nr_extra_bufs);
898         assert(newb_idx < newb->nr_extra_bufs);
899
900         kmalloc_incref((void*)b_ebd->base);
901         n_ebd->base = b_ebd->base;
902         n_ebd->off = b_ebd->off + b_off;
903         n_ebd->len = MIN(b_ebd->len - b_off, len);
904         newb->extra_len += n_ebd->len;
905         return n_ebd->len;
906 }
907
908 /* given a string of blocks, fills the new block's extra_data  with the contents
909  * of the blist [offset, len + offset)
910  *
911  * returns 0 on success.  the only failure is if the extra_data array was too
912  * small, so this returns a positive integer saying how big the extra_data needs
913  * to be.
914  *
915  * callers are responsible for protecting the list structure. */
916 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
917                             uint32_t offset)
918 {
919         struct block *b, *first;
920         unsigned int nr_bufs = 0;
921         unsigned int b_idx, newb_idx = 0;
922         uint8_t *first_main_body = 0;
923
924         /* find the first block; keep offset relative to the latest b in the list */
925         for (b = blist; b; b = b->next) {
926                 if (BLEN(b) > offset)
927                         break;
928                 offset -= BLEN(b);
929         }
930         /* qcopy semantics: if you asked for an offset outside the block list, you
931          * get an empty block back */
932         if (!b)
933                 return 0;
934         first = b;
935         /* upper bound for how many buffers we'll need in newb */
936         for (/* b is set*/; b; b = b->next) {
937                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
938         }
939         /* we might be holding a spinlock here, so we won't wait for kmalloc */
940         block_add_extd(newb, nr_bufs, 0);
941         if (newb->nr_extra_bufs < nr_bufs) {
942                 /* caller will need to alloc these, then re-call us */
943                 return nr_bufs;
944         }
945         for (b = first; b && len; b = b->next) {
946                 b_idx = 0;
947                 if (offset) {
948                         if (offset < BHLEN(b)) {
949                                 /* off is in the main body */
950                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
951                                 newb_idx++;
952                         } else {
953                                 /* off is in one of the buffers (or just past the last one).
954                                  * we're not going to point to b's main body at all. */
955                                 offset -= BHLEN(b);
956                                 assert(b->extra_data);
957                                 /* assuming these extrabufs are packed, or at least that len
958                                  * isn't gibberish */
959                                 while (b->extra_data[b_idx].len <= offset) {
960                                         offset -= b->extra_data[b_idx].len;
961                                         b_idx++;
962                                 }
963                                 /* now offset is set to our offset in the b_idx'th buf */
964                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
965                                 newb_idx++;
966                                 b_idx++;
967                         }
968                         offset = 0;
969                 } else {
970                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
971                         newb_idx++;
972                 }
973                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
974                  * and any point_to_ could be our last if it consumed all of len. */
975                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
976                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
977                         newb_idx++;
978                 }
979         }
980         return 0;
981 }
982
983 struct block *blist_clone(struct block *blist, int header_len, int len,
984                           uint32_t offset)
985 {
986         int ret;
987         struct block *newb = allocb(header_len);
988         do {
989                 ret = __blist_clone_to(blist, newb, len, offset);
990                 if (ret)
991                         block_add_extd(newb, ret, KMALLOC_WAIT);
992         } while (ret);
993         return newb;
994 }
995
996 /* given a queue, makes a single block with header_len reserved space in the
997  * block main body, and the contents of [offset, len + offset) pointed to in the
998  * new blocks ext_data. */
999 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1000 {
1001         int ret;
1002         struct block *newb = allocb(header_len);
1003         /* the while loop should rarely be used: it would require someone
1004          * concurrently adding to the queue. */
1005         do {
1006                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1007                 spin_lock_irqsave(&q->lock);
1008                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1009                 spin_unlock_irqsave(&q->lock);
1010                 if (ret)
1011                         block_add_extd(newb, ret, KMALLOC_WAIT);
1012         } while (ret);
1013         return newb;
1014 }
1015
1016 /*
1017  *  copy from offset in the queue
1018  */
1019 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1020 {
1021         int sofar;
1022         int n;
1023         struct block *b, *nb;
1024         uint8_t *p;
1025
1026         nb = allocb(len);
1027
1028         spin_lock_irqsave(&q->lock);
1029
1030         /* go to offset */
1031         b = q->bfirst;
1032         for (sofar = 0;; sofar += n) {
1033                 if (b == NULL) {
1034                         spin_unlock_irqsave(&q->lock);
1035                         return nb;
1036                 }
1037                 n = BLEN(b);
1038                 if (sofar + n > offset) {
1039                         p = b->rp + offset - sofar;
1040                         n -= offset - sofar;
1041                         break;
1042                 }
1043                 QDEBUG checkb(b, "qcopy");
1044                 b = b->next;
1045         }
1046
1047         /* copy bytes from there */
1048         for (sofar = 0; sofar < len;) {
1049                 if (n > len - sofar)
1050                         n = len - sofar;
1051                 WARN_EXTRA(b);
1052                 memmove(nb->wp, p, n);
1053                 qcopycnt += n;
1054                 sofar += n;
1055                 nb->wp += n;
1056                 b = b->next;
1057                 if (b == NULL)
1058                         break;
1059                 n = BLEN(b);
1060                 p = b->rp;
1061         }
1062         spin_unlock_irqsave(&q->lock);
1063
1064         return nb;
1065 }
1066
1067 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1068 {
1069 #ifdef CONFIG_BLOCK_EXTRAS
1070         return qclone(q, 0, len, offset);
1071 #else
1072         return qcopy_old(q, len, offset);
1073 #endif
1074 }
1075
1076 static void qinit_common(struct queue *q)
1077 {
1078         spinlock_init_irqsave(&q->lock);
1079         qlock_init(&q->rlock);
1080         qlock_init(&q->wlock);
1081         rendez_init(&q->rr);
1082         rendez_init(&q->wr);
1083 }
1084
1085 /*
1086  *  called by non-interrupt code
1087  */
1088 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1089 {
1090         struct queue *q;
1091
1092         q = kzmalloc(sizeof(struct queue), 0);
1093         if (q == 0)
1094                 return 0;
1095         qinit_common(q);
1096
1097         q->limit = q->inilim = limit;
1098         q->kick = kick;
1099         q->arg = arg;
1100         q->state = msg;
1101         q->state |= Qstarve;
1102         q->eof = 0;
1103         q->noblock = 0;
1104
1105         return q;
1106 }
1107
1108 /* open a queue to be bypassed */
1109 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1110 {
1111         struct queue *q;
1112
1113         q = kzmalloc(sizeof(struct queue), 0);
1114         if (q == 0)
1115                 return 0;
1116         qinit_common(q);
1117
1118         q->limit = 0;
1119         q->arg = arg;
1120         q->bypass = bypass;
1121         q->state = 0;
1122
1123         return q;
1124 }
1125
1126 static int notempty(void *a)
1127 {
1128         struct queue *q = a;
1129
1130         return (q->state & Qclosed) || q->bfirst != 0;
1131 }
1132
1133 /* wait for the queue to be non-empty or closed.
1134  *
1135  * called with q ilocked.  rendez may error out, back through the caller, with
1136  * the irqsave lock unlocked.  */
1137 static int qwait(struct queue *q)
1138 {
1139         /* wait for data */
1140         for (;;) {
1141                 if (q->bfirst != NULL)
1142                         break;
1143
1144                 if (q->state & Qclosed) {
1145                         if (++q->eof > 3)
1146                                 return -1;
1147                         if (*q->err && strcmp(q->err, Ehungup) != 0)
1148                                 return -1;
1149                         return 0;
1150                 }
1151
1152                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1153                 spin_unlock_irqsave(&q->lock);
1154                 /* may throw an error() */
1155                 rendez_sleep(&q->rr, notempty, q);
1156                 spin_lock_irqsave(&q->lock);
1157         }
1158         return 1;
1159 }
1160
1161 /*
1162  * add a block list to a queue
1163  */
1164 void qaddlist(struct queue *q, struct block *b)
1165 {
1166         /* TODO: q lock? */
1167         /* queue the block */
1168         if (q->bfirst)
1169                 q->blast->next = b;
1170         else
1171                 q->bfirst = b;
1172         q->len += blockalloclen(b);
1173         q->dlen += blocklen(b);
1174         while (b->next)
1175                 b = b->next;
1176         q->blast = b;
1177 }
1178
1179 /*
1180  *  called with q ilocked
1181  */
1182 struct block *qremove(struct queue *q)
1183 {
1184         struct block *b;
1185
1186         b = q->bfirst;
1187         if (b == NULL)
1188                 return NULL;
1189         q->bfirst = b->next;
1190         b->next = NULL;
1191         q->dlen -= BLEN(b);
1192         q->len -= BALLOC(b);
1193         QDEBUG checkb(b, "qremove");
1194         b = linearizeblock(b);
1195         return b;
1196 }
1197
1198 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1199 {
1200         size_t copy_amt, retval = 0;
1201         struct extra_bdata *ebd;
1202         
1203         copy_amt = MIN(BHLEN(b), amt);
1204         memcpy(to, b->rp, copy_amt);
1205         /* advance the rp, since this block not be completely consumed and future
1206          * reads need to know where to pick up from */
1207         b->rp += copy_amt;
1208         to += copy_amt;
1209         amt -= copy_amt;
1210         retval += copy_amt;
1211         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1212                 ebd = &b->extra_data[i];
1213                 /* skip empty entires.  if we track this in the struct block, we can
1214                  * just start the for loop early */
1215                 if (!ebd->base || !ebd->len)
1216                         continue;
1217                 copy_amt = MIN(ebd->len, amt);
1218                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1219                 /* we're actually consuming the entries, just like how we advance rp up
1220                  * above, and might only consume part of one. */
1221                 ebd->len -= copy_amt;
1222                 ebd->off += copy_amt;
1223                 b->extra_len -= copy_amt;
1224                 if (!ebd->len) {
1225                         /* we don't actually have to decref here.  it's also done in
1226                          * freeb().  this is the earliest we can free. */
1227                         kfree((void*)ebd->base);
1228                         ebd->base = ebd->off = 0;
1229                 }
1230                 to += copy_amt;
1231                 amt -= copy_amt;
1232                 retval += copy_amt;
1233         }
1234         return retval;
1235 }
1236
1237 /*
1238  *  copy the contents of a string of blocks into
1239  *  memory.  emptied blocks are freed.  return
1240  *  pointer to first unconsumed block.
1241  */
1242 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1243 {
1244         int i;
1245         struct block *next;
1246
1247         /* could be slicker here, since read_from_block is smart */
1248         for (; b != NULL; b = next) {
1249                 i = BLEN(b);
1250                 if (i > n) {
1251                         /* partial block, consume some */
1252                         read_from_block(b, p, n);
1253                         return b;
1254                 }
1255                 /* full block, consume all and move on */
1256                 i = read_from_block(b, p, i);
1257                 n -= i;
1258                 p += i;
1259                 next = b->next;
1260                 freeb(b);
1261         }
1262         return NULL;
1263 }
1264
1265 /*
1266  *  copy the contents of memory into a string of blocks.
1267  *  return NULL on error.
1268  */
1269 struct block *mem2bl(uint8_t * p, int len)
1270 {
1271         ERRSTACK(1);
1272         int n;
1273         struct block *b, *first, **l;
1274
1275         first = NULL;
1276         l = &first;
1277         if (waserror()) {
1278                 freeblist(first);
1279                 nexterror();
1280         }
1281         do {
1282                 n = len;
1283                 if (n > Maxatomic)
1284                         n = Maxatomic;
1285
1286                 *l = b = allocb(n);
1287                 /* TODO consider extra_data */
1288                 memmove(b->wp, p, n);
1289                 b->wp += n;
1290                 p += n;
1291                 len -= n;
1292                 l = &b->next;
1293         } while (len > 0);
1294         poperror();
1295
1296         return first;
1297 }
1298
1299 /*
1300  *  put a block back to the front of the queue
1301  *  called with q ilocked
1302  */
1303 void qputback(struct queue *q, struct block *b)
1304 {
1305         b->next = q->bfirst;
1306         if (q->bfirst == NULL)
1307                 q->blast = b;
1308         q->bfirst = b;
1309         q->len += BALLOC(b);
1310         q->dlen += BLEN(b);
1311 }
1312
1313 /*
1314  *  flow control, get producer going again
1315  *  called with q ilocked
1316  */
1317 static void qwakeup_iunlock(struct queue *q)
1318 {
1319         int dowakeup = 0;
1320
1321         /* if writer flow controlled, restart */
1322         if ((q->state & Qflow) && q->len < q->limit / 2) {
1323                 q->state &= ~Qflow;
1324                 dowakeup = 1;
1325         }
1326
1327         spin_unlock_irqsave(&q->lock);
1328
1329         /* wakeup flow controlled writers */
1330         if (dowakeup) {
1331                 if (q->kick)
1332                         q->kick(q->arg);
1333                 rendez_wakeup(&q->wr);
1334         }
1335 }
1336
1337 /*
1338  *  get next block from a queue (up to a limit)
1339  */
1340 struct block *qbread(struct queue *q, int len)
1341 {
1342         ERRSTACK(1);
1343         struct block *b, *nb;
1344         int n;
1345
1346         qlock(&q->rlock);
1347         if (waserror()) {
1348                 qunlock(&q->rlock);
1349                 nexterror();
1350         }
1351
1352         spin_lock_irqsave(&q->lock);
1353         switch (qwait(q)) {
1354                 case 0:
1355                         /* queue closed */
1356                         spin_unlock_irqsave(&q->lock);
1357                         qunlock(&q->rlock);
1358                         poperror();
1359                         return NULL;
1360                 case -1:
1361                         /* multiple reads on a closed queue */
1362                         spin_unlock_irqsave(&q->lock);
1363                         error(q->err);
1364         }
1365
1366         /* if we get here, there's at least one block in the queue */
1367         b = qremove(q);
1368         n = BLEN(b);
1369
1370         /* split block if it's too big and this is not a message queue */
1371         nb = b;
1372         if (n > len) {
1373                 WARN_EXTRA(b);
1374                 if ((q->state & Qmsg) == 0) {
1375                         n -= len;
1376                         b = allocb(n);
1377                         memmove(b->wp, nb->rp + len, n);
1378                         b->wp += n;
1379                         qputback(q, b);
1380                 }
1381                 nb->wp = nb->rp + len;
1382         }
1383
1384         /* restart producer */
1385         qwakeup_iunlock(q);
1386
1387         poperror();
1388         qunlock(&q->rlock);
1389         return nb;
1390 }
1391
1392 /*
1393  *  read a queue.  if no data is queued, post a struct block
1394  *  and wait on its Rendez.
1395  */
1396 long qread(struct queue *q, void *vp, int len)
1397 {
1398         ERRSTACK(1);
1399         struct block *b, *first, **l;
1400         int m, n;
1401
1402         qlock(&q->rlock);
1403         if (waserror()) {
1404                 qunlock(&q->rlock);
1405                 nexterror();
1406         }
1407
1408         spin_lock_irqsave(&q->lock);
1409 again:
1410         switch (qwait(q)) {
1411                 case 0:
1412                         /* queue closed */
1413                         spin_unlock_irqsave(&q->lock);
1414                         qunlock(&q->rlock);
1415                         poperror();
1416                         return 0;
1417                 case -1:
1418                         /* multiple reads on a closed queue */
1419                         spin_unlock_irqsave(&q->lock);
1420                         error(q->err);
1421         }
1422
1423         /* if we get here, there's at least one block in the queue */
1424         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1425         // strategy by default.
1426         if (q->state & Qcoalesce) {
1427                 /* when coalescing, 0 length blocks just go away */
1428                 b = q->bfirst;
1429                 if (BLEN(b) <= 0) {
1430                         freeb(qremove(q));
1431                         goto again;
1432                 }
1433
1434                 /*  grab the first block plus as many
1435                  *  following blocks as will completely
1436                  *  fit in the read.
1437                  */
1438                 n = 0;
1439                 l = &first;
1440                 m = BLEN(b);
1441                 for (;;) {
1442                         *l = qremove(q);
1443                         l = &b->next;
1444                         n += m;
1445
1446                         b = q->bfirst;
1447                         if (b == NULL)
1448                                 break;
1449                         m = BLEN(b);
1450                         if (n + m > len)
1451                                 break;
1452                 }
1453         } else {
1454                 first = qremove(q);
1455                 n = BLEN(first);
1456         }
1457
1458         /* copy to user space outside of the ilock */
1459         spin_unlock_irqsave(&q->lock);
1460         b = bl2mem(vp, first, len);
1461         spin_lock_irqsave(&q->lock);
1462
1463         /* take care of any left over partial block */
1464         if (b != NULL) {
1465                 n -= BLEN(b);
1466                 if (q->state & Qmsg)
1467                         freeb(b);
1468                 else
1469                         qputback(q, b);
1470         }
1471
1472         /* restart producer */
1473         qwakeup_iunlock(q);
1474
1475         poperror();
1476         qunlock(&q->rlock);
1477         return n;
1478 }
1479
1480 static int qnotfull(void *a)
1481 {
1482         struct queue *q = a;
1483
1484         return q->len < q->limit || (q->state & Qclosed);
1485 }
1486
1487 uint32_t noblockcnt;
1488
1489 /*
1490  *  add a block to a queue obeying flow control
1491  */
1492 long qbwrite(struct queue *q, struct block *b)
1493 {
1494         ERRSTACK(1);
1495         int n, dowakeup;
1496         volatile bool should_free_b = TRUE;
1497
1498         n = BLEN(b);
1499
1500         if (q->bypass) {
1501                 (*q->bypass) (q->arg, b);
1502                 return n;
1503         }
1504
1505         dowakeup = 0;
1506         qlock(&q->wlock);
1507         if (waserror()) {
1508                 if (b != NULL && should_free_b)
1509                         freeb(b);
1510                 qunlock(&q->wlock);
1511                 nexterror();
1512         }
1513
1514         spin_lock_irqsave(&q->lock);
1515
1516         /* give up if the queue is closed */
1517         if (q->state & Qclosed) {
1518                 spin_unlock_irqsave(&q->lock);
1519                 error(q->err);
1520         }
1521
1522         /* if nonblocking, don't queue over the limit */
1523         if (q->len >= q->limit) {
1524                 if (q->noblock) {
1525                         spin_unlock_irqsave(&q->lock);
1526                         freeb(b);
1527                         noblockcnt += n;
1528                         qunlock(&q->wlock);
1529                         poperror();
1530                         return n;
1531                 }
1532         }
1533
1534         /* queue the block */
1535         should_free_b = FALSE;
1536         if (q->bfirst)
1537                 q->blast->next = b;
1538         else
1539                 q->bfirst = b;
1540         q->blast = b;
1541         b->next = 0;
1542         q->len += BALLOC(b);
1543         q->dlen += n;
1544         QDEBUG checkb(b, "qbwrite");
1545         b = NULL;
1546
1547         /* make sure other end gets awakened */
1548         if (q->state & Qstarve) {
1549                 q->state &= ~Qstarve;
1550                 dowakeup = 1;
1551         }
1552         spin_unlock_irqsave(&q->lock);
1553
1554         /*  get output going again */
1555         if (q->kick && (dowakeup || (q->state & Qkick)))
1556                 q->kick(q->arg);
1557
1558         /* wakeup anyone consuming at the other end */
1559         if (dowakeup)
1560                 rendez_wakeup(&q->rr);
1561
1562         /*
1563          *  flow control, wait for queue to get below the limit
1564          *  before allowing the process to continue and queue
1565          *  more.  We do this here so that postnote can only
1566          *  interrupt us after the data has been queued.  This
1567          *  means that things like 9p flushes and ssl messages
1568          *  will not be disrupted by software interrupts.
1569          *
1570          *  Note - this is moderately dangerous since a process
1571          *  that keeps getting interrupted and rewriting will
1572          *  queue infinite crud.
1573          */
1574         for (;;) {
1575                 if (q->noblock || qnotfull(q))
1576                         break;
1577
1578                 spin_lock_irqsave(&q->lock);
1579                 q->state |= Qflow;
1580                 spin_unlock_irqsave(&q->lock);
1581                 rendez_sleep(&q->wr, qnotfull, q);
1582         }
1583
1584         qunlock(&q->wlock);
1585         poperror();
1586         return n;
1587 }
1588
1589 long qibwrite(struct queue *q, struct block *b)
1590 {
1591         int n, dowakeup;
1592
1593         dowakeup = 0;
1594
1595         n = BLEN(b);
1596
1597         spin_lock_irqsave(&q->lock);
1598
1599         QDEBUG checkb(b, "qibwrite");
1600         if (q->bfirst)
1601                 q->blast->next = b;
1602         else
1603                 q->bfirst = b;
1604         q->blast = b;
1605         q->len += BALLOC(b);
1606         q->dlen += n;
1607
1608         if (q->state & Qstarve) {
1609                 q->state &= ~Qstarve;
1610                 dowakeup = 1;
1611         }
1612
1613         spin_unlock_irqsave(&q->lock);
1614
1615         if (dowakeup) {
1616                 if (q->kick)
1617                         q->kick(q->arg);
1618                 rendez_wakeup(&q->rr);
1619         }
1620
1621         return n;
1622 }
1623
1624 /*
1625  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1626  */
1627 int qwrite(struct queue *q, void *vp, int len)
1628 {
1629         int n, sofar;
1630         struct block *b;
1631         uint8_t *p = vp;
1632         void *ext_buf;
1633
1634         QDEBUG if (!islo())
1635                  printd("qwrite hi %p\n", getcallerpc(&q));
1636
1637         sofar = 0;
1638         do {
1639                 n = len - sofar;
1640                 /* This is 64K, the max amount per single block.  Still a good value? */
1641                 if (n > Maxatomic)
1642                         n = Maxatomic;
1643
1644                 /* If n is small, we don't need to bother with the extra_data.  But
1645                  * until the whole stack can handle extd blocks, we'll use them
1646                  * unconditionally. */
1647 #ifdef CONFIG_BLOCK_EXTRAS
1648                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1649                  * only available via padblock (to the left).  we also need some space
1650                  * for pullupblock for some basic headers (like icmp) that get written
1651                  * in directly */
1652                 b = allocb(64);
1653                 ext_buf = kmalloc(n, 0);
1654                 memcpy(ext_buf, p + sofar, n);
1655                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1656                 b->extra_data[0].base = (uintptr_t)ext_buf;
1657                 b->extra_data[0].off = 0;
1658                 b->extra_data[0].len = n;
1659                 b->extra_len += n;
1660 #else
1661                 b = allocb(n);
1662                 memmove(b->wp, p + sofar, n);
1663                 b->wp += n;
1664 #endif
1665                         
1666                 qbwrite(q, b);
1667
1668                 sofar += n;
1669         } while (sofar < len && (q->state & Qmsg) == 0);
1670
1671         return len;
1672 }
1673
1674 /*
1675  *  used by print() to write to a queue.  Since we may be splhi or not in
1676  *  a process, don't qlock.
1677  */
1678 int qiwrite(struct queue *q, void *vp, int len)
1679 {
1680         int n, sofar, dowakeup;
1681         struct block *b;
1682         uint8_t *p = vp;
1683
1684         dowakeup = 0;
1685
1686         sofar = 0;
1687         do {
1688                 n = len - sofar;
1689                 if (n > Maxatomic)
1690                         n = Maxatomic;
1691
1692                 b = iallocb(n);
1693                 if (b == NULL)
1694                         break;
1695                 /* TODO consider extra_data */
1696                 memmove(b->wp, p + sofar, n);
1697                 /* this adjusts BLEN to be n, or at least it should */
1698                 b->wp += n;
1699                 assert(n == BLEN(b));
1700                 qibwrite(q, b);
1701
1702                 sofar += n;
1703         } while (sofar < len && (q->state & Qmsg) == 0);
1704
1705         return sofar;
1706 }
1707
1708 /*
1709  *  be extremely careful when calling this,
1710  *  as there is no reference accounting
1711  */
1712 void qfree(struct queue *q)
1713 {
1714         qclose(q);
1715         kfree(q);
1716 }
1717
1718 /*
1719  *  Mark a queue as closed.  No further IO is permitted.
1720  *  All blocks are released.
1721  */
1722 void qclose(struct queue *q)
1723 {
1724         struct block *bfirst;
1725
1726         if (q == NULL)
1727                 return;
1728
1729         /* mark it */
1730         spin_lock_irqsave(&q->lock);
1731         q->state |= Qclosed;
1732         q->state &= ~(Qflow | Qstarve);
1733         strncpy(q->err, Ehungup, sizeof(q->err));
1734         bfirst = q->bfirst;
1735         q->bfirst = 0;
1736         q->len = 0;
1737         q->dlen = 0;
1738         q->noblock = 0;
1739         spin_unlock_irqsave(&q->lock);
1740
1741         /* free queued blocks */
1742         freeblist(bfirst);
1743
1744         /* wake up readers/writers */
1745         rendez_wakeup(&q->rr);
1746         rendez_wakeup(&q->wr);
1747 }
1748
1749 /*
1750  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1751  *  blocks.
1752  */
1753 void qhangup(struct queue *q, char *msg)
1754 {
1755         /* mark it */
1756         spin_lock_irqsave(&q->lock);
1757         q->state |= Qclosed;
1758         if (msg == 0 || *msg == 0)
1759                 strncpy(q->err, Ehungup, sizeof(q->err));
1760         else
1761                 strncpy(q->err, msg, ERRMAX - 1);
1762         spin_unlock_irqsave(&q->lock);
1763
1764         /* wake up readers/writers */
1765         rendez_wakeup(&q->rr);
1766         rendez_wakeup(&q->wr);
1767 }
1768
1769 /*
1770  *  return non-zero if the q is hungup
1771  */
1772 int qisclosed(struct queue *q)
1773 {
1774         return q->state & Qclosed;
1775 }
1776
1777 /*
1778  *  mark a queue as no longer hung up
1779  */
1780 void qreopen(struct queue *q)
1781 {
1782         spin_lock_irqsave(&q->lock);
1783         q->state &= ~Qclosed;
1784         q->state |= Qstarve;
1785         q->eof = 0;
1786         q->limit = q->inilim;
1787         spin_unlock_irqsave(&q->lock);
1788 }
1789
1790 /*
1791  *  return bytes queued
1792  */
1793 int qlen(struct queue *q)
1794 {
1795         return q->dlen;
1796 }
1797
1798 /*
1799  * return space remaining before flow control
1800  */
1801 int qwindow(struct queue *q)
1802 {
1803         int l;
1804
1805         l = q->limit - q->len;
1806         if (l < 0)
1807                 l = 0;
1808         return l;
1809 }
1810
1811 /*
1812  *  return true if we can read without blocking
1813  */
1814 int qcanread(struct queue *q)
1815 {
1816         return q->bfirst != 0;
1817 }
1818
1819 /*
1820  *  change queue limit
1821  */
1822 void qsetlimit(struct queue *q, int limit)
1823 {
1824         q->limit = limit;
1825 }
1826
1827 /*
1828  *  set blocking/nonblocking
1829  */
1830 void qnoblock(struct queue *q, int onoff)
1831 {
1832         q->noblock = onoff;
1833 }
1834
1835 /*
1836  *  flush the output queue
1837  */
1838 void qflush(struct queue *q)
1839 {
1840         struct block *bfirst;
1841
1842         /* mark it */
1843         spin_lock_irqsave(&q->lock);
1844         bfirst = q->bfirst;
1845         q->bfirst = 0;
1846         q->len = 0;
1847         q->dlen = 0;
1848         spin_unlock_irqsave(&q->lock);
1849
1850         /* free queued blocks */
1851         freeblist(bfirst);
1852
1853         /* wake up readers/writers */
1854         rendez_wakeup(&q->wr);
1855 }
1856
1857 int qfull(struct queue *q)
1858 {
1859         return q->state & Qflow;
1860 }
1861
1862 int qstate(struct queue *q)
1863 {
1864         return q->state;
1865 }
1866
1867 void qdump(struct queue *q)
1868 {
1869         if (q)
1870                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1871                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1872 }