waserror() audit
[akaros.git] / kern / src / ns / qio.c
1 // INFERNO
2 #include <vfs.h>
3 #include <kfs.h>
4 #include <slab.h>
5 #include <kmalloc.h>
6 #include <kref.h>
7 #include <string.h>
8 #include <stdio.h>
9 #include <assert.h>
10 #include <error.h>
11 #include <cpio.h>
12 #include <pmap.h>
13 #include <smp.h>
14 #include <ip.h>
15
16 static uint32_t padblockcnt;
17 static uint32_t concatblockcnt;
18 static uint32_t pullupblockcnt;
19 static uint32_t copyblockcnt;
20 static uint32_t consumecnt;
21 static uint32_t producecnt;
22 static uint32_t qcopycnt;
23
24 static int debugging;
25
26 #define QDEBUG  if(0)
27
28 /*
29  *  IO queues
30  */
31
32 struct queue
33 {
34         spinlock_t lock;
35
36         struct block*   bfirst;         /* buffer */
37         struct block*   blast;
38
39         int     len;            /* bytes allocated to queue */
40         int     dlen;           /* data bytes in queue */
41         int     limit;          /* max bytes in queue */
42         int     inilim;         /* initial limit */
43         int     state;
44         int     noblock;        /* true if writes return immediately when q full */
45         int     eof;            /* number of eofs read by user */
46
47         void    (*kick)(void*); /* restart output */
48         void    (*bypass)(void*, struct block*);        /* bypass queue altogether */
49         void*   arg;            /* argument to kick */
50
51         qlock_t rlock;          /* mutex for reading processes */
52         struct rendez   rr;             /* process waiting to read */
53         qlock_t wlock;          /* mutex for writing processes */
54         struct rendez   wr;             /* process waiting to write */
55
56         char    err[ERRMAX];
57 };
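
/* [audit note] The q->state bits used throughout this file (Qstarve, Qflow,
 * Qclosed, Qmsg, Qcoalesce, Qkick) are defined in the shared 9ns headers.
 * As used below: Qstarve marks a reader waiting on an empty queue, Qflow a
 * writer blocked on a full one, Qclosed a hung-up queue, Qmsg a
 * message-at-a-time queue, Qcoalesce permission to append into the last
 * block, and Qkick a request to call q->kick on every qbwrite(). */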
58
59 enum
60 {
61         Maxatomic       = 64*1024,
62 };
63
64 unsigned int    qiomaxatomic = Maxatomic;
65
66 void
67 ixsummary(void)
68 {
69         debugging ^= 1;
70         iallocsummary();
71         printd("pad %u, concat %u, pullup %u, copy %u\n",
72                 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
73         printd("consume %u, produce %u, qcopy %u\n",
74                 consumecnt, producecnt, qcopycnt);
75 }
76
77 /*
78  *  free a list of blocks
79  */
80 void
81 freeblist(struct block *b)
82 {
83         struct block *next;
84
85         for(; b != 0; b = next){
86                 next = b->next;
87                 b->next = 0;
88                 freeb(b);
89         }
90 }
91
92 /*
93  *  pad a block to the front (or the back if size is negative)
94  */
95 struct block*
96 padblock(struct block *bp, int size)
97 {
98         int n;
99         struct block *nbp;
100
101         QDEBUG checkb(bp, "padblock 1");
102         if(size >= 0){
103                 if(bp->rp - bp->base >= size){
104                         bp->rp -= size;
105                         return bp;
106                 }
107
108                 if(bp->next)
109                         panic("padblock 0x%lx", getcallerpc(&bp));
110                 n = BLEN(bp);
111                 padblockcnt++;
112                 nbp = allocb(size+n);
113                 nbp->rp += size;
114                 nbp->wp = nbp->rp;
115                 memmove(nbp->wp, bp->rp, n);
116                 nbp->wp += n;
117                 freeb(bp);
118                 nbp->rp -= size;
119         } else {
120                 size = -size;
121
122                 if(bp->next)
123                         panic("padblock 0x%lx", getcallerpc(&bp));
124
125                 if(bp->lim - bp->wp >= size)
126                         return bp;
127
128                 n = BLEN(bp);
129                 padblockcnt++;
130                 nbp = allocb(size+n);
131                 memmove(nbp->wp, bp->rp, n);
132                 nbp->wp += n;
133                 freeb(bp);
134         }
135         QDEBUG checkb(nbp, "padblock 1");
136         return nbp;
137 }
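
/* [audit note] A minimal sketch of the usual padblock() call: reserving room
 * in front of a packet for a link-level header.  The 14-byte size is
 * illustrative.  Note padblock() may allocate and return a different block,
 * so the caller must always take the return value. */
static struct block*
prepend_header_sketch(struct block *bp)
{
        bp = padblock(bp, 14);          /* hypothetical header size */
        memset(bp->rp, 0, 14);          /* caller fills the real header in */
        return bp;
}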
138
139 /*
140  *  return count of bytes in a string of blocks
141  */
142 int
143 blocklen(struct block *bp)
144 {
145         int len;
146
147         len = 0;
148         while(bp) {
149                 len += BLEN(bp);
150                 bp = bp->next;
151         }
152         return len;
153 }
154
155 /*
156  * return count of space in blocks
157  */
158 int
159 blockalloclen(struct block *bp)
160 {
161         int len;
162
163         len = 0;
164         while(bp) {
165                 len += BALLOC(bp);
166                 bp = bp->next;
167         }
168         return len;
169 }
170
171 /*
172  *  copy the string of blocks into
173  *  a single block and free the string
174  */
175 struct block*
176 concatblock(struct block *bp)
177 {
178         int len;
179         struct block *nb, *f;
180
181         if(bp->next == 0)
182                 return bp;
183
184         nb = allocb(blocklen(bp));
185         for(f = bp; f; f = f->next) {
186                 len = BLEN(f);
187                 memmove(nb->wp, f->rp, len);
188                 nb->wp += len;
189         }
190         concatblockcnt += BLEN(nb);
191         freeblist(bp);
192         QDEBUG checkb(nb, "concatblock 1");
193         return nb;
194 }
195
196 /*
197  *  make sure the first block has at least n bytes
198  */
199 struct block*
200 pullupblock(struct block *bp, int n)
201 {
202         int i;
203         struct block *nbp;
204
205         /*
206          *  this should almost always be true, it's
207          *  just to avoid every caller checking.
208          */
209         if(BLEN(bp) >= n)
210                 return bp;
211
212         /*
213          *  if not enough room in the first block,
214          *  add another to the front of the list.
215          */
216         if(bp->lim - bp->rp < n){
217                 nbp = allocb(n);
218                 nbp->next = bp;
219                 bp = nbp;
220         }
221
222         /*
223          *  copy bytes from the trailing blocks into the first
224          */
225         n -= BLEN(bp);
226         while((nbp = bp->next)){
227                 i = BLEN(nbp);
228                 if(i > n) {
229                         memmove(bp->wp, nbp->rp, n);
230                         pullupblockcnt++;
231                         bp->wp += n;
232                         nbp->rp += n;
233                         QDEBUG checkb(bp, "pullupblock 1");
234                         return bp;
235                 }
236                 else {
237                         memmove(bp->wp, nbp->rp, i);
238                         pullupblockcnt++;
239                         bp->wp += i;
240                         bp->next = nbp->next;
241                         nbp->next = 0;
242                         freeb(nbp);
243                         n -= i;
244                         if(n == 0){
245                                 QDEBUG checkb(bp, "pullupblock 2");
246                                 return bp;
247                         }
248                 }
249         }
250         freeb(bp);
251         return 0;
252 }
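
/* [audit note] Sketch of the common pullupblock() pattern: make a protocol
 * header contiguous before overlaying a header struct on bp->rp.  'struct
 * hdr' and the 20-byte length are illustrative stand-ins for a wire format. */
static void
parse_header_sketch(struct block *bp)
{
        struct hdr *h;

        bp = pullupblock(bp, 20);       /* hypothetical header length */
        if(bp == NULL)
                return;                 /* too short: the whole list was freed */
        h = (struct hdr*)bp->rp;
        (void)h;                        /* examine header fields here */
}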
253
254 /*
255  *  make sure the first block has at least n bytes
256  */
257 struct block*
258 pullupqueue(struct queue *q, int n)
259 {
260         struct block *b;
261
262         if(BLEN(q->bfirst) >= n)
263                 return q->bfirst;
264         q->bfirst = pullupblock(q->bfirst, n);
265         for(b = q->bfirst; b != NULL && b->next != NULL; b = b->next)
266                 ;
267         q->blast = b;
268         return q->bfirst;
269 }
270
271 /*
272  *  trim to len bytes starting at offset
273  */
274 struct block *
275 trimblock(struct block *bp, int offset, int len)
276 {
277         uint32_t l;
278         struct block *nb, *startb;
279
280         QDEBUG checkb(bp, "trimblock 1");
281         if(blocklen(bp) < offset+len) {
282                 freeblist(bp);
283                 return NULL;
284         }
285
286         while((l = BLEN(bp)) < offset) {
287                 offset -= l;
288                 nb = bp->next;
289                 bp->next = NULL;
290                 freeb(bp);
291                 bp = nb;
292         }
293
294         startb = bp;
295         bp->rp += offset;
296
297         while((l = BLEN(bp)) < len) {
298                 len -= l;
299                 bp = bp->next;
300         }
301
302         bp->wp -= (BLEN(bp) - len);
303
304         if(bp->next) {
305                 freeblist(bp->next);
306                 bp->next = NULL;
307         }
308
309         return startb;
310 }
311
312 /*
313  *  copy 'count' bytes into a new block
314  */
315 struct block*
316 copyblock(struct block *bp, int count)
317 {
318         int l;
319         struct block *nbp;
320
321         QDEBUG checkb(bp, "copyblock 0");
322         nbp = allocb(count);
323         for(; count > 0 && bp != 0; bp = bp->next){
324                 l = BLEN(bp);
325                 if(l > count)
326                         l = count;
327                 memmove(nbp->wp, bp->rp, l);
328                 nbp->wp += l;
329                 count -= l;
330         }
331         if(count > 0){
332                 memset(nbp->wp, 0, count);
333                 nbp->wp += count;
334         }
335         copyblockcnt++;
336         QDEBUG checkb(nbp, "copyblock 1");
337
338         return nbp;
339 }
340
341 struct block*
342 adjustblock(struct block* bp, int len)
343 {
344         int n;
345         struct block *nbp;
346
347         if(len < 0){
348                 freeb(bp);
349                 return NULL;
350         }
351
352         if(bp->rp+len > bp->lim){
353                 nbp = copyblock(bp, len);
354                 freeblist(bp);
355                 QDEBUG checkb(nbp, "adjustblock 1");
356
357                 return nbp;
358         }
359
360         n = BLEN(bp);
361         if(len > n)
362                 memset(bp->wp, 0, len-n);
363         bp->wp = bp->rp+len;
364         QDEBUG checkb(bp, "adjustblock 2");
365
366         return bp;
367 }
368
369
370 /*
371  *  throw away up to count bytes from a
372  *  list of blocks.  Return count of bytes
373  *  thrown away.
374  */
375 int
376 pullblock(struct block **bph, int count)
377 {
378         struct block *bp;
379         int n, bytes;
380
381         bytes = 0;
382         if(bph == NULL)
383                 return 0;
384
385         while(*bph != NULL && count != 0) {
386                 bp = *bph;
387                 n = BLEN(bp);
388                 if(count < n)
389                         n = count;
390                 bytes += n;
391                 count -= n;
392                 bp->rp += n;
393                 QDEBUG checkb(bp, "pullblock ");
394                 if(BLEN(bp) == 0) {
395                         *bph = bp->next;
396                         bp->next = NULL;
397                         freeb(bp);
398                 }
399         }
400         return bytes;
401 }
402
403 /*
404  *  get next block from a queue, return null if nothing there
405  */
406 struct block*
407 qget(struct queue *q)
408 {
409         int dowakeup;
410         struct block *b;
411
412         /* sync with qwrite */
413         spin_lock_irqsave(&q->lock);
414
415         b = q->bfirst;
416         if(b == NULL){
417                 q->state |= Qstarve;
418                 spin_unlock_irqsave(&q->lock);
419                 return NULL;
420         }
421         q->bfirst = b->next;
422         b->next = 0;
423         q->len -= BALLOC(b);
424         q->dlen -= BLEN(b);
425         QDEBUG checkb(b, "qget");
426
427         /* if writer flow controlled, restart */
428         if((q->state & Qflow) && q->len < q->limit/2){
429                 q->state &= ~Qflow;
430                 dowakeup = 1;
431         } else
432                 dowakeup = 0;
433
434         spin_unlock_irqsave(&q->lock);
435
436         if(dowakeup)
437                 rendez_wakeup(&q->wr);
438
439         return b;
440 }
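
/* [audit note] Sketch of a transmit 'kick' routine draining a queue with
 * qget(); hw_transmit() is hypothetical.  When qget() returns NULL it has
 * already set Qstarve, so the next qbwrite()/qpass() re-kicks or wakes us. */
static void hw_transmit(struct block *b);       /* hypothetical driver hook */

static void
tx_kick_sketch(void *arg)
{
        struct queue *q = arg;          /* assumes arg was the queue itself */
        struct block *b;

        while((b = qget(q)) != NULL)
                hw_transmit(b);         /* driver frees b once it is sent */
}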
441
442 /*
443  *  throw away the next 'len' bytes in the queue
444  * returning the number actually discarded
445  */
446 int
447 qdiscard(struct queue *q, int len)
448 {
449         struct block *b;
450         int dowakeup, n, sofar;
451
452         spin_lock_irqsave(&q->lock);
453         for(sofar = 0; sofar < len; sofar += n){
454                 b = q->bfirst;
455                 if(b == NULL)
456                         break;
457                 QDEBUG checkb(b, "qdiscard");
458                 n = BLEN(b);
459                 if(n <= len - sofar){
460                         q->bfirst = b->next;
461                         b->next = 0;
462                         q->len -= BALLOC(b);
463                         q->dlen -= BLEN(b);
464                         freeb(b);
465                 } else {
466                         n = len - sofar;
467                         b->rp += n;
468                         q->dlen -= n;
469                 }
470         }
471
472         /*
473          *  if writer flow controlled, restart
474          *
475          *  This used to be
476          *      q->len < q->limit/2
477          *  but it slows down tcp too much for certain write sizes.
478          *  I really don't understand it completely.  It may be
479          *  due to the queue draining so fast that the transmission
480          *  stalls waiting for the app to produce more data.  - presotto
481          */
482         if((q->state & Qflow) && q->len < q->limit){
483                 q->state &= ~Qflow;
484                 dowakeup = 1;
485         } else
486                 dowakeup = 0;
487
488         spin_unlock_irqsave(&q->lock);
489
490         if(dowakeup)
491                 rendez_wakeup(&q->wr);
492
493         return sofar;
494 }
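
/* [audit note] qdiscard()'s classic caller is a transport protocol dropping
 * acknowledged bytes from the front of its unacked queue; a hedged sketch,
 * assuming 'acked' comes from the peer: */
static void
drop_acked_sketch(struct queue *unacked, int acked)
{
        if(qdiscard(unacked, acked) != acked){
                /* queue held fewer bytes than were acked; a real protocol
                 * would treat this as an error */
        }
}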
495
496 /*
497  *  Interrupt level copy out of a queue, return # bytes copied.
498  */
499 int
500 qconsume(struct queue *q, void *vp, int len)
501 {
502         struct block *b;
503         int n, dowakeup;
504         uint8_t *p = vp;
505         struct block *tofree = NULL;
506
507         /* sync with qwrite */
508         spin_lock_irqsave(&q->lock);
509
510         for(;;) {
511                 b = q->bfirst;
512                 if(b == 0){
513                         q->state |= Qstarve;
514                         spin_unlock_irqsave(&q->lock);
515                         return -1;
516                 }
517                 QDEBUG checkb(b, "qconsume 1");
518
519                 n = BLEN(b);
520                 if(n > 0)
521                         break;
522                 q->bfirst = b->next;
523                 q->len -= BALLOC(b);
524
525                 /* remember to free this */
526                 b->next = tofree;
527                 tofree = b;
528         };
529
530         if(n < len)
531                 len = n;
532         memmove(p, b->rp, len);
533         consumecnt += n;
534         b->rp += len;
535         q->dlen -= len;
536
537         /* discard the block if we're done with it */
538         if((q->state & Qmsg) || len == n){
539                 q->bfirst = b->next;
540                 b->next = 0;
541                 q->len -= BALLOC(b);
542                 q->dlen -= BLEN(b);
543
544                 /* remember to free this */
545                 b->next = tofree;
546                 tofree = b;
547         }
548
549         /* if writer flow controlled, restart */
550         if((q->state & Qflow) && q->len < q->limit/2){
551                 q->state &= ~Qflow;
552                 dowakeup = 1;
553         } else
554                 dowakeup = 0;
555
556         spin_unlock_irqsave(&q->lock);
557
558         if(dowakeup)
559                 rendez_wakeup(&q->wr);
560
561         if(tofree != NULL)
562                 freeblist(tofree);
563
564         return len;
565 }
566
567 int
568 qpass(struct queue *q, struct block *b)
569 {
570         int dlen, len, dowakeup;
571
572         /* sync with qread */
573         dowakeup = 0;
574         spin_lock_irqsave(&q->lock);
575         if(q->len >= q->limit){
576                 freeblist(b);
577                 spin_unlock_irqsave(&q->lock);
578                 return -1;
579         }
580         if(q->state & Qclosed){
581                 len = blocklen(b);
582                 freeblist(b);
583                 spin_unlock_irqsave(&q->lock);
584                 return len;
585         }
586
587         /* add buffer to queue */
588         if(q->bfirst)
589                 q->blast->next = b;
590         else
591                 q->bfirst = b;
592         len = BALLOC(b);
593         dlen = BLEN(b);
594         QDEBUG checkb(b, "qpass");
595         while(b->next){
596                 b = b->next;
597                 QDEBUG checkb(b, "qpass");
598                 len += BALLOC(b);
599                 dlen += BLEN(b);
600         }
601         q->blast = b;
602         q->len += len;
603         q->dlen += dlen;
604
605         if(q->len >= q->limit/2)
606                 q->state |= Qflow;
607
608         if(q->state & Qstarve){
609                 q->state &= ~Qstarve;
610                 dowakeup = 1;
611         }
612         spin_unlock_irqsave(&q->lock);
613
614         if(dowakeup)
615                 rendez_wakeup(&q->rr);
616
617         return len;
618 }
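
/* [audit note] Sketch of an interrupt-time receive path.  qpass() is safe
 * here because it never blocks: over the limit it frees the block list and
 * returns -1, which is all an interrupt handler can do with overflow. */
static void
rx_intr_sketch(struct queue *recvq, struct block *b)
{
        if(qpass(recvq, b) < 0){
                /* input overrun: qpass() already freed b */
        }
}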
619
620 int
621 qpassnolim(struct queue *q, struct block *b)
622 {
623         int dlen, len, dowakeup;
624
625         /* sync with qread */
626         dowakeup = 0;
627         spin_lock_irqsave(&q->lock);
628
629         if(q->state & Qclosed){
630                 len = blocklen(b);
631                 freeblist(b);
632                 spin_unlock_irqsave(&q->lock);
633                 return len;
634         }
634
635         /* add buffer to queue */
636         if(q->bfirst)
637                 q->blast->next = b;
638         else
639                 q->bfirst = b;
640         len = BALLOC(b);
641         dlen = BLEN(b);
642         QDEBUG checkb(b, "qpass");
643         while(b->next){
644                 b = b->next;
645                 QDEBUG checkb(b, "qpass");
646                 len += BALLOC(b);
647                 dlen += BLEN(b);
648         }
649         q->blast = b;
650         q->len += len;
651         q->dlen += dlen;
652
653         if(q->len >= q->limit/2)
654                 q->state |= Qflow;
655
656         if(q->state & Qstarve){
657                 q->state &= ~Qstarve;
658                 dowakeup = 1;
659         }
660         spin_unlock_irqsave(&q->lock);
661
662         if(dowakeup)
663                 rendez_wakeup(&q->rr);
664
665         return len;
666 }
667
668 /*
669  *  if the allocated space is way out of line with the used
670  *  space, reallocate to a smaller block
671  */
672 struct block*
673 packblock(struct block *bp)
674 {
675         struct block **l, *nbp;
676         int n;
677
678         for(l = &bp; *l; l = &(*l)->next){
679                 nbp = *l;
680                 n = BLEN(nbp);
681                 if((n<<2) < BALLOC(nbp)){
682                         *l = allocb(n);
683                         memmove((*l)->wp, nbp->rp, n);
684                         (*l)->wp += n;
685                         (*l)->next = nbp->next;
686                         freeb(nbp);
687                 }
688         }
689
690         return bp;
691 }
692
693 int
694 qproduce(struct queue *q, void *vp, int len)
695 {
696         struct block *b;
697         int dowakeup;
698         uint8_t *p = vp;
699
700         /* sync with qread */
701         dowakeup = 0;
702         spin_lock_irqsave(&q->lock);
703
704         /* no waiting receivers, room in buffer? */
705         if(q->len >= q->limit){
706                 q->state |= Qflow;
707                 spin_unlock_irqsave(&q->lock);
708                 return -1;
709         }
710
711         /* save in buffer */
712         /* use Qcoalesce here to save storage */
713         b = q->blast;
714         if((q->state & Qcoalesce)==0 || q->bfirst==NULL || b->lim-b->wp < len){
715                 /* need a new block */
716                 b = iallocb(len);
717                 if(b == 0){
718                         spin_unlock_irqsave(&q->lock);
719                         return 0;
720                 }
721                 if(q->bfirst)
722                         q->blast->next = b;
723                 else
724                         q->bfirst = b;
725                 q->blast = b;
726                 /* b->next = 0; done by iallocb() */
727                 q->len += BALLOC(b);
728         }
729         memmove(b->wp, p, len);
730         producecnt += len;
731         b->wp += len;
732         q->dlen += len;
733         QDEBUG checkb(b, "qproduce");
734
735         if(q->state & Qstarve){
736                 q->state &= ~Qstarve;
737                 dowakeup = 1;
738         }
739
740         if(q->len >= q->limit)
741                 q->state |= Qflow;
742         spin_unlock_irqsave(&q->lock);
743
744         if(dowakeup)
745                 rendez_wakeup(&q->rr);
746
747         return len;
748 }
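
/* [audit note] Sketch of byte-level producing from interrupt context, e.g. a
 * UART receive handler.  qproduce() never blocks; it returns -1 when the
 * queue is over its limit and 0 when iallocb() fails, so a short return
 * means dropped input. */
static void
uart_rx_sketch(struct queue *inq, uint8_t *buf, int n)
{
        if(qproduce(inq, buf, n) < n){
                /* overrun: bytes dropped; a driver would count this */
        }
}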
749
750 /*
751  *  copy from offset in the queue
752  */
753 struct block*
754 qcopy(struct queue *q, int len, uint32_t offset)
755 {
756         int sofar;
757         int n;
758         struct block *b, *nb;
759         uint8_t *p;
760
761         nb = allocb(len);
762
763         spin_lock_irqsave(&q->lock);
764
765         /* go to offset */
766         b = q->bfirst;
767         for(sofar = 0; ; sofar += n){
768                 if(b == NULL){
769                         spin_unlock_irqsave(&q->lock);
770                         return nb;
771                 }
772                 n = BLEN(b);
773                 if(sofar + n > offset){
774                         p = b->rp + offset - sofar;
775                         n -= offset - sofar;
776                         break;
777                 }
778                 QDEBUG checkb(b, "qcopy");
779                 b = b->next;
780         }
781
782         /* copy bytes from there */
783         for(sofar = 0; sofar < len;){
784                 if(n > len - sofar)
785                         n = len - sofar;
786                 memmove(nb->wp, p, n);
787                 qcopycnt += n;
788                 sofar += n;
789                 nb->wp += n;
790                 b = b->next;
791                 if(b == NULL)
792                         break;
793                 n = BLEN(b);
794                 p = b->rp;
795         }
796         spin_unlock_irqsave(&q->lock);
797
798         return nb;
799 }
800
801 /*
802  *  called by non-interrupt code
803  */
804 struct queue*
805 qopen(int limit, int msg, void (*kick)(void*), void *arg)
806 {
807         struct queue *q;
808
809         q = kzmalloc(sizeof(struct queue), 0);
810         if(q == 0)
811                 return 0;
812
813         q->limit = q->inilim = limit;
814         q->kick = kick;
815         q->arg = arg;
816         q->state = msg;
817         q->state |= Qstarve;
818         q->eof = 0;
819         q->noblock = 0;
820
821         return q;
822 }
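
/* [audit note] Hedged example of queue setup for a device with separate
 * input and output sides; the 64KB limits, start_output(), and the Enomem
 * throw are assumptions, not values taken from this file. */
static void start_output(void *arg);    /* hypothetical device kick routine */

static void
queue_setup_sketch(void *dev, struct queue **inq, struct queue **outq)
{
        *inq = qopen(64*1024, 0, NULL, NULL);           /* plain read side */
        *outq = qopen(64*1024, 0, start_output, dev);   /* kicked on writes */
        if(*inq == NULL || *outq == NULL)
                error(Enomem);          /* assumes the usual 9ns error string */
}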
823
824 /* open a queue to be bypassed */
825 struct queue*
826 qbypass(void (*bypass)(void*, struct block*), void *arg)
827 {
828         struct queue *q;
829
830         q = kzmalloc(sizeof(struct queue), 0);
831         if(q == 0)
832                 return 0;
833
834         q->limit = 0;
835         q->arg = arg;
836         q->bypass = bypass;
837         q->state = 0;
838
839         return q;
840 }
841
842 static int
843 notempty(void *a)
844 {
845         struct queue *q = a;
846
847         return (q->state & Qclosed) || q->bfirst != 0;
848 }
849
850 /*
851  *  wait for the queue to be non-empty or closed.
852  *  called with q ilocked.
853  */
854 static int
855 qwait(struct queue *q)
856 {
857         /* wait for data */
858         for(;;){
859                 if(q->bfirst != NULL)
860                         break;
861
862                 if(q->state & Qclosed){
863                         if(++q->eof > 3)
864                                 return -1;
865                         if(*q->err && strcmp(q->err, Ehungup) != 0)
866                                 return -1;
867                         return 0;
868                 }
869
870                 q->state |= Qstarve;    /* flag requesting producer to wake me */
871                 spin_unlock_irqsave(&q->lock);
872                 rendez_sleep(&q->rr, notempty, q);
873                 spin_lock_irqsave(&q->lock);
874         }
875         return 1;
876 }
877
878 /*
879  * add a block list to a queue
880  */
881 void
882 qaddlist(struct queue *q, struct block *b)
883 {
884         /* queue the block */
885         if(q->bfirst)
886                 q->blast->next = b;
887         else
888                 q->bfirst = b;
889         q->len += blockalloclen(b);
890         q->dlen += blocklen(b);
891         while(b->next)
892                 b = b->next;
893         q->blast = b;
894 }
895
896 /*
897  *  called with q ilocked
898  */
899 struct block*
900 qremove(struct queue *q)
901 {
902         struct block *b;
903
904         b = q->bfirst;
905         if(b == NULL)
906                 return NULL;
907         q->bfirst = b->next;
908         b->next = NULL;
909         q->dlen -= BLEN(b);
910         q->len -= BALLOC(b);
911         QDEBUG checkb(b, "qremove");
912         return b;
913 }
914
915 /*
916  *  copy the contents of a string of blocks into
917  *  memory.  emptied blocks are freed.  return
918  *  pointer to first unconsumed block.
919  */
920 struct block*
921 bl2mem(uint8_t *p, struct block *b, int n)
922 {
923         int i;
924         struct block *next;
925
926         for(; b != NULL; b = next){
927                 i = BLEN(b);
928                 if(i > n){
929                         memmove(p, b->rp, n);
930                         b->rp += n;
931                         return b;
932                 }
933                 memmove(p, b->rp, i);
934                 n -= i;
935                 p += i;
936                 b->rp += i;
937                 next = b->next;
938                 freeb(b);
939         }
940         return NULL;
941 }
942
943 /*
944  *  copy the contents of memory into a string of blocks.
945  *  return NULL on error.
946  */
947 struct block*
948 mem2bl(uint8_t *p, int len)
949 {
950         ERRSTACK(1);
951         int n;
952         struct block *b, *first, **l;
953
954         first = NULL;
955         l = &first;
956         if(waserror()){
957                 freeblist(first);
958                 nexterror();
959         }
960         do {
961                 n = len;
962                 if(n > Maxatomic)
963                         n = Maxatomic;
964
965                 *l = b = allocb(n);
966                 memmove(b->wp, p, n);
967                 b->wp += n;
968                 p += n;
969                 len -= n;
970                 l = &b->next;
971         } while(len > 0);
972         poperror();
973
974         return first;
975 }
976
977 /*
978  *  put a block back to the front of the queue
979  *  called with q ilocked
980  */
981 void
982 qputback(struct queue *q, struct block *b)
983 {
984         b->next = q->bfirst;
985         if(q->bfirst == NULL)
986                 q->blast = b;
987         q->bfirst = b;
988         q->len += BALLOC(b);
989         q->dlen += BLEN(b);
990 }
991
992 /*
993  *  flow control, get producer going again
994  *  called with q ilocked
995  */
996 static void
997 qwakeup_iunlock(struct queue *q)
998 {
999         int dowakeup = 0;
1000
1001         /* if writer flow controlled, restart */
1002         if((q->state & Qflow) && q->len < q->limit/2){
1003                 q->state &= ~Qflow;
1004                 dowakeup = 1;
1005         }
1006
1007         spin_unlock_irqsave(&q->lock);
1008
1009         /* wakeup flow controlled writers */
1010         if(dowakeup){
1011                 if(q->kick)
1012                         q->kick(q->arg);
1013                 rendez_wakeup(&q->wr);
1014         }
1015 }
1016
1017 /*
1018  *  get next block from a queue (up to a limit)
1019  */
1020 struct block*
1021 qbread(struct queue *q, int len)
1022 {
1023         ERRSTACK(1);
1024         struct block *b, *nb;
1025         int n;
1026
1027         qlock(&q->rlock);
1028         if(waserror()){
1029                 qunlock(&q->rlock);
1030                 nexterror();
1031         }
1032
1033         spin_lock_irqsave(&q->lock);
1034         switch(qwait(q)){
1035         case 0:
1036                 /* queue closed */
1037                 spin_unlock_irqsave(&q->lock);
1038                 qunlock(&q->rlock);
1039                 poperror();
1040                 return NULL;
1041         case -1:
1042                 /* multiple reads on a closed queue */
1043                 spin_unlock_irqsave(&q->lock);
1044                 error(q->err);
1045         }
1046
1047         /* if we get here, there's at least one block in the queue */
1048         b = qremove(q);
1049         n = BLEN(b);
1050
1051         /* split block if it's too big and this is not a message queue */
1052         nb = b;
1053         if(n > len){
1054                 if((q->state&Qmsg) == 0){
1055                         n -= len;
1056                         b = allocb(n);
1057                         memmove(b->wp, nb->rp+len, n);
1058                         b->wp += n;
1059                         qputback(q, b);
1060                 }
1061                 nb->wp = nb->rp + len;
1062         }
1063
1064         /* restart producer */
1065         qwakeup_iunlock(q);
1066
1067         poperror();
1068         qunlock(&q->rlock);
1069         return nb;
1070 }
1071
1072 /*
1073  *  read a queue.  if no data is queued, post a struct block
1074  *  and wait on its Rendez.
1075  */
1076 long
1077 qread(struct queue *q, void *vp, int len)
1078 {
1079         ERRSTACK(1);
1080         struct block *b, *first, **l;
1081         int m, n;
1082
1083         qlock(&q->rlock);
1084         if(waserror()){
1085                 qunlock(&q->rlock);
1086                 nexterror();
1087         }
1088
1089         spin_lock_irqsave(&q->lock);
1090 again:
1091         switch(qwait(q)){
1092         case 0:
1093                 /* queue closed */
1094                 spin_unlock_irqsave(&q->lock);
1095                 qunlock(&q->rlock);
1096                 poperror();
1097                 return 0;
1098         case -1:
1099                 /* multiple reads on a closed queue */
1100                 spin_unlock_irqsave(&q->lock);
1101                 error(q->err);
1102         }
1103
1104         /* if we get here, there's at least one block in the queue */
1105         if(q->state & Qcoalesce){
1106                 /* when coalescing, 0 length blocks just go away */
1107                 b = q->bfirst;
1108                 if(BLEN(b) <= 0){
1109                         freeb(qremove(q));
1110                         goto again;
1111                 }
1112
1113                 /*  grab the first block plus as many
1114                  *  following blocks as will completely
1115                  *  fit in the read.
1116                  */
1117                 n = 0;
1118                 l = &first;
1119                 m = BLEN(b);
1120                 for(;;) {
1121                         *l = qremove(q);
1122                         l = &b->next;
1123                         n += m;
1124
1125                         b = q->bfirst;
1126                         if(b == NULL)
1127                                 break;
1128                         m = BLEN(b);
1129                         if(n+m > len)
1130                                 break;
1131                 }
1132         } else {
1133                 first = qremove(q);
1134                 n = BLEN(first);
1135         }
1136
1137         /* copy to user space outside of the ilock */
1138         spin_unlock_irqsave(&q->lock);
1139         b = bl2mem(vp, first, len);
1140         spin_lock_irqsave(&q->lock);
1141
1142         /* take care of any left over partial block */
1143         if(b != NULL){
1144                 n -= BLEN(b);
1145                 if(q->state & Qmsg)
1146                         freeb(b);
1147                 else
1148                         qputback(q, b);
1149         }
1150
1151         /* restart producer */
1152         qwakeup_iunlock(q);
1153
1154         poperror();
1155         qunlock(&q->rlock);
1156         return n;
1157 }
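
/* [audit note] A device read method is typically a thin wrapper: qread()
 * blocks until data arrives or the queue closes, and returns 0 at
 * end-of-file.  A minimal sketch: */
static long
dev_read_sketch(struct queue *q, void *va, int n)
{
        return qread(q, va, n);
}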
1158
1159 static int
1160 qnotfull(void *a)
1161 {
1162         struct queue *q = a;
1163
1164         return q->len < q->limit || (q->state & Qclosed);
1165 }
1166
1167 uint32_t noblockcnt;
1168
1169 /*
1170  *  add a block to a queue obeying flow control
1171  */
1172 long
1173 qbwrite(struct queue *q, struct block *b)
1174 {
1175         ERRSTACK(1);
1176         int n, dowakeup;
1177
1178         n = BLEN(b);
1179
1180         if(q->bypass){
1181                 (*q->bypass)(q->arg, b);
1182                 return n;
1183         }
1184
1185         dowakeup = 0;
1186         qlock(&q->wlock);
1187         if(waserror()){
1188                 if(b != NULL)
1189                         freeb(b);
1190                 qunlock(&q->wlock);
1191                 nexterror();
1192         }
1193
1194         spin_lock_irqsave(&q->lock);
1195
1196         /* give up if the queue is closed */
1197         if(q->state & Qclosed){
1198                 spin_unlock_irqsave(&q->lock);
1199                 error(q->err);
1200         }
1201
1202         /* if nonblocking, don't queue over the limit */
1203         if(q->len >= q->limit){
1204                 if(q->noblock){
1205                         spin_unlock_irqsave(&q->lock);
1206                         freeb(b);
1207                         noblockcnt += n;
1208                         qunlock(&q->wlock);
1209                         poperror();
1210                         return n;
1211                 }
1212         }
1213
1214         /* queue the block */
1215         if(q->bfirst)
1216                 q->blast->next = b;
1217         else
1218                 q->bfirst = b;
1219         q->blast = b;
1220         b->next = 0;
1221         q->len += BALLOC(b);
1222         q->dlen += n;
1223         QDEBUG checkb(b, "qbwrite");
1224         b = NULL;
1225
1226         /* make sure other end gets awakened */
1227         if(q->state & Qstarve){
1228                 q->state &= ~Qstarve;
1229                 dowakeup = 1;
1230         }
1231         spin_unlock_irqsave(&q->lock);
1232
1233         /*  get output going again */
1234         if(q->kick && (dowakeup || (q->state&Qkick)))
1235                 q->kick(q->arg);
1236
1237         /* wakeup anyone consuming at the other end */
1238         if(dowakeup)
1239                 rendez_wakeup(&q->rr);
1240
1241         /*
1242          *  flow control, wait for queue to get below the limit
1243          *  before allowing the process to continue and queue
1244          *  more.  We do this here so that postnote can only
1245          *  interrupt us after the data has been queued.  This
1246          *  means that things like 9p flushes and ssl messages
1247          *  will not be disrupted by software interrupts.
1248          *
1249          *  Note - this is moderately dangerous since a process
1250          *  that keeps getting interrupted and rewriting will
1251          *  queue infinite crud.
1252          */
1253         for(;;){
1254                 if(q->noblock || qnotfull(q))
1255                         break;
1256
1257                 spin_lock_irqsave(&q->lock);
1258                 q->state |= Qflow;
1259                 spin_unlock_irqsave(&q->lock);
1260                 rendez_sleep(&q->wr, qnotfull, q);
1261         }
1262
1263         qunlock(&q->wlock);
1264         poperror();
1265         return n;
1266 }
1267
1268 /*
1269  *  write to a queue.  only Maxatomic bytes at a time is atomic.
1270  */
1271 int
1272 qwrite(struct queue *q, void *vp, int len)
1273 {
1274         ERRSTACK(1);
1275         int n, sofar;
1276         struct block *b;
1277         uint8_t *p = vp;
1278
1279         QDEBUG if(!islo())
1280                 printd("qwrite hi %lx\n", getcallerpc(&q));
1281
1282         sofar = 0;
1283         do {
1284                 n = len-sofar;
1285                 if(n > Maxatomic)
1286                         n = Maxatomic;
1287
1288                 b = allocb(n);
1289                 if(waserror()){
1290                         freeb(b);
1291                         nexterror();
1292                 }
1293                 memmove(b->wp, p+sofar, n);
1294                 poperror();
1295                 b->wp += n;
1296
1297                 qbwrite(q, b);
1298
1299                 sofar += n;
1300         } while(sofar < len && (q->state & Qmsg) == 0);
1301
1302         return len;
1303 }
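
/* [audit note] The matching write side: qwrite() copies from caller memory
 * in Maxatomic (64KB) chunks and may sleep in qbwrite() until the queue
 * drains below its limit, throwing via error() if the queue is closed.
 * A hedged wrapper: */
static long
dev_write_sketch(struct queue *q, void *va, int n)
{
        return qwrite(q, va, n);        /* returns n, or throws via error() */
}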
1304
1305 /*
1306  *  used by print() to write to a queue.  Since we may be splhi or not in
1307  *  a process, don't qlock.
1308  */
1309 int
1310 qiwrite(struct queue *q, void *vp, int len)
1311 {
1312         int n, sofar, dowakeup;
1313         struct block *b;
1314         uint8_t *p = vp;
1315
1316         dowakeup = 0;
1317
1318         sofar = 0;
1319         do {
1320                 n = len-sofar;
1321                 if(n > Maxatomic)
1322                         n = Maxatomic;
1323
1324                 b = iallocb(n);
1325                 if(b == NULL)
1326                         break;
1327                 memmove(b->wp, p+sofar, n);
1328                 b->wp += n;
1329
1330                 spin_lock_irqsave(&q->lock);
1331
1332                 QDEBUG checkb(b, "qiwrite");
1333                 if(q->bfirst)
1334                         q->blast->next = b;
1335                 else
1336                         q->bfirst = b;
1337                 q->blast = b;
1338                 q->len += BALLOC(b);
1339                 q->dlen += n;
1340
1341                 if(q->state & Qstarve){
1342                         q->state &= ~Qstarve;
1343                         dowakeup = 1;
1344                 }
1345
1346                 spin_unlock_irqsave(&q->lock);
1347
1348                 if(dowakeup){
1349                         if(q->kick)
1350                                 q->kick(q->arg);
1351                         rendez_wakeup(&q->rr);
1352                 }
1353
1354                 sofar += n;
1355         } while(sofar < len && (q->state & Qmsg) == 0);
1356
1357         return sofar;
1358 }
1359
1360 /*
1361  *  be extremely careful when calling this,
1362  *  as there is no reference accounting
1363  */
1364 void
1365 qfree(struct queue *q)
1366 {
1367         qclose(q);
1368         kfree(q);
1369 }
1370
1371 /*
1372  *  Mark a queue as closed.  No further IO is permitted.
1373  *  All blocks are released.
1374  */
1375 void
1376 qclose(struct queue *q)
1377 {
1378         struct block *bfirst;
1379
1380         if(q == NULL)
1381                 return;
1382
1383         /* mark it */
1384         spin_lock_irqsave(&q->lock);
1385         q->state |= Qclosed;
1386         q->state &= ~(Qflow|Qstarve);
1387         strncpy(q->err,  Ehungup, sizeof(q->err));
1388         bfirst = q->bfirst;
1389         q->bfirst = 0;
1390         q->len = 0;
1391         q->dlen = 0;
1392         q->noblock = 0;
1393         spin_unlock_irqsave(&q->lock);
1394
1395         /* free queued blocks */
1396         freeblist(bfirst);
1397
1398         /* wake up readers/writers */
1399         rendez_wakeup(&q->rr);
1400         rendez_wakeup(&q->wr);
1401 }
1402
1403 /*
1404  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
1405  *  blocks.
1406  */
1407 void
1408 qhangup(struct queue *q, char *msg)
1409 {
1410         /* mark it */
1411         spin_lock_irqsave(&q->lock);
1412         q->state |= Qclosed;
1413         if(msg == 0 || *msg == 0)
1414                 strncpy(q->err,  Ehungup, sizeof(q->err));
1415         else
1416                 strncpy(q->err, msg, ERRMAX-1);
1417         spin_unlock_irqsave(&q->lock);
1418
1419         /* wake up readers/writers */
1420         rendez_wakeup(&q->rr);
1421         rendez_wakeup(&q->wr);
1422 }
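
/* [audit note] Sketch of connection teardown using both close flavors:
 * qhangup() leaves queued input readable and records msg in q->err, while
 * qclose() also frees everything still queued. */
static void
shutdown_sketch(struct queue *inq, struct queue *outq, char *msg)
{
        qhangup(inq, msg);      /* readers drain remaining data, then EOF */
        qclose(outq);           /* writers now fail with Ehungup */
}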
1423
1424 /*
1425  *  return non-zero if the q is hungup
1426  */
1427 int
1428 qisclosed(struct queue *q)
1429 {
1430         return q->state & Qclosed;
1431 }
1432
1433 /*
1434  *  mark a queue as no longer hung up
1435  */
1436 void
1437 qreopen(struct queue *q)
1438 {
1439         spin_lock_irqsave(&q->lock);
1440         q->state &= ~Qclosed;
1441         q->state |= Qstarve;
1442         q->eof = 0;
1443         q->limit = q->inilim;
1444         spin_unlock_irqsave(&q->lock);
1445 }
1446
1447 /*
1448  *  return bytes queued
1449  */
1450 int
1451 qlen(struct queue *q)
1452 {
1453         return q->dlen;
1454 }
1455
1456 /*
1457  * return space remaining before flow control
1458  */
1459 int
1460 qwindow(struct queue *q)
1461 {
1462         int l;
1463
1464         l = q->limit - q->len;
1465         if(l < 0)
1466                 l = 0;
1467         return l;
1468 }
1469
1470 /*
1471  *  return true if we can read without blocking
1472  */
1473 int
1474 qcanread(struct queue *q)
1475 {
1476         return q->bfirst!=0;
1477 }
1478
1479 /*
1480  *  change queue limit
1481  */
1482 void
1483 qsetlimit(struct queue *q, int limit)
1484 {
1485         q->limit = limit;
1486 }
1487
1488 /*
1489  *  set blocking/nonblocking
1490  */
1491 void
1492 qnoblock(struct queue *q, int onoff)
1493 {
1494         q->noblock = onoff;
1495 }
1496
1497 /*
1498  *  flush the output queue
1499  */
1500 void
1501 qflush(struct queue *q)
1502 {
1503         struct block *bfirst;
1504
1505         /* mark it */
1506         spin_lock_irqsave(&q->lock);
1507         bfirst = q->bfirst;
1508         q->bfirst = 0;
1509         q->len = 0;
1510         q->dlen = 0;
1511         spin_unlock_irqsave(&q->lock);
1512
1513         /* free queued blocks */
1514         freeblist(bfirst);
1515
1516         /* wake up readers/writers */
1517         rendez_wakeup(&q->wr);
1518 }
1519
1520 int
1521 qfull(struct queue *q)
1522 {
1523         return q->state & Qflow;
1524 }
1525
1526 int
1527 qstate(struct queue *q)
1528 {
1529         return q->state;
1530 }
1531
1532 void
1533 qdump(struct queue *q)
1534 {
1535         if(q)
1536                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=%#x\n",
1537                         q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1538 }