qio: Remove q->len
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int dlen;                                       /* data bytes in queue */
75         int limit;                                      /* max bytes in queue */
76         int inilim;                             /* initial limit */
77         int state;
78         int eof;                                        /* number of eofs read by user */
79
80         void (*kick) (void *);          /* restart output */
81         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
82         void *arg;                                      /* argument to kick */
83
84         struct rendez rr;                       /* process waiting to read */
85         struct rendez wr;                       /* process waiting to write */
86         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
87         void *wake_data;
88
89         char err[ERRMAX];
90 };
91
92 enum {
93         Maxatomic = 64 * 1024,
94         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
95         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
96         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
97         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
98         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
99         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
100 };
101
102 unsigned int qiomaxatomic = Maxatomic;
103
104 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
105 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
106 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
107                               int mem_flags);
108 static bool qwait_and_ilock(struct queue *q, int qio_flags);
109
110 /* Helper: fires a wake callback, sending 'filter' */
111 static void qwake_cb(struct queue *q, int filter)
112 {
113         if (q->wake_cb)
114                 q->wake_cb(q, q->wake_data, filter);
115 }
116
117 void ixsummary(void)
118 {
119         debugging ^= 1;
120         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
121                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
122         printd("consume %lu, produce %lu, qcopy %lu\n",
123                    consumecnt, producecnt, qcopycnt);
124 }
125
126 /*
127  *  pad a block to the front (or the back if size is negative)
128  */
129 struct block *padblock(struct block *bp, int size)
130 {
131         int n;
132         struct block *nbp;
133         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
134         uint16_t checksum_start = bp->checksum_start;
135         uint16_t checksum_offset = bp->checksum_offset;
136         uint16_t mss = bp->mss;
137
138         QDEBUG checkb(bp, "padblock 1");
139         if (size >= 0) {
140                 if (bp->rp - bp->base >= size) {
141                         bp->checksum_start += size;
142                         bp->rp -= size;
143                         return bp;
144                 }
145
146                 PANIC_EXTRA(bp);
147                 if (bp->next)
148                         panic("padblock %p", getcallerpc(&bp));
149                 n = BLEN(bp);
150                 padblockcnt++;
151                 nbp = block_alloc(size + n, MEM_WAIT);
152                 nbp->rp += size;
153                 nbp->wp = nbp->rp;
154                 memmove(nbp->wp, bp->rp, n);
155                 nbp->wp += n;
156                 freeb(bp);
157                 nbp->rp -= size;
158         } else {
159                 size = -size;
160
161                 PANIC_EXTRA(bp);
162
163                 if (bp->next)
164                         panic("padblock %p", getcallerpc(&bp));
165
166                 if (bp->lim - bp->wp >= size)
167                         return bp;
168
169                 n = BLEN(bp);
170                 padblockcnt++;
171                 nbp = block_alloc(size + n, MEM_WAIT);
172                 memmove(nbp->wp, bp->rp, n);
173                 nbp->wp += n;
174                 freeb(bp);
175         }
176         if (bcksum) {
177                 nbp->flag |= bcksum;
178                 nbp->checksum_start = checksum_start;
179                 nbp->checksum_offset = checksum_offset;
180                 nbp->mss = mss;
181         }
182         QDEBUG checkb(nbp, "padblock 1");
183         return nbp;
184 }
185
186 /*
187  *  return count of bytes in a string of blocks
188  */
189 int blocklen(struct block *bp)
190 {
191         int len;
192
193         len = 0;
194         while (bp) {
195                 len += BLEN(bp);
196                 bp = bp->next;
197         }
198         return len;
199 }
200
201 /*
202  * return count of space in blocks
203  */
204 int blockalloclen(struct block *bp)
205 {
206         int len;
207
208         len = 0;
209         while (bp) {
210                 len += BALLOC(bp);
211                 bp = bp->next;
212         }
213         return len;
214 }
215
216 /*
217  *  copy the  string of blocks into
218  *  a single block and free the string
219  */
220 struct block *concatblock(struct block *bp)
221 {
222         int len;
223         struct block *nb, *f;
224
225         if (bp->next == 0)
226                 return bp;
227
228         /* probably use parts of qclone */
229         PANIC_EXTRA(bp);
230         nb = block_alloc(blocklen(bp), MEM_WAIT);
231         for (f = bp; f; f = f->next) {
232                 len = BLEN(f);
233                 memmove(nb->wp, f->rp, len);
234                 nb->wp += len;
235         }
236         concatblockcnt += BLEN(nb);
237         freeblist(bp);
238         QDEBUG checkb(nb, "concatblock 1");
239         return nb;
240 }
241
242 /* Makes an identical copy of the block, collapsing all the data into the block
243  * body.  It does not point to the contents of the original, it is a copy
244  * (unlike qclone).  Since we're copying, we might as well put the memory into
245  * one contiguous chunk. */
246 struct block *copyblock(struct block *bp, int mem_flags)
247 {
248         struct block *newb;
249         struct extra_bdata *ebd;
250         size_t amt;
251
252         QDEBUG checkb(bp, "copyblock 0");
253         newb = block_alloc(BLEN(bp), mem_flags);
254         if (!newb)
255                 return 0;
256         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
257         assert(amt == BHLEN(bp));
258         for (int i = 0; i < bp->nr_extra_bufs; i++) {
259                 ebd = &bp->extra_data[i];
260                 if (!ebd->base || !ebd->len)
261                         continue;
262                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
263                 assert(amt == ebd->len);
264         }
265         /* TODO: any other flags that need copied over? */
266         if (bp->flag & BCKSUM_FLAGS) {
267                 newb->flag |= (bp->flag & BCKSUM_FLAGS);
268                 newb->checksum_start = bp->checksum_start;
269                 newb->checksum_offset = bp->checksum_offset;
270                 newb->mss = bp->mss;
271         }
272         copyblockcnt++;
273         QDEBUG checkb(newb, "copyblock 1");
274         return newb;
275 }
276
277 /* Returns a block with the remaining contents of b all in the main body of the
278  * returned block.  Replace old references to b with the returned value (which
279  * may still be 'b', if no change was needed. */
280 struct block *linearizeblock(struct block *b)
281 {
282         struct block *newb;
283
284         if (!b->extra_len)
285                 return b;
286         newb = copyblock(b, MEM_WAIT);
287         freeb(b);
288         return newb;
289 }
290
291 /* Make sure the first block has at least n bytes in its main body.  Pulls up
292  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
293  * block list. */
294 struct block *pullupblock(struct block *bp, int n)
295 {
296         int i, len, seglen;
297         struct block *nbp;
298         struct extra_bdata *ebd;
299
300         /*
301          *  this should almost always be true, it's
302          *  just to avoid every caller checking.
303          */
304         if (BHLEN(bp) >= n)
305                 return bp;
306
307         /* If there's no chance, just bail out now.  This might be slightly wasteful
308          * if there's a long blist that does have enough data. */
309         if (n > blocklen(bp))
310                 return 0;
311         /* a start at explicit main-body / header management */
312         if (bp->extra_len) {
313                 if (n > bp->lim - bp->rp) {
314                         /* would need to realloc a new block and copy everything over. */
315                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
316                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
317                 }
318                 len = n - BHLEN(bp);
319                 /* Would need to recursively call this, or otherwise pull from later
320                  * blocks and put chunks of their data into the block we're building. */
321                 if (len > bp->extra_len)
322                         panic("pullup more than extra (%d, %d, %d)\n",
323                               n, BHLEN(bp), bp->extra_len);
324                 QDEBUG checkb(bp, "before pullup");
325                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
326                         ebd = &bp->extra_data[i];
327                         if (!ebd->base || !ebd->len)
328                                 continue;
329                         seglen = MIN(ebd->len, len);
330                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
331                         bp->wp += seglen;
332                         len -= seglen;
333                         ebd->len -= seglen;
334                         ebd->off += seglen;
335                         bp->extra_len -= seglen;
336                         if (ebd->len == 0) {
337                                 kfree((void *)ebd->base);
338                                 ebd->off = 0;
339                                 ebd->base = 0;
340                         }
341                 }
342                 /* maybe just call pullupblock recursively here */
343                 if (len)
344                         panic("pullup %d bytes overdrawn\n", len);
345                 QDEBUG checkb(bp, "after pullup");
346                 return bp;
347         }
348
349         /*
350          *  if not enough room in the first block,
351          *  add another to the front of the list.
352          */
353         if (bp->lim - bp->rp < n) {
354                 nbp = block_alloc(n, MEM_WAIT);
355                 nbp->next = bp;
356                 bp = nbp;
357         }
358
359         /*
360          *  copy bytes from the trailing blocks into the first
361          */
362         n -= BLEN(bp);
363         while ((nbp = bp->next)) {
364                 i = BLEN(nbp);
365                 if (i > n) {
366                         memmove(bp->wp, nbp->rp, n);
367                         pullupblockcnt++;
368                         bp->wp += n;
369                         nbp->rp += n;
370                         QDEBUG checkb(bp, "pullupblock 1");
371                         return bp;
372                 } else {
373                         memmove(bp->wp, nbp->rp, i);
374                         pullupblockcnt++;
375                         bp->wp += i;
376                         bp->next = nbp->next;
377                         nbp->next = 0;
378                         freeb(nbp);
379                         n -= i;
380                         if (n == 0) {
381                                 QDEBUG checkb(bp, "pullupblock 2");
382                                 return bp;
383                         }
384                 }
385         }
386         freeb(bp);
387         return 0;
388 }
389
390 /*
391  *  make sure the first block has at least n bytes in its main body
392  */
393 struct block *pullupqueue(struct queue *q, int n)
394 {
395         struct block *b;
396
397         /* TODO: lock to protect the queue links? */
398         if ((BHLEN(q->bfirst) >= n))
399                 return q->bfirst;
400         q->bfirst = pullupblock(q->bfirst, n);
401         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
402         q->blast = b;
403         return q->bfirst;
404 }
405
406 /* throw away count bytes from the front of
407  * block's extradata.  Returns count of bytes
408  * thrown away
409  */
410
411 static int pullext(struct block *bp, int count)
412 {
413         struct extra_bdata *ed;
414         int i, rem, bytes = 0;
415
416         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
417                 ed = &bp->extra_data[i];
418                 rem = MIN(count, ed->len);
419                 bp->extra_len -= rem;
420                 count -= rem;
421                 bytes += rem;
422                 ed->off += rem;
423                 ed->len -= rem;
424                 if (ed->len == 0) {
425                         kfree((void *)ed->base);
426                         ed->base = 0;
427                         ed->off = 0;
428                 }
429         }
430         return bytes;
431 }
432
433 /* throw away count bytes from the end of a
434  * block's extradata.  Returns count of bytes
435  * thrown away
436  */
437
438 static int dropext(struct block *bp, int count)
439 {
440         struct extra_bdata *ed;
441         int i, rem, bytes = 0;
442
443         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
444                 ed = &bp->extra_data[i];
445                 rem = MIN(count, ed->len);
446                 bp->extra_len -= rem;
447                 count -= rem;
448                 bytes += rem;
449                 ed->len -= rem;
450                 if (ed->len == 0) {
451                         kfree((void *)ed->base);
452                         ed->base = 0;
453                         ed->off = 0;
454                 }
455         }
456         return bytes;
457 }
458
459 /*
460  *  throw away up to count bytes from a
461  *  list of blocks.  Return count of bytes
462  *  thrown away.
463  */
464 static int _pullblock(struct block **bph, int count, int free)
465 {
466         struct block *bp;
467         int n, bytes;
468
469         bytes = 0;
470         if (bph == NULL)
471                 return 0;
472
473         while (*bph != NULL && count != 0) {
474                 bp = *bph;
475
476                 n = MIN(BHLEN(bp), count);
477                 bytes += n;
478                 count -= n;
479                 bp->rp += n;
480                 n = pullext(bp, count);
481                 bytes += n;
482                 count -= n;
483                 QDEBUG checkb(bp, "pullblock ");
484                 if (BLEN(bp) == 0 && (free || count)) {
485                         *bph = bp->next;
486                         bp->next = NULL;
487                         freeb(bp);
488                 }
489         }
490         return bytes;
491 }
492
493 int pullblock(struct block **bph, int count)
494 {
495         return _pullblock(bph, count, 1);
496 }
497
498 /*
499  *  trim to len bytes starting at offset
500  */
501 struct block *trimblock(struct block *bp, int offset, int len)
502 {
503         uint32_t l, trim;
504         int olen = len;
505
506         QDEBUG checkb(bp, "trimblock 1");
507         if (blocklen(bp) < offset + len) {
508                 freeblist(bp);
509                 return NULL;
510         }
511
512         l =_pullblock(&bp, offset, 0);
513         if (bp == NULL)
514                 return NULL;
515         if (l != offset) {
516                 freeblist(bp);
517                 return NULL;
518         }
519
520         while ((l = BLEN(bp)) < len) {
521                 len -= l;
522                 bp = bp->next;
523         }
524
525         trim = BLEN(bp) - len;
526         trim -= dropext(bp, trim);
527         bp->wp -= trim;
528
529         if (bp->next) {
530                 freeblist(bp->next);
531                 bp->next = NULL;
532         }
533         return bp;
534 }
535
536 /* Adjust block @bp so that its size is exactly @len.
537  * If the size is increased, fill in the new contents with zeros.
538  * If the size is decreased, discard some of the old contents at the tail. */
539 struct block *adjustblock(struct block *bp, int len)
540 {
541         struct extra_bdata *ebd;
542         void *buf;
543         int i;
544
545         if (len < 0) {
546                 freeb(bp);
547                 return NULL;
548         }
549
550         if (len == BLEN(bp))
551                 return bp;
552
553         /* Shrink within block main body. */
554         if (len <= BHLEN(bp)) {
555                 free_block_extra(bp);
556                 bp->wp = bp->rp + len;
557                 QDEBUG checkb(bp, "adjustblock 1");
558                 return bp;
559         }
560
561         /* Need to grow. */
562         if (len > BLEN(bp)) {
563                 /* Grow within block main body. */
564                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
565                         memset(bp->wp, 0, len - BLEN(bp));
566                         bp->wp = bp->rp + len;
567                         QDEBUG checkb(bp, "adjustblock 2");
568                         return bp;
569                 }
570                 /* Grow with extra data buffers. */
571                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
572                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
573                 QDEBUG checkb(bp, "adjustblock 3");
574                 return bp;
575         }
576
577         /* Shrink extra data buffers.
578          * len is how much of ebd we need to keep.
579          * extra_len is re-accumulated. */
580         assert(bp->extra_len > 0);
581         len -= BHLEN(bp);
582         bp->extra_len = 0;
583         for (i = 0; i < bp->nr_extra_bufs; i++) {
584                 ebd = &bp->extra_data[i];
585                 if (len <= ebd->len)
586                         break;
587                 len -= ebd->len;
588                 bp->extra_len += ebd->len;
589         }
590         /* If len becomes zero, extra_data[i] should be freed. */
591         if (len > 0) {
592                 ebd = &bp->extra_data[i];
593                 ebd->len = len;
594                 bp->extra_len += ebd->len;
595                 i++;
596         }
597         for (; i < bp->nr_extra_bufs; i++) {
598                 ebd = &bp->extra_data[i];
599                 if (ebd->base)
600                         kfree((void*)ebd->base);
601                 ebd->base = ebd->off = ebd->len = 0;
602         }
603         QDEBUG checkb(bp, "adjustblock 4");
604         return bp;
605 }
606
607 /* Helper: removes and returns the first block from q */
608 static struct block *pop_first_block(struct queue *q)
609 {
610         struct block *b = q->bfirst;
611
612         q->dlen -= BLEN(b);
613         q->bfirst = b->next;
614         b->next = 0;
615         return b;
616 }
617
618 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
619 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
620 {
621         copy_amt = MIN(to->lim - to->wp, copy_amt);
622         memcpy(to->wp, from, copy_amt);
623         to->wp += copy_amt;
624         return copy_amt;
625 }
626
627 /* Accounting helper.  Block b in q lost amt extra_data */
628 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
629 {
630         b->extra_len -= amt;
631         q->dlen -= amt;
632 }
633
634 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
635  * fixed in 'from', so we move its contents and zero it out in 'from'.
636  *
637  * Returns the length moved (0 on failure). */
638 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
639                        struct block *from, struct queue *from_q)
640 {
641         size_t ret = ebd->len;
642
643         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
644                 return 0;
645         block_and_q_lost_extra(from, from_q, ebd->len);
646         ebd->base = ebd->len = ebd->off = 0;
647         return ret;
648 }
649
650 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
651  * return with less than len, but greater than 0, even if there is more
652  * available in q.
653  *
654  * At any moment that we have copied anything and things are tricky, we can just
655  * return.  The trickiness comes from a bunch of variables: is the main body
656  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
657  * to @to's main body, but only if we haven't used it yet. */
658 static size_t copy_from_first_block(struct queue *q, struct block *to,
659                                     size_t len)
660 {
661         struct block *from = q->bfirst;
662         size_t copy_amt, amt;
663         struct extra_bdata *ebd;
664
665         assert(len < BLEN(from));       /* sanity */
666         /* Try to extract from the main body */
667         copy_amt = MIN(BHLEN(from), len);
668         if (copy_amt) {
669                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
670                 from->rp += copy_amt;
671                 /* We only change dlen, (data len), not q->len, since the q still has
672                  * the same block memory allocation (no kfrees happened) */
673                 q->dlen -= copy_amt;
674         }
675         /* Try to extract the remainder from the extra data */
676         len -= copy_amt;
677         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
678                 ebd = &from->extra_data[i];
679                 if (!ebd->base || !ebd->len)
680                         continue;
681                 if (len >= ebd->len) {
682                         amt = move_ebd(ebd, to, from, q);
683                         if (!amt) {
684                                 /* our internal alloc could have failed.   this ebd is now the
685                                  * last one we'll consider.  let's handle it separately and put
686                                  * it in the main body. */
687                                 if (copy_amt)
688                                         return copy_amt;
689                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
690                                                               ebd->len);
691                                 block_and_q_lost_extra(from, q, copy_amt);
692                                 break;
693                         }
694                         len -= amt;
695                         copy_amt += amt;
696                         continue;
697                 } else {
698                         /* If we're here, we reached our final ebd, which we'll need to
699                          * split to get anything from it. */
700                         if (copy_amt)
701                                 return copy_amt;
702                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
703                                                       len);
704                         ebd->off += copy_amt;
705                         ebd->len -= copy_amt;
706                         block_and_q_lost_extra(from, q, copy_amt);
707                         break;
708                 }
709         }
710         if (len)
711                 assert(copy_amt);       /* sanity */
712         return copy_amt;
713 }
714
715 /* Return codes for __qbread and __try_qbread. */
716 enum {
717         QBR_OK,
718         QBR_FAIL,
719         QBR_SPARE,      /* we need a spare block */
720         QBR_AGAIN,      /* do it again, we are coalescing blocks */
721 };
722
723 /* Helper and back-end for __qbread: extracts and returns a list of blocks
724  * containing up to len bytes.  It may contain less than len even if q has more
725  * data.
726  *
727  * Returns a code interpreted by __qbread, and the returned blist in ret. */
728 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
729                         struct block **real_ret, struct block *spare)
730 {
731         struct block *ret, *ret_last, *first;
732         size_t blen;
733         bool was_unwritable = FALSE;
734
735         if (qio_flags & QIO_CAN_ERR_SLEEP) {
736                 if (!qwait_and_ilock(q, qio_flags)) {
737                         spin_unlock_irqsave(&q->lock);
738                         return QBR_FAIL;
739                 }
740                 /* we qwaited and still hold the lock, so the q is not empty */
741                 first = q->bfirst;
742         } else {
743                 spin_lock_irqsave(&q->lock);
744                 first = q->bfirst;
745                 if (!first) {
746                         spin_unlock_irqsave(&q->lock);
747                         return QBR_FAIL;
748                 }
749         }
750         /* We need to check before adjusting q->len.  We're checking the writer's
751          * sleep condition / tap condition.  When set, we *might* be making an edge
752          * transition (from unwritable to writable), which needs to wake and fire
753          * taps.  But, our read might not drain the queue below q->lim.  We'll check
754          * again later to see if we should really wake them.  */
755         was_unwritable = !qwritable(q);
756         blen = BLEN(first);
757         if ((q->state & Qcoalesce) && (blen == 0)) {
758                 freeb(pop_first_block(q));
759                 spin_unlock_irqsave(&q->lock);
760                 /* Need to retry to make sure we have a first block */
761                 return QBR_AGAIN;
762         }
763         /* Qmsg: just return the first block.  Be careful, since our caller might
764          * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
765         if (q->state & Qmsg) {
766                 ret = pop_first_block(q);
767                 goto out_ok;
768         }
769         /* Let's get at least something first - makes the code easier.  This way,
770          * we'll only ever split the block once. */
771         if (blen <= len) {
772                 ret = pop_first_block(q);
773                 len -= blen;
774         } else {
775                 /* need to split the block.  we won't actually take the first block out
776                  * of the queue - we're just extracting a little bit. */
777                 if (!spare) {
778                         /* We have nothing and need a spare block.  Retry! */
779                         spin_unlock_irqsave(&q->lock);
780                         return QBR_SPARE;
781                 }
782                 copy_from_first_block(q, spare, len);
783                 ret = spare;
784                 goto out_ok;
785         }
786         /* At this point, we just grabbed the first block.  We can try to grab some
787          * more, up to len (if they want). */
788         if (qio_flags & QIO_JUST_ONE_BLOCK)
789                 goto out_ok;
790         ret_last = ret;
791         while (q->bfirst && (len > 0)) {
792                 blen = BLEN(q->bfirst);
793                 if ((q->state & Qcoalesce) && (blen == 0)) {
794                         /* remove the intermediate 0 blocks */
795                         freeb(pop_first_block(q));
796                         continue;
797                 }
798                 if (blen > len) {
799                         /* We could try to split the block, but that's a huge pain.  For
800                          * instance, we might need to move the main body of b into an
801                          * extra_data of ret_last.  lots of ways for that to fail, and lots
802                          * of cases to consider.  Easier to just bail out.  This is why I
803                          * did the first block above: we don't need to worry about this. */
804                          break;
805                 }
806                 ret_last->next = pop_first_block(q);
807                 ret_last = ret_last->next;
808                 len -= blen;
809         }
810 out_ok:
811         /* Don't wake them up or fire tap if we didn't drain enough. */
812         if (!qwritable(q))
813                 was_unwritable = FALSE;
814         spin_unlock_irqsave(&q->lock);
815         if (was_unwritable) {
816                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
817                         q->kick(q->arg);
818                 rendez_wakeup(&q->wr);
819                 qwake_cb(q, FDTAP_FILT_WRITABLE);
820         }
821         *real_ret = ret;
822         return QBR_OK;
823 }
824
825 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
826  * containing up to len bytes.  It may contain less than len even if q has more
827  * data.
828  *
829  * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
830  *
831  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
832  * could get a zero length block back. */
833 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
834                               int mem_flags)
835 {
836         struct block *ret = 0;
837         struct block *spare = 0;
838
839         while (1) {
840                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
841                 case QBR_OK:
842                 case QBR_FAIL:
843                         if (spare && (ret != spare))
844                                 freeb(spare);
845                         return ret;
846                 case QBR_SPARE:
847                         assert(!spare);
848                         /* Due to some nastiness, we need a fresh block so we can read out
849                          * anything from the queue.  'len' seems like a reasonable amount.
850                          * Maybe we can get away with less. */
851                         spare = block_alloc(len, mem_flags);
852                         if (!spare)
853                                 return 0;
854                         break;
855                 case QBR_AGAIN:
856                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
857                          * try again.  We bounce out of __try so we can perform the "is
858                          * there a block" logic again from the top. */
859                         break;
860                 }
861         }
862 }
863
864 /*
865  *  get next block from a queue, return null if nothing there
866  */
867 struct block *qget(struct queue *q)
868 {
869         /* since len == SIZE_MAX, we should never need to do a mem alloc */
870         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
871 }
872
873 /* Throw away the next 'len' bytes in the queue returning the number actually
874  * discarded.
875  *
876  * If the bytes are in the queue, then they must be discarded.  The only time to
877  * return less than len is if the q itself has less than len bytes.
878  *
879  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
880  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
881 size_t qdiscard(struct queue *q, size_t len)
882 {
883         struct block *blist;
884         size_t removed_amt;
885         size_t sofar = 0;
886
887         /* This is racy.  There could be multiple qdiscarders or other consumers,
888          * where the consumption could be interleaved. */
889         while (qlen(q) && len) {
890                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
891                 removed_amt = freeblist(blist);
892                 sofar += removed_amt;
893                 len -= removed_amt;
894         }
895         return sofar;
896 }
897
898 ssize_t qpass(struct queue *q, struct block *b)
899 {
900         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
901 }
902
903 ssize_t qpassnolim(struct queue *q, struct block *b)
904 {
905         return __qbwrite(q, b, 0);
906 }
907
908 /*
909  *  if the allocated space is way out of line with the used
910  *  space, reallocate to a smaller block
911  */
912 struct block *packblock(struct block *bp)
913 {
914         struct block **l, *nbp;
915         int n;
916
917         if (bp->extra_len)
918                 return bp;
919         for (l = &bp; *l; l = &(*l)->next) {
920                 nbp = *l;
921                 n = BLEN(nbp);
922                 if ((n << 2) < BALLOC(nbp)) {
923                         *l = block_alloc(n, MEM_WAIT);
924                         memmove((*l)->wp, nbp->rp, n);
925                         (*l)->wp += n;
926                         (*l)->next = nbp->next;
927                         freeb(nbp);
928                 }
929         }
930
931         return bp;
932 }
933
934 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
935  * body_rp, for up to len.  Returns the len consumed.
936  *
937  * The base is 'b', so that we can kfree it later.  This currently ties us to
938  * using kfree for the release method for all extra_data.
939  *
940  * It is possible to have a body size that is 0, if there is no offset, and
941  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
942 static size_t point_to_body(struct block *b, uint8_t *body_rp,
943                             struct block *newb, unsigned int newb_idx,
944                             size_t len)
945 {
946         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
947
948         assert(newb_idx < newb->nr_extra_bufs);
949
950         kmalloc_incref(b);
951         ebd->base = (uintptr_t)b;
952         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
953         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
954         assert((int)ebd->len >= 0);
955         newb->extra_len += ebd->len;
956         return ebd->len;
957 }
958
959 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
960  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
961  * consumed.
962  *
963  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
964 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
965                            struct block *newb, unsigned int newb_idx,
966                            size_t len)
967 {
968         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
969         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
970
971         assert(b_idx < b->nr_extra_bufs);
972         assert(newb_idx < newb->nr_extra_bufs);
973
974         kmalloc_incref((void*)b_ebd->base);
975         n_ebd->base = b_ebd->base;
976         n_ebd->off = b_ebd->off + b_off;
977         n_ebd->len = MIN(b_ebd->len - b_off, len);
978         newb->extra_len += n_ebd->len;
979         return n_ebd->len;
980 }
981
982 /* given a string of blocks, sets up the new block's extra_data such that it
983  * *points* to the contents of the blist [offset, len + offset).  This does not
984  * make a separate copy of the contents of the blist.
985  *
986  * returns 0 on success.  the only failure is if the extra_data array was too
987  * small, so this returns a positive integer saying how big the extra_data needs
988  * to be.
989  *
990  * callers are responsible for protecting the list structure. */
991 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
992                             uint32_t offset)
993 {
994         struct block *b, *first;
995         unsigned int nr_bufs = 0;
996         unsigned int b_idx, newb_idx = 0;
997         uint8_t *first_main_body = 0;
998
999         /* find the first block; keep offset relative to the latest b in the list */
1000         for (b = blist; b; b = b->next) {
1001                 if (BLEN(b) > offset)
1002                         break;
1003                 offset -= BLEN(b);
1004         }
1005         /* qcopy semantics: if you asked for an offset outside the block list, you
1006          * get an empty block back */
1007         if (!b)
1008                 return 0;
1009         first = b;
1010         /* upper bound for how many buffers we'll need in newb */
1011         for (/* b is set*/; b; b = b->next) {
1012                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1013         }
1014         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1015         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1016                 /* caller will need to alloc these, then re-call us */
1017                 return nr_bufs;
1018         }
1019         for (b = first; b && len; b = b->next) {
1020                 b_idx = 0;
1021                 if (offset) {
1022                         if (offset < BHLEN(b)) {
1023                                 /* off is in the main body */
1024                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1025                                 newb_idx++;
1026                         } else {
1027                                 /* off is in one of the buffers (or just past the last one).
1028                                  * we're not going to point to b's main body at all. */
1029                                 offset -= BHLEN(b);
1030                                 assert(b->extra_data);
1031                                 /* assuming these extrabufs are packed, or at least that len
1032                                  * isn't gibberish */
1033                                 while (b->extra_data[b_idx].len <= offset) {
1034                                         offset -= b->extra_data[b_idx].len;
1035                                         b_idx++;
1036                                 }
1037                                 /* now offset is set to our offset in the b_idx'th buf */
1038                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1039                                 newb_idx++;
1040                                 b_idx++;
1041                         }
1042                         offset = 0;
1043                 } else {
1044                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1045                         newb_idx++;
1046                 }
1047                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1048                  * and any point_to_ could be our last if it consumed all of len. */
1049                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1050                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1051                         newb_idx++;
1052                 }
1053         }
1054         return 0;
1055 }
1056
1057 struct block *blist_clone(struct block *blist, int header_len, int len,
1058                           uint32_t offset)
1059 {
1060         int ret;
1061         struct block *newb = block_alloc(header_len, MEM_WAIT);
1062         do {
1063                 ret = __blist_clone_to(blist, newb, len, offset);
1064                 if (ret)
1065                         block_add_extd(newb, ret, MEM_WAIT);
1066         } while (ret);
1067         return newb;
1068 }
1069
1070 /* given a queue, makes a single block with header_len reserved space in the
1071  * block main body, and the contents of [offset, len + offset) pointed to in the
1072  * new blocks ext_data.  This does not make a copy of the q's contents, though
1073  * you do have a ref count on the memory. */
1074 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1075 {
1076         int ret;
1077         struct block *newb = block_alloc(header_len, MEM_WAIT);
1078         /* the while loop should rarely be used: it would require someone
1079          * concurrently adding to the queue. */
1080         do {
1081                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1082                 spin_lock_irqsave(&q->lock);
1083                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1084                 spin_unlock_irqsave(&q->lock);
1085                 if (ret)
1086                         block_add_extd(newb, ret, MEM_WAIT);
1087         } while (ret);
1088         return newb;
1089 }
1090
1091 /*
1092  *  copy from offset in the queue
1093  */
1094 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1095 {
1096         int sofar;
1097         int n;
1098         struct block *b, *nb;
1099         uint8_t *p;
1100
1101         nb = block_alloc(len, MEM_WAIT);
1102
1103         spin_lock_irqsave(&q->lock);
1104
1105         /* go to offset */
1106         b = q->bfirst;
1107         for (sofar = 0;; sofar += n) {
1108                 if (b == NULL) {
1109                         spin_unlock_irqsave(&q->lock);
1110                         return nb;
1111                 }
1112                 n = BLEN(b);
1113                 if (sofar + n > offset) {
1114                         p = b->rp + offset - sofar;
1115                         n -= offset - sofar;
1116                         break;
1117                 }
1118                 QDEBUG checkb(b, "qcopy");
1119                 b = b->next;
1120         }
1121
1122         /* copy bytes from there */
1123         for (sofar = 0; sofar < len;) {
1124                 if (n > len - sofar)
1125                         n = len - sofar;
1126                 PANIC_EXTRA(b);
1127                 memmove(nb->wp, p, n);
1128                 qcopycnt += n;
1129                 sofar += n;
1130                 nb->wp += n;
1131                 b = b->next;
1132                 if (b == NULL)
1133                         break;
1134                 n = BLEN(b);
1135                 p = b->rp;
1136         }
1137         spin_unlock_irqsave(&q->lock);
1138
1139         return nb;
1140 }
1141
1142 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1143 {
1144 #ifdef CONFIG_BLOCK_EXTRAS
1145         return qclone(q, 0, len, offset);
1146 #else
1147         return qcopy_old(q, len, offset);
1148 #endif
1149 }
1150
1151 static void qinit_common(struct queue *q)
1152 {
1153         spinlock_init_irqsave(&q->lock);
1154         rendez_init(&q->rr);
1155         rendez_init(&q->wr);
1156 }
1157
1158 /*
1159  *  called by non-interrupt code
1160  */
1161 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1162 {
1163         struct queue *q;
1164
1165         q = kzmalloc(sizeof(struct queue), 0);
1166         if (q == 0)
1167                 return 0;
1168         qinit_common(q);
1169
1170         q->limit = q->inilim = limit;
1171         q->kick = kick;
1172         q->arg = arg;
1173         q->state = msg;
1174         q->eof = 0;
1175
1176         return q;
1177 }
1178
1179 /* open a queue to be bypassed */
1180 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1181 {
1182         struct queue *q;
1183
1184         q = kzmalloc(sizeof(struct queue), 0);
1185         if (q == 0)
1186                 return 0;
1187         qinit_common(q);
1188
1189         q->limit = 0;
1190         q->arg = arg;
1191         q->bypass = bypass;
1192         q->state = 0;
1193
1194         return q;
1195 }
1196
1197 static int notempty(void *a)
1198 {
1199         struct queue *q = a;
1200
1201         return (q->state & Qclosed) || q->bfirst != 0;
1202 }
1203
1204 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1205  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1206  * was naturally closed.  Throws an error o/w. */
1207 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1208 {
1209         while (1) {
1210                 spin_lock_irqsave(&q->lock);
1211                 if (q->bfirst != NULL)
1212                         return TRUE;
1213                 if (q->state & Qclosed) {
1214                         if (++q->eof > 3) {
1215                                 spin_unlock_irqsave(&q->lock);
1216                                 error(EPIPE, "multiple reads on a closed queue");
1217                         }
1218                         if (q->err[0]) {
1219                                 spin_unlock_irqsave(&q->lock);
1220                                 error(EPIPE, q->err);
1221                         }
1222                         return FALSE;
1223                 }
1224                 if (qio_flags & QIO_NON_BLOCK) {
1225                         spin_unlock_irqsave(&q->lock);
1226                         error(EAGAIN, "queue empty");
1227                 }
1228                 spin_unlock_irqsave(&q->lock);
1229                 /* As with the producer side, we check for a condition while holding the
1230                  * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
1231                  * check again" pattern, but we do it conditionally.  Both sides agree
1232                  * synchronously to do it, and those decisions are made while holding
1233                  * q->lock.  I think this is OK.
1234                  *
1235                  * The invariant is that no reader sleeps when the queue has data.
1236                  * While holding the rendez lock, if we see there's no data, we'll
1237                  * sleep.  Since we saw there was no data, the next writer will see (or
1238                  * already saw) no data, and then the writer decides to rendez_wake,
1239                  * which will grab the rendez lock.  If the writer already did that,
1240                  * then we'll see notempty when we do our check-again. */
1241                 rendez_sleep(&q->rr, notempty, q);
1242         }
1243 }
1244
1245 /*
1246  * add a block list to a queue
1247  * XXX basically the same as enqueue blist, and has no locking!
1248  */
1249 void qaddlist(struct queue *q, struct block *b)
1250 {
1251         /* TODO: q lock? */
1252         /* queue the block */
1253         if (q->bfirst)
1254                 q->blast->next = b;
1255         else
1256                 q->bfirst = b;
1257         q->dlen += blocklen(b);
1258         while (b->next)
1259                 b = b->next;
1260         q->blast = b;
1261 }
1262
1263 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1264 {
1265         size_t copy_amt, retval = 0;
1266         struct extra_bdata *ebd;
1267
1268         copy_amt = MIN(BHLEN(b), amt);
1269         memcpy(to, b->rp, copy_amt);
1270         /* advance the rp, since this block not be completely consumed and future
1271          * reads need to know where to pick up from */
1272         b->rp += copy_amt;
1273         to += copy_amt;
1274         amt -= copy_amt;
1275         retval += copy_amt;
1276         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1277                 ebd = &b->extra_data[i];
1278                 /* skip empty entires.  if we track this in the struct block, we can
1279                  * just start the for loop early */
1280                 if (!ebd->base || !ebd->len)
1281                         continue;
1282                 copy_amt = MIN(ebd->len, amt);
1283                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1284                 /* we're actually consuming the entries, just like how we advance rp up
1285                  * above, and might only consume part of one. */
1286                 ebd->len -= copy_amt;
1287                 ebd->off += copy_amt;
1288                 b->extra_len -= copy_amt;
1289                 if (!ebd->len) {
1290                         /* we don't actually have to decref here.  it's also done in
1291                          * freeb().  this is the earliest we can free. */
1292                         kfree((void*)ebd->base);
1293                         ebd->base = ebd->off = 0;
1294                 }
1295                 to += copy_amt;
1296                 amt -= copy_amt;
1297                 retval += copy_amt;
1298         }
1299         return retval;
1300 }
1301
1302 /*
1303  *  copy the contents of a string of blocks into
1304  *  memory.  emptied blocks are freed.  return
1305  *  pointer to first unconsumed block.
1306  */
1307 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1308 {
1309         int i;
1310         struct block *next;
1311
1312         /* could be slicker here, since read_from_block is smart */
1313         for (; b != NULL; b = next) {
1314                 i = BLEN(b);
1315                 if (i > n) {
1316                         /* partial block, consume some */
1317                         read_from_block(b, p, n);
1318                         return b;
1319                 }
1320                 /* full block, consume all and move on */
1321                 i = read_from_block(b, p, i);
1322                 n -= i;
1323                 p += i;
1324                 next = b->next;
1325                 freeb(b);
1326         }
1327         return NULL;
1328 }
1329
1330 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1331  * actual amount copied. */
1332 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1333 {
1334         size_t sofar = 0;
1335         struct block *next;
1336
1337         do {
1338                 /* We should be draining every block completely. */
1339                 assert(BLEN(b) <= len - sofar);
1340                 assert(va);
1341                 assert(va + sofar);
1342                 assert(b->rp);
1343                 sofar += read_from_block(b, va + sofar, len - sofar);
1344                 next = b->next;
1345                 freeb(b);
1346                 b = next;
1347         } while (b);
1348         return sofar;
1349 }
1350
1351 /*
1352  *  copy the contents of memory into a string of blocks.
1353  *  return NULL on error.
1354  */
1355 struct block *mem2bl(uint8_t * p, int len)
1356 {
1357         ERRSTACK(1);
1358         int n;
1359         struct block *b, *first, **l;
1360
1361         first = NULL;
1362         l = &first;
1363         if (waserror()) {
1364                 freeblist(first);
1365                 nexterror();
1366         }
1367         do {
1368                 n = len;
1369                 if (n > Maxatomic)
1370                         n = Maxatomic;
1371
1372                 *l = b = block_alloc(n, MEM_WAIT);
1373                 /* TODO consider extra_data */
1374                 memmove(b->wp, p, n);
1375                 b->wp += n;
1376                 p += n;
1377                 len -= n;
1378                 l = &b->next;
1379         } while (len > 0);
1380         poperror();
1381
1382         return first;
1383 }
1384
1385 /*
1386  *  put a block back to the front of the queue
1387  *  called with q ilocked
1388  */
1389 void qputback(struct queue *q, struct block *b)
1390 {
1391         b->next = q->bfirst;
1392         if (q->bfirst == NULL)
1393                 q->blast = b;
1394         q->bfirst = b;
1395         q->dlen += BLEN(b);
1396 }
1397
1398 /*
1399  *  get next block from a queue (up to a limit)
1400  *
1401  */
1402 struct block *qbread(struct queue *q, size_t len)
1403 {
1404         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1405 }
1406
1407 struct block *qbread_nonblock(struct queue *q, size_t len)
1408 {
1409         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1410                         QIO_NON_BLOCK, MEM_WAIT);
1411 }
1412
1413 /* read up to len from a queue into vp. */
1414 size_t qread(struct queue *q, void *va, size_t len)
1415 {
1416         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1417
1418         if (!blist)
1419                 return 0;
1420         return read_all_blocks(blist, va, len);
1421 }
1422
1423 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1424 {
1425         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1426                                        MEM_WAIT);
1427
1428         if (!blist)
1429                 return 0;
1430         return read_all_blocks(blist, va, len);
1431 }
1432
1433 static int qnotfull(void *a)
1434 {
1435         struct queue *q = a;
1436
1437         return qwritable(q) || (q->state & Qclosed);
1438 }
1439
1440 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1441 static size_t enqueue_blist(struct queue *q, struct block *b)
1442 {
1443         size_t dlen;
1444
1445         if (q->bfirst)
1446                 q->blast->next = b;
1447         else
1448                 q->bfirst = b;
1449         dlen = BLEN(b);
1450         while (b->next) {
1451                 b = b->next;
1452                 dlen += BLEN(b);
1453         }
1454         q->blast = b;
1455         q->dlen += dlen;
1456         return dlen;
1457 }
1458
1459 /* Adds block (which can be a list of blocks) to the queue, subject to
1460  * qio_flags.  Returns the length written on success or -1 on non-throwable
1461  * error.  Adjust qio_flags to control the value-added features!. */
1462 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1463 {
1464         ssize_t ret;
1465         bool was_unreadable;
1466
1467         if (q->bypass) {
1468                 ret = blocklen(b);
1469                 (*q->bypass) (q->arg, b);
1470                 return ret;
1471         }
1472         spin_lock_irqsave(&q->lock);
1473         was_unreadable = q->dlen == 0;
1474         if (q->state & Qclosed) {
1475                 spin_unlock_irqsave(&q->lock);
1476                 freeblist(b);
1477                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1478                         return -1;
1479                 if (q->err[0])
1480                         error(EPIPE, q->err);
1481                 else
1482                         error(EPIPE, "connection closed");
1483         }
1484         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1485                 /* drop overflow takes priority over regular non-blocking */
1486                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1487                         spin_unlock_irqsave(&q->lock);
1488                         freeb(b);
1489                         return -1;
1490                 }
1491                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1492                  * and catch it. */
1493                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1494                         spin_unlock_irqsave(&q->lock);
1495                         freeb(b);
1496                         error(EAGAIN, "queue full");
1497                 }
1498         }
1499         ret = enqueue_blist(q, b);
1500         QDEBUG checkb(b, "__qbwrite");
1501         spin_unlock_irqsave(&q->lock);
1502         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1503          * wakeup, meaning that actual users either want a kick or have qreaders. */
1504         if (q->kick && (was_unreadable || (q->state & Qkick)))
1505                 q->kick(q->arg);
1506         if (was_unreadable) {
1507                 /* Unlike the read side, there's no double-check to make sure the queue
1508                  * transitioned across an edge.  We know we added something, so that's
1509                  * enough.  We wake if the queue was empty.  Both sides are the same, in
1510                  * that the condition for which we do the rendez_wakeup() is the same as
1511                  * the condition done for the rendez_sleep(). */
1512                 rendez_wakeup(&q->rr);
1513                 qwake_cb(q, FDTAP_FILT_READABLE);
1514         }
1515         /*
1516          *  flow control, wait for queue to get below the limit
1517          *  before allowing the process to continue and queue
1518          *  more.  We do this here so that postnote can only
1519          *  interrupt us after the data has been queued.  This
1520          *  means that things like 9p flushes and ssl messages
1521          *  will not be disrupted by software interrupts.
1522          *
1523          *  Note - this is moderately dangerous since a process
1524          *  that keeps getting interrupted and rewriting will
1525          *  queue infinite crud.
1526          */
1527         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1528             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1529                 /* This is a racy peek at the q status.  If we accidentally block, our
1530                  * rendez will return.  The rendez's peak (qnotfull) is also racy w.r.t.
1531                  * the q's spinlock (that lock protects writes, but not reads).
1532                  *
1533                  * Here's the deal: when holding the rendez lock, if we see the sleep
1534                  * condition, the consumer will wake us.  The condition will only ever
1535                  * be changed by the next qbread() (consumer, changes q->dlen).  That
1536                  * code will do a rendez wake, which will spin on the rendez lock,
1537                  * meaning it won't procede until we either see the new state (and
1538                  * return) or put ourselves on the rendez, and wake up.
1539                  *
1540                  * The pattern is one side writes mem, then signals.  Our side checks
1541                  * the signal, then reads the mem.  The goal is to not miss seeing the
1542                  * signal AND missing the memory write.  In this specific case, the
1543                  * signal is actually synchronous (the rendez lock) and not basic shared
1544                  * memory.
1545                  *
1546                  * Oh, and we spin in case we woke early and someone else filled the
1547                  * queue, mesa-style. */
1548                 while (!qnotfull(q))
1549                         rendez_sleep(&q->wr, qnotfull, q);
1550         }
1551         return ret;
1552 }
1553
1554 /*
1555  *  add a block to a queue obeying flow control
1556  */
1557 ssize_t qbwrite(struct queue *q, struct block *b)
1558 {
1559         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1560 }
1561
1562 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1563 {
1564         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1565 }
1566
1567 ssize_t qibwrite(struct queue *q, struct block *b)
1568 {
1569         return __qbwrite(q, b, 0);
1570 }
1571
1572 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1573  * block on success, 0 on failure. */
1574 static struct block *build_block(void *from, size_t len, int mem_flags)
1575 {
1576         struct block *b;
1577         void *ext_buf;
1578
1579         /* If len is small, we don't need to bother with the extra_data.  But until
1580          * the whole stack can handle extd blocks, we'll use them unconditionally.
1581          * */
1582 #ifdef CONFIG_BLOCK_EXTRAS
1583         /* allocb builds in 128 bytes of header space to all blocks, but this is
1584          * only available via padblock (to the left).  we also need some space
1585          * for pullupblock for some basic headers (like icmp) that get written
1586          * in directly */
1587         b = block_alloc(64, mem_flags);
1588         if (!b)
1589                 return 0;
1590         ext_buf = kmalloc(len, mem_flags);
1591         if (!ext_buf) {
1592                 kfree(b);
1593                 return 0;
1594         }
1595         memcpy(ext_buf, from, len);
1596         if (block_add_extd(b, 1, mem_flags)) {
1597                 kfree(ext_buf);
1598                 kfree(b);
1599                 return 0;
1600         }
1601         b->extra_data[0].base = (uintptr_t)ext_buf;
1602         b->extra_data[0].off = 0;
1603         b->extra_data[0].len = len;
1604         b->extra_len += len;
1605 #else
1606         b = block_alloc(len, mem_flags);
1607         if (!b)
1608                 return 0;
1609         memmove(b->wp, from, len);
1610         b->wp += len;
1611 #endif
1612         return b;
1613 }
1614
1615 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1616                         int qio_flags)
1617 {
1618         size_t n, sofar;
1619         struct block *b;
1620         uint8_t *p = vp;
1621         void *ext_buf;
1622
1623         sofar = 0;
1624         do {
1625                 n = len - sofar;
1626                 /* This is 64K, the max amount per single block.  Still a good value? */
1627                 if (n > Maxatomic)
1628                         n = Maxatomic;
1629                 b = build_block(p + sofar, n, mem_flags);
1630                 if (!b)
1631                         break;
1632                 if (__qbwrite(q, b, qio_flags) < 0)
1633                         break;
1634                 sofar += n;
1635         } while ((sofar < len) && (q->state & Qmsg) == 0);
1636         return sofar;
1637 }
1638
1639 ssize_t qwrite(struct queue *q, void *vp, int len)
1640 {
1641         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1642 }
1643
1644 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1645 {
1646         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1647                                               QIO_NON_BLOCK);
1648 }
1649
1650 ssize_t qiwrite(struct queue *q, void *vp, int len)
1651 {
1652         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1653 }
1654
1655 /*
1656  *  be extremely careful when calling this,
1657  *  as there is no reference accounting
1658  */
1659 void qfree(struct queue *q)
1660 {
1661         qclose(q);
1662         kfree(q);
1663 }
1664
1665 /*
1666  *  Mark a queue as closed.  No further IO is permitted.
1667  *  All blocks are released.
1668  */
1669 void qclose(struct queue *q)
1670 {
1671         struct block *bfirst;
1672
1673         if (q == NULL)
1674                 return;
1675
1676         /* mark it */
1677         spin_lock_irqsave(&q->lock);
1678         q->state |= Qclosed;
1679         q->state &= ~Qdropoverflow;
1680         q->err[0] = 0;
1681         bfirst = q->bfirst;
1682         q->bfirst = 0;
1683         q->dlen = 0;
1684         spin_unlock_irqsave(&q->lock);
1685
1686         /* free queued blocks */
1687         freeblist(bfirst);
1688
1689         /* wake up readers/writers */
1690         rendez_wakeup(&q->rr);
1691         rendez_wakeup(&q->wr);
1692         qwake_cb(q, FDTAP_FILT_HANGUP);
1693 }
1694
1695 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1696  *
1697  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1698  * there is no message, which is what also happens during a natural qclose(),
1699  * those waiters will simply return 0.  qwriters will always error() on a
1700  * closed/hungup queue. */
1701 void qhangup(struct queue *q, char *msg)
1702 {
1703         /* mark it */
1704         spin_lock_irqsave(&q->lock);
1705         q->state |= Qclosed;
1706         if (msg == 0 || *msg == 0)
1707                 q->err[0] = 0;
1708         else
1709                 strlcpy(q->err, msg, ERRMAX);
1710         spin_unlock_irqsave(&q->lock);
1711
1712         /* wake up readers/writers */
1713         rendez_wakeup(&q->rr);
1714         rendez_wakeup(&q->wr);
1715         qwake_cb(q, FDTAP_FILT_HANGUP);
1716 }
1717
1718 /*
1719  *  return non-zero if the q is hungup
1720  */
1721 int qisclosed(struct queue *q)
1722 {
1723         return q->state & Qclosed;
1724 }
1725
1726 /*
1727  *  mark a queue as no longer hung up.  resets the wake_cb.
1728  */
1729 void qreopen(struct queue *q)
1730 {
1731         spin_lock_irqsave(&q->lock);
1732         q->state &= ~Qclosed;
1733         q->eof = 0;
1734         q->limit = q->inilim;
1735         q->wake_cb = 0;
1736         q->wake_data = 0;
1737         spin_unlock_irqsave(&q->lock);
1738 }
1739
1740 /*
1741  *  return bytes queued
1742  */
1743 int qlen(struct queue *q)
1744 {
1745         return q->dlen;
1746 }
1747
1748 /*
1749  * return space remaining before flow control
1750  *
1751  *  This used to be
1752  *  q->len < q->limit/2
1753  *  but it slows down tcp too much for certain write sizes.
1754  *  I really don't understand it completely.  It may be
1755  *  due to the queue draining so fast that the transmission
1756  *  stalls waiting for the app to produce more data.  - presotto
1757  *
1758  *  q->len was the amount of bytes, which is no longer used.  we now use
1759  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1760  */
1761 int qwindow(struct queue *q)
1762 {
1763         int l;
1764
1765         l = q->limit - q->dlen;
1766         if (l < 0)
1767                 l = 0;
1768         return l;
1769 }
1770
1771 /*
1772  *  return true if we can read without blocking
1773  */
1774 int qcanread(struct queue *q)
1775 {
1776         return q->bfirst != 0;
1777 }
1778
1779 /*
1780  *  change queue limit
1781  */
1782 void qsetlimit(struct queue *q, int limit)
1783 {
1784         q->limit = limit;
1785 }
1786
1787 /*
1788  *  set whether writes drop overflowing blocks, or if we sleep
1789  */
1790 void qdropoverflow(struct queue *q, bool onoff)
1791 {
1792         spin_lock_irqsave(&q->lock);
1793         if (onoff)
1794                 q->state |= Qdropoverflow;
1795         else
1796                 q->state &= ~Qdropoverflow;
1797         spin_unlock_irqsave(&q->lock);
1798 }
1799
1800 /* Be careful: this can affect concurrent reads/writes and code that might have
1801  * built-in expectations of the q's type. */
1802 void q_toggle_qmsg(struct queue *q, bool onoff)
1803 {
1804         spin_lock_irqsave(&q->lock);
1805         if (onoff)
1806                 q->state |= Qmsg;
1807         else
1808                 q->state &= ~Qmsg;
1809         spin_unlock_irqsave(&q->lock);
1810 }
1811
1812 /* Be careful: this can affect concurrent reads/writes and code that might have
1813  * built-in expectations of the q's type. */
1814 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1815 {
1816         spin_lock_irqsave(&q->lock);
1817         if (onoff)
1818                 q->state |= Qcoalesce;
1819         else
1820                 q->state &= ~Qcoalesce;
1821         spin_unlock_irqsave(&q->lock);
1822 }
1823
1824 /*
1825  *  flush the output queue
1826  */
1827 void qflush(struct queue *q)
1828 {
1829         struct block *bfirst;
1830
1831         /* mark it */
1832         spin_lock_irqsave(&q->lock);
1833         bfirst = q->bfirst;
1834         q->bfirst = 0;
1835         q->dlen = 0;
1836         spin_unlock_irqsave(&q->lock);
1837
1838         /* free queued blocks */
1839         freeblist(bfirst);
1840
1841         /* wake up writers */
1842         rendez_wakeup(&q->wr);
1843         qwake_cb(q, FDTAP_FILT_WRITABLE);
1844 }
1845
1846 int qfull(struct queue *q)
1847 {
1848         return q->dlen >= q->limit;
1849 }
1850
1851 int qstate(struct queue *q)
1852 {
1853         return q->state;
1854 }
1855
1856 void qdump(struct queue *q)
1857 {
1858         if (q)
1859                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1860                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1861 }
1862
1863 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1864  * marks the type of wakeup event (flags from FDTAP).
1865  *
1866  * There's no sync protection.  If you change the CB while the qio is running,
1867  * you might get a CB with the data or func from a previous set_wake_cb.  You
1868  * should set this once per queue and forget it.
1869  *
1870  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1871  * just make sure that the func(data) pair are valid until the queue is freed or
1872  * reopened. */
1873 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1874 {
1875         q->wake_data = data;
1876         wmb();  /* if we see func, we'll also see the data for it */
1877         q->wake_cb = func;
1878 }
1879
1880 /* Helper for detecting whether we'll block on a read at this instant. */
1881 bool qreadable(struct queue *q)
1882 {
1883         return qlen(q) > 0;
1884 }
1885
1886 /* Helper for detecting whether we'll block on a write at this instant. */
1887 bool qwritable(struct queue *q)
1888 {
1889         return !q->limit || qwindow(q) > 0;
1890 }