f90f2d17834f2dd793d697b8bdd05decb4c60854
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define WARN_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 warn_once("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int noblock;                            /* true if writes return immediately when q full */
50         int eof;                                        /* number of eofs read by user */
51
52         void (*kick) (void *);          /* restart output */
53         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
54         void *arg;                                      /* argument to kick */
55
56         qlock_t rlock;                          /* mutex for reading processes */
57         struct rendez rr;                       /* process waiting to read */
58         qlock_t wlock;                          /* mutex for writing processes */
59         struct rendez wr;                       /* process waiting to write */
60
61         char err[ERRMAX];
62 };
63
64 enum {
65         Maxatomic = 64 * 1024,
66 };
67
68 unsigned int qiomaxatomic = Maxatomic;
69
70 void ixsummary(void)
71 {
72         debugging ^= 1;
73         iallocsummary();
74         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
75                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
76         printd("consume %lu, produce %lu, qcopy %lu\n",
77                    consumecnt, producecnt, qcopycnt);
78 }
79
80 /*
81  *  free a list of blocks
82  */
83 void freeblist(struct block *b)
84 {
85         struct block *next;
86
87         for (; b != 0; b = next) {
88                 next = b->next;
89                 b->next = 0;
90                 freeb(b);
91         }
92 }
93
94 /*
95  *  pad a block to the front (or the back if size is negative)
96  */
97 struct block *padblock(struct block *bp, int size)
98 {
99         int n;
100         struct block *nbp;
101         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
102         uint16_t checksum_start = bp->checksum_start;
103         uint16_t checksum_offset = bp->checksum_offset;
104
105         QDEBUG checkb(bp, "padblock 1");
106         if (size >= 0) {
107                 if (bp->rp - bp->base >= size) {
108                         bp->checksum_start += size;
109                         bp->rp -= size;
110                         return bp;
111                 }
112
113                 WARN_EXTRA(bp);
114                 if (bp->next)
115                         panic("padblock %p", getcallerpc(&bp));
116                 n = BLEN(bp);
117                 padblockcnt++;
118                 nbp = allocb(size + n);
119                 nbp->rp += size;
120                 nbp->wp = nbp->rp;
121                 memmove(nbp->wp, bp->rp, n);
122                 nbp->wp += n;
123                 freeb(bp);
124                 nbp->rp -= size;
125         } else {
126                 size = -size;
127
128                 WARN_EXTRA(bp);
129
130                 if (bp->next)
131                         panic("padblock %p", getcallerpc(&bp));
132
133                 if (bp->lim - bp->wp >= size)
134                         return bp;
135
136                 n = BLEN(bp);
137                 padblockcnt++;
138                 nbp = allocb(size + n);
139                 memmove(nbp->wp, bp->rp, n);
140                 nbp->wp += n;
141                 freeb(bp);
142         }
143         if (bcksum) {
144                 nbp->flag |= bcksum;
145                 nbp->checksum_start = checksum_start;
146                 nbp->checksum_offset = checksum_offset;
147         }
148         QDEBUG checkb(nbp, "padblock 1");
149         return nbp;
150 }
151
152 /*
153  *  return count of bytes in a string of blocks
154  */
155 int blocklen(struct block *bp)
156 {
157         int len;
158
159         len = 0;
160         while (bp) {
161                 len += BLEN(bp);
162                 bp = bp->next;
163         }
164         return len;
165 }
166
167 /*
168  * return count of space in blocks
169  */
170 int blockalloclen(struct block *bp)
171 {
172         int len;
173
174         len = 0;
175         while (bp) {
176                 len += BALLOC(bp);
177                 bp = bp->next;
178         }
179         return len;
180 }
181
182 /*
183  *  copy the  string of blocks into
184  *  a single block and free the string
185  */
186 struct block *concatblock(struct block *bp)
187 {
188         int len;
189         struct block *nb, *f;
190
191         if (bp->next == 0)
192                 return bp;
193
194         /* probably use parts of qclone */
195         WARN_EXTRA(bp);
196         nb = allocb(blocklen(bp));
197         for (f = bp; f; f = f->next) {
198                 len = BLEN(f);
199                 memmove(nb->wp, f->rp, len);
200                 nb->wp += len;
201         }
202         concatblockcnt += BLEN(nb);
203         freeblist(bp);
204         QDEBUG checkb(nb, "concatblock 1");
205         return nb;
206 }
207
208 /* Returns a block with the remaining contents of b all in the main body of the
209  * returned block.  Replace old references to b with the returned value (which
210  * may still be 'b', if no change was needed. */
211 struct block *linearizeblock(struct block *b)
212 {
213         struct block *newb;
214         size_t len;
215         struct extra_bdata *ebd;
216
217         if (!b->extra_len)
218                 return b;
219
220         newb = allocb(BLEN(b));
221         len = BHLEN(b);
222         memcpy(newb->wp, b->rp, len);
223         newb->wp += len;
224         len = b->extra_len;
225         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
226                 ebd = &b->extra_data[i];
227                 if (!ebd->base || !ebd->len)
228                         continue;
229                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
230                 newb->wp += ebd->len;
231                 len -= ebd->len;
232         }
233         /* TODO: any other flags that need copied over? */
234         if (b->flag & BCKSUM_FLAGS) {
235                 newb->flag |= (b->flag & BCKSUM_FLAGS);
236                 newb->checksum_start = b->checksum_start;
237                 newb->checksum_offset = b->checksum_offset;
238         }
239         freeb(b);
240         return newb;
241 }
242
243 /*
244  *  make sure the first block has at least n bytes in its main body
245  */
246 struct block *pullupblock(struct block *bp, int n)
247 {
248         int i, len, seglen;
249         struct block *nbp;
250         struct extra_bdata *ebd;
251
252         /*
253          *  this should almost always be true, it's
254          *  just to avoid every caller checking.
255          */
256         if (BHLEN(bp) >= n)
257                 return bp;
258
259          /* a start at explicit main-body / header management */
260         if (bp->extra_len) {
261                 if (n > bp->lim - bp->rp) {
262                         /* would need to realloc a new block and copy everything over. */
263                         panic("can't pullup, no place to put it\n");
264                 }
265                 len = n - BHLEN(bp);
266                 if (len > bp->extra_len)
267                         panic("pullup more than extra (%d, %d, %d)\n",
268                               n, BHLEN(bp), bp->extra_len);
269                 checkb(bp, "before pullup");
270                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
271                         ebd = &bp->extra_data[i];
272                         if (!ebd->base || !ebd->len)
273                                 continue;
274                         seglen = MIN(ebd->len, len);
275                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
276                         bp->wp += seglen;
277                         len -= seglen;
278                         ebd->len -= seglen;
279                         ebd->off += seglen;
280                         bp->extra_len -= seglen;
281                         if (ebd->len == 0) {
282                                 kfree((void *)ebd->base);
283                                 ebd->len = 0;
284                                 ebd->off = 0;
285                         }
286                 }
287                 /* maybe just call pullupblock recursively here */
288                 if (len)
289                         panic("pullup %d bytes overdrawn\n", len);
290                 checkb(bp, "after pullup");
291                 return bp;
292         }
293
294         /*
295          *  if not enough room in the first block,
296          *  add another to the front of the list.
297          */
298         if (bp->lim - bp->rp < n) {
299                 nbp = allocb(n);
300                 nbp->next = bp;
301                 bp = nbp;
302         }
303
304         /*
305          *  copy bytes from the trailing blocks into the first
306          */
307         n -= BLEN(bp);
308         while ((nbp = bp->next)) {
309                 i = BLEN(nbp);
310                 if (i > n) {
311                         memmove(bp->wp, nbp->rp, n);
312                         pullupblockcnt++;
313                         bp->wp += n;
314                         nbp->rp += n;
315                         QDEBUG checkb(bp, "pullupblock 1");
316                         return bp;
317                 } else {
318                         memmove(bp->wp, nbp->rp, i);
319                         pullupblockcnt++;
320                         bp->wp += i;
321                         bp->next = nbp->next;
322                         nbp->next = 0;
323                         freeb(nbp);
324                         n -= i;
325                         if (n == 0) {
326                                 QDEBUG checkb(bp, "pullupblock 2");
327                                 return bp;
328                         }
329                 }
330         }
331         freeb(bp);
332         return 0;
333 }
334
335 /*
336  *  make sure the first block has at least n bytes
337  */
338 struct block *pullupqueue(struct queue *q, int n)
339 {
340         struct block *b;
341
342         /* TODO: lock to protect the queue links? */
343         if ((BLEN(q->bfirst) >= n))
344                 return q->bfirst;
345         q->bfirst = pullupblock(q->bfirst, n);
346         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
347         q->blast = b;
348         return q->bfirst;
349 }
350
351 /*
352  *  trim to len bytes starting at offset
353  */
354 struct block *trimblock(struct block *bp, int offset, int len)
355 {
356         uint32_t l;
357         struct block *nb, *startb;
358
359         QDEBUG checkb(bp, "trimblock 1");
360         if (blocklen(bp) < offset + len) {
361                 freeblist(bp);
362                 return NULL;
363         }
364
365         while ((l = BLEN(bp)) < offset) {
366                 offset -= l;
367                 nb = bp->next;
368                 bp->next = NULL;
369                 freeb(bp);
370                 bp = nb;
371         }
372
373         WARN_EXTRA(bp);
374         startb = bp;
375         bp->rp += offset;
376
377         while ((l = BLEN(bp)) < len) {
378                 len -= l;
379                 bp = bp->next;
380         }
381
382         bp->wp -= (BLEN(bp) - len);
383
384         if (bp->next) {
385                 freeblist(bp->next);
386                 bp->next = NULL;
387         }
388
389         return startb;
390 }
391
392 /*
393  *  copy 'count' bytes into a new block
394  */
395 struct block *copyblock(struct block *bp, int count)
396 {
397         int l;
398         struct block *nbp;
399
400         QDEBUG checkb(bp, "copyblock 0");
401         nbp = allocb(count);
402         if (bp->flag & BCKSUM_FLAGS) {
403                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
404                 nbp->checksum_start = bp->checksum_start;
405                 nbp->checksum_offset = bp->checksum_offset;
406         }
407         WARN_EXTRA(bp);
408         for (; count > 0 && bp != 0; bp = bp->next) {
409                 l = BLEN(bp);
410                 if (l > count)
411                         l = count;
412                 memmove(nbp->wp, bp->rp, l);
413                 nbp->wp += l;
414                 count -= l;
415         }
416         if (count > 0) {
417                 memset(nbp->wp, 0, count);
418                 nbp->wp += count;
419         }
420         copyblockcnt++;
421         QDEBUG checkb(nbp, "copyblock 1");
422
423         return nbp;
424 }
425
426 struct block *adjustblock(struct block *bp, int len)
427 {
428         int n;
429         struct block *nbp;
430
431         if (len < 0) {
432                 freeb(bp);
433                 return NULL;
434         }
435
436         WARN_EXTRA(bp);
437         if (bp->rp + len > bp->lim) {
438                 nbp = copyblock(bp, len);
439                 freeblist(bp);
440                 QDEBUG checkb(nbp, "adjustblock 1");
441
442                 return nbp;
443         }
444
445         n = BLEN(bp);
446         if (len > n)
447                 memset(bp->wp, 0, len - n);
448         bp->wp = bp->rp + len;
449         QDEBUG checkb(bp, "adjustblock 2");
450
451         return bp;
452 }
453
454 /*
455  *  throw away up to count bytes from a
456  *  list of blocks.  Return count of bytes
457  *  thrown away.
458  */
459 int pullblock(struct block **bph, int count)
460 {
461         struct block *bp;
462         int n, bytes;
463
464         bytes = 0;
465         if (bph == NULL)
466                 return 0;
467
468         while (*bph != NULL && count != 0) {
469                 bp = *bph;
470         WARN_EXTRA(bp);
471
472                 n = BLEN(bp);
473                 if (count < n)
474                         n = count;
475                 bytes += n;
476                 count -= n;
477                 bp->rp += n;
478                 QDEBUG checkb(bp, "pullblock ");
479                 if (BLEN(bp) == 0) {
480                         *bph = bp->next;
481                         bp->next = NULL;
482                         freeb(bp);
483                 }
484         }
485         return bytes;
486 }
487
488 /*
489  *  get next block from a queue, return null if nothing there
490  */
491 struct block *qget(struct queue *q)
492 {
493         int dowakeup;
494         struct block *b;
495
496         /* sync with qwrite */
497         spin_lock_irqsave(&q->lock);
498
499         b = q->bfirst;
500         if (b == NULL) {
501                 q->state |= Qstarve;
502                 spin_unlock_irqsave(&q->lock);
503                 return NULL;
504         }
505         q->bfirst = b->next;
506         b->next = 0;
507         q->len -= BALLOC(b);
508         q->dlen -= BLEN(b);
509         QDEBUG checkb(b, "qget");
510
511         /* if writer flow controlled, restart */
512         if ((q->state & Qflow) && q->len < q->limit / 2) {
513                 q->state &= ~Qflow;
514                 dowakeup = 1;
515         } else
516                 dowakeup = 0;
517
518         spin_unlock_irqsave(&q->lock);
519
520         if (dowakeup)
521                 rendez_wakeup(&q->wr);
522
523         return b;
524 }
525
526 /*
527  *  throw away the next 'len' bytes in the queue
528  * returning the number actually discarded
529  */
530 int qdiscard(struct queue *q, int len)
531 {
532         struct block *b;
533         int dowakeup, n, sofar, body_amt, extra_amt;
534         struct extra_bdata *ebd;
535
536         spin_lock_irqsave(&q->lock);
537         for (sofar = 0; sofar < len; sofar += n) {
538                 b = q->bfirst;
539                 if (b == NULL)
540                         break;
541                 QDEBUG checkb(b, "qdiscard");
542                 n = BLEN(b);
543                 if (n <= len - sofar) {
544                         q->bfirst = b->next;
545                         b->next = 0;
546                         q->len -= BALLOC(b);
547                         q->dlen -= BLEN(b);
548                         freeb(b);
549                 } else {
550                         n = len - sofar;
551                         q->dlen -= n;
552                         /* partial block removal */
553                         body_amt = MIN(BHLEN(b), n);
554                         b->rp += body_amt;
555                         extra_amt = n - body_amt;
556                         /* reduce q->len by the amount we remove from the extras.  The
557                          * header will always be accounted for above, during block removal.
558                          * */
559                         q->len -= extra_amt;
560                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
561                                 ebd = &b->extra_data[i];
562                                 if (!ebd->base || !ebd->len)
563                                         continue;
564                                 if (extra_amt >= ebd->len) {
565                                         /* remove the entire entry, note the kfree release */
566                                         b->extra_len -= ebd->len;
567                                         extra_amt -= ebd->len;
568                                         kfree((void*)ebd->base);
569                                         ebd->base = ebd->off = ebd->len = 0;
570                                         continue;
571                                 }
572                                 ebd->off += extra_amt;
573                                 ebd->len -= extra_amt;
574                                 b->extra_len -= extra_amt;
575                                 extra_amt = 0;
576                         }
577                 }
578         }
579
580         /*
581          *  if writer flow controlled, restart
582          *
583          *  This used to be
584          *  q->len < q->limit/2
585          *  but it slows down tcp too much for certain write sizes.
586          *  I really don't understand it completely.  It may be
587          *  due to the queue draining so fast that the transmission
588          *  stalls waiting for the app to produce more data.  - presotto
589          */
590         if ((q->state & Qflow) && q->len < q->limit) {
591                 q->state &= ~Qflow;
592                 dowakeup = 1;
593         } else
594                 dowakeup = 0;
595
596         spin_unlock_irqsave(&q->lock);
597
598         if (dowakeup)
599                 rendez_wakeup(&q->wr);
600
601         return sofar;
602 }
603
604 /*
605  *  Interrupt level copy out of a queue, return # bytes copied.
606  */
607 int qconsume(struct queue *q, void *vp, int len)
608 {
609         struct block *b;
610         int n, dowakeup;
611         uint8_t *p = vp;
612         struct block *tofree = NULL;
613
614         /* sync with qwrite */
615         spin_lock_irqsave(&q->lock);
616
617         for (;;) {
618                 b = q->bfirst;
619                 if (b == 0) {
620                         q->state |= Qstarve;
621                         spin_unlock_irqsave(&q->lock);
622                         return -1;
623                 }
624                 QDEBUG checkb(b, "qconsume 1");
625
626                 n = BLEN(b);
627                 if (n > 0)
628                         break;
629                 q->bfirst = b->next;
630                 q->len -= BALLOC(b);
631
632                 /* remember to free this */
633                 b->next = tofree;
634                 tofree = b;
635         };
636
637         WARN_EXTRA(b);
638         if (n < len)
639                 len = n;
640         memmove(p, b->rp, len);
641         consumecnt += n;
642         b->rp += len;
643         q->dlen -= len;
644
645         /* discard the block if we're done with it */
646         if ((q->state & Qmsg) || len == n) {
647                 q->bfirst = b->next;
648                 b->next = 0;
649                 q->len -= BALLOC(b);
650                 q->dlen -= BLEN(b);
651
652                 /* remember to free this */
653                 b->next = tofree;
654                 tofree = b;
655         }
656
657         /* if writer flow controlled, restart */
658         if ((q->state & Qflow) && q->len < q->limit / 2) {
659                 q->state &= ~Qflow;
660                 dowakeup = 1;
661         } else
662                 dowakeup = 0;
663
664         spin_unlock_irqsave(&q->lock);
665
666         if (dowakeup)
667                 rendez_wakeup(&q->wr);
668
669         if (tofree != NULL)
670                 freeblist(tofree);
671
672         return len;
673 }
674
675 int qpass(struct queue *q, struct block *b)
676 {
677         int dlen, len, dowakeup;
678
679         /* sync with qread */
680         dowakeup = 0;
681         spin_lock_irqsave(&q->lock);
682         if (q->len >= q->limit) {
683                 freeblist(b);
684                 spin_unlock_irqsave(&q->lock);
685                 return -1;
686         }
687         if (q->state & Qclosed) {
688                 len = blocklen(b);
689                 freeblist(b);
690                 spin_unlock_irqsave(&q->lock);
691                 return len;
692         }
693
694         /* add buffer to queue */
695         if (q->bfirst)
696                 q->blast->next = b;
697         else
698                 q->bfirst = b;
699         len = BALLOC(b);
700         dlen = BLEN(b);
701         QDEBUG checkb(b, "qpass");
702         while (b->next) {
703                 b = b->next;
704                 QDEBUG checkb(b, "qpass");
705                 len += BALLOC(b);
706                 dlen += BLEN(b);
707         }
708         q->blast = b;
709         q->len += len;
710         q->dlen += dlen;
711
712         if (q->len >= q->limit / 2)
713                 q->state |= Qflow;
714
715         if (q->state & Qstarve) {
716                 q->state &= ~Qstarve;
717                 dowakeup = 1;
718         }
719         spin_unlock_irqsave(&q->lock);
720
721         if (dowakeup)
722                 rendez_wakeup(&q->rr);
723
724         return len;
725 }
726
727 int qpassnolim(struct queue *q, struct block *b)
728 {
729         int dlen, len, dowakeup;
730
731         /* sync with qread */
732         dowakeup = 0;
733         spin_lock_irqsave(&q->lock);
734
735         if (q->state & Qclosed) {
736                 freeblist(b);
737                 spin_unlock_irqsave(&q->lock);
738                 return BALLOC(b);
739         }
740
741         /* add buffer to queue */
742         if (q->bfirst)
743                 q->blast->next = b;
744         else
745                 q->bfirst = b;
746         len = BALLOC(b);
747         dlen = BLEN(b);
748         QDEBUG checkb(b, "qpass");
749         while (b->next) {
750                 b = b->next;
751                 QDEBUG checkb(b, "qpass");
752                 len += BALLOC(b);
753                 dlen += BLEN(b);
754         }
755         q->blast = b;
756         q->len += len;
757         q->dlen += dlen;
758
759         if (q->len >= q->limit / 2)
760                 q->state |= Qflow;
761
762         if (q->state & Qstarve) {
763                 q->state &= ~Qstarve;
764                 dowakeup = 1;
765         }
766         spin_unlock_irqsave(&q->lock);
767
768         if (dowakeup)
769                 rendez_wakeup(&q->rr);
770
771         return len;
772 }
773
774 /*
775  *  if the allocated space is way out of line with the used
776  *  space, reallocate to a smaller block
777  */
778 struct block *packblock(struct block *bp)
779 {
780         struct block **l, *nbp;
781         int n;
782
783         WARN_EXTRA(bp);
784         for (l = &bp; *l; l = &(*l)->next) {
785                 nbp = *l;
786                 n = BLEN(nbp);
787                 if ((n << 2) < BALLOC(nbp)) {
788                         *l = allocb(n);
789                         memmove((*l)->wp, nbp->rp, n);
790                         (*l)->wp += n;
791                         (*l)->next = nbp->next;
792                         freeb(nbp);
793                 }
794         }
795
796         return bp;
797 }
798
799 int qproduce(struct queue *q, void *vp, int len)
800 {
801         struct block *b;
802         int dowakeup;
803         uint8_t *p = vp;
804
805         /* sync with qread */
806         dowakeup = 0;
807         spin_lock_irqsave(&q->lock);
808
809         /* no waiting receivers, room in buffer? */
810         if (q->len >= q->limit) {
811                 q->state |= Qflow;
812                 spin_unlock_irqsave(&q->lock);
813                 return -1;
814         }
815
816         /* save in buffer */
817         /* use Qcoalesce here to save storage */
818         // TODO: Consider removing the Qcoalesce flag and force a coalescing
819         // strategy by default.
820         b = q->blast;
821         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
822                 || b->lim - b->wp < len) {
823                 /* need a new block */
824                 b = iallocb(len);
825                 if (b == 0) {
826                         spin_unlock_irqsave(&q->lock);
827                         return 0;
828                 }
829                 if (q->bfirst)
830                         q->blast->next = b;
831                 else
832                         q->bfirst = b;
833                 q->blast = b;
834                 /* b->next = 0; done by iallocb() */
835                 q->len += BALLOC(b);
836         }
837         WARN_EXTRA(b);
838         memmove(b->wp, p, len);
839         producecnt += len;
840         b->wp += len;
841         q->dlen += len;
842         QDEBUG checkb(b, "qproduce");
843
844         if (q->state & Qstarve) {
845                 q->state &= ~Qstarve;
846                 dowakeup = 1;
847         }
848
849         if (q->len >= q->limit)
850                 q->state |= Qflow;
851         spin_unlock_irqsave(&q->lock);
852
853         if (dowakeup)
854                 rendez_wakeup(&q->rr);
855
856         return len;
857 }
858
859 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
860  * body_rp, for up to len.  Returns the len consumed. 
861  *
862  * The base is 'b', so that we can kfree it later.  This currently ties us to
863  * using kfree for the release method for all extra_data.
864  *
865  * It is possible to have a body size that is 0, if there is no offset, and
866  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
867 static size_t point_to_body(struct block *b, uint8_t *body_rp,
868                             struct block *newb, unsigned int newb_idx,
869                             size_t len)
870 {
871         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
872
873         assert(newb_idx < newb->nr_extra_bufs);
874
875         kmalloc_incref(b);
876         ebd->base = (uintptr_t)b;
877         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
878         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
879         assert((int)ebd->len >= 0);
880         newb->extra_len += ebd->len;
881         return ebd->len;
882 }
883
884 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
885  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
886  * consumed.
887  *
888  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
889 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
890                            struct block *newb, unsigned int newb_idx,
891                            size_t len)
892 {
893         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
894         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
895
896         assert(b_idx < b->nr_extra_bufs);
897         assert(newb_idx < newb->nr_extra_bufs);
898
899         kmalloc_incref((void*)b_ebd->base);
900         n_ebd->base = b_ebd->base;
901         n_ebd->off = b_ebd->off + b_off;
902         n_ebd->len = MIN(b_ebd->len - b_off, len);
903         newb->extra_len += n_ebd->len;
904         return n_ebd->len;
905 }
906
907 /* given a string of blocks, fills the new block's extra_data  with the contents
908  * of the blist [offset, len + offset)
909  *
910  * returns 0 on success.  the only failure is if the extra_data array was too
911  * small, so this returns a positive integer saying how big the extra_data needs
912  * to be.
913  *
914  * callers are responsible for protecting the list structure. */
915 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
916                             uint32_t offset)
917 {
918         struct block *b, *first;
919         unsigned int nr_bufs = 0;
920         unsigned int b_idx, newb_idx = 0;
921         uint8_t *first_main_body = 0;
922
923         /* find the first block; keep offset relative to the latest b in the list */
924         for (b = blist; b; b = b->next) {
925                 if (BLEN(b) > offset)
926                         break;
927                 offset -= BLEN(b);
928         }
929         /* qcopy semantics: if you asked for an offset outside the block list, you
930          * get an empty block back */
931         if (!b)
932                 return 0;
933         first = b;
934         /* upper bound for how many buffers we'll need in newb */
935         for (/* b is set*/; b; b = b->next) {
936                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
937         }
938         /* we might be holding a spinlock here, so we won't wait for kmalloc */
939         block_add_extd(newb, nr_bufs, 0);
940         if (newb->nr_extra_bufs < nr_bufs) {
941                 /* caller will need to alloc these, then re-call us */
942                 return nr_bufs;
943         }
944         for (b = first; b && len; b = b->next) {
945                 b_idx = 0;
946                 if (offset) {
947                         if (offset < BHLEN(b)) {
948                                 /* off is in the main body */
949                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
950                                 newb_idx++;
951                         } else {
952                                 /* off is in one of the buffers (or just past the last one).
953                                  * we're not going to point to b's main body at all. */
954                                 offset -= BHLEN(b);
955                                 assert(b->extra_data);
956                                 /* assuming these extrabufs are packed, or at least that len
957                                  * isn't gibberish */
958                                 while (b->extra_data[b_idx].len <= offset) {
959                                         offset -= b->extra_data[b_idx].len;
960                                         b_idx++;
961                                 }
962                                 /* now offset is set to our offset in the b_idx'th buf */
963                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
964                                 newb_idx++;
965                                 b_idx++;
966                         }
967                         offset = 0;
968                 } else {
969                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
970                         newb_idx++;
971                 }
972                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
973                  * and any point_to_ could be our last if it consumed all of len. */
974                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
975                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
976                         newb_idx++;
977                 }
978         }
979         return 0;
980 }
981
982 struct block *blist_clone(struct block *blist, int header_len, int len,
983                           uint32_t offset)
984 {
985         int ret;
986         struct block *newb = allocb(header_len);
987         do {
988                 ret = __blist_clone_to(blist, newb, len, offset);
989                 if (ret)
990                         block_add_extd(newb, ret, KMALLOC_WAIT);
991         } while (ret);
992         return newb;
993 }
994
995 /* given a queue, makes a single block with header_len reserved space in the
996  * block main body, and the contents of [offset, len + offset) pointed to in the
997  * new blocks ext_data. */
998 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
999 {
1000         int ret;
1001         struct block *newb = allocb(header_len);
1002         /* the while loop should rarely be used: it would require someone
1003          * concurrently adding to the queue. */
1004         do {
1005                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1006                 spin_lock_irqsave(&q->lock);
1007                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1008                 spin_unlock_irqsave(&q->lock);
1009                 if (ret)
1010                         block_add_extd(newb, ret, KMALLOC_WAIT);
1011         } while (ret);
1012         return newb;
1013 }
1014
1015 /*
1016  *  copy from offset in the queue
1017  */
1018 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1019 {
1020         int sofar;
1021         int n;
1022         struct block *b, *nb;
1023         uint8_t *p;
1024
1025         nb = allocb(len);
1026
1027         spin_lock_irqsave(&q->lock);
1028
1029         /* go to offset */
1030         b = q->bfirst;
1031         for (sofar = 0;; sofar += n) {
1032                 if (b == NULL) {
1033                         spin_unlock_irqsave(&q->lock);
1034                         return nb;
1035                 }
1036                 n = BLEN(b);
1037                 if (sofar + n > offset) {
1038                         p = b->rp + offset - sofar;
1039                         n -= offset - sofar;
1040                         break;
1041                 }
1042                 QDEBUG checkb(b, "qcopy");
1043                 b = b->next;
1044         }
1045
1046         /* copy bytes from there */
1047         for (sofar = 0; sofar < len;) {
1048                 if (n > len - sofar)
1049                         n = len - sofar;
1050                 WARN_EXTRA(b);
1051                 memmove(nb->wp, p, n);
1052                 qcopycnt += n;
1053                 sofar += n;
1054                 nb->wp += n;
1055                 b = b->next;
1056                 if (b == NULL)
1057                         break;
1058                 n = BLEN(b);
1059                 p = b->rp;
1060         }
1061         spin_unlock_irqsave(&q->lock);
1062
1063         return nb;
1064 }
1065
1066 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1067 {
1068 #ifdef CONFIG_BLOCK_EXTRAS
1069         return qclone(q, 0, len, offset);
1070 #else
1071         return qcopy_old(q, len, offset);
1072 #endif
1073 }
1074
1075 static void qinit_common(struct queue *q)
1076 {
1077         spinlock_init_irqsave(&q->lock);
1078         qlock_init(&q->rlock);
1079         qlock_init(&q->wlock);
1080         rendez_init(&q->rr);
1081         rendez_init(&q->wr);
1082 }
1083
1084 /*
1085  *  called by non-interrupt code
1086  */
1087 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1088 {
1089         struct queue *q;
1090
1091         q = kzmalloc(sizeof(struct queue), 0);
1092         if (q == 0)
1093                 return 0;
1094         qinit_common(q);
1095
1096         q->limit = q->inilim = limit;
1097         q->kick = kick;
1098         q->arg = arg;
1099         q->state = msg;
1100         q->state |= Qstarve;
1101         q->eof = 0;
1102         q->noblock = 0;
1103
1104         return q;
1105 }
1106
1107 /* open a queue to be bypassed */
1108 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1109 {
1110         struct queue *q;
1111
1112         q = kzmalloc(sizeof(struct queue), 0);
1113         if (q == 0)
1114                 return 0;
1115         qinit_common(q);
1116
1117         q->limit = 0;
1118         q->arg = arg;
1119         q->bypass = bypass;
1120         q->state = 0;
1121
1122         return q;
1123 }
1124
1125 static int notempty(void *a)
1126 {
1127         struct queue *q = a;
1128
1129         return (q->state & Qclosed) || q->bfirst != 0;
1130 }
1131
1132 /* wait for the queue to be non-empty or closed.
1133  *
1134  * called with q ilocked.  rendez may error out, back through the caller, with
1135  * the irqsave lock unlocked.  */
1136 static int qwait(struct queue *q)
1137 {
1138         /* wait for data */
1139         for (;;) {
1140                 if (q->bfirst != NULL)
1141                         break;
1142
1143                 if (q->state & Qclosed) {
1144                         if (++q->eof > 3)
1145                                 return -1;
1146                         if (*q->err && strcmp(q->err, Ehungup) != 0)
1147                                 return -1;
1148                         return 0;
1149                 }
1150
1151                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1152                 spin_unlock_irqsave(&q->lock);
1153                 /* may throw an error() */
1154                 rendez_sleep(&q->rr, notempty, q);
1155                 spin_lock_irqsave(&q->lock);
1156         }
1157         return 1;
1158 }
1159
1160 /*
1161  * add a block list to a queue
1162  */
1163 void qaddlist(struct queue *q, struct block *b)
1164 {
1165         /* TODO: q lock? */
1166         /* queue the block */
1167         if (q->bfirst)
1168                 q->blast->next = b;
1169         else
1170                 q->bfirst = b;
1171         q->len += blockalloclen(b);
1172         q->dlen += blocklen(b);
1173         while (b->next)
1174                 b = b->next;
1175         q->blast = b;
1176 }
1177
1178 /*
1179  *  called with q ilocked
1180  */
1181 struct block *qremove(struct queue *q)
1182 {
1183         struct block *b;
1184
1185         b = q->bfirst;
1186         if (b == NULL)
1187                 return NULL;
1188         q->bfirst = b->next;
1189         b->next = NULL;
1190         q->dlen -= BLEN(b);
1191         q->len -= BALLOC(b);
1192         QDEBUG checkb(b, "qremove");
1193         b = linearizeblock(b);
1194         return b;
1195 }
1196
1197 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1198 {
1199         size_t copy_amt, retval = 0;
1200         struct extra_bdata *ebd;
1201         
1202         copy_amt = MIN(BHLEN(b), amt);
1203         memcpy(to, b->rp, copy_amt);
1204         /* advance the rp, since this block not be completely consumed and future
1205          * reads need to know where to pick up from */
1206         b->rp += copy_amt;
1207         to += copy_amt;
1208         amt -= copy_amt;
1209         retval += copy_amt;
1210         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1211                 ebd = &b->extra_data[i];
1212                 /* skip empty entires.  if we track this in the struct block, we can
1213                  * just start the for loop early */
1214                 if (!ebd->base || !ebd->len)
1215                         continue;
1216                 copy_amt = MIN(ebd->len, amt);
1217                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1218                 /* we're actually consuming the entries, just like how we advance rp up
1219                  * above, and might only consume part of one. */
1220                 ebd->len -= copy_amt;
1221                 ebd->off += copy_amt;
1222                 b->extra_len -= copy_amt;
1223                 if (!ebd->len) {
1224                         /* we don't actually have to decref here.  it's also done in
1225                          * freeb().  this is the earliest we can free. */
1226                         kfree((void*)ebd->base);
1227                         ebd->base = ebd->off = 0;
1228                 }
1229                 to += copy_amt;
1230                 amt -= copy_amt;
1231                 retval += copy_amt;
1232         }
1233         return retval;
1234 }
1235
1236 /*
1237  *  copy the contents of a string of blocks into
1238  *  memory.  emptied blocks are freed.  return
1239  *  pointer to first unconsumed block.
1240  */
1241 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1242 {
1243         int i;
1244         struct block *next;
1245
1246         /* could be slicker here, since read_from_block is smart */
1247         for (; b != NULL; b = next) {
1248                 i = BLEN(b);
1249                 if (i > n) {
1250                         /* partial block, consume some */
1251                         read_from_block(b, p, n);
1252                         return b;
1253                 }
1254                 /* full block, consume all and move on */
1255                 i = read_from_block(b, p, i);
1256                 n -= i;
1257                 p += i;
1258                 next = b->next;
1259                 freeb(b);
1260         }
1261         return NULL;
1262 }
1263
1264 /*
1265  *  copy the contents of memory into a string of blocks.
1266  *  return NULL on error.
1267  */
1268 struct block *mem2bl(uint8_t * p, int len)
1269 {
1270         ERRSTACK(1);
1271         int n;
1272         struct block *b, *first, **l;
1273
1274         first = NULL;
1275         l = &first;
1276         if (waserror()) {
1277                 freeblist(first);
1278                 nexterror();
1279         }
1280         do {
1281                 n = len;
1282                 if (n > Maxatomic)
1283                         n = Maxatomic;
1284
1285                 *l = b = allocb(n);
1286                 /* TODO consider extra_data */
1287                 memmove(b->wp, p, n);
1288                 b->wp += n;
1289                 p += n;
1290                 len -= n;
1291                 l = &b->next;
1292         } while (len > 0);
1293         poperror();
1294
1295         return first;
1296 }
1297
1298 /*
1299  *  put a block back to the front of the queue
1300  *  called with q ilocked
1301  */
1302 void qputback(struct queue *q, struct block *b)
1303 {
1304         b->next = q->bfirst;
1305         if (q->bfirst == NULL)
1306                 q->blast = b;
1307         q->bfirst = b;
1308         q->len += BALLOC(b);
1309         q->dlen += BLEN(b);
1310 }
1311
1312 /*
1313  *  flow control, get producer going again
1314  *  called with q ilocked
1315  */
1316 static void qwakeup_iunlock(struct queue *q)
1317 {
1318         int dowakeup = 0;
1319
1320         /* if writer flow controlled, restart */
1321         if ((q->state & Qflow) && q->len < q->limit / 2) {
1322                 q->state &= ~Qflow;
1323                 dowakeup = 1;
1324         }
1325
1326         spin_unlock_irqsave(&q->lock);
1327
1328         /* wakeup flow controlled writers */
1329         if (dowakeup) {
1330                 if (q->kick)
1331                         q->kick(q->arg);
1332                 rendez_wakeup(&q->wr);
1333         }
1334 }
1335
1336 /*
1337  *  get next block from a queue (up to a limit)
1338  */
1339 struct block *qbread(struct queue *q, int len)
1340 {
1341         ERRSTACK(1);
1342         struct block *b, *nb;
1343         int n;
1344
1345         qlock(&q->rlock);
1346         if (waserror()) {
1347                 qunlock(&q->rlock);
1348                 nexterror();
1349         }
1350
1351         spin_lock_irqsave(&q->lock);
1352         switch (qwait(q)) {
1353                 case 0:
1354                         /* queue closed */
1355                         spin_unlock_irqsave(&q->lock);
1356                         qunlock(&q->rlock);
1357                         poperror();
1358                         return NULL;
1359                 case -1:
1360                         /* multiple reads on a closed queue */
1361                         spin_unlock_irqsave(&q->lock);
1362                         error(q->err);
1363         }
1364
1365         /* if we get here, there's at least one block in the queue */
1366         b = qremove(q);
1367         n = BLEN(b);
1368
1369         /* split block if it's too big and this is not a message queue */
1370         nb = b;
1371         if (n > len) {
1372                 WARN_EXTRA(b);
1373                 if ((q->state & Qmsg) == 0) {
1374                         n -= len;
1375                         b = allocb(n);
1376                         memmove(b->wp, nb->rp + len, n);
1377                         b->wp += n;
1378                         qputback(q, b);
1379                 }
1380                 nb->wp = nb->rp + len;
1381         }
1382
1383         /* restart producer */
1384         qwakeup_iunlock(q);
1385
1386         poperror();
1387         qunlock(&q->rlock);
1388         return nb;
1389 }
1390
1391 /*
1392  *  read a queue.  if no data is queued, post a struct block
1393  *  and wait on its Rendez.
1394  */
1395 long qread(struct queue *q, void *vp, int len)
1396 {
1397         ERRSTACK(1);
1398         struct block *b, *first, **l;
1399         int m, n;
1400
1401         qlock(&q->rlock);
1402         if (waserror()) {
1403                 qunlock(&q->rlock);
1404                 nexterror();
1405         }
1406
1407         spin_lock_irqsave(&q->lock);
1408 again:
1409         switch (qwait(q)) {
1410                 case 0:
1411                         /* queue closed */
1412                         spin_unlock_irqsave(&q->lock);
1413                         qunlock(&q->rlock);
1414                         poperror();
1415                         return 0;
1416                 case -1:
1417                         /* multiple reads on a closed queue */
1418                         spin_unlock_irqsave(&q->lock);
1419                         error(q->err);
1420         }
1421
1422         /* if we get here, there's at least one block in the queue */
1423         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1424         // strategy by default.
1425         if (q->state & Qcoalesce) {
1426                 /* when coalescing, 0 length blocks just go away */
1427                 b = q->bfirst;
1428                 if (BLEN(b) <= 0) {
1429                         freeb(qremove(q));
1430                         goto again;
1431                 }
1432
1433                 /*  grab the first block plus as many
1434                  *  following blocks as will completely
1435                  *  fit in the read.
1436                  */
1437                 n = 0;
1438                 l = &first;
1439                 m = BLEN(b);
1440                 for (;;) {
1441                         *l = qremove(q);
1442                         l = &b->next;
1443                         n += m;
1444
1445                         b = q->bfirst;
1446                         if (b == NULL)
1447                                 break;
1448                         m = BLEN(b);
1449                         if (n + m > len)
1450                                 break;
1451                 }
1452         } else {
1453                 first = qremove(q);
1454                 n = BLEN(first);
1455         }
1456
1457         /* copy to user space outside of the ilock */
1458         spin_unlock_irqsave(&q->lock);
1459         b = bl2mem(vp, first, len);
1460         spin_lock_irqsave(&q->lock);
1461
1462         /* take care of any left over partial block */
1463         if (b != NULL) {
1464                 n -= BLEN(b);
1465                 if (q->state & Qmsg)
1466                         freeb(b);
1467                 else
1468                         qputback(q, b);
1469         }
1470
1471         /* restart producer */
1472         qwakeup_iunlock(q);
1473
1474         poperror();
1475         qunlock(&q->rlock);
1476         return n;
1477 }
1478
1479 static int qnotfull(void *a)
1480 {
1481         struct queue *q = a;
1482
1483         return q->len < q->limit || (q->state & Qclosed);
1484 }
1485
1486 uint32_t noblockcnt;
1487
1488 /*
1489  *  add a block to a queue obeying flow control
1490  */
1491 long qbwrite(struct queue *q, struct block *b)
1492 {
1493         ERRSTACK(1);
1494         int n, dowakeup;
1495         volatile bool should_free_b = TRUE;
1496
1497         n = BLEN(b);
1498
1499         if (q->bypass) {
1500                 (*q->bypass) (q->arg, b);
1501                 return n;
1502         }
1503
1504         dowakeup = 0;
1505         qlock(&q->wlock);
1506         if (waserror()) {
1507                 if (b != NULL && should_free_b)
1508                         freeb(b);
1509                 qunlock(&q->wlock);
1510                 nexterror();
1511         }
1512
1513         spin_lock_irqsave(&q->lock);
1514
1515         /* give up if the queue is closed */
1516         if (q->state & Qclosed) {
1517                 spin_unlock_irqsave(&q->lock);
1518                 error(q->err);
1519         }
1520
1521         /* if nonblocking, don't queue over the limit */
1522         if (q->len >= q->limit) {
1523                 if (q->noblock) {
1524                         spin_unlock_irqsave(&q->lock);
1525                         freeb(b);
1526                         noblockcnt += n;
1527                         qunlock(&q->wlock);
1528                         poperror();
1529                         return n;
1530                 }
1531         }
1532
1533         /* queue the block */
1534         should_free_b = FALSE;
1535         if (q->bfirst)
1536                 q->blast->next = b;
1537         else
1538                 q->bfirst = b;
1539         q->blast = b;
1540         b->next = 0;
1541         q->len += BALLOC(b);
1542         q->dlen += n;
1543         QDEBUG checkb(b, "qbwrite");
1544         b = NULL;
1545
1546         /* make sure other end gets awakened */
1547         if (q->state & Qstarve) {
1548                 q->state &= ~Qstarve;
1549                 dowakeup = 1;
1550         }
1551         spin_unlock_irqsave(&q->lock);
1552
1553         /*  get output going again */
1554         if (q->kick && (dowakeup || (q->state & Qkick)))
1555                 q->kick(q->arg);
1556
1557         /* wakeup anyone consuming at the other end */
1558         if (dowakeup)
1559                 rendez_wakeup(&q->rr);
1560
1561         /*
1562          *  flow control, wait for queue to get below the limit
1563          *  before allowing the process to continue and queue
1564          *  more.  We do this here so that postnote can only
1565          *  interrupt us after the data has been queued.  This
1566          *  means that things like 9p flushes and ssl messages
1567          *  will not be disrupted by software interrupts.
1568          *
1569          *  Note - this is moderately dangerous since a process
1570          *  that keeps getting interrupted and rewriting will
1571          *  queue infinite crud.
1572          */
1573         for (;;) {
1574                 if (q->noblock || qnotfull(q))
1575                         break;
1576
1577                 spin_lock_irqsave(&q->lock);
1578                 q->state |= Qflow;
1579                 spin_unlock_irqsave(&q->lock);
1580                 rendez_sleep(&q->wr, qnotfull, q);
1581         }
1582
1583         qunlock(&q->wlock);
1584         poperror();
1585         return n;
1586 }
1587
1588 long qibwrite(struct queue *q, struct block *b)
1589 {
1590         int n, dowakeup;
1591
1592         dowakeup = 0;
1593
1594         n = BLEN(b);
1595
1596         spin_lock_irqsave(&q->lock);
1597
1598         QDEBUG checkb(b, "qibwrite");
1599         if (q->bfirst)
1600                 q->blast->next = b;
1601         else
1602                 q->bfirst = b;
1603         q->blast = b;
1604         q->len += BALLOC(b);
1605         q->dlen += n;
1606
1607         if (q->state & Qstarve) {
1608                 q->state &= ~Qstarve;
1609                 dowakeup = 1;
1610         }
1611
1612         spin_unlock_irqsave(&q->lock);
1613
1614         if (dowakeup) {
1615                 if (q->kick)
1616                         q->kick(q->arg);
1617                 rendez_wakeup(&q->rr);
1618         }
1619
1620         return n;
1621 }
1622
1623 /*
1624  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1625  */
1626 int qwrite(struct queue *q, void *vp, int len)
1627 {
1628         int n, sofar;
1629         struct block *b;
1630         uint8_t *p = vp;
1631         void *ext_buf;
1632
1633         QDEBUG if (!islo())
1634                  printd("qwrite hi %p\n", getcallerpc(&q));
1635
1636         sofar = 0;
1637         do {
1638                 n = len - sofar;
1639                 /* This is 64K, the max amount per single block.  Still a good value? */
1640                 if (n > Maxatomic)
1641                         n = Maxatomic;
1642
1643                 /* If n is small, we don't need to bother with the extra_data.  But
1644                  * until the whole stack can handle extd blocks, we'll use them
1645                  * unconditionally. */
1646 #ifdef CONFIG_BLOCK_EXTRAS
1647                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1648                  * only available via padblock (to the left).  we also need some space
1649                  * for pullupblock for some basic headers (like icmp) that get written
1650                  * in directly */
1651                 b = allocb(64);
1652                 ext_buf = kmalloc(n, 0);
1653                 memcpy(ext_buf, p + sofar, n);
1654                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1655                 b->extra_data[0].base = (uintptr_t)ext_buf;
1656                 b->extra_data[0].off = 0;
1657                 b->extra_data[0].len = n;
1658                 b->extra_len += n;
1659 #else
1660                 b = allocb(n);
1661                 memmove(b->wp, p + sofar, n);
1662                 b->wp += n;
1663 #endif
1664                         
1665                 qbwrite(q, b);
1666
1667                 sofar += n;
1668         } while (sofar < len && (q->state & Qmsg) == 0);
1669
1670         return len;
1671 }
1672
1673 /*
1674  *  used by print() to write to a queue.  Since we may be splhi or not in
1675  *  a process, don't qlock.
1676  */
1677 int qiwrite(struct queue *q, void *vp, int len)
1678 {
1679         int n, sofar, dowakeup;
1680         struct block *b;
1681         uint8_t *p = vp;
1682
1683         dowakeup = 0;
1684
1685         sofar = 0;
1686         do {
1687                 n = len - sofar;
1688                 if (n > Maxatomic)
1689                         n = Maxatomic;
1690
1691                 b = iallocb(n);
1692                 if (b == NULL)
1693                         break;
1694                 /* TODO consider extra_data */
1695                 memmove(b->wp, p + sofar, n);
1696                 /* this adjusts BLEN to be n, or at least it should */
1697                 b->wp += n;
1698                 assert(n == BLEN(b));
1699                 qibwrite(q, b);
1700
1701                 sofar += n;
1702         } while (sofar < len && (q->state & Qmsg) == 0);
1703
1704         return sofar;
1705 }
1706
1707 /*
1708  *  be extremely careful when calling this,
1709  *  as there is no reference accounting
1710  */
1711 void qfree(struct queue *q)
1712 {
1713         qclose(q);
1714         kfree(q);
1715 }
1716
1717 /*
1718  *  Mark a queue as closed.  No further IO is permitted.
1719  *  All blocks are released.
1720  */
1721 void qclose(struct queue *q)
1722 {
1723         struct block *bfirst;
1724
1725         if (q == NULL)
1726                 return;
1727
1728         /* mark it */
1729         spin_lock_irqsave(&q->lock);
1730         q->state |= Qclosed;
1731         q->state &= ~(Qflow | Qstarve);
1732         strncpy(q->err, Ehungup, sizeof(q->err));
1733         bfirst = q->bfirst;
1734         q->bfirst = 0;
1735         q->len = 0;
1736         q->dlen = 0;
1737         q->noblock = 0;
1738         spin_unlock_irqsave(&q->lock);
1739
1740         /* free queued blocks */
1741         freeblist(bfirst);
1742
1743         /* wake up readers/writers */
1744         rendez_wakeup(&q->rr);
1745         rendez_wakeup(&q->wr);
1746 }
1747
1748 /*
1749  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1750  *  blocks.
1751  */
1752 void qhangup(struct queue *q, char *msg)
1753 {
1754         /* mark it */
1755         spin_lock_irqsave(&q->lock);
1756         q->state |= Qclosed;
1757         if (msg == 0 || *msg == 0)
1758                 strncpy(q->err, Ehungup, sizeof(q->err));
1759         else
1760                 strncpy(q->err, msg, ERRMAX - 1);
1761         spin_unlock_irqsave(&q->lock);
1762
1763         /* wake up readers/writers */
1764         rendez_wakeup(&q->rr);
1765         rendez_wakeup(&q->wr);
1766 }
1767
1768 /*
1769  *  return non-zero if the q is hungup
1770  */
1771 int qisclosed(struct queue *q)
1772 {
1773         return q->state & Qclosed;
1774 }
1775
1776 /*
1777  *  mark a queue as no longer hung up
1778  */
1779 void qreopen(struct queue *q)
1780 {
1781         spin_lock_irqsave(&q->lock);
1782         q->state &= ~Qclosed;
1783         q->state |= Qstarve;
1784         q->eof = 0;
1785         q->limit = q->inilim;
1786         spin_unlock_irqsave(&q->lock);
1787 }
1788
1789 /*
1790  *  return bytes queued
1791  */
1792 int qlen(struct queue *q)
1793 {
1794         return q->dlen;
1795 }
1796
1797 /*
1798  * return space remaining before flow control
1799  */
1800 int qwindow(struct queue *q)
1801 {
1802         int l;
1803
1804         l = q->limit - q->len;
1805         if (l < 0)
1806                 l = 0;
1807         return l;
1808 }
1809
1810 /*
1811  *  return true if we can read without blocking
1812  */
1813 int qcanread(struct queue *q)
1814 {
1815         return q->bfirst != 0;
1816 }
1817
1818 /*
1819  *  change queue limit
1820  */
1821 void qsetlimit(struct queue *q, int limit)
1822 {
1823         q->limit = limit;
1824 }
1825
1826 /*
1827  *  set blocking/nonblocking
1828  */
1829 void qnoblock(struct queue *q, int onoff)
1830 {
1831         q->noblock = onoff;
1832 }
1833
1834 /*
1835  *  flush the output queue
1836  */
1837 void qflush(struct queue *q)
1838 {
1839         struct block *bfirst;
1840
1841         /* mark it */
1842         spin_lock_irqsave(&q->lock);
1843         bfirst = q->bfirst;
1844         q->bfirst = 0;
1845         q->len = 0;
1846         q->dlen = 0;
1847         spin_unlock_irqsave(&q->lock);
1848
1849         /* free queued blocks */
1850         freeblist(bfirst);
1851
1852         /* wake up readers/writers */
1853         rendez_wakeup(&q->wr);
1854 }
1855
1856 int qfull(struct queue *q)
1857 {
1858         return q->state & Qflow;
1859 }
1860
1861 int qstate(struct queue *q)
1862 {
1863         return q->state;
1864 }
1865
1866 void qdump(struct queue *q)
1867 {
1868         if (q)
1869                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1870                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1871 }