qio: Fix race with multiple blockers
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
101 };
102
103 unsigned int qiomaxatomic = Maxatomic;
104
105 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
106 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
107 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
108                               int mem_flags);
109 static bool qwait_and_ilock(struct queue *q, int qio_flags);
110
111 /* Helper: fires a wake callback, sending 'filter' */
112 static void qwake_cb(struct queue *q, int filter)
113 {
114         if (q->wake_cb)
115                 q->wake_cb(q, q->wake_data, filter);
116 }
117
118 void ixsummary(void)
119 {
120         debugging ^= 1;
121         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
122                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
123         printd("consume %lu, produce %lu, qcopy %lu\n",
124                    consumecnt, producecnt, qcopycnt);
125 }
126
127 /*
128  *  pad a block to the front (or the back if size is negative)
129  */
130 struct block *padblock(struct block *bp, int size)
131 {
132         int n;
133         struct block *nbp;
134         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
135         uint16_t checksum_start = bp->checksum_start;
136         uint16_t checksum_offset = bp->checksum_offset;
137         uint16_t mss = bp->mss;
138
139         QDEBUG checkb(bp, "padblock 1");
140         if (size >= 0) {
141                 if (bp->rp - bp->base >= size) {
142                         bp->checksum_start += size;
143                         bp->rp -= size;
144                         return bp;
145                 }
146
147                 PANIC_EXTRA(bp);
148                 if (bp->next)
149                         panic("padblock %p", getcallerpc(&bp));
150                 n = BLEN(bp);
151                 padblockcnt++;
152                 nbp = block_alloc(size + n, MEM_WAIT);
153                 nbp->rp += size;
154                 nbp->wp = nbp->rp;
155                 memmove(nbp->wp, bp->rp, n);
156                 nbp->wp += n;
157                 freeb(bp);
158                 nbp->rp -= size;
159         } else {
160                 size = -size;
161
162                 PANIC_EXTRA(bp);
163
164                 if (bp->next)
165                         panic("padblock %p", getcallerpc(&bp));
166
167                 if (bp->lim - bp->wp >= size)
168                         return bp;
169
170                 n = BLEN(bp);
171                 padblockcnt++;
172                 nbp = block_alloc(size + n, MEM_WAIT);
173                 memmove(nbp->wp, bp->rp, n);
174                 nbp->wp += n;
175                 freeb(bp);
176         }
177         if (bcksum) {
178                 nbp->flag |= bcksum;
179                 nbp->checksum_start = checksum_start;
180                 nbp->checksum_offset = checksum_offset;
181                 nbp->mss = mss;
182         }
183         QDEBUG checkb(nbp, "padblock 1");
184         return nbp;
185 }
186
187 /*
188  *  return count of bytes in a string of blocks
189  */
190 int blocklen(struct block *bp)
191 {
192         int len;
193
194         len = 0;
195         while (bp) {
196                 len += BLEN(bp);
197                 bp = bp->next;
198         }
199         return len;
200 }
201
202 /*
203  * return count of space in blocks
204  */
205 int blockalloclen(struct block *bp)
206 {
207         int len;
208
209         len = 0;
210         while (bp) {
211                 len += BALLOC(bp);
212                 bp = bp->next;
213         }
214         return len;
215 }
216
217 /*
218  *  copy the  string of blocks into
219  *  a single block and free the string
220  */
221 struct block *concatblock(struct block *bp)
222 {
223         int len;
224         struct block *nb, *f;
225
226         if (bp->next == 0)
227                 return bp;
228
229         /* probably use parts of qclone */
230         PANIC_EXTRA(bp);
231         nb = block_alloc(blocklen(bp), MEM_WAIT);
232         for (f = bp; f; f = f->next) {
233                 len = BLEN(f);
234                 memmove(nb->wp, f->rp, len);
235                 nb->wp += len;
236         }
237         concatblockcnt += BLEN(nb);
238         freeblist(bp);
239         QDEBUG checkb(nb, "concatblock 1");
240         return nb;
241 }
242
243 /* Makes an identical copy of the block, collapsing all the data into the block
244  * body.  It does not point to the contents of the original, it is a copy
245  * (unlike qclone).  Since we're copying, we might as well put the memory into
246  * one contiguous chunk. */
247 struct block *copyblock(struct block *bp, int mem_flags)
248 {
249         struct block *newb;
250         struct extra_bdata *ebd;
251         size_t amt;
252
253         QDEBUG checkb(bp, "copyblock 0");
254         newb = block_alloc(BLEN(bp), mem_flags);
255         if (!newb)
256                 return 0;
257         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
258         assert(amt == BHLEN(bp));
259         for (int i = 0; i < bp->nr_extra_bufs; i++) {
260                 ebd = &bp->extra_data[i];
261                 if (!ebd->base || !ebd->len)
262                         continue;
263                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
264                 assert(amt == ebd->len);
265         }
266         /* TODO: any other flags that need copied over? */
267         if (bp->flag & BCKSUM_FLAGS) {
268                 newb->flag |= (bp->flag & BCKSUM_FLAGS);
269                 newb->checksum_start = bp->checksum_start;
270                 newb->checksum_offset = bp->checksum_offset;
271                 newb->mss = bp->mss;
272         }
273         copyblockcnt++;
274         QDEBUG checkb(newb, "copyblock 1");
275         return newb;
276 }
277
278 /* Returns a block with the remaining contents of b all in the main body of the
279  * returned block.  Replace old references to b with the returned value (which
280  * may still be 'b', if no change was needed. */
281 struct block *linearizeblock(struct block *b)
282 {
283         struct block *newb;
284
285         if (!b->extra_len)
286                 return b;
287         newb = copyblock(b, MEM_WAIT);
288         freeb(b);
289         return newb;
290 }
291
292 /* Make sure the first block has at least n bytes in its main body.  Pulls up
293  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
294  * block list. */
295 struct block *pullupblock(struct block *bp, int n)
296 {
297         int i, len, seglen;
298         struct block *nbp;
299         struct extra_bdata *ebd;
300
301         /*
302          *  this should almost always be true, it's
303          *  just to avoid every caller checking.
304          */
305         if (BHLEN(bp) >= n)
306                 return bp;
307
308         /* If there's no chance, just bail out now.  This might be slightly wasteful
309          * if there's a long blist that does have enough data. */
310         if (n > blocklen(bp))
311                 return 0;
312         /* a start at explicit main-body / header management */
313         if (bp->extra_len) {
314                 if (n > bp->lim - bp->rp) {
315                         /* would need to realloc a new block and copy everything over. */
316                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
317                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
318                 }
319                 len = n - BHLEN(bp);
320                 /* Would need to recursively call this, or otherwise pull from later
321                  * blocks and put chunks of their data into the block we're building. */
322                 if (len > bp->extra_len)
323                         panic("pullup more than extra (%d, %d, %d)\n",
324                               n, BHLEN(bp), bp->extra_len);
325                 QDEBUG checkb(bp, "before pullup");
326                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
327                         ebd = &bp->extra_data[i];
328                         if (!ebd->base || !ebd->len)
329                                 continue;
330                         seglen = MIN(ebd->len, len);
331                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
332                         bp->wp += seglen;
333                         len -= seglen;
334                         ebd->len -= seglen;
335                         ebd->off += seglen;
336                         bp->extra_len -= seglen;
337                         if (ebd->len == 0) {
338                                 kfree((void *)ebd->base);
339                                 ebd->off = 0;
340                                 ebd->base = 0;
341                         }
342                 }
343                 /* maybe just call pullupblock recursively here */
344                 if (len)
345                         panic("pullup %d bytes overdrawn\n", len);
346                 QDEBUG checkb(bp, "after pullup");
347                 return bp;
348         }
349
350         /*
351          *  if not enough room in the first block,
352          *  add another to the front of the list.
353          */
354         if (bp->lim - bp->rp < n) {
355                 nbp = block_alloc(n, MEM_WAIT);
356                 nbp->next = bp;
357                 bp = nbp;
358         }
359
360         /*
361          *  copy bytes from the trailing blocks into the first
362          */
363         n -= BLEN(bp);
364         while ((nbp = bp->next)) {
365                 i = BLEN(nbp);
366                 if (i > n) {
367                         memmove(bp->wp, nbp->rp, n);
368                         pullupblockcnt++;
369                         bp->wp += n;
370                         nbp->rp += n;
371                         QDEBUG checkb(bp, "pullupblock 1");
372                         return bp;
373                 } else {
374                         memmove(bp->wp, nbp->rp, i);
375                         pullupblockcnt++;
376                         bp->wp += i;
377                         bp->next = nbp->next;
378                         nbp->next = 0;
379                         freeb(nbp);
380                         n -= i;
381                         if (n == 0) {
382                                 QDEBUG checkb(bp, "pullupblock 2");
383                                 return bp;
384                         }
385                 }
386         }
387         freeb(bp);
388         return 0;
389 }
390
391 /*
392  *  make sure the first block has at least n bytes in its main body
393  */
394 struct block *pullupqueue(struct queue *q, int n)
395 {
396         struct block *b;
397
398         /* TODO: lock to protect the queue links? */
399         if ((BHLEN(q->bfirst) >= n))
400                 return q->bfirst;
401         q->bfirst = pullupblock(q->bfirst, n);
402         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
403         q->blast = b;
404         return q->bfirst;
405 }
406
407 /* throw away count bytes from the front of
408  * block's extradata.  Returns count of bytes
409  * thrown away
410  */
411
412 static int pullext(struct block *bp, int count)
413 {
414         struct extra_bdata *ed;
415         int i, rem, bytes = 0;
416
417         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
418                 ed = &bp->extra_data[i];
419                 rem = MIN(count, ed->len);
420                 bp->extra_len -= rem;
421                 count -= rem;
422                 bytes += rem;
423                 ed->off += rem;
424                 ed->len -= rem;
425                 if (ed->len == 0) {
426                         kfree((void *)ed->base);
427                         ed->base = 0;
428                         ed->off = 0;
429                 }
430         }
431         return bytes;
432 }
433
434 /* throw away count bytes from the end of a
435  * block's extradata.  Returns count of bytes
436  * thrown away
437  */
438
439 static int dropext(struct block *bp, int count)
440 {
441         struct extra_bdata *ed;
442         int i, rem, bytes = 0;
443
444         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
445                 ed = &bp->extra_data[i];
446                 rem = MIN(count, ed->len);
447                 bp->extra_len -= rem;
448                 count -= rem;
449                 bytes += rem;
450                 ed->len -= rem;
451                 if (ed->len == 0) {
452                         kfree((void *)ed->base);
453                         ed->base = 0;
454                         ed->off = 0;
455                 }
456         }
457         return bytes;
458 }
459
460 /*
461  *  throw away up to count bytes from a
462  *  list of blocks.  Return count of bytes
463  *  thrown away.
464  */
465 static int _pullblock(struct block **bph, int count, int free)
466 {
467         struct block *bp;
468         int n, bytes;
469
470         bytes = 0;
471         if (bph == NULL)
472                 return 0;
473
474         while (*bph != NULL && count != 0) {
475                 bp = *bph;
476
477                 n = MIN(BHLEN(bp), count);
478                 bytes += n;
479                 count -= n;
480                 bp->rp += n;
481                 n = pullext(bp, count);
482                 bytes += n;
483                 count -= n;
484                 QDEBUG checkb(bp, "pullblock ");
485                 if (BLEN(bp) == 0 && (free || count)) {
486                         *bph = bp->next;
487                         bp->next = NULL;
488                         freeb(bp);
489                 }
490         }
491         return bytes;
492 }
493
494 int pullblock(struct block **bph, int count)
495 {
496         return _pullblock(bph, count, 1);
497 }
498
499 /*
500  *  trim to len bytes starting at offset
501  */
502 struct block *trimblock(struct block *bp, int offset, int len)
503 {
504         uint32_t l, trim;
505         int olen = len;
506
507         QDEBUG checkb(bp, "trimblock 1");
508         if (blocklen(bp) < offset + len) {
509                 freeblist(bp);
510                 return NULL;
511         }
512
513         l =_pullblock(&bp, offset, 0);
514         if (bp == NULL)
515                 return NULL;
516         if (l != offset) {
517                 freeblist(bp);
518                 return NULL;
519         }
520
521         while ((l = BLEN(bp)) < len) {
522                 len -= l;
523                 bp = bp->next;
524         }
525
526         trim = BLEN(bp) - len;
527         trim -= dropext(bp, trim);
528         bp->wp -= trim;
529
530         if (bp->next) {
531                 freeblist(bp->next);
532                 bp->next = NULL;
533         }
534         return bp;
535 }
536
537 /* Adjust block @bp so that its size is exactly @len.
538  * If the size is increased, fill in the new contents with zeros.
539  * If the size is decreased, discard some of the old contents at the tail. */
540 struct block *adjustblock(struct block *bp, int len)
541 {
542         struct extra_bdata *ebd;
543         void *buf;
544         int i;
545
546         if (len < 0) {
547                 freeb(bp);
548                 return NULL;
549         }
550
551         if (len == BLEN(bp))
552                 return bp;
553
554         /* Shrink within block main body. */
555         if (len <= BHLEN(bp)) {
556                 free_block_extra(bp);
557                 bp->wp = bp->rp + len;
558                 QDEBUG checkb(bp, "adjustblock 1");
559                 return bp;
560         }
561
562         /* Need to grow. */
563         if (len > BLEN(bp)) {
564                 /* Grow within block main body. */
565                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
566                         memset(bp->wp, 0, len - BLEN(bp));
567                         bp->wp = bp->rp + len;
568                         QDEBUG checkb(bp, "adjustblock 2");
569                         return bp;
570                 }
571                 /* Grow with extra data buffers. */
572                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
573                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
574                 QDEBUG checkb(bp, "adjustblock 3");
575                 return bp;
576         }
577
578         /* Shrink extra data buffers.
579          * len is how much of ebd we need to keep.
580          * extra_len is re-accumulated. */
581         assert(bp->extra_len > 0);
582         len -= BHLEN(bp);
583         bp->extra_len = 0;
584         for (i = 0; i < bp->nr_extra_bufs; i++) {
585                 ebd = &bp->extra_data[i];
586                 if (len <= ebd->len)
587                         break;
588                 len -= ebd->len;
589                 bp->extra_len += ebd->len;
590         }
591         /* If len becomes zero, extra_data[i] should be freed. */
592         if (len > 0) {
593                 ebd = &bp->extra_data[i];
594                 ebd->len = len;
595                 bp->extra_len += ebd->len;
596                 i++;
597         }
598         for (; i < bp->nr_extra_bufs; i++) {
599                 ebd = &bp->extra_data[i];
600                 if (ebd->base)
601                         kfree((void*)ebd->base);
602                 ebd->base = ebd->off = ebd->len = 0;
603         }
604         QDEBUG checkb(bp, "adjustblock 4");
605         return bp;
606 }
607
608 /* Helper: removes and returns the first block from q */
609 static struct block *pop_first_block(struct queue *q)
610 {
611         struct block *b = q->bfirst;
612
613         q->len -= BALLOC(b);  // XXX all usages of q->len with extra_data are fucked
614         q->dlen -= BLEN(b);
615         q->bfirst = b->next;
616         b->next = 0;
617         return b;
618 }
619
620 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
621 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
622 {
623         copy_amt = MIN(to->lim - to->wp, copy_amt);
624         memcpy(to->wp, from, copy_amt);
625         to->wp += copy_amt;
626         return copy_amt;
627 }
628
629 /* Accounting helper.  Block b in q lost amt extra_data */
630 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
631 {
632         b->extra_len -= amt;
633         q->len -= amt;
634         q->dlen -= amt;
635 }
636
637 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
638  * fixed in 'from', so we move its contents and zero it out in 'from'.
639  *
640  * Returns the length moved (0 on failure). */
641 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
642                        struct block *from, struct queue *from_q)
643 {
644         size_t ret = ebd->len;
645
646         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
647                 return 0;
648         block_and_q_lost_extra(from, from_q, ebd->len);
649         ebd->base = ebd->len = ebd->off = 0;
650         return ret;
651 }
652
653 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
654  * return with less than len, but greater than 0, even if there is more
655  * available in q.
656  *
657  * At any moment that we have copied anything and things are tricky, we can just
658  * return.  The trickiness comes from a bunch of variables: is the main body
659  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
660  * to @to's main body, but only if we haven't used it yet. */
661 static size_t copy_from_first_block(struct queue *q, struct block *to,
662                                     size_t len)
663 {
664         struct block *from = q->bfirst;
665         size_t copy_amt, amt;
666         struct extra_bdata *ebd;
667
668         assert(len < BLEN(from));       /* sanity */
669         /* Try to extract from the main body */
670         copy_amt = MIN(BHLEN(from), len);
671         if (copy_amt) {
672                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
673                 from->rp += copy_amt;
674                 /* We only change dlen, (data len), not q->len, since the q still has
675                  * the same block memory allocation (no kfrees happened) */
676                 q->dlen -= copy_amt;
677         }
678         /* Try to extract the remainder from the extra data */
679         len -= copy_amt;
680         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
681                 ebd = &from->extra_data[i];
682                 if (!ebd->base || !ebd->len)
683                         continue;
684                 if (len >= ebd->len) {
685                         amt = move_ebd(ebd, to, from, q);
686                         if (!amt) {
687                                 /* our internal alloc could have failed.   this ebd is now the
688                                  * last one we'll consider.  let's handle it separately and put
689                                  * it in the main body. */
690                                 if (copy_amt)
691                                         return copy_amt;
692                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
693                                                               ebd->len);
694                                 block_and_q_lost_extra(from, q, copy_amt);
695                                 break;
696                         }
697                         len -= amt;
698                         copy_amt += amt;
699                         continue;
700                 } else {
701                         /* If we're here, we reached our final ebd, which we'll need to
702                          * split to get anything from it. */
703                         if (copy_amt)
704                                 return copy_amt;
705                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
706                                                       len);
707                         ebd->off += copy_amt;
708                         ebd->len -= copy_amt;
709                         block_and_q_lost_extra(from, q, copy_amt);
710                         break;
711                 }
712         }
713         if (len)
714                 assert(copy_amt);       /* sanity */
715         return copy_amt;
716 }
717
718 /* Return codes for __qbread and __try_qbread. */
719 enum {
720         QBR_OK,
721         QBR_FAIL,
722         QBR_SPARE,      /* we need a spare block */
723         QBR_AGAIN,      /* do it again, we are coalescing blocks */
724 };
725
726 /* Helper and back-end for __qbread: extracts and returns a list of blocks
727  * containing up to len bytes.  It may contain less than len even if q has more
728  * data.
729  *
730  * Returns a code interpreted by __qbread, and the returned blist in ret. */
731 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
732                         struct block **real_ret, struct block *spare)
733 {
734         struct block *ret, *ret_last, *first;
735         size_t blen;
736         bool was_unwritable = FALSE;
737
738         if (qio_flags & QIO_CAN_ERR_SLEEP) {
739                 if (!qwait_and_ilock(q, qio_flags)) {
740                         spin_unlock_irqsave(&q->lock);
741                         return QBR_FAIL;
742                 }
743                 /* we qwaited and still hold the lock, so the q is not empty */
744                 first = q->bfirst;
745         } else {
746                 spin_lock_irqsave(&q->lock);
747                 first = q->bfirst;
748                 if (!first) {
749                         spin_unlock_irqsave(&q->lock);
750                         return QBR_FAIL;
751                 }
752         }
753         /* We need to check before adjusting q->len.  We're checking the writer's
754          * sleep condition / tap condition.  When set, we *might* be making an edge
755          * transition (from unwritable to writable), which needs to wake and fire
756          * taps.  But, our read might not drain the queue below q->lim.  We'll check
757          * again later to see if we should really wake them.  */
758         was_unwritable = !qwritable(q);
759         blen = BLEN(first);
760         if ((q->state & Qcoalesce) && (blen == 0)) {
761                 freeb(pop_first_block(q));
762                 spin_unlock_irqsave(&q->lock);
763                 /* Need to retry to make sure we have a first block */
764                 return QBR_AGAIN;
765         }
766         /* Qmsg: just return the first block.  Be careful, since our caller might
767          * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
768         if (q->state & Qmsg) {
769                 ret = pop_first_block(q);
770                 goto out_ok;
771         }
772         /* Let's get at least something first - makes the code easier.  This way,
773          * we'll only ever split the block once. */
774         if (blen <= len) {
775                 ret = pop_first_block(q);
776                 len -= blen;
777         } else {
778                 /* need to split the block.  we won't actually take the first block out
779                  * of the queue - we're just extracting a little bit. */
780                 if (!spare) {
781                         /* We have nothing and need a spare block.  Retry! */
782                         spin_unlock_irqsave(&q->lock);
783                         return QBR_SPARE;
784                 }
785                 copy_from_first_block(q, spare, len);
786                 ret = spare;
787                 goto out_ok;
788         }
789         /* At this point, we just grabbed the first block.  We can try to grab some
790          * more, up to len (if they want). */
791         if (qio_flags & QIO_JUST_ONE_BLOCK)
792                 goto out_ok;
793         ret_last = ret;
794         while (q->bfirst && (len > 0)) {
795                 blen = BLEN(q->bfirst);
796                 if ((q->state & Qcoalesce) && (blen == 0)) {
797                         /* remove the intermediate 0 blocks */
798                         freeb(pop_first_block(q));
799                         continue;
800                 }
801                 if (blen > len) {
802                         /* We could try to split the block, but that's a huge pain.  For
803                          * instance, we might need to move the main body of b into an
804                          * extra_data of ret_last.  lots of ways for that to fail, and lots
805                          * of cases to consider.  Easier to just bail out.  This is why I
806                          * did the first block above: we don't need to worry about this. */
807                          break;
808                 }
809                 ret_last->next = pop_first_block(q);
810                 ret_last = ret_last->next;
811                 len -= blen;
812         }
813 out_ok:
814         /* Don't wake them up or fire tap if we didn't drain enough. */
815         if (!qwritable(q))
816                 was_unwritable = FALSE;
817         spin_unlock_irqsave(&q->lock);
818         if (was_unwritable) {
819                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
820                         q->kick(q->arg);
821                 rendez_wakeup(&q->wr);
822                 qwake_cb(q, FDTAP_FILT_WRITABLE);
823         }
824         *real_ret = ret;
825         return QBR_OK;
826 }
827
828 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
829  * containing up to len bytes.  It may contain less than len even if q has more
830  * data.
831  *
832  * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
833  *
834  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
835  * could get a zero length block back. */
836 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
837                               int mem_flags)
838 {
839         struct block *ret = 0;
840         struct block *spare = 0;
841
842         while (1) {
843                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
844                 case QBR_OK:
845                 case QBR_FAIL:
846                         if (spare && (ret != spare))
847                                 freeb(spare);
848                         return ret;
849                 case QBR_SPARE:
850                         assert(!spare);
851                         /* Due to some nastiness, we need a fresh block so we can read out
852                          * anything from the queue.  'len' seems like a reasonable amount.
853                          * Maybe we can get away with less. */
854                         spare = block_alloc(len, mem_flags);
855                         if (!spare)
856                                 return 0;
857                         break;
858                 case QBR_AGAIN:
859                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
860                          * try again.  We bounce out of __try so we can perform the "is
861                          * there a block" logic again from the top. */
862                         break;
863                 }
864         }
865 }
866
867 /*
868  *  get next block from a queue, return null if nothing there
869  */
870 struct block *qget(struct queue *q)
871 {
872         /* since len == SIZE_MAX, we should never need to do a mem alloc */
873         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
874 }
875
876 /* Throw away the next 'len' bytes in the queue returning the number actually
877  * discarded.
878  *
879  * If the bytes are in the queue, then they must be discarded.  The only time to
880  * return less than len is if the q itself has less than len bytes.
881  *
882  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
883  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
884 size_t qdiscard(struct queue *q, size_t len)
885 {
886         struct block *blist;
887         size_t removed_amt;
888         size_t sofar = 0;
889
890         /* This is racy.  There could be multiple qdiscarders or other consumers,
891          * where the consumption could be interleaved. */
892         while (qlen(q) && len) {
893                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
894                 removed_amt = freeblist(blist);
895                 sofar += removed_amt;
896                 len -= removed_amt;
897         }
898         return sofar;
899 }
900
901 ssize_t qpass(struct queue *q, struct block *b)
902 {
903         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
904 }
905
906 ssize_t qpassnolim(struct queue *q, struct block *b)
907 {
908         return __qbwrite(q, b, 0);
909 }
910
911 /*
912  *  if the allocated space is way out of line with the used
913  *  space, reallocate to a smaller block
914  */
915 struct block *packblock(struct block *bp)
916 {
917         struct block **l, *nbp;
918         int n;
919
920         if (bp->extra_len)
921                 return bp;
922         for (l = &bp; *l; l = &(*l)->next) {
923                 nbp = *l;
924                 n = BLEN(nbp);
925                 if ((n << 2) < BALLOC(nbp)) {
926                         *l = block_alloc(n, MEM_WAIT);
927                         memmove((*l)->wp, nbp->rp, n);
928                         (*l)->wp += n;
929                         (*l)->next = nbp->next;
930                         freeb(nbp);
931                 }
932         }
933
934         return bp;
935 }
936
937 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
938  * body_rp, for up to len.  Returns the len consumed.
939  *
940  * The base is 'b', so that we can kfree it later.  This currently ties us to
941  * using kfree for the release method for all extra_data.
942  *
943  * It is possible to have a body size that is 0, if there is no offset, and
944  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
945 static size_t point_to_body(struct block *b, uint8_t *body_rp,
946                             struct block *newb, unsigned int newb_idx,
947                             size_t len)
948 {
949         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
950
951         assert(newb_idx < newb->nr_extra_bufs);
952
953         kmalloc_incref(b);
954         ebd->base = (uintptr_t)b;
955         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
956         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
957         assert((int)ebd->len >= 0);
958         newb->extra_len += ebd->len;
959         return ebd->len;
960 }
961
962 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
963  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
964  * consumed.
965  *
966  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
967 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
968                            struct block *newb, unsigned int newb_idx,
969                            size_t len)
970 {
971         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
972         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
973
974         assert(b_idx < b->nr_extra_bufs);
975         assert(newb_idx < newb->nr_extra_bufs);
976
977         kmalloc_incref((void*)b_ebd->base);
978         n_ebd->base = b_ebd->base;
979         n_ebd->off = b_ebd->off + b_off;
980         n_ebd->len = MIN(b_ebd->len - b_off, len);
981         newb->extra_len += n_ebd->len;
982         return n_ebd->len;
983 }
984
985 /* given a string of blocks, sets up the new block's extra_data such that it
986  * *points* to the contents of the blist [offset, len + offset).  This does not
987  * make a separate copy of the contents of the blist.
988  *
989  * returns 0 on success.  the only failure is if the extra_data array was too
990  * small, so this returns a positive integer saying how big the extra_data needs
991  * to be.
992  *
993  * callers are responsible for protecting the list structure. */
994 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
995                             uint32_t offset)
996 {
997         struct block *b, *first;
998         unsigned int nr_bufs = 0;
999         unsigned int b_idx, newb_idx = 0;
1000         uint8_t *first_main_body = 0;
1001
1002         /* find the first block; keep offset relative to the latest b in the list */
1003         for (b = blist; b; b = b->next) {
1004                 if (BLEN(b) > offset)
1005                         break;
1006                 offset -= BLEN(b);
1007         }
1008         /* qcopy semantics: if you asked for an offset outside the block list, you
1009          * get an empty block back */
1010         if (!b)
1011                 return 0;
1012         first = b;
1013         /* upper bound for how many buffers we'll need in newb */
1014         for (/* b is set*/; b; b = b->next) {
1015                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
1016         }
1017         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1018         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1019                 /* caller will need to alloc these, then re-call us */
1020                 return nr_bufs;
1021         }
1022         for (b = first; b && len; b = b->next) {
1023                 b_idx = 0;
1024                 if (offset) {
1025                         if (offset < BHLEN(b)) {
1026                                 /* off is in the main body */
1027                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1028                                 newb_idx++;
1029                         } else {
1030                                 /* off is in one of the buffers (or just past the last one).
1031                                  * we're not going to point to b's main body at all. */
1032                                 offset -= BHLEN(b);
1033                                 assert(b->extra_data);
1034                                 /* assuming these extrabufs are packed, or at least that len
1035                                  * isn't gibberish */
1036                                 while (b->extra_data[b_idx].len <= offset) {
1037                                         offset -= b->extra_data[b_idx].len;
1038                                         b_idx++;
1039                                 }
1040                                 /* now offset is set to our offset in the b_idx'th buf */
1041                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1042                                 newb_idx++;
1043                                 b_idx++;
1044                         }
1045                         offset = 0;
1046                 } else {
1047                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1048                         newb_idx++;
1049                 }
1050                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1051                  * and any point_to_ could be our last if it consumed all of len. */
1052                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1053                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1054                         newb_idx++;
1055                 }
1056         }
1057         return 0;
1058 }
1059
1060 struct block *blist_clone(struct block *blist, int header_len, int len,
1061                           uint32_t offset)
1062 {
1063         int ret;
1064         struct block *newb = block_alloc(header_len, MEM_WAIT);
1065         do {
1066                 ret = __blist_clone_to(blist, newb, len, offset);
1067                 if (ret)
1068                         block_add_extd(newb, ret, MEM_WAIT);
1069         } while (ret);
1070         return newb;
1071 }
1072
1073 /* given a queue, makes a single block with header_len reserved space in the
1074  * block main body, and the contents of [offset, len + offset) pointed to in the
1075  * new blocks ext_data.  This does not make a copy of the q's contents, though
1076  * you do have a ref count on the memory. */
1077 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1078 {
1079         int ret;
1080         struct block *newb = block_alloc(header_len, MEM_WAIT);
1081         /* the while loop should rarely be used: it would require someone
1082          * concurrently adding to the queue. */
1083         do {
1084                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1085                 spin_lock_irqsave(&q->lock);
1086                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1087                 spin_unlock_irqsave(&q->lock);
1088                 if (ret)
1089                         block_add_extd(newb, ret, MEM_WAIT);
1090         } while (ret);
1091         return newb;
1092 }
1093
1094 /*
1095  *  copy from offset in the queue
1096  */
1097 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1098 {
1099         int sofar;
1100         int n;
1101         struct block *b, *nb;
1102         uint8_t *p;
1103
1104         nb = block_alloc(len, MEM_WAIT);
1105
1106         spin_lock_irqsave(&q->lock);
1107
1108         /* go to offset */
1109         b = q->bfirst;
1110         for (sofar = 0;; sofar += n) {
1111                 if (b == NULL) {
1112                         spin_unlock_irqsave(&q->lock);
1113                         return nb;
1114                 }
1115                 n = BLEN(b);
1116                 if (sofar + n > offset) {
1117                         p = b->rp + offset - sofar;
1118                         n -= offset - sofar;
1119                         break;
1120                 }
1121                 QDEBUG checkb(b, "qcopy");
1122                 b = b->next;
1123         }
1124
1125         /* copy bytes from there */
1126         for (sofar = 0; sofar < len;) {
1127                 if (n > len - sofar)
1128                         n = len - sofar;
1129                 PANIC_EXTRA(b);
1130                 memmove(nb->wp, p, n);
1131                 qcopycnt += n;
1132                 sofar += n;
1133                 nb->wp += n;
1134                 b = b->next;
1135                 if (b == NULL)
1136                         break;
1137                 n = BLEN(b);
1138                 p = b->rp;
1139         }
1140         spin_unlock_irqsave(&q->lock);
1141
1142         return nb;
1143 }
1144
1145 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1146 {
1147 #ifdef CONFIG_BLOCK_EXTRAS
1148         return qclone(q, 0, len, offset);
1149 #else
1150         return qcopy_old(q, len, offset);
1151 #endif
1152 }
1153
1154 static void qinit_common(struct queue *q)
1155 {
1156         spinlock_init_irqsave(&q->lock);
1157         rendez_init(&q->rr);
1158         rendez_init(&q->wr);
1159 }
1160
1161 /*
1162  *  called by non-interrupt code
1163  */
1164 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1165 {
1166         struct queue *q;
1167
1168         q = kzmalloc(sizeof(struct queue), 0);
1169         if (q == 0)
1170                 return 0;
1171         qinit_common(q);
1172
1173         q->limit = q->inilim = limit;
1174         q->kick = kick;
1175         q->arg = arg;
1176         q->state = msg;
1177         q->eof = 0;
1178
1179         return q;
1180 }
1181
1182 /* open a queue to be bypassed */
1183 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1184 {
1185         struct queue *q;
1186
1187         q = kzmalloc(sizeof(struct queue), 0);
1188         if (q == 0)
1189                 return 0;
1190         qinit_common(q);
1191
1192         q->limit = 0;
1193         q->arg = arg;
1194         q->bypass = bypass;
1195         q->state = 0;
1196
1197         return q;
1198 }
1199
1200 static int notempty(void *a)
1201 {
1202         struct queue *q = a;
1203
1204         return (q->state & Qclosed) || q->bfirst != 0;
1205 }
1206
1207 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1208  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1209  * was naturally closed.  Throws an error o/w. */
1210 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1211 {
1212         while (1) {
1213                 spin_lock_irqsave(&q->lock);
1214                 if (q->bfirst != NULL)
1215                         return TRUE;
1216                 if (q->state & Qclosed) {
1217                         if (++q->eof > 3) {
1218                                 spin_unlock_irqsave(&q->lock);
1219                                 error(EPIPE, "multiple reads on a closed queue");
1220                         }
1221                         if (q->err[0]) {
1222                                 spin_unlock_irqsave(&q->lock);
1223                                 error(EPIPE, q->err);
1224                         }
1225                         return FALSE;
1226                 }
1227                 if (qio_flags & QIO_NON_BLOCK) {
1228                         spin_unlock_irqsave(&q->lock);
1229                         error(EAGAIN, "queue empty");
1230                 }
1231                 spin_unlock_irqsave(&q->lock);
1232                 /* As with the producer side, we check for a condition while holding the
1233                  * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
1234                  * check again" pattern, but we do it conditionally.  Both sides agree
1235                  * synchronously to do it, and those decisions are made while holding
1236                  * q->lock.  I think this is OK.
1237                  *
1238                  * The invariant is that no reader sleeps when the queue has data.
1239                  * While holding the rendez lock, if we see there's no data, we'll
1240                  * sleep.  Since we saw there was no data, the next writer will see (or
1241                  * already saw) no data, and then the writer decides to rendez_wake,
1242                  * which will grab the rendez lock.  If the writer already did that,
1243                  * then we'll see notempty when we do our check-again. */
1244                 rendez_sleep(&q->rr, notempty, q);
1245         }
1246 }
1247
1248 /*
1249  * add a block list to a queue
1250  * XXX basically the same as enqueue blist, and has no locking!
1251  */
1252 void qaddlist(struct queue *q, struct block *b)
1253 {
1254         /* TODO: q lock? */
1255         /* queue the block */
1256         if (q->bfirst)
1257                 q->blast->next = b;
1258         else
1259                 q->bfirst = b;
1260         q->len += blockalloclen(b);
1261         q->dlen += blocklen(b);
1262         while (b->next)
1263                 b = b->next;
1264         q->blast = b;
1265 }
1266
1267 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1268 {
1269         size_t copy_amt, retval = 0;
1270         struct extra_bdata *ebd;
1271
1272         copy_amt = MIN(BHLEN(b), amt);
1273         memcpy(to, b->rp, copy_amt);
1274         /* advance the rp, since this block not be completely consumed and future
1275          * reads need to know where to pick up from */
1276         b->rp += copy_amt;
1277         to += copy_amt;
1278         amt -= copy_amt;
1279         retval += copy_amt;
1280         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1281                 ebd = &b->extra_data[i];
1282                 /* skip empty entires.  if we track this in the struct block, we can
1283                  * just start the for loop early */
1284                 if (!ebd->base || !ebd->len)
1285                         continue;
1286                 copy_amt = MIN(ebd->len, amt);
1287                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1288                 /* we're actually consuming the entries, just like how we advance rp up
1289                  * above, and might only consume part of one. */
1290                 ebd->len -= copy_amt;
1291                 ebd->off += copy_amt;
1292                 b->extra_len -= copy_amt;
1293                 if (!ebd->len) {
1294                         /* we don't actually have to decref here.  it's also done in
1295                          * freeb().  this is the earliest we can free. */
1296                         kfree((void*)ebd->base);
1297                         ebd->base = ebd->off = 0;
1298                 }
1299                 to += copy_amt;
1300                 amt -= copy_amt;
1301                 retval += copy_amt;
1302         }
1303         return retval;
1304 }
1305
1306 /*
1307  *  copy the contents of a string of blocks into
1308  *  memory.  emptied blocks are freed.  return
1309  *  pointer to first unconsumed block.
1310  */
1311 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1312 {
1313         int i;
1314         struct block *next;
1315
1316         /* could be slicker here, since read_from_block is smart */
1317         for (; b != NULL; b = next) {
1318                 i = BLEN(b);
1319                 if (i > n) {
1320                         /* partial block, consume some */
1321                         read_from_block(b, p, n);
1322                         return b;
1323                 }
1324                 /* full block, consume all and move on */
1325                 i = read_from_block(b, p, i);
1326                 n -= i;
1327                 p += i;
1328                 next = b->next;
1329                 freeb(b);
1330         }
1331         return NULL;
1332 }
1333
1334 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1335  * actual amount copied. */
1336 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1337 {
1338         size_t sofar = 0;
1339         struct block *next;
1340
1341         do {
1342                 /* We should be draining every block completely. */
1343                 assert(BLEN(b) <= len - sofar);
1344                 assert(va);
1345                 assert(va + sofar);
1346                 assert(b->rp);
1347                 sofar += read_from_block(b, va + sofar, len - sofar);
1348                 next = b->next;
1349                 freeb(b);
1350                 b = next;
1351         } while (b);
1352         return sofar;
1353 }
1354
1355 /*
1356  *  copy the contents of memory into a string of blocks.
1357  *  return NULL on error.
1358  */
1359 struct block *mem2bl(uint8_t * p, int len)
1360 {
1361         ERRSTACK(1);
1362         int n;
1363         struct block *b, *first, **l;
1364
1365         first = NULL;
1366         l = &first;
1367         if (waserror()) {
1368                 freeblist(first);
1369                 nexterror();
1370         }
1371         do {
1372                 n = len;
1373                 if (n > Maxatomic)
1374                         n = Maxatomic;
1375
1376                 *l = b = block_alloc(n, MEM_WAIT);
1377                 /* TODO consider extra_data */
1378                 memmove(b->wp, p, n);
1379                 b->wp += n;
1380                 p += n;
1381                 len -= n;
1382                 l = &b->next;
1383         } while (len > 0);
1384         poperror();
1385
1386         return first;
1387 }
1388
1389 /*
1390  *  put a block back to the front of the queue
1391  *  called with q ilocked
1392  */
1393 void qputback(struct queue *q, struct block *b)
1394 {
1395         b->next = q->bfirst;
1396         if (q->bfirst == NULL)
1397                 q->blast = b;
1398         q->bfirst = b;
1399         q->len += BALLOC(b);
1400         q->dlen += BLEN(b);
1401 }
1402
1403 /*
1404  *  get next block from a queue (up to a limit)
1405  *
1406  */
1407 struct block *qbread(struct queue *q, size_t len)
1408 {
1409         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1410 }
1411
1412 struct block *qbread_nonblock(struct queue *q, size_t len)
1413 {
1414         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1415                         QIO_NON_BLOCK, MEM_WAIT);
1416 }
1417
1418 /* read up to len from a queue into vp. */
1419 size_t qread(struct queue *q, void *va, size_t len)
1420 {
1421         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1422
1423         if (!blist)
1424                 return 0;
1425         return read_all_blocks(blist, va, len);
1426 }
1427
1428 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1429 {
1430         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1431                                        MEM_WAIT);
1432
1433         if (!blist)
1434                 return 0;
1435         return read_all_blocks(blist, va, len);
1436 }
1437
1438 static int qnotfull(void *a)
1439 {
1440         struct queue *q = a;
1441
1442         return qwritable(q) || (q->state & Qclosed);
1443 }
1444
1445 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1446 static size_t enqueue_blist(struct queue *q, struct block *b)
1447 {
1448         size_t len, dlen;
1449
1450         if (q->bfirst)
1451                 q->blast->next = b;
1452         else
1453                 q->bfirst = b;
1454         len = BALLOC(b);
1455         dlen = BLEN(b);
1456         while (b->next) {
1457                 b = b->next;
1458                 len += BALLOC(b);
1459                 dlen += BLEN(b);
1460         }
1461         q->blast = b;
1462         q->len += len;
1463         q->dlen += dlen;
1464         return dlen;
1465 }
1466
1467 /* Adds block (which can be a list of blocks) to the queue, subject to
1468  * qio_flags.  Returns the length written on success or -1 on non-throwable
1469  * error.  Adjust qio_flags to control the value-added features!. */
1470 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1471 {
1472         ssize_t ret;
1473         bool was_unreadable;
1474
1475         if (q->bypass) {
1476                 ret = blocklen(b);
1477                 (*q->bypass) (q->arg, b);
1478                 return ret;
1479         }
1480         spin_lock_irqsave(&q->lock);
1481         was_unreadable = q->len == 0;
1482         if (q->state & Qclosed) {
1483                 spin_unlock_irqsave(&q->lock);
1484                 freeblist(b);
1485                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1486                         return -1;
1487                 if (q->err[0])
1488                         error(EPIPE, q->err);
1489                 else
1490                         error(EPIPE, "connection closed");
1491         }
1492         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1493                 /* drop overflow takes priority over regular non-blocking */
1494                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1495                         spin_unlock_irqsave(&q->lock);
1496                         freeb(b);
1497                         return -1;
1498                 }
1499                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1500                  * and catch it. */
1501                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1502                         spin_unlock_irqsave(&q->lock);
1503                         freeb(b);
1504                         error(EAGAIN, "queue full");
1505                 }
1506         }
1507         ret = enqueue_blist(q, b);
1508         QDEBUG checkb(b, "__qbwrite");
1509         spin_unlock_irqsave(&q->lock);
1510         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1511          * wakeup, meaning that actual users either want a kick or have qreaders. */
1512         if (q->kick && (was_unreadable || (q->state & Qkick)))
1513                 q->kick(q->arg);
1514         if (was_unreadable) {
1515                 /* Unlike the read side, there's no double-check to make sure the queue
1516                  * transitioned across an edge.  We know we added something, so that's
1517                  * enough.  We wake if the queue was empty.  Both sides are the same, in
1518                  * that the condition for which we do the rendez_wakeup() is the same as
1519                  * the condition done for the rendez_sleep(). */
1520                 rendez_wakeup(&q->rr);
1521                 qwake_cb(q, FDTAP_FILT_READABLE);
1522         }
1523         /*
1524          *  flow control, wait for queue to get below the limit
1525          *  before allowing the process to continue and queue
1526          *  more.  We do this here so that postnote can only
1527          *  interrupt us after the data has been queued.  This
1528          *  means that things like 9p flushes and ssl messages
1529          *  will not be disrupted by software interrupts.
1530          *
1531          *  Note - this is moderately dangerous since a process
1532          *  that keeps getting interrupted and rewriting will
1533          *  queue infinite crud.
1534          */
1535         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1536             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1537                 /* This is a racy peek at the q status.  If we accidentally block, our
1538                  * rendez will return.  The rendez's peak (qnotfull) is also racy w.r.t.
1539                  * the q's spinlock (that lock protects writes, but not reads).
1540                  *
1541                  * Here's the deal: when holding the rendez lock, if we see the sleep
1542                  * condition, the consumer will wake us.  The condition will only ever
1543                  * be changed by the next qbread() (consumer, changes q->len).  That
1544                  * code will do a rendez wake, which will spin on the rendez lock,
1545                  * meaning it won't procede until we either see the new state (and
1546                  * return) or put ourselves on the rendez, and wake up.
1547                  *
1548                  * The pattern is one side writes mem, then signals.  Our side checks
1549                  * the signal, then reads the mem.  The goal is to not miss seeing the
1550                  * signal AND missing the memory write.  In this specific case, the
1551                  * signal is actually synchronous (the rendez lock) and not basic shared
1552                  * memory.
1553                  *
1554                  * Oh, and we spin in case we woke early and someone else filled the
1555                  * queue, mesa-style. */
1556                 while (!qnotfull(q))
1557                         rendez_sleep(&q->wr, qnotfull, q);
1558         }
1559         return ret;
1560 }
1561
1562 /*
1563  *  add a block to a queue obeying flow control
1564  */
1565 ssize_t qbwrite(struct queue *q, struct block *b)
1566 {
1567         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1568 }
1569
1570 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1571 {
1572         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1573 }
1574
1575 ssize_t qibwrite(struct queue *q, struct block *b)
1576 {
1577         return __qbwrite(q, b, 0);
1578 }
1579
1580 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1581  * block on success, 0 on failure. */
1582 static struct block *build_block(void *from, size_t len, int mem_flags)
1583 {
1584         struct block *b;
1585         void *ext_buf;
1586
1587         /* If len is small, we don't need to bother with the extra_data.  But until
1588          * the whole stack can handle extd blocks, we'll use them unconditionally.
1589          * */
1590 #ifdef CONFIG_BLOCK_EXTRAS
1591         /* allocb builds in 128 bytes of header space to all blocks, but this is
1592          * only available via padblock (to the left).  we also need some space
1593          * for pullupblock for some basic headers (like icmp) that get written
1594          * in directly */
1595         b = block_alloc(64, mem_flags);
1596         if (!b)
1597                 return 0;
1598         ext_buf = kmalloc(len, mem_flags);
1599         if (!ext_buf) {
1600                 kfree(b);
1601                 return 0;
1602         }
1603         memcpy(ext_buf, from, len);
1604         if (block_add_extd(b, 1, mem_flags)) {
1605                 kfree(ext_buf);
1606                 kfree(b);
1607                 return 0;
1608         }
1609         b->extra_data[0].base = (uintptr_t)ext_buf;
1610         b->extra_data[0].off = 0;
1611         b->extra_data[0].len = len;
1612         b->extra_len += len;
1613 #else
1614         b = block_alloc(len, mem_flags);
1615         if (!b)
1616                 return 0;
1617         memmove(b->wp, from, len);
1618         b->wp += len;
1619 #endif
1620         return b;
1621 }
1622
1623 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1624                         int qio_flags)
1625 {
1626         size_t n, sofar;
1627         struct block *b;
1628         uint8_t *p = vp;
1629         void *ext_buf;
1630
1631         sofar = 0;
1632         do {
1633                 n = len - sofar;
1634                 /* This is 64K, the max amount per single block.  Still a good value? */
1635                 if (n > Maxatomic)
1636                         n = Maxatomic;
1637                 b = build_block(p + sofar, n, mem_flags);
1638                 if (!b)
1639                         break;
1640                 if (__qbwrite(q, b, qio_flags) < 0)
1641                         break;
1642                 sofar += n;
1643         } while ((sofar < len) && (q->state & Qmsg) == 0);
1644         return sofar;
1645 }
1646
1647 ssize_t qwrite(struct queue *q, void *vp, int len)
1648 {
1649         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1650 }
1651
1652 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1653 {
1654         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1655                                               QIO_NON_BLOCK);
1656 }
1657
1658 ssize_t qiwrite(struct queue *q, void *vp, int len)
1659 {
1660         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1661 }
1662
1663 /*
1664  *  be extremely careful when calling this,
1665  *  as there is no reference accounting
1666  */
1667 void qfree(struct queue *q)
1668 {
1669         qclose(q);
1670         kfree(q);
1671 }
1672
1673 /*
1674  *  Mark a queue as closed.  No further IO is permitted.
1675  *  All blocks are released.
1676  */
1677 void qclose(struct queue *q)
1678 {
1679         struct block *bfirst;
1680
1681         if (q == NULL)
1682                 return;
1683
1684         /* mark it */
1685         spin_lock_irqsave(&q->lock);
1686         q->state |= Qclosed;
1687         q->state &= ~Qdropoverflow;
1688         q->err[0] = 0;
1689         bfirst = q->bfirst;
1690         q->bfirst = 0;
1691         q->len = 0;
1692         q->dlen = 0;
1693         spin_unlock_irqsave(&q->lock);
1694
1695         /* free queued blocks */
1696         freeblist(bfirst);
1697
1698         /* wake up readers/writers */
1699         rendez_wakeup(&q->rr);
1700         rendez_wakeup(&q->wr);
1701         qwake_cb(q, FDTAP_FILT_HANGUP);
1702 }
1703
1704 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1705  *
1706  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1707  * there is no message, which is what also happens during a natural qclose(),
1708  * those waiters will simply return 0.  qwriters will always error() on a
1709  * closed/hungup queue. */
1710 void qhangup(struct queue *q, char *msg)
1711 {
1712         /* mark it */
1713         spin_lock_irqsave(&q->lock);
1714         q->state |= Qclosed;
1715         if (msg == 0 || *msg == 0)
1716                 q->err[0] = 0;
1717         else
1718                 strlcpy(q->err, msg, ERRMAX);
1719         spin_unlock_irqsave(&q->lock);
1720
1721         /* wake up readers/writers */
1722         rendez_wakeup(&q->rr);
1723         rendez_wakeup(&q->wr);
1724         qwake_cb(q, FDTAP_FILT_HANGUP);
1725 }
1726
1727 /*
1728  *  return non-zero if the q is hungup
1729  */
1730 int qisclosed(struct queue *q)
1731 {
1732         return q->state & Qclosed;
1733 }
1734
1735 /*
1736  *  mark a queue as no longer hung up.  resets the wake_cb.
1737  */
1738 void qreopen(struct queue *q)
1739 {
1740         spin_lock_irqsave(&q->lock);
1741         q->state &= ~Qclosed;
1742         q->eof = 0;
1743         q->limit = q->inilim;
1744         q->wake_cb = 0;
1745         q->wake_data = 0;
1746         spin_unlock_irqsave(&q->lock);
1747 }
1748
1749 /*
1750  *  return bytes queued
1751  */
1752 int qlen(struct queue *q)
1753 {
1754         return q->dlen;
1755 }
1756
1757 /*
1758  * return space remaining before flow control
1759  *
1760  *  This used to be
1761  *  q->len < q->limit/2
1762  *  but it slows down tcp too much for certain write sizes.
1763  *  I really don't understand it completely.  It may be
1764  *  due to the queue draining so fast that the transmission
1765  *  stalls waiting for the app to produce more data.  - presotto
1766  */
1767 int qwindow(struct queue *q)
1768 {
1769         int l;
1770
1771         l = q->limit - q->len;
1772         if (l < 0)
1773                 l = 0;
1774         return l;
1775 }
1776
1777 /*
1778  *  return true if we can read without blocking
1779  */
1780 int qcanread(struct queue *q)
1781 {
1782         return q->bfirst != 0;
1783 }
1784
1785 /*
1786  *  change queue limit
1787  */
1788 void qsetlimit(struct queue *q, int limit)
1789 {
1790         q->limit = limit;
1791 }
1792
1793 /*
1794  *  set whether writes drop overflowing blocks, or if we sleep
1795  */
1796 void qdropoverflow(struct queue *q, bool onoff)
1797 {
1798         spin_lock_irqsave(&q->lock);
1799         if (onoff)
1800                 q->state |= Qdropoverflow;
1801         else
1802                 q->state &= ~Qdropoverflow;
1803         spin_unlock_irqsave(&q->lock);
1804 }
1805
1806 /* Be careful: this can affect concurrent reads/writes and code that might have
1807  * built-in expectations of the q's type. */
1808 void q_toggle_qmsg(struct queue *q, bool onoff)
1809 {
1810         spin_lock_irqsave(&q->lock);
1811         if (onoff)
1812                 q->state |= Qmsg;
1813         else
1814                 q->state &= ~Qmsg;
1815         spin_unlock_irqsave(&q->lock);
1816 }
1817
1818 /* Be careful: this can affect concurrent reads/writes and code that might have
1819  * built-in expectations of the q's type. */
1820 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1821 {
1822         spin_lock_irqsave(&q->lock);
1823         if (onoff)
1824                 q->state |= Qcoalesce;
1825         else
1826                 q->state &= ~Qcoalesce;
1827         spin_unlock_irqsave(&q->lock);
1828 }
1829
1830 /*
1831  *  flush the output queue
1832  */
1833 void qflush(struct queue *q)
1834 {
1835         struct block *bfirst;
1836
1837         /* mark it */
1838         spin_lock_irqsave(&q->lock);
1839         bfirst = q->bfirst;
1840         q->bfirst = 0;
1841         q->len = 0;
1842         q->dlen = 0;
1843         spin_unlock_irqsave(&q->lock);
1844
1845         /* free queued blocks */
1846         freeblist(bfirst);
1847
1848         /* wake up writers */
1849         rendez_wakeup(&q->wr);
1850         qwake_cb(q, FDTAP_FILT_WRITABLE);
1851 }
1852
1853 int qfull(struct queue *q)
1854 {
1855         return q->len >= q->limit;
1856 }
1857
1858 int qstate(struct queue *q)
1859 {
1860         return q->state;
1861 }
1862
1863 void qdump(struct queue *q)
1864 {
1865         if (q)
1866                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1867                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1868 }
1869
1870 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1871  * marks the type of wakeup event (flags from FDTAP).
1872  *
1873  * There's no sync protection.  If you change the CB while the qio is running,
1874  * you might get a CB with the data or func from a previous set_wake_cb.  You
1875  * should set this once per queue and forget it.
1876  *
1877  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1878  * just make sure that the func(data) pair are valid until the queue is freed or
1879  * reopened. */
1880 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1881 {
1882         q->wake_data = data;
1883         wmb();  /* if we see func, we'll also see the data for it */
1884         q->wake_cb = func;
1885 }
1886
1887 /* Helper for detecting whether we'll block on a read at this instant. */
1888 bool qreadable(struct queue *q)
1889 {
1890         return qlen(q) > 0;
1891 }
1892
1893 /* Helper for detecting whether we'll block on a write at this instant. */
1894 bool qwritable(struct queue *q)
1895 {
1896         return qwindow(q) > 0;
1897 }