Fixes qio irqsave lock
[akaros.git] kern/src/ns/qio.c
// INFERNO
#include <vfs.h>
#include <kfs.h>
#include <slab.h>
#include <kmalloc.h>
#include <kref.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <error.h>
#include <cpio.h>
#include <pmap.h>
#include <smp.h>
#include <ip.h>

static uint32_t padblockcnt;
static uint32_t concatblockcnt;
static uint32_t pullupblockcnt;
static uint32_t copyblockcnt;
static uint32_t consumecnt;
static uint32_t producecnt;
static uint32_t qcopycnt;

static int debugging;

#define QDEBUG  if(0)

/*
 *  IO queues
 */

struct queue
{
        spinlock_t lock;

        struct block*   bfirst;         /* buffer */
        struct block*   blast;

        int     len;            /* bytes allocated to queue */
        int     dlen;           /* data bytes in queue */
        int     limit;          /* max bytes in queue */
        int     inilim;         /* initial limit */
        int     state;
        int     noblock;        /* true if writes return immediately when q full */
        int     eof;            /* number of eofs read by user */

        void    (*kick)(void*); /* restart output */
        void    (*bypass)(void*, struct block*);        /* bypass queue altogether */
        void*   arg;            /* argument to kick */

        qlock_t rlock;          /* mutex for reading processes */
        struct rendez   rr;             /* process waiting to read */
        qlock_t wlock;          /* mutex for writing processes */
        struct rendez   wr;             /* process waiting to write */

        char    err[ERRMAX];
};

enum
{
        Maxatomic       = 64*1024,
};

unsigned int    qiomaxatomic = Maxatomic;

void
ixsummary(void)
{
        debugging ^= 1;
        iallocsummary();
        printd("pad %u, concat %u, pullup %u, copy %u\n",
                padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
        printd("consume %u, produce %u, qcopy %u\n",
                consumecnt, producecnt, qcopycnt);
}

/*
 *  free a list of blocks
 */
void
freeblist(struct block *b)
{
        struct block *next;

        for(; b != 0; b = next){
                next = b->next;
                b->next = 0;
                freeb(b);
        }
}

/*
 *  pad a block to the front (or the back if size is negative)
 */
struct block*
padblock(struct block *bp, int size)
{
        int n;
        struct block *nbp;

        QDEBUG checkb(bp, "padblock 1");
        if(size >= 0){
                if(bp->rp - bp->base >= size){
                        bp->rp -= size;
                        return bp;
                }

                if(bp->next)
                        panic("padblock %p", getcallerpc(&bp));
                n = BLEN(bp);
                padblockcnt++;
                nbp = allocb(size+n);
                nbp->rp += size;
                nbp->wp = nbp->rp;
                memmove(nbp->wp, bp->rp, n);
                nbp->wp += n;
                freeb(bp);
                nbp->rp -= size;
        } else {
                size = -size;

                if(bp->next)
                        panic("padblock %p", getcallerpc(&bp));

                if(bp->lim - bp->wp >= size)
                        return bp;

                n = BLEN(bp);
                padblockcnt++;
                nbp = allocb(size+n);
                memmove(nbp->wp, bp->rp, n);
                nbp->wp += n;
                freeb(bp);
        }
        QDEBUG checkb(nbp, "padblock 2");
        return nbp;
}
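
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): a link layer would typically use padblock() to grow room for
 * a header in front of a packet.  The 14-byte Ethernet header size and
 * the example function itself are hypothetical; note padblock() panics
 * if bp is a list rather than a single block.
 */
static struct block*
example_prepend_ether(struct block *bp)
{
        struct block *nb;

        /* make 14 bytes of headroom; may allocate a replacement block */
        nb = padblock(bp, 14);
        /* nb->rp now points at the new header space */
        memset(nb->rp, 0, 14);
        return nb;
}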

/*
 *  return count of bytes in a string of blocks
 */
int
blocklen(struct block *bp)
{
        int len;

        len = 0;
        while(bp) {
                len += BLEN(bp);
                bp = bp->next;
        }
        return len;
}

/*
 *  return count of space in blocks
 */
int
blockalloclen(struct block *bp)
{
        int len;

        len = 0;
        while(bp) {
                len += BALLOC(bp);
                bp = bp->next;
        }
        return len;
}

/*
 *  copy the string of blocks into
 *  a single block and free the string
 */
struct block*
concatblock(struct block *bp)
{
        int len;
        struct block *nb, *f;

        if(bp->next == 0)
                return bp;

        nb = allocb(blocklen(bp));
        for(f = bp; f; f = f->next) {
                len = BLEN(f);
                memmove(nb->wp, f->rp, len);
                nb->wp += len;
        }
        concatblockcnt += BLEN(nb);
        freeblist(bp);
        QDEBUG checkb(nb, "concatblock 1");
        return nb;
}

/*
 *  make sure the first block has at least n bytes
 */
struct block*
pullupblock(struct block *bp, int n)
{
        int i;
        struct block *nbp;

        /*
         *  this should almost always be true, it's
         *  just to avoid every caller checking.
         */
        if(BLEN(bp) >= n)
                return bp;

        /*
         *  if not enough room in the first block,
         *  add another to the front of the list.
         */
        if(bp->lim - bp->rp < n){
                nbp = allocb(n);
                nbp->next = bp;
                bp = nbp;
        }

        /*
         *  copy bytes from the trailing blocks into the first
         */
        n -= BLEN(bp);
        while((nbp = bp->next)){
                i = BLEN(nbp);
                if(i > n) {
                        memmove(bp->wp, nbp->rp, n);
                        pullupblockcnt++;
                        bp->wp += n;
                        nbp->rp += n;
                        QDEBUG checkb(bp, "pullupblock 1");
                        return bp;
                }
                else {
                        memmove(bp->wp, nbp->rp, i);
                        pullupblockcnt++;
                        bp->wp += i;
                        bp->next = nbp->next;
                        nbp->next = 0;
                        freeb(nbp);
                        n -= i;
                        if(n == 0){
                                QDEBUG checkb(bp, "pullupblock 2");
                                return bp;
                        }
                }
        }
        freeb(bp);
        return 0;
}
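
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): a protocol input path calls pullupblock() so a header can be
 * read through one contiguous pointer.  The 20-byte header length is
 * hypothetical.
 */
static uint8_t*
example_contiguous_header(struct block **bpp)
{
        *bpp = pullupblock(*bpp, 20);
        if(*bpp == NULL)
                return NULL;    /* list was shorter than 20 bytes and was freed */
        return (*bpp)->rp;      /* first 20 bytes are now contiguous */
}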

/*
 *  make sure the first block has at least n bytes
 */
struct block*
pullupqueue(struct queue *q, int n)
{
        struct block *b;

        if(BLEN(q->bfirst) >= n)
                return q->bfirst;
        q->bfirst = pullupblock(q->bfirst, n);
        for(b = q->bfirst; b != NULL && b->next != NULL; b = b->next)
                ;
        q->blast = b;
        return q->bfirst;
}

/*
 *  trim to len bytes starting at offset
 */
struct block *
trimblock(struct block *bp, int offset, int len)
{
        uint32_t l;
        struct block *nb, *startb;

        QDEBUG checkb(bp, "trimblock 1");
        if(blocklen(bp) < offset+len) {
                freeblist(bp);
                return NULL;
        }

        while((l = BLEN(bp)) < offset) {
                offset -= l;
                nb = bp->next;
                bp->next = NULL;
                freeb(bp);
                bp = nb;
        }

        startb = bp;
        bp->rp += offset;

        while((l = BLEN(bp)) < len) {
                len -= l;
                bp = bp->next;
        }

        bp->wp -= (BLEN(bp) - len);

        if(bp->next) {
                freeblist(bp->next);
                bp->next = NULL;
        }

        return startb;
}

/*
 *  copy 'count' bytes into a new block
 */
struct block*
copyblock(struct block *bp, int count)
{
        int l;
        struct block *nbp;

        QDEBUG checkb(bp, "copyblock 0");
        nbp = allocb(count);
        for(; count > 0 && bp != 0; bp = bp->next){
                l = BLEN(bp);
                if(l > count)
                        l = count;
                memmove(nbp->wp, bp->rp, l);
                nbp->wp += l;
                count -= l;
        }
        if(count > 0){
                memset(nbp->wp, 0, count);
                nbp->wp += count;
        }
        copyblockcnt++;
        QDEBUG checkb(nbp, "copyblock 1");

        return nbp;
}

struct block*
adjustblock(struct block* bp, int len)
{
        int n;
        struct block *nbp;

        if(len < 0){
                freeb(bp);
                return NULL;
        }

        if(bp->rp+len > bp->lim){
                nbp = copyblock(bp, len);
                freeblist(bp);
                QDEBUG checkb(nbp, "adjustblock 1");

                return nbp;
        }

        n = BLEN(bp);
        if(len > n)
                memset(bp->wp, 0, len-n);
        bp->wp = bp->rp+len;
        QDEBUG checkb(bp, "adjustblock 2");

        return bp;
}

/*
 *  throw away up to count bytes from a
 *  list of blocks.  Return count of bytes
 *  thrown away.
 */
int
pullblock(struct block **bph, int count)
{
        struct block *bp;
        int n, bytes;

        bytes = 0;
        if(bph == NULL)
                return 0;

        while(*bph != NULL && count != 0) {
                bp = *bph;
                n = BLEN(bp);
                if(count < n)
                        n = count;
                bytes += n;
                count -= n;
                bp->rp += n;
                QDEBUG checkb(bp, "pullblock ");
                if(BLEN(bp) == 0) {
                        *bph = bp->next;
                        bp->next = NULL;
                        freeb(bp);
                }
        }
        return bytes;
}

/*
 *  get next block from a queue, return null if nothing there
 */
struct block*
qget(struct queue *q)
{
        int dowakeup;
        struct block *b;

        /* sync with qwrite */
        spin_lock_irqsave(&q->lock);

        b = q->bfirst;
        if(b == NULL){
                q->state |= Qstarve;
                spin_unlock_irqsave(&q->lock);
                return NULL;
        }
        q->bfirst = b->next;
        b->next = 0;
        q->len -= BALLOC(b);
        q->dlen -= BLEN(b);
        QDEBUG checkb(b, "qget");

        /* if writer flow controlled, restart */
        if((q->state & Qflow) && q->len < q->limit/2){
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
                dowakeup = 0;

        spin_unlock_irqsave(&q->lock);

        if(dowakeup)
                rendez_wakeup(&q->wr);

        return b;
}

/*
 *  throw away the next 'len' bytes in the queue,
 *  returning the number actually discarded
 */
int
qdiscard(struct queue *q, int len)
{
        struct block *b;
        int dowakeup, n, sofar;

        spin_lock_irqsave(&q->lock);
        for(sofar = 0; sofar < len; sofar += n){
                b = q->bfirst;
                if(b == NULL)
                        break;
                QDEBUG checkb(b, "qdiscard");
                n = BLEN(b);
                if(n <= len - sofar){
                        q->bfirst = b->next;
                        b->next = 0;
                        q->len -= BALLOC(b);
                        q->dlen -= BLEN(b);
                        freeb(b);
                } else {
                        n = len - sofar;
                        b->rp += n;
                        q->dlen -= n;
                }
        }

        /*
         *  if writer flow controlled, restart
         *
         *  This used to be
         *      q->len < q->limit/2
         *  but it slows down tcp too much for certain write sizes.
         *  I really don't understand it completely.  It may be
         *  due to the queue draining so fast that the transmission
         *  stalls waiting for the app to produce more data.  - presotto
         */
        if((q->state & Qflow) && q->len < q->limit){
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
                dowakeup = 0;

        spin_unlock_irqsave(&q->lock);

        if(dowakeup)
                rendez_wakeup(&q->wr);

        return sofar;
}

/*
 *  Interrupt level copy out of a queue, return # bytes copied.
 */
int
qconsume(struct queue *q, void *vp, int len)
{
        struct block *b;
        int n, dowakeup;
        uint8_t *p = vp;
        struct block *tofree = NULL;

        /* sync with qwrite */
        spin_lock_irqsave(&q->lock);

        for(;;) {
                b = q->bfirst;
                if(b == 0){
                        q->state |= Qstarve;
                        spin_unlock_irqsave(&q->lock);
                        return -1;
                }
                QDEBUG checkb(b, "qconsume 1");

                n = BLEN(b);
                if(n > 0)
                        break;
                q->bfirst = b->next;
                q->len -= BALLOC(b);

                /* remember to free this */
                b->next = tofree;
                tofree = b;
        }

        if(n < len)
                len = n;
        memmove(p, b->rp, len);
        consumecnt += n;
        b->rp += len;
        q->dlen -= len;

        /* discard the block if we're done with it */
        if((q->state & Qmsg) || len == n){
                q->bfirst = b->next;
                b->next = 0;
                q->len -= BALLOC(b);
                q->dlen -= BLEN(b);

                /* remember to free this */
                b->next = tofree;
                tofree = b;
        }

        /* if writer flow controlled, restart */
        if((q->state & Qflow) && q->len < q->limit/2){
                q->state &= ~Qflow;
                dowakeup = 1;
        } else
                dowakeup = 0;

        spin_unlock_irqsave(&q->lock);

        if(dowakeup)
                rendez_wakeup(&q->wr);

        if(tofree != NULL)
                freeblist(tofree);

        return len;
}
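
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): draining a queue from interrupt level with qconsume(), e.g.
 * a hypothetical transmit-ready interrupt copying the next chunk into
 * a device FIFO.  A return of -1 means the queue was empty and Qstarve
 * is now set, so the next writer will wake us up again.
 */
static void
example_tx_interrupt(struct queue *q, uint8_t *fifo, int fifolen)
{
        int n;

        n = qconsume(q, fifo, fifolen);
        if(n < 0)
                return; /* nothing to send */
        /* tell the hypothetical device to transmit n bytes here */
}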

int
qpass(struct queue *q, struct block *b)
{
        int dlen, len, dowakeup;

        /* sync with qread */
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);
        if(q->len >= q->limit){
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return -1;
        }
        if(q->state & Qclosed){
                len = blocklen(b);
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return len;
        }

        /* add buffer to queue */
        if(q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        len = BALLOC(b);
        dlen = BLEN(b);
        QDEBUG checkb(b, "qpass");
        while(b->next){
                b = b->next;
                QDEBUG checkb(b, "qpass");
                len += BALLOC(b);
                dlen += BLEN(b);
        }
        q->blast = b;
        q->len += len;
        q->dlen += dlen;

        if(q->len >= q->limit/2)
                q->state |= Qflow;

        if(q->state & Qstarve){
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);

        if(dowakeup)
                rendez_wakeup(&q->rr);

        return len;
}

int
qpassnolim(struct queue *q, struct block *b)
{
        int dlen, len, dowakeup;

        /* sync with qread */
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);

        if(q->state & Qclosed){
                /* count the bytes before freeing the list */
                len = blocklen(b);
                freeblist(b);
                spin_unlock_irqsave(&q->lock);
                return len;
        }

        /* add buffer to queue */
        if(q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        len = BALLOC(b);
        dlen = BLEN(b);
        QDEBUG checkb(b, "qpass");
        while(b->next){
                b = b->next;
                QDEBUG checkb(b, "qpass");
                len += BALLOC(b);
                dlen += BLEN(b);
        }
        q->blast = b;
        q->len += len;
        q->dlen += dlen;

        if(q->len >= q->limit/2)
                q->state |= Qflow;

        if(q->state & Qstarve){
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);

        if(dowakeup)
                rendez_wakeup(&q->rr);

        return len;
}

/*
 *  if the allocated space is way out of line with the used
 *  space, reallocate to a smaller block
 */
struct block*
packblock(struct block *bp)
{
        struct block **l, *nbp;
        int n;

        for(l = &bp; *l; l = &(*l)->next){
                nbp = *l;
                n = BLEN(nbp);
                if((n<<2) < BALLOC(nbp)){
                        *l = allocb(n);
                        memmove((*l)->wp, nbp->rp, n);
                        (*l)->wp += n;
                        (*l)->next = nbp->next;
                        freeb(nbp);
                }
        }

        return bp;
}

int
qproduce(struct queue *q, void *vp, int len)
{
        struct block *b;
        int dowakeup;
        uint8_t *p = vp;

        /* sync with qread */
        dowakeup = 0;
        spin_lock_irqsave(&q->lock);

        /* no waiting receivers, room in buffer? */
        if(q->len >= q->limit){
                q->state |= Qflow;
                spin_unlock_irqsave(&q->lock);
                return -1;
        }

        /* save in buffer */
        /* use Qcoalesce here to save storage */
        b = q->blast;
        if((q->state & Qcoalesce)==0 || q->bfirst==NULL || b->lim-b->wp < len){
                /* need a new block */
                b = iallocb(len);
                if(b == 0){
                        spin_unlock_irqsave(&q->lock);
                        return 0;
                }
                if(q->bfirst)
                        q->blast->next = b;
                else
                        q->bfirst = b;
                q->blast = b;
                /* b->next = 0; done by iallocb() */
                q->len += BALLOC(b);
        }
        memmove(b->wp, p, len);
        producecnt += len;
        b->wp += len;
        q->dlen += len;
        QDEBUG checkb(b, "qproduce");

        if(q->state & Qstarve){
                q->state &= ~Qstarve;
                dowakeup = 1;
        }

        if(q->len >= q->limit)
                q->state |= Qflow;
        spin_unlock_irqsave(&q->lock);

        if(dowakeup)
                rendez_wakeup(&q->rr);

        return len;
}
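
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): feeding a queue from a receive interrupt with qproduce().
 * The device buffer, length and drop counter are hypothetical.
 */
static uint32_t example_rxdrops;

static void
example_rx_interrupt(struct queue *q, void *devbuf, int len)
{
        int n;

        n = qproduce(q, devbuf, len);
        if(n <= 0)
                /* -1: queue over its limit (Qflow set); 0: iallocb failed */
                example_rxdrops++;
}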

/*
 *  copy from offset in the queue
 */
struct block*
qcopy(struct queue *q, int len, uint32_t offset)
{
        int sofar;
        int n;
        struct block *b, *nb;
        uint8_t *p;

        nb = allocb(len);

        spin_lock_irqsave(&q->lock);

        /* go to offset */
        b = q->bfirst;
        for(sofar = 0; ; sofar += n){
                if(b == NULL){
                        spin_unlock_irqsave(&q->lock);
                        return nb;
                }
                n = BLEN(b);
                if(sofar + n > offset){
                        p = b->rp + offset - sofar;
                        n -= offset - sofar;
                        break;
                }
                QDEBUG checkb(b, "qcopy");
                b = b->next;
        }

        /* copy bytes from there */
        for(sofar = 0; sofar < len;){
                if(n > len - sofar)
                        n = len - sofar;
                memmove(nb->wp, p, n);
                qcopycnt += n;
                sofar += n;
                nb->wp += n;
                b = b->next;
                if(b == NULL)
                        break;
                n = BLEN(b);
                p = b->rp;
        }
        spin_unlock_irqsave(&q->lock);

        return nb;
}

static void qinit_common(struct queue *q)
{
        spinlock_init_irqsave(&q->lock);
        qlock_init(&q->rlock);
        qlock_init(&q->wlock);
        rendez_init(&q->rr);
        rendez_init(&q->wr);
}

/*
 *  called by non-interrupt code
 */
struct queue*
qopen(int limit, int msg, void (*kick)(void*), void *arg)
{
        struct queue *q;

        q = kzmalloc(sizeof(struct queue), 0);
        if(q == 0)
                return 0;
        qinit_common(q);

        q->limit = q->inilim = limit;
        q->kick = kick;
        q->arg = arg;
        q->state = msg;
        q->state |= Qstarve;
        q->eof = 0;
        q->noblock = 0;

        return q;
}

/* open a queue to be bypassed */
struct queue*
qbypass(void (*bypass)(void*, struct block*), void *arg)
{
        struct queue *q;

        q = kzmalloc(sizeof(struct queue), 0);
        if(q == 0)
                return 0;
        qinit_common(q);

        q->limit = 0;
        q->arg = arg;
        q->bypass = bypass;
        q->state = 0;

        return q;
}
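
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): a round trip through a queue from process context.  The 64K
 * limit is arbitrary; 0 for the msg argument gives a plain stream
 * queue.  waserror() handling around qread()/qwrite() is omitted for
 * brevity.
 */
static void
example_roundtrip(void)
{
        struct queue *q;
        char buf[16];

        q = qopen(64*1024, 0, NULL, NULL);
        if(q == NULL)
                return;
        qwrite(q, "hello", 5);
        qread(q, buf, sizeof(buf));     /* returns 5 and copies "hello" */
        qfree(q);
}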

static int
notempty(void *a)
{
        struct queue *q = a;

        return (q->state & Qclosed) || q->bfirst != 0;
}

/*
 *  wait for the queue to be non-empty or closed.
 *  called with q ilocked.
 */
static int
qwait(struct queue *q)
{
        /* wait for data */
        for(;;){
                if(q->bfirst != NULL)
                        break;

                if(q->state & Qclosed){
                        if(++q->eof > 3)
                                return -1;
                        if(*q->err && strcmp(q->err, Ehungup) != 0)
                                return -1;
                        return 0;
                }

                q->state |= Qstarve;    /* flag requesting producer to wake me */
                spin_unlock_irqsave(&q->lock);
                rendez_sleep(&q->rr, notempty, q);
                spin_lock_irqsave(&q->lock);
        }
        return 1;
}

/*
 * add a block list to a queue
 */
void
qaddlist(struct queue *q, struct block *b)
{
        /* queue the block */
        if(q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        q->len += blockalloclen(b);
        q->dlen += blocklen(b);
        while(b->next)
                b = b->next;
        q->blast = b;
}

/*
 *  called with q ilocked
 */
struct block*
qremove(struct queue *q)
{
        struct block *b;

        b = q->bfirst;
        if(b == NULL)
                return NULL;
        q->bfirst = b->next;
        b->next = NULL;
        q->dlen -= BLEN(b);
        q->len -= BALLOC(b);
        QDEBUG checkb(b, "qremove");
        return b;
}

/*
 *  copy the contents of a string of blocks into
 *  memory.  emptied blocks are freed.  return
 *  pointer to first unconsumed block.
 */
struct block*
bl2mem(uint8_t *p, struct block *b, int n)
{
        int i;
        struct block *next;

        for(; b != NULL; b = next){
                i = BLEN(b);
                if(i > n){
                        memmove(p, b->rp, n);
                        b->rp += n;
                        return b;
                }
                memmove(p, b->rp, i);
                n -= i;
                p += i;
                b->rp += i;
                next = b->next;
                freeb(b);
        }
        return NULL;
}

/*
 *  copy the contents of memory into a string of blocks.
 *  return NULL on error.
 */
struct block*
mem2bl(uint8_t *p, int len)
{
        ERRSTACK(1);
        int n;
        struct block *b, *first, **l;

        first = NULL;
        l = &first;
        if(waserror()){
                freeblist(first);
                nexterror();
        }
        do {
                n = len;
                if(n > Maxatomic)
                        n = Maxatomic;

                *l = b = allocb(n);
                memmove(b->wp, p, n);
                b->wp += n;
                p += n;
                len -= n;
                l = &b->next;
        } while(len > 0);
        poperror();

        return first;
}

/*
 *  put a block back to the front of the queue
 *  called with q ilocked
 */
void
qputback(struct queue *q, struct block *b)
{
        b->next = q->bfirst;
        if(q->bfirst == NULL)
                q->blast = b;
        q->bfirst = b;
        q->len += BALLOC(b);
        q->dlen += BLEN(b);
}

/*
 *  flow control, get producer going again
 *  called with q ilocked
 */
static void
qwakeup_iunlock(struct queue *q)
{
        int dowakeup = 0;

        /* if writer flow controlled, restart */
        if((q->state & Qflow) && q->len < q->limit/2){
                q->state &= ~Qflow;
                dowakeup = 1;
        }

        spin_unlock_irqsave(&q->lock);

        /* wakeup flow controlled writers */
        if(dowakeup){
                if(q->kick)
                        q->kick(q->arg);
                rendez_wakeup(&q->wr);
        }
}

/*
 *  get next block from a queue (up to a limit)
 */
struct block*
qbread(struct queue *q, int len)
{
        ERRSTACK(1);
        struct block *b, *nb;
        int n;

        qlock(&q->rlock);
        if(waserror()){
                qunlock(&q->rlock);
                nexterror();
        }

        spin_lock_irqsave(&q->lock);
        switch(qwait(q)){
        case 0:
                /* queue closed */
                spin_unlock_irqsave(&q->lock);
                qunlock(&q->rlock);
                poperror();
                return NULL;
        case -1:
                /* multiple reads on a closed queue */
                spin_unlock_irqsave(&q->lock);
                error(q->err);
        }

        /* if we get here, there's at least one block in the queue */
        b = qremove(q);
        n = BLEN(b);

        /* split block if it's too big and this is not a message queue */
        nb = b;
        if(n > len){
                if((q->state&Qmsg) == 0){
                        n -= len;
                        b = allocb(n);
                        memmove(b->wp, nb->rp+len, n);
                        b->wp += n;
                        qputback(q, b);
                }
                nb->wp = nb->rp + len;
        }

        /* restart producer */
        qwakeup_iunlock(q);

        poperror();
        qunlock(&q->rlock);
        return nb;
}
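
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): a consumer loop over qbread().  Each call returns at most
 * `want` bytes in a single block; NULL means the queue was closed.
 * waserror() handling, needed if the queue can be hung up with an
 * error string, is omitted for brevity.
 */
static void
example_block_reader(struct queue *q, int want)
{
        struct block *b;

        while((b = qbread(q, want)) != NULL){
                /* process BLEN(b) bytes starting at b->rp here */
                freeb(b);
        }
}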

/*
 *  read a queue.  if no data is queued, post a struct block
 *  and wait on its Rendez.
 */
long
qread(struct queue *q, void *vp, int len)
{
        ERRSTACK(1);
        struct block *b, *first, **l;
        int m, n;

        qlock(&q->rlock);
        if(waserror()){
                qunlock(&q->rlock);
                nexterror();
        }

        spin_lock_irqsave(&q->lock);
again:
        switch(qwait(q)){
        case 0:
                /* queue closed */
                spin_unlock_irqsave(&q->lock);
                qunlock(&q->rlock);
                poperror();
                return 0;
        case -1:
                /* multiple reads on a closed queue */
                spin_unlock_irqsave(&q->lock);
                error(q->err);
        }

        /* if we get here, there's at least one block in the queue */
        if(q->state & Qcoalesce){
                /* when coalescing, 0 length blocks just go away */
                b = q->bfirst;
                if(BLEN(b) <= 0){
                        freeb(qremove(q));
                        goto again;
                }

                /*  grab the first block plus as many
                 *  following blocks as will completely
                 *  fit in the read.
                 */
                n = 0;
                l = &first;
                m = BLEN(b);
                for(;;) {
                        *l = qremove(q);
                        l = &b->next;
                        n += m;

                        b = q->bfirst;
                        if(b == NULL)
                                break;
                        m = BLEN(b);
                        if(n+m > len)
                                break;
                }
        } else {
                first = qremove(q);
                n = BLEN(first);
        }

        /* copy to user space outside of the ilock */
        spin_unlock_irqsave(&q->lock);
        b = bl2mem(vp, first, len);
        spin_lock_irqsave(&q->lock);

        /* take care of any left over partial block */
        if(b != NULL){
                n -= BLEN(b);
                if(q->state & Qmsg)
                        freeb(b);
                else
                        qputback(q, b);
        }

        /* restart producer */
        qwakeup_iunlock(q);

        poperror();
        qunlock(&q->rlock);
        return n;
}

static int
qnotfull(void *a)
{
        struct queue *q = a;

        return q->len < q->limit || (q->state & Qclosed);
}

uint32_t noblockcnt;

/*
 *  add a block to a queue obeying flow control
 */
long
qbwrite(struct queue *q, struct block *b)
{
        ERRSTACK(1);
        int n, dowakeup;

        n = BLEN(b);

        if(q->bypass){
                (*q->bypass)(q->arg, b);
                return n;
        }

        dowakeup = 0;
        qlock(&q->wlock);
        if(waserror()){
                if(b != NULL)
                        freeb(b);
                qunlock(&q->wlock);
                nexterror();
        }

        spin_lock_irqsave(&q->lock);

        /* give up if the queue is closed */
        if(q->state & Qclosed){
                spin_unlock_irqsave(&q->lock);
                error(q->err);
        }

        /* if nonblocking, don't queue over the limit */
        if(q->len >= q->limit){
                if(q->noblock){
                        spin_unlock_irqsave(&q->lock);
                        freeb(b);
                        noblockcnt += n;
                        qunlock(&q->wlock);
                        poperror();
                        return n;
                }
        }

        /* queue the block */
        if(q->bfirst)
                q->blast->next = b;
        else
                q->bfirst = b;
        q->blast = b;
        b->next = 0;
        q->len += BALLOC(b);
        q->dlen += n;
        QDEBUG checkb(b, "qbwrite");
        b = NULL;

        /* make sure other end gets awakened */
        if(q->state & Qstarve){
                q->state &= ~Qstarve;
                dowakeup = 1;
        }
        spin_unlock_irqsave(&q->lock);

        /*  get output going again */
        if(q->kick && (dowakeup || (q->state&Qkick)))
                q->kick(q->arg);

        /* wakeup anyone consuming at the other end */
        if(dowakeup)
                rendez_wakeup(&q->rr);

        /*
         *  flow control, wait for queue to get below the limit
         *  before allowing the process to continue and queue
         *  more.  We do this here so that postnote can only
         *  interrupt us after the data has been queued.  This
         *  means that things like 9p flushes and ssl messages
         *  will not be disrupted by software interrupts.
         *
         *  Note - this is moderately dangerous since a process
         *  that keeps getting interrupted and rewriting will
         *  queue infinite crud.
         */
        for(;;){
                if(q->noblock || qnotfull(q))
                        break;

                spin_lock_irqsave(&q->lock);
                q->state |= Qflow;
                spin_unlock_irqsave(&q->lock);
                rendez_sleep(&q->wr, qnotfull, q);
        }

        qunlock(&q->wlock);
        poperror();
        return n;
}
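
/*
 * Illustrative sketch, added for exposition (not part of the original
 * file): how the noblock path above changes qbwrite().  After
 * qnoblock(q, 1), a write into a full queue frees the block and
 * reports success instead of sleeping on q->wr; qwindow() == 0
 * corresponds to that drop condition, though checked racily here,
 * without q->lock.
 */
static void
example_nonblocking_writer(struct queue *q, struct block *b)
{
        qnoblock(q, 1); /* writes never sleep for flow control */
        if(qwindow(q) == 0){
                /* already at the limit: qbwrite() would drop b anyway */
                freeb(b);
                return;
        }
        qbwrite(q, b);
}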

/*
 *  write to a queue.  only Maxatomic bytes at a time are atomic.
 */
int
qwrite(struct queue *q, void *vp, int len)
{
        ERRSTACK(1);
        int n, sofar;
        struct block *b;
        uint8_t *p = vp;

        QDEBUG if(!islo())
                printd("qwrite hi %p\n", getcallerpc(&q));

        sofar = 0;
        do {
                n = len-sofar;
                if(n > Maxatomic)
                        n = Maxatomic;

                b = allocb(n);
                if(waserror()){
                        freeb(b);
                        nexterror();
                }
                memmove(b->wp, p+sofar, n);
                poperror();
                b->wp += n;

                qbwrite(q, b);

                sofar += n;
        } while(sofar < len && (q->state & Qmsg) == 0);

        return len;
}
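
/*
 * Worked example, added for exposition: qwrite() splits large writes
 * into Maxatomic (64K) chunks, so a 150000-byte write to a stream
 * queue becomes three qbwrite() calls of 65536, 65536 and 18928 bytes;
 * each chunk is atomic, but the write as a whole is not.  On a Qmsg
 * queue the loop stops after the first chunk.
 */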

/*
 *  used by print() to write to a queue.  Since we may be splhi or not in
 *  a process, don't qlock.
 */
int
qiwrite(struct queue *q, void *vp, int len)
{
        int n, sofar, dowakeup;
        struct block *b;
        uint8_t *p = vp;

        dowakeup = 0;

        sofar = 0;
        do {
                n = len-sofar;
                if(n > Maxatomic)
                        n = Maxatomic;

                b = iallocb(n);
                if(b == NULL)
                        break;
                memmove(b->wp, p+sofar, n);
                b->wp += n;

                spin_lock_irqsave(&q->lock);

                QDEBUG checkb(b, "qiwrite");
                if(q->bfirst)
                        q->blast->next = b;
                else
                        q->bfirst = b;
                q->blast = b;
                q->len += BALLOC(b);
                q->dlen += n;

                if(q->state & Qstarve){
                        q->state &= ~Qstarve;
                        dowakeup = 1;
                }

                spin_unlock_irqsave(&q->lock);

                if(dowakeup){
                        if(q->kick)
                                q->kick(q->arg);
                        rendez_wakeup(&q->rr);
                }

                sofar += n;
        } while(sofar < len && (q->state & Qmsg) == 0);

        return sofar;
}

/*
 *  be extremely careful when calling this,
 *  as there is no reference accounting
 */
void
qfree(struct queue *q)
{
        qclose(q);
        kfree(q);
}

/*
 *  Mark a queue as closed.  No further IO is permitted.
 *  All blocks are released.
 */
void
qclose(struct queue *q)
{
        struct block *bfirst;

        if(q == NULL)
                return;

        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
        q->state &= ~(Qflow|Qstarve);
        strncpy(q->err, Ehungup, sizeof(q->err));
        bfirst = q->bfirst;
        q->bfirst = 0;
        q->len = 0;
        q->dlen = 0;
        q->noblock = 0;
        spin_unlock_irqsave(&q->lock);

        /* free queued blocks */
        freeblist(bfirst);

        /* wake up readers/writers */
        rendez_wakeup(&q->rr);
        rendez_wakeup(&q->wr);
}

/*
 *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
 *  blocks.
 */
void
qhangup(struct queue *q, char *msg)
{
        /* mark it */
        spin_lock_irqsave(&q->lock);
        q->state |= Qclosed;
        if(msg == 0 || *msg == 0)
                strncpy(q->err, Ehungup, sizeof(q->err));
        else
                strncpy(q->err, msg, ERRMAX-1);
        spin_unlock_irqsave(&q->lock);

        /* wake up readers/writers */
        rendez_wakeup(&q->rr);
        rendez_wakeup(&q->wr);
}

/*
 *  return non-zero if the q is hungup
 */
int
qisclosed(struct queue *q)
{
        return q->state & Qclosed;
}

/*
 *  mark a queue as no longer hung up
 */
void
qreopen(struct queue *q)
{
        spin_lock_irqsave(&q->lock);
        q->state &= ~Qclosed;
        q->state |= Qstarve;
        q->eof = 0;
        q->limit = q->inilim;
        spin_unlock_irqsave(&q->lock);
}

/*
 *  return bytes queued
 */
int
qlen(struct queue *q)
{
        return q->dlen;
}

/*
 *  return space remaining before flow control
 */
int
qwindow(struct queue *q)
{
        int l;

        l = q->limit - q->len;
        if(l < 0)
                l = 0;
        return l;
}

/*
 *  return true if we can read without blocking
 */
int
qcanread(struct queue *q)
{
        return q->bfirst != 0;
}

/*
 *  change queue limit
 */
void
qsetlimit(struct queue *q, int limit)
{
        q->limit = limit;
}

/*
 *  set blocking/nonblocking
 */
void
qnoblock(struct queue *q, int onoff)
{
        q->noblock = onoff;
}

/*
 *  flush the output queue
 */
void
qflush(struct queue *q)
{
        struct block *bfirst;

        /* mark it */
        spin_lock_irqsave(&q->lock);
        bfirst = q->bfirst;
        q->bfirst = 0;
        q->len = 0;
        q->dlen = 0;
        spin_unlock_irqsave(&q->lock);

        /* free queued blocks */
        freeblist(bfirst);

        /* wake up readers/writers */
        rendez_wakeup(&q->wr);
}

int
qfull(struct queue *q)
{
        return q->state & Qflow;
}

int
qstate(struct queue *q)
{
        return q->state;
}

void
qdump(struct queue *q)
{
        if(q)
                printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
                        q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
}