vfs: Remove KFS, blockdev and devfs
[akaros.git] / kern / src / ns / qio.c
1 /* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
2  * Portions Copyright © 1997-1999 Vita Nuova Limited
3  * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
4  *                                (www.vitanuova.com)
5  * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
6  *
7  * Modified for the Akaros operating system:
8  * Copyright (c) 2013-2014 The Regents of the University of California
9  * Copyright (c) 2013-2015 Google Inc.
10  *
11  * Permission is hereby granted, free of charge, to any person obtaining a copy
12  * of this software and associated documentation files (the "Software"), to deal
13  * in the Software without restriction, including without limitation the rights
14  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15  * copies of the Software, and to permit persons to whom the Software is
16  * furnished to do so, subject to the following conditions:
17  *
18  * The above copyright notice and this permission notice shall be included in
19  * all copies or substantial portions of the Software.
20  *
21  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
24  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
27  * SOFTWARE. */
28
29 #include <vfs.h>
30 #include <slab.h>
31 #include <kmalloc.h>
32 #include <kref.h>
33 #include <string.h>
34 #include <stdio.h>
35 #include <assert.h>
36 #include <error.h>
37 #include <cpio.h>
38 #include <pmap.h>
39 #include <smp.h>
40 #include <net/ip.h>
41
42 #define PANIC_EXTRA(b)                                                  \
43 {                                                                       \
44         if ((b)->extra_len) {                                           \
45                 printblock(b);                                          \
46                 backtrace();                                            \
47                 panic("%s doesn't handle extra_data", __FUNCTION__);    \
48         }                                                               \
49 }
50
51 static uint32_t padblockcnt;
52 static uint32_t concatblockcnt;
53 static uint32_t pullupblockcnt;
54 static uint32_t copyblockcnt;
55 static uint32_t consumecnt;
56 static uint32_t producecnt;
57 static uint32_t qcopycnt;
58
59 static int debugging;
60
61 #define QDEBUG  if(0)
62
63 /*
64  *  IO queues
65  */
66
67 struct queue {
68         spinlock_t lock;;
69
70         struct block *bfirst;           /* buffer */
71         struct block *blast;
72
73         int dlen;                                       /* data bytes in queue */
74         int limit;                                      /* max bytes in queue */
75         int inilim;                             /* initial limit */
76         int state;
77         int eof;                                        /* number of eofs read by user */
78         size_t bytes_read;
79
80         void (*kick) (void *);          /* restart output */
81         void (*bypass) (void *, struct block *);        /* bypass queue altogether */
82         void *arg;                                      /* argument to kick */
83
84         struct rendez rr;                       /* process waiting to read */
85         struct rendez wr;                       /* process waiting to write */
86         qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
87         void *wake_data;
88
89         char err[ERRMAX];
90 };
91
92 enum {
93         Maxatomic = 64 * 1024,
94         QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
95         QIO_LIMIT = (1 << 1),                   /* respect q->limit */
96         QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to setting qdropoverflow */
97         QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
98         QIO_NON_BLOCK = (1 << 4),               /* throw EAGAIN instead of blocking */
99         QIO_DONT_KICK = (1 << 5),               /* don't kick when waking */
100 };
101
102 unsigned int qiomaxatomic = Maxatomic;
103
104 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
105 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
106 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
107                               int mem_flags);
108 static bool qwait_and_ilock(struct queue *q, int qio_flags);
109
110 /* Helper: fires a wake callback, sending 'filter' */
111 static void qwake_cb(struct queue *q, int filter)
112 {
113         if (q->wake_cb)
114                 q->wake_cb(q, q->wake_data, filter);
115 }
116
117 void ixsummary(void)
118 {
119         debugging ^= 1;
120         printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
121                    padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
122         printd("consume %lu, produce %lu, qcopy %lu\n",
123                    consumecnt, producecnt, qcopycnt);
124 }
125
126 /*
127  *  pad a block to the front (or the back if size is negative)
128  */
129 struct block *padblock(struct block *bp, int size)
130 {
131         int n;
132         struct block *nbp;
133
134         QDEBUG checkb(bp, "padblock 1");
135         if (size >= 0) {
136                 if (bp->rp - bp->base >= size) {
137                         bp->network_offset += size;
138                         bp->transport_offset += size;
139                         bp->rp -= size;
140                         return bp;
141                 }
142
143                 PANIC_EXTRA(bp);
144                 if (bp->next)
145                         panic("padblock %p", getcallerpc(&bp));
146                 n = BLEN(bp);
147                 padblockcnt++;
148                 nbp = block_alloc(size + n, MEM_WAIT);
149                 block_copy_metadata(nbp, bp);
150                 nbp->rp += size;
151                 nbp->wp = nbp->rp;
152                 memmove(nbp->wp, bp->rp, n);
153                 nbp->wp += n;
154                 freeb(bp);
155                 nbp->rp -= size;
156         } else {
157                 size = -size;
158
159                 PANIC_EXTRA(bp);
160
161                 if (bp->next)
162                         panic("padblock %p", getcallerpc(&bp));
163
164                 if (bp->lim - bp->wp >= size)
165                         return bp;
166
167                 n = BLEN(bp);
168                 padblockcnt++;
169                 nbp = block_alloc(size + n, MEM_WAIT);
170                 block_copy_metadata(nbp, bp);
171                 memmove(nbp->wp, bp->rp, n);
172                 nbp->wp += n;
173                 freeb(bp);
174         }
175         QDEBUG checkb(nbp, "padblock 1");
176         return nbp;
177 }
178
179 /*
180  *  return count of bytes in a string of blocks
181  */
182 int blocklen(struct block *bp)
183 {
184         int len;
185
186         len = 0;
187         while (bp) {
188                 len += BLEN(bp);
189                 bp = bp->next;
190         }
191         return len;
192 }
193
194 /*
195  * return count of space in blocks
196  */
197 int blockalloclen(struct block *bp)
198 {
199         int len;
200
201         len = 0;
202         while (bp) {
203                 len += BALLOC(bp);
204                 bp = bp->next;
205         }
206         return len;
207 }
208
209 /*
210  *  copy the  string of blocks into
211  *  a single block and free the string
212  */
213 struct block *concatblock(struct block *bp)
214 {
215         int len;
216         struct block *nb, *f;
217
218         if (bp->next == 0)
219                 return bp;
220
221         /* probably use parts of qclone */
222         PANIC_EXTRA(bp);
223         nb = block_alloc(blocklen(bp), MEM_WAIT);
224         for (f = bp; f; f = f->next) {
225                 len = BLEN(f);
226                 memmove(nb->wp, f->rp, len);
227                 nb->wp += len;
228         }
229         concatblockcnt += BLEN(nb);
230         freeblist(bp);
231         QDEBUG checkb(nb, "concatblock 1");
232         return nb;
233 }
234
235 /* Makes an identical copy of the block, collapsing all the data into the block
236  * body.  It does not point to the contents of the original, it is a copy
237  * (unlike qclone).  Since we're copying, we might as well put the memory into
238  * one contiguous chunk. */
239 struct block *copyblock(struct block *bp, int mem_flags)
240 {
241         struct block *newb;
242         struct extra_bdata *ebd;
243         size_t amt;
244
245         QDEBUG checkb(bp, "copyblock 0");
246         newb = block_alloc(BLEN(bp), mem_flags);
247         if (!newb)
248                 return 0;
249         amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
250         assert(amt == BHLEN(bp));
251         for (int i = 0; i < bp->nr_extra_bufs; i++) {
252                 ebd = &bp->extra_data[i];
253                 if (!ebd->base || !ebd->len)
254                         continue;
255                 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
256                 assert(amt == ebd->len);
257         }
258         block_copy_metadata(newb, bp);
259         copyblockcnt++;
260         QDEBUG checkb(newb, "copyblock 1");
261         return newb;
262 }
263
264 /* Returns a block with the remaining contents of b all in the main body of the
265  * returned block.  Replace old references to b with the returned value (which
266  * may still be 'b', if no change was needed. */
267 struct block *linearizeblock(struct block *b)
268 {
269         struct block *newb;
270
271         if (!b->extra_len)
272                 return b;
273         newb = copyblock(b, MEM_WAIT);
274         freeb(b);
275         return newb;
276 }
277
278 /* Make sure the first block has at least n bytes in its main body.  Pulls up
279  * data from the *list* of blocks.  Returns 0 if there is not enough data in the
280  * block list. */
281 struct block *pullupblock(struct block *bp, int n)
282 {
283         int i, len, seglen;
284         struct block *nbp;
285         struct extra_bdata *ebd;
286
287         /*
288          *  this should almost always be true, it's
289          *  just to avoid every caller checking.
290          */
291         if (BHLEN(bp) >= n)
292                 return bp;
293
294         /* If there's no chance, just bail out now.  This might be slightly wasteful
295          * if there's a long blist that does have enough data. */
296         if (n > blocklen(bp))
297                 return 0;
298         /* a start at explicit main-body / header management */
299         if (bp->extra_len) {
300                 if (n > bp->lim - bp->rp) {
301                         /* would need to realloc a new block and copy everything over. */
302                         panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
303                                         n, bp->lim, bp->rp, bp->lim-bp->rp);
304                 }
305                 len = n - BHLEN(bp);
306                 /* Would need to recursively call this, or otherwise pull from later
307                  * blocks and put chunks of their data into the block we're building. */
308                 if (len > bp->extra_len)
309                         panic("pullup more than extra (%d, %d, %d)\n",
310                               n, BHLEN(bp), bp->extra_len);
311                 QDEBUG checkb(bp, "before pullup");
312                 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
313                         ebd = &bp->extra_data[i];
314                         if (!ebd->base || !ebd->len)
315                                 continue;
316                         seglen = MIN(ebd->len, len);
317                         memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
318                         bp->wp += seglen;
319                         len -= seglen;
320                         ebd->len -= seglen;
321                         ebd->off += seglen;
322                         bp->extra_len -= seglen;
323                         if (ebd->len == 0) {
324                                 kfree((void *)ebd->base);
325                                 ebd->off = 0;
326                                 ebd->base = 0;
327                         }
328                 }
329                 /* maybe just call pullupblock recursively here */
330                 if (len)
331                         panic("pullup %d bytes overdrawn\n", len);
332                 QDEBUG checkb(bp, "after pullup");
333                 return bp;
334         }
335
336         /*
337          *  if not enough room in the first block,
338          *  add another to the front of the list.
339          */
340         if (bp->lim - bp->rp < n) {
341                 nbp = block_alloc(n, MEM_WAIT);
342                 nbp->next = bp;
343                 bp = nbp;
344         }
345
346         /*
347          *  copy bytes from the trailing blocks into the first
348          */
349         n -= BLEN(bp);
350         while ((nbp = bp->next)) {
351                 i = BLEN(nbp);
352                 if (i > n) {
353                         memmove(bp->wp, nbp->rp, n);
354                         pullupblockcnt++;
355                         bp->wp += n;
356                         nbp->rp += n;
357                         QDEBUG checkb(bp, "pullupblock 1");
358                         return bp;
359                 } else {
360                         memmove(bp->wp, nbp->rp, i);
361                         pullupblockcnt++;
362                         bp->wp += i;
363                         bp->next = nbp->next;
364                         nbp->next = 0;
365                         freeb(nbp);
366                         n -= i;
367                         if (n == 0) {
368                                 QDEBUG checkb(bp, "pullupblock 2");
369                                 return bp;
370                         }
371                 }
372         }
373         freeb(bp);
374         return 0;
375 }
376
377 /*
378  *  make sure the first block has at least n bytes in its main body
379  */
380 struct block *pullupqueue(struct queue *q, int n)
381 {
382         struct block *b;
383
384         /* TODO: lock to protect the queue links? */
385         if ((BHLEN(q->bfirst) >= n))
386                 return q->bfirst;
387         q->bfirst = pullupblock(q->bfirst, n);
388         for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
389         q->blast = b;
390         return q->bfirst;
391 }
392
393 /* throw away count bytes from the front of
394  * block's extradata.  Returns count of bytes
395  * thrown away
396  */
397
398 static int pullext(struct block *bp, int count)
399 {
400         struct extra_bdata *ed;
401         int i, rem, bytes = 0;
402
403         for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
404                 ed = &bp->extra_data[i];
405                 rem = MIN(count, ed->len);
406                 bp->extra_len -= rem;
407                 count -= rem;
408                 bytes += rem;
409                 ed->off += rem;
410                 ed->len -= rem;
411                 if (ed->len == 0) {
412                         kfree((void *)ed->base);
413                         ed->base = 0;
414                         ed->off = 0;
415                 }
416         }
417         return bytes;
418 }
419
420 /* throw away count bytes from the end of a
421  * block's extradata.  Returns count of bytes
422  * thrown away
423  */
424
425 static int dropext(struct block *bp, int count)
426 {
427         struct extra_bdata *ed;
428         int i, rem, bytes = 0;
429
430         for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
431                 ed = &bp->extra_data[i];
432                 rem = MIN(count, ed->len);
433                 bp->extra_len -= rem;
434                 count -= rem;
435                 bytes += rem;
436                 ed->len -= rem;
437                 if (ed->len == 0) {
438                         kfree((void *)ed->base);
439                         ed->base = 0;
440                         ed->off = 0;
441                 }
442         }
443         return bytes;
444 }
445
446 /*
447  *  throw away up to count bytes from a
448  *  list of blocks.  Return count of bytes
449  *  thrown away.
450  */
451 static int _pullblock(struct block **bph, int count, int free)
452 {
453         struct block *bp;
454         int n, bytes;
455
456         bytes = 0;
457         if (bph == NULL)
458                 return 0;
459
460         while (*bph != NULL && count != 0) {
461                 bp = *bph;
462
463                 n = MIN(BHLEN(bp), count);
464                 bytes += n;
465                 count -= n;
466                 bp->rp += n;
467                 n = pullext(bp, count);
468                 bytes += n;
469                 count -= n;
470                 QDEBUG checkb(bp, "pullblock ");
471                 if (BLEN(bp) == 0 && (free || count)) {
472                         *bph = bp->next;
473                         bp->next = NULL;
474                         freeb(bp);
475                 }
476         }
477         return bytes;
478 }
479
480 int pullblock(struct block **bph, int count)
481 {
482         return _pullblock(bph, count, 1);
483 }
484
485 /*
486  *  trim to len bytes starting at offset
487  */
488 struct block *trimblock(struct block *bp, int offset, int len)
489 {
490         uint32_t l, trim;
491         int olen = len;
492
493         QDEBUG checkb(bp, "trimblock 1");
494         if (blocklen(bp) < offset + len) {
495                 freeblist(bp);
496                 return NULL;
497         }
498
499         l =_pullblock(&bp, offset, 0);
500         if (bp == NULL)
501                 return NULL;
502         if (l != offset) {
503                 freeblist(bp);
504                 return NULL;
505         }
506
507         while ((l = BLEN(bp)) < len) {
508                 len -= l;
509                 bp = bp->next;
510         }
511
512         trim = BLEN(bp) - len;
513         trim -= dropext(bp, trim);
514         bp->wp -= trim;
515
516         if (bp->next) {
517                 freeblist(bp->next);
518                 bp->next = NULL;
519         }
520         return bp;
521 }
522
523 /* Adjust block @bp so that its size is exactly @len.
524  * If the size is increased, fill in the new contents with zeros.
525  * If the size is decreased, discard some of the old contents at the tail. */
526 struct block *adjustblock(struct block *bp, int len)
527 {
528         struct extra_bdata *ebd;
529         void *buf;
530         int i;
531
532         if (len < 0) {
533                 freeb(bp);
534                 return NULL;
535         }
536
537         if (len == BLEN(bp))
538                 return bp;
539
540         /* Shrink within block main body. */
541         if (len <= BHLEN(bp)) {
542                 free_block_extra(bp);
543                 bp->wp = bp->rp + len;
544                 QDEBUG checkb(bp, "adjustblock 1");
545                 return bp;
546         }
547
548         /* Need to grow. */
549         if (len > BLEN(bp)) {
550                 /* Grow within block main body. */
551                 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
552                         memset(bp->wp, 0, len - BLEN(bp));
553                         bp->wp = bp->rp + len;
554                         QDEBUG checkb(bp, "adjustblock 2");
555                         return bp;
556                 }
557                 /* Grow with extra data buffers. */
558                 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
559                 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
560                 QDEBUG checkb(bp, "adjustblock 3");
561                 return bp;
562         }
563
564         /* Shrink extra data buffers.
565          * len is how much of ebd we need to keep.
566          * extra_len is re-accumulated. */
567         assert(bp->extra_len > 0);
568         len -= BHLEN(bp);
569         bp->extra_len = 0;
570         for (i = 0; i < bp->nr_extra_bufs; i++) {
571                 ebd = &bp->extra_data[i];
572                 if (len <= ebd->len)
573                         break;
574                 len -= ebd->len;
575                 bp->extra_len += ebd->len;
576         }
577         /* If len becomes zero, extra_data[i] should be freed. */
578         if (len > 0) {
579                 ebd = &bp->extra_data[i];
580                 ebd->len = len;
581                 bp->extra_len += ebd->len;
582                 i++;
583         }
584         for (; i < bp->nr_extra_bufs; i++) {
585                 ebd = &bp->extra_data[i];
586                 if (ebd->base)
587                         kfree((void*)ebd->base);
588                 ebd->base = ebd->off = ebd->len = 0;
589         }
590         QDEBUG checkb(bp, "adjustblock 4");
591         return bp;
592 }
593
594 /* Helper: removes and returns the first block from q */
595 static struct block *pop_first_block(struct queue *q)
596 {
597         struct block *b = q->bfirst;
598
599         q->dlen -= BLEN(b);
600         q->bytes_read += BLEN(b);
601         q->bfirst = b->next;
602         b->next = 0;
603         return b;
604 }
605
606 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
607 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
608 {
609         copy_amt = MIN(to->lim - to->wp, copy_amt);
610         memcpy(to->wp, from, copy_amt);
611         to->wp += copy_amt;
612         return copy_amt;
613 }
614
615 /* Accounting helper.  Block b in q lost amt extra_data */
616 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
617 {
618         b->extra_len -= amt;
619         q->dlen -= amt;
620         q->bytes_read += amt;
621 }
622
623 /* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
624  * fixed in 'from', so we move its contents and zero it out in 'from'.
625  *
626  * Returns the length moved (0 on failure). */
627 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
628                        struct block *from, struct queue *from_q)
629 {
630         size_t ret = ebd->len;
631
632         if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
633                 return 0;
634         block_and_q_lost_extra(from, from_q, ebd->len);
635         ebd->base = ebd->len = ebd->off = 0;
636         return ret;
637 }
638
639 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
640  * return with less than len, but greater than 0, even if there is more
641  * available in q.
642  *
643  * At any moment that we have copied anything and things are tricky, we can just
644  * return.  The trickiness comes from a bunch of variables: is the main body
645  * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
646  * to @to's main body, but only if we haven't used it yet. */
647 static size_t copy_from_first_block(struct queue *q, struct block *to,
648                                     size_t len)
649 {
650         struct block *from = q->bfirst;
651         size_t copy_amt, amt;
652         struct extra_bdata *ebd;
653
654         assert(len < BLEN(from));       /* sanity */
655         /* Try to extract from the main body */
656         copy_amt = MIN(BHLEN(from), len);
657         if (copy_amt) {
658                 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
659                 from->rp += copy_amt;
660                 /* We only change dlen, (data len), not q->len, since the q still has
661                  * the same block memory allocation (no kfrees happened) */
662                 q->dlen -= copy_amt;
663                 q->bytes_read += copy_amt;
664         }
665         /* Try to extract the remainder from the extra data */
666         len -= copy_amt;
667         for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
668                 ebd = &from->extra_data[i];
669                 if (!ebd->base || !ebd->len)
670                         continue;
671                 if (len >= ebd->len) {
672                         amt = move_ebd(ebd, to, from, q);
673                         if (!amt) {
674                                 /* our internal alloc could have failed.   this ebd is now the
675                                  * last one we'll consider.  let's handle it separately and put
676                                  * it in the main body. */
677                                 if (copy_amt)
678                                         return copy_amt;
679                                 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
680                                                               ebd->len);
681                                 block_and_q_lost_extra(from, q, copy_amt);
682                                 break;
683                         }
684                         len -= amt;
685                         copy_amt += amt;
686                         continue;
687                 } else {
688                         /* If we're here, we reached our final ebd, which we'll need to
689                          * split to get anything from it. */
690                         if (copy_amt)
691                                 return copy_amt;
692                         copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
693                                                       len);
694                         ebd->off += copy_amt;
695                         ebd->len -= copy_amt;
696                         block_and_q_lost_extra(from, q, copy_amt);
697                         break;
698                 }
699         }
700         if (len)
701                 assert(copy_amt);       /* sanity */
702         return copy_amt;
703 }
704
705 /* Return codes for __qbread and __try_qbread. */
706 enum {
707         QBR_OK,
708         QBR_FAIL,
709         QBR_SPARE,      /* we need a spare block */
710         QBR_AGAIN,      /* do it again, we are coalescing blocks */
711 };
712
713 /* Helper and back-end for __qbread: extracts and returns a list of blocks
714  * containing up to len bytes.  It may contain less than len even if q has more
715  * data.
716  *
717  * Returns a code interpreted by __qbread, and the returned blist in ret. */
718 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
719                         struct block **real_ret, struct block *spare)
720 {
721         struct block *ret, *ret_last, *first;
722         size_t blen;
723         bool was_unwritable = FALSE;
724
725         if (qio_flags & QIO_CAN_ERR_SLEEP) {
726                 if (!qwait_and_ilock(q, qio_flags)) {
727                         spin_unlock_irqsave(&q->lock);
728                         return QBR_FAIL;
729                 }
730                 /* we qwaited and still hold the lock, so the q is not empty */
731                 first = q->bfirst;
732         } else {
733                 spin_lock_irqsave(&q->lock);
734                 first = q->bfirst;
735                 if (!first) {
736                         spin_unlock_irqsave(&q->lock);
737                         return QBR_FAIL;
738                 }
739         }
740         /* We need to check before adjusting q->len.  We're checking the writer's
741          * sleep condition / tap condition.  When set, we *might* be making an edge
742          * transition (from unwritable to writable), which needs to wake and fire
743          * taps.  But, our read might not drain the queue below q->lim.  We'll check
744          * again later to see if we should really wake them.  */
745         was_unwritable = !qwritable(q);
746         blen = BLEN(first);
747         if ((q->state & Qcoalesce) && (blen == 0)) {
748                 freeb(pop_first_block(q));
749                 spin_unlock_irqsave(&q->lock);
750                 /* Need to retry to make sure we have a first block */
751                 return QBR_AGAIN;
752         }
753         /* Qmsg: just return the first block.  Be careful, since our caller might
754          * not read all of the block and thus drop bytes.  Similar to SOCK_DGRAM. */
755         if (q->state & Qmsg) {
756                 ret = pop_first_block(q);
757                 goto out_ok;
758         }
759         /* Let's get at least something first - makes the code easier.  This way,
760          * we'll only ever split the block once. */
761         if (blen <= len) {
762                 ret = pop_first_block(q);
763                 len -= blen;
764         } else {
765                 /* need to split the block.  we won't actually take the first block out
766                  * of the queue - we're just extracting a little bit. */
767                 if (!spare) {
768                         /* We have nothing and need a spare block.  Retry! */
769                         spin_unlock_irqsave(&q->lock);
770                         return QBR_SPARE;
771                 }
772                 copy_from_first_block(q, spare, len);
773                 ret = spare;
774                 goto out_ok;
775         }
776         /* At this point, we just grabbed the first block.  We can try to grab some
777          * more, up to len (if they want). */
778         if (qio_flags & QIO_JUST_ONE_BLOCK)
779                 goto out_ok;
780         ret_last = ret;
781         while (q->bfirst && (len > 0)) {
782                 blen = BLEN(q->bfirst);
783                 if ((q->state & Qcoalesce) && (blen == 0)) {
784                         /* remove the intermediate 0 blocks */
785                         freeb(pop_first_block(q));
786                         continue;
787                 }
788                 if (blen > len) {
789                         /* We could try to split the block, but that's a huge pain.  For
790                          * instance, we might need to move the main body of b into an
791                          * extra_data of ret_last.  lots of ways for that to fail, and lots
792                          * of cases to consider.  Easier to just bail out.  This is why I
793                          * did the first block above: we don't need to worry about this. */
794                          break;
795                 }
796                 ret_last->next = pop_first_block(q);
797                 ret_last = ret_last->next;
798                 len -= blen;
799         }
800 out_ok:
801         /* Don't wake them up or fire tap if we didn't drain enough. */
802         if (!qwritable(q))
803                 was_unwritable = FALSE;
804         spin_unlock_irqsave(&q->lock);
805         if (was_unwritable) {
806                 if (q->kick && !(qio_flags & QIO_DONT_KICK))
807                         q->kick(q->arg);
808                 rendez_wakeup(&q->wr);
809                 qwake_cb(q, FDTAP_FILT_WRITABLE);
810         }
811         *real_ret = ret;
812         return QBR_OK;
813 }
814
815 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
816  * containing up to len bytes.  It may contain less than len even if q has more
817  * data.
818  *
819  * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
820  * if it required a spare and the memory allocation failed.
821  *
822  * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
823  * could get a zero length block back. */
824 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
825                               int mem_flags)
826 {
827         ERRSTACK(1);
828         struct block *ret = 0;
829         struct block *volatile spare = 0;       /* volatile for the waserror */
830
831         /* __try_qbread can throw, based on qio flags. */
832         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
833                 if (spare)
834                         freeb(spare);
835                 nexterror();
836         }
837         while (1) {
838                 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
839                 case QBR_OK:
840                 case QBR_FAIL:
841                         if (spare && (ret != spare))
842                                 freeb(spare);
843                         goto out_ret;
844                 case QBR_SPARE:
845                         assert(!spare);
846                         /* Due to some nastiness, we need a fresh block so we can read out
847                          * anything from the queue.  'len' seems like a reasonable amount.
848                          * Maybe we can get away with less. */
849                         spare = block_alloc(len, mem_flags);
850                         if (!spare) {
851                                 /* Careful here: a memory failure (possible with MEM_ATOMIC)
852                                  * could look like 'no data in the queue' (QBR_FAIL).  The only
853                                  * one who does is this qget(), who happens to know that we
854                                  * won't need a spare, due to the len argument.  Spares are only
855                                  * needed when we need to split a block. */
856                                 ret = 0;
857                                 goto out_ret;
858                         }
859                         break;
860                 case QBR_AGAIN:
861                         /* if the first block is 0 and we are Qcoalesce, then we'll need to
862                          * try again.  We bounce out of __try so we can perform the "is
863                          * there a block" logic again from the top. */
864                         break;
865                 }
866         }
867         assert(0);
868 out_ret:
869         if (qio_flags & QIO_CAN_ERR_SLEEP)
870                 poperror();
871         return ret;
872 }
873
874 /*
875  *  get next block from a queue, return null if nothing there
876  */
877 struct block *qget(struct queue *q)
878 {
879         /* since len == SIZE_MAX, we should never need to do a mem alloc */
880         return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
881 }
882
883 /* Throw away the next 'len' bytes in the queue returning the number actually
884  * discarded.
885  *
886  * If the bytes are in the queue, then they must be discarded.  The only time to
887  * return less than len is if the q itself has less than len bytes.
888  *
889  * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
890  * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
891 size_t qdiscard(struct queue *q, size_t len)
892 {
893         struct block *blist;
894         size_t removed_amt;
895         size_t sofar = 0;
896
897         /* This is racy.  There could be multiple qdiscarders or other consumers,
898          * where the consumption could be interleaved. */
899         while (qlen(q) && len) {
900                 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
901                 removed_amt = freeblist(blist);
902                 sofar += removed_amt;
903                 len -= removed_amt;
904         }
905         return sofar;
906 }
907
908 ssize_t qpass(struct queue *q, struct block *b)
909 {
910         return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
911 }
912
913 ssize_t qpassnolim(struct queue *q, struct block *b)
914 {
915         return __qbwrite(q, b, 0);
916 }
917
918 /*
919  *  if the allocated space is way out of line with the used
920  *  space, reallocate to a smaller block
921  */
922 struct block *packblock(struct block *bp)
923 {
924         struct block **l, *nbp;
925         int n;
926
927         if (bp->extra_len)
928                 return bp;
929         for (l = &bp; *l; l = &(*l)->next) {
930                 nbp = *l;
931                 n = BLEN(nbp);
932                 if ((n << 2) < BALLOC(nbp)) {
933                         *l = block_alloc(n, MEM_WAIT);
934                         memmove((*l)->wp, nbp->rp, n);
935                         (*l)->wp += n;
936                         (*l)->next = nbp->next;
937                         freeb(nbp);
938                 }
939         }
940
941         return bp;
942 }
943
944 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
945  * body_rp, for up to len.  Returns the len consumed.
946  *
947  * The base is 'b', so that we can kfree it later.  This currently ties us to
948  * using kfree for the release method for all extra_data.
949  *
950  * It is possible to have a body size that is 0, if there is no offset, and
951  * b->wp == b->rp.  This will have an extra data entry of 0 length. */
952 static size_t point_to_body(struct block *b, uint8_t *body_rp,
953                             struct block *newb, unsigned int newb_idx,
954                             size_t len)
955 {
956         struct extra_bdata *ebd = &newb->extra_data[newb_idx];
957
958         assert(newb_idx < newb->nr_extra_bufs);
959
960         kmalloc_incref(b);
961         ebd->base = (uintptr_t)b;
962         ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
963         ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
964         assert((int)ebd->len >= 0);
965         newb->extra_len += ebd->len;
966         return ebd->len;
967 }
968
969 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
970  * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
971  * consumed.
972  *
973  * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
974 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
975                            struct block *newb, unsigned int newb_idx,
976                            size_t len)
977 {
978         struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
979         struct extra_bdata *b_ebd = &b->extra_data[b_idx];
980
981         assert(b_idx < b->nr_extra_bufs);
982         assert(newb_idx < newb->nr_extra_bufs);
983
984         kmalloc_incref((void*)b_ebd->base);
985         n_ebd->base = b_ebd->base;
986         n_ebd->off = b_ebd->off + b_off;
987         n_ebd->len = MIN(b_ebd->len - b_off, len);
988         newb->extra_len += n_ebd->len;
989         return n_ebd->len;
990 }
991
992 /* given a string of blocks, sets up the new block's extra_data such that it
993  * *points* to the contents of the blist [offset, len + offset).  This does not
994  * make a separate copy of the contents of the blist.
995  *
996  * returns 0 on success.  the only failure is if the extra_data array was too
997  * small, so this returns a positive integer saying how big the extra_data needs
998  * to be.
999  *
1000  * callers are responsible for protecting the list structure. */
1001 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1002                             uint32_t offset)
1003 {
1004         struct block *b, *first;
1005         unsigned int nr_bufs = 0;
1006         unsigned int b_idx, newb_idx = 0;
1007         uint8_t *first_main_body = 0;
1008         ssize_t sofar = 0;
1009
1010         /* find the first block; keep offset relative to the latest b in the list */
1011         for (b = blist; b; b = b->next) {
1012                 if (BLEN(b) > offset)
1013                         break;
1014                 offset -= BLEN(b);
1015         }
1016         /* qcopy semantics: if you asked for an offset outside the block list, you
1017          * get an empty block back */
1018         if (!b)
1019                 return 0;
1020         first = b;
1021         sofar -= offset;        /* don't count the remaining offset in the first b */
1022         /* upper bound for how many buffers we'll need in newb */
1023         for (/* b is set*/; b; b = b->next) {
1024                 nr_bufs += BHLEN(b) ? 1 : 0;
1025                 nr_bufs += b->nr_extra_bufs; /* still assuming nr_extra == nr_valid */
1026                 sofar += BLEN(b);
1027                 if (sofar > len)
1028                         break;
1029         }
1030         /* we might be holding a spinlock here, so we won't wait for kmalloc */
1031         if (block_add_extd(newb, nr_bufs, 0) != 0) {
1032                 /* caller will need to alloc these, then re-call us */
1033                 return nr_bufs;
1034         }
1035         for (b = first; b && len; b = b->next) {
1036                 b_idx = 0;
1037                 if (offset) {
1038                         if (offset < BHLEN(b)) {
1039                                 /* off is in the main body */
1040                                 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1041                                 newb_idx++;
1042                         } else {
1043                                 /* off is in one of the buffers (or just past the last one).
1044                                  * we're not going to point to b's main body at all. */
1045                                 offset -= BHLEN(b);
1046                                 assert(b->extra_data);
1047                                 /* assuming these extrabufs are packed, or at least that len
1048                                  * isn't gibberish */
1049                                 while (b->extra_data[b_idx].len <= offset) {
1050                                         offset -= b->extra_data[b_idx].len;
1051                                         b_idx++;
1052                                 }
1053                                 /* now offset is set to our offset in the b_idx'th buf */
1054                                 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1055                                 newb_idx++;
1056                                 b_idx++;
1057                         }
1058                         offset = 0;
1059                 } else {
1060                         if (BHLEN(b)) {
1061                                 len -= point_to_body(b, b->rp, newb, newb_idx, len);
1062                                 newb_idx++;
1063                         }
1064                 }
1065                 /* knock out all remaining bufs.  we only did one point_to_ op by now,
1066                  * and any point_to_ could be our last if it consumed all of len. */
1067                 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1068                         len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1069                         newb_idx++;
1070                 }
1071         }
1072         return 0;
1073 }
1074
1075 struct block *blist_clone(struct block *blist, int header_len, int len,
1076                           uint32_t offset)
1077 {
1078         int ret;
1079         struct block *newb = block_alloc(header_len, MEM_WAIT);
1080         do {
1081                 ret = __blist_clone_to(blist, newb, len, offset);
1082                 if (ret)
1083                         block_add_extd(newb, ret, MEM_WAIT);
1084         } while (ret);
1085         return newb;
1086 }
1087
1088 /* given a queue, makes a single block with header_len reserved space in the
1089  * block main body, and the contents of [offset, len + offset) pointed to in the
1090  * new blocks ext_data.  This does not make a copy of the q's contents, though
1091  * you do have a ref count on the memory. */
1092 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1093 {
1094         int ret;
1095         struct block *newb = block_alloc(header_len, MEM_WAIT);
1096         /* the while loop should rarely be used: it would require someone
1097          * concurrently adding to the queue. */
1098         do {
1099                 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1100                 spin_lock_irqsave(&q->lock);
1101                 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1102                 spin_unlock_irqsave(&q->lock);
1103                 if (ret)
1104                         block_add_extd(newb, ret, MEM_WAIT);
1105         } while (ret);
1106         return newb;
1107 }
1108
1109 /*
1110  *  copy from offset in the queue
1111  */
1112 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1113 {
1114         int sofar;
1115         int n;
1116         struct block *b, *nb;
1117         uint8_t *p;
1118
1119         nb = block_alloc(len, MEM_WAIT);
1120
1121         spin_lock_irqsave(&q->lock);
1122
1123         /* go to offset */
1124         b = q->bfirst;
1125         for (sofar = 0;; sofar += n) {
1126                 if (b == NULL) {
1127                         spin_unlock_irqsave(&q->lock);
1128                         return nb;
1129                 }
1130                 n = BLEN(b);
1131                 if (sofar + n > offset) {
1132                         p = b->rp + offset - sofar;
1133                         n -= offset - sofar;
1134                         break;
1135                 }
1136                 QDEBUG checkb(b, "qcopy");
1137                 b = b->next;
1138         }
1139
1140         /* copy bytes from there */
1141         for (sofar = 0; sofar < len;) {
1142                 if (n > len - sofar)
1143                         n = len - sofar;
1144                 PANIC_EXTRA(b);
1145                 memmove(nb->wp, p, n);
1146                 qcopycnt += n;
1147                 sofar += n;
1148                 nb->wp += n;
1149                 b = b->next;
1150                 if (b == NULL)
1151                         break;
1152                 n = BLEN(b);
1153                 p = b->rp;
1154         }
1155         spin_unlock_irqsave(&q->lock);
1156
1157         return nb;
1158 }
1159
1160 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1161 {
1162 #ifdef CONFIG_BLOCK_EXTRAS
1163         return qclone(q, 0, len, offset);
1164 #else
1165         return qcopy_old(q, len, offset);
1166 #endif
1167 }
1168
1169 static void qinit_common(struct queue *q)
1170 {
1171         spinlock_init_irqsave(&q->lock);
1172         rendez_init(&q->rr);
1173         rendez_init(&q->wr);
1174 }
1175
1176 /*
1177  *  called by non-interrupt code
1178  */
1179 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1180 {
1181         struct queue *q;
1182
1183         q = kzmalloc(sizeof(struct queue), 0);
1184         if (q == 0)
1185                 return 0;
1186         qinit_common(q);
1187
1188         q->limit = q->inilim = limit;
1189         q->kick = kick;
1190         q->arg = arg;
1191         q->state = msg;
1192         q->eof = 0;
1193
1194         return q;
1195 }
1196
1197 /* open a queue to be bypassed */
1198 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1199 {
1200         struct queue *q;
1201
1202         q = kzmalloc(sizeof(struct queue), 0);
1203         if (q == 0)
1204                 return 0;
1205         qinit_common(q);
1206
1207         q->limit = 0;
1208         q->arg = arg;
1209         q->bypass = bypass;
1210         q->state = 0;
1211
1212         return q;
1213 }
1214
1215 static int notempty(void *a)
1216 {
1217         struct queue *q = a;
1218
1219         return (q->state & Qclosed) || q->bfirst != 0;
1220 }
1221
1222 /* Block, waiting for the queue to be non-empty or closed.  Returns with
1223  * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1224  * was naturally closed.  Throws an error o/w. */
1225 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1226 {
1227         while (1) {
1228                 spin_lock_irqsave(&q->lock);
1229                 if (q->bfirst != NULL)
1230                         return TRUE;
1231                 if (q->state & Qclosed) {
1232                         if (++q->eof > 3) {
1233                                 spin_unlock_irqsave(&q->lock);
1234                                 error(EPIPE, "multiple reads on a closed queue");
1235                         }
1236                         if (q->err[0]) {
1237                                 spin_unlock_irqsave(&q->lock);
1238                                 error(EPIPE, q->err);
1239                         }
1240                         return FALSE;
1241                 }
1242                 if (qio_flags & QIO_NON_BLOCK) {
1243                         spin_unlock_irqsave(&q->lock);
1244                         error(EAGAIN, "queue empty");
1245                 }
1246                 spin_unlock_irqsave(&q->lock);
1247                 /* As with the producer side, we check for a condition while holding the
1248                  * q->lock, decide to sleep, then unlock.  It's like the "check, signal,
1249                  * check again" pattern, but we do it conditionally.  Both sides agree
1250                  * synchronously to do it, and those decisions are made while holding
1251                  * q->lock.  I think this is OK.
1252                  *
1253                  * The invariant is that no reader sleeps when the queue has data.
1254                  * While holding the rendez lock, if we see there's no data, we'll
1255                  * sleep.  Since we saw there was no data, the next writer will see (or
1256                  * already saw) no data, and then the writer decides to rendez_wake,
1257                  * which will grab the rendez lock.  If the writer already did that,
1258                  * then we'll see notempty when we do our check-again. */
1259                 rendez_sleep(&q->rr, notempty, q);
1260         }
1261 }
1262
1263 /*
1264  * add a block list to a queue
1265  * XXX basically the same as enqueue blist, and has no locking!
1266  */
1267 void qaddlist(struct queue *q, struct block *b)
1268 {
1269         /* TODO: q lock? */
1270         /* queue the block */
1271         if (q->bfirst)
1272                 q->blast->next = b;
1273         else
1274                 q->bfirst = b;
1275         q->dlen += blocklen(b);
1276         while (b->next)
1277                 b = b->next;
1278         q->blast = b;
1279 }
1280
1281 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1282 {
1283         size_t copy_amt, retval = 0;
1284         struct extra_bdata *ebd;
1285
1286         copy_amt = MIN(BHLEN(b), amt);
1287         memcpy(to, b->rp, copy_amt);
1288         /* advance the rp, since this block not be completely consumed and future
1289          * reads need to know where to pick up from */
1290         b->rp += copy_amt;
1291         to += copy_amt;
1292         amt -= copy_amt;
1293         retval += copy_amt;
1294         for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1295                 ebd = &b->extra_data[i];
1296                 /* skip empty entires.  if we track this in the struct block, we can
1297                  * just start the for loop early */
1298                 if (!ebd->base || !ebd->len)
1299                         continue;
1300                 copy_amt = MIN(ebd->len, amt);
1301                 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1302                 /* we're actually consuming the entries, just like how we advance rp up
1303                  * above, and might only consume part of one. */
1304                 ebd->len -= copy_amt;
1305                 ebd->off += copy_amt;
1306                 b->extra_len -= copy_amt;
1307                 if (!ebd->len) {
1308                         /* we don't actually have to decref here.  it's also done in
1309                          * freeb().  this is the earliest we can free. */
1310                         kfree((void*)ebd->base);
1311                         ebd->base = ebd->off = 0;
1312                 }
1313                 to += copy_amt;
1314                 amt -= copy_amt;
1315                 retval += copy_amt;
1316         }
1317         return retval;
1318 }
1319
1320 /*
1321  *  copy the contents of a string of blocks into
1322  *  memory.  emptied blocks are freed.  return
1323  *  pointer to first unconsumed block.
1324  */
1325 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1326 {
1327         int i;
1328         struct block *next;
1329
1330         /* could be slicker here, since read_from_block is smart */
1331         for (; b != NULL; b = next) {
1332                 i = BLEN(b);
1333                 if (i > n) {
1334                         /* partial block, consume some */
1335                         read_from_block(b, p, n);
1336                         return b;
1337                 }
1338                 /* full block, consume all and move on */
1339                 i = read_from_block(b, p, i);
1340                 n -= i;
1341                 p += i;
1342                 next = b->next;
1343                 freeb(b);
1344         }
1345         return NULL;
1346 }
1347
1348 /* Extract the contents of all blocks and copy to va, up to len.  Returns the
1349  * actual amount copied. */
1350 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1351 {
1352         size_t sofar = 0;
1353         struct block *next;
1354
1355         do {
1356                 /* We should be draining every block completely. */
1357                 assert(BLEN(b) <= len - sofar);
1358                 assert(va);
1359                 assert(va + sofar);
1360                 assert(b->rp);
1361                 sofar += read_from_block(b, va + sofar, len - sofar);
1362                 next = b->next;
1363                 freeb(b);
1364                 b = next;
1365         } while (b);
1366         return sofar;
1367 }
1368
1369 /*
1370  *  copy the contents of memory into a string of blocks.
1371  *  return NULL on error.
1372  */
1373 struct block *mem2bl(uint8_t * p, int len)
1374 {
1375         ERRSTACK(1);
1376         int n;
1377         struct block *b, *first, **l;
1378
1379         first = NULL;
1380         l = &first;
1381         if (waserror()) {
1382                 freeblist(first);
1383                 nexterror();
1384         }
1385         do {
1386                 n = len;
1387                 if (n > Maxatomic)
1388                         n = Maxatomic;
1389
1390                 *l = b = block_alloc(n, MEM_WAIT);
1391                 /* TODO consider extra_data */
1392                 memmove(b->wp, p, n);
1393                 b->wp += n;
1394                 p += n;
1395                 len -= n;
1396                 l = &b->next;
1397         } while (len > 0);
1398         poperror();
1399
1400         return first;
1401 }
1402
1403 /*
1404  *  put a block back to the front of the queue
1405  *  called with q ilocked
1406  */
1407 void qputback(struct queue *q, struct block *b)
1408 {
1409         b->next = q->bfirst;
1410         if (q->bfirst == NULL)
1411                 q->blast = b;
1412         q->bfirst = b;
1413         q->dlen += BLEN(b);
1414         /* qputback seems to undo a read, so we can undo the accounting too. */
1415         q->bytes_read -= BLEN(b);
1416 }
1417
1418 /*
1419  *  get next block from a queue (up to a limit)
1420  *
1421  */
1422 struct block *qbread(struct queue *q, size_t len)
1423 {
1424         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1425 }
1426
1427 struct block *qbread_nonblock(struct queue *q, size_t len)
1428 {
1429         return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1430                         QIO_NON_BLOCK, MEM_WAIT);
1431 }
1432
1433 /* read up to len from a queue into vp. */
1434 size_t qread(struct queue *q, void *va, size_t len)
1435 {
1436         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1437
1438         if (!blist)
1439                 return 0;
1440         return read_all_blocks(blist, va, len);
1441 }
1442
1443 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1444 {
1445         struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1446                                        MEM_WAIT);
1447
1448         if (!blist)
1449                 return 0;
1450         return read_all_blocks(blist, va, len);
1451 }
1452
1453 /* This is the rendez wake condition for writers. */
1454 static int qwriter_should_wake(void *a)
1455 {
1456         struct queue *q = a;
1457
1458         return qwritable(q) || (q->state & Qclosed);
1459 }
1460
1461 /* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1462 static size_t enqueue_blist(struct queue *q, struct block *b)
1463 {
1464         size_t dlen;
1465
1466         if (q->bfirst)
1467                 q->blast->next = b;
1468         else
1469                 q->bfirst = b;
1470         dlen = BLEN(b);
1471         while (b->next) {
1472                 b = b->next;
1473                 dlen += BLEN(b);
1474         }
1475         q->blast = b;
1476         q->dlen += dlen;
1477         return dlen;
1478 }
1479
1480 /* Adds block (which can be a list of blocks) to the queue, subject to
1481  * qio_flags.  Returns the length written on success or -1 on non-throwable
1482  * error.  Adjust qio_flags to control the value-added features!. */
1483 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1484 {
1485         ssize_t ret;
1486         bool was_unreadable;
1487
1488         if (q->bypass) {
1489                 ret = blocklen(b);
1490                 (*q->bypass) (q->arg, b);
1491                 return ret;
1492         }
1493         spin_lock_irqsave(&q->lock);
1494         was_unreadable = q->dlen == 0;
1495         if (q->state & Qclosed) {
1496                 spin_unlock_irqsave(&q->lock);
1497                 freeblist(b);
1498                 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1499                         return -1;
1500                 if (q->err[0])
1501                         error(EPIPE, q->err);
1502                 else
1503                         error(EPIPE, "connection closed");
1504         }
1505         if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1506                 /* drop overflow takes priority over regular non-blocking */
1507                 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1508                         spin_unlock_irqsave(&q->lock);
1509                         freeb(b);
1510                         return -1;
1511                 }
1512                 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1513                  * and catch it. */
1514                 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1515                         spin_unlock_irqsave(&q->lock);
1516                         freeb(b);
1517                         error(EAGAIN, "queue full");
1518                 }
1519         }
1520         ret = enqueue_blist(q, b);
1521         QDEBUG checkb(b, "__qbwrite");
1522         spin_unlock_irqsave(&q->lock);
1523         /* TODO: not sure if the usage of a kick is mutually exclusive with a
1524          * wakeup, meaning that actual users either want a kick or have qreaders. */
1525         if (q->kick && (was_unreadable || (q->state & Qkick)))
1526                 q->kick(q->arg);
1527         if (was_unreadable) {
1528                 /* Unlike the read side, there's no double-check to make sure the queue
1529                  * transitioned across an edge.  We know we added something, so that's
1530                  * enough.  We wake if the queue was empty.  Both sides are the same, in
1531                  * that the condition for which we do the rendez_wakeup() is the same as
1532                  * the condition done for the rendez_sleep(). */
1533                 rendez_wakeup(&q->rr);
1534                 qwake_cb(q, FDTAP_FILT_READABLE);
1535         }
1536         /*
1537          *  flow control, wait for queue to get below the limit
1538          *  before allowing the process to continue and queue
1539          *  more.  We do this here so that postnote can only
1540          *  interrupt us after the data has been queued.  This
1541          *  means that things like 9p flushes and ssl messages
1542          *  will not be disrupted by software interrupts.
1543          *
1544          *  Note - this is moderately dangerous since a process
1545          *  that keeps getting interrupted and rewriting will
1546          *  queue infinite crud.
1547          */
1548         if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1549             !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1550                 /* This is a racy peek at the q status.  If we accidentally block, our
1551                  * rendez will return.  The rendez's peak (qwriter_should_wake) is also
1552                  * racy w.r.t.  the q's spinlock (that lock protects writes, but not
1553                  * reads).
1554                  *
1555                  * Here's the deal: when holding the rendez lock, if we see the sleep
1556                  * condition, the consumer will wake us.  The condition will only ever
1557                  * be changed by the next qbread() (consumer, changes q->dlen).  That
1558                  * code will do a rendez wake, which will spin on the rendez lock,
1559                  * meaning it won't procede until we either see the new state (and
1560                  * return) or put ourselves on the rendez, and wake up.
1561                  *
1562                  * The pattern is one side writes mem, then signals.  Our side checks
1563                  * the signal, then reads the mem.  The goal is to not miss seeing the
1564                  * signal AND missing the memory write.  In this specific case, the
1565                  * signal is actually synchronous (the rendez lock) and not basic shared
1566                  * memory.
1567                  *
1568                  * Oh, and we spin in case we woke early and someone else filled the
1569                  * queue, mesa-style. */
1570                 while (!qwriter_should_wake(q))
1571                         rendez_sleep(&q->wr, qwriter_should_wake, q);
1572         }
1573         return ret;
1574 }
1575
1576 /*
1577  *  add a block to a queue obeying flow control
1578  */
1579 ssize_t qbwrite(struct queue *q, struct block *b)
1580 {
1581         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1582 }
1583
1584 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1585 {
1586         return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1587 }
1588
1589 ssize_t qibwrite(struct queue *q, struct block *b)
1590 {
1591         return __qbwrite(q, b, 0);
1592 }
1593
1594 /* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1595  * block on success, 0 on failure. */
1596 static struct block *build_block(void *from, size_t len, int mem_flags)
1597 {
1598         struct block *b;
1599         void *ext_buf;
1600
1601         /* If len is small, we don't need to bother with the extra_data.  But until
1602          * the whole stack can handle extd blocks, we'll use them unconditionally.
1603          * */
1604 #ifdef CONFIG_BLOCK_EXTRAS
1605         /* allocb builds in 128 bytes of header space to all blocks, but this is
1606          * only available via padblock (to the left).  we also need some space
1607          * for pullupblock for some basic headers (like icmp) that get written
1608          * in directly */
1609         b = block_alloc(64, mem_flags);
1610         if (!b)
1611                 return 0;
1612         ext_buf = kmalloc(len, mem_flags);
1613         if (!ext_buf) {
1614                 kfree(b);
1615                 return 0;
1616         }
1617         memcpy(ext_buf, from, len);
1618         if (block_add_extd(b, 1, mem_flags)) {
1619                 kfree(ext_buf);
1620                 kfree(b);
1621                 return 0;
1622         }
1623         b->extra_data[0].base = (uintptr_t)ext_buf;
1624         b->extra_data[0].off = 0;
1625         b->extra_data[0].len = len;
1626         b->extra_len += len;
1627 #else
1628         b = block_alloc(len, mem_flags);
1629         if (!b)
1630                 return 0;
1631         memmove(b->wp, from, len);
1632         b->wp += len;
1633 #endif
1634         return b;
1635 }
1636
1637 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1638                         int qio_flags)
1639 {
1640         ERRSTACK(1);
1641         size_t n;
1642         volatile size_t sofar = 0;      /* volatile for the waserror */
1643         struct block *b;
1644         uint8_t *p = vp;
1645         void *ext_buf;
1646
1647         /* Only some callers can throw.  Others might be in a context where waserror
1648          * isn't safe. */
1649         if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1650                 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE) after
1651                  * some data has been sent should be treated as a partial write. */
1652                 if (sofar)
1653                         goto out_ok;
1654                 nexterror();
1655         }
1656         do {
1657                 n = len - sofar;
1658                 /* This is 64K, the max amount per single block.  Still a good value? */
1659                 if (n > Maxatomic)
1660                         n = Maxatomic;
1661                 b = build_block(p + sofar, n, mem_flags);
1662                 if (!b)
1663                         break;
1664                 if (__qbwrite(q, b, qio_flags) < 0)
1665                         break;
1666                 sofar += n;
1667         } while ((sofar < len) && (q->state & Qmsg) == 0);
1668 out_ok:
1669         if (qio_flags & QIO_CAN_ERR_SLEEP)
1670                 poperror();
1671         return sofar;
1672 }
1673
1674 ssize_t qwrite(struct queue *q, void *vp, int len)
1675 {
1676         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1677 }
1678
1679 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1680 {
1681         return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1682                                               QIO_NON_BLOCK);
1683 }
1684
1685 ssize_t qiwrite(struct queue *q, void *vp, int len)
1686 {
1687         return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1688 }
1689
1690 /*
1691  *  be extremely careful when calling this,
1692  *  as there is no reference accounting
1693  */
1694 void qfree(struct queue *q)
1695 {
1696         qclose(q);
1697         kfree(q);
1698 }
1699
1700 /*
1701  *  Mark a queue as closed.  No further IO is permitted.
1702  *  All blocks are released.
1703  */
1704 void qclose(struct queue *q)
1705 {
1706         struct block *bfirst;
1707
1708         if (q == NULL)
1709                 return;
1710
1711         /* mark it */
1712         spin_lock_irqsave(&q->lock);
1713         q->state |= Qclosed;
1714         q->state &= ~Qdropoverflow;
1715         q->err[0] = 0;
1716         bfirst = q->bfirst;
1717         q->bfirst = 0;
1718         q->dlen = 0;
1719         spin_unlock_irqsave(&q->lock);
1720
1721         /* free queued blocks */
1722         freeblist(bfirst);
1723
1724         /* wake up readers/writers */
1725         rendez_wakeup(&q->rr);
1726         rendez_wakeup(&q->wr);
1727         qwake_cb(q, FDTAP_FILT_HANGUP);
1728 }
1729
1730 /* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1731  *
1732  * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1733  * there is no message, which is what also happens during a natural qclose(),
1734  * those waiters will simply return 0.  qwriters will always error() on a
1735  * closed/hungup queue. */
1736 void qhangup(struct queue *q, char *msg)
1737 {
1738         /* mark it */
1739         spin_lock_irqsave(&q->lock);
1740         q->state |= Qclosed;
1741         if (msg == 0 || *msg == 0)
1742                 q->err[0] = 0;
1743         else
1744                 strlcpy(q->err, msg, ERRMAX);
1745         spin_unlock_irqsave(&q->lock);
1746
1747         /* wake up readers/writers */
1748         rendez_wakeup(&q->rr);
1749         rendez_wakeup(&q->wr);
1750         qwake_cb(q, FDTAP_FILT_HANGUP);
1751 }
1752
1753 /*
1754  *  return non-zero if the q is hungup
1755  */
1756 int qisclosed(struct queue *q)
1757 {
1758         return q->state & Qclosed;
1759 }
1760
1761 /*
1762  *  mark a queue as no longer hung up.  resets the wake_cb.
1763  */
1764 void qreopen(struct queue *q)
1765 {
1766         spin_lock_irqsave(&q->lock);
1767         q->state &= ~Qclosed;
1768         q->eof = 0;
1769         q->limit = q->inilim;
1770         q->wake_cb = 0;
1771         q->wake_data = 0;
1772         spin_unlock_irqsave(&q->lock);
1773 }
1774
1775 /*
1776  *  return bytes queued
1777  */
1778 int qlen(struct queue *q)
1779 {
1780         return q->dlen;
1781 }
1782
1783 size_t q_bytes_read(struct queue *q)
1784 {
1785         return q->bytes_read;
1786 }
1787
1788 /*
1789  * return space remaining before flow control
1790  *
1791  *  This used to be
1792  *  q->len < q->limit/2
1793  *  but it slows down tcp too much for certain write sizes.
1794  *  I really don't understand it completely.  It may be
1795  *  due to the queue draining so fast that the transmission
1796  *  stalls waiting for the app to produce more data.  - presotto
1797  *
1798  *  q->len was the amount of bytes, which is no longer used.  we now use
1799  *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1800  */
1801 int qwindow(struct queue *q)
1802 {
1803         int l;
1804
1805         l = q->limit - q->dlen;
1806         if (l < 0)
1807                 l = 0;
1808         return l;
1809 }
1810
1811 /*
1812  *  return true if we can read without blocking
1813  */
1814 int qcanread(struct queue *q)
1815 {
1816         return q->bfirst != 0;
1817 }
1818
1819 /*
1820  *  change queue limit
1821  */
1822 void qsetlimit(struct queue *q, size_t limit)
1823 {
1824         bool was_writable = qwritable(q);
1825
1826         q->limit = limit;
1827         if (!was_writable && qwritable(q)) {
1828                 rendez_wakeup(&q->wr);
1829                 qwake_cb(q, FDTAP_FILT_WRITABLE);
1830         }
1831 }
1832
1833 size_t qgetlimit(struct queue *q)
1834 {
1835         return q->limit;
1836 }
1837
1838 /*
1839  *  set whether writes drop overflowing blocks, or if we sleep
1840  */
1841 void qdropoverflow(struct queue *q, bool onoff)
1842 {
1843         spin_lock_irqsave(&q->lock);
1844         if (onoff)
1845                 q->state |= Qdropoverflow;
1846         else
1847                 q->state &= ~Qdropoverflow;
1848         spin_unlock_irqsave(&q->lock);
1849 }
1850
1851 /* Be careful: this can affect concurrent reads/writes and code that might have
1852  * built-in expectations of the q's type. */
1853 void q_toggle_qmsg(struct queue *q, bool onoff)
1854 {
1855         spin_lock_irqsave(&q->lock);
1856         if (onoff)
1857                 q->state |= Qmsg;
1858         else
1859                 q->state &= ~Qmsg;
1860         spin_unlock_irqsave(&q->lock);
1861 }
1862
1863 /* Be careful: this can affect concurrent reads/writes and code that might have
1864  * built-in expectations of the q's type. */
1865 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1866 {
1867         spin_lock_irqsave(&q->lock);
1868         if (onoff)
1869                 q->state |= Qcoalesce;
1870         else
1871                 q->state &= ~Qcoalesce;
1872         spin_unlock_irqsave(&q->lock);
1873 }
1874
1875 /*
1876  *  flush the output queue
1877  */
1878 void qflush(struct queue *q)
1879 {
1880         struct block *bfirst;
1881
1882         /* mark it */
1883         spin_lock_irqsave(&q->lock);
1884         bfirst = q->bfirst;
1885         q->bfirst = 0;
1886         q->dlen = 0;
1887         spin_unlock_irqsave(&q->lock);
1888
1889         /* free queued blocks */
1890         freeblist(bfirst);
1891
1892         /* wake up writers */
1893         rendez_wakeup(&q->wr);
1894         qwake_cb(q, FDTAP_FILT_WRITABLE);
1895 }
1896
1897 int qfull(struct queue *q)
1898 {
1899         return !qwritable(q);
1900 }
1901
1902 int qstate(struct queue *q)
1903 {
1904         return q->state;
1905 }
1906
1907 void qdump(struct queue *q)
1908 {
1909         if (q)
1910                 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1911                            q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1912 }
1913
1914 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1915  * marks the type of wakeup event (flags from FDTAP).
1916  *
1917  * There's no sync protection.  If you change the CB while the qio is running,
1918  * you might get a CB with the data or func from a previous set_wake_cb.  You
1919  * should set this once per queue and forget it.
1920  *
1921  * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1922  * just make sure that the func(data) pair are valid until the queue is freed or
1923  * reopened. */
1924 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1925 {
1926         q->wake_data = data;
1927         wmb();  /* if we see func, we'll also see the data for it */
1928         q->wake_cb = func;
1929 }
1930
1931 /* Helper for detecting whether we'll block on a read at this instant. */
1932 bool qreadable(struct queue *q)
1933 {
1934         return qlen(q) > 0;
1935 }
1936
1937 /* Helper for detecting whether we'll block on a write at this instant. */
1938 bool qwritable(struct queue *q)
1939 {
1940         return !q->limit || qwindow(q) > 0;
1941 }