QIO and catching rendez_sleep()
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 static uint32_t padblockcnt;
17 static uint32_t concatblockcnt;
18 static uint32_t pullupblockcnt;
19 static uint32_t copyblockcnt;
20 static uint32_t consumecnt;
21 static uint32_t producecnt;
22 static uint32_t qcopycnt;
23
24 static int debugging;
25
26 #define QDEBUG  if(0)
27
28 /*
29  *  IO queues
30  */
31
32 struct queue {
33         spinlock_t lock;;
34
35         struct block *bfirst;           /* buffer */
36         struct block *blast;
37
38         int len;                                        /* bytes allocated to queue */
39         int dlen;                                       /* data bytes in queue */
40         int limit;                                      /* max bytes in queue */
41         int iNULLim;                            /* initial limit */
42         int state;
43         int noblock;                            /* true if writes return immediately when q full */
44         int eof;                                        /* number of eofs read by user */
45
46         void (*kick) (void *);          /* restart output */
47         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
48         void *arg;                                      /* argument to kick */
49
50         qlock_t rlock;                          /* mutex for reading processes */
51         struct rendez rr;                       /* process waiting to read */
52         qlock_t wlock;                          /* mutex for writing processes */
53         struct rendez wr;                       /* process waiting to write */
54
55         char err[ERRMAX];
56 };
57
58 enum {
59         Maxatomic = 64 * 1024,
60 };
61
62 unsigned int qiomaxatomic = Maxatomic;
63
64 void ixsummary(void)
65 {
66         debugging ^= 1;
67         iallocsummary();
68         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
69                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
70         printd("consume %lu, produce %lu, qcopy %lu\n",
71                    consumecnt, producecnt, qcopycnt);
72 }
73
74 /*
75  *  free a list of blocks
76  */
77 void freeblist(struct block *b)
78 {
79         struct block *next;
80
81         for (; b != 0; b = next) {
82                 next = b->next;
83                 b->next = 0;
84                 freeb(b);
85         }
86 }
87
88 /*
89  *  pad a block to the front (or the back if size is negative)
90  */
91 struct block *padblock(struct block *bp, int size)
92 {
93         int n;
94         struct block *nbp;
95
96         QDEBUG checkb(bp, "padblock 1");
97         if (size >= 0) {
98                 if (bp->rp - bp->base >= size) {
99                         bp->rp -= size;
100                         return bp;
101                 }
102
103                 if (bp->next)
104                         panic("padblock %p", getcallerpc(&bp));
105                 n = BLEN(bp);
106                 padblockcnt++;
107                 nbp = allocb(size + n);
108                 nbp->rp += size;
109                 nbp->wp = nbp->rp;
110                 memmove(nbp->wp, bp->rp, n);
111                 nbp->wp += n;
112                 freeb(bp);
113                 nbp->rp -= size;
114         } else {
115                 size = -size;
116
117                 if (bp->next)
118                         panic("padblock %p", getcallerpc(&bp));
119
120                 if (bp->lim - bp->wp >= size)
121                         return bp;
122
123                 n = BLEN(bp);
124                 padblockcnt++;
125                 nbp = allocb(size + n);
126                 memmove(nbp->wp, bp->rp, n);
127                 nbp->wp += n;
128                 freeb(bp);
129         }
130         QDEBUG checkb(nbp, "padblock 1");
131         return nbp;
132 }
133
134 /*
135  *  return count of bytes in a string of blocks
136  */
137 int blocklen(struct block *bp)
138 {
139         int len;
140
141         len = 0;
142         while (bp) {
143                 len += BLEN(bp);
144                 bp = bp->next;
145         }
146         return len;
147 }
148
149 /*
150  * return count of space in blocks
151  */
152 int blockalloclen(struct block *bp)
153 {
154         int len;
155
156         len = 0;
157         while (bp) {
158                 len += BALLOC(bp);
159                 bp = bp->next;
160         }
161         return len;
162 }
163
164 /*
165  *  copy the  string of blocks into
166  *  a single block and free the string
167  */
168 struct block *concatblock(struct block *bp)
169 {
170         int len;
171         struct block *nb, *f;
172
173         if (bp->next == 0)
174                 return bp;
175
176         nb = allocb(blocklen(bp));
177         for (f = bp; f; f = f->next) {
178                 len = BLEN(f);
179                 memmove(nb->wp, f->rp, len);
180                 nb->wp += len;
181         }
182         concatblockcnt += BLEN(nb);
183         freeblist(bp);
184         QDEBUG checkb(nb, "concatblock 1");
185         return nb;
186 }
187
188 /*
189  *  make sure the first block has at least n bytes
190  */
191 struct block *pullupblock(struct block *bp, int n)
192 {
193         int i;
194         struct block *nbp;
195
196         /*
197          *  this should almost always be true, it's
198          *  just to avoid every caller checking.
199          */
200         if (BLEN(bp) >= n)
201                 return bp;
202
203         /*
204          *  if not enough room in the first block,
205          *  add another to the front of the list.
206          */
207         if (bp->lim - bp->rp < n) {
208                 nbp = allocb(n);
209                 nbp->next = bp;
210                 bp = nbp;
211         }
212
213         /*
214          *  copy bytes from the trailing blocks into the first
215          */
216         n -= BLEN(bp);
217         while ((nbp = bp->next)) {
218                 i = BLEN(nbp);
219                 if (i > n) {
220                         memmove(bp->wp, nbp->rp, n);
221                         pullupblockcnt++;
222                         bp->wp += n;
223                         nbp->rp += n;
224                         QDEBUG checkb(bp, "pullupblock 1");
225                         return bp;
226                 } else {
227                         memmove(bp->wp, nbp->rp, i);
228                         pullupblockcnt++;
229                         bp->wp += i;
230                         bp->next = nbp->next;
231                         nbp->next = 0;
232                         freeb(nbp);
233                         n -= i;
234                         if (n == 0) {
235                                 QDEBUG checkb(bp, "pullupblock 2");
236                                 return bp;
237                         }
238                 }
239         }
240         freeb(bp);
241         return 0;
242 }
243
244 /*
245  *  make sure the first block has at least n bytes
246  */
247 struct block *pullupqueue(struct queue *q, int n)
248 {
249         struct block *b;
250
251         if ((BLEN(q->bfirst) >= n))
252                 return q->bfirst;
253         q->bfirst = pullupblock(q->bfirst, n);
254         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
255         q->blast = b;
256         return q->bfirst;
257 }
258
259 /*
260  *  trim to len bytes starting at offset
261  */
262 struct block *trimblock(struct block *bp, int offset, int len)
263 {
264         uint32_t l;
265         struct block *nb, *startb;
266
267         QDEBUG checkb(bp, "trimblock 1");
268         if (blocklen(bp) < offset + len) {
269                 freeblist(bp);
270                 return NULL;
271         }
272
273         while ((l = BLEN(bp)) < offset) {
274                 offset -= l;
275                 nb = bp->next;
276                 bp->next = NULL;
277                 freeb(bp);
278                 bp = nb;
279         }
280
281         startb = bp;
282         bp->rp += offset;
283
284         while ((l = BLEN(bp)) < len) {
285                 len -= l;
286                 bp = bp->next;
287         }
288
289         bp->wp -= (BLEN(bp) - len);
290
291         if (bp->next) {
292                 freeblist(bp->next);
293                 bp->next = NULL;
294         }
295
296         return startb;
297 }
298
299 /*
300  *  copy 'count' bytes into a new block
301  */
302 struct block *copyblock(struct block *bp, int count)
303 {
304         int l;
305         struct block *nbp;
306
307         QDEBUG checkb(bp, "copyblock 0");
308         nbp = allocb(count);
309         for (; count > 0 && bp != 0; bp = bp->next) {
310                 l = BLEN(bp);
311                 if (l > count)
312                         l = count;
313                 memmove(nbp->wp, bp->rp, l);
314                 nbp->wp += l;
315                 count -= l;
316         }
317         if (count > 0) {
318                 memset(nbp->wp, 0, count);
319                 nbp->wp += count;
320         }
321         copyblockcnt++;
322         QDEBUG checkb(nbp, "copyblock 1");
323
324         return nbp;
325 }
326
327 struct block *adjustblock(struct block *bp, int len)
328 {
329         int n;
330         struct block *nbp;
331
332         if (len < 0) {
333                 freeb(bp);
334                 return NULL;
335         }
336
337         if (bp->rp + len > bp->lim) {
338                 nbp = copyblock(bp, len);
339                 freeblist(bp);
340                 QDEBUG checkb(nbp, "adjustblock 1");
341
342                 return nbp;
343         }
344
345         n = BLEN(bp);
346         if (len > n)
347                 memset(bp->wp, 0, len - n);
348         bp->wp = bp->rp + len;
349         QDEBUG checkb(bp, "adjustblock 2");
350
351         return bp;
352 }
353
354 /*
355  *  throw away up to count bytes from a
356  *  list of blocks.  Return count of bytes
357  *  thrown away.
358  */
359 int pullblock(struct block **bph, int count)
360 {
361         struct block *bp;
362         int n, bytes;
363
364         bytes = 0;
365         if (bph == NULL)
366                 return 0;
367
368         while (*bph != NULL && count != 0) {
369                 bp = *bph;
370                 n = BLEN(bp);
371                 if (count < n)
372                         n = count;
373                 bytes += n;
374                 count -= n;
375                 bp->rp += n;
376                 QDEBUG checkb(bp, "pullblock ");
377                 if (BLEN(bp) == 0) {
378                         *bph = bp->next;
379                         bp->next = NULL;
380                         freeb(bp);
381                 }
382         }
383         return bytes;
384 }
385
386 /*
387  *  get next block from a queue, return null if nothing there
388  */
389 struct block *qget(struct queue *q)
390 {
391         int dowakeup;
392         struct block *b;
393
394         /* sync with qwrite */
395         spin_lock_irqsave(&q->lock);
396
397         b = q->bfirst;
398         if (b == NULL) {
399                 q->state |= Qstarve;
400                 spin_unlock_irqsave(&q->lock);
401                 return NULL;
402         }
403         q->bfirst = b->next;
404         b->next = 0;
405         q->len -= BALLOC(b);
406         q->dlen -= BLEN(b);
407         QDEBUG checkb(b, "qget");
408
409         /* if writer flow controlled, restart */
410         if ((q->state & Qflow) && q->len < q->limit / 2) {
411                 q->state &= ~Qflow;
412                 dowakeup = 1;
413         } else
414                 dowakeup = 0;
415
416         spin_unlock_irqsave(&q->lock);
417
418         if (dowakeup)
419                 rendez_wakeup(&q->wr);
420
421         return b;
422 }
423
424 /*
425  *  throw away the next 'len' bytes in the queue
426  * returning the number actually discarded
427  */
428 int qdiscard(struct queue *q, int len)
429 {
430         struct block *b;
431         int dowakeup, n, sofar;
432
433         spin_lock_irqsave(&q->lock);
434         for (sofar = 0; sofar < len; sofar += n) {
435                 b = q->bfirst;
436                 if (b == NULL)
437                         break;
438                 QDEBUG checkb(b, "qdiscard");
439                 n = BLEN(b);
440                 if (n <= len - sofar) {
441                         q->bfirst = b->next;
442                         b->next = 0;
443                         q->len -= BALLOC(b);
444                         q->dlen -= BLEN(b);
445                         freeb(b);
446                 } else {
447                         n = len - sofar;
448                         b->rp += n;
449                         q->dlen -= n;
450                 }
451         }
452
453         /*
454          *  if writer flow controlled, restart
455          *
456          *  This used to be
457          *  q->len < q->limit/2
458          *  but it slows down tcp too much for certain write sizes.
459          *  I really don't understand it completely.  It may be
460          *  due to the queue draining so fast that the transmission
461          *  stalls waiting for the app to produce more data.  - presotto
462          */
463         if ((q->state & Qflow) && q->len < q->limit) {
464                 q->state &= ~Qflow;
465                 dowakeup = 1;
466         } else
467                 dowakeup = 0;
468
469         spin_unlock_irqsave(&q->lock);
470
471         if (dowakeup)
472                 rendez_wakeup(&q->wr);
473
474         return sofar;
475 }
476
477 /*
478  *  Interrupt level copy out of a queue, return # bytes copied.
479  */
480 int qconsume(struct queue *q, void *vp, int len)
481 {
482         struct block *b;
483         int n, dowakeup;
484         uint8_t *p = vp;
485         struct block *tofree = NULL;
486
487         /* sync with qwrite */
488         spin_lock_irqsave(&q->lock);
489
490         for (;;) {
491                 b = q->bfirst;
492                 if (b == 0) {
493                         q->state |= Qstarve;
494                         spin_unlock_irqsave(&q->lock);
495                         return -1;
496                 }
497                 QDEBUG checkb(b, "qconsume 1");
498
499                 n = BLEN(b);
500                 if (n > 0)
501                         break;
502                 q->bfirst = b->next;
503                 q->len -= BALLOC(b);
504
505                 /* remember to free this */
506                 b->next = tofree;
507                 tofree = b;
508         };
509
510         if (n < len)
511                 len = n;
512         memmove(p, b->rp, len);
513         consumecnt += n;
514         b->rp += len;
515         q->dlen -= len;
516
517         /* discard the block if we're done with it */
518         if ((q->state & Qmsg) || len == n) {
519                 q->bfirst = b->next;
520                 b->next = 0;
521                 q->len -= BALLOC(b);
522                 q->dlen -= BLEN(b);
523
524                 /* remember to free this */
525                 b->next = tofree;
526                 tofree = b;
527         }
528
529         /* if writer flow controlled, restart */
530         if ((q->state & Qflow) && q->len < q->limit / 2) {
531                 q->state &= ~Qflow;
532                 dowakeup = 1;
533         } else
534                 dowakeup = 0;
535
536         spin_unlock_irqsave(&q->lock);
537
538         if (dowakeup)
539                 rendez_wakeup(&q->wr);
540
541         if (tofree != NULL)
542                 freeblist(tofree);
543
544         return len;
545 }
546
547 int qpass(struct queue *q, struct block *b)
548 {
549         int dlen, len, dowakeup;
550
551         /* sync with qread */
552         dowakeup = 0;
553         spin_lock_irqsave(&q->lock);
554         if (q->len >= q->limit) {
555                 freeblist(b);
556                 spin_unlock_irqsave(&q->lock);
557                 return -1;
558         }
559         if (q->state & Qclosed) {
560                 len = blocklen(b);
561                 freeblist(b);
562                 spin_unlock_irqsave(&q->lock);
563                 return len;
564         }
565
566         /* add buffer to queue */
567         if (q->bfirst)
568                 q->blast->next = b;
569         else
570                 q->bfirst = b;
571         len = BALLOC(b);
572         dlen = BLEN(b);
573         QDEBUG checkb(b, "qpass");
574         while (b->next) {
575                 b = b->next;
576                 QDEBUG checkb(b, "qpass");
577                 len += BALLOC(b);
578                 dlen += BLEN(b);
579         }
580         q->blast = b;
581         q->len += len;
582         q->dlen += dlen;
583
584         if (q->len >= q->limit / 2)
585                 q->state |= Qflow;
586
587         if (q->state & Qstarve) {
588                 q->state &= ~Qstarve;
589                 dowakeup = 1;
590         }
591         spin_unlock_irqsave(&q->lock);
592
593         if (dowakeup)
594                 rendez_wakeup(&q->rr);
595
596         return len;
597 }
598
599 int qpassnolim(struct queue *q, struct block *b)
600 {
601         int dlen, len, dowakeup;
602
603         /* sync with qread */
604         dowakeup = 0;
605         spin_lock_irqsave(&q->lock);
606
607         if (q->state & Qclosed) {
608                 freeblist(b);
609                 spin_unlock_irqsave(&q->lock);
610                 return BALLOC(b);
611         }
612
613         /* add buffer to queue */
614         if (q->bfirst)
615                 q->blast->next = b;
616         else
617                 q->bfirst = b;
618         len = BALLOC(b);
619         dlen = BLEN(b);
620         QDEBUG checkb(b, "qpass");
621         while (b->next) {
622                 b = b->next;
623                 QDEBUG checkb(b, "qpass");
624                 len += BALLOC(b);
625                 dlen += BLEN(b);
626         }
627         q->blast = b;
628         q->len += len;
629         q->dlen += dlen;
630
631         if (q->len >= q->limit / 2)
632                 q->state |= Qflow;
633
634         if (q->state & Qstarve) {
635                 q->state &= ~Qstarve;
636                 dowakeup = 1;
637         }
638         spin_unlock_irqsave(&q->lock);
639
640         if (dowakeup)
641                 rendez_wakeup(&q->rr);
642
643         return len;
644 }
645
646 /*
647  *  if the allocated space is way out of line with the used
648  *  space, reallocate to a smaller block
649  */
650 struct block *packblock(struct block *bp)
651 {
652         struct block **l, *nbp;
653         int n;
654
655         for (l = &bp; *l; l = &(*l)->next) {
656                 nbp = *l;
657                 n = BLEN(nbp);
658                 if ((n << 2) < BALLOC(nbp)) {
659                         *l = allocb(n);
660                         memmove((*l)->wp, nbp->rp, n);
661                         (*l)->wp += n;
662                         (*l)->next = nbp->next;
663                         freeb(nbp);
664                 }
665         }
666
667         return bp;
668 }
669
670 int qproduce(struct queue *q, void *vp, int len)
671 {
672         struct block *b;
673         int dowakeup;
674         uint8_t *p = vp;
675
676         /* sync with qread */
677         dowakeup = 0;
678         spin_lock_irqsave(&q->lock);
679
680         /* no waiting receivers, room in buffer? */
681         if (q->len >= q->limit) {
682                 q->state |= Qflow;
683                 spin_unlock_irqsave(&q->lock);
684                 return -1;
685         }
686
687         /* save in buffer */
688         /* use Qcoalesce here to save storage */
689         // TODO: Consider removing the Qcoalesce flag and force a coalescing
690         // strategy by default.
691         b = q->blast;
692         if ((q->state & Qcoalesce) == 0 || q->bfirst == NULL
693                 || b->lim - b->wp < len) {
694                 /* need a new block */
695                 b = iallocb(len);
696                 if (b == 0) {
697                         spin_unlock_irqsave(&q->lock);
698                         return 0;
699                 }
700                 if (q->bfirst)
701                         q->blast->next = b;
702                 else
703                         q->bfirst = b;
704                 q->blast = b;
705                 /* b->next = 0; done by iallocb() */
706                 q->len += BALLOC(b);
707         }
708         memmove(b->wp, p, len);
709         producecnt += len;
710         b->wp += len;
711         q->dlen += len;
712         QDEBUG checkb(b, "qproduce");
713
714         if (q->state & Qstarve) {
715                 q->state &= ~Qstarve;
716                 dowakeup = 1;
717         }
718
719         if (q->len >= q->limit)
720                 q->state |= Qflow;
721         spin_unlock_irqsave(&q->lock);
722
723         if (dowakeup)
724                 rendez_wakeup(&q->rr);
725
726         return len;
727 }
728
729 /*
730  *  copy from offset in the queue
731  */
732 struct block *qcopy(struct queue *q, int len, uint32_t offset)
733 {
734         int sofar;
735         int n;
736         struct block *b, *nb;
737         uint8_t *p;
738
739         nb = allocb(len);
740
741         spin_lock_irqsave(&q->lock);
742
743         /* go to offset */
744         b = q->bfirst;
745         for (sofar = 0;; sofar += n) {
746                 if (b == NULL) {
747                         spin_unlock_irqsave(&q->lock);
748                         return nb;
749                 }
750                 n = BLEN(b);
751                 if (sofar + n > offset) {
752                         p = b->rp + offset - sofar;
753                         n -= offset - sofar;
754                         break;
755                 }
756                 QDEBUG checkb(b, "qcopy");
757                 b = b->next;
758         }
759
760         /* copy bytes from there */
761         for (sofar = 0; sofar < len;) {
762                 if (n > len - sofar)
763                         n = len - sofar;
764                 memmove(nb->wp, p, n);
765                 qcopycnt += n;
766                 sofar += n;
767                 nb->wp += n;
768                 b = b->next;
769                 if (b == NULL)
770                         break;
771                 n = BLEN(b);
772                 p = b->rp;
773         }
774         spin_unlock_irqsave(&q->lock);
775
776         return nb;
777 }
778
779 static void qinit_common(struct queue *q)
780 {
781         spinlock_init_irqsave(&q->lock);
782         qlock_init(&q->rlock);
783         qlock_init(&q->wlock);
784         rendez_init(&q->rr);
785         rendez_init(&q->wr);
786 }
787
788 /*
789  *  called by non-interrupt code
790  */
791 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
792 {
793         struct queue *q;
794
795         q = kzmalloc(sizeof(struct queue), 0);
796         if (q == 0)
797                 return 0;
798         qinit_common(q);
799
800         q->limit = q->iNULLim = limit;
801         q->kick = kick;
802         q->arg = arg;
803         q->state = msg;
804         q->state |= Qstarve;
805         q->eof = 0;
806         q->noblock = 0;
807
808         return q;
809 }
810
811 /* open a queue to be bypassed */
812 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
813 {
814         struct queue *q;
815
816         q = kzmalloc(sizeof(struct queue), 0);
817         if (q == 0)
818                 return 0;
819         qinit_common(q);
820
821         q->limit = 0;
822         q->arg = arg;
823         q->bypass = bypass;
824         q->state = 0;
825
826         return q;
827 }
828
829 static int notempty(void *a)
830 {
831         struct queue *q = a;
832
833         return (q->state & Qclosed) || q->bfirst != 0;
834 }
835
836 /* wait for the queue to be non-empty or closed.
837  *
838  * called with q ilocked.  rendez may error out, back through the caller, with
839  * the irqsave lock unlocked.  */
840 static int qwait(struct queue *q)
841 {
842         /* wait for data */
843         for (;;) {
844                 if (q->bfirst != NULL)
845                         break;
846
847                 if (q->state & Qclosed) {
848                         if (++q->eof > 3)
849                                 return -1;
850                         if (*q->err && strcmp(q->err, Ehungup) != 0)
851                                 return -1;
852                         return 0;
853                 }
854
855                 q->state |= Qstarve;    /* flag requesting producer to wake me */
856                 spin_unlock_irqsave(&q->lock);
857                 /* may throw an error() */
858                 rendez_sleep(&q->rr, notempty, q);
859                 spin_lock_irqsave(&q->lock);
860         }
861         return 1;
862 }
863
864 /*
865  * add a block list to a queue
866  */
867 void qaddlist(struct queue *q, struct block *b)
868 {
869         /* queue the block */
870         if (q->bfirst)
871                 q->blast->next = b;
872         else
873                 q->bfirst = b;
874         q->len += blockalloclen(b);
875         q->dlen += blocklen(b);
876         while (b->next)
877                 b = b->next;
878         q->blast = b;
879 }
880
881 /*
882  *  called with q ilocked
883  */
884 struct block *qremove(struct queue *q)
885 {
886         struct block *b;
887
888         b = q->bfirst;
889         if (b == NULL)
890                 return NULL;
891         q->bfirst = b->next;
892         b->next = NULL;
893         q->dlen -= BLEN(b);
894         q->len -= BALLOC(b);
895         QDEBUG checkb(b, "qremove");
896         return b;
897 }
898
899 /*
900  *  copy the contents of a string of blocks into
901  *  memory.  emptied blocks are freed.  return
902  *  pointer to first unconsumed block.
903  */
904 struct block *bl2mem(uint8_t * p, struct block *b, int n)
905 {
906         int i;
907         struct block *next;
908
909         for (; b != NULL; b = next) {
910                 i = BLEN(b);
911                 if (i > n) {
912                         memmove(p, b->rp, n);
913                         b->rp += n;
914                         return b;
915                 }
916                 memmove(p, b->rp, i);
917                 n -= i;
918                 p += i;
919                 b->rp += i;
920                 next = b->next;
921                 freeb(b);
922         }
923         return NULL;
924 }
925
926 /*
927  *  copy the contents of memory into a string of blocks.
928  *  return NULL on error.
929  */
930 struct block *mem2bl(uint8_t * p, int len)
931 {
932         ERRSTACK(1);
933         int n;
934         struct block *b, *first, **l;
935
936         first = NULL;
937         l = &first;
938         if (waserror()) {
939                 freeblist(first);
940                 nexterror();
941         }
942         do {
943                 n = len;
944                 if (n > Maxatomic)
945                         n = Maxatomic;
946
947                 *l = b = allocb(n);
948                 memmove(b->wp, p, n);
949                 b->wp += n;
950                 p += n;
951                 len -= n;
952                 l = &b->next;
953         } while (len > 0);
954         poperror();
955
956         return first;
957 }
958
959 /*
960  *  put a block back to the front of the queue
961  *  called with q ilocked
962  */
963 void qputback(struct queue *q, struct block *b)
964 {
965         b->next = q->bfirst;
966         if (q->bfirst == NULL)
967                 q->blast = b;
968         q->bfirst = b;
969         q->len += BALLOC(b);
970         q->dlen += BLEN(b);
971 }
972
973 /*
974  *  flow control, get producer going again
975  *  called with q ilocked
976  */
977 static void qwakeup_iunlock(struct queue *q)
978 {
979         int dowakeup = 0;
980
981         /* if writer flow controlled, restart */
982         if ((q->state & Qflow) && q->len < q->limit / 2) {
983                 q->state &= ~Qflow;
984                 dowakeup = 1;
985         }
986
987         spin_unlock_irqsave(&q->lock);
988
989         /* wakeup flow controlled writers */
990         if (dowakeup) {
991                 if (q->kick)
992                         q->kick(q->arg);
993                 rendez_wakeup(&q->wr);
994         }
995 }
996
997 /*
998  *  get next block from a queue (up to a limit)
999  */
1000 struct block *qbread(struct queue *q, int len)
1001 {
1002         ERRSTACK(1);
1003         struct block *b, *nb;
1004         int n;
1005
1006         qlock(&q->rlock);
1007         if (waserror()) {
1008                 qunlock(&q->rlock);
1009                 nexterror();
1010         }
1011
1012         spin_lock_irqsave(&q->lock);
1013         switch (qwait(q)) {
1014                 case 0:
1015                         /* queue closed */
1016                         spin_unlock_irqsave(&q->lock);
1017                         qunlock(&q->rlock);
1018                         poperror();
1019                         return NULL;
1020                 case -1:
1021                         /* multiple reads on a closed queue */
1022                         spin_unlock_irqsave(&q->lock);
1023                         error(q->err);
1024         }
1025
1026         /* if we get here, there's at least one block in the queue */
1027         b = qremove(q);
1028         n = BLEN(b);
1029
1030         /* split block if it's too big and this is not a message queue */
1031         nb = b;
1032         if (n > len) {
1033                 if ((q->state & Qmsg) == 0) {
1034                         n -= len;
1035                         b = allocb(n);
1036                         memmove(b->wp, nb->rp + len, n);
1037                         b->wp += n;
1038                         qputback(q, b);
1039                 }
1040                 nb->wp = nb->rp + len;
1041         }
1042
1043         /* restart producer */
1044         qwakeup_iunlock(q);
1045
1046         poperror();
1047         qunlock(&q->rlock);
1048         return nb;
1049 }
1050
1051 /*
1052  *  read a queue.  if no data is queued, post a struct block
1053  *  and wait on its Rendez.
1054  */
1055 long qread(struct queue *q, void *vp, int len)
1056 {
1057         ERRSTACK(1);
1058         struct block *b, *first, **l;
1059         int m, n;
1060
1061         qlock(&q->rlock);
1062         if (waserror()) {
1063                 qunlock(&q->rlock);
1064                 nexterror();
1065         }
1066
1067         spin_lock_irqsave(&q->lock);
1068 again:
1069         switch (qwait(q)) {
1070                 case 0:
1071                         /* queue closed */
1072                         spin_unlock_irqsave(&q->lock);
1073                         qunlock(&q->rlock);
1074                         poperror();
1075                         return 0;
1076                 case -1:
1077                         /* multiple reads on a closed queue */
1078                         spin_unlock_irqsave(&q->lock);
1079                         error(q->err);
1080         }
1081
1082         /* if we get here, there's at least one block in the queue */
1083         // TODO: Consider removing the Qcoalesce flag and force a coalescing
1084         // strategy by default.
1085         if (q->state & Qcoalesce) {
1086                 /* when coalescing, 0 length blocks just go away */
1087                 b = q->bfirst;
1088                 if (BLEN(b) <= 0) {
1089                         freeb(qremove(q));
1090                         goto again;
1091                 }
1092
1093                 /*  grab the first block plus as many
1094                  *  following blocks as will completely
1095                  *  fit in the read.
1096                  */
1097                 n = 0;
1098                 l = &first;
1099                 m = BLEN(b);
1100                 for (;;) {
1101                         *l = qremove(q);
1102                         l = &b->next;
1103                         n += m;
1104
1105                         b = q->bfirst;
1106                         if (b == NULL)
1107                                 break;
1108                         m = BLEN(b);
1109                         if (n + m > len)
1110                                 break;
1111                 }
1112         } else {
1113                 first = qremove(q);
1114                 n = BLEN(first);
1115         }
1116
1117         /* copy to user space outside of the ilock */
1118         spin_unlock_irqsave(&q->lock);
1119         b = bl2mem(vp, first, len);
1120         spin_lock_irqsave(&q->lock);
1121
1122         /* take care of any left over partial block */
1123         if (b != NULL) {
1124                 n -= BLEN(b);
1125                 if (q->state & Qmsg)
1126                         freeb(b);
1127                 else
1128                         qputback(q, b);
1129         }
1130
1131         /* restart producer */
1132         qwakeup_iunlock(q);
1133
1134         poperror();
1135         qunlock(&q->rlock);
1136         return n;
1137 }
1138
1139 static int qnotfull(void *a)
1140 {
1141         struct queue *q = a;
1142
1143         return q->len < q->limit || (q->state & Qclosed);
1144 }
1145
1146 uint32_t noblockcnt;
1147
1148 /*
1149  *  add a block to a queue obeying flow control
1150  */
1151 long qbwrite(struct queue *q, struct block *b)
1152 {
1153         ERRSTACK(1);
1154         int n, dowakeup;
1155         volatile bool should_free_b = TRUE;
1156
1157         n = BLEN(b);
1158
1159         if (q->bypass) {
1160                 (*q->bypass) (q->arg, b);
1161                 return n;
1162         }
1163
1164         dowakeup = 0;
1165         qlock(&q->wlock);
1166         if (waserror()) {
1167                 if (b != NULL && should_free_b)
1168                         freeb(b);
1169                 qunlock(&q->wlock);
1170                 nexterror();
1171         }
1172
1173         spin_lock_irqsave(&q->lock);
1174
1175         /* give up if the queue is closed */
1176         if (q->state & Qclosed) {
1177                 spin_unlock_irqsave(&q->lock);
1178                 error(q->err);
1179         }
1180
1181         /* if nonblocking, don't queue over the limit */
1182         if (q->len >= q->limit) {
1183                 if (q->noblock) {
1184                         spin_unlock_irqsave(&q->lock);
1185                         freeb(b);
1186                         noblockcnt += n;
1187                         qunlock(&q->wlock);
1188                         poperror();
1189                         return n;
1190                 }
1191         }
1192
1193         /* queue the block */
1194         should_free_b = FALSE;
1195         if (q->bfirst)
1196                 q->blast->next = b;
1197         else
1198                 q->bfirst = b;
1199         q->blast = b;
1200         b->next = 0;
1201         q->len += BALLOC(b);
1202         q->dlen += n;
1203         QDEBUG checkb(b, "qbwrite");
1204         b = NULL;
1205
1206         /* make sure other end gets awakened */
1207         if (q->state & Qstarve) {
1208                 q->state &= ~Qstarve;
1209                 dowakeup = 1;
1210         }
1211         spin_unlock_irqsave(&q->lock);
1212
1213         /*  get output going again */
1214         if (q->kick && (dowakeup || (q->state & Qkick)))
1215                 q->kick(q->arg);
1216
1217         /* wakeup anyone consuming at the other end */
1218         if (dowakeup)
1219                 rendez_wakeup(&q->rr);
1220
1221         /*
1222          *  flow control, wait for queue to get below the limit
1223          *  before allowing the process to continue and queue
1224          *  more.  We do this here so that postnote can only
1225          *  interrupt us after the data has been queued.  This
1226          *  means that things like 9p flushes and ssl messages
1227          *  will not be disrupted by software interrupts.
1228          *
1229          *  Note - this is moderately dangerous since a process
1230          *  that keeps getting interrupted and rewriting will
1231          *  queue infinite crud.
1232          */
1233         for (;;) {
1234                 if (q->noblock || qnotfull(q))
1235                         break;
1236
1237                 spin_lock_irqsave(&q->lock);
1238                 q->state |= Qflow;
1239                 spin_unlock_irqsave(&q->lock);
1240                 rendez_sleep(&q->wr, qnotfull, q);
1241         }
1242
1243         qunlock(&q->wlock);
1244         poperror();
1245         return n;
1246 }
1247
1248 /*
1249  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1250  */
1251 int qwrite(struct queue *q, void *vp, int len)
1252 {
1253         ERRSTACK(1);
1254         int n, sofar;
1255         struct block *b;
1256         uint8_t *p = vp;
1257
1258         QDEBUG if (!islo())
1259                  printd("qwrite hi %p\n", getcallerpc(&q));
1260
1261         sofar = 0;
1262         do {
1263                 n = len - sofar;
1264                 if (n > Maxatomic)
1265                         n = Maxatomic;
1266
1267                 b = allocb(n);
1268                 if (waserror()) {
1269                         freeb(b);
1270                         nexterror();
1271                 }
1272                 memmove(b->wp, p + sofar, n);
1273                 poperror();
1274                 b->wp += n;
1275
1276                 qbwrite(q, b);
1277
1278                 sofar += n;
1279         } while (sofar < len && (q->state & Qmsg) == 0);
1280
1281         return len;
1282 }
1283
1284 /*
1285  *  used by print() to write to a queue.  Since we may be splhi or not in
1286  *  a process, don't qlock.
1287  */
1288 int qiwrite(struct queue *q, void *vp, int len)
1289 {
1290         int n, sofar, dowakeup;
1291         struct block *b;
1292         uint8_t *p = vp;
1293
1294         dowakeup = 0;
1295
1296         sofar = 0;
1297         do {
1298                 n = len - sofar;
1299                 if (n > Maxatomic)
1300                         n = Maxatomic;
1301
1302                 b = iallocb(n);
1303                 if (b == NULL)
1304                         break;
1305                 memmove(b->wp, p + sofar, n);
1306                 b->wp += n;
1307
1308                 spin_lock_irqsave(&q->lock);
1309
1310                 QDEBUG checkb(b, "qiwrite");
1311                 if (q->bfirst)
1312                         q->blast->next = b;
1313                 else
1314                         q->bfirst = b;
1315                 q->blast = b;
1316                 q->len += BALLOC(b);
1317                 q->dlen += n;
1318
1319                 if (q->state & Qstarve) {
1320                         q->state &= ~Qstarve;
1321                         dowakeup = 1;
1322                 }
1323
1324                 spin_unlock_irqsave(&q->lock);
1325
1326                 if (dowakeup) {
1327                         if (q->kick)
1328                                 q->kick(q->arg);
1329                         rendez_wakeup(&q->rr);
1330                 }
1331
1332                 sofar += n;
1333         } while (sofar < len && (q->state & Qmsg) == 0);
1334
1335         return sofar;
1336 }
1337
1338 /*
1339  *  be extremely careful when calling this,
1340  *  as there is no reference accounting
1341  */
1342 void qfree(struct queue *q)
1343 {
1344         qclose(q);
1345         kfree(q);
1346 }
1347
1348 /*
1349  *  Mark a queue as closed.  No further IO is permitted.
1350  *  All blocks are released.
1351  */
1352 void qclose(struct queue *q)
1353 {
1354         struct block *bfirst;
1355
1356         if (q == NULL)
1357                 return;
1358
1359         /* mark it */
1360         spin_lock_irqsave(&q->lock);
1361         q->state |= Qclosed;
1362         q->state &= ~(Qflow | Qstarve);
1363         strncpy(q->err, Ehungup, sizeof(q->err));
1364         bfirst = q->bfirst;
1365         q->bfirst = 0;
1366         q->len = 0;
1367         q->dlen = 0;
1368         q->noblock = 0;
1369         spin_unlock_irqsave(&q->lock);
1370
1371         /* free queued blocks */
1372         freeblist(bfirst);
1373
1374         /* wake up readers/writers */
1375         rendez_wakeup(&q->rr);
1376         rendez_wakeup(&q->wr);
1377 }
1378
1379 /*
1380  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1381  *  blocks.
1382  */
1383 void qhangup(struct queue *q, char *msg)
1384 {
1385         /* mark it */
1386         spin_lock_irqsave(&q->lock);
1387         q->state |= Qclosed;
1388         if (msg == 0 || *msg == 0)
1389                 strncpy(q->err, Ehungup, sizeof(q->err));
1390         else
1391                 strncpy(q->err, msg, ERRMAX - 1);
1392         spin_unlock_irqsave(&q->lock);
1393
1394         /* wake up readers/writers */
1395         rendez_wakeup(&q->rr);
1396         rendez_wakeup(&q->wr);
1397 }
1398
1399 /*
1400  *  return non-zero if the q is hungup
1401  */
1402 int qisclosed(struct queue *q)
1403 {
1404         return q->state & Qclosed;
1405 }
1406
1407 /*
1408  *  mark a queue as no longer hung up
1409  */
1410 void qreopen(struct queue *q)
1411 {
1412         spin_lock_irqsave(&q->lock);
1413         q->state &= ~Qclosed;
1414         q->state |= Qstarve;
1415         q->eof = 0;
1416         q->limit = q->iNULLim;
1417         spin_unlock_irqsave(&q->lock);
1418 }
1419
1420 /*
1421  *  return bytes queued
1422  */
1423 int qlen(struct queue *q)
1424 {
1425         return q->dlen;
1426 }
1427
1428 /*
1429  * return space remaining before flow control
1430  */
1431 int qwindow(struct queue *q)
1432 {
1433         int l;
1434
1435         l = q->limit - q->len;
1436         if (l < 0)
1437                 l = 0;
1438         return l;
1439 }
1440
1441 /*
1442  *  return true if we can read without blocking
1443  */
1444 int qcanread(struct queue *q)
1445 {
1446         return q->bfirst != 0;
1447 }
1448
1449 /*
1450  *  change queue limit
1451  */
1452 void qsetlimit(struct queue *q, int limit)
1453 {
1454         q->limit = limit;
1455 }
1456
1457 /*
1458  *  set blocking/nonblocking
1459  */
1460 void qnoblock(struct queue *q, int onoff)
1461 {
1462         q->noblock = onoff;
1463 }
1464
1465 /*
1466  *  flush the output queue
1467  */
1468 void qflush(struct queue *q)
1469 {
1470         struct block *bfirst;
1471
1472         /* mark it */
1473         spin_lock_irqsave(&q->lock);
1474         bfirst = q->bfirst;
1475         q->bfirst = 0;
1476         q->len = 0;
1477         q->dlen = 0;
1478         spin_unlock_irqsave(&q->lock);
1479
1480         /* free queued blocks */
1481         freeblist(bfirst);
1482
1483         /* wake up readers/writers */
1484         rendez_wakeup(&q->wr);
1485 }
1486
1487 int qfull(struct queue *q)
1488 {
1489         return q->state & Qflow;
1490 }
1491
1492 int qstate(struct queue *q)
1493 {
1494         return q->state;
1495 }
1496
1497 void qdump(struct queue *q)
1498 {
1499         if (q)
1500                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1501                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1502 }