qio: Do not kick when calling qdiscard()
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
101 };
102
103 unsigned int qiomaxatomic = Maxatomic;
104
105 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
106 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
107                               int mem_flags);
108 static bool qwait_and_ilock(struct queue *q, int qio_flags);
109 static void qwakeup_iunlock(struct queue *q, int qio_flags);
110
111 /* Helper: fires a wake callback, sending 'filter' */
112 static void qwake_cb(struct queue *q, int filter)
113 {
114         if (q->wake_cb)
115                 q->wake_cb(q, q->wake_data, filter);
116 }
117
118 void ixsummary(void)
119 {
120         debugging ^= 1;
121         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
122                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
123         printd("consume %lu, produce %lu, qcopy %lu\n",
124                    consumecnt, producecnt, qcopycnt);
125 }
126
127 /*
128  *  pad a block to the front (or the back if size is negative)
129  */
130 struct block *padblock(struct block *bp, int size)
131 {
132         int n;
133         struct block *nbp;
134         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
135         uint16_t checksum_start = bp->checksum_start;
136         uint16_t checksum_offset = bp->checksum_offset;
137         uint16_t mss = bp->mss;
138
139         QDEBUG checkb(bp, "padblock 1");
140         if (size >= 0) {
141                 if (bp->rp - bp->base >= size) {
142                         bp->checksum_start += size;
143                         bp->rp -= size;
144                         return bp;
145                 }
146
147                 PANIC_EXTRA(bp);
148                 if (bp->next)
149                         panic("padblock %p", getcallerpc(&bp));
150                 n = BLEN(bp);
151                 padblockcnt++;
152                 nbp = block_alloc(size + n, MEM_WAIT);
153                 nbp->rp += size;
154                 nbp->wp = nbp->rp;
155                 memmove(nbp->wp, bp->rp, n);
156                 nbp->wp += n;
157                 freeb(bp);
158                 nbp->rp -= size;
159         } else {
160                 size = -size;
161
162                 PANIC_EXTRA(bp);
163
164                 if (bp->next)
165                         panic("padblock %p", getcallerpc(&bp));
166
167                 if (bp->lim - bp->wp >= size)
168                         return bp;
169
170                 n = BLEN(bp);
171                 padblockcnt++;
172                 nbp = block_alloc(size + n, MEM_WAIT);
173                 memmove(nbp->wp, bp->rp, n);
174                 nbp->wp += n;
175                 freeb(bp);
176         }
177         if (bcksum) {
178                 nbp->flag |= bcksum;
179                 nbp->checksum_start = checksum_start;
180                 nbp->checksum_offset = checksum_offset;
181                 nbp->mss = mss;
182         }
183         QDEBUG checkb(nbp, "padblock 1");
184         return nbp;
185 }
186
187 /*
188  *  return count of bytes in a string of blocks
189  */
190 int blocklen(struct block *bp)
191 {
192         int len;
193
194         len = 0;
195         while (bp) {
196                 len += BLEN(bp);
197                 bp = bp->next;
198         }
199         return len;
200 }
201
202 /*
203  * return count of space in blocks
204  */
205 int blockalloclen(struct block *bp)
206 {
207         int len;
208
209         len = 0;
210         while (bp) {
211                 len += BALLOC(bp);
212                 bp = bp->next;
213         }
214         return len;
215 }
216
217 /*
218  *  copy the  string of blocks into
219  *  a single block and free the string
220  */
221 struct block *concatblock(struct block *bp)
222 {
223         int len;
224         struct block *nb, *f;
225
226         if (bp->next == 0)
227                 return bp;
228
229         /* probably use parts of qclone */
230         PANIC_EXTRA(bp);
231         nb = block_alloc(blocklen(bp), MEM_WAIT);
232         for (f = bp; f; f = f->next) {
233                 len = BLEN(f);
234                 memmove(nb->wp, f->rp, len);
235                 nb->wp += len;
236         }
237         concatblockcnt += BLEN(nb);
238         freeblist(bp);
239         QDEBUG checkb(nb, "concatblock 1");
240         return nb;
241 }
242
243 /* Returns a block with the remaining contents of b all in the main body of the
244  * returned block.  Replace old references to b with the returned value (which
245  * may still be 'b', if no change was needed. */
246 struct block *linearizeblock(struct block *b)
247 {
248         struct block *newb;
249         size_t len;
250         struct extra_bdata *ebd;
251
252         if (!b->extra_len)
253                 return b;
254
255         newb = block_alloc(BLEN(b), MEM_WAIT);
256         len = BHLEN(b);
257         memcpy(newb->wp, b->rp, len);
258         newb->wp += len;
259         len = b->extra_len;
260         for (int i = 0; (i < b->nr_extra_bufs) && len; i++) {
261                 ebd = &b->extra_data[i];
262                 if (!ebd->base || !ebd->len)
263                         continue;
264                 memcpy(newb->wp, (void*)(ebd->base + ebd->off), ebd->len);
265                 newb->wp += ebd->len;
266                 len -= ebd->len;
267         }
268         /* TODO: any other flags that need copied over? */
269         if (b->flag & BCKSUM_FLAGS) {
270                 newb->flag |= (b->flag & BCKSUM_FLAGS);
271                 newb->checksum_start = b->checksum_start;
272                 newb->checksum_offset = b->checksum_offset;
273                 newb->mss = b->mss;
274         }
275         freeb(b);
276         return newb;
277 }
278
279 /*
280  *  make sure the first block has at least n bytes in its main body
281  */
282 struct block *pullupblock(struct block *bp, int n)
283 {
284         int i, len, seglen;
285         struct block *nbp;
286         struct extra_bdata *ebd;
287
288         /*
289          *  this should almost always be true, it's
290          *  just to avoid every caller checking.
291          */
292         if (BHLEN(bp) >= n)
293                 return bp;
294
295          /* a start at explicit main-body / header management */
296         if (bp->extra_len) {
297                 if (n > bp->lim - bp->rp) {
298                         /* would need to realloc a new block and copy everything over. */
299                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
300                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
301                 }
302                 len = n - BHLEN(bp);
303                 if (len > bp->extra_len)
304                         panic("pullup more than extra (%d, %d, %d)\n",
305                               n, BHLEN(bp), bp->extra_len);
306                 QDEBUG checkb(bp, "before pullup");
307                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
308                         ebd = &bp->extra_data[i];
309                         if (!ebd->base || !ebd->len)
310                                 continue;
311                         seglen = MIN(ebd->len, len);
312                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
313                         bp->wp += seglen;
314                         len -= seglen;
315                         ebd->len -= seglen;
316                         ebd->off += seglen;
317                         bp->extra_len -= seglen;
318                         if (ebd->len == 0) {
319                                 kfree((void *)ebd->base);
320                                 ebd->off = 0;
321                                 ebd->base = 0;
322                         }
323                 }
324                 /* maybe just call pullupblock recursively here */
325                 if (len)
326                         panic("pullup %d bytes overdrawn\n", len);
327                 QDEBUG checkb(bp, "after pullup");
328                 return bp;
329         }
330
331         /*
332          *  if not enough room in the first block,
333          *  add another to the front of the list.
334          */
335         if (bp->lim - bp->rp < n) {
336                 nbp = block_alloc(n, MEM_WAIT);
337                 nbp->next = bp;
338                 bp = nbp;
339         }
340
341         /*
342          *  copy bytes from the trailing blocks into the first
343          */
344         n -= BLEN(bp);
345         while ((nbp = bp->next)) {
346                 i = BLEN(nbp);
347                 if (i > n) {
348                         memmove(bp->wp, nbp->rp, n);
349                         pullupblockcnt++;
350                         bp->wp += n;
351                         nbp->rp += n;
352                         QDEBUG checkb(bp, "pullupblock 1");
353                         return bp;
354                 } else {
355                         memmove(bp->wp, nbp->rp, i);
356                         pullupblockcnt++;
357                         bp->wp += i;
358                         bp->next = nbp->next;
359                         nbp->next = 0;
360                         freeb(nbp);
361                         n -= i;
362                         if (n == 0) {
363                                 QDEBUG checkb(bp, "pullupblock 2");
364                                 return bp;
365                         }
366                 }
367         }
368         freeb(bp);
369         return 0;
370 }
371
372 /*
373  *  make sure the first block has at least n bytes in its main body
374  */
375 struct block *pullupqueue(struct queue *q, int n)
376 {
377         struct block *b;
378
379         /* TODO: lock to protect the queue links? */
380         if ((BHLEN(q->bfirst) >= n))
381                 return q->bfirst;
382         q->bfirst = pullupblock(q->bfirst, n);
383         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
384         q->blast = b;
385         return q->bfirst;
386 }
387
388 /* throw away count bytes from the front of
389  * block's extradata.  Returns count of bytes
390  * thrown away
391  */
392
393 static int pullext(struct block *bp, int count)
394 {
395         struct extra_bdata *ed;
396         int i, rem, bytes = 0;
397
398         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
399                 ed = &bp->extra_data[i];
400                 rem = MIN(count, ed->len);
401                 bp->extra_len -= rem;
402                 count -= rem;
403                 bytes += rem;
404                 ed->off += rem;
405                 ed->len -= rem;
406                 if (ed->len == 0) {
407                         kfree((void *)ed->base);
408                         ed->base = 0;
409                         ed->off = 0;
410                 }
411         }
412         return bytes;
413 }
414
415 /* throw away count bytes from the end of a
416  * block's extradata.  Returns count of bytes
417  * thrown away
418  */
419
420 static int dropext(struct block *bp, int count)
421 {
422         struct extra_bdata *ed;
423         int i, rem, bytes = 0;
424
425         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
426                 ed = &bp->extra_data[i];
427                 rem = MIN(count, ed->len);
428                 bp->extra_len -= rem;
429                 count -= rem;
430                 bytes += rem;
431                 ed->len -= rem;
432                 if (ed->len == 0) {
433                         kfree((void *)ed->base);
434                         ed->base = 0;
435                         ed->off = 0;
436                 }
437         }
438         return bytes;
439 }
440
441 /*
442  *  throw away up to count bytes from a
443  *  list of blocks.  Return count of bytes
444  *  thrown away.
445  */
446 static int _pullblock(struct block **bph, int count, int free)
447 {
448         struct block *bp;
449         int n, bytes;
450
451         bytes = 0;
452         if (bph == NULL)
453                 return 0;
454
455         while (*bph != NULL && count != 0) {
456                 bp = *bph;
457
458                 n = MIN(BHLEN(bp), count);
459                 bytes += n;
460                 count -= n;
461                 bp->rp += n;
462                 n = pullext(bp, count);
463                 bytes += n;
464                 count -= n;
465                 QDEBUG checkb(bp, "pullblock ");
466                 if (BLEN(bp) == 0 && (free || count)) {
467                         *bph = bp->next;
468                         bp->next = NULL;
469                         freeb(bp);
470                 }
471         }
472         return bytes;
473 }
474
475 int pullblock(struct block **bph, int count)
476 {
477         return _pullblock(bph, count, 1);
478 }
479
480 /*
481  *  trim to len bytes starting at offset
482  */
483 struct block *trimblock(struct block *bp, int offset, int len)
484 {
485         uint32_t l, trim;
486         int olen = len;
487
488         QDEBUG checkb(bp, "trimblock 1");
489         if (blocklen(bp) < offset + len) {
490                 freeblist(bp);
491                 return NULL;
492         }
493
494         l =_pullblock(&bp, offset, 0);
495         if (bp == NULL)
496                 return NULL;
497         if (l != offset) {
498                 freeblist(bp);
499                 return NULL;
500         }
501
502         while ((l = BLEN(bp)) < len) {
503                 len -= l;
504                 bp = bp->next;
505         }
506
507         trim = BLEN(bp) - len;
508         trim -= dropext(bp, trim);
509         bp->wp -= trim;
510
511         if (bp->next) {
512                 freeblist(bp->next);
513                 bp->next = NULL;
514         }
515         return bp;
516 }
517
518 /*
519  *  copy 'count' bytes into a new block
520  */
521 struct block *copyblock(struct block *bp, int count)
522 {
523         int l;
524         struct block *nbp;
525
526         QDEBUG checkb(bp, "copyblock 0");
527         nbp = block_alloc(count, MEM_WAIT);
528         if (bp->flag & BCKSUM_FLAGS) {
529                 nbp->flag |= (bp->flag & BCKSUM_FLAGS);
530                 nbp->checksum_start = bp->checksum_start;
531                 nbp->checksum_offset = bp->checksum_offset;
532                 nbp->mss = bp->mss;
533         }
534         PANIC_EXTRA(bp);
535         for (; count > 0 && bp != 0; bp = bp->next) {
536                 l = BLEN(bp);
537                 if (l > count)
538                         l = count;
539                 memmove(nbp->wp, bp->rp, l);
540                 nbp->wp += l;
541                 count -= l;
542         }
543         if (count > 0) {
544                 memset(nbp->wp, 0, count);
545                 nbp->wp += count;
546         }
547         copyblockcnt++;
548         QDEBUG checkb(nbp, "copyblock 1");
549
550         return nbp;
551 }
552
553 /* Adjust block @bp so that its size is exactly @len.
554  * If the size is increased, fill in the new contents with zeros.
555  * If the size is decreased, discard some of the old contents at the tail. */
556 struct block *adjustblock(struct block *bp, int len)
557 {
558         struct extra_bdata *ebd;
559         void *buf;
560         int i;
561
562         if (len < 0) {
563                 freeb(bp);
564                 return NULL;
565         }
566
567         if (len == BLEN(bp))
568                 return bp;
569
570         /* Shrink within block main body. */
571         if (len <= BHLEN(bp)) {
572                 free_block_extra(bp);
573                 bp->wp = bp->rp + len;
574                 QDEBUG checkb(bp, "adjustblock 1");
575                 return bp;
576         }
577
578         /* Need to grow. */
579         if (len > BLEN(bp)) {
580                 /* Grow within block main body. */
581                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
582                         memset(bp->wp, 0, len - BLEN(bp));
583                         bp->wp = bp->rp + len;
584                         QDEBUG checkb(bp, "adjustblock 2");
585                         return bp;
586                 }
587                 /* Grow with extra data buffers. */
588                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
589                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
590                 QDEBUG checkb(bp, "adjustblock 3");
591                 return bp;
592         }
593
594         /* Shrink extra data buffers.
595          * len is how much of ebd we need to keep.
596          * extra_len is re-accumulated. */
597         assert(bp->extra_len > 0);
598         len -= BHLEN(bp);
599         bp->extra_len = 0;
600         for (i = 0; i < bp->nr_extra_bufs; i++) {
601                 ebd = &bp->extra_data[i];
602                 if (len <= ebd->len)
603                         break;
604                 len -= ebd->len;
605                 bp->extra_len += ebd->len;
606         }
607         /* If len becomes zero, extra_data[i] should be freed. */
608         if (len > 0) {
609                 ebd = &bp->extra_data[i];
610                 ebd->len = len;
611                 bp->extra_len += ebd->len;
612                 i++;
613         }
614         for (; i < bp->nr_extra_bufs; i++) {
615                 ebd = &bp->extra_data[i];
616                 if (ebd->base)
617                         kfree((void*)ebd->base);
618                 ebd->base = ebd->off = ebd->len = 0;
619         }
620         QDEBUG checkb(bp, "adjustblock 4");
621         return bp;
622 }
623
624 /* Helper: removes and returns the first block from q */
625 static struct block *pop_first_block(struct queue *q)
626 {
627         struct block *b = q->bfirst;
628
629         q->len -= BALLOC(b);  // XXX all usages of q->len with extra_data are fucked
630         q->dlen -= BLEN(b);
631         q->bfirst = b->next;
632         b->next = 0;
633         return b;
634 }
635
636 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
637 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
638 {
639         copy_amt = MIN(to->lim - to->wp, copy_amt);
640         memcpy(to->wp, from, copy_amt);
641         to->wp += copy_amt;
642         return copy_amt;
643 }
644
645 /* Accounting helper.  Block b in q lost amt extra_data */
646 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
647 {
648         b->extra_len -= amt;
649         q->len -= amt;
650         q->dlen -= amt;
651 }
652
653 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
654  * fixed in 'from', so we move its contents and zero it out in 'from'.
655  *
656  * Returns the length moved (0 on failure). */
657 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
658                        struct block *from, struct queue *from_q)
659 {
660         size_t ret = ebd->len;
661
662         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
663                 return 0;
664         block_and_q_lost_extra(from, from_q, ebd->len);
665         ebd->base = ebd->len = ebd->off = 0;
666         return ret;
667 }
668
669 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
670  * return with less than len, but greater than 0, even if there is more
671  * available in q.
672  *
673  * At any moment that we have copied anything and things are tricky, we can just
674  * return.  The trickiness comes from a bunch of variables: is the main body
675  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
676  * to @to's main body, but only if we haven't used it yet. */
677 static size_t copy_from_first_block(struct queue *q, struct block *to,
678                                     size_t len)
679 {
680         struct block *from = q->bfirst;
681         size_t copy_amt, amt;
682         struct extra_bdata *ebd;
683
684         assert(len < BLEN(from));       /* sanity */
685         /* Try to extract from the main body */
686         copy_amt = MIN(BHLEN(from), len);
687         if (copy_amt) {
688                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
689                 from->rp += copy_amt;
690                 /* We only change dlen, (data len), not q->len, since the q still has
691                  * the same block memory allocation (no kfrees happened) */
692                 q->dlen -= copy_amt;
693         }
694         /* Try to extract the remainder from the extra data */
695         len -= copy_amt;
696         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
697                 ebd = &from->extra_data[i];
698                 if (!ebd->base || !ebd->len)
699                         continue;
700                 if (len >= ebd->len) {
701                         amt = move_ebd(ebd, to, from, q);
702                         if (!amt) {
703                                 /* our internal alloc could have failed.   this ebd is now the
704                                  * last one we'll consider.  let's handle it separately and put
705                                  * it in the main body. */
706                                 if (copy_amt)
707                                         return copy_amt;
708                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
709                                                               ebd->len);
710                                 block_and_q_lost_extra(from, q, copy_amt);
711                                 break;
712                         }
713                         len -= amt;
714                         copy_amt += amt;
715                         continue;
716                 } else {
717                         /* If we're here, we reached our final ebd, which we'll need to
718                          * split to get anything from it. */
719                         if (copy_amt)
720                                 return copy_amt;
721                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
722                                                       len);
723                         ebd->off += copy_amt;
724                         ebd->len -= copy_amt;
725                         block_and_q_lost_extra(from, q, copy_amt);
726                         break;
727                 }
728         }
729         if (len)
730                 assert(copy_amt);       /* sanity */
731         return copy_amt;
732 }
733
734 /* Return codes for __qbread and __try_qbread. */
735 enum {
736         QBR_OK,
737         QBR_FAIL,
738         QBR_SPARE,      /* we need a spare block */
739         QBR_AGAIN,      /* do it again, we are coalescing blocks */
740 };
741
742 /* Helper and back-end for __qbread: extracts and returns a list of blocks
743  * containing up to len bytes.  It may contain less than len even if q has more
744  * data.
745  *
746  * Returns a code interpreted by __qbread, and the returned blist in ret. */
747 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
748                         struct block **real_ret, struct block *spare)
749 {
750         struct block *ret, *ret_last, *first;
751         size_t blen;
752
753         if (qio_flags & QIO_CAN_ERR_SLEEP) {
754                 if (!qwait_and_ilock(q, qio_flags)) {
755                         spin_unlock_irqsave(&q->lock);
756                         return QBR_FAIL;
757                 }
758                 /* we qwaited and still hold the lock, so the q is not empty */
759                 first = q->bfirst;
760         } else {
761                 spin_lock_irqsave(&q->lock);
762                 first = q->bfirst;
763                 if (!first) {
764                         spin_unlock_irqsave(&q->lock);
765                         return QBR_FAIL;
766                 }
767         }
768         blen = BLEN(first);
769         if ((q->state & Qcoalesce) && (blen == 0)) {
770                 freeb(pop_first_block(q));
771                 spin_unlock_irqsave(&q->lock);
772                 /* Need to retry to make sure we have a first block */
773                 return QBR_AGAIN;
774         }
775         /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
776          * regardless of len.  We'll do the same, and just return the minimum: the
777          * first block.  I'd be happy to remove this. */
778         if (q->state & Qmsg) {
779                 ret = pop_first_block(q);
780                 goto out_ok;
781         }
782         /* Let's get at least something first - makes the code easier.  This way,
783          * we'll only ever split the block once. */
784         if (blen <= len) {
785                 ret = pop_first_block(q);
786                 len -= blen;
787         } else {
788                 /* need to split the block.  we won't actually take the first block out
789                  * of the queue - we're just extracting a little bit. */
790                 if (!spare) {
791                         /* We have nothing and need a spare block.  Retry! */
792                         spin_unlock_irqsave(&q->lock);
793                         return QBR_SPARE;
794                 }
795                 copy_from_first_block(q, spare, len);
796                 ret = spare;
797                 goto out_ok;
798         }
799         /* At this point, we just grabbed the first block.  We can try to grab some
800          * more, up to len (if they want). */
801         if (qio_flags & QIO_JUST_ONE_BLOCK)
802                 goto out_ok;
803         ret_last = ret;
804         while (q->bfirst && (len > 0)) {
805                 blen = BLEN(q->bfirst);
806                 if ((q->state & Qcoalesce) && (blen == 0)) {
807                         /* remove the intermediate 0 blocks */
808                         freeb(pop_first_block(q));
809                         continue;
810                 }
811                 if (blen > len) {
812                         /* We could try to split the block, but that's a huge pain.  For
813                          * instance, we might need to move the main body of b into an
814                          * extra_data of ret_last.  lots of ways for that to fail, and lots
815                          * of cases to consider.  Easier to just bail out.  This is why I
816                          * did the first block above: we don't need to worry about this. */
817                          break;
818                 }
819                 ret_last->next = pop_first_block(q);
820                 ret_last = ret_last->next;
821                 len -= blen;
822         }
823 out_ok:
824         qwakeup_iunlock(q, qio_flags);
825         *real_ret = ret;
826         return QBR_OK;
827 }
828
829 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
830  * containing up to len bytes.  It may contain less than len even if q has more
831  * data.
832  *
833  * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
834  *
835  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
836  * could get a zero length block back. */
837 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
838                               int mem_flags)
839 {
840         struct block *ret = 0;
841         struct block *spare = 0;
842
843         while (1) {
844                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
845                 case QBR_OK:
846                 case QBR_FAIL:
847                         if (spare && (ret != spare))
848                                 freeb(spare);
849                         return ret;
850                 case QBR_SPARE:
851                         assert(!spare);
852                         /* Due to some nastiness, we need a fresh block so we can read out
853                          * anything from the queue.  'len' seems like a reasonable amount.
854                          * Maybe we can get away with less. */
855                         spare = block_alloc(len, mem_flags);
856                         if (!spare)
857                                 return 0;
858                         break;
859                 case QBR_AGAIN:
860                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
861                          * try again.  We bounce out of __try so we can perform the "is
862                          * there a block" logic again from the top. */
863                         break;
864                 }
865         }
866 }
867
868 /*
869  *  get next block from a queue, return null if nothing there
870  */
871 struct block *qget(struct queue *q)
872 {
873         /* since len == SIZE_MAX, we should never need to do a mem alloc */
874         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
875 }
876
877 /* Throw away the next 'len' bytes in the queue returning the number actually
878  * discarded.
879  *
880  * If the bytes are in the queue, then they must be discarded.  The only time to
881  * return less than len is if the q itself has less than len bytes.
882  *
883  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
884  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
885 size_t qdiscard(struct queue *q, size_t len)
886 {
887         struct block *blist;
888         size_t removed_amt;
889         size_t sofar = 0;
890
891         /* This is racy.  There could be multiple qdiscarders or other consumers,
892          * where the consumption could be interleaved. */
893         while (qlen(q) && len) {
894                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
895                 removed_amt = freeblist(blist);
896                 sofar += removed_amt;
897                 len -= removed_amt;
898         }
899         return sofar;
900 }
901
902 ssize_t qpass(struct queue *q, struct block *b)
903 {
904         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
905 }
906
907 ssize_t qpassnolim(struct queue *q, struct block *b)
908 {
909         return __qbwrite(q, b, 0);
910 }
911
912 /*
913  *  if the allocated space is way out of line with the used
914  *  space, reallocate to a smaller block
915  */
916 struct block *packblock(struct block *bp)
917 {
918         struct block **l, *nbp;
919         int n;
920
921         if (bp->extra_len)
922                 return bp;
923         for (l = &bp; *l; l = &(*l)->next) {
924                 nbp = *l;
925                 n = BLEN(nbp);
926                 if ((n << 2) < BALLOC(nbp)) {
927                         *l = block_alloc(n, MEM_WAIT);
928                         memmove((*l)->wp, nbp->rp, n);
929                         (*l)->wp += n;
930                         (*l)->next = nbp->next;
931                         freeb(nbp);
932                 }
933         }
934
935         return bp;
936 }
937
938 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
939  * body_rp, for up to len.  Returns the len consumed.
940  *
941  * The base is 'b', so that we can kfree it later.  This currently ties us to
942  * using kfree for the release method for all extra_data.
943  *
944  * It is possible to have a body size that is 0, if there is no offset, and
945  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
946 static size_t point_to_body(struct block *b, uint8_t *body_rp,
947                             struct block *newb, unsigned int newb_idx,
948                             size_t len)
949 {
950         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
951
952         assert(newb_idx < newb->nr_extra_bufs);
953
954         kmalloc_incref(b);
955         ebd->base = (uintptr_t)b;
956         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
957         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
958         assert((int)ebd->len >= 0);
959         newb->extra_len += ebd->len;
960         return ebd->len;
961 }
962
963 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
964  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
965  * consumed.
966  *
967  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
968 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
969                            struct block *newb, unsigned int newb_idx,
970                            size_t len)
971 {
972         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
973         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
974
975         assert(b_idx < b->nr_extra_bufs);
976         assert(newb_idx < newb->nr_extra_bufs);
977
978         kmalloc_incref((void*)b_ebd->base);
979         n_ebd->base = b_ebd->base;
980         n_ebd->off = b_ebd->off + b_off;
981         n_ebd->len = MIN(b_ebd->len - b_off, len);
982         newb->extra_len += n_ebd->len;
983         return n_ebd->len;
984 }
985
986 /* given a string of blocks, fills the new block's extra_data  with the contents
987  * of the blist [offset, len + offset)
988  *
989  * returns 0 on success.  the only failure is if the extra_data array was too
990  * small, so this returns a positive integer saying how big the extra_data needs
991  * to be.
992  *
993  * callers are responsible for protecting the list structure. */
994 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
995                             uint32_t offset)
996 {
997         struct block *b, *first;
998         unsigned int nr_bufs = 0;
999         unsigned int b_idx, newb_idx = 0;
1000         uint8_t *first_main_body = 0;
1001
1002         /* find the first block; keep offset relative to the latest b in the list */
1003         for (b = blist; b; b = b->next) {
1004                 if (BLEN(b) > offset)
1005                         break;
1006                 offset -= BLEN(b);
1007         }
1008         /* qcopy semantics: if you asked for an offset outside the block list, you
1009          * get an empty block back */
1010         if (!b)
1011                 return 0;
1012         first = b;
1013         /* upper bound for how many buffers we'll need in newb */
1014         for (/* b is set*/; b; b = b->next) {
1015                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1016         }
1017         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1018         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1019                 /* caller will need to alloc these, then re-call us */
1020                 return nr_bufs;
1021         }
1022         for (b = first; b && len; b = b->next) {
1023                 b_idx = 0;
1024                 if (offset) {
1025                         if (offset < BHLEN(b)) {
1026                                 /* off is in the main body */
1027                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1028                                 newb_idx++;
1029                         } else {
1030                                 /* off is in one of the buffers (or just past the last one).
1031                                  * we're not going to point to b's main body at all. */
1032                                 offset -= BHLEN(b);
1033                                 assert(b->extra_data);
1034                                 /* assuming these extrabufs are packed, or at least that len
1035                                  * isn't gibberish */
1036                                 while (b->extra_data[b_idx].len <= offset) {
1037                                         offset -= b->extra_data[b_idx].len;
1038                                         b_idx++;
1039                                 }
1040                                 /* now offset is set to our offset in the b_idx'th buf */
1041                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1042                                 newb_idx++;
1043                                 b_idx++;
1044                         }
1045                         offset = 0;
1046                 } else {
1047                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1048                         newb_idx++;
1049                 }
1050                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1051                  * and any point_to_ could be our last if it consumed all of len. */
1052                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1053                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1054                         newb_idx++;
1055                 }
1056         }
1057         return 0;
1058 }
1059
1060 struct block *blist_clone(struct block *blist, int header_len, int len,
1061                           uint32_t offset)
1062 {
1063         int ret;
1064         struct block *newb = block_alloc(header_len, MEM_WAIT);
1065         do {
1066                 ret = __blist_clone_to(blist, newb, len, offset);
1067                 if (ret)
1068                         block_add_extd(newb, ret, MEM_WAIT);
1069         } while (ret);
1070         return newb;
1071 }
1072
1073 /* given a queue, makes a single block with header_len reserved space in the
1074  * block main body, and the contents of [offset, len + offset) pointed to in the
1075  * new blocks ext_data. */
1076 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1077 {
1078         int ret;
1079         struct block *newb = block_alloc(header_len, MEM_WAIT);
1080         /* the while loop should rarely be used: it would require someone
1081          * concurrently adding to the queue. */
1082         do {
1083                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1084                 spin_lock_irqsave(&q->lock);
1085                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1086                 spin_unlock_irqsave(&q->lock);
1087                 if (ret)
1088                         block_add_extd(newb, ret, MEM_WAIT);
1089         } while (ret);
1090         return newb;
1091 }
1092
1093 /*
1094  *  copy from offset in the queue
1095  */
1096 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1097 {
1098         int sofar;
1099         int n;
1100         struct block *b, *nb;
1101         uint8_t *p;
1102
1103         nb = block_alloc(len, MEM_WAIT);
1104
1105         spin_lock_irqsave(&q->lock);
1106
1107         /* go to offset */
1108         b = q->bfirst;
1109         for (sofar = 0;; sofar += n) {
1110                 if (b == NULL) {
1111                         spin_unlock_irqsave(&q->lock);
1112                         return nb;
1113                 }
1114                 n = BLEN(b);
1115                 if (sofar + n > offset) {
1116                         p = b->rp + offset - sofar;
1117                         n -= offset - sofar;
1118                         break;
1119                 }
1120                 QDEBUG checkb(b, "qcopy");
1121                 b = b->next;
1122         }
1123
1124         /* copy bytes from there */
1125         for (sofar = 0; sofar < len;) {
1126                 if (n > len - sofar)
1127                         n = len - sofar;
1128                 PANIC_EXTRA(b);
1129                 memmove(nb->wp, p, n);
1130                 qcopycnt += n;
1131                 sofar += n;
1132                 nb->wp += n;
1133                 b = b->next;
1134                 if (b == NULL)
1135                         break;
1136                 n = BLEN(b);
1137                 p = b->rp;
1138         }
1139         spin_unlock_irqsave(&q->lock);
1140
1141         return nb;
1142 }
1143
1144 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1145 {
1146 #ifdef CONFIG_BLOCK_EXTRAS
1147         return qclone(q, 0, len, offset);
1148 #else
1149         return qcopy_old(q, len, offset);
1150 #endif
1151 }
1152
1153 static void qinit_common(struct queue *q)
1154 {
1155         spinlock_init_irqsave(&q->lock);
1156         rendez_init(&q->rr);
1157         rendez_init(&q->wr);
1158 }
1159
1160 /*
1161  *  called by non-interrupt code
1162  */
1163 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1164 {
1165         struct queue *q;
1166
1167         q = kzmalloc(sizeof(struct queue), 0);
1168         if (q == 0)
1169                 return 0;
1170         qinit_common(q);
1171
1172         q->limit = q->inilim = limit;
1173         q->kick = kick;
1174         q->arg = arg;
1175         q->state = msg;
1176         q->state |= Qstarve;
1177         q->eof = 0;
1178
1179         return q;
1180 }
1181
1182 /* open a queue to be bypassed */
1183 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1184 {
1185         struct queue *q;
1186
1187         q = kzmalloc(sizeof(struct queue), 0);
1188         if (q == 0)
1189                 return 0;
1190         qinit_common(q);
1191
1192         q->limit = 0;
1193         q->arg = arg;
1194         q->bypass = bypass;
1195         q->state = 0;
1196
1197         return q;
1198 }
1199
1200 static int notempty(void *a)
1201 {
1202         struct queue *q = a;
1203
1204         return (q->state & Qclosed) || q->bfirst != 0;
1205 }
1206
1207 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1208  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1209  * was naturally closed.  Throws an error o/w. */
1210 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1211 {
1212         while (1) {
1213                 spin_lock_irqsave(&q->lock);
1214                 if (q->bfirst != NULL)
1215                         return TRUE;
1216                 if (q->state & Qclosed) {
1217                         if (++q->eof > 3) {
1218                                 spin_unlock_irqsave(&q->lock);
1219                                 error(EPIPE, "multiple reads on a closed queue");
1220                         }
1221                         if (q->err[0]) {
1222                                 spin_unlock_irqsave(&q->lock);
1223                                 error(EPIPE, q->err);
1224                         }
1225                         return FALSE;
1226                 }
1227                 /* We set Qstarve regardless of whether we are non-blocking or not.
1228                  * Qstarve tracks the edge detection of the queue being empty. */
1229                 q->state |= Qstarve;
1230                 if (qio_flags & QIO_NON_BLOCK) {
1231                         spin_unlock_irqsave(&q->lock);
1232                         error(EAGAIN, "queue empty");
1233                 }
1234                 spin_unlock_irqsave(&q->lock);
1235                 /* may throw an error() */
1236                 rendez_sleep(&q->rr, notempty, q);
1237         }
1238 }
1239
1240 /*
1241  * add a block list to a queue
1242  * XXX basically the same as enqueue blist, and has no locking!
1243  */
1244 void qaddlist(struct queue *q, struct block *b)
1245 {
1246         /* TODO: q lock? */
1247         /* queue the block */
1248         if (q->bfirst)
1249                 q->blast->next = b;
1250         else
1251                 q->bfirst = b;
1252         q->len += blockalloclen(b);
1253         q->dlen += blocklen(b);
1254         while (b->next)
1255                 b = b->next;
1256         q->blast = b;
1257 }
1258
1259 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1260 {
1261         size_t copy_amt, retval = 0;
1262         struct extra_bdata *ebd;
1263
1264         copy_amt = MIN(BHLEN(b), amt);
1265         memcpy(to, b->rp, copy_amt);
1266         /* advance the rp, since this block not be completely consumed and future
1267          * reads need to know where to pick up from */
1268         b->rp += copy_amt;
1269         to += copy_amt;
1270         amt -= copy_amt;
1271         retval += copy_amt;
1272         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1273                 ebd = &b->extra_data[i];
1274                 /* skip empty entires.  if we track this in the struct block, we can
1275                  * just start the for loop early */
1276                 if (!ebd->base || !ebd->len)
1277                         continue;
1278                 copy_amt = MIN(ebd->len, amt);
1279                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1280                 /* we're actually consuming the entries, just like how we advance rp up
1281                  * above, and might only consume part of one. */
1282                 ebd->len -= copy_amt;
1283                 ebd->off += copy_amt;
1284                 b->extra_len -= copy_amt;
1285                 if (!ebd->len) {
1286                         /* we don't actually have to decref here.  it's also done in
1287                          * freeb().  this is the earliest we can free. */
1288                         kfree((void*)ebd->base);
1289                         ebd->base = ebd->off = 0;
1290                 }
1291                 to += copy_amt;
1292                 amt -= copy_amt;
1293                 retval += copy_amt;
1294         }
1295         return retval;
1296 }
1297
1298 /*
1299  *  copy the contents of a string of blocks into
1300  *  memory.  emptied blocks are freed.  return
1301  *  pointer to first unconsumed block.
1302  */
1303 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1304 {
1305         int i;
1306         struct block *next;
1307
1308         /* could be slicker here, since read_from_block is smart */
1309         for (; b != NULL; b = next) {
1310                 i = BLEN(b);
1311                 if (i > n) {
1312                         /* partial block, consume some */
1313                         read_from_block(b, p, n);
1314                         return b;
1315                 }
1316                 /* full block, consume all and move on */
1317                 i = read_from_block(b, p, i);
1318                 n -= i;
1319                 p += i;
1320                 next = b->next;
1321                 freeb(b);
1322         }
1323         return NULL;
1324 }
1325
1326 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1327  * actual amount copied. */
1328 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1329 {
1330         size_t sofar = 0;
1331         struct block *next;
1332
1333         do {
1334                 /* We should be draining every block completely. */
1335                 assert(BLEN(b) <= len - sofar);
1336                 sofar += read_from_block(b, va + sofar, len - sofar);
1337                 next = b->next;
1338                 freeb(b);
1339                 b = next;
1340         } while (b);
1341         return sofar;
1342 }
1343
1344 /*
1345  *  copy the contents of memory into a string of blocks.
1346  *  return NULL on error.
1347  */
1348 struct block *mem2bl(uint8_t * p, int len)
1349 {
1350         ERRSTACK(1);
1351         int n;
1352         struct block *b, *first, **l;
1353
1354         first = NULL;
1355         l = &first;
1356         if (waserror()) {
1357                 freeblist(first);
1358                 nexterror();
1359         }
1360         do {
1361                 n = len;
1362                 if (n > Maxatomic)
1363                         n = Maxatomic;
1364
1365                 *l = b = block_alloc(n, MEM_WAIT);
1366                 /* TODO consider extra_data */
1367                 memmove(b->wp, p, n);
1368                 b->wp += n;
1369                 p += n;
1370                 len -= n;
1371                 l = &b->next;
1372         } while (len > 0);
1373         poperror();
1374
1375         return first;
1376 }
1377
1378 /*
1379  *  put a block back to the front of the queue
1380  *  called with q ilocked
1381  */
1382 void qputback(struct queue *q, struct block *b)
1383 {
1384         b->next = q->bfirst;
1385         if (q->bfirst == NULL)
1386                 q->blast = b;
1387         q->bfirst = b;
1388         q->len += BALLOC(b);
1389         q->dlen += BLEN(b);
1390 }
1391
1392 /*
1393  *  flow control, get producer going again
1394  *  called with q ilocked
1395  */
1396 static void qwakeup_iunlock(struct queue *q, int qio_flags)
1397 {
1398         int dowakeup = 0;
1399
1400         /*
1401          *  if writer flow controlled, restart
1402          *
1403          *  This used to be
1404          *  q->len < q->limit/2
1405          *  but it slows down tcp too much for certain write sizes.
1406          *  I really don't understand it completely.  It may be
1407          *  due to the queue draining so fast that the transmission
1408          *  stalls waiting for the app to produce more data.  - presotto
1409          */
1410         if ((q->state & Qflow) && q->len < q->limit) {
1411                 q->state &= ~Qflow;
1412                 dowakeup = 1;
1413         }
1414
1415         spin_unlock_irqsave(&q->lock);
1416
1417         /* wakeup flow controlled writers */
1418         if (dowakeup) {
1419                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
1420                         q->kick(q->arg);
1421                 rendez_wakeup(&q->wr);
1422         }
1423         qwake_cb(q, FDTAP_FILT_WRITABLE);
1424 }
1425
1426 /*
1427  *  get next block from a queue (up to a limit)
1428  *
1429  */
1430 struct block *qbread(struct queue *q, size_t len)
1431 {
1432         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1433 }
1434
1435 struct block *qbread_nonblock(struct queue *q, size_t len)
1436 {
1437         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1438                         QIO_NON_BLOCK, MEM_WAIT);
1439 }
1440
1441 /* read up to len from a queue into vp. */
1442 size_t qread(struct queue *q, void *va, size_t len)
1443 {
1444         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1445
1446         if (!blist)
1447                 return 0;
1448         return read_all_blocks(blist, va, len);
1449 }
1450
1451 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1452 {
1453         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1454                                        MEM_WAIT);
1455
1456         if (!blist)
1457                 return 0;
1458         return read_all_blocks(blist, va, len);
1459 }
1460
1461 static int qnotfull(void *a)
1462 {
1463         struct queue *q = a;
1464
1465         return q->len < q->limit || (q->state & Qclosed);
1466 }
1467
1468 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1469 static size_t enqueue_blist(struct queue *q, struct block *b)
1470 {
1471         size_t len, dlen;
1472
1473         if (q->bfirst)
1474                 q->blast->next = b;
1475         else
1476                 q->bfirst = b;
1477         len = BALLOC(b);
1478         dlen = BLEN(b);
1479         while (b->next) {
1480                 b = b->next;
1481                 len += BALLOC(b);
1482                 dlen += BLEN(b);
1483         }
1484         q->blast = b;
1485         q->len += len;
1486         q->dlen += dlen;
1487         return dlen;
1488 }
1489
1490 /* Adds block (which can be a list of blocks) to the queue, subject to
1491  * qio_flags.  Returns the length written on success or -1 on non-throwable
1492  * error.  Adjust qio_flags to control the value-added features!. */
1493 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1494 {
1495         ssize_t ret;
1496         bool dowakeup = FALSE;
1497         bool was_empty;
1498
1499         if (q->bypass) {
1500                 ret = blocklen(b);
1501                 (*q->bypass) (q->arg, b);
1502                 return ret;
1503         }
1504         spin_lock_irqsave(&q->lock);
1505         was_empty = q->len == 0;
1506         if (q->state & Qclosed) {
1507                 spin_unlock_irqsave(&q->lock);
1508                 freeblist(b);
1509                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1510                         return -1;
1511                 if (q->err[0])
1512                         error(EPIPE, q->err);
1513                 else
1514                         error(EPIPE, "connection closed");
1515         }
1516         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1517                 /* drop overflow takes priority over regular non-blocking */
1518                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1519                         spin_unlock_irqsave(&q->lock);
1520                         freeb(b);
1521                         return -1;
1522                 }
1523                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1524                  * and catch it. */
1525                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1526                         spin_unlock_irqsave(&q->lock);
1527                         freeb(b);
1528                         error(EAGAIN, "queue full");
1529                 }
1530         }
1531         ret = enqueue_blist(q, b);
1532         QDEBUG checkb(b, "__qbwrite");
1533         /* make sure other end gets awakened */
1534         if (q->state & Qstarve) {
1535                 q->state &= ~Qstarve;
1536                 dowakeup = TRUE;
1537         }
1538         spin_unlock_irqsave(&q->lock);
1539         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1540          * wakeup, meaning that actual users either want a kick or have qreaders. */
1541         if (q->kick && (dowakeup || (q->state & Qkick)))
1542                 q->kick(q->arg);
1543         if (dowakeup)
1544                 rendez_wakeup(&q->rr);
1545         if (was_empty)
1546                 qwake_cb(q, FDTAP_FILT_READABLE);
1547         /*
1548          *  flow control, wait for queue to get below the limit
1549          *  before allowing the process to continue and queue
1550          *  more.  We do this here so that postnote can only
1551          *  interrupt us after the data has been queued.  This
1552          *  means that things like 9p flushes and ssl messages
1553          *  will not be disrupted by software interrupts.
1554          *
1555          *  Note - this is moderately dangerous since a process
1556          *  that keeps getting interrupted and rewriting will
1557          *  queue infinite crud.
1558          */
1559         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1560             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1561                 /* This is a racy peek at the q status.  If we accidentally block, we
1562                  * set Qflow, so someone should wake us.  If we accidentally don't
1563                  * block, we just returned to the user and let them slip a block past
1564                  * flow control. */
1565                 while (!qnotfull(q)) {
1566                         spin_lock_irqsave(&q->lock);
1567                         q->state |= Qflow;
1568                         spin_unlock_irqsave(&q->lock);
1569                         rendez_sleep(&q->wr, qnotfull, q);
1570                 }
1571         }
1572         return ret;
1573 }
1574
1575 /*
1576  *  add a block to a queue obeying flow control
1577  */
1578 ssize_t qbwrite(struct queue *q, struct block *b)
1579 {
1580         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1581 }
1582
1583 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1584 {
1585         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1586 }
1587
1588 ssize_t qibwrite(struct queue *q, struct block *b)
1589 {
1590         return __qbwrite(q, b, 0);
1591 }
1592
1593 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1594  * block on success, 0 on failure. */
1595 static struct block *build_block(void *from, size_t len, int mem_flags)
1596 {
1597         struct block *b;
1598         void *ext_buf;
1599
1600         /* If len is small, we don't need to bother with the extra_data.  But until
1601          * the whole stack can handle extd blocks, we'll use them unconditionally.
1602          * */
1603 #ifdef CONFIG_BLOCK_EXTRAS
1604         /* allocb builds in 128 bytes of header space to all blocks, but this is
1605          * only available via padblock (to the left).  we also need some space
1606          * for pullupblock for some basic headers (like icmp) that get written
1607          * in directly */
1608         b = block_alloc(64, mem_flags);
1609         if (!b)
1610                 return 0;
1611         ext_buf = kmalloc(len, mem_flags);
1612         if (!ext_buf) {
1613                 kfree(b);
1614                 return 0;
1615         }
1616         memcpy(ext_buf, from, len);
1617         if (block_add_extd(b, 1, mem_flags)) {
1618                 kfree(ext_buf);
1619                 kfree(b);
1620                 return 0;
1621         }
1622         b->extra_data[0].base = (uintptr_t)ext_buf;
1623         b->extra_data[0].off = 0;
1624         b->extra_data[0].len = len;
1625         b->extra_len += len;
1626 #else
1627         b = block_alloc(n, mem_flags);
1628         if (!b)
1629                 return 0;
1630         memmove(b->wp, from, len);
1631         b->wp += len;
1632 #endif
1633         return b;
1634 }
1635
1636 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1637                         int qio_flags)
1638 {
1639         size_t n, sofar;
1640         struct block *b;
1641         uint8_t *p = vp;
1642         void *ext_buf;
1643
1644         sofar = 0;
1645         do {
1646                 n = len - sofar;
1647                 /* This is 64K, the max amount per single block.  Still a good value? */
1648                 if (n > Maxatomic)
1649                         n = Maxatomic;
1650                 b = build_block(p + sofar, n, mem_flags);
1651                 if (!b)
1652                         break;
1653                 if (__qbwrite(q, b, qio_flags) < 0)
1654                         break;
1655                 sofar += n;
1656         } while ((sofar < len) && (q->state & Qmsg) == 0);
1657         return sofar;
1658 }
1659
1660 ssize_t qwrite(struct queue *q, void *vp, int len)
1661 {
1662         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1663 }
1664
1665 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1666 {
1667         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1668                                               QIO_NON_BLOCK);
1669 }
1670
1671 ssize_t qiwrite(struct queue *q, void *vp, int len)
1672 {
1673         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1674 }
1675
1676 /*
1677  *  be extremely careful when calling this,
1678  *  as there is no reference accounting
1679  */
1680 void qfree(struct queue *q)
1681 {
1682         qclose(q);
1683         kfree(q);
1684 }
1685
1686 /*
1687  *  Mark a queue as closed.  No further IO is permitted.
1688  *  All blocks are released.
1689  */
1690 void qclose(struct queue *q)
1691 {
1692         struct block *bfirst;
1693
1694         if (q == NULL)
1695                 return;
1696
1697         /* mark it */
1698         spin_lock_irqsave(&q->lock);
1699         q->state |= Qclosed;
1700         q->state &= ~(Qflow | Qstarve | Qdropoverflow);
1701         q->err[0] = 0;
1702         bfirst = q->bfirst;
1703         q->bfirst = 0;
1704         q->len = 0;
1705         q->dlen = 0;
1706         spin_unlock_irqsave(&q->lock);
1707
1708         /* free queued blocks */
1709         freeblist(bfirst);
1710
1711         /* wake up readers/writers */
1712         rendez_wakeup(&q->rr);
1713         rendez_wakeup(&q->wr);
1714         qwake_cb(q, FDTAP_FILT_HANGUP);
1715 }
1716
1717 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1718  *
1719  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1720  * there is no message, which is what also happens during a natural qclose(),
1721  * those waiters will simply return 0.  qwriters will always error() on a
1722  * closed/hungup queue. */
1723 void qhangup(struct queue *q, char *msg)
1724 {
1725         /* mark it */
1726         spin_lock_irqsave(&q->lock);
1727         q->state |= Qclosed;
1728         if (msg == 0 || *msg == 0)
1729                 q->err[0] = 0;
1730         else
1731                 strlcpy(q->err, msg, ERRMAX);
1732         spin_unlock_irqsave(&q->lock);
1733
1734         /* wake up readers/writers */
1735         rendez_wakeup(&q->rr);
1736         rendez_wakeup(&q->wr);
1737         qwake_cb(q, FDTAP_FILT_HANGUP);
1738 }
1739
1740 /*
1741  *  return non-zero if the q is hungup
1742  */
1743 int qisclosed(struct queue *q)
1744 {
1745         return q->state & Qclosed;
1746 }
1747
1748 /*
1749  *  mark a queue as no longer hung up.  resets the wake_cb.
1750  */
1751 void qreopen(struct queue *q)
1752 {
1753         spin_lock_irqsave(&q->lock);
1754         q->state &= ~Qclosed;
1755         q->state |= Qstarve;
1756         q->eof = 0;
1757         q->limit = q->inilim;
1758         q->wake_cb = 0;
1759         q->wake_data = 0;
1760         spin_unlock_irqsave(&q->lock);
1761 }
1762
1763 /*
1764  *  return bytes queued
1765  */
1766 int qlen(struct queue *q)
1767 {
1768         return q->dlen;
1769 }
1770
1771 /*
1772  * return space remaining before flow control
1773  */
1774 int qwindow(struct queue *q)
1775 {
1776         int l;
1777
1778         l = q->limit - q->len;
1779         if (l < 0)
1780                 l = 0;
1781         return l;
1782 }
1783
1784 /*
1785  *  return true if we can read without blocking
1786  */
1787 int qcanread(struct queue *q)
1788 {
1789         return q->bfirst != 0;
1790 }
1791
1792 /*
1793  *  change queue limit
1794  */
1795 void qsetlimit(struct queue *q, int limit)
1796 {
1797         q->limit = limit;
1798 }
1799
1800 /*
1801  *  set whether writes drop overflowing blocks, or if we sleep
1802  */
1803 void qdropoverflow(struct queue *q, bool onoff)
1804 {
1805         if (onoff)
1806                 q->state |= Qdropoverflow;
1807         else
1808                 q->state &= ~Qdropoverflow;
1809 }
1810
1811 /*
1812  *  flush the output queue
1813  */
1814 void qflush(struct queue *q)
1815 {
1816         struct block *bfirst;
1817
1818         /* mark it */
1819         spin_lock_irqsave(&q->lock);
1820         bfirst = q->bfirst;
1821         q->bfirst = 0;
1822         q->len = 0;
1823         q->dlen = 0;
1824         spin_unlock_irqsave(&q->lock);
1825
1826         /* free queued blocks */
1827         freeblist(bfirst);
1828
1829         /* wake up writers */
1830         rendez_wakeup(&q->wr);
1831         qwake_cb(q, FDTAP_FILT_WRITABLE);
1832 }
1833
1834 int qfull(struct queue *q)
1835 {
1836         return q->len >= q->limit;
1837 }
1838
1839 int qstate(struct queue *q)
1840 {
1841         return q->state;
1842 }
1843
1844 void qdump(struct queue *q)
1845 {
1846         if (q)
1847                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1848                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1849 }
1850
1851 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1852  * marks the type of wakeup event (flags from FDTAP).
1853  *
1854  * There's no sync protection.  If you change the CB while the qio is running,
1855  * you might get a CB with the data or func from a previous set_wake_cb.  You
1856  * should set this once per queue and forget it.
1857  *
1858  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1859  * just make sure that the func(data) pair are valid until the queue is freed or
1860  * reopened. */
1861 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1862 {
1863         q->wake_data = data;
1864         wmb();  /* if we see func, we'll also see the data for it */
1865         q->wake_cb = func;
1866 }
1867
1868 /* Helper for detecting whether we'll block on a read at this instant. */
1869 bool qreadable(struct queue *q)
1870 {
1871         return qlen(q) > 0;
1872 }
1873
1874 /* Helper for detecting whether we'll block on a write at this instant. */
1875 bool qwritable(struct queue *q)
1876 {
1877         return qwindow(q) > 0;
1878 }