Be a bit more informative when we panic trying to pull up.
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 #define PANIC_EXTRA(b)                                                          \
17 {                                                                              \
18         if ((b)->extra_len)                                                        \
19                 panic("%s doesn't handle extra_data", __FUNCTION__);               \
20 }
21
22 static uint32_t padblockcnt;
23 static uint32_t concatblockcnt;
24 static uint32_t pullupblockcnt;
25 static uint32_t copyblockcnt;
26 static uint32_t consumecnt;
27 static uint32_t producecnt;
28 static uint32_t qcopycnt;
29
30 static int debugging;
31
32 #define QDEBUG  if(0)
33
34 /*
35  *  IO queues
36  */
37
38 struct queue {
39         spinlock_t lock;;
40
41         struct block *bfirst;           /* buffer */
42         struct block *blast;
43
44         int len;                                        /* bytes allocated to queue */
45         int dlen;                                       /* data bytes in queue */
46         int limit;                                      /* max bytes in queue */
47         int inilim;                             /* initial limit */
48         int state;
49         int noblock;                            /* true if writes return immediately when q full */
50         int eof;                                        /* number of eofs read by user */
51
52         void (*kick) (void *);          /* restart output */
53         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
54         void *arg;                                      /* argument to kick */
55
56         qlock_t rlock;                          /* mutex for reading processes */
57         struct rendez rr;                       /* process waiting to read */
58         qlock_t wlock;                          /* mutex for writing processes */
59         struct rendez wr;                       /* process waiting to write */
60
61         char err[ERRMAX];
62 };
63
64 enum {
65         Maxatomic = 64 * 1024,
66 };
67
68 unsigned int qiomaxatomic = Maxatomic;
69
70 void ixsummary(void)
71 {
72         debugging ^= 1;
73         iallocsummary();
74         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
75                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
76         printd("consume %lu, produce %lu, qcopy %lu\n",
77                    consumecnt, producecnt, qcopycnt);
78 }
79
80 /*
81  *  free a list of blocks
82  */
83 void freeblist(struct block *b)
84 {
85         struct block *next;
86
87         for (; b != 0; b = next) {
88                 next = b->next;
89                 b->next = 0;
90                 freeb(b);
91         }
92 }
93
94 /*
95  *  pad a block to the front (or the back if size is negative)
96  */
97 struct block *padblock(struct block *bp, int size)
98 {
99         int n;
100         struct block *nbp;
101         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
102         uint16_t checksum_start = bp->checksum_start;
103         uint16_t checksum_offset = bp->checksum_offset;
104         uint16_t mss = bp->mss;
105
106         QDEBUG checkb(bp, "padblock 1");
107         if (size >= 0) {
108                 if (bp->rp - bp->base >= size) {
109                         bp->checksum_start += size;
110                         bp->rp -= size;
111                         return bp;
112                 }
113
114                 PANIC_EXTRA(bp);
115                 if (bp->next)
116                         panic("padblock %p", getcallerpc(&bp));
117                 n = BLEN(bp);
118                 padblockcnt++;
119                 nbp = allocb(size + n);
120                 nbp->rp += size;
121                 nbp->wp = nbp->rp;
122                 memmove(nbp->wp, bp->rp, n);
123                 nbp->wp += n;
124                 freeb(bp);
125                 nbp->rp -= size;
126         } else {
127                 size = -size;
128
129                 PANIC_EXTRA(bp);
130
131                 if (bp->next)
132                         panic("padblock %p", getcallerpc(&bp));
133
134                 if (bp->lim - bp->wp >= size)
135                         return bp;
136
137                 n = BLEN(bp);
138                 padblockcnt++;
139                 nbp = allocb(size + n);
140                 memmove(nbp->wp, bp->rp, n);
141                 nbp->wp += n;
142                 freeb(bp);
143         }
144         if (bcksum) {
145                 nbp->flag |= bcksum;
146                 nbp->checksum_start = checksum_start;
147                 nbp->checksum_offset = checksum_offset;
148                 nbp->mss = mss;
149         }
150         QDEBUG checkb(nbp, "padblock 1");
151         return nbp;
152 }
153
154 /*
155  *  return count of bytes in a string of blocks
156  */
157 int blocklen(struct block *bp)
158 {
159         int len;
160
161         len = 0;
162         while (bp) {
163                 len += BLEN(bp);
164                 bp = bp->next;
165         }
166         return len;
167 }
168
169 /*
170  * return count of space in blocks
171  */
172 int blockalloclen(struct block *bp)
173 {
174         int len;
175
176         len = 0;
177         while (bp) {
178                 len += BALLOC(bp);
179                 bp = bp->next;
180         }
181         return len;
182 }
183
184 /*
185  *  copy the  string of blocks into
186  *  a single block and free the string
187  */
188 struct block *concatblock(struct block *bp)
189 {
190         int len;
191         struct block *nb, *f;
192
193         if (bp->next == 0)
194                 return bp;
195
196         /* probably use parts of qclone */
197         PANIC_EXTRA(bp);
198         nb = allocb(blocklen(bp));
199         for (f = bp; f; f = f->next) {
200                 len = BLEN(f);
201                 memmove(nb->wp, f->rp, len);
202                 nb->wp += len;
203         }
204         concatblockcnt += BLEN(nb);
205         freeblist(bp);
206         QDEBUG checkb(nb, "concatblock 1");
207         return nb;
208 }
209
210 /* Returns a block with the remaining contents of b all in the main body of the
211  * returned block.  Replace old references to b with the returned value (which
212  * may still be 'b', if no change was needed. */
213 struct block *linearizeblock(struct block *b)
214 {
215         struct block *newb;
216         size_t len;
217         struct extra_bdata *ebd;
218
219         if (!b->extra_len)
220                 return b;
221
222         newb = allocb(BLEN(b));
223         len = BHLEN(b);
224         memcpy(newb->wp, b->rp, len);
225         newb->wp += len;
226         len = b->extra_len;
227         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
228                 ebd = &b->extra_data[i];
229                 if (!ebd->base || !ebd->len)
230                         continue;
231                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
232                 newb->wp += ebd->len;
233                 len -= ebd->len;
234         }
235         /* TODO: any other flags that need copied over? */
236         if (b->flag & BCKSUM_FLAGS) {
237                 newb->flag |= (b->flag & BCKSUM_FLAGS);
238                 newb->checksum_start = b->checksum_start;
239                 newb->checksum_offset = b->checksum_offset;
240                 newb->mss = b->mss;
241         }
242         freeb(b);
243         return newb;
244 }
245
246 /*
247  *  make sure the first block has at least n bytes in its main body
248  */
249 struct block *pullupblock(struct block *bp, int n)
250 {
251         int i, len, seglen;
252         struct block *nbp;
253         struct extra_bdata *ebd;
254
255         /*
256          *  this should almost always be true, it's
257          *  just to avoid every caller checking.
258          */
259         if (BHLEN(bp) >= n)
260                 return bp;
261
262          /* a start at explicit main-body / header management */
263         if (bp->extra_len) {
264                 if (n > bp->lim - bp->rp) {
265                         /* would need to realloc a new block and copy everything over. */
266                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
267                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
268                 }
269                 len = n - BHLEN(bp);
270                 if (len > bp->extra_len)
271                         panic("pullup more than extra (%d, %d, %d)\n",
272                               n, BHLEN(bp), bp->extra_len);
273                 checkb(bp, "before pullup");
274                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
275                         ebd = &bp->extra_data[i];
276                         if (!ebd->base || !ebd->len)
277                                 continue;
278                         seglen = MIN(ebd->len, len);
279                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
280                         bp->wp += seglen;
281                         len -= seglen;
282                         ebd->len -= seglen;
283                         ebd->off += seglen;
284                         bp->extra_len -= seglen;
285                         if (ebd->len == 0) {
286                                 kfree((void *)ebd->base);
287                                 ebd->off = 0;
288                                 ebd->base = 0;
289                         }
290                 }
291                 /* maybe just call pullupblock recursively here */
292                 if (len)
293                         panic("pullup %d bytes overdrawn\n", len);
294                 checkb(bp, "after pullup");
295                 return bp;
296         }
297
298         /*
299          *  if not enough room in the first block,
300          *  add another to the front of the list.
301          */
302         if (bp->lim - bp->rp < n) {
303                 nbp = allocb(n);
304                 nbp->next = bp;
305                 bp = nbp;
306         }
307
308         /*
309          *  copy bytes from the trailing blocks into the first
310          */
311         n -= BLEN(bp);
312         while ((nbp = bp->next)) {
313                 i = BLEN(nbp);
314                 if (i > n) {
315                         memmove(bp->wp, nbp->rp, n);
316                         pullupblockcnt++;
317                         bp->wp += n;
318                         nbp->rp += n;
319                         QDEBUG checkb(bp, "pullupblock 1");
320                         return bp;
321                 } else {
322                         memmove(bp->wp, nbp->rp, i);
323                         pullupblockcnt++;
324                         bp->wp += i;
325                         bp->next = nbp->next;
326                         nbp->next = 0;
327                         freeb(nbp);
328                         n -= i;
329                         if (n == 0) {
330                                 QDEBUG checkb(bp, "pullupblock 2");
331                                 return bp;
332                         }
333                 }
334         }
335         freeb(bp);
336         return 0;
337 }
338
339 /*
340  *  make sure the first block has at least n bytes in its main body
341  */
342 struct block *pullupqueue(struct queue *q, int n)
343 {
344         struct block *b;
345
346         /* TODO: lock to protect the queue links? */
347         if ((BHLEN(q->bfirst) >= n))
348                 return q->bfirst;
349         q->bfirst = pullupblock(q->bfirst, n);
350         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
351         q->blast = b;
352         return q->bfirst;
353 }
354
355 /* throw away count bytes from the front of
356  * block's extradata.  Returns count of bytes
357  * thrown away
358  */
359
360 static int pullext(struct block *bp, int count)
361 {
362         struct extra_bdata *ed;
363         int i, rem, bytes = 0;
364
365         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
366                 ed = &bp->extra_data[i];
367                 rem = MIN(count, ed->len);
368                 bp->extra_len -= rem;
369                 count -= rem;
370                 bytes += rem;
371                 ed->off += rem;
372                 ed->len -= rem;
373                 if (ed->len == 0) {
374                         kfree((void *)ed->base);
375                         ed->base = 0;
376                         ed->off = 0;
377                 }
378         }
379         return bytes;
380 }
381
382 /* throw away count bytes from the end of a
383  * block's extradata.  Returns count of bytes
384  * thrown away
385  */
386
387 static int dropext(struct block *bp, int count)
388 {
389         struct extra_bdata *ed;
390         int i, rem, bytes = 0;
391
392         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
393                 ed = &bp->extra_data[i];
394                 rem = MIN(count, ed->len);
395                 bp->extra_len -= rem;
396                 count -= rem;
397                 bytes += rem;
398                 ed->len -= rem;
399                 if (ed->len == 0) {
400                         kfree((void *)ed->base);
401                         ed->base = 0;
402                         ed->off = 0;
403                 }
404         }
405         return bytes;
406 }
407
408 /*
409  *  throw away up to count bytes from a
410  *  list of blocks.  Return count of bytes
411  *  thrown away.
412  */
413 static int _pullblock(struct block **bph, int count, int free)
414 {
415         struct block *bp;
416         int n, bytes;
417
418         bytes = 0;
419         if (bph == NULL)
420                 return 0;
421
422         while (*bph != NULL && count != 0) {
423                 bp = *bph;
424
425                 n = MIN(BHLEN(bp), count);
426                 bytes += n;
427                 count -= n;
428                 bp->rp += n;
429                 n = pullext(bp, count);
430                 bytes += n;
431                 count -= n;
432                 QDEBUG checkb(bp, "pullblock ");
433                 if (BLEN(bp) == 0 && (free || count)) {
434                         *bph = bp->next;
435                         bp->next = NULL;
436                         freeb(bp);
437                 }
438         }
439         return bytes;
440 }
441
442 int pullblock(struct block **bph, int count)
443 {
444         return _pullblock(bph, count, 1);
445 }
446
447 /*
448  *  trim to len bytes starting at offset
449  */
450 struct block *trimblock(struct block *bp, int offset, int len)
451 {
452         uint32_t l, trim;
453         int olen = len;
454
455         QDEBUG checkb(bp, "trimblock 1");
456         if (blocklen(bp) < offset + len) {
457                 freeblist(bp);
458                 return NULL;
459         }
460
461         l =_pullblock(&bp, offset, 0);
462         if (bp == NULL)
463                 return NULL;
464         if (l != offset) {
465                 freeblist(bp);
466                 return NULL;
467         }
468
469         while ((l = BLEN(bp)) < len) {
470                 len -= l;
471                 bp = bp->next;
472         }
473
474         trim = BLEN(bp) - len;
475         trim -= dropext(bp, trim);
476         bp->wp -= trim;
477
478         if (bp->next) {
479                 freeblist(bp->next);
480                 bp->next = NULL;
481         }
482         return bp;
483 }
484
485 /*
486  *  copy 'count' bytes into a new block
487  */
488 struct block *copyblock(struct block *bp, int count)
489 {
490         int l;
491         struct block *nbp;
492
493         QDEBUG checkb(bp, "copyblock 0");
494         nbp = allocb(count);
495         if (bp->flag & BCKSUM_FLAGS) {
496                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
497                 nbp->checksum_start = bp->checksum_start;
498                 nbp->checksum_offset = bp->checksum_offset;
499                 nbp->mss = bp->mss;
500         }
501         PANIC_EXTRA(bp);
502         for (; count > 0 && bp != 0; bp = bp->next) {
503                 l = BLEN(bp);
504                 if (l > count)
505                         l = count;
506                 memmove(nbp->wp, bp->rp, l);
507                 nbp->wp += l;
508                 count -= l;
509         }
510         if (count > 0) {
511                 memset(nbp->wp, 0, count);
512                 nbp->wp += count;
513         }
514         copyblockcnt++;
515         QDEBUG checkb(nbp, "copyblock 1");
516
517         return nbp;
518 }
519
520 struct block *adjustblock(struct block *bp, int len)
521 {
522         int n;
523         struct block *nbp;
524
525         if (len < 0) {
526                 freeb(bp);
527                 return NULL;
528         }
529
530         PANIC_EXTRA(bp);
531         if (bp->rp + len > bp->lim) {
532                 nbp = copyblock(bp, len);
533                 freeblist(bp);
534                 QDEBUG checkb(nbp, "adjustblock 1");
535
536                 return nbp;
537         }
538
539         n = BLEN(bp);
540         if (len > n)
541                 memset(bp->wp, 0, len - n);
542         bp->wp = bp->rp + len;
543         QDEBUG checkb(bp, "adjustblock 2");
544
545         return bp;
546 }
547
548
549 /*
550  *  get next block from a queue, return null if nothing there
551  */
552 struct block *qget(struct queue *q)
553 {
554         int dowakeup;
555         struct block *b;
556
557         /* sync with qwrite */
558         spin_lock_irqsave(&q->lock);
559
560         b = q->bfirst;
561         if (b == NULL) {
562                 q->state |= Qstarve;
563                 spin_unlock_irqsave(&q->lock);
564                 return NULL;
565         }
566         q->bfirst = b->next;
567         b->next = 0;
568         q->len -= BALLOC(b);
569         q->dlen -= BLEN(b);
570         QDEBUG checkb(b, "qget");
571
572         /* if writer flow controlled, restart */
573         if ((q->state & Qflow) && q->len < q->limit / 2) {
574                 q->state &= ~Qflow;
575                 dowakeup = 1;
576         } else
577                 dowakeup = 0;
578
579         spin_unlock_irqsave(&q->lock);
580
581         if (dowakeup)
582                 rendez_wakeup(&q->wr);
583
584         return b;
585 }
586
587 /*
588  *  throw away the next 'len' bytes in the queue
589  * returning the number actually discarded
590  */
591 int qdiscard(struct queue *q, int len)
592 {
593         struct block *b;
594         int dowakeup, n, sofar, body_amt, extra_amt;
595         struct extra_bdata *ebd;
596
597         spin_lock_irqsave(&q->lock);
598         for (sofar = 0; sofar < len; sofar += n) {
599                 b = q->bfirst;
600                 if (b == NULL)
601                         break;
602                 QDEBUG checkb(b, "qdiscard");
603                 n = BLEN(b);
604                 if (n <= len - sofar) {
605                         q->bfirst = b->next;
606                         b->next = 0;
607                         q->len -= BALLOC(b);
608                         q->dlen -= BLEN(b);
609                         freeb(b);
610                 } else {
611                         n = len - sofar;
612                         q->dlen -= n;
613                         /* partial block removal */
614                         body_amt = MIN(BHLEN(b), n);
615                         b->rp += body_amt;
616                         extra_amt = n - body_amt;
617                         /* reduce q->len by the amount we remove from the extras.  The
618                          * header will always be accounted for above, during block removal.
619                          * */
620                         q->len -= extra_amt;
621                         for (int i = 0; (i < b->nr_extra_bufs) && extra_amt; i++) {
622                                 ebd = &b->extra_data[i];
623                                 if (!ebd->base || !ebd->len)
624                                         continue;
625                                 if (extra_amt >= ebd->len) {
626                                         /* remove the entire entry, note the kfree release */
627                                         b->extra_len -= ebd->len;
628                                         extra_amt -= ebd->len;
629                                         kfree((void*)ebd->base);
630                                         ebd->base = ebd->off = ebd->len = 0;
631                                         continue;
632                                 }
633                                 ebd->off += extra_amt;
634                                 ebd->len -= extra_amt;
635                                 b->extra_len -= extra_amt;
636                                 extra_amt = 0;
637                         }
638                 }
639         }
640
641         /*
642          *  if writer flow controlled, restart
643          *
644          *  This used to be
645          *  q->len < q->limit/2
646          *  but it slows down tcp too much for certain write sizes.
647          *  I really don't understand it completely.  It may be
648          *  due to the queue draining so fast that the transmission
649          *  stalls waiting for the app to produce more data.  - presotto
650          */
651         if ((q->state & Qflow) && q->len < q->limit) {
652                 q->state &= ~Qflow;
653                 dowakeup = 1;
654         } else
655                 dowakeup = 0;
656
657         spin_unlock_irqsave(&q->lock);
658
659         if (dowakeup)
660                 rendez_wakeup(&q->wr);
661
662         return sofar;
663 }
664
665 /*
666  *  Interrupt level copy out of a queue, return # bytes copied.
667  */
668 int qconsume(struct queue *q, void *vp, int len)
669 {
670         struct block *b;
671         int n, dowakeup;
672         uint8_t *p = vp;
673         struct block *tofree = NULL;
674
675         /* sync with qwrite */
676         spin_lock_irqsave(&q->lock);
677
678         for (;;) {
679                 b = q->bfirst;
680                 if (b == 0) {
681                         q->state |= Qstarve;
682                         spin_unlock_irqsave(&q->lock);
683                         return -1;
684                 }
685                 QDEBUG checkb(b, "qconsume 1");
686
687                 n = BLEN(b);
688                 if (n > 0)
689                         break;
690                 q->bfirst = b->next;
691                 q->len -= BALLOC(b);
692
693                 /* remember to free this */
694                 b->next = tofree;
695                 tofree = b;
696         };
697
698         PANIC_EXTRA(b);
699         if (n < len)
700                 len = n;
701         memmove(p, b->rp, len);
702         consumecnt += n;
703         b->rp += len;
704         q->dlen -= len;
705
706         /* discard the block if we're done with it */
707         if ((q->state & Qmsg) || len == n) {
708                 q->bfirst = b->next;
709                 b->next = 0;
710                 q->len -= BALLOC(b);
711                 q->dlen -= BLEN(b);
712
713                 /* remember to free this */
714                 b->next = tofree;
715                 tofree = b;
716         }
717
718         /* if writer flow controlled, restart */
719         if ((q->state & Qflow) && q->len < q->limit / 2) {
720                 q->state &= ~Qflow;
721                 dowakeup = 1;
722         } else
723                 dowakeup = 0;
724
725         spin_unlock_irqsave(&q->lock);
726
727         if (dowakeup)
728                 rendez_wakeup(&q->wr);
729
730         if (tofree != NULL)
731                 freeblist(tofree);
732
733         return len;
734 }
735
736 int qpass(struct queue *q, struct block *b)
737 {
738         int dlen, len, dowakeup;
739
740         /* sync with qread */
741         dowakeup = 0;
742         spin_lock_irqsave(&q->lock);
743         if (q->len >= q->limit) {
744                 freeblist(b);
745                 spin_unlock_irqsave(&q->lock);
746                 return -1;
747         }
748         if (q->state & Qclosed) {
749                 len = blocklen(b);
750                 freeblist(b);
751                 spin_unlock_irqsave(&q->lock);
752                 return len;
753         }
754
755         /* add buffer to queue */
756         if (q->bfirst)
757                 q->blast->next = b;
758         else
759                 q->bfirst = b;
760         len = BALLOC(b);
761         dlen = BLEN(b);
762         QDEBUG checkb(b, "qpass");
763         while (b->next) {
764                 b = b->next;
765                 QDEBUG checkb(b, "qpass");
766                 len += BALLOC(b);
767                 dlen += BLEN(b);
768         }
769         q->blast = b;
770         q->len += len;
771         q->dlen += dlen;
772
773         if (q->len >= q->limit / 2)
774                 q->state |= Qflow;
775
776         if (q->state & Qstarve) {
777                 q->state &= ~Qstarve;
778                 dowakeup = 1;
779         }
780         spin_unlock_irqsave(&q->lock);
781
782         if (dowakeup)
783                 rendez_wakeup(&q->rr);
784
785         return len;
786 }
787
788 int qpassnolim(struct queue *q, struct block *b)
789 {
790         int dlen, len, dowakeup;
791
792         /* sync with qread */
793         dowakeup = 0;
794         spin_lock_irqsave(&q->lock);
795
796         if (q->state & Qclosed) {
797                 freeblist(b);
798                 spin_unlock_irqsave(&q->lock);
799                 return BALLOC(b);
800         }
801
802         /* add buffer to queue */
803         if (q->bfirst)
804                 q->blast->next = b;
805         else
806                 q->bfirst = b;
807         len = BALLOC(b);
808         dlen = BLEN(b);
809         QDEBUG checkb(b, "qpass");
810         while (b->next) {
811                 b = b->next;
812                 QDEBUG checkb(b, "qpass");
813                 len += BALLOC(b);
814                 dlen += BLEN(b);
815         }
816         q->blast = b;
817         q->len += len;
818         q->dlen += dlen;
819
820         if (q->len >= q->limit / 2)
821                 q->state |= Qflow;
822
823         if (q->state & Qstarve) {
824                 q->state &= ~Qstarve;
825                 dowakeup = 1;
826         }
827         spin_unlock_irqsave(&q->lock);
828
829         if (dowakeup)
830                 rendez_wakeup(&q->rr);
831
832         return len;
833 }
834
835 /*
836  *  if the allocated space is way out of line with the used
837  *  space, reallocate to a smaller block
838  */
839 struct block *packblock(struct block *bp)
840 {
841         struct block **l, *nbp;
842         int n;
843
844         if (bp->extra_len)
845                 return bp;
846         for (l = &bp; *l; l = &(*l)->next) {
847                 nbp = *l;
848                 n = BLEN(nbp);
849                 if ((n << 2) < BALLOC(nbp)) {
850                         *l = allocb(n);
851                         memmove((*l)->wp, nbp->rp, n);
852                         (*l)->wp += n;
853                         (*l)->next = nbp->next;
854                         freeb(nbp);
855                 }
856         }
857
858         return bp;
859 }
860
861 int qproduce(struct queue *q, void *vp, int len)
862 {
863         struct block *b;
864         int dowakeup;
865         uint8_t *p = vp;
866
867         /* sync with qread */
868         dowakeup = 0;
869         spin_lock_irqsave(&q->lock);
870
871         /* no waiting receivers, room in buffer? */
872         if (q->len >= q->limit) {
873                 q->state |= Qflow;
874                 spin_unlock_irqsave(&q->lock);
875                 return -1;
876         }
877
878         /* save in buffer */
879         /* use Qcoalesce here to save storage */
880         // TODO: Consider removing the Qcoalesce flag and force a coalescing
881         // strategy by default.
882         b = q->blast;
883         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
884                 || b->lim - b->wp < len) {
885                 /* need a new block */
886                 b = iallocb(len);
887                 if (b == 0) {
888                         spin_unlock_irqsave(&q->lock);
889                         return 0;
890                 }
891                 if (q->bfirst)
892                         q->blast->next = b;
893                 else
894                         q->bfirst = b;
895                 q->blast = b;
896                 /* b->next = 0; done by iallocb() */
897                 q->len += BALLOC(b);
898         }
899         PANIC_EXTRA(b);
900         memmove(b->wp, p, len);
901         producecnt += len;
902         b->wp += len;
903         q->dlen += len;
904         QDEBUG checkb(b, "qproduce");
905
906         if (q->state & Qstarve) {
907                 q->state &= ~Qstarve;
908                 dowakeup = 1;
909         }
910
911         if (q->len >= q->limit)
912                 q->state |= Qflow;
913         spin_unlock_irqsave(&q->lock);
914
915         if (dowakeup)
916                 rendez_wakeup(&q->rr);
917
918         return len;
919 }
920
921 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
922  * body_rp, for up to len.  Returns the len consumed. 
923  *
924  * The base is 'b', so that we can kfree it later.  This currently ties us to
925  * using kfree for the release method for all extra_data.
926  *
927  * It is possible to have a body size that is 0, if there is no offset, and
928  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
929 static size_t point_to_body(struct block *b, uint8_t *body_rp,
930                             struct block *newb, unsigned int newb_idx,
931                             size_t len)
932 {
933         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
934
935         assert(newb_idx < newb->nr_extra_bufs);
936
937         kmalloc_incref(b);
938         ebd->base = (uintptr_t)b;
939         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
940         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
941         assert((int)ebd->len >= 0);
942         newb->extra_len += ebd->len;
943         return ebd->len;
944 }
945
946 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
947  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
948  * consumed.
949  *
950  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
951 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
952                            struct block *newb, unsigned int newb_idx,
953                            size_t len)
954 {
955         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
956         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
957
958         assert(b_idx < b->nr_extra_bufs);
959         assert(newb_idx < newb->nr_extra_bufs);
960
961         kmalloc_incref((void*)b_ebd->base);
962         n_ebd->base = b_ebd->base;
963         n_ebd->off = b_ebd->off + b_off;
964         n_ebd->len = MIN(b_ebd->len - b_off, len);
965         newb->extra_len += n_ebd->len;
966         return n_ebd->len;
967 }
968
969 /* given a string of blocks, fills the new block's extra_data  with the contents
970  * of the blist [offset, len + offset)
971  *
972  * returns 0 on success.  the only failure is if the extra_data array was too
973  * small, so this returns a positive integer saying how big the extra_data needs
974  * to be.
975  *
976  * callers are responsible for protecting the list structure. */
977 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
978                             uint32_t offset)
979 {
980         struct block *b, *first;
981         unsigned int nr_bufs = 0;
982         unsigned int b_idx, newb_idx = 0;
983         uint8_t *first_main_body = 0;
984
985         /* find the first block; keep offset relative to the latest b in the list */
986         for (b = blist; b; b = b->next) {
987                 if (BLEN(b) > offset)
988                         break;
989                 offset -= BLEN(b);
990         }
991         /* qcopy semantics: if you asked for an offset outside the block list, you
992          * get an empty block back */
993         if (!b)
994                 return 0;
995         first = b;
996         /* upper bound for how many buffers we'll need in newb */
997         for (/* b is set*/; b; b = b->next) {
998                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
999         }
1000         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1001         block_add_extd(newb, nr_bufs, 0);
1002         if (newb->nr_extra_bufs < nr_bufs) {
1003                 /* caller will need to alloc these, then re-call us */
1004                 return nr_bufs;
1005         }
1006         for (b = first; b && len; b = b->next) {
1007                 b_idx = 0;
1008                 if (offset) {
1009                         if (offset < BHLEN(b)) {
1010                                 /* off is in the main body */
1011                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1012                                 newb_idx++;
1013                         } else {
1014                                 /* off is in one of the buffers (or just past the last one).
1015                                  * we're not going to point to b's main body at all. */
1016                                 offset -= BHLEN(b);
1017                                 assert(b->extra_data);
1018                                 /* assuming these extrabufs are packed, or at least that len
1019                                  * isn't gibberish */
1020                                 while (b->extra_data[b_idx].len <= offset) {
1021                                         offset -= b->extra_data[b_idx].len;
1022                                         b_idx++;
1023                                 }
1024                                 /* now offset is set to our offset in the b_idx'th buf */
1025                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1026                                 newb_idx++;
1027                                 b_idx++;
1028                         }
1029                         offset = 0;
1030                 } else {
1031                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1032                         newb_idx++;
1033                 }
1034                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1035                  * and any point_to_ could be our last if it consumed all of len. */
1036                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1037                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1038                         newb_idx++;
1039                 }
1040         }
1041         return 0;
1042 }
1043
1044 struct block *blist_clone(struct block *blist, int header_len, int len,
1045                           uint32_t offset)
1046 {
1047         int ret;
1048         struct block *newb = allocb(header_len);
1049         do {
1050                 ret = __blist_clone_to(blist, newb, len, offset);
1051                 if (ret)
1052                         block_add_extd(newb, ret, KMALLOC_WAIT);
1053         } while (ret);
1054         return newb;
1055 }
1056
1057 /* given a queue, makes a single block with header_len reserved space in the
1058  * block main body, and the contents of [offset, len + offset) pointed to in the
1059  * new blocks ext_data. */
1060 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1061 {
1062         int ret;
1063         struct block *newb = allocb(header_len);
1064         /* the while loop should rarely be used: it would require someone
1065          * concurrently adding to the queue. */
1066         do {
1067                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1068                 spin_lock_irqsave(&q->lock);
1069                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1070                 spin_unlock_irqsave(&q->lock);
1071                 if (ret)
1072                         block_add_extd(newb, ret, KMALLOC_WAIT);
1073         } while (ret);
1074         return newb;
1075 }
1076
1077 /*
1078  *  copy from offset in the queue
1079  */
1080 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1081 {
1082         int sofar;
1083         int n;
1084         struct block *b, *nb;
1085         uint8_t *p;
1086
1087         nb = allocb(len);
1088
1089         spin_lock_irqsave(&q->lock);
1090
1091         /* go to offset */
1092         b = q->bfirst;
1093         for (sofar = 0;; sofar += n) {
1094                 if (b == NULL) {
1095                         spin_unlock_irqsave(&q->lock);
1096                         return nb;
1097                 }
1098                 n = BLEN(b);
1099                 if (sofar + n > offset) {
1100                         p = b->rp + offset - sofar;
1101                         n -= offset - sofar;
1102                         break;
1103                 }
1104                 QDEBUG checkb(b, "qcopy");
1105                 b = b->next;
1106         }
1107
1108         /* copy bytes from there */
1109         for (sofar = 0; sofar < len;) {
1110                 if (n > len - sofar)
1111                         n = len - sofar;
1112                 PANIC_EXTRA(b);
1113                 memmove(nb->wp, p, n);
1114                 qcopycnt += n;
1115                 sofar += n;
1116                 nb->wp += n;
1117                 b = b->next;
1118                 if (b == NULL)
1119                         break;
1120                 n = BLEN(b);
1121                 p = b->rp;
1122         }
1123         spin_unlock_irqsave(&q->lock);
1124
1125         return nb;
1126 }
1127
1128 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1129 {
1130 #ifdef CONFIG_BLOCK_EXTRAS
1131         return qclone(q, 0, len, offset);
1132 #else
1133         return qcopy_old(q, len, offset);
1134 #endif
1135 }
1136
1137 static void qinit_common(struct queue *q)
1138 {
1139         spinlock_init_irqsave(&q->lock);
1140         qlock_init(&q->rlock);
1141         qlock_init(&q->wlock);
1142         rendez_init(&q->rr);
1143         rendez_init(&q->wr);
1144 }
1145
1146 /*
1147  *  called by non-interrupt code
1148  */
1149 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1150 {
1151         struct queue *q;
1152
1153         q = kzmalloc(sizeof(struct queue), 0);
1154         if (q == 0)
1155                 return 0;
1156         qinit_common(q);
1157
1158         q->limit = q->inilim = limit;
1159         q->kick = kick;
1160         q->arg = arg;
1161         q->state = msg;
1162         q->state |= Qstarve;
1163         q->eof = 0;
1164         q->noblock = 0;
1165
1166         return q;
1167 }
1168
1169 /* open a queue to be bypassed */
1170 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1171 {
1172         struct queue *q;
1173
1174         q = kzmalloc(sizeof(struct queue), 0);
1175         if (q == 0)
1176                 return 0;
1177         qinit_common(q);
1178
1179         q->limit = 0;
1180         q->arg = arg;
1181         q->bypass = bypass;
1182         q->state = 0;
1183
1184         return q;
1185 }
1186
1187 static int notempty(void *a)
1188 {
1189         struct queue *q = a;
1190
1191         return (q->state & Qclosed) || q->bfirst != 0;
1192 }
1193
1194 /* wait for the queue to be non-empty or closed.
1195  *
1196  * called with q ilocked.  rendez may error out, back through the caller, with
1197  * the irqsave lock unlocked.  */
1198 static int qwait(struct queue *q)
1199 {
1200         /* wait for data */
1201         for (;;) {
1202                 if (q->bfirst != NULL)
1203                         break;
1204
1205                 if (q->state & Qclosed) {
1206                         if (++q->eof > 3)
1207                                 return -1;
1208                         if (*q->err && strcmp(q->err, Ehungup) != 0)
1209                                 return -1;
1210                         return 0;
1211                 }
1212
1213                 q->state |= Qstarve;    /* flag requesting producer to wake me */
1214                 spin_unlock_irqsave(&q->lock);
1215                 /* may throw an error() */
1216                 rendez_sleep(&q->rr, notempty, q);
1217                 spin_lock_irqsave(&q->lock);
1218         }
1219         return 1;
1220 }
1221
1222 /*
1223  * add a block list to a queue
1224  */
1225 void qaddlist(struct queue *q, struct block *b)
1226 {
1227         /* TODO: q lock? */
1228         /* queue the block */
1229         if (q->bfirst)
1230                 q->blast->next = b;
1231         else
1232                 q->bfirst = b;
1233         q->len += blockalloclen(b);
1234         q->dlen += blocklen(b);
1235         while (b->next)
1236                 b = b->next;
1237         q->blast = b;
1238 }
1239
1240 /*
1241  *  called with q ilocked
1242  */
1243 struct block *qremove(struct queue *q)
1244 {
1245         struct block *b;
1246
1247         b = q->bfirst;
1248         if (b == NULL)
1249                 return NULL;
1250         q->bfirst = b->next;
1251         b->next = NULL;
1252         q->dlen -= BLEN(b);
1253         q->len -= BALLOC(b);
1254         QDEBUG checkb(b, "qremove");
1255         return b;
1256 }
1257
1258 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1259 {
1260         size_t copy_amt, retval = 0;
1261         struct extra_bdata *ebd;
1262         
1263         copy_amt = MIN(BHLEN(b), amt);
1264         memcpy(to, b->rp, copy_amt);
1265         /* advance the rp, since this block not be completely consumed and future
1266          * reads need to know where to pick up from */
1267         b->rp += copy_amt;
1268         to += copy_amt;
1269         amt -= copy_amt;
1270         retval += copy_amt;
1271         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1272                 ebd = &b->extra_data[i];
1273                 /* skip empty entires.  if we track this in the struct block, we can
1274                  * just start the for loop early */
1275                 if (!ebd->base || !ebd->len)
1276                         continue;
1277                 copy_amt = MIN(ebd->len, amt);
1278                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1279                 /* we're actually consuming the entries, just like how we advance rp up
1280                  * above, and might only consume part of one. */
1281                 ebd->len -= copy_amt;
1282                 ebd->off += copy_amt;
1283                 b->extra_len -= copy_amt;
1284                 if (!ebd->len) {
1285                         /* we don't actually have to decref here.  it's also done in
1286                          * freeb().  this is the earliest we can free. */
1287                         kfree((void*)ebd->base);
1288                         ebd->base = ebd->off = 0;
1289                 }
1290                 to += copy_amt;
1291                 amt -= copy_amt;
1292                 retval += copy_amt;
1293         }
1294         return retval;
1295 }
1296
1297 /*
1298  *  copy the contents of a string of blocks into
1299  *  memory.  emptied blocks are freed.  return
1300  *  pointer to first unconsumed block.
1301  */
1302 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1303 {
1304         int i;
1305         struct block *next;
1306
1307         /* could be slicker here, since read_from_block is smart */
1308         for (; b != NULL; b = next) {
1309                 i = BLEN(b);
1310                 if (i > n) {
1311                         /* partial block, consume some */
1312                         read_from_block(b, p, n);
1313                         return b;
1314                 }
1315                 /* full block, consume all and move on */
1316                 i = read_from_block(b, p, i);
1317                 n -= i;
1318                 p += i;
1319                 next = b->next;
1320                 freeb(b);
1321         }
1322         return NULL;
1323 }
1324
1325 /*
1326  *  copy the contents of memory into a string of blocks.
1327  *  return NULL on error.
1328  */
1329 struct block *mem2bl(uint8_t * p, int len)
1330 {
1331         ERRSTACK(1);
1332         int n;
1333         struct block *b, *first, **l;
1334
1335         first = NULL;
1336         l = &first;
1337         if (waserror()) {
1338                 freeblist(first);
1339                 nexterror();
1340         }
1341         do {
1342                 n = len;
1343                 if (n > Maxatomic)
1344                         n = Maxatomic;
1345
1346                 *l = b = allocb(n);
1347                 /* TODO consider extra_data */
1348                 memmove(b->wp, p, n);
1349                 b->wp += n;
1350                 p += n;
1351                 len -= n;
1352                 l = &b->next;
1353         } while (len > 0);
1354         poperror();
1355
1356         return first;
1357 }
1358
1359 /*
1360  *  put a block back to the front of the queue
1361  *  called with q ilocked
1362  */
1363 void qputback(struct queue *q, struct block *b)
1364 {
1365         b->next = q->bfirst;
1366         if (q->bfirst == NULL)
1367                 q->blast = b;
1368         q->bfirst = b;
1369         q->len += BALLOC(b);
1370         q->dlen += BLEN(b);
1371 }
1372
1373 /*
1374  *  flow control, get producer going again
1375  *  called with q ilocked
1376  */
1377 static void qwakeup_iunlock(struct queue *q)
1378 {
1379         int dowakeup = 0;
1380
1381         /* if writer flow controlled, restart */
1382         if ((q->state & Qflow) && q->len < q->limit / 2) {
1383                 q->state &= ~Qflow;
1384                 dowakeup = 1;
1385         }
1386
1387         spin_unlock_irqsave(&q->lock);
1388
1389         /* wakeup flow controlled writers */
1390         if (dowakeup) {
1391                 if (q->kick)
1392                         q->kick(q->arg);
1393                 rendez_wakeup(&q->wr);
1394         }
1395 }
1396
1397 /*
1398  *  get next block from a queue (up to a limit)
1399  */
1400 struct block *qbread(struct queue *q, int len)
1401 {
1402         ERRSTACK(1);
1403         struct block *b, *nb;
1404         int n;
1405
1406         qlock(&q->rlock);
1407         if (waserror()) {
1408                 qunlock(&q->rlock);
1409                 nexterror();
1410         }
1411
1412         spin_lock_irqsave(&q->lock);
1413         switch (qwait(q)) {
1414                 case 0:
1415                         /* queue closed */
1416                         spin_unlock_irqsave(&q->lock);
1417                         qunlock(&q->rlock);
1418                         poperror();
1419                         return NULL;
1420                 case -1:
1421                         /* multiple reads on a closed queue */
1422                         spin_unlock_irqsave(&q->lock);
1423                         error(q->err);
1424         }
1425
1426         /* if we get here, there's at least one block in the queue */
1427         b = qremove(q);
1428         n = BLEN(b);
1429
1430         /* split block if it's too big and this is not a message queue */
1431         nb = b;
1432         if (n > len) {
1433                 PANIC_EXTRA(b);
1434                 if ((q->state & Qmsg) == 0) {
1435                         n -= len;
1436                         b = allocb(n);
1437                         memmove(b->wp, nb->rp + len, n);
1438                         b->wp += n;
1439                         qputback(q, b);
1440                 }
1441                 nb->wp = nb->rp + len;
1442         }
1443
1444         /* restart producer */
1445         qwakeup_iunlock(q);
1446
1447         poperror();
1448         qunlock(&q->rlock);
1449         return nb;
1450 }
1451
1452 /*
1453  *  read a queue.  if no data is queued, post a struct block
1454  *  and wait on its Rendez.
1455  */
1456 long qread(struct queue *q, void *vp, int len)
1457 {
1458         ERRSTACK(1);
1459         struct block *b, *first, **l;
1460         int m, n;
1461
1462         qlock(&q->rlock);
1463         if (waserror()) {
1464                 qunlock(&q->rlock);
1465                 nexterror();
1466         }
1467
1468         spin_lock_irqsave(&q->lock);
1469 again:
1470         switch (qwait(q)) {
1471                 case 0:
1472                         /* queue closed */
1473                         spin_unlock_irqsave(&q->lock);
1474                         qunlock(&q->rlock);
1475                         poperror();
1476                         return 0;
1477                 case -1:
1478                         /* multiple reads on a closed queue */
1479                         spin_unlock_irqsave(&q->lock);
1480                         error(q->err);
1481         }
1482
1483         /* if we get here, there's at least one block in the queue */
1484         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1485         // strategy by default.
1486         if (q->state & Qcoalesce) {
1487                 /* when coalescing, 0 length blocks just go away */
1488                 b = q->bfirst;
1489                 if (BLEN(b) <= 0) {
1490                         freeb(qremove(q));
1491                         goto again;
1492                 }
1493
1494                 /*  grab the first block plus as many
1495                  *  following blocks as will completely
1496                  *  fit in the read.
1497                  */
1498                 n = 0;
1499                 l = &first;
1500                 m = BLEN(b);
1501                 for (;;) {
1502                         *l = qremove(q);
1503                         l = &b->next;
1504                         n += m;
1505
1506                         b = q->bfirst;
1507                         if (b == NULL)
1508                                 break;
1509                         m = BLEN(b);
1510                         if (n + m > len)
1511                                 break;
1512                 }
1513         } else {
1514                 first = qremove(q);
1515                 n = BLEN(first);
1516         }
1517
1518         /* copy to user space outside of the ilock */
1519         spin_unlock_irqsave(&q->lock);
1520         b = bl2mem(vp, first, len);
1521         spin_lock_irqsave(&q->lock);
1522
1523         /* take care of any left over partial block */
1524         if (b != NULL) {
1525                 n -= BLEN(b);
1526                 if (q->state & Qmsg)
1527                         freeb(b);
1528                 else
1529                         qputback(q, b);
1530         }
1531
1532         /* restart producer */
1533         qwakeup_iunlock(q);
1534
1535         poperror();
1536         qunlock(&q->rlock);
1537         return n;
1538 }
1539
1540 static int qnotfull(void *a)
1541 {
1542         struct queue *q = a;
1543
1544         return q->len < q->limit || (q->state & Qclosed);
1545 }
1546
1547 uint32_t noblockcnt;
1548
1549 /*
1550  *  add a block to a queue obeying flow control
1551  */
1552 long qbwrite(struct queue *q, struct block *b)
1553 {
1554         ERRSTACK(1);
1555         int n, dowakeup;
1556         volatile bool should_free_b = TRUE;
1557
1558         n = BLEN(b);
1559
1560         if (q->bypass) {
1561                 (*q->bypass) (q->arg, b);
1562                 return n;
1563         }
1564
1565         dowakeup = 0;
1566         qlock(&q->wlock);
1567         if (waserror()) {
1568                 if (b != NULL && should_free_b)
1569                         freeb(b);
1570                 qunlock(&q->wlock);
1571                 nexterror();
1572         }
1573
1574         spin_lock_irqsave(&q->lock);
1575
1576         /* give up if the queue is closed */
1577         if (q->state & Qclosed) {
1578                 spin_unlock_irqsave(&q->lock);
1579                 error(q->err);
1580         }
1581
1582         /* if nonblocking, don't queue over the limit */
1583         if (q->len >= q->limit) {
1584                 if (q->noblock) {
1585                         spin_unlock_irqsave(&q->lock);
1586                         freeb(b);
1587                         noblockcnt += n;
1588                         qunlock(&q->wlock);
1589                         poperror();
1590                         return n;
1591                 }
1592         }
1593
1594         /* queue the block */
1595         should_free_b = FALSE;
1596         if (q->bfirst)
1597                 q->blast->next = b;
1598         else
1599                 q->bfirst = b;
1600         q->blast = b;
1601         b->next = 0;
1602         q->len += BALLOC(b);
1603         q->dlen += n;
1604         QDEBUG checkb(b, "qbwrite");
1605         b = NULL;
1606
1607         /* make sure other end gets awakened */
1608         if (q->state & Qstarve) {
1609                 q->state &= ~Qstarve;
1610                 dowakeup = 1;
1611         }
1612         spin_unlock_irqsave(&q->lock);
1613
1614         /*  get output going again */
1615         if (q->kick && (dowakeup || (q->state & Qkick)))
1616                 q->kick(q->arg);
1617
1618         /* wakeup anyone consuming at the other end */
1619         if (dowakeup)
1620                 rendez_wakeup(&q->rr);
1621
1622         /*
1623          *  flow control, wait for queue to get below the limit
1624          *  before allowing the process to continue and queue
1625          *  more.  We do this here so that postnote can only
1626          *  interrupt us after the data has been queued.  This
1627          *  means that things like 9p flushes and ssl messages
1628          *  will not be disrupted by software interrupts.
1629          *
1630          *  Note - this is moderately dangerous since a process
1631          *  that keeps getting interrupted and rewriting will
1632          *  queue infinite crud.
1633          */
1634         for (;;) {
1635                 if (q->noblock || qnotfull(q))
1636                         break;
1637
1638                 spin_lock_irqsave(&q->lock);
1639                 q->state |= Qflow;
1640                 spin_unlock_irqsave(&q->lock);
1641                 rendez_sleep(&q->wr, qnotfull, q);
1642         }
1643
1644         qunlock(&q->wlock);
1645         poperror();
1646         return n;
1647 }
1648
1649 long qibwrite(struct queue *q, struct block *b)
1650 {
1651         int n, dowakeup;
1652
1653         dowakeup = 0;
1654
1655         n = BLEN(b);
1656
1657         spin_lock_irqsave(&q->lock);
1658
1659         QDEBUG checkb(b, "qibwrite");
1660         if (q->bfirst)
1661                 q->blast->next = b;
1662         else
1663                 q->bfirst = b;
1664         q->blast = b;
1665         q->len += BALLOC(b);
1666         q->dlen += n;
1667
1668         if (q->state & Qstarve) {
1669                 q->state &= ~Qstarve;
1670                 dowakeup = 1;
1671         }
1672
1673         spin_unlock_irqsave(&q->lock);
1674
1675         if (dowakeup) {
1676                 if (q->kick)
1677                         q->kick(q->arg);
1678                 rendez_wakeup(&q->rr);
1679         }
1680
1681         return n;
1682 }
1683
1684 /*
1685  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1686  */
1687 int qwrite(struct queue *q, void *vp, int len)
1688 {
1689         int n, sofar;
1690         struct block *b;
1691         uint8_t *p = vp;
1692         void *ext_buf;
1693
1694         QDEBUG if (!islo())
1695                  printd("qwrite hi %p\n", getcallerpc(&q));
1696
1697         sofar = 0;
1698         do {
1699                 n = len - sofar;
1700                 /* This is 64K, the max amount per single block.  Still a good value? */
1701                 if (n > Maxatomic)
1702                         n = Maxatomic;
1703
1704                 /* If n is small, we don't need to bother with the extra_data.  But
1705                  * until the whole stack can handle extd blocks, we'll use them
1706                  * unconditionally. */
1707 #ifdef CONFIG_BLOCK_EXTRAS
1708                 /* allocb builds in 128 bytes of header space to all blocks, but this is
1709                  * only available via padblock (to the left).  we also need some space
1710                  * for pullupblock for some basic headers (like icmp) that get written
1711                  * in directly */
1712                 b = allocb(64);
1713                 ext_buf = kmalloc(n, 0);
1714                 memcpy(ext_buf, p + sofar, n);
1715                 block_add_extd(b, 1, KMALLOC_WAIT); /* returns 0 on success */
1716                 b->extra_data[0].base = (uintptr_t)ext_buf;
1717                 b->extra_data[0].off = 0;
1718                 b->extra_data[0].len = n;
1719                 b->extra_len += n;
1720 #else
1721                 b = allocb(n);
1722                 memmove(b->wp, p + sofar, n);
1723                 b->wp += n;
1724 #endif
1725                         
1726                 qbwrite(q, b);
1727
1728                 sofar += n;
1729         } while (sofar < len && (q->state & Qmsg) == 0);
1730
1731         return len;
1732 }
1733
1734 /*
1735  *  used by print() to write to a queue.  Since we may be splhi or not in
1736  *  a process, don't qlock.
1737  */
1738 int qiwrite(struct queue *q, void *vp, int len)
1739 {
1740         int n, sofar, dowakeup;
1741         struct block *b;
1742         uint8_t *p = vp;
1743
1744         dowakeup = 0;
1745
1746         sofar = 0;
1747         do {
1748                 n = len - sofar;
1749                 if (n > Maxatomic)
1750                         n = Maxatomic;
1751
1752                 b = iallocb(n);
1753                 if (b == NULL)
1754                         break;
1755                 /* TODO consider extra_data */
1756                 memmove(b->wp, p + sofar, n);
1757                 /* this adjusts BLEN to be n, or at least it should */
1758                 b->wp += n;
1759                 assert(n == BLEN(b));
1760                 qibwrite(q, b);
1761
1762                 sofar += n;
1763         } while (sofar < len && (q->state & Qmsg) == 0);
1764
1765         return sofar;
1766 }
1767
1768 /*
1769  *  be extremely careful when calling this,
1770  *  as there is no reference accounting
1771  */
1772 void qfree(struct queue *q)
1773 {
1774         qclose(q);
1775         kfree(q);
1776 }
1777
1778 /*
1779  *  Mark a queue as closed.  No further IO is permitted.
1780  *  All blocks are released.
1781  */
1782 void qclose(struct queue *q)
1783 {
1784         struct block *bfirst;
1785
1786         if (q == NULL)
1787                 return;
1788
1789         /* mark it */
1790         spin_lock_irqsave(&q->lock);
1791         q->state |= Qclosed;
1792         q->state &= ~(Qflow | Qstarve);
1793         strncpy(q->err, Ehungup, sizeof(q->err));
1794         bfirst = q->bfirst;
1795         q->bfirst = 0;
1796         q->len = 0;
1797         q->dlen = 0;
1798         q->noblock = 0;
1799         spin_unlock_irqsave(&q->lock);
1800
1801         /* free queued blocks */
1802         freeblist(bfirst);
1803
1804         /* wake up readers/writers */
1805         rendez_wakeup(&q->rr);
1806         rendez_wakeup(&q->wr);
1807 }
1808
1809 /*
1810  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1811  *  blocks.
1812  */
1813 void qhangup(struct queue *q, char *msg)
1814 {
1815         /* mark it */
1816         spin_lock_irqsave(&q->lock);
1817         q->state |= Qclosed;
1818         if (msg == 0 || *msg == 0)
1819                 strncpy(q->err, Ehungup, sizeof(q->err));
1820         else
1821                 strncpy(q->err, msg, ERRMAX - 1);
1822         spin_unlock_irqsave(&q->lock);
1823
1824         /* wake up readers/writers */
1825         rendez_wakeup(&q->rr);
1826         rendez_wakeup(&q->wr);
1827 }
1828
1829 /*
1830  *  return non-zero if the q is hungup
1831  */
1832 int qisclosed(struct queue *q)
1833 {
1834         return q->state & Qclosed;
1835 }
1836
1837 /*
1838  *  mark a queue as no longer hung up
1839  */
1840 void qreopen(struct queue *q)
1841 {
1842         spin_lock_irqsave(&q->lock);
1843         q->state &= ~Qclosed;
1844         q->state |= Qstarve;
1845         q->eof = 0;
1846         q->limit = q->inilim;
1847         spin_unlock_irqsave(&q->lock);
1848 }
1849
1850 /*
1851  *  return bytes queued
1852  */
1853 int qlen(struct queue *q)
1854 {
1855         return q->dlen;
1856 }
1857
1858 /*
1859  * return space remaining before flow control
1860  */
1861 int qwindow(struct queue *q)
1862 {
1863         int l;
1864
1865         l = q->limit - q->len;
1866         if (l < 0)
1867                 l = 0;
1868         return l;
1869 }
1870
1871 /*
1872  *  return true if we can read without blocking
1873  */
1874 int qcanread(struct queue *q)
1875 {
1876         return q->bfirst != 0;
1877 }
1878
1879 /*
1880  *  change queue limit
1881  */
1882 void qsetlimit(struct queue *q, int limit)
1883 {
1884         q->limit = limit;
1885 }
1886
1887 /*
1888  *  set blocking/nonblocking
1889  */
1890 void qnoblock(struct queue *q, int onoff)
1891 {
1892         q->noblock = onoff;
1893 }
1894
1895 /*
1896  *  flush the output queue
1897  */
1898 void qflush(struct queue *q)
1899 {
1900         struct block *bfirst;
1901
1902         /* mark it */
1903         spin_lock_irqsave(&q->lock);
1904         bfirst = q->bfirst;
1905         q->bfirst = 0;
1906         q->len = 0;
1907         q->dlen = 0;
1908         spin_unlock_irqsave(&q->lock);
1909
1910         /* free queued blocks */
1911         freeblist(bfirst);
1912
1913         /* wake up readers/writers */
1914         rendez_wakeup(&q->wr);
1915 }
1916
1917 int qfull(struct queue *q)
1918 {
1919         return q->state & Qflow;
1920 }
1921
1922 int qstate(struct queue *q)
1923 {
1924         return q->state;
1925 }
1926
1927 void qdump(struct queue *q)
1928 {
1929         if (q)
1930                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1931                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1932 }