eef857427cd9d99e8fccf37dcaeac44833dfe66b
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define WARN_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 warn_once("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int noblock;                            /* true if writes return immediately when q full */
50         int eof;                                        /* number of eofs read by user */
51
52         void (*kick) (void *);          /* restart output */
53         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
54         void *arg;                                      /* argument to kick */
55
56         qlock_t rlock;                          /* mutex for reading processes */
57         struct rendez rr;                       /* process waiting to read */
58         qlock_t wlock;                          /* mutex for writing processes */
59         struct rendez wr;                       /* process waiting to write */
60
61         char err[ERRMAX];
62 };
63
64 enum {
65         Maxatomic = 64 * 1024,
66 };
67
68 unsigned int qiomaxatomic = Maxatomic;
69
70 void ixsummary(void)
71 {
72         debugging ^= 1;
73         iallocsummary();
74         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
75                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
76         printd("consume %lu, produce %lu, qcopy %lu\n",
77                    consumecnt, producecnt, qcopycnt);
78 }
79
80 /*
81  *  free a list of blocks
82  */
83 void freeblist(struct block *b)
84 {
85         struct block *next;
86
87         for (; b != 0; b = next) {
88                 next = b->next;
89                 b->next = 0;
90                 freeb(b);
91         }
92 }
93
94 /*
95  *  pad a block to the front (or the back if size is negative)
96  */
97 struct block *padblock(struct block *bp, int size)
98 {
99         int n;
100         struct block *nbp;
101         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
102         uint16_t checksum_start = bp->checksum_start;
103         uint16_t checksum_offset = bp->checksum_offset;
104
105         QDEBUG checkb(bp, "padblock 1");
106         if (size >= 0) {
107                 if (bp->rp - bp->base >= size) {
108                         bp->checksum_start += size;
109                         bp->rp -= size;
110                         return bp;
111                 }
112
113                 WARN_EXTRA(bp);
114                 if (bp->next)
115                         panic("padblock %p", getcallerpc(&bp));
116                 n = BLEN(bp);
117                 padblockcnt++;
118                 nbp = allocb(size + n);
119                 nbp->rp += size;
120                 nbp->wp = nbp->rp;
121                 memmove(nbp->wp, bp->rp, n);
122                 nbp->wp += n;
123                 freeb(bp);
124                 nbp->rp -= size;
125         } else {
126                 size = -size;
127
128                 WARN_EXTRA(bp);
129
130                 if (bp->next)
131                         panic("padblock %p", getcallerpc(&bp));
132
133                 if (bp->lim - bp->wp >= size)
134                         return bp;
135
136                 n = BLEN(bp);
137                 padblockcnt++;
138                 nbp = allocb(size + n);
139                 memmove(nbp->wp, bp->rp, n);
140                 nbp->wp += n;
141                 freeb(bp);
142         }
143         if (bcksum) {
144                 nbp->flag |= bcksum;
145                 nbp->checksum_start = checksum_start;
146                 nbp->checksum_offset = checksum_offset;
147         }
148         QDEBUG checkb(nbp, "padblock 1");
149         return nbp;
150 }
151
152 /*
153  *  return count of bytes in a string of blocks
154  */
155 int blocklen(struct block *bp)
156 {
157         int len;
158
159         len = 0;
160         while (bp) {
161                 len += BLEN(bp);
162                 bp = bp->next;
163         }
164         return len;
165 }
166
167 /*
168  * return count of space in blocks
169  */
170 int blockalloclen(struct block *bp)
171 {
172         int len;
173
174         len = 0;
175         while (bp) {
176                 len += BALLOC(bp);
177                 bp = bp->next;
178         }
179         return len;
180 }
181
182 /*
183  *  copy the  string of blocks into
184  *  a single block and free the string
185  */
186 struct block *concatblock(struct block *bp)
187 {
188         int len;
189         struct block *nb, *f;
190
191         if (bp->next == 0)
192                 return bp;
193
194         /* probably use parts of qclone */
195         WARN_EXTRA(bp);
196         nb = allocb(blocklen(bp));
197         for (f = bp; f; f = f->next) {
198                 len = BLEN(f);
199                 memmove(nb->wp, f->rp, len);
200                 nb->wp += len;
201         }
202         concatblockcnt += BLEN(nb);
203         freeblist(bp);
204         QDEBUG checkb(nb, "concatblock 1");
205         return nb;
206 }
207
208 /* Returns a block with the remaining contents of b all in the main body of the
209  * returned block.  Replace old references to b with the returned value (which
210  * may still be 'b', if no change was needed. */
211 struct block *linearizeblock(struct block *b)
212 {
213         struct block *newb;
214         size_t len;
215         struct extra_bdata *ebd;
216
217         if (!b->extra_len)
218                 return b;
219
220         newb = allocb(BLEN(b));
221         len = BHLEN(b);
222         memcpy(newb->wp, b->rp, len);
223         newb->wp += len;
224         len = b->extra_len;
225         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
226                 ebd = &b->extra_data[i];
227                 if (!ebd->base || !ebd->len)
228                         continue;
229                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
230                 newb->wp += ebd->len;
231                 len -= ebd->len;
232         }
233         /* TODO: any other flags that need copied over? */
234         if (b->flag & BCKSUM_FLAGS) {
235                 newb->flag |= (b->flag & BCKSUM_FLAGS);
236                 newb->checksum_start = b->checksum_start;
237                 newb->checksum_offset = b->checksum_offset;
238         }
239         freeb(b);
240         return newb;
241 }
242
243 /*
244  *  make sure the first block has at least n bytes in its main body
245  */
246 struct block *pullupblock(struct block *bp, int n)
247 {
248         int i, len, seglen;
249         struct block *nbp;
250         struct extra_bdata *ebd;
251
252         /*
253          *  this should almost always be true, it's
254          *  just to avoid every caller checking.
255          */
256         if (BHLEN(bp) >= n)
257                 return bp;
258
259          /* a start at explicit main-body / header management */
260         if (bp->extra_len) {
261                 if (n > bp->lim - bp->rp) {
262                         /* would need to realloc a new block and copy everything over. */
263                         panic("can't pullup, no place to put it\n");
264                 }
265                 len = n - BHLEN(bp);
266                 if (len > bp->extra_len)
267                         panic("pullup more than extra (%d, %d, %d)\n",
268                               n, BHLEN(bp), bp->extra_len);
269                 checkb(bp, "before pullup");
270                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
271                         ebd = &bp->extra_data[i];
272                         if (!ebd->base || !ebd->len)
273                                 continue;
274                         seglen = MIN(ebd->len, len);
275                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
276                         bp->wp += seglen;
277                         len -= seglen;
278                         ebd->len -= seglen;
279                         ebd->off += seglen;
280                         bp->extra_len -= seglen;
281                         if (ebd->len == 0) {
282                                 kfree((void *)ebd->base);
283                                 ebd->len = 0;
284                                 ebd->off = 0;
285                                 ebd->base = 0;
286                         }
287                 }
288                 /* maybe just call pullupblock recursively here */
289                 if (len)
290                         panic("pullup %d bytes overdrawn\n", len);
291                 checkb(bp, "after pullup");
292                 return bp;
293         }
294
295         /*
296          *  if not enough room in the first block,
297          *  add another to the front of the list.
298          */
299         if (bp->lim - bp->rp < n) {
300                 nbp = allocb(n);
301                 nbp->next = bp;
302                 bp = nbp;
303         }
304
305         /*
306          *  copy bytes from the trailing blocks into the first
307          */
308         n -= BLEN(bp);
309         while ((nbp = bp->next)) {
310                 i = BLEN(nbp);
311                 if (i > n) {
312                         memmove(bp->wp, nbp->rp, n);
313                         pullupblockcnt++;
314                         bp->wp += n;
315                         nbp->rp += n;
316                         QDEBUG checkb(bp, "pullupblock 1");
317                         return bp;
318                 } else {
319                         memmove(bp->wp, nbp->rp, i);
320                         pullupblockcnt++;
321                         bp->wp += i;
322                         bp->next = nbp->next;
323                         nbp->next = 0;
324                         freeb(nbp);
325                         n -= i;
326                         if (n == 0) {
327                                 QDEBUG checkb(bp, "pullupblock 2");
328                                 return bp;
329                         }
330                 }
331         }
332         freeb(bp);
333         return 0;
334 }
335
336 /*
337  *  make sure the first block has at least n bytes in its main body
338  */
339 struct block *pullupqueue(struct queue *q, int n)
340 {
341         struct block *b;
342
343         /* TODO: lock to protect the queue links? */
344         if ((BHLEN(q->bfirst) >= n))
345                 return q->bfirst;
346         q->bfirst = pullupblock(q->bfirst, n);
347         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
348         q->blast = b;
349         return q->bfirst;
350 }
351
352 /*
353  *  trim to len bytes starting at offset
354  */
355 struct block *trimblock(struct block *bp, int offset, int len)
356 {
357         uint32_t l;
358         struct block *nb, *startb;
359
360         QDEBUG checkb(bp, "trimblock 1");
361         if (blocklen(bp) < offset + len) {
362                 freeblist(bp);
363                 return NULL;
364         }
365
366         while ((l = BLEN(bp)) < offset) {
367                 offset -= l;
368                 nb = bp->next;
369                 bp->next = NULL;
370                 freeb(bp);
371                 bp = nb;
372         }
373
374         WARN_EXTRA(bp);
375         startb = bp;
376         bp->rp += offset;
377
378         while ((l = BLEN(bp)) < len) {
379                 len -= l;
380                 bp = bp->next;
381         }
382
383         bp->wp -= (BLEN(bp) - len);
384
385         if (bp->next) {
386                 freeblist(bp->next);
387                 bp->next = NULL;
388         }
389
390         return startb;
391 }
392
393 /*
394  *  copy 'count' bytes into a new block
395  */
396 struct block *copyblock(struct block *bp, int count)
397 {
398         int l;
399         struct block *nbp;
400
401         QDEBUG checkb(bp, "copyblock 0");
402         nbp = allocb(count);
403         if (bp->flag & BCKSUM_FLAGS) {
404                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
405                 nbp->checksum_start = bp->checksum_start;
406                 nbp->checksum_offset = bp->checksum_offset;
407         }
408         WARN_EXTRA(bp);
409         for (; count > 0 && bp != 0; bp = bp->next) {
410                 l = BLEN(bp);
411                 if (l > count)
412                         l = count;
413                 memmove(nbp->wp, bp->rp, l);
414                 nbp->wp += l;
415                 count -= l;
416         }
417         if (count > 0) {
418                 memset(nbp->wp, 0, count);
419                 nbp->wp += count;
420         }
421         copyblockcnt++;
422         QDEBUG checkb(nbp, "copyblock 1");
423
424         return nbp;
425 }
426
427 struct block *adjustblock(struct block *bp, int len)
428 {
429         int n;
430         struct block *nbp;
431
432         if (len < 0) {
433                 freeb(bp);
434                 return NULL;
435         }
436
437         WARN_EXTRA(bp);
438         if (bp->rp + len > bp->lim) {
439                 nbp = copyblock(bp, len);
440                 freeblist(bp);
441                 QDEBUG checkb(nbp, "adjustblock 1");
442
443                 return nbp;
444         }
445
446         n = BLEN(bp);
447         if (len > n)
448                 memset(bp->wp, 0, len - n);
449         bp->wp = bp->rp + len;
450         QDEBUG checkb(bp, "adjustblock 2");
451
452         return bp;
453 }
454
455 /*
456  *  throw away up to count bytes from a
457  *  list of blocks.  Return count of bytes
458  *  thrown away.
459  */
460 int pullblock(struct block **bph, int count)
461 {
462         struct block *bp;
463         int n, bytes;
464
465         bytes = 0;
466         if (bph == NULL)
467                 return 0;
468
469         while (*bph != NULL && count != 0) {
470                 bp = *bph;
471         WARN_EXTRA(bp);
472
473                 n = BLEN(bp);
474                 if (count < n)
475                         n = count;
476                 bytes += n;
477                 count -= n;
478                 bp->rp += n;
479                 QDEBUG checkb(bp, "pullblock ");
480                 if (BLEN(bp) == 0) {
481                         *bph = bp->next;
482                         bp->next = NULL;
483                         freeb(bp);
484                 }
485         }
486         return bytes;
487 }
488
489 /*
490  *  get next block from a queue, return null if nothing there
491  */
492 struct block *qget(struct queue *q)
493 {
494         int dowakeup;
495         struct block *b;
496
497         /* sync with qwrite */
498         spin_lock_irqsave(&q->lock);
499
500         b = q->bfirst;
501         if (b == NULL) {
502                 q->state |= Qstarve;
503                 spin_unlock_irqsave(&q->lock);
504                 return NULL;
505         }
506         q->bfirst = b->next;
507         b->next = 0;
508         q->len -= BALLOC(b);
509         q->dlen -= BLEN(b);
510         QDEBUG checkb(b, "qget");
511
512         /* if writer flow controlled, restart */
513         if ((q->state & Qflow) && q->len < q->limit / 2) {
514                 q->state &= ~Qflow;
515                 dowakeup = 1;
516         } else
517                 dowakeup = 0;
518
519         spin_unlock_irqsave(&q->lock);
520
521         if (dowakeup)
522                 rendez_wakeup(&q->wr);
523
524         return b;
525 }
526
527 /*
528  *  throw away the next 'len' bytes in the queue
529  * returning the number actually discarded
530  */
531 int qdiscard(struct queue *q, int len)
532 {
533         struct block *b;
534         int dowakeup, n, sofar, body_amt, extra_amt;
535         struct extra_bdata *ebd;
536
537         spin_lock_irqsave(&q->lock);
538         for (sofar = 0; sofar < len; sofar += n) {
539                 b = q->bfirst;
540                 if (b == NULL)
541                         break;
542                 QDEBUG checkb(b, "qdiscard");
543                 n = BLEN(b);
544                 if (n <= len - sofar) {
545                         q->bfirst = b->next;
546                         b->next = 0;
547                         q->len -= BALLOC(b);
548                         q->dlen -= BLEN(b);
549                         freeb(b);
550                 } else {
551                         n = len - sofar;
552                         q->dlen -= n;
553                         /* partial block removal */
554                         body_amt = MIN(BHLEN(b), n);
555                         b->rp += body_amt;
556                         extra_amt = n - body_amt;
557                         /* reduce q->len by the amount we remove from the extras.  The
558                          * header will always be accounted for above, during block removal.
559                          * */
560                         q->len -= extra_amt;
561                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
562                                 ebd = &b->extra_data[i];
563                                 if (!ebd->base || !ebd->len)
564                                         continue;
565                                 if (extra_amt >= ebd->len) {
566                                         /* remove the entire entry, note the kfree release */
567                                         b->extra_len -= ebd->len;
568                                         extra_amt -= ebd->len;
569                                         kfree((void*)ebd->base);
570                                         ebd->base = ebd->off = ebd->len = 0;
571                                         continue;
572                                 }
573                                 ebd->off += extra_amt;
574                                 ebd->len -= extra_amt;
575                                 b->extra_len -= extra_amt;
576                                 extra_amt = 0;
577                         }
578                 }
579         }
580
581         /*
582          *  if writer flow controlled, restart
583          *
584          *  This used to be
585          *  q->len < q->limit/2
586          *  but it slows down tcp too much for certain write sizes.
587          *  I really don't understand it completely.  It may be
588          *  due to the queue draining so fast that the transmission
589          *  stalls waiting for the app to produce more data.  - presotto
590          */
591         if ((q->state & Qflow) && q->len < q->limit) {
592                 q->state &= ~Qflow;
593                 dowakeup = 1;
594         } else
595                 dowakeup = 0;
596
597         spin_unlock_irqsave(&q->lock);
598
599         if (dowakeup)
600                 rendez_wakeup(&q->wr);
601
602         return sofar;
603 }
604
605 /*
606  *  Interrupt level copy out of a queue, return # bytes copied.
607  */
608 int qconsume(struct queue *q, void *vp, int len)
609 {
610         struct block *b;
611         int n, dowakeup;
612         uint8_t *p = vp;
613         struct block *tofree = NULL;
614
615         /* sync with qwrite */
616         spin_lock_irqsave(&q->lock);
617
618         for (;;) {
619                 b = q->bfirst;
620                 if (b == 0) {
621                         q->state |= Qstarve;
622                         spin_unlock_irqsave(&q->lock);
623                         return -1;
624                 }
625                 QDEBUG checkb(b, "qconsume 1");
626
627                 n = BLEN(b);
628                 if (n > 0)
629                         break;
630                 q->bfirst = b->next;
631                 q->len -= BALLOC(b);
632
633                 /* remember to free this */
634                 b->next = tofree;
635                 tofree = b;
636         };
637
638         WARN_EXTRA(b);
639         if (n < len)
640                 len = n;
641         memmove(p, b->rp, len);
642         consumecnt += n;
643         b->rp += len;
644         q->dlen -= len;
645
646         /* discard the block if we're done with it */
647         if ((q->state & Qmsg) || len == n) {
648                 q->bfirst = b->next;
649                 b->next = 0;
650                 q->len -= BALLOC(b);
651                 q->dlen -= BLEN(b);
652
653                 /* remember to free this */
654                 b->next = tofree;
655                 tofree = b;
656         }
657
658         /* if writer flow controlled, restart */
659         if ((q->state & Qflow) && q->len < q->limit / 2) {
660                 q->state &= ~Qflow;
661                 dowakeup = 1;
662         } else
663                 dowakeup = 0;
664
665         spin_unlock_irqsave(&q->lock);
666
667         if (dowakeup)
668                 rendez_wakeup(&q->wr);
669
670         if (tofree != NULL)
671                 freeblist(tofree);
672
673         return len;
674 }
675
676 int qpass(struct queue *q, struct block *b)
677 {
678         int dlen, len, dowakeup;
679
680         /* sync with qread */
681         dowakeup = 0;
682         spin_lock_irqsave(&q->lock);
683         if (q->len >= q->limit) {
684                 freeblist(b);
685                 spin_unlock_irqsave(&q->lock);
686                 return -1;
687         }
688         if (q->state & Qclosed) {
689                 len = blocklen(b);
690                 freeblist(b);
691                 spin_unlock_irqsave(&q->lock);
692                 return len;
693         }
694
695         /* add buffer to queue */
696         if (q->bfirst)
697                 q->blast->next = b;
698         else
699                 q->bfirst = b;
700         len = BALLOC(b);
701         dlen = BLEN(b);
702         QDEBUG checkb(b, "qpass");
703         while (b->next) {
704                 b = b->next;
705                 QDEBUG checkb(b, "qpass");
706                 len += BALLOC(b);
707                 dlen += BLEN(b);
708         }
709         q->blast = b;
710         q->len += len;
711         q->dlen += dlen;
712
713         if (q->len >= q->limit / 2)
714                 q->state |= Qflow;
715
716         if (q->state & Qstarve) {
717                 q->state &= ~Qstarve;
718                 dowakeup = 1;
719         }
720         spin_unlock_irqsave(&q->lock);
721
722         if (dowakeup)
723                 rendez_wakeup(&q->rr);
724
725         return len;
726 }
727
728 int qpassnolim(struct queue *q, struct block *b)
729 {
730         int dlen, len, dowakeup;
731
732         /* sync with qread */
733         dowakeup = 0;
734         spin_lock_irqsave(&q->lock);
735
736         if (q->state & Qclosed) {
737                 freeblist(b);
738                 spin_unlock_irqsave(&q->lock);
739                 return BALLOC(b);
740         }
741
742         /* add buffer to queue */
743         if (q->bfirst)
744                 q->blast->next = b;
745         else
746                 q->bfirst = b;
747         len = BALLOC(b);
748         dlen = BLEN(b);
749         QDEBUG checkb(b, "qpass");
750         while (b->next) {
751                 b = b->next;
752                 QDEBUG checkb(b, "qpass");
753                 len += BALLOC(b);
754                 dlen += BLEN(b);
755         }
756         q->blast = b;
757         q->len += len;
758         q->dlen += dlen;
759
760         if (q->len >= q->limit / 2)
761                 q->state |= Qflow;
762
763         if (q->state & Qstarve) {
764                 q->state &= ~Qstarve;
765                 dowakeup = 1;
766         }
767         spin_unlock_irqsave(&q->lock);
768
769         if (dowakeup)
770                 rendez_wakeup(&q->rr);
771
772         return len;
773 }
774
775 /*
776  *  if the allocated space is way out of line with the used
777  *  space, reallocate to a smaller block
778  */
779 struct block *packblock(struct block *bp)
780 {
781         struct block **l, *nbp;
782         int n;
783
784         WARN_EXTRA(bp);
785         for (l = &bp; *l; l = &(*l)->next) {
786                 nbp = *l;
787                 n = BLEN(nbp);
788                 if ((n << 2) < BALLOC(nbp)) {
789                         *l = allocb(n);
790                         memmove((*l)->wp, nbp->rp, n);
791                         (*l)->wp += n;
792                         (*l)->next = nbp->next;
793                         freeb(nbp);
794                 }
795         }
796
797         return bp;
798 }
799
800 int qproduce(struct queue *q, void *vp, int len)
801 {
802         struct block *b;
803         int dowakeup;
804         uint8_t *p = vp;
805
806         /* sync with qread */
807         dowakeup = 0;
808         spin_lock_irqsave(&q->lock);
809
810         /* no waiting receivers, room in buffer? */
811         if (q->len >= q->limit) {
812                 q->state |= Qflow;
813                 spin_unlock_irqsave(&q->lock);
814                 return -1;
815         }
816
817         /* save in buffer */
818         /* use Qcoalesce here to save storage */
819         // TODO: Consider removing the Qcoalesce flag and force a coalescing
820         // strategy by default.
821         b = q->blast;
822         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
823                 || b->lim - b->wp < len) {
824                 /* need a new block */
825                 b = iallocb(len);
826                 if (b == 0) {
827                         spin_unlock_irqsave(&q->lock);
828                         return 0;
829                 }
830                 if (q->bfirst)
831                         q->blast->next = b;
832                 else
833                         q->bfirst = b;
834                 q->blast = b;
835                 /* b->next = 0; done by iallocb() */
836                 q->len += BALLOC(b);
837         }
838         WARN_EXTRA(b);
839         memmove(b->wp, p, len);
840         producecnt += len;
841         b->wp += len;
842         q->dlen += len;
843         QDEBUG checkb(b, "qproduce");
844
845         if (q->state & Qstarve) {
846                 q->state &= ~Qstarve;
847                 dowakeup = 1;
848         }
849
850         if (q->len >= q->limit)
851                 q->state |= Qflow;
852         spin_unlock_irqsave(&q->lock);
853
854         if (dowakeup)
855                 rendez_wakeup(&q->rr);
856
857         return len;
858 }
859
860 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
861  * body_rp, for up to len.  Returns the len consumed. 
862  *
863  * The base is 'b', so that we can kfree it later.  This currently ties us to
864  * using kfree for the release method for all extra_data.
865  *
866  * It is possible to have a body size that is 0, if there is no offset, and
867  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
868 static size_t point_to_body(struct block *b, uint8_t *body_rp,
869                             struct block *newb, unsigned int newb_idx,
870                             size_t len)
871 {
872         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
873
874         assert(newb_idx < newb->nr_extra_bufs);
875
876         kmalloc_incref(b);
877         ebd->base = (uintptr_t)b;
878         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
879         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
880         assert((int)ebd->len >= 0);
881         newb->extra_len += ebd->len;
882         return ebd->len;
883 }
884
885 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
886  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
887  * consumed.
888  *
889  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
890 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
891                            struct block *newb, unsigned int newb_idx,
892                            size_t len)
893 {
894         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
895         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
896
897         assert(b_idx < b->nr_extra_bufs);
898         assert(newb_idx < newb->nr_extra_bufs);
899
900         kmalloc_incref((void*)b_ebd->base);
901         n_ebd->base = b_ebd->base;
902         n_ebd->off = b_ebd->off + b_off;
903         n_ebd->len = MIN(b_ebd->len - b_off, len);
904         newb->extra_len += n_ebd->len;
905         return n_ebd->len;
906 }
907
908 /* given a string of blocks, fills the new block's extra_data  with the contents
909  * of the blist [offset, len + offset)
910  *
911  * returns 0 on success.  the only failure is if the extra_data array was too
912  * small, so this returns a positive integer saying how big the extra_data needs
913  * to be.
914  *
915  * callers are responsible for protecting the list structure. */
916 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
917                             uint32_t offset)
918 {
919         struct block *b, *first;
920         unsigned int nr_bufs = 0;
921         unsigned int b_idx, newb_idx = 0;
922         uint8_t *first_main_body = 0;
923
924         /* find the first block; keep offset relative to the latest b in the list */
925         for (b = blist; b; b = b->next) {
926                 if (BLEN(b) > offset)
927                         break;
928                 offset -= BLEN(b);
929         }
930         /* qcopy semantics: if you asked for an offset outside the block list, you
931          * get an empty block back */
932         if (!b)
933                 return 0;
934         first = b;
935         /* upper bound for how many buffers we'll need in newb */
936         for (/* b is set*/; b; b = b->next) {
937                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
938         }
939         /* we might be holding a spinlock here, so we won't wait for kmalloc */
940         block_add_extd(newb, nr_bufs, 0);
941         if (newb->nr_extra_bufs < nr_bufs) {
942                 /* caller will need to alloc these, then re-call us */
943                 return nr_bufs;
944         }
945         for (b = first; b && len; b = b->next) {
946                 b_idx = 0;
947                 if (offset) {
948                         if (offset < BHLEN(b)) {
949                                 /* off is in the main body */
950                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
951                                 newb_idx++;
952                         } else {
953                                 /* off is in one of the buffers (or just past the last one).
954                                  * we're not going to point to b's main body at all. */
955                                 offset -= BHLEN(b);
956                                 assert(b->extra_data);
957                                 /* assuming these extrabufs are packed, or at least that len
958                                  * isn't gibberish */
959                                 while (b->extra_data[b_idx].len <= offset) {
960                                         offset -= b->extra_data[b_idx].len;
961                                         b_idx++;
962                                 }
963                                 /* now offset is set to our offset in the b_idx'th buf */
964                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
965                                 newb_idx++;
966                                 b_idx++;
967                         }
968                         offset = 0;
969                 } else {
970                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
971                         newb_idx++;
972                 }
973                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
974                  * and any point_to_ could be our last if it consumed all of len. */
975                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
976                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
977                         newb_idx++;
978                 }
979         }
980         return 0;
981 }
982
983 struct block *blist_clone(struct block *blist, int header_len, int len,
984                           uint32_t offset)
985 {
986         int ret;
987         struct block *newb = allocb(header_len);
988         do {
989                 ret = __blist_clone_to(blist, newb, len, offset);
990                 if (ret)
991                         block_add_extd(newb, ret, KMALLOC_WAIT);
992         } while (ret);
993         return newb;
994 }
995
996 /* given a queue, makes a single block with header_len reserved space in the
997  * block main body, and the contents of [offset, len + offset) pointed to in the
998  * new blocks ext_data. */
999 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1000 {
1001         int ret;
1002         struct block *newb = allocb(header_len);
1003         /* the while loop should rarely be used: it would require someone
1004          * concurrently adding to the queue. */
1005         do {
1006                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1007                 spin_lock_irqsave(&q->lock);
1008                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1009                 spin_unlock_irqsave(&q->lock);
1010                 if (ret)
1011                         block_add_extd(newb, ret, KMALLOC_WAIT);
1012         } while (ret);
1013         return newb;
1014 }
1015
1016 /*
1017  *  copy from offset in the queue
1018  */
1019 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1020 {
1021         int sofar;
1022         int n;
1023         struct block *b, *nb;
1024         uint8_t *p;
1025
1026         nb = allocb(len);
1027
1028         spin_lock_irqsave(&q->lock);
1029
1030         /* go to offset */
1031         b = q->bfirst;
1032         for (sofar = 0;; sofar += n) {
1033                 if (b == NULL) {
1034                         spin_unlock_irqsave(&q->lock);
1035                         return nb;
1036                 }
1037                 n = BLEN(b);
1038                 if (sofar + n > offset) {
1039                         p = b->rp + offset - sofar;
1040                         n -= offset - sofar;
1041                         break;
1042                 }
1043                 QDEBUG checkb(b, "qcopy");
1044                 b = b->next;
1045         }
1046
1047         /* copy bytes from there */
1048         for (sofar = 0; sofar < len;) {
1049                 if (n > len - sofar)
1050                         n = len - sofar;
1051                 WARN_EXTRA(b);
1052                 memmove(nb->wp, p, n);
1053                 qcopycnt += n;
1054                 sofar += n;
1055                 nb->wp += n;
1056                 b = b->next;
1057                 if (b == NULL)
1058                         break;
1059                 n = BLEN(b);
1060                 p = b->rp;
1061         }
1062         spin_unlock_irqsave(&q->lock);
1063
1064         return nb;
1065 }
1066
1067 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1068 {
1069 #ifdef CONFIG_BLOCK_EXTRAS
1070         return qclone(q, 0, len, offset);
1071 #else
1072         return qcopy_old(q, len, offset);
1073 #endif
1074 }
1075
1076 static void qinit_common(struct queue *q)
1077 {
1078         spinlock_init_irqsave(&q->lock);
1079         qlock_init(&q->rlock);
1080         qlock_init(&q->wlock);
1081         rendez_init(&q->rr);
1082         rendez_init(&q->wr);
1083 }
1084
1085 /*
1086  *  called by non-interrupt code
1087  */
1088 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1089 {
1090         struct queue *q;
1091
1092         q = kzmalloc(sizeof(struct queue), 0);
1093         if (q == 0)
1094                 return 0;
1095         qinit_common(q);
1096
1097         q->limit = q->inilim = limit;
1098         q->kick = kick;
1099         q->arg = arg;
1100         q->state = msg;
1101         q->state |= Qstarve;
1102         q->eof = 0;
1103         q->noblock = 0;
1104
1105         return q;
1106 }
1107
1108 /* open a queue to be bypassed */
1109 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1110 {
1111         struct queue *q;
1112
1113         q = kzmalloc(sizeof(struct queue), 0);
1114         if (q == 0)
1115                 return 0;
1116         qinit_common(q);
1117
1118         q->limit = 0;
1119         q->arg = arg;
1120         q->bypass = bypass;
1121         q->state = 0;
1122
1123         return q;
1124 }
1125
1126 static int notempty(void *a)
1127 {
1128         struct queue *q = a;
1129
1130         return (q->state & Qclosed) || q->bfirst != 0;
1131 }
1132
1133 /* wait for the queue to be non-empty or closed.
1134  *
1135  * called with q ilocked.  rendez may error out, back through the caller, with
1136  * the irqsave lock unlocked.  */
1137 static int qwait(struct queue *q)
1138 {
1139         /* wait for data */
1140         for (;;) {
1141                 if (q->bfirst != NULL)
1142                         break;
1143
1144                 if (q->state & Qclosed) {
1145                         if (++q->eof > 3)
1146                                 return -1;
1147                         if (*q->err && strcmp(q->err, Ehungup) != 0)
1148                                 return -1;
1149                         return 0;
1150                 }
1151
1152                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1153                 spin_unlock_irqsave(&q->lock);
1154                 /* may throw an error() */
1155                 rendez_sleep(&q->rr, notempty, q);
1156                 spin_lock_irqsave(&q->lock);
1157         }
1158         return 1;
1159 }
1160
1161 /*
1162  * add a block list to a queue
1163  */
1164 void qaddlist(struct queue *q, struct block *b)
1165 {
1166         /* TODO: q lock? */
1167         /* queue the block */
1168         if (q->bfirst)
1169                 q->blast->next = b;
1170         else
1171                 q->bfirst = b;
1172         q->len += blockalloclen(b);
1173         q->dlen += blocklen(b);
1174         while (b->next)
1175                 b = b->next;
1176         q->blast = b;
1177 }
1178
1179 /*
1180  *  called with q ilocked
1181  */
1182 struct block *qremove(struct queue *q)
1183 {
1184         struct block *b;
1185
1186         b = q->bfirst;
1187         if (b == NULL)
1188                 return NULL;
1189         q->bfirst = b->next;
1190         b->next = NULL;
1191         q->dlen -= BLEN(b);
1192         q->len -= BALLOC(b);
1193         QDEBUG checkb(b, "qremove");
1194         return b;
1195 }
1196
1197 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1198 {
1199         size_t copy_amt, retval = 0;
1200         struct extra_bdata *ebd;
1201         
1202         copy_amt = MIN(BHLEN(b), amt);
1203         memcpy(to, b->rp, copy_amt);
1204         /* advance the rp, since this block not be completely consumed and future
1205          * reads need to know where to pick up from */
1206         b->rp += copy_amt;
1207         to += copy_amt;
1208         amt -= copy_amt;
1209         retval += copy_amt;
1210         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1211                 ebd = &b->extra_data[i];
1212                 /* skip empty entires.  if we track this in the struct block, we can
1213                  * just start the for loop early */
1214                 if (!ebd->base || !ebd->len)
1215                         continue;
1216                 copy_amt = MIN(ebd->len, amt);
1217                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1218                 /* we're actually consuming the entries, just like how we advance rp up
1219                  * above, and might only consume part of one. */
1220                 ebd->len -= copy_amt;
1221                 ebd->off += copy_amt;
1222                 b->extra_len -= copy_amt;
1223                 if (!ebd->len) {
1224                         /* we don't actually have to decref here.  it's also done in
1225                          * freeb().  this is the earliest we can free. */
1226                         kfree((void*)ebd->base);
1227                         ebd->base = ebd->off = 0;
1228                 }
1229                 to += copy_amt;
1230                 amt -= copy_amt;
1231                 retval += copy_amt;
1232         }
1233         return retval;
1234 }
1235
1236 /*
1237  *  copy the contents of a string of blocks into
1238  *  memory.  emptied blocks are freed.  return
1239  *  pointer to first unconsumed block.
1240  */
1241 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1242 {
1243         int i;
1244         struct block *next;
1245
1246         /* could be slicker here, since read_from_block is smart */
1247         for (; b != NULL; b = next) {
1248                 i = BLEN(b);
1249                 if (i > n) {
1250                         /* partial block, consume some */
1251                         read_from_block(b, p, n);
1252                         return b;
1253                 }
1254                 /* full block, consume all and move on */
1255                 i = read_from_block(b, p, i);
1256                 n -= i;
1257                 p += i;
1258                 next = b->next;
1259                 freeb(b);
1260         }
1261         return NULL;
1262 }
1263
1264 /*
1265  *  copy the contents of memory into a string of blocks.
1266  *  return NULL on error.
1267  */
1268 struct block *mem2bl(uint8_t * p, int len)
1269 {
1270         ERRSTACK(1);
1271         int n;
1272         struct block *b, *first, **l;
1273
1274         first = NULL;
1275         l = &first;
1276         if (waserror()) {
1277                 freeblist(first);
1278                 nexterror();
1279         }
1280         do {
1281                 n = len;
1282                 if (n > Maxatomic)
1283                         n = Maxatomic;
1284
1285                 *l = b = allocb(n);
1286                 /* TODO consider extra_data */
1287                 memmove(b->wp, p, n);
1288                 b->wp += n;
1289                 p += n;
1290                 len -= n;
1291                 l = &b->next;
1292         } while (len > 0);
1293         poperror();
1294
1295         return first;
1296 }
1297
1298 /*
1299  *  put a block back to the front of the queue
1300  *  called with q ilocked
1301  */
1302 void qputback(struct queue *q, struct block *b)
1303 {
1304         b->next = q->bfirst;
1305         if (q->bfirst == NULL)
1306                 q->blast = b;
1307         q->bfirst = b;
1308         q->len += BALLOC(b);
1309         q->dlen += BLEN(b);
1310 }
1311
1312 /*
1313  *  flow control, get producer going again
1314  *  called with q ilocked
1315  */
1316 static void qwakeup_iunlock(struct queue *q)
1317 {
1318         int dowakeup = 0;
1319
1320         /* if writer flow controlled, restart */
1321         if ((q->state & Qflow) && q->len < q->limit / 2) {
1322                 q->state &= ~Qflow;
1323                 dowakeup = 1;
1324         }
1325
1326         spin_unlock_irqsave(&q->lock);
1327
1328         /* wakeup flow controlled writers */
1329         if (dowakeup) {
1330                 if (q->kick)
1331                         q->kick(q->arg);
1332                 rendez_wakeup(&q->wr);
1333         }
1334 }
1335
1336 /*
1337  *  get next block from a queue (up to a limit)
1338  */
1339 struct block *qbread(struct queue *q, int len)
1340 {
1341         ERRSTACK(1);
1342         struct block *b, *nb;
1343         int n;
1344
1345         qlock(&q->rlock);
1346         if (waserror()) {
1347                 qunlock(&q->rlock);
1348                 nexterror();
1349         }
1350
1351         spin_lock_irqsave(&q->lock);
1352         switch (qwait(q)) {
1353                 case 0:
1354                         /* queue closed */
1355                         spin_unlock_irqsave(&q->lock);
1356                         qunlock(&q->rlock);
1357                         poperror();
1358                         return NULL;
1359                 case -1:
1360                         /* multiple reads on a closed queue */
1361                         spin_unlock_irqsave(&q->lock);
1362                         error(q->err);
1363         }
1364
1365         /* if we get here, there's at least one block in the queue */
1366         b = qremove(q);
1367         n = BLEN(b);
1368
1369         /* split block if it's too big and this is not a message queue */
1370         nb = b;
1371         if (n > len) {
1372                 WARN_EXTRA(b);
1373                 if ((q->state & Qmsg) == 0) {
1374                         n -= len;
1375                         b = allocb(n);
1376                         memmove(b->wp, nb->rp + len, n);
1377                         b->wp += n;
1378                         qputback(q, b);
1379                 }
1380                 nb->wp = nb->rp + len;
1381         }
1382
1383         /* restart producer */
1384         qwakeup_iunlock(q);
1385
1386         poperror();
1387         qunlock(&q->rlock);
1388         return nb;
1389 }
1390
1391 /*
1392  *  read a queue.  if no data is queued, post a struct block
1393  *  and wait on its Rendez.
1394  */
1395 long qread(struct queue *q, void *vp, int len)
1396 {
1397         ERRSTACK(1);
1398         struct block *b, *first, **l;
1399         int m, n;
1400
1401         qlock(&q->rlock);
1402         if (waserror()) {
1403                 qunlock(&q->rlock);
1404                 nexterror();
1405         }
1406
1407         spin_lock_irqsave(&q->lock);
1408 again:
1409         switch (qwait(q)) {
1410                 case 0:
1411                         /* queue closed */
1412                         spin_unlock_irqsave(&q->lock);
1413                         qunlock(&q->rlock);
1414                         poperror();
1415                         return 0;
1416                 case -1:
1417                         /* multiple reads on a closed queue */
1418                         spin_unlock_irqsave(&q->lock);
1419                         error(q->err);
1420         }
1421
1422         /* if we get here, there's at least one block in the queue */
1423         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1424         // strategy by default.
1425         if (q->state & Qcoalesce) {
1426                 /* when coalescing, 0 length blocks just go away */
1427                 b = q->bfirst;
1428                 if (BLEN(b) <= 0) {
1429                         freeb(qremove(q));
1430                         goto again;
1431                 }
1432
1433                 /*  grab the first block plus as many
1434                  *  following blocks as will completely
1435                  *  fit in the read.
1436                  */
1437                 n = 0;
1438                 l = &first;
1439                 m = BLEN(b);
1440                 for (;;) {
1441                         *l = qremove(q);
1442                         l = &b->next;
1443                         n += m;
1444
1445                         b = q->bfirst;
1446                         if (b == NULL)
1447                                 break;
1448                         m = BLEN(b);
1449                         if (n + m > len)
1450                                 break;
1451                 }
1452         } else {
1453                 first = qremove(q);
1454                 n = BLEN(first);
1455         }
1456
1457         /* copy to user space outside of the ilock */
1458         spin_unlock_irqsave(&q->lock);
1459         b = bl2mem(vp, first, len);
1460         spin_lock_irqsave(&q->lock);
1461
1462         /* take care of any left over partial block */
1463         if (b != NULL) {
1464                 n -= BLEN(b);
1465                 if (q->state & Qmsg)
1466                         freeb(b);
1467                 else
1468                         qputback(q, b);
1469         }
1470
1471         /* restart producer */
1472         qwakeup_iunlock(q);
1473
1474         poperror();
1475         qunlock(&q->rlock);
1476         return n;
1477 }
1478
1479 static int qnotfull(void *a)
1480 {
1481         struct queue *q = a;
1482
1483         return q->len < q->limit || (q->state & Qclosed);
1484 }
1485
1486 uint32_t noblockcnt;
1487
1488 /*
1489  *  add a block to a queue obeying flow control
1490  */
1491 long qbwrite(struct queue *q, struct block *b)
1492 {
1493         ERRSTACK(1);
1494         int n, dowakeup;
1495         volatile bool should_free_b = TRUE;
1496
1497         n = BLEN(b);
1498
1499         if (q->bypass) {
1500                 (*q->bypass) (q->arg, b);
1501                 return n;
1502         }
1503
1504         dowakeup = 0;
1505         qlock(&q->wlock);
1506         if (waserror()) {
1507                 if (b != NULL && should_free_b)
1508                         freeb(b);
1509                 qunlock(&q->wlock);
1510                 nexterror();
1511         }
1512
1513         spin_lock_irqsave(&q->lock);
1514
1515         /* give up if the queue is closed */
1516         if (q->state & Qclosed) {
1517                 spin_unlock_irqsave(&q->lock);
1518                 error(q->err);
1519         }
1520
1521         /* if nonblocking, don't queue over the limit */
1522         if (q->len >= q->limit) {
1523                 if (q->noblock) {
1524                         spin_unlock_irqsave(&q->lock);
1525                         freeb(b);
1526                         noblockcnt += n;
1527                         qunlock(&q->wlock);
1528                         poperror();
1529                         return n;
1530                 }
1531         }
1532
1533         /* queue the block */
1534         should_free_b = FALSE;
1535         if (q->bfirst)
1536                 q->blast->next = b;
1537         else
1538                 q->bfirst = b;
1539         q->blast = b;
1540         b->next = 0;
1541         q->len += BALLOC(b);
1542         q->dlen += n;
1543         QDEBUG checkb(b, "qbwrite");
1544         b = NULL;
1545
1546         /* make sure other end gets awakened */
1547         if (q->state & Qstarve) {
1548                 q->state &= ~Qstarve;
1549                 dowakeup = 1;
1550         }
1551         spin_unlock_irqsave(&q->lock);
1552
1553         /*  get output going again */
1554         if (q->kick && (dowakeup || (q->state & Qkick)))
1555                 q->kick(q->arg);
1556
1557         /* wakeup anyone consuming at the other end */
1558         if (dowakeup)
1559                 rendez_wakeup(&q->rr);
1560
1561         /*
1562          *  flow control, wait for queue to get below the limit
1563          *  before allowing the process to continue and queue
1564          *  more.  We do this here so that postnote can only
1565          *  interrupt us after the data has been queued.  This
1566          *  means that things like 9p flushes and ssl messages
1567          *  will not be disrupted by software interrupts.
1568          *
1569          *  Note - this is moderately dangerous since a process
1570          *  that keeps getting interrupted and rewriting will
1571          *  queue infinite crud.
1572          */
1573         for (;;) {
1574                 if (q->noblock || qnotfull(q))
1575                         break;
1576
1577                 spin_lock_irqsave(&q->lock);
1578                 q->state |= Qflow;
1579                 spin_unlock_irqsave(&q->lock);
1580                 rendez_sleep(&q->wr, qnotfull, q);
1581         }
1582
1583         qunlock(&q->wlock);
1584         poperror();
1585         return n;
1586 }
1587
1588 long qibwrite(struct queue *q, struct block *b)
1589 {
1590         int n, dowakeup;
1591
1592         dowakeup = 0;
1593
1594         n = BLEN(b);
1595
1596         spin_lock_irqsave(&q->lock);
1597
1598         QDEBUG checkb(b, "qibwrite");
1599         if (q->bfirst)
1600                 q->blast->next = b;
1601         else
1602                 q->bfirst = b;
1603         q->blast = b;
1604         q->len += BALLOC(b);
1605         q->dlen += n;
1606
1607         if (q->state & Qstarve) {
1608                 q->state &= ~Qstarve;
1609                 dowakeup = 1;
1610         }
1611
1612         spin_unlock_irqsave(&q->lock);
1613
1614         if (dowakeup) {
1615                 if (q->kick)
1616                         q->kick(q->arg);
1617                 rendez_wakeup(&q->rr);
1618         }
1619
1620         return n;
1621 }
1622
1623 /*
1624  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1625  */
1626 int qwrite(struct queue *q, void *vp, int len)
1627 {
1628         int n, sofar;
1629         struct block *b;
1630         uint8_t *p = vp;
1631         void *ext_buf;
1632
1633         QDEBUG if (!islo())
1634                  printd("qwrite hi %p\n", getcallerpc(&q));
1635
1636         sofar = 0;
1637         do {
1638                 n = len - sofar;
1639                 /* This is 64K, the max amount per single block.  Still a good value? */
1640                 if (n > Maxatomic)
1641                         n = Maxatomic;
1642
1643                 /* If n is small, we don't need to bother with the extra_data.  But
1644                  * until the whole stack can handle extd blocks, we'll use them
1645                  * unconditionally. */
1646 #ifdef CONFIG_BLOCK_EXTRAS
1647                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1648                  * only available via padblock (to the left).  we also need some space
1649                  * for pullupblock for some basic headers (like icmp) that get written
1650                  * in directly */
1651                 b = allocb(64);
1652                 ext_buf = kmalloc(n, 0);
1653                 memcpy(ext_buf, p + sofar, n);
1654                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1655                 b->extra_data[0].base = (uintptr_t)ext_buf;
1656                 b->extra_data[0].off = 0;
1657                 b->extra_data[0].len = n;
1658                 b->extra_len += n;
1659 #else
1660                 b = allocb(n);
1661                 memmove(b->wp, p + sofar, n);
1662                 b->wp += n;
1663 #endif
1664                         
1665                 qbwrite(q, b);
1666
1667                 sofar += n;
1668         } while (sofar < len && (q->state & Qmsg) == 0);
1669
1670         return len;
1671 }
1672
1673 /*
1674  *  used by print() to write to a queue.  Since we may be splhi or not in
1675  *  a process, don't qlock.
1676  */
1677 int qiwrite(struct queue *q, void *vp, int len)
1678 {
1679         int n, sofar, dowakeup;
1680         struct block *b;
1681         uint8_t *p = vp;
1682
1683         dowakeup = 0;
1684
1685         sofar = 0;
1686         do {
1687                 n = len - sofar;
1688                 if (n > Maxatomic)
1689                         n = Maxatomic;
1690
1691                 b = iallocb(n);
1692                 if (b == NULL)
1693                         break;
1694                 /* TODO consider extra_data */
1695                 memmove(b->wp, p + sofar, n);
1696                 /* this adjusts BLEN to be n, or at least it should */
1697                 b->wp += n;
1698                 assert(n == BLEN(b));
1699                 qibwrite(q, b);
1700
1701                 sofar += n;
1702         } while (sofar < len && (q->state & Qmsg) == 0);
1703
1704         return sofar;
1705 }
1706
1707 /*
1708  *  be extremely careful when calling this,
1709  *  as there is no reference accounting
1710  */
1711 void qfree(struct queue *q)
1712 {
1713         qclose(q);
1714         kfree(q);
1715 }
1716
1717 /*
1718  *  Mark a queue as closed.  No further IO is permitted.
1719  *  All blocks are released.
1720  */
1721 void qclose(struct queue *q)
1722 {
1723         struct block *bfirst;
1724
1725         if (q == NULL)
1726                 return;
1727
1728         /* mark it */
1729         spin_lock_irqsave(&q->lock);
1730         q->state |= Qclosed;
1731         q->state &= ~(Qflow | Qstarve);
1732         strncpy(q->err, Ehungup, sizeof(q->err));
1733         bfirst = q->bfirst;
1734         q->bfirst = 0;
1735         q->len = 0;
1736         q->dlen = 0;
1737         q->noblock = 0;
1738         spin_unlock_irqsave(&q->lock);
1739
1740         /* free queued blocks */
1741         freeblist(bfirst);
1742
1743         /* wake up readers/writers */
1744         rendez_wakeup(&q->rr);
1745         rendez_wakeup(&q->wr);
1746 }
1747
1748 /*
1749  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1750  *  blocks.
1751  */
1752 void qhangup(struct queue *q, char *msg)
1753 {
1754         /* mark it */
1755         spin_lock_irqsave(&q->lock);
1756         q->state |= Qclosed;
1757         if (msg == 0 || *msg == 0)
1758                 strncpy(q->err, Ehungup, sizeof(q->err));
1759         else
1760                 strncpy(q->err, msg, ERRMAX - 1);
1761         spin_unlock_irqsave(&q->lock);
1762
1763         /* wake up readers/writers */
1764         rendez_wakeup(&q->rr);
1765         rendez_wakeup(&q->wr);
1766 }
1767
1768 /*
1769  *  return non-zero if the q is hungup
1770  */
1771 int qisclosed(struct queue *q)
1772 {
1773         return q->state & Qclosed;
1774 }
1775
1776 /*
1777  *  mark a queue as no longer hung up
1778  */
1779 void qreopen(struct queue *q)
1780 {
1781         spin_lock_irqsave(&q->lock);
1782         q->state &= ~Qclosed;
1783         q->state |= Qstarve;
1784         q->eof = 0;
1785         q->limit = q->inilim;
1786         spin_unlock_irqsave(&q->lock);
1787 }
1788
1789 /*
1790  *  return bytes queued
1791  */
1792 int qlen(struct queue *q)
1793 {
1794         return q->dlen;
1795 }
1796
1797 /*
1798  * return space remaining before flow control
1799  */
1800 int qwindow(struct queue *q)
1801 {
1802         int l;
1803
1804         l = q->limit - q->len;
1805         if (l < 0)
1806                 l = 0;
1807         return l;
1808 }
1809
1810 /*
1811  *  return true if we can read without blocking
1812  */
1813 int qcanread(struct queue *q)
1814 {
1815         return q->bfirst != 0;
1816 }
1817
1818 /*
1819  *  change queue limit
1820  */
1821 void qsetlimit(struct queue *q, int limit)
1822 {
1823         q->limit = limit;
1824 }
1825
1826 /*
1827  *  set blocking/nonblocking
1828  */
1829 void qnoblock(struct queue *q, int onoff)
1830 {
1831         q->noblock = onoff;
1832 }
1833
1834 /*
1835  *  flush the output queue
1836  */
1837 void qflush(struct queue *q)
1838 {
1839         struct block *bfirst;
1840
1841         /* mark it */
1842         spin_lock_irqsave(&q->lock);
1843         bfirst = q->bfirst;
1844         q->bfirst = 0;
1845         q->len = 0;
1846         q->dlen = 0;
1847         spin_unlock_irqsave(&q->lock);
1848
1849         /* free queued blocks */
1850         freeblist(bfirst);
1851
1852         /* wake up readers/writers */
1853         rendez_wakeup(&q->wr);
1854 }
1855
1856 int qfull(struct queue *q)
1857 {
1858         return q->state & Qflow;
1859 }
1860
1861 int qstate(struct queue *q)
1862 {
1863         return q->state;
1864 }
1865
1866 void qdump(struct queue *q)
1867 {
1868         if (q)
1869                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1870                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1871 }