Implement transmit checksum offload.
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 static uint32_t padblockcnt;
17 static uint32_t concatblockcnt;
18 static uint32_t pullupblockcnt;
19 static uint32_t copyblockcnt;
20 static uint32_t consumecnt;
21 static uint32_t producecnt;
22 static uint32_t qcopycnt;
23
24 static int debugging;
25
26 #define QDEBUG  if(0)
27
28 /*
29  *  IO queues
30  */
31
32 struct queue {
33         spinlock_t lock;;
34
35         struct block *bfirst;           /* buffer */
36         struct block *blast;
37
38         int len;                                        /* bytes allocated to queue */
39         int dlen;                                       /* data bytes in queue */
40         int limit;                                      /* max bytes in queue */
41         int inilim;                             /* initial limit */
42         int state;
43         int noblock;                            /* true if writes return immediately when q full */
44         int eof;                                        /* number of eofs read by user */
45
46         void (*kick) (void *);          /* restart output */
47         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
48         void *arg;                                      /* argument to kick */
49
50         qlock_t rlock;                          /* mutex for reading processes */
51         struct rendez rr;                       /* process waiting to read */
52         qlock_t wlock;                          /* mutex for writing processes */
53         struct rendez wr;                       /* process waiting to write */
54
55         char err[ERRMAX];
56 };
57
58 enum {
59         Maxatomic = 64 * 1024,
60 };
61
62 unsigned int qiomaxatomic = Maxatomic;
63
64 void ixsummary(void)
65 {
66         debugging ^= 1;
67         iallocsummary();
68         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
69                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
70         printd("consume %lu, produce %lu, qcopy %lu\n",
71                    consumecnt, producecnt, qcopycnt);
72 }
73
74 /*
75  *  free a list of blocks
76  */
77 void freeblist(struct block *b)
78 {
79         struct block *next;
80
81         for (; b != 0; b = next) {
82                 next = b->next;
83                 b->next = 0;
84                 freeb(b);
85         }
86 }
87
88 /*
89  *  pad a block to the front (or the back if size is negative)
90  */
91 struct block *padblock(struct block *bp, int size)
92 {
93         int n;
94         struct block *nbp;
95         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
96         uint16_t checksum_start = bp->checksum_start;
97         uint16_t checksum_offset = bp->checksum_offset;
98
99         QDEBUG checkb(bp, "padblock 1");
100         if (size >= 0) {
101                 if (bp->rp - bp->base >= size) {
102                         bp->checksum_start += size;
103                         bp->rp -= size;
104                         return bp;
105                 }
106
107                 if (bp->next)
108                         panic("padblock %p", getcallerpc(&bp));
109                 n = BLEN(bp);
110                 padblockcnt++;
111                 nbp = allocb(size + n);
112                 nbp->rp += size;
113                 nbp->wp = nbp->rp;
114                 memmove(nbp->wp, bp->rp, n);
115                 nbp->wp += n;
116                 freeb(bp);
117                 nbp->rp -= size;
118         } else {
119                 size = -size;
120
121                 if (bp->next)
122                         panic("padblock %p", getcallerpc(&bp));
123
124                 if (bp->lim - bp->wp >= size)
125                         return bp;
126
127                 n = BLEN(bp);
128                 padblockcnt++;
129                 nbp = allocb(size + n);
130                 memmove(nbp->wp, bp->rp, n);
131                 nbp->wp += n;
132                 freeb(bp);
133         }
134         if (bcksum) {
135                 nbp->flag |= bcksum;
136                 nbp->checksum_start = checksum_start;
137                 nbp->checksum_offset = checksum_offset;
138         }
139         QDEBUG checkb(nbp, "padblock 1");
140         return nbp;
141 }
142
143 /*
144  *  return count of bytes in a string of blocks
145  */
146 int blocklen(struct block *bp)
147 {
148         int len;
149
150         len = 0;
151         while (bp) {
152                 len += BLEN(bp);
153                 bp = bp->next;
154         }
155         return len;
156 }
157
158 /*
159  * return count of space in blocks
160  */
161 int blockalloclen(struct block *bp)
162 {
163         int len;
164
165         len = 0;
166         while (bp) {
167                 len += BALLOC(bp);
168                 bp = bp->next;
169         }
170         return len;
171 }
172
173 /*
174  *  copy the  string of blocks into
175  *  a single block and free the string
176  */
177 struct block *concatblock(struct block *bp)
178 {
179         int len;
180         struct block *nb, *f;
181
182         if (bp->next == 0)
183                 return bp;
184
185         nb = allocb(blocklen(bp));
186         for (f = bp; f; f = f->next) {
187                 len = BLEN(f);
188                 memmove(nb->wp, f->rp, len);
189                 nb->wp += len;
190         }
191         concatblockcnt += BLEN(nb);
192         freeblist(bp);
193         QDEBUG checkb(nb, "concatblock 1");
194         return nb;
195 }
196
197 /*
198  *  make sure the first block has at least n bytes
199  */
200 struct block *pullupblock(struct block *bp, int n)
201 {
202         int i;
203         struct block *nbp;
204
205         /*
206          *  this should almost always be true, it's
207          *  just to avoid every caller checking.
208          */
209         if (BLEN(bp) >= n)
210                 return bp;
211
212         /*
213          *  if not enough room in the first block,
214          *  add another to the front of the list.
215          */
216         if (bp->lim - bp->rp < n) {
217                 nbp = allocb(n);
218                 nbp->next = bp;
219                 bp = nbp;
220         }
221
222         /*
223          *  copy bytes from the trailing blocks into the first
224          */
225         n -= BLEN(bp);
226         while ((nbp = bp->next)) {
227                 i = BLEN(nbp);
228                 if (i > n) {
229                         memmove(bp->wp, nbp->rp, n);
230                         pullupblockcnt++;
231                         bp->wp += n;
232                         nbp->rp += n;
233                         QDEBUG checkb(bp, "pullupblock 1");
234                         return bp;
235                 } else {
236                         memmove(bp->wp, nbp->rp, i);
237                         pullupblockcnt++;
238                         bp->wp += i;
239                         bp->next = nbp->next;
240                         nbp->next = 0;
241                         freeb(nbp);
242                         n -= i;
243                         if (n == 0) {
244                                 QDEBUG checkb(bp, "pullupblock 2");
245                                 return bp;
246                         }
247                 }
248         }
249         freeb(bp);
250         return 0;
251 }
252
253 /*
254  *  make sure the first block has at least n bytes
255  */
256 struct block *pullupqueue(struct queue *q, int n)
257 {
258         struct block *b;
259
260         if ((BLEN(q->bfirst) >= n))
261                 return q->bfirst;
262         q->bfirst = pullupblock(q->bfirst, n);
263         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
264         q->blast = b;
265         return q->bfirst;
266 }
267
268 /*
269  *  trim to len bytes starting at offset
270  */
271 struct block *trimblock(struct block *bp, int offset, int len)
272 {
273         uint32_t l;
274         struct block *nb, *startb;
275
276         QDEBUG checkb(bp, "trimblock 1");
277         if (blocklen(bp) < offset + len) {
278                 freeblist(bp);
279                 return NULL;
280         }
281
282         while ((l = BLEN(bp)) < offset) {
283                 offset -= l;
284                 nb = bp->next;
285                 bp->next = NULL;
286                 freeb(bp);
287                 bp = nb;
288         }
289
290         startb = bp;
291         bp->rp += offset;
292
293         while ((l = BLEN(bp)) < len) {
294                 len -= l;
295                 bp = bp->next;
296         }
297
298         bp->wp -= (BLEN(bp) - len);
299
300         if (bp->next) {
301                 freeblist(bp->next);
302                 bp->next = NULL;
303         }
304
305         return startb;
306 }
307
308 /*
309  *  copy 'count' bytes into a new block
310  */
311 struct block *copyblock(struct block *bp, int count)
312 {
313         int l;
314         struct block *nbp;
315
316         QDEBUG checkb(bp, "copyblock 0");
317         nbp = allocb(count);
318         if (bp->flag & BCKSUM_FLAGS) {
319                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
320                 nbp->checksum_start = bp->checksum_start;
321                 nbp->checksum_offset = bp->checksum_offset;
322         }
323         for (; count > 0 && bp != 0; bp = bp->next) {
324                 l = BLEN(bp);
325                 if (l > count)
326                         l = count;
327                 memmove(nbp->wp, bp->rp, l);
328                 nbp->wp += l;
329                 count -= l;
330         }
331         if (count > 0) {
332                 memset(nbp->wp, 0, count);
333                 nbp->wp += count;
334         }
335         copyblockcnt++;
336         QDEBUG checkb(nbp, "copyblock 1");
337
338         return nbp;
339 }
340
341 struct block *adjustblock(struct block *bp, int len)
342 {
343         int n;
344         struct block *nbp;
345
346         if (len < 0) {
347                 freeb(bp);
348                 return NULL;
349         }
350
351         if (bp->rp + len > bp->lim) {
352                 nbp = copyblock(bp, len);
353                 freeblist(bp);
354                 QDEBUG checkb(nbp, "adjustblock 1");
355
356                 return nbp;
357         }
358
359         n = BLEN(bp);
360         if (len > n)
361                 memset(bp->wp, 0, len - n);
362         bp->wp = bp->rp + len;
363         QDEBUG checkb(bp, "adjustblock 2");
364
365         return bp;
366 }
367
368 /*
369  *  throw away up to count bytes from a
370  *  list of blocks.  Return count of bytes
371  *  thrown away.
372  */
373 int pullblock(struct block **bph, int count)
374 {
375         struct block *bp;
376         int n, bytes;
377
378         bytes = 0;
379         if (bph == NULL)
380                 return 0;
381
382         while (*bph != NULL && count != 0) {
383                 bp = *bph;
384                 n = BLEN(bp);
385                 if (count < n)
386                         n = count;
387                 bytes += n;
388                 count -= n;
389                 bp->rp += n;
390                 QDEBUG checkb(bp, "pullblock ");
391                 if (BLEN(bp) == 0) {
392                         *bph = bp->next;
393                         bp->next = NULL;
394                         freeb(bp);
395                 }
396         }
397         return bytes;
398 }
399
400 /*
401  *  get next block from a queue, return null if nothing there
402  */
403 struct block *qget(struct queue *q)
404 {
405         int dowakeup;
406         struct block *b;
407
408         /* sync with qwrite */
409         spin_lock_irqsave(&q->lock);
410
411         b = q->bfirst;
412         if (b == NULL) {
413                 q->state |= Qstarve;
414                 spin_unlock_irqsave(&q->lock);
415                 return NULL;
416         }
417         q->bfirst = b->next;
418         b->next = 0;
419         q->len -= BALLOC(b);
420         q->dlen -= BLEN(b);
421         QDEBUG checkb(b, "qget");
422
423         /* if writer flow controlled, restart */
424         if ((q->state & Qflow) && q->len < q->limit / 2) {
425                 q->state &= ~Qflow;
426                 dowakeup = 1;
427         } else
428                 dowakeup = 0;
429
430         spin_unlock_irqsave(&q->lock);
431
432         if (dowakeup)
433                 rendez_wakeup(&q->wr);
434
435         return b;
436 }
437
438 /*
439  *  throw away the next 'len' bytes in the queue
440  * returning the number actually discarded
441  */
442 int qdiscard(struct queue *q, int len)
443 {
444         struct block *b;
445         int dowakeup, n, sofar;
446
447         spin_lock_irqsave(&q->lock);
448         for (sofar = 0; sofar < len; sofar += n) {
449                 b = q->bfirst;
450                 if (b == NULL)
451                         break;
452                 QDEBUG checkb(b, "qdiscard");
453                 n = BLEN(b);
454                 if (n <= len - sofar) {
455                         q->bfirst = b->next;
456                         b->next = 0;
457                         q->len -= BALLOC(b);
458                         q->dlen -= BLEN(b);
459                         freeb(b);
460                 } else {
461                         n = len - sofar;
462                         b->rp += n;
463                         q->dlen -= n;
464                 }
465         }
466
467         /*
468          *  if writer flow controlled, restart
469          *
470          *  This used to be
471          *  q->len < q->limit/2
472          *  but it slows down tcp too much for certain write sizes.
473          *  I really don't understand it completely.  It may be
474          *  due to the queue draining so fast that the transmission
475          *  stalls waiting for the app to produce more data.  - presotto
476          */
477         if ((q->state & Qflow) && q->len < q->limit) {
478                 q->state &= ~Qflow;
479                 dowakeup = 1;
480         } else
481                 dowakeup = 0;
482
483         spin_unlock_irqsave(&q->lock);
484
485         if (dowakeup)
486                 rendez_wakeup(&q->wr);
487
488         return sofar;
489 }
490
491 /*
492  *  Interrupt level copy out of a queue, return # bytes copied.
493  */
494 int qconsume(struct queue *q, void *vp, int len)
495 {
496         struct block *b;
497         int n, dowakeup;
498         uint8_t *p = vp;
499         struct block *tofree = NULL;
500
501         /* sync with qwrite */
502         spin_lock_irqsave(&q->lock);
503
504         for (;;) {
505                 b = q->bfirst;
506                 if (b == 0) {
507                         q->state |= Qstarve;
508                         spin_unlock_irqsave(&q->lock);
509                         return -1;
510                 }
511                 QDEBUG checkb(b, "qconsume 1");
512
513                 n = BLEN(b);
514                 if (n > 0)
515                         break;
516                 q->bfirst = b->next;
517                 q->len -= BALLOC(b);
518
519                 /* remember to free this */
520                 b->next = tofree;
521                 tofree = b;
522         };
523
524         if (n < len)
525                 len = n;
526         memmove(p, b->rp, len);
527         consumecnt += n;
528         b->rp += len;
529         q->dlen -= len;
530
531         /* discard the block if we're done with it */
532         if ((q->state & Qmsg) || len == n) {
533                 q->bfirst = b->next;
534                 b->next = 0;
535                 q->len -= BALLOC(b);
536                 q->dlen -= BLEN(b);
537
538                 /* remember to free this */
539                 b->next = tofree;
540                 tofree = b;
541         }
542
543         /* if writer flow controlled, restart */
544         if ((q->state & Qflow) && q->len < q->limit / 2) {
545                 q->state &= ~Qflow;
546                 dowakeup = 1;
547         } else
548                 dowakeup = 0;
549
550         spin_unlock_irqsave(&q->lock);
551
552         if (dowakeup)
553                 rendez_wakeup(&q->wr);
554
555         if (tofree != NULL)
556                 freeblist(tofree);
557
558         return len;
559 }
560
561 int qpass(struct queue *q, struct block *b)
562 {
563         int dlen, len, dowakeup;
564
565         /* sync with qread */
566         dowakeup = 0;
567         spin_lock_irqsave(&q->lock);
568         if (q->len >= q->limit) {
569                 freeblist(b);
570                 spin_unlock_irqsave(&q->lock);
571                 return -1;
572         }
573         if (q->state & Qclosed) {
574                 len = blocklen(b);
575                 freeblist(b);
576                 spin_unlock_irqsave(&q->lock);
577                 return len;
578         }
579
580         /* add buffer to queue */
581         if (q->bfirst)
582                 q->blast->next = b;
583         else
584                 q->bfirst = b;
585         len = BALLOC(b);
586         dlen = BLEN(b);
587         QDEBUG checkb(b, "qpass");
588         while (b->next) {
589                 b = b->next;
590                 QDEBUG checkb(b, "qpass");
591                 len += BALLOC(b);
592                 dlen += BLEN(b);
593         }
594         q->blast = b;
595         q->len += len;
596         q->dlen += dlen;
597
598         if (q->len >= q->limit / 2)
599                 q->state |= Qflow;
600
601         if (q->state & Qstarve) {
602                 q->state &= ~Qstarve;
603                 dowakeup = 1;
604         }
605         spin_unlock_irqsave(&q->lock);
606
607         if (dowakeup)
608                 rendez_wakeup(&q->rr);
609
610         return len;
611 }
612
613 int qpassnolim(struct queue *q, struct block *b)
614 {
615         int dlen, len, dowakeup;
616
617         /* sync with qread */
618         dowakeup = 0;
619         spin_lock_irqsave(&q->lock);
620
621         if (q->state & Qclosed) {
622                 freeblist(b);
623                 spin_unlock_irqsave(&q->lock);
624                 return BALLOC(b);
625         }
626
627         /* add buffer to queue */
628         if (q->bfirst)
629                 q->blast->next = b;
630         else
631                 q->bfirst = b;
632         len = BALLOC(b);
633         dlen = BLEN(b);
634         QDEBUG checkb(b, "qpass");
635         while (b->next) {
636                 b = b->next;
637                 QDEBUG checkb(b, "qpass");
638                 len += BALLOC(b);
639                 dlen += BLEN(b);
640         }
641         q->blast = b;
642         q->len += len;
643         q->dlen += dlen;
644
645         if (q->len >= q->limit / 2)
646                 q->state |= Qflow;
647
648         if (q->state & Qstarve) {
649                 q->state &= ~Qstarve;
650                 dowakeup = 1;
651         }
652         spin_unlock_irqsave(&q->lock);
653
654         if (dowakeup)
655                 rendez_wakeup(&q->rr);
656
657         return len;
658 }
659
660 /*
661  *  if the allocated space is way out of line with the used
662  *  space, reallocate to a smaller block
663  */
664 struct block *packblock(struct block *bp)
665 {
666         struct block **l, *nbp;
667         int n;
668
669         for (l = &bp; *l; l = &(*l)->next) {
670                 nbp = *l;
671                 n = BLEN(nbp);
672                 if ((n << 2) < BALLOC(nbp)) {
673                         *l = allocb(n);
674                         memmove((*l)->wp, nbp->rp, n);
675                         (*l)->wp += n;
676                         (*l)->next = nbp->next;
677                         freeb(nbp);
678                 }
679         }
680
681         return bp;
682 }
683
684 int qproduce(struct queue *q, void *vp, int len)
685 {
686         struct block *b;
687         int dowakeup;
688         uint8_t *p = vp;
689
690         /* sync with qread */
691         dowakeup = 0;
692         spin_lock_irqsave(&q->lock);
693
694         /* no waiting receivers, room in buffer? */
695         if (q->len >= q->limit) {
696                 q->state |= Qflow;
697                 spin_unlock_irqsave(&q->lock);
698                 return -1;
699         }
700
701         /* save in buffer */
702         /* use Qcoalesce here to save storage */
703         // TODO: Consider removing the Qcoalesce flag and force a coalescing
704         // strategy by default.
705         b = q->blast;
706         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
707                 || b->lim - b->wp < len) {
708                 /* need a new block */
709                 b = iallocb(len);
710                 if (b == 0) {
711                         spin_unlock_irqsave(&q->lock);
712                         return 0;
713                 }
714                 if (q->bfirst)
715                         q->blast->next = b;
716                 else
717                         q->bfirst = b;
718                 q->blast = b;
719                 /* b->next = 0; done by iallocb() */
720                 q->len += BALLOC(b);
721         }
722         memmove(b->wp, p, len);
723         producecnt += len;
724         b->wp += len;
725         q->dlen += len;
726         QDEBUG checkb(b, "qproduce");
727
728         if (q->state & Qstarve) {
729                 q->state &= ~Qstarve;
730                 dowakeup = 1;
731         }
732
733         if (q->len >= q->limit)
734                 q->state |= Qflow;
735         spin_unlock_irqsave(&q->lock);
736
737         if (dowakeup)
738                 rendez_wakeup(&q->rr);
739
740         return len;
741 }
742
743 /*
744  *  copy from offset in the queue
745  */
746 struct block *qcopy(struct queue *q, int len, uint32_t offset)
747 {
748         int sofar;
749         int n;
750         struct block *b, *nb;
751         uint8_t *p;
752
753         nb = allocb(len);
754
755         spin_lock_irqsave(&q->lock);
756
757         /* go to offset */
758         b = q->bfirst;
759         for (sofar = 0;; sofar += n) {
760                 if (b == NULL) {
761                         spin_unlock_irqsave(&q->lock);
762                         return nb;
763                 }
764                 n = BLEN(b);
765                 if (sofar + n > offset) {
766                         p = b->rp + offset - sofar;
767                         n -= offset - sofar;
768                         break;
769                 }
770                 QDEBUG checkb(b, "qcopy");
771                 b = b->next;
772         }
773
774         /* copy bytes from there */
775         for (sofar = 0; sofar < len;) {
776                 if (n > len - sofar)
777                         n = len - sofar;
778                 memmove(nb->wp, p, n);
779                 qcopycnt += n;
780                 sofar += n;
781                 nb->wp += n;
782                 b = b->next;
783                 if (b == NULL)
784                         break;
785                 n = BLEN(b);
786                 p = b->rp;
787         }
788         spin_unlock_irqsave(&q->lock);
789
790         return nb;
791 }
792
793 static void qinit_common(struct queue *q)
794 {
795         spinlock_init_irqsave(&q->lock);
796         qlock_init(&q->rlock);
797         qlock_init(&q->wlock);
798         rendez_init(&q->rr);
799         rendez_init(&q->wr);
800 }
801
802 /*
803  *  called by non-interrupt code
804  */
805 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
806 {
807         struct queue *q;
808
809         q = kzmalloc(sizeof(struct queue), 0);
810         if (q == 0)
811                 return 0;
812         qinit_common(q);
813
814         q->limit = q->inilim = limit;
815         q->kick = kick;
816         q->arg = arg;
817         q->state = msg;
818         q->state |= Qstarve;
819         q->eof = 0;
820         q->noblock = 0;
821
822         return q;
823 }
824
825 /* open a queue to be bypassed */
826 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
827 {
828         struct queue *q;
829
830         q = kzmalloc(sizeof(struct queue), 0);
831         if (q == 0)
832                 return 0;
833         qinit_common(q);
834
835         q->limit = 0;
836         q->arg = arg;
837         q->bypass = bypass;
838         q->state = 0;
839
840         return q;
841 }
842
843 static int notempty(void *a)
844 {
845         struct queue *q = a;
846
847         return (q->state & Qclosed) || q->bfirst != 0;
848 }
849
850 /* wait for the queue to be non-empty or closed.
851  *
852  * called with q ilocked.  rendez may error out, back through the caller, with
853  * the irqsave lock unlocked.  */
854 static int qwait(struct queue *q)
855 {
856         /* wait for data */
857         for (;;) {
858                 if (q->bfirst != NULL)
859                         break;
860
861                 if (q->state & Qclosed) {
862                         if (++q->eof > 3)
863                                 return -1;
864                         if (*q->err && strcmp(q->err, Ehungup) != 0)
865                                 return -1;
866                         return 0;
867                 }
868
869                 q->state |= Qstarve;    /* flag requesting producer to wake me */
870                 spin_unlock_irqsave(&q->lock);
871                 /* may throw an error() */
872                 rendez_sleep(&q->rr, notempty, q);
873                 spin_lock_irqsave(&q->lock);
874         }
875         return 1;
876 }
877
878 /*
879  * add a block list to a queue
880  */
881 void qaddlist(struct queue *q, struct block *b)
882 {
883         /* queue the block */
884         if (q->bfirst)
885                 q->blast->next = b;
886         else
887                 q->bfirst = b;
888         q->len += blockalloclen(b);
889         q->dlen += blocklen(b);
890         while (b->next)
891                 b = b->next;
892         q->blast = b;
893 }
894
895 /*
896  *  called with q ilocked
897  */
898 struct block *qremove(struct queue *q)
899 {
900         struct block *b;
901
902         b = q->bfirst;
903         if (b == NULL)
904                 return NULL;
905         q->bfirst = b->next;
906         b->next = NULL;
907         q->dlen -= BLEN(b);
908         q->len -= BALLOC(b);
909         QDEBUG checkb(b, "qremove");
910         return b;
911 }
912
913 /*
914  *  copy the contents of a string of blocks into
915  *  memory.  emptied blocks are freed.  return
916  *  pointer to first unconsumed block.
917  */
918 struct block *bl2mem(uint8_t * p, struct block *b, int n)
919 {
920         int i;
921         struct block *next;
922
923         for (; b != NULL; b = next) {
924                 i = BLEN(b);
925                 if (i > n) {
926                         memmove(p, b->rp, n);
927                         b->rp += n;
928                         return b;
929                 }
930                 memmove(p, b->rp, i);
931                 n -= i;
932                 p += i;
933                 b->rp += i;
934                 next = b->next;
935                 freeb(b);
936         }
937         return NULL;
938 }
939
940 /*
941  *  copy the contents of memory into a string of blocks.
942  *  return NULL on error.
943  */
944 struct block *mem2bl(uint8_t * p, int len)
945 {
946         ERRSTACK(1);
947         int n;
948         struct block *b, *first, **l;
949
950         first = NULL;
951         l = &first;
952         if (waserror()) {
953                 freeblist(first);
954                 nexterror();
955         }
956         do {
957                 n = len;
958                 if (n > Maxatomic)
959                         n = Maxatomic;
960
961                 *l = b = allocb(n);
962                 memmove(b->wp, p, n);
963                 b->wp += n;
964                 p += n;
965                 len -= n;
966                 l = &b->next;
967         } while (len > 0);
968         poperror();
969
970         return first;
971 }
972
973 /*
974  *  put a block back to the front of the queue
975  *  called with q ilocked
976  */
977 void qputback(struct queue *q, struct block *b)
978 {
979         b->next = q->bfirst;
980         if (q->bfirst == NULL)
981                 q->blast = b;
982         q->bfirst = b;
983         q->len += BALLOC(b);
984         q->dlen += BLEN(b);
985 }
986
987 /*
988  *  flow control, get producer going again
989  *  called with q ilocked
990  */
991 static void qwakeup_iunlock(struct queue *q)
992 {
993         int dowakeup = 0;
994
995         /* if writer flow controlled, restart */
996         if ((q->state & Qflow) && q->len < q->limit / 2) {
997                 q->state &= ~Qflow;
998                 dowakeup = 1;
999         }
1000
1001         spin_unlock_irqsave(&q->lock);
1002
1003         /* wakeup flow controlled writers */
1004         if (dowakeup) {
1005                 if (q->kick)
1006                         q->kick(q->arg);
1007                 rendez_wakeup(&q->wr);
1008         }
1009 }
1010
1011 /*
1012  *  get next block from a queue (up to a limit)
1013  */
1014 struct block *qbread(struct queue *q, int len)
1015 {
1016         ERRSTACK(1);
1017         struct block *b, *nb;
1018         int n;
1019
1020         qlock(&q->rlock);
1021         if (waserror()) {
1022                 qunlock(&q->rlock);
1023                 nexterror();
1024         }
1025
1026         spin_lock_irqsave(&q->lock);
1027         switch (qwait(q)) {
1028                 case 0:
1029                         /* queue closed */
1030                         spin_unlock_irqsave(&q->lock);
1031                         qunlock(&q->rlock);
1032                         poperror();
1033                         return NULL;
1034                 case -1:
1035                         /* multiple reads on a closed queue */
1036                         spin_unlock_irqsave(&q->lock);
1037                         error(q->err);
1038         }
1039
1040         /* if we get here, there's at least one block in the queue */
1041         b = qremove(q);
1042         n = BLEN(b);
1043
1044         /* split block if it's too big and this is not a message queue */
1045         nb = b;
1046         if (n > len) {
1047                 if ((q->state & Qmsg) == 0) {
1048                         n -= len;
1049                         b = allocb(n);
1050                         memmove(b->wp, nb->rp + len, n);
1051                         b->wp += n;
1052                         qputback(q, b);
1053                 }
1054                 nb->wp = nb->rp + len;
1055         }
1056
1057         /* restart producer */
1058         qwakeup_iunlock(q);
1059
1060         poperror();
1061         qunlock(&q->rlock);
1062         return nb;
1063 }
1064
1065 /*
1066  *  read a queue.  if no data is queued, post a struct block
1067  *  and wait on its Rendez.
1068  */
1069 long qread(struct queue *q, void *vp, int len)
1070 {
1071         ERRSTACK(1);
1072         struct block *b, *first, **l;
1073         int m, n;
1074
1075         qlock(&q->rlock);
1076         if (waserror()) {
1077                 qunlock(&q->rlock);
1078                 nexterror();
1079         }
1080
1081         spin_lock_irqsave(&q->lock);
1082 again:
1083         switch (qwait(q)) {
1084                 case 0:
1085                         /* queue closed */
1086                         spin_unlock_irqsave(&q->lock);
1087                         qunlock(&q->rlock);
1088                         poperror();
1089                         return 0;
1090                 case -1:
1091                         /* multiple reads on a closed queue */
1092                         spin_unlock_irqsave(&q->lock);
1093                         error(q->err);
1094         }
1095
1096         /* if we get here, there's at least one block in the queue */
1097         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1098         // strategy by default.
1099         if (q->state & Qcoalesce) {
1100                 /* when coalescing, 0 length blocks just go away */
1101                 b = q->bfirst;
1102                 if (BLEN(b) <= 0) {
1103                         freeb(qremove(q));
1104                         goto again;
1105                 }
1106
1107                 /*  grab the first block plus as many
1108                  *  following blocks as will completely
1109                  *  fit in the read.
1110                  */
1111                 n = 0;
1112                 l = &first;
1113                 m = BLEN(b);
1114                 for (;;) {
1115                         *l = qremove(q);
1116                         l = &b->next;
1117                         n += m;
1118
1119                         b = q->bfirst;
1120                         if (b == NULL)
1121                                 break;
1122                         m = BLEN(b);
1123                         if (n + m > len)
1124                                 break;
1125                 }
1126         } else {
1127                 first = qremove(q);
1128                 n = BLEN(first);
1129         }
1130
1131         /* copy to user space outside of the ilock */
1132         spin_unlock_irqsave(&q->lock);
1133         b = bl2mem(vp, first, len);
1134         spin_lock_irqsave(&q->lock);
1135
1136         /* take care of any left over partial block */
1137         if (b != NULL) {
1138                 n -= BLEN(b);
1139                 if (q->state & Qmsg)
1140                         freeb(b);
1141                 else
1142                         qputback(q, b);
1143         }
1144
1145         /* restart producer */
1146         qwakeup_iunlock(q);
1147
1148         poperror();
1149         qunlock(&q->rlock);
1150         return n;
1151 }
1152
1153 static int qnotfull(void *a)
1154 {
1155         struct queue *q = a;
1156
1157         return q->len < q->limit || (q->state & Qclosed);
1158 }
1159
1160 uint32_t noblockcnt;
1161
1162 /*
1163  *  add a block to a queue obeying flow control
1164  */
1165 long qbwrite(struct queue *q, struct block *b)
1166 {
1167         ERRSTACK(1);
1168         int n, dowakeup;
1169         volatile bool should_free_b = TRUE;
1170
1171         n = BLEN(b);
1172
1173         if (q->bypass) {
1174                 (*q->bypass) (q->arg, b);
1175                 return n;
1176         }
1177
1178         dowakeup = 0;
1179         qlock(&q->wlock);
1180         if (waserror()) {
1181                 if (b != NULL && should_free_b)
1182                         freeb(b);
1183                 qunlock(&q->wlock);
1184                 nexterror();
1185         }
1186
1187         spin_lock_irqsave(&q->lock);
1188
1189         /* give up if the queue is closed */
1190         if (q->state & Qclosed) {
1191                 spin_unlock_irqsave(&q->lock);
1192                 error(q->err);
1193         }
1194
1195         /* if nonblocking, don't queue over the limit */
1196         if (q->len >= q->limit) {
1197                 if (q->noblock) {
1198                         spin_unlock_irqsave(&q->lock);
1199                         freeb(b);
1200                         noblockcnt += n;
1201                         qunlock(&q->wlock);
1202                         poperror();
1203                         return n;
1204                 }
1205         }
1206
1207         /* queue the block */
1208         should_free_b = FALSE;
1209         if (q->bfirst)
1210                 q->blast->next = b;
1211         else
1212                 q->bfirst = b;
1213         q->blast = b;
1214         b->next = 0;
1215         q->len += BALLOC(b);
1216         q->dlen += n;
1217         QDEBUG checkb(b, "qbwrite");
1218         b = NULL;
1219
1220         /* make sure other end gets awakened */
1221         if (q->state & Qstarve) {
1222                 q->state &= ~Qstarve;
1223                 dowakeup = 1;
1224         }
1225         spin_unlock_irqsave(&q->lock);
1226
1227         /*  get output going again */
1228         if (q->kick && (dowakeup || (q->state & Qkick)))
1229                 q->kick(q->arg);
1230
1231         /* wakeup anyone consuming at the other end */
1232         if (dowakeup)
1233                 rendez_wakeup(&q->rr);
1234
1235         /*
1236          *  flow control, wait for queue to get below the limit
1237          *  before allowing the process to continue and queue
1238          *  more.  We do this here so that postnote can only
1239          *  interrupt us after the data has been queued.  This
1240          *  means that things like 9p flushes and ssl messages
1241          *  will not be disrupted by software interrupts.
1242          *
1243          *  Note - this is moderately dangerous since a process
1244          *  that keeps getting interrupted and rewriting will
1245          *  queue infinite crud.
1246          */
1247         for (;;) {
1248                 if (q->noblock || qnotfull(q))
1249                         break;
1250
1251                 spin_lock_irqsave(&q->lock);
1252                 q->state |= Qflow;
1253                 spin_unlock_irqsave(&q->lock);
1254                 rendez_sleep(&q->wr, qnotfull, q);
1255         }
1256
1257         qunlock(&q->wlock);
1258         poperror();
1259         return n;
1260 }
1261
1262 long qibwrite(struct queue *q, struct block *b)
1263 {
1264         int n, dowakeup;
1265
1266         dowakeup = 0;
1267
1268         n = BLEN(b);
1269
1270         spin_lock_irqsave(&q->lock);
1271
1272         QDEBUG checkb(b, "qibwrite");
1273         if (q->bfirst)
1274                 q->blast->next = b;
1275         else
1276                 q->bfirst = b;
1277         q->blast = b;
1278         q->len += BALLOC(b);
1279         q->dlen += n;
1280
1281         if (q->state & Qstarve) {
1282                 q->state &= ~Qstarve;
1283                 dowakeup = 1;
1284         }
1285
1286         spin_unlock_irqsave(&q->lock);
1287
1288         if (dowakeup) {
1289                 if (q->kick)
1290                         q->kick(q->arg);
1291                 rendez_wakeup(&q->rr);
1292         }
1293
1294         return n;
1295 }
1296
1297 /*
1298  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1299  */
1300 int qwrite(struct queue *q, void *vp, int len)
1301 {
1302         ERRSTACK(1);
1303         int n, sofar;
1304         struct block *b;
1305         uint8_t *p = vp;
1306
1307         QDEBUG if (!islo())
1308                  printd("qwrite hi %p\n", getcallerpc(&q));
1309
1310         sofar = 0;
1311         do {
1312                 n = len - sofar;
1313                 if (n > Maxatomic)
1314                         n = Maxatomic;
1315
1316                 b = allocb(n);
1317                 if (waserror()) {
1318                         freeb(b);
1319                         nexterror();
1320                 }
1321                 memmove(b->wp, p + sofar, n);
1322                 poperror();
1323                 b->wp += n;
1324
1325                 qbwrite(q, b);
1326
1327                 sofar += n;
1328         } while (sofar < len && (q->state & Qmsg) == 0);
1329
1330         return len;
1331 }
1332
1333 /*
1334  *  used by print() to write to a queue.  Since we may be splhi or not in
1335  *  a process, don't qlock.
1336  */
1337 int qiwrite(struct queue *q, void *vp, int len)
1338 {
1339         int n, sofar, dowakeup;
1340         struct block *b;
1341         uint8_t *p = vp;
1342
1343         dowakeup = 0;
1344
1345         sofar = 0;
1346         do {
1347                 n = len - sofar;
1348                 if (n > Maxatomic)
1349                         n = Maxatomic;
1350
1351                 b = iallocb(n);
1352                 if (b == NULL)
1353                         break;
1354                 memmove(b->wp, p + sofar, n);
1355                 /* this adjusts BLEN to be n, or at least it should */
1356                 b->wp += n;
1357                 assert(n == BLEN(b));
1358                 qibwrite(q, b);
1359
1360                 sofar += n;
1361         } while (sofar < len && (q->state & Qmsg) == 0);
1362
1363         return sofar;
1364 }
1365
1366 /*
1367  *  be extremely careful when calling this,
1368  *  as there is no reference accounting
1369  */
1370 void qfree(struct queue *q)
1371 {
1372         qclose(q);
1373         kfree(q);
1374 }
1375
1376 /*
1377  *  Mark a queue as closed.  No further IO is permitted.
1378  *  All blocks are released.
1379  */
1380 void qclose(struct queue *q)
1381 {
1382         struct block *bfirst;
1383
1384         if (q == NULL)
1385                 return;
1386
1387         /* mark it */
1388         spin_lock_irqsave(&q->lock);
1389         q->state |= Qclosed;
1390         q->state &= ~(Qflow | Qstarve);
1391         strncpy(q->err, Ehungup, sizeof(q->err));
1392         bfirst = q->bfirst;
1393         q->bfirst = 0;
1394         q->len = 0;
1395         q->dlen = 0;
1396         q->noblock = 0;
1397         spin_unlock_irqsave(&q->lock);
1398
1399         /* free queued blocks */
1400         freeblist(bfirst);
1401
1402         /* wake up readers/writers */
1403         rendez_wakeup(&q->rr);
1404         rendez_wakeup(&q->wr);
1405 }
1406
1407 /*
1408  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1409  *  blocks.
1410  */
1411 void qhangup(struct queue *q, char *msg)
1412 {
1413         /* mark it */
1414         spin_lock_irqsave(&q->lock);
1415         q->state |= Qclosed;
1416         if (msg == 0 || *msg == 0)
1417                 strncpy(q->err, Ehungup, sizeof(q->err));
1418         else
1419                 strncpy(q->err, msg, ERRMAX - 1);
1420         spin_unlock_irqsave(&q->lock);
1421
1422         /* wake up readers/writers */
1423         rendez_wakeup(&q->rr);
1424         rendez_wakeup(&q->wr);
1425 }
1426
1427 /*
1428  *  return non-zero if the q is hungup
1429  */
1430 int qisclosed(struct queue *q)
1431 {
1432         return q->state & Qclosed;
1433 }
1434
1435 /*
1436  *  mark a queue as no longer hung up
1437  */
1438 void qreopen(struct queue *q)
1439 {
1440         spin_lock_irqsave(&q->lock);
1441         q->state &= ~Qclosed;
1442         q->state |= Qstarve;
1443         q->eof = 0;
1444         q->limit = q->inilim;
1445         spin_unlock_irqsave(&q->lock);
1446 }
1447
1448 /*
1449  *  return bytes queued
1450  */
1451 int qlen(struct queue *q)
1452 {
1453         return q->dlen;
1454 }
1455
1456 /*
1457  * return space remaining before flow control
1458  */
1459 int qwindow(struct queue *q)
1460 {
1461         int l;
1462
1463         l = q->limit - q->len;
1464         if (l < 0)
1465                 l = 0;
1466         return l;
1467 }
1468
1469 /*
1470  *  return true if we can read without blocking
1471  */
1472 int qcanread(struct queue *q)
1473 {
1474         return q->bfirst != 0;
1475 }
1476
1477 /*
1478  *  change queue limit
1479  */
1480 void qsetlimit(struct queue *q, int limit)
1481 {
1482         q->limit = limit;
1483 }
1484
1485 /*
1486  *  set blocking/nonblocking
1487  */
1488 void qnoblock(struct queue *q, int onoff)
1489 {
1490         q->noblock = onoff;
1491 }
1492
1493 /*
1494  *  flush the output queue
1495  */
1496 void qflush(struct queue *q)
1497 {
1498         struct block *bfirst;
1499
1500         /* mark it */
1501         spin_lock_irqsave(&q->lock);
1502         bfirst = q->bfirst;
1503         q->bfirst = 0;
1504         q->len = 0;
1505         q->dlen = 0;
1506         spin_unlock_irqsave(&q->lock);
1507
1508         /* free queued blocks */
1509         freeblist(bfirst);
1510
1511         /* wake up readers/writers */
1512         rendez_wakeup(&q->wr);
1513 }
1514
1515 int qfull(struct queue *q)
1516 {
1517         return q->state & Qflow;
1518 }
1519
1520 int qstate(struct queue *q)
1521 {
1522         return q->state;
1523 }
1524
1525 void qdump(struct queue *q)
1526 {
1527         if (q)
1528                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1529                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1530 }