qio: Fix copyblock()
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <kfs.h>
31 #include <slab.h>
32 #include <kmalloc.h>
33 #include <kref.h>
34 #include <string.h>
35 #include <stdio.h>
36 #include <assert.h>
37 #include <error.h>
38 #include <cpio.h>
39 #include <pmap.h>
40 #include <smp.h>
41 #include <ip.h>
42
43 #define PANIC_EXTRA(b)                                                  \
44 {                                                                       \
45         if ((b)->extra_len) {                                           \
46                 printblock(b);                                          \
47                 backtrace();                                            \
48                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
49         }                                                               \
50 }
51
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
59
60 static int debugging;
61
62 #define QDEBUG  if(0)
63
64 /*
65  *  IO queues
66  */
67
68 struct queue {
69         spinlock_t lock;;
70
71         struct block *bfirst;           /* buffer */
72         struct block *blast;
73
74         int len;                                        /* bytes allocated to queue */
75         int dlen;                                       /* data bytes in queue */
76         int limit;                                      /* max bytes in queue */
77         int inilim;                             /* initial limit */
78         int state;
79         int eof;                                        /* number of eofs read by user */
80
81         void (*kick) (void *);          /* restart output */
82         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
83         void *arg;                                      /* argument to kick */
84
85         struct rendez rr;                       /* process waiting to read */
86         struct rendez wr;                       /* process waiting to write */
87         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
88         void *wake_data;
89
90         char err[ERRMAX];
91 };
92
93 enum {
94         Maxatomic = 64 * 1024,
95         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
96         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
97         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
98         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
99         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
100         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
101 };
102
103 unsigned int qiomaxatomic = Maxatomic;
104
105 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
106 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
107 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
108                               int mem_flags);
109 static bool qwait_and_ilock(struct queue *q, int qio_flags);
110 static void qwakeup_iunlock(struct queue *q, int qio_flags);
111
112 /* Helper: fires a wake callback, sending 'filter' */
113 static void qwake_cb(struct queue *q, int filter)
114 {
115         if (q->wake_cb)
116                 q->wake_cb(q, q->wake_data, filter);
117 }
118
119 void ixsummary(void)
120 {
121         debugging ^= 1;
122         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
123                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
124         printd("consume %lu, produce %lu, qcopy %lu\n",
125                    consumecnt, producecnt, qcopycnt);
126 }
127
128 /*
129  *  pad a block to the front (or the back if size is negative)
130  */
131 struct block *padblock(struct block *bp, int size)
132 {
133         int n;
134         struct block *nbp;
135         uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
136         uint16_t checksum_start = bp->checksum_start;
137         uint16_t checksum_offset = bp->checksum_offset;
138         uint16_t mss = bp->mss;
139
140         QDEBUG checkb(bp, "padblock 1");
141         if (size >= 0) {
142                 if (bp->rp - bp->base >= size) {
143                         bp->checksum_start += size;
144                         bp->rp -= size;
145                         return bp;
146                 }
147
148                 PANIC_EXTRA(bp);
149                 if (bp->next)
150                         panic("padblock %p", getcallerpc(&bp));
151                 n = BLEN(bp);
152                 padblockcnt++;
153                 nbp = block_alloc(size + n, MEM_WAIT);
154                 nbp->rp += size;
155                 nbp->wp = nbp->rp;
156                 memmove(nbp->wp, bp->rp, n);
157                 nbp->wp += n;
158                 freeb(bp);
159                 nbp->rp -= size;
160         } else {
161                 size = -size;
162
163                 PANIC_EXTRA(bp);
164
165                 if (bp->next)
166                         panic("padblock %p", getcallerpc(&bp));
167
168                 if (bp->lim - bp->wp >= size)
169                         return bp;
170
171                 n = BLEN(bp);
172                 padblockcnt++;
173                 nbp = block_alloc(size + n, MEM_WAIT);
174                 memmove(nbp->wp, bp->rp, n);
175                 nbp->wp += n;
176                 freeb(bp);
177         }
178         if (bcksum) {
179                 nbp->flag |= bcksum;
180                 nbp->checksum_start = checksum_start;
181                 nbp->checksum_offset = checksum_offset;
182                 nbp->mss = mss;
183         }
184         QDEBUG checkb(nbp, "padblock 1");
185         return nbp;
186 }
187
188 /*
189  *  return count of bytes in a string of blocks
190  */
191 int blocklen(struct block *bp)
192 {
193         int len;
194
195         len = 0;
196         while (bp) {
197                 len += BLEN(bp);
198                 bp = bp->next;
199         }
200         return len;
201 }
202
203 /*
204  * return count of space in blocks
205  */
206 int blockalloclen(struct block *bp)
207 {
208         int len;
209
210         len = 0;
211         while (bp) {
212                 len += BALLOC(bp);
213                 bp = bp->next;
214         }
215         return len;
216 }
217
218 /*
219  *  copy the  string of blocks into
220  *  a single block and free the string
221  */
222 struct block *concatblock(struct block *bp)
223 {
224         int len;
225         struct block *nb, *f;
226
227         if (bp->next == 0)
228                 return bp;
229
230         /* probably use parts of qclone */
231         PANIC_EXTRA(bp);
232         nb = block_alloc(blocklen(bp), MEM_WAIT);
233         for (f = bp; f; f = f->next) {
234                 len = BLEN(f);
235                 memmove(nb->wp, f->rp, len);
236                 nb->wp += len;
237         }
238         concatblockcnt += BLEN(nb);
239         freeblist(bp);
240         QDEBUG checkb(nb, "concatblock 1");
241         return nb;
242 }
243
244 /* Makes an identical copy of the block, collapsing all the data into the block
245  * body.  It does not point to the contents of the original, it is a copy
246  * (unlike qclone).  Since we're copying, we might as well put the memory into
247  * one contiguous chunk. */
248 struct block *copyblock(struct block *bp, int mem_flags)
249 {
250         struct block *newb;
251         struct extra_bdata *ebd;
252         size_t amt;
253
254         QDEBUG checkb(bp, "copyblock 0");
255         newb = block_alloc(BLEN(bp), mem_flags);
256         if (!newb)
257                 return 0;
258         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
259         assert(amt == BHLEN(bp));
260         for (int i = 0; i < bp->nr_extra_bufs; i++) {
261                 ebd = &bp->extra_data[i];
262                 if (!ebd->base || !ebd->len)
263                         continue;
264                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
265                 assert(amt == ebd->len);
266         }
267         /* TODO: any other flags that need copied over? */
268         if (bp->flag & BCKSUM_FLAGS) {
269                 newb->flag |= (bp->flag & BCKSUM_FLAGS);
270                 newb->checksum_start = bp->checksum_start;
271                 newb->checksum_offset = bp->checksum_offset;
272                 newb->mss = bp->mss;
273         }
274         copyblockcnt++;
275         QDEBUG checkb(newb, "copyblock 1");
276         return newb;
277 }
278
279 /* Returns a block with the remaining contents of b all in the main body of the
280  * returned block.  Replace old references to b with the returned value (which
281  * may still be 'b', if no change was needed. */
282 struct block *linearizeblock(struct block *b)
283 {
284         struct block *newb;
285
286         if (!b->extra_len)
287                 return b;
288         newb = copyblock(b, MEM_WAIT);
289         freeb(b);
290         return newb;
291 }
292
293 /*
294  *  make sure the first block has at least n bytes in its main body
295  */
296 struct block *pullupblock(struct block *bp, int n)
297 {
298         int i, len, seglen;
299         struct block *nbp;
300         struct extra_bdata *ebd;
301
302         /*
303          *  this should almost always be true, it's
304          *  just to avoid every caller checking.
305          */
306         if (BHLEN(bp) >= n)
307                 return bp;
308
309          /* a start at explicit main-body / header management */
310         if (bp->extra_len) {
311                 if (n > bp->lim - bp->rp) {
312                         /* would need to realloc a new block and copy everything over. */
313                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
314                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
315                 }
316                 len = n - BHLEN(bp);
317                 if (len > bp->extra_len)
318                         panic("pullup more than extra (%d, %d, %d)\n",
319                               n, BHLEN(bp), bp->extra_len);
320                 QDEBUG checkb(bp, "before pullup");
321                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
322                         ebd = &bp->extra_data[i];
323                         if (!ebd->base || !ebd->len)
324                                 continue;
325                         seglen = MIN(ebd->len, len);
326                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
327                         bp->wp += seglen;
328                         len -= seglen;
329                         ebd->len -= seglen;
330                         ebd->off += seglen;
331                         bp->extra_len -= seglen;
332                         if (ebd->len == 0) {
333                                 kfree((void *)ebd->base);
334                                 ebd->off = 0;
335                                 ebd->base = 0;
336                         }
337                 }
338                 /* maybe just call pullupblock recursively here */
339                 if (len)
340                         panic("pullup %d bytes overdrawn\n", len);
341                 QDEBUG checkb(bp, "after pullup");
342                 return bp;
343         }
344
345         /*
346          *  if not enough room in the first block,
347          *  add another to the front of the list.
348          */
349         if (bp->lim - bp->rp < n) {
350                 nbp = block_alloc(n, MEM_WAIT);
351                 nbp->next = bp;
352                 bp = nbp;
353         }
354
355         /*
356          *  copy bytes from the trailing blocks into the first
357          */
358         n -= BLEN(bp);
359         while ((nbp = bp->next)) {
360                 i = BLEN(nbp);
361                 if (i > n) {
362                         memmove(bp->wp, nbp->rp, n);
363                         pullupblockcnt++;
364                         bp->wp += n;
365                         nbp->rp += n;
366                         QDEBUG checkb(bp, "pullupblock 1");
367                         return bp;
368                 } else {
369                         memmove(bp->wp, nbp->rp, i);
370                         pullupblockcnt++;
371                         bp->wp += i;
372                         bp->next = nbp->next;
373                         nbp->next = 0;
374                         freeb(nbp);
375                         n -= i;
376                         if (n == 0) {
377                                 QDEBUG checkb(bp, "pullupblock 2");
378                                 return bp;
379                         }
380                 }
381         }
382         freeb(bp);
383         return 0;
384 }
385
386 /*
387  *  make sure the first block has at least n bytes in its main body
388  */
389 struct block *pullupqueue(struct queue *q, int n)
390 {
391         struct block *b;
392
393         /* TODO: lock to protect the queue links? */
394         if ((BHLEN(q->bfirst) >= n))
395                 return q->bfirst;
396         q->bfirst = pullupblock(q->bfirst, n);
397         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
398         q->blast = b;
399         return q->bfirst;
400 }
401
402 /* throw away count bytes from the front of
403  * block's extradata.  Returns count of bytes
404  * thrown away
405  */
406
407 static int pullext(struct block *bp, int count)
408 {
409         struct extra_bdata *ed;
410         int i, rem, bytes = 0;
411
412         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
413                 ed = &bp->extra_data[i];
414                 rem = MIN(count, ed->len);
415                 bp->extra_len -= rem;
416                 count -= rem;
417                 bytes += rem;
418                 ed->off += rem;
419                 ed->len -= rem;
420                 if (ed->len == 0) {
421                         kfree((void *)ed->base);
422                         ed->base = 0;
423                         ed->off = 0;
424                 }
425         }
426         return bytes;
427 }
428
429 /* throw away count bytes from the end of a
430  * block's extradata.  Returns count of bytes
431  * thrown away
432  */
433
434 static int dropext(struct block *bp, int count)
435 {
436         struct extra_bdata *ed;
437         int i, rem, bytes = 0;
438
439         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
440                 ed = &bp->extra_data[i];
441                 rem = MIN(count, ed->len);
442                 bp->extra_len -= rem;
443                 count -= rem;
444                 bytes += rem;
445                 ed->len -= rem;
446                 if (ed->len == 0) {
447                         kfree((void *)ed->base);
448                         ed->base = 0;
449                         ed->off = 0;
450                 }
451         }
452         return bytes;
453 }
454
455 /*
456  *  throw away up to count bytes from a
457  *  list of blocks.  Return count of bytes
458  *  thrown away.
459  */
460 static int _pullblock(struct block **bph, int count, int free)
461 {
462         struct block *bp;
463         int n, bytes;
464
465         bytes = 0;
466         if (bph == NULL)
467                 return 0;
468
469         while (*bph != NULL && count != 0) {
470                 bp = *bph;
471
472                 n = MIN(BHLEN(bp), count);
473                 bytes += n;
474                 count -= n;
475                 bp->rp += n;
476                 n = pullext(bp, count);
477                 bytes += n;
478                 count -= n;
479                 QDEBUG checkb(bp, "pullblock ");
480                 if (BLEN(bp) == 0 && (free || count)) {
481                         *bph = bp->next;
482                         bp->next = NULL;
483                         freeb(bp);
484                 }
485         }
486         return bytes;
487 }
488
489 int pullblock(struct block **bph, int count)
490 {
491         return _pullblock(bph, count, 1);
492 }
493
494 /*
495  *  trim to len bytes starting at offset
496  */
497 struct block *trimblock(struct block *bp, int offset, int len)
498 {
499         uint32_t l, trim;
500         int olen = len;
501
502         QDEBUG checkb(bp, "trimblock 1");
503         if (blocklen(bp) < offset + len) {
504                 freeblist(bp);
505                 return NULL;
506         }
507
508         l =_pullblock(&bp, offset, 0);
509         if (bp == NULL)
510                 return NULL;
511         if (l != offset) {
512                 freeblist(bp);
513                 return NULL;
514         }
515
516         while ((l = BLEN(bp)) < len) {
517                 len -= l;
518                 bp = bp->next;
519         }
520
521         trim = BLEN(bp) - len;
522         trim -= dropext(bp, trim);
523         bp->wp -= trim;
524
525         if (bp->next) {
526                 freeblist(bp->next);
527                 bp->next = NULL;
528         }
529         return bp;
530 }
531
532 /* Adjust block @bp so that its size is exactly @len.
533  * If the size is increased, fill in the new contents with zeros.
534  * If the size is decreased, discard some of the old contents at the tail. */
535 struct block *adjustblock(struct block *bp, int len)
536 {
537         struct extra_bdata *ebd;
538         void *buf;
539         int i;
540
541         if (len < 0) {
542                 freeb(bp);
543                 return NULL;
544         }
545
546         if (len == BLEN(bp))
547                 return bp;
548
549         /* Shrink within block main body. */
550         if (len <= BHLEN(bp)) {
551                 free_block_extra(bp);
552                 bp->wp = bp->rp + len;
553                 QDEBUG checkb(bp, "adjustblock 1");
554                 return bp;
555         }
556
557         /* Need to grow. */
558         if (len > BLEN(bp)) {
559                 /* Grow within block main body. */
560                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
561                         memset(bp->wp, 0, len - BLEN(bp));
562                         bp->wp = bp->rp + len;
563                         QDEBUG checkb(bp, "adjustblock 2");
564                         return bp;
565                 }
566                 /* Grow with extra data buffers. */
567                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
568                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
569                 QDEBUG checkb(bp, "adjustblock 3");
570                 return bp;
571         }
572
573         /* Shrink extra data buffers.
574          * len is how much of ebd we need to keep.
575          * extra_len is re-accumulated. */
576         assert(bp->extra_len > 0);
577         len -= BHLEN(bp);
578         bp->extra_len = 0;
579         for (i = 0; i < bp->nr_extra_bufs; i++) {
580                 ebd = &bp->extra_data[i];
581                 if (len <= ebd->len)
582                         break;
583                 len -= ebd->len;
584                 bp->extra_len += ebd->len;
585         }
586         /* If len becomes zero, extra_data[i] should be freed. */
587         if (len > 0) {
588                 ebd = &bp->extra_data[i];
589                 ebd->len = len;
590                 bp->extra_len += ebd->len;
591                 i++;
592         }
593         for (; i < bp->nr_extra_bufs; i++) {
594                 ebd = &bp->extra_data[i];
595                 if (ebd->base)
596                         kfree((void*)ebd->base);
597                 ebd->base = ebd->off = ebd->len = 0;
598         }
599         QDEBUG checkb(bp, "adjustblock 4");
600         return bp;
601 }
602
603 /* Helper: removes and returns the first block from q */
604 static struct block *pop_first_block(struct queue *q)
605 {
606         struct block *b = q->bfirst;
607
608         q->len -= BALLOC(b);  // XXX all usages of q->len with extra_data are fucked
609         q->dlen -= BLEN(b);
610         q->bfirst = b->next;
611         b->next = 0;
612         return b;
613 }
614
615 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
616 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
617 {
618         copy_amt = MIN(to->lim - to->wp, copy_amt);
619         memcpy(to->wp, from, copy_amt);
620         to->wp += copy_amt;
621         return copy_amt;
622 }
623
624 /* Accounting helper.  Block b in q lost amt extra_data */
625 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
626 {
627         b->extra_len -= amt;
628         q->len -= amt;
629         q->dlen -= amt;
630 }
631
632 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
633  * fixed in 'from', so we move its contents and zero it out in 'from'.
634  *
635  * Returns the length moved (0 on failure). */
636 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
637                        struct block *from, struct queue *from_q)
638 {
639         size_t ret = ebd->len;
640
641         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
642                 return 0;
643         block_and_q_lost_extra(from, from_q, ebd->len);
644         ebd->base = ebd->len = ebd->off = 0;
645         return ret;
646 }
647
648 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
649  * return with less than len, but greater than 0, even if there is more
650  * available in q.
651  *
652  * At any moment that we have copied anything and things are tricky, we can just
653  * return.  The trickiness comes from a bunch of variables: is the main body
654  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
655  * to @to's main body, but only if we haven't used it yet. */
656 static size_t copy_from_first_block(struct queue *q, struct block *to,
657                                     size_t len)
658 {
659         struct block *from = q->bfirst;
660         size_t copy_amt, amt;
661         struct extra_bdata *ebd;
662
663         assert(len < BLEN(from));       /* sanity */
664         /* Try to extract from the main body */
665         copy_amt = MIN(BHLEN(from), len);
666         if (copy_amt) {
667                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
668                 from->rp += copy_amt;
669                 /* We only change dlen, (data len), not q->len, since the q still has
670                  * the same block memory allocation (no kfrees happened) */
671                 q->dlen -= copy_amt;
672         }
673         /* Try to extract the remainder from the extra data */
674         len -= copy_amt;
675         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
676                 ebd = &from->extra_data[i];
677                 if (!ebd->base || !ebd->len)
678                         continue;
679                 if (len >= ebd->len) {
680                         amt = move_ebd(ebd, to, from, q);
681                         if (!amt) {
682                                 /* our internal alloc could have failed.   this ebd is now the
683                                  * last one we'll consider.  let's handle it separately and put
684                                  * it in the main body. */
685                                 if (copy_amt)
686                                         return copy_amt;
687                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
688                                                               ebd->len);
689                                 block_and_q_lost_extra(from, q, copy_amt);
690                                 break;
691                         }
692                         len -= amt;
693                         copy_amt += amt;
694                         continue;
695                 } else {
696                         /* If we're here, we reached our final ebd, which we'll need to
697                          * split to get anything from it. */
698                         if (copy_amt)
699                                 return copy_amt;
700                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
701                                                       len);
702                         ebd->off += copy_amt;
703                         ebd->len -= copy_amt;
704                         block_and_q_lost_extra(from, q, copy_amt);
705                         break;
706                 }
707         }
708         if (len)
709                 assert(copy_amt);       /* sanity */
710         return copy_amt;
711 }
712
713 /* Return codes for __qbread and __try_qbread. */
714 enum {
715         QBR_OK,
716         QBR_FAIL,
717         QBR_SPARE,      /* we need a spare block */
718         QBR_AGAIN,      /* do it again, we are coalescing blocks */
719 };
720
721 /* Helper and back-end for __qbread: extracts and returns a list of blocks
722  * containing up to len bytes.  It may contain less than len even if q has more
723  * data.
724  *
725  * Returns a code interpreted by __qbread, and the returned blist in ret. */
726 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
727                         struct block **real_ret, struct block *spare)
728 {
729         struct block *ret, *ret_last, *first;
730         size_t blen;
731
732         if (qio_flags & QIO_CAN_ERR_SLEEP) {
733                 if (!qwait_and_ilock(q, qio_flags)) {
734                         spin_unlock_irqsave(&q->lock);
735                         return QBR_FAIL;
736                 }
737                 /* we qwaited and still hold the lock, so the q is not empty */
738                 first = q->bfirst;
739         } else {
740                 spin_lock_irqsave(&q->lock);
741                 first = q->bfirst;
742                 if (!first) {
743                         spin_unlock_irqsave(&q->lock);
744                         return QBR_FAIL;
745                 }
746         }
747         blen = BLEN(first);
748         if ((q->state & Qcoalesce) && (blen == 0)) {
749                 freeb(pop_first_block(q));
750                 spin_unlock_irqsave(&q->lock);
751                 /* Need to retry to make sure we have a first block */
752                 return QBR_AGAIN;
753         }
754         /* Qmsg is a bit weird.  The old 9ns code seemed to yank the entire block,
755          * regardless of len.  We'll do the same, and just return the minimum: the
756          * first block.  I'd be happy to remove this. */
757         if (q->state & Qmsg) {
758                 ret = pop_first_block(q);
759                 goto out_ok;
760         }
761         /* Let's get at least something first - makes the code easier.  This way,
762          * we'll only ever split the block once. */
763         if (blen <= len) {
764                 ret = pop_first_block(q);
765                 len -= blen;
766         } else {
767                 /* need to split the block.  we won't actually take the first block out
768                  * of the queue - we're just extracting a little bit. */
769                 if (!spare) {
770                         /* We have nothing and need a spare block.  Retry! */
771                         spin_unlock_irqsave(&q->lock);
772                         return QBR_SPARE;
773                 }
774                 copy_from_first_block(q, spare, len);
775                 ret = spare;
776                 goto out_ok;
777         }
778         /* At this point, we just grabbed the first block.  We can try to grab some
779          * more, up to len (if they want). */
780         if (qio_flags & QIO_JUST_ONE_BLOCK)
781                 goto out_ok;
782         ret_last = ret;
783         while (q->bfirst && (len > 0)) {
784                 blen = BLEN(q->bfirst);
785                 if ((q->state & Qcoalesce) && (blen == 0)) {
786                         /* remove the intermediate 0 blocks */
787                         freeb(pop_first_block(q));
788                         continue;
789                 }
790                 if (blen > len) {
791                         /* We could try to split the block, but that's a huge pain.  For
792                          * instance, we might need to move the main body of b into an
793                          * extra_data of ret_last.  lots of ways for that to fail, and lots
794                          * of cases to consider.  Easier to just bail out.  This is why I
795                          * did the first block above: we don't need to worry about this. */
796                          break;
797                 }
798                 ret_last->next = pop_first_block(q);
799                 ret_last = ret_last->next;
800                 len -= blen;
801         }
802 out_ok:
803         qwakeup_iunlock(q, qio_flags);
804         *real_ret = ret;
805         return QBR_OK;
806 }
807
808 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
809  * containing up to len bytes.  It may contain less than len even if q has more
810  * data.
811  *
812  * Returns 0 if the q is closed or would require blocking and !CAN_BLOCK.
813  *
814  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
815  * could get a zero length block back. */
816 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
817                               int mem_flags)
818 {
819         struct block *ret = 0;
820         struct block *spare = 0;
821
822         while (1) {
823                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
824                 case QBR_OK:
825                 case QBR_FAIL:
826                         if (spare && (ret != spare))
827                                 freeb(spare);
828                         return ret;
829                 case QBR_SPARE:
830                         assert(!spare);
831                         /* Due to some nastiness, we need a fresh block so we can read out
832                          * anything from the queue.  'len' seems like a reasonable amount.
833                          * Maybe we can get away with less. */
834                         spare = block_alloc(len, mem_flags);
835                         if (!spare)
836                                 return 0;
837                         break;
838                 case QBR_AGAIN:
839                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
840                          * try again.  We bounce out of __try so we can perform the "is
841                          * there a block" logic again from the top. */
842                         break;
843                 }
844         }
845 }
846
847 /*
848  *  get next block from a queue, return null if nothing there
849  */
850 struct block *qget(struct queue *q)
851 {
852         /* since len == SIZE_MAX, we should never need to do a mem alloc */
853         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
854 }
855
856 /* Throw away the next 'len' bytes in the queue returning the number actually
857  * discarded.
858  *
859  * If the bytes are in the queue, then they must be discarded.  The only time to
860  * return less than len is if the q itself has less than len bytes.
861  *
862  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
863  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
864 size_t qdiscard(struct queue *q, size_t len)
865 {
866         struct block *blist;
867         size_t removed_amt;
868         size_t sofar = 0;
869
870         /* This is racy.  There could be multiple qdiscarders or other consumers,
871          * where the consumption could be interleaved. */
872         while (qlen(q) && len) {
873                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
874                 removed_amt = freeblist(blist);
875                 sofar += removed_amt;
876                 len -= removed_amt;
877         }
878         return sofar;
879 }
880
881 ssize_t qpass(struct queue *q, struct block *b)
882 {
883         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
884 }
885
886 ssize_t qpassnolim(struct queue *q, struct block *b)
887 {
888         return __qbwrite(q, b, 0);
889 }
890
891 /*
892  *  if the allocated space is way out of line with the used
893  *  space, reallocate to a smaller block
894  */
895 struct block *packblock(struct block *bp)
896 {
897         struct block **l, *nbp;
898         int n;
899
900         if (bp->extra_len)
901                 return bp;
902         for (l = &bp; *l; l = &(*l)->next) {
903                 nbp = *l;
904                 n = BLEN(nbp);
905                 if ((n << 2) < BALLOC(nbp)) {
906                         *l = block_alloc(n, MEM_WAIT);
907                         memmove((*l)->wp, nbp->rp, n);
908                         (*l)->wp += n;
909                         (*l)->next = nbp->next;
910                         freeb(nbp);
911                 }
912         }
913
914         return bp;
915 }
916
917 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
918  * body_rp, for up to len.  Returns the len consumed.
919  *
920  * The base is 'b', so that we can kfree it later.  This currently ties us to
921  * using kfree for the release method for all extra_data.
922  *
923  * It is possible to have a body size that is 0, if there is no offset, and
924  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
925 static size_t point_to_body(struct block *b, uint8_t *body_rp,
926                             struct block *newb, unsigned int newb_idx,
927                             size_t len)
928 {
929         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
930
931         assert(newb_idx < newb->nr_extra_bufs);
932
933         kmalloc_incref(b);
934         ebd->base = (uintptr_t)b;
935         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
936         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
937         assert((int)ebd->len >= 0);
938         newb->extra_len += ebd->len;
939         return ebd->len;
940 }
941
942 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
943  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
944  * consumed.
945  *
946  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
947 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
948                            struct block *newb, unsigned int newb_idx,
949                            size_t len)
950 {
951         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
952         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
953
954         assert(b_idx < b->nr_extra_bufs);
955         assert(newb_idx < newb->nr_extra_bufs);
956
957         kmalloc_incref((void*)b_ebd->base);
958         n_ebd->base = b_ebd->base;
959         n_ebd->off = b_ebd->off + b_off;
960         n_ebd->len = MIN(b_ebd->len - b_off, len);
961         newb->extra_len += n_ebd->len;
962         return n_ebd->len;
963 }
964
965 /* given a string of blocks, sets up the new block's extra_data such that it
966  * *points* to the contents of the blist [offset, len + offset).  This does not
967  * make a separate copy of the contents of the blist.
968  *
969  * returns 0 on success.  the only failure is if the extra_data array was too
970  * small, so this returns a positive integer saying how big the extra_data needs
971  * to be.
972  *
973  * callers are responsible for protecting the list structure. */
974 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
975                             uint32_t offset)
976 {
977         struct block *b, *first;
978         unsigned int nr_bufs = 0;
979         unsigned int b_idx, newb_idx = 0;
980         uint8_t *first_main_body = 0;
981
982         /* find the first block; keep offset relative to the latest b in the list */
983         for (b = blist; b; b = b->next) {
984                 if (BLEN(b) > offset)
985                         break;
986                 offset -= BLEN(b);
987         }
988         /* qcopy semantics: if you asked for an offset outside the block list, you
989          * get an empty block back */
990         if (!b)
991                 return 0;
992         first = b;
993         /* upper bound for how many buffers we'll need in newb */
994         for (/* b is set*/; b; b = b->next) {
995                 nr_bufs += 1 + b->nr_extra_bufs;        /* 1 for the main body */
996         }
997         /* we might be holding a spinlock here, so we won't wait for kmalloc */
998         if (block_add_extd(newb, nr_bufs, 0) != 0) {
999                 /* caller will need to alloc these, then re-call us */
1000                 return nr_bufs;
1001         }
1002         for (b = first; b && len; b = b->next) {
1003                 b_idx = 0;
1004                 if (offset) {
1005                         if (offset < BHLEN(b)) {
1006                                 /* off is in the main body */
1007                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1008                                 newb_idx++;
1009                         } else {
1010                                 /* off is in one of the buffers (or just past the last one).
1011                                  * we're not going to point to b's main body at all. */
1012                                 offset -= BHLEN(b);
1013                                 assert(b->extra_data);
1014                                 /* assuming these extrabufs are packed, or at least that len
1015                                  * isn't gibberish */
1016                                 while (b->extra_data[b_idx].len <= offset) {
1017                                         offset -= b->extra_data[b_idx].len;
1018                                         b_idx++;
1019                                 }
1020                                 /* now offset is set to our offset in the b_idx'th buf */
1021                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1022                                 newb_idx++;
1023                                 b_idx++;
1024                         }
1025                         offset = 0;
1026                 } else {
1027                         len -= point_to_body(b, b->rp, newb, newb_idx, len);
1028                         newb_idx++;
1029                 }
1030                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1031                  * and any point_to_ could be our last if it consumed all of len. */
1032                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1033                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1034                         newb_idx++;
1035                 }
1036         }
1037         return 0;
1038 }
1039
1040 struct block *blist_clone(struct block *blist, int header_len, int len,
1041                           uint32_t offset)
1042 {
1043         int ret;
1044         struct block *newb = block_alloc(header_len, MEM_WAIT);
1045         do {
1046                 ret = __blist_clone_to(blist, newb, len, offset);
1047                 if (ret)
1048                         block_add_extd(newb, ret, MEM_WAIT);
1049         } while (ret);
1050         return newb;
1051 }
1052
1053 /* given a queue, makes a single block with header_len reserved space in the
1054  * block main body, and the contents of [offset, len + offset) pointed to in the
1055  * new blocks ext_data.  This does not make a copy of the q's contents, though
1056  * you do have a ref count on the memory. */
1057 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1058 {
1059         int ret;
1060         struct block *newb = block_alloc(header_len, MEM_WAIT);
1061         /* the while loop should rarely be used: it would require someone
1062          * concurrently adding to the queue. */
1063         do {
1064                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1065                 spin_lock_irqsave(&q->lock);
1066                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1067                 spin_unlock_irqsave(&q->lock);
1068                 if (ret)
1069                         block_add_extd(newb, ret, MEM_WAIT);
1070         } while (ret);
1071         return newb;
1072 }
1073
1074 /*
1075  *  copy from offset in the queue
1076  */
1077 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1078 {
1079         int sofar;
1080         int n;
1081         struct block *b, *nb;
1082         uint8_t *p;
1083
1084         nb = block_alloc(len, MEM_WAIT);
1085
1086         spin_lock_irqsave(&q->lock);
1087
1088         /* go to offset */
1089         b = q->bfirst;
1090         for (sofar = 0;; sofar += n) {
1091                 if (b == NULL) {
1092                         spin_unlock_irqsave(&q->lock);
1093                         return nb;
1094                 }
1095                 n = BLEN(b);
1096                 if (sofar + n > offset) {
1097                         p = b->rp + offset - sofar;
1098                         n -= offset - sofar;
1099                         break;
1100                 }
1101                 QDEBUG checkb(b, "qcopy");
1102                 b = b->next;
1103         }
1104
1105         /* copy bytes from there */
1106         for (sofar = 0; sofar < len;) {
1107                 if (n > len - sofar)
1108                         n = len - sofar;
1109                 PANIC_EXTRA(b);
1110                 memmove(nb->wp, p, n);
1111                 qcopycnt += n;
1112                 sofar += n;
1113                 nb->wp += n;
1114                 b = b->next;
1115                 if (b == NULL)
1116                         break;
1117                 n = BLEN(b);
1118                 p = b->rp;
1119         }
1120         spin_unlock_irqsave(&q->lock);
1121
1122         return nb;
1123 }
1124
1125 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1126 {
1127 #ifdef CONFIG_BLOCK_EXTRAS
1128         return qclone(q, 0, len, offset);
1129 #else
1130         return qcopy_old(q, len, offset);
1131 #endif
1132 }
1133
1134 static void qinit_common(struct queue *q)
1135 {
1136         spinlock_init_irqsave(&q->lock);
1137         rendez_init(&q->rr);
1138         rendez_init(&q->wr);
1139 }
1140
1141 /*
1142  *  called by non-interrupt code
1143  */
1144 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1145 {
1146         struct queue *q;
1147
1148         q = kzmalloc(sizeof(struct queue), 0);
1149         if (q == 0)
1150                 return 0;
1151         qinit_common(q);
1152
1153         q->limit = q->inilim = limit;
1154         q->kick = kick;
1155         q->arg = arg;
1156         q->state = msg;
1157         q->state |= Qstarve;
1158         q->eof = 0;
1159
1160         return q;
1161 }
1162
1163 /* open a queue to be bypassed */
1164 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1165 {
1166         struct queue *q;
1167
1168         q = kzmalloc(sizeof(struct queue), 0);
1169         if (q == 0)
1170                 return 0;
1171         qinit_common(q);
1172
1173         q->limit = 0;
1174         q->arg = arg;
1175         q->bypass = bypass;
1176         q->state = 0;
1177
1178         return q;
1179 }
1180
1181 static int notempty(void *a)
1182 {
1183         struct queue *q = a;
1184
1185         return (q->state & Qclosed) || q->bfirst != 0;
1186 }
1187
1188 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1189  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1190  * was naturally closed.  Throws an error o/w. */
1191 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1192 {
1193         while (1) {
1194                 spin_lock_irqsave(&q->lock);
1195                 if (q->bfirst != NULL)
1196                         return TRUE;
1197                 if (q->state & Qclosed) {
1198                         if (++q->eof > 3) {
1199                                 spin_unlock_irqsave(&q->lock);
1200                                 error(EPIPE, "multiple reads on a closed queue");
1201                         }
1202                         if (q->err[0]) {
1203                                 spin_unlock_irqsave(&q->lock);
1204                                 error(EPIPE, q->err);
1205                         }
1206                         return FALSE;
1207                 }
1208                 /* We set Qstarve regardless of whether we are non-blocking or not.
1209                  * Qstarve tracks the edge detection of the queue being empty. */
1210                 q->state |= Qstarve;
1211                 if (qio_flags & QIO_NON_BLOCK) {
1212                         spin_unlock_irqsave(&q->lock);
1213                         error(EAGAIN, "queue empty");
1214                 }
1215                 spin_unlock_irqsave(&q->lock);
1216                 /* may throw an error() */
1217                 rendez_sleep(&q->rr, notempty, q);
1218         }
1219 }
1220
1221 /*
1222  * add a block list to a queue
1223  * XXX basically the same as enqueue blist, and has no locking!
1224  */
1225 void qaddlist(struct queue *q, struct block *b)
1226 {
1227         /* TODO: q lock? */
1228         /* queue the block */
1229         if (q->bfirst)
1230                 q->blast->next = b;
1231         else
1232                 q->bfirst = b;
1233         q->len += blockalloclen(b);
1234         q->dlen += blocklen(b);
1235         while (b->next)
1236                 b = b->next;
1237         q->blast = b;
1238 }
1239
1240 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1241 {
1242         size_t copy_amt, retval = 0;
1243         struct extra_bdata *ebd;
1244
1245         copy_amt = MIN(BHLEN(b), amt);
1246         memcpy(to, b->rp, copy_amt);
1247         /* advance the rp, since this block not be completely consumed and future
1248          * reads need to know where to pick up from */
1249         b->rp += copy_amt;
1250         to += copy_amt;
1251         amt -= copy_amt;
1252         retval += copy_amt;
1253         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1254                 ebd = &b->extra_data[i];
1255                 /* skip empty entires.  if we track this in the struct block, we can
1256                  * just start the for loop early */
1257                 if (!ebd->base || !ebd->len)
1258                         continue;
1259                 copy_amt = MIN(ebd->len, amt);
1260                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1261                 /* we're actually consuming the entries, just like how we advance rp up
1262                  * above, and might only consume part of one. */
1263                 ebd->len -= copy_amt;
1264                 ebd->off += copy_amt;
1265                 b->extra_len -= copy_amt;
1266                 if (!ebd->len) {
1267                         /* we don't actually have to decref here.  it's also done in
1268                          * freeb().  this is the earliest we can free. */
1269                         kfree((void*)ebd->base);
1270                         ebd->base = ebd->off = 0;
1271                 }
1272                 to += copy_amt;
1273                 amt -= copy_amt;
1274                 retval += copy_amt;
1275         }
1276         return retval;
1277 }
1278
1279 /*
1280  *  copy the contents of a string of blocks into
1281  *  memory.  emptied blocks are freed.  return
1282  *  pointer to first unconsumed block.
1283  */
1284 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1285 {
1286         int i;
1287         struct block *next;
1288
1289         /* could be slicker here, since read_from_block is smart */
1290         for (; b != NULL; b = next) {
1291                 i = BLEN(b);
1292                 if (i > n) {
1293                         /* partial block, consume some */
1294                         read_from_block(b, p, n);
1295                         return b;
1296                 }
1297                 /* full block, consume all and move on */
1298                 i = read_from_block(b, p, i);
1299                 n -= i;
1300                 p += i;
1301                 next = b->next;
1302                 freeb(b);
1303         }
1304         return NULL;
1305 }
1306
1307 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1308  * actual amount copied. */
1309 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1310 {
1311         size_t sofar = 0;
1312         struct block *next;
1313
1314         do {
1315                 /* We should be draining every block completely. */
1316                 assert(BLEN(b) <= len - sofar);
1317                 sofar += read_from_block(b, va + sofar, len - sofar);
1318                 next = b->next;
1319                 freeb(b);
1320                 b = next;
1321         } while (b);
1322         return sofar;
1323 }
1324
1325 /*
1326  *  copy the contents of memory into a string of blocks.
1327  *  return NULL on error.
1328  */
1329 struct block *mem2bl(uint8_t * p, int len)
1330 {
1331         ERRSTACK(1);
1332         int n;
1333         struct block *b, *first, **l;
1334
1335         first = NULL;
1336         l = &first;
1337         if (waserror()) {
1338                 freeblist(first);
1339                 nexterror();
1340         }
1341         do {
1342                 n = len;
1343                 if (n > Maxatomic)
1344                         n = Maxatomic;
1345
1346                 *l = b = block_alloc(n, MEM_WAIT);
1347                 /* TODO consider extra_data */
1348                 memmove(b->wp, p, n);
1349                 b->wp += n;
1350                 p += n;
1351                 len -= n;
1352                 l = &b->next;
1353         } while (len > 0);
1354         poperror();
1355
1356         return first;
1357 }
1358
1359 /*
1360  *  put a block back to the front of the queue
1361  *  called with q ilocked
1362  */
1363 void qputback(struct queue *q, struct block *b)
1364 {
1365         b->next = q->bfirst;
1366         if (q->bfirst == NULL)
1367                 q->blast = b;
1368         q->bfirst = b;
1369         q->len += BALLOC(b);
1370         q->dlen += BLEN(b);
1371 }
1372
1373 /*
1374  *  flow control, get producer going again
1375  *  called with q ilocked
1376  */
1377 static void qwakeup_iunlock(struct queue *q, int qio_flags)
1378 {
1379         int dowakeup = 0;
1380
1381         /*
1382          *  if writer flow controlled, restart
1383          *
1384          *  This used to be
1385          *  q->len < q->limit/2
1386          *  but it slows down tcp too much for certain write sizes.
1387          *  I really don't understand it completely.  It may be
1388          *  due to the queue draining so fast that the transmission
1389          *  stalls waiting for the app to produce more data.  - presotto
1390          */
1391         if ((q->state & Qflow) && q->len < q->limit) {
1392                 q->state &= ~Qflow;
1393                 dowakeup = 1;
1394         }
1395
1396         spin_unlock_irqsave(&q->lock);
1397
1398         /* wakeup flow controlled writers */
1399         if (dowakeup) {
1400                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
1401                         q->kick(q->arg);
1402                 rendez_wakeup(&q->wr);
1403         }
1404         qwake_cb(q, FDTAP_FILT_WRITABLE);
1405 }
1406
1407 /*
1408  *  get next block from a queue (up to a limit)
1409  *
1410  */
1411 struct block *qbread(struct queue *q, size_t len)
1412 {
1413         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1414 }
1415
1416 struct block *qbread_nonblock(struct queue *q, size_t len)
1417 {
1418         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1419                         QIO_NON_BLOCK, MEM_WAIT);
1420 }
1421
1422 /* read up to len from a queue into vp. */
1423 size_t qread(struct queue *q, void *va, size_t len)
1424 {
1425         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1426
1427         if (!blist)
1428                 return 0;
1429         return read_all_blocks(blist, va, len);
1430 }
1431
1432 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1433 {
1434         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1435                                        MEM_WAIT);
1436
1437         if (!blist)
1438                 return 0;
1439         return read_all_blocks(blist, va, len);
1440 }
1441
1442 static int qnotfull(void *a)
1443 {
1444         struct queue *q = a;
1445
1446         return q->len < q->limit || (q->state & Qclosed);
1447 }
1448
1449 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1450 static size_t enqueue_blist(struct queue *q, struct block *b)
1451 {
1452         size_t len, dlen;
1453
1454         if (q->bfirst)
1455                 q->blast->next = b;
1456         else
1457                 q->bfirst = b;
1458         len = BALLOC(b);
1459         dlen = BLEN(b);
1460         while (b->next) {
1461                 b = b->next;
1462                 len += BALLOC(b);
1463                 dlen += BLEN(b);
1464         }
1465         q->blast = b;
1466         q->len += len;
1467         q->dlen += dlen;
1468         return dlen;
1469 }
1470
1471 /* Adds block (which can be a list of blocks) to the queue, subject to
1472  * qio_flags.  Returns the length written on success or -1 on non-throwable
1473  * error.  Adjust qio_flags to control the value-added features!. */
1474 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1475 {
1476         ssize_t ret;
1477         bool dowakeup = FALSE;
1478         bool was_empty;
1479
1480         if (q->bypass) {
1481                 ret = blocklen(b);
1482                 (*q->bypass) (q->arg, b);
1483                 return ret;
1484         }
1485         spin_lock_irqsave(&q->lock);
1486         was_empty = q->len == 0;
1487         if (q->state & Qclosed) {
1488                 spin_unlock_irqsave(&q->lock);
1489                 freeblist(b);
1490                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1491                         return -1;
1492                 if (q->err[0])
1493                         error(EPIPE, q->err);
1494                 else
1495                         error(EPIPE, "connection closed");
1496         }
1497         if ((qio_flags & QIO_LIMIT) && (q->len >= q->limit)) {
1498                 /* drop overflow takes priority over regular non-blocking */
1499                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1500                         spin_unlock_irqsave(&q->lock);
1501                         freeb(b);
1502                         return -1;
1503                 }
1504                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1505                  * and catch it. */
1506                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1507                         spin_unlock_irqsave(&q->lock);
1508                         freeb(b);
1509                         error(EAGAIN, "queue full");
1510                 }
1511         }
1512         ret = enqueue_blist(q, b);
1513         QDEBUG checkb(b, "__qbwrite");
1514         /* make sure other end gets awakened */
1515         if (q->state & Qstarve) {
1516                 q->state &= ~Qstarve;
1517                 dowakeup = TRUE;
1518         }
1519         spin_unlock_irqsave(&q->lock);
1520         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1521          * wakeup, meaning that actual users either want a kick or have qreaders. */
1522         if (q->kick && (dowakeup || (q->state & Qkick)))
1523                 q->kick(q->arg);
1524         if (dowakeup)
1525                 rendez_wakeup(&q->rr);
1526         if (was_empty)
1527                 qwake_cb(q, FDTAP_FILT_READABLE);
1528         /*
1529          *  flow control, wait for queue to get below the limit
1530          *  before allowing the process to continue and queue
1531          *  more.  We do this here so that postnote can only
1532          *  interrupt us after the data has been queued.  This
1533          *  means that things like 9p flushes and ssl messages
1534          *  will not be disrupted by software interrupts.
1535          *
1536          *  Note - this is moderately dangerous since a process
1537          *  that keeps getting interrupted and rewriting will
1538          *  queue infinite crud.
1539          */
1540         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1541             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1542                 /* This is a racy peek at the q status.  If we accidentally block, we
1543                  * set Qflow, so someone should wake us.  If we accidentally don't
1544                  * block, we just returned to the user and let them slip a block past
1545                  * flow control. */
1546                 while (!qnotfull(q)) {
1547                         spin_lock_irqsave(&q->lock);
1548                         q->state |= Qflow;
1549                         spin_unlock_irqsave(&q->lock);
1550                         rendez_sleep(&q->wr, qnotfull, q);
1551                 }
1552         }
1553         return ret;
1554 }
1555
1556 /*
1557  *  add a block to a queue obeying flow control
1558  */
1559 ssize_t qbwrite(struct queue *q, struct block *b)
1560 {
1561         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1562 }
1563
1564 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1565 {
1566         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1567 }
1568
1569 ssize_t qibwrite(struct queue *q, struct block *b)
1570 {
1571         return __qbwrite(q, b, 0);
1572 }
1573
1574 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1575  * block on success, 0 on failure. */
1576 static struct block *build_block(void *from, size_t len, int mem_flags)
1577 {
1578         struct block *b;
1579         void *ext_buf;
1580
1581         /* If len is small, we don't need to bother with the extra_data.  But until
1582          * the whole stack can handle extd blocks, we'll use them unconditionally.
1583          * */
1584 #ifdef CONFIG_BLOCK_EXTRAS
1585         /* allocb builds in 128 bytes of header space to all blocks, but this is
1586          * only available via padblock (to the left).  we also need some space
1587          * for pullupblock for some basic headers (like icmp) that get written
1588          * in directly */
1589         b = block_alloc(64, mem_flags);
1590         if (!b)
1591                 return 0;
1592         ext_buf = kmalloc(len, mem_flags);
1593         if (!ext_buf) {
1594                 kfree(b);
1595                 return 0;
1596         }
1597         memcpy(ext_buf, from, len);
1598         if (block_add_extd(b, 1, mem_flags)) {
1599                 kfree(ext_buf);
1600                 kfree(b);
1601                 return 0;
1602         }
1603         b->extra_data[0].base = (uintptr_t)ext_buf;
1604         b->extra_data[0].off = 0;
1605         b->extra_data[0].len = len;
1606         b->extra_len += len;
1607 #else
1608         b = block_alloc(len, mem_flags);
1609         if (!b)
1610                 return 0;
1611         memmove(b->wp, from, len);
1612         b->wp += len;
1613 #endif
1614         return b;
1615 }
1616
1617 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1618                         int qio_flags)
1619 {
1620         size_t n, sofar;
1621         struct block *b;
1622         uint8_t *p = vp;
1623         void *ext_buf;
1624
1625         sofar = 0;
1626         do {
1627                 n = len - sofar;
1628                 /* This is 64K, the max amount per single block.  Still a good value? */
1629                 if (n > Maxatomic)
1630                         n = Maxatomic;
1631                 b = build_block(p + sofar, n, mem_flags);
1632                 if (!b)
1633                         break;
1634                 if (__qbwrite(q, b, qio_flags) < 0)
1635                         break;
1636                 sofar += n;
1637         } while ((sofar < len) && (q->state & Qmsg) == 0);
1638         return sofar;
1639 }
1640
1641 ssize_t qwrite(struct queue *q, void *vp, int len)
1642 {
1643         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1644 }
1645
1646 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1647 {
1648         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1649                                               QIO_NON_BLOCK);
1650 }
1651
1652 ssize_t qiwrite(struct queue *q, void *vp, int len)
1653 {
1654         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1655 }
1656
1657 /*
1658  *  be extremely careful when calling this,
1659  *  as there is no reference accounting
1660  */
1661 void qfree(struct queue *q)
1662 {
1663         qclose(q);
1664         kfree(q);
1665 }
1666
1667 /*
1668  *  Mark a queue as closed.  No further IO is permitted.
1669  *  All blocks are released.
1670  */
1671 void qclose(struct queue *q)
1672 {
1673         struct block *bfirst;
1674
1675         if (q == NULL)
1676                 return;
1677
1678         /* mark it */
1679         spin_lock_irqsave(&q->lock);
1680         q->state |= Qclosed;
1681         q->state &= ~(Qflow | Qstarve | Qdropoverflow);
1682         q->err[0] = 0;
1683         bfirst = q->bfirst;
1684         q->bfirst = 0;
1685         q->len = 0;
1686         q->dlen = 0;
1687         spin_unlock_irqsave(&q->lock);
1688
1689         /* free queued blocks */
1690         freeblist(bfirst);
1691
1692         /* wake up readers/writers */
1693         rendez_wakeup(&q->rr);
1694         rendez_wakeup(&q->wr);
1695         qwake_cb(q, FDTAP_FILT_HANGUP);
1696 }
1697
1698 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1699  *
1700  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1701  * there is no message, which is what also happens during a natural qclose(),
1702  * those waiters will simply return 0.  qwriters will always error() on a
1703  * closed/hungup queue. */
1704 void qhangup(struct queue *q, char *msg)
1705 {
1706         /* mark it */
1707         spin_lock_irqsave(&q->lock);
1708         q->state |= Qclosed;
1709         if (msg == 0 || *msg == 0)
1710                 q->err[0] = 0;
1711         else
1712                 strlcpy(q->err, msg, ERRMAX);
1713         spin_unlock_irqsave(&q->lock);
1714
1715         /* wake up readers/writers */
1716         rendez_wakeup(&q->rr);
1717         rendez_wakeup(&q->wr);
1718         qwake_cb(q, FDTAP_FILT_HANGUP);
1719 }
1720
1721 /*
1722  *  return non-zero if the q is hungup
1723  */
1724 int qisclosed(struct queue *q)
1725 {
1726         return q->state & Qclosed;
1727 }
1728
1729 /*
1730  *  mark a queue as no longer hung up.  resets the wake_cb.
1731  */
1732 void qreopen(struct queue *q)
1733 {
1734         spin_lock_irqsave(&q->lock);
1735         q->state &= ~Qclosed;
1736         q->state |= Qstarve;
1737         q->eof = 0;
1738         q->limit = q->inilim;
1739         q->wake_cb = 0;
1740         q->wake_data = 0;
1741         spin_unlock_irqsave(&q->lock);
1742 }
1743
1744 /*
1745  *  return bytes queued
1746  */
1747 int qlen(struct queue *q)
1748 {
1749         return q->dlen;
1750 }
1751
1752 /*
1753  * return space remaining before flow control
1754  */
1755 int qwindow(struct queue *q)
1756 {
1757         int l;
1758
1759         l = q->limit - q->len;
1760         if (l < 0)
1761                 l = 0;
1762         return l;
1763 }
1764
1765 /*
1766  *  return true if we can read without blocking
1767  */
1768 int qcanread(struct queue *q)
1769 {
1770         return q->bfirst != 0;
1771 }
1772
1773 /*
1774  *  change queue limit
1775  */
1776 void qsetlimit(struct queue *q, int limit)
1777 {
1778         q->limit = limit;
1779 }
1780
1781 /*
1782  *  set whether writes drop overflowing blocks, or if we sleep
1783  */
1784 void qdropoverflow(struct queue *q, bool onoff)
1785 {
1786         spin_lock_irqsave(&q->lock);
1787         if (onoff)
1788                 q->state |= Qdropoverflow;
1789         else
1790                 q->state &= ~Qdropoverflow;
1791         spin_unlock_irqsave(&q->lock);
1792 }
1793
1794 /*
1795  *  flush the output queue
1796  */
1797 void qflush(struct queue *q)
1798 {
1799         struct block *bfirst;
1800
1801         /* mark it */
1802         spin_lock_irqsave(&q->lock);
1803         bfirst = q->bfirst;
1804         q->bfirst = 0;
1805         q->len = 0;
1806         q->dlen = 0;
1807         spin_unlock_irqsave(&q->lock);
1808
1809         /* free queued blocks */
1810         freeblist(bfirst);
1811
1812         /* wake up writers */
1813         rendez_wakeup(&q->wr);
1814         qwake_cb(q, FDTAP_FILT_WRITABLE);
1815 }
1816
1817 int qfull(struct queue *q)
1818 {
1819         return q->len >= q->limit;
1820 }
1821
1822 int qstate(struct queue *q)
1823 {
1824         return q->state;
1825 }
1826
1827 void qdump(struct queue *q)
1828 {
1829         if (q)
1830                 printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
1831                            q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
1832 }
1833
1834 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1835  * marks the type of wakeup event (flags from FDTAP).
1836  *
1837  * There's no sync protection.  If you change the CB while the qio is running,
1838  * you might get a CB with the data or func from a previous set_wake_cb.  You
1839  * should set this once per queue and forget it.
1840  *
1841  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1842  * just make sure that the func(data) pair are valid until the queue is freed or
1843  * reopened. */
1844 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1845 {
1846         q->wake_data = data;
1847         wmb();  /* if we see func, we'll also see the data for it */
1848         q->wake_cb = func;
1849 }
1850
1851 /* Helper for detecting whether we'll block on a read at this instant. */
1852 bool qreadable(struct queue *q)
1853 {
1854         return qlen(q) > 0;
1855 }
1856
1857 /* Helper for detecting whether we'll block on a write at this instant. */
1858 bool qwritable(struct queue *q)
1859 {
1860         return qwindow(q) > 0;
1861 }