1 /* Copyright © 1994-1999 Lucent Technologies Inc. All rights reserved.
2 * Portions Copyright © 1997-1999 Vita Nuova Limited
3 * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
5 * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
7 * Modified for the Akaros operating system:
8 * Copyright (c) 2013-2014 The Regents of the University of California
9 * Copyright (c) 2013-2015 Google Inc.
11 * Permission is hereby granted, free of charge, to any person obtaining a copy
12 * of this software and associated documentation files (the "Software"), to deal
13 * in the Software without restriction, including without limitation the rights
14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
15 * copies of the Software, and to permit persons to whom the Software is
16 * furnished to do so, subject to the following conditions:
18 * The above copyright notice and this permission notice shall be included in
19 * all copies or substantial portions of the Software.
21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
43 #define PANIC_EXTRA(b) \
45 if ((b)->extra_len) { \
48 panic("%s doesn't handle extra_data", __FUNCTION__); \
52 static uint32_t padblockcnt;
53 static uint32_t concatblockcnt;
54 static uint32_t pullupblockcnt;
55 static uint32_t copyblockcnt;
56 static uint32_t consumecnt;
57 static uint32_t producecnt;
58 static uint32_t qcopycnt;
71 struct block *bfirst; /* buffer */
74 int dlen; /* data bytes in queue */
75 int limit; /* max bytes in queue */
76 int inilim; /* initial limit */
78 int eof; /* number of eofs read by user */
81 void (*kick) (void *); /* restart output */
82 void (*bypass) (void *, struct block *); /* bypass queue altogether */
83 void *arg; /* argument to kick */
85 struct rendez rr; /* process waiting to read */
86 struct rendez wr; /* process waiting to write */
87 qio_wake_cb_t wake_cb; /* callbacks for qio wakeups */
94 Maxatomic = 64 * 1024,
95 QIO_CAN_ERR_SLEEP = (1 << 0), /* can throw errors or block/sleep */
96 QIO_LIMIT = (1 << 1), /* respect q->limit */
97 QIO_DROP_OVERFLOW = (1 << 2), /* alternative to setting qdropoverflow */
98 QIO_JUST_ONE_BLOCK = (1 << 3), /* when qbreading, just get one block */
99 QIO_NON_BLOCK = (1 << 4), /* throw EAGAIN instead of blocking */
100 QIO_DONT_KICK = (1 << 5), /* don't kick when waking */
103 unsigned int qiomaxatomic = Maxatomic;
105 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt);
106 static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
107 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
109 static bool qwait_and_ilock(struct queue *q, int qio_flags);
111 /* Helper: fires a wake callback, sending 'filter' */
112 static void qwake_cb(struct queue *q, int filter)
115 q->wake_cb(q, q->wake_data, filter);
121 printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
122 padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
123 printd("consume %lu, produce %lu, qcopy %lu\n",
124 consumecnt, producecnt, qcopycnt);
128 * pad a block to the front (or the back if size is negative)
130 struct block *padblock(struct block *bp, int size)
134 uint8_t bcksum = bp->flag & BCKSUM_FLAGS;
135 uint16_t checksum_start = bp->checksum_start;
136 uint16_t checksum_offset = bp->checksum_offset;
137 uint16_t mss = bp->mss;
138 uint16_t transport_offset = bp->transport_offset;
140 QDEBUG checkb(bp, "padblock 1");
142 if (bp->rp - bp->base >= size) {
143 bp->checksum_start += size;
144 bp->transport_offset += size;
151 panic("padblock %p", getcallerpc(&bp));
154 nbp = block_alloc(size + n, MEM_WAIT);
157 memmove(nbp->wp, bp->rp, n);
167 panic("padblock %p", getcallerpc(&bp));
169 if (bp->lim - bp->wp >= size)
174 nbp = block_alloc(size + n, MEM_WAIT);
175 memmove(nbp->wp, bp->rp, n);
181 nbp->checksum_start = checksum_start;
182 nbp->checksum_offset = checksum_offset;
184 nbp->transport_offset = transport_offset;
186 QDEBUG checkb(nbp, "padblock 1");
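
/* Illustrative sketch, not part of the original qio code: how an output path
 * might use padblock() to prepend a link-layer header.  The 14-byte size and
 * the 'hdr' buffer are hypothetical; with a positive size, padblock() leaves
 * bp->rp pointing at the newly reserved space, ready to be filled in. */
static struct block *example_prepend_header(struct block *bp,
					     const uint8_t *hdr)
{
	bp = padblock(bp, 14);		/* reserve 14 bytes in front of the data */
	memmove(bp->rp, hdr, 14);	/* write the header into that space */
	return bp;
}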
191 * return count of bytes in a string of blocks
193 int blocklen(struct block *bp)
206 * return count of space in blocks
208 int blockalloclen(struct block *bp)
221 * copy the string of blocks into
222 * a single block and free the string
224 struct block *concatblock(struct block *bp)
227 struct block *nb, *f;
232 /* probably use parts of qclone */
234 nb = block_alloc(blocklen(bp), MEM_WAIT);
235 for (f = bp; f; f = f->next) {
237 memmove(nb->wp, f->rp, len);
240 concatblockcnt += BLEN(nb);
242 QDEBUG checkb(nb, "concatblock 1");
246 /* Makes an identical copy of the block, collapsing all the data into the block
247 * body. It does not point to the contents of the original, it is a copy
248 * (unlike qclone). Since we're copying, we might as well put the memory into
249 * one contiguous chunk. */
250 struct block *copyblock(struct block *bp, int mem_flags)
253 struct extra_bdata *ebd;
256 QDEBUG checkb(bp, "copyblock 0");
257 newb = block_alloc(BLEN(bp), mem_flags);
260 amt = copy_to_block_body(newb, bp->rp, BHLEN(bp));
261 assert(amt == BHLEN(bp));
262 for (int i = 0; i < bp->nr_extra_bufs; i++) {
263 ebd = &bp->extra_data[i];
264 if (!ebd->base || !ebd->len)
266 amt = copy_to_block_body(newb, (void*)ebd->base + ebd->off, ebd->len);
267 assert(amt == ebd->len);
269 /* TODO: any other flags that need copied over? */
270 if (bp->flag & BCKSUM_FLAGS) {
271 newb->flag |= (bp->flag & BCKSUM_FLAGS);
272 newb->checksum_start = bp->checksum_start;
273 newb->checksum_offset = bp->checksum_offset;
275 newb->transport_offset = bp->transport_offset;
278 QDEBUG checkb(newb, "copyblock 1");
282 /* Returns a block with the remaining contents of b all in the main body of the
283 * returned block. Replace old references to b with the returned value (which
284 * may still be 'b', if no change was needed). */
285 struct block *linearizeblock(struct block *b)
291 newb = copyblock(b, MEM_WAIT);
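
/* Illustrative sketch, not part of the original qio code: the intended usage
 * pattern for linearizeblock().  The returned block may differ from the one
 * passed in, so the caller replaces its own reference, after which all
 * BLEN(b) bytes are contiguous in [b->rp, b->wp).  The byte-sum walk is a
 * hypothetical consumer. */
static uint32_t example_sum_payload(struct block **bpp)
{
	struct block *b = linearizeblock(*bpp);	/* may hand back a new block */
	uint32_t sum = 0;

	*bpp = b;	/* replace the old reference, per the comment above */
	for (uint8_t *p = b->rp; p < b->wp; p++)
		sum += *p;
	return sum;
}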
296 /* Make sure the first block has at least n bytes in its main body. Pulls up
297 * data from the *list* of blocks. Returns 0 if there is not enough data in the
299 struct block *pullupblock(struct block *bp, int n)
303 struct extra_bdata *ebd;
306 * this should almost always be true, it's
307 * just to avoid every caller checking.
312 /* If there's no chance, just bail out now. This might be slightly wasteful
313 * if there's a long blist that does have enough data. */
314 if (n > blocklen(bp))
316 /* a start at explicit main-body / header management */
318 if (n > bp->lim - bp->rp) {
319 /* would need to realloc a new block and copy everything over. */
320 panic("can't pullup %d bytes, no place to put it: bp->lim %p, bp->rp %p, bp->lim-bp->rp %d\n",
321 n, bp->lim, bp->rp, bp->lim-bp->rp);
324 /* Would need to recursively call this, or otherwise pull from later
325 * blocks and put chunks of their data into the block we're building. */
326 if (len > bp->extra_len)
327 panic("pullup more than extra (%d, %d, %d)\n",
328 n, BHLEN(bp), bp->extra_len);
329 QDEBUG checkb(bp, "before pullup");
330 for (int i = 0; (i < bp->nr_extra_bufs) && len; i++) {
331 ebd = &bp->extra_data[i];
332 if (!ebd->base || !ebd->len)
334 seglen = MIN(ebd->len, len);
335 memcpy(bp->wp, (void*)(ebd->base + ebd->off), seglen);
340 bp->extra_len -= seglen;
342 kfree((void *)ebd->base);
347 /* maybe just call pullupblock recursively here */
349 panic("pullup %d bytes overdrawn\n", len);
350 QDEBUG checkb(bp, "after pullup");
355 * if not enough room in the first block,
356 * add another to the front of the list.
358 if (bp->lim - bp->rp < n) {
359 nbp = block_alloc(n, MEM_WAIT);
365 * copy bytes from the trailing blocks into the first
368 while ((nbp = bp->next)) {
371 memmove(bp->wp, nbp->rp, n);
375 QDEBUG checkb(bp, "pullupblock 1");
378 memmove(bp->wp, nbp->rp, i);
381 bp->next = nbp->next;
386 QDEBUG checkb(bp, "pullupblock 2");
396 * make sure the first block has at least n bytes in its main body
398 struct block *pullupqueue(struct queue *q, int n)
402 /* TODO: lock to protect the queue links? */
403 if ((BHLEN(q->bfirst) >= n))
405 q->bfirst = pullupblock(q->bfirst, n);
406 for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
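
/* Illustrative sketch, not part of the original qio code: a protocol input
 * routine that needs 'hdr_len' contiguous bytes before it can parse a header.
 * hdr_len and the caller are hypothetical.  Per the comment above,
 * pullupblock() returns 0 when there is not enough data; this sketch assumes
 * (as the classic Plan 9 pullupblock does) that the block list is consumed in
 * that case, so the caller must not touch it again. */
static uint8_t *example_parse_header(struct block **bpp, int hdr_len)
{
	struct block *bp = pullupblock(*bpp, hdr_len);

	if (bp == NULL) {
		*bpp = NULL;
		return NULL;		/* short packet: nothing to parse */
	}
	*bpp = bp;
	return bp->rp;			/* hdr_len contiguous bytes start here */
}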
411 /* throw away count bytes from the front of a
412 * block's extradata. Returns count of bytes
416 static int pullext(struct block *bp, int count)
418 struct extra_bdata *ed;
419 int i, rem, bytes = 0;
421 for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
422 ed = &bp->extra_data[i];
423 rem = MIN(count, ed->len);
424 bp->extra_len -= rem;
430 kfree((void *)ed->base);
438 /* throw away count bytes from the end of a
439 * block's extradata. Returns count of bytes
443 static int dropext(struct block *bp, int count)
445 struct extra_bdata *ed;
446 int i, rem, bytes = 0;
448 for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
449 ed = &bp->extra_data[i];
450 rem = MIN(count, ed->len);
451 bp->extra_len -= rem;
456 kfree((void *)ed->base);
465 * throw away up to count bytes from a
466 * list of blocks. Return count of bytes
469 static int _pullblock(struct block **bph, int count, int free)
478 while (*bph != NULL && count != 0) {
481 n = MIN(BHLEN(bp), count);
485 n = pullext(bp, count);
488 QDEBUG checkb(bp, "pullblock ");
489 if (BLEN(bp) == 0 && (free || count)) {
498 int pullblock(struct block **bph, int count)
500 return _pullblock(bph, count, 1);
504 * trim to len bytes starting at offset
506 struct block *trimblock(struct block *bp, int offset, int len)
511 QDEBUG checkb(bp, "trimblock 1");
512 if (blocklen(bp) < offset + len) {
517 l =_pullblock(&bp, offset, 0);
525 while ((l = BLEN(bp)) < len) {
530 trim = BLEN(bp) - len;
531 trim -= dropext(bp, trim);
541 /* Adjust block @bp so that its size is exactly @len.
542 * If the size is increased, fill in the new contents with zeros.
543 * If the size is decreased, discard some of the old contents at the tail. */
544 struct block *adjustblock(struct block *bp, int len)
546 struct extra_bdata *ebd;
558 /* Shrink within block main body. */
559 if (len <= BHLEN(bp)) {
560 free_block_extra(bp);
561 bp->wp = bp->rp + len;
562 QDEBUG checkb(bp, "adjustblock 1");
567 if (len > BLEN(bp)) {
568 /* Grow within block main body. */
569 if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
570 memset(bp->wp, 0, len - BLEN(bp));
571 bp->wp = bp->rp + len;
572 QDEBUG checkb(bp, "adjustblock 2");
575 /* Grow with extra data buffers. */
576 buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
577 block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp), MEM_WAIT);
578 QDEBUG checkb(bp, "adjustblock 3");
582 /* Shrink extra data buffers.
583 * len is how much of ebd we need to keep.
584 * extra_len is re-accumulated. */
585 assert(bp->extra_len > 0);
588 for (i = 0; i < bp->nr_extra_bufs; i++) {
589 ebd = &bp->extra_data[i];
593 bp->extra_len += ebd->len;
595 /* If len becomes zero, extra_data[i] should be freed. */
597 ebd = &bp->extra_data[i];
599 bp->extra_len += ebd->len;
602 for (; i < bp->nr_extra_bufs; i++) {
603 ebd = &bp->extra_data[i];
605 kfree((void*)ebd->base);
606 ebd->base = ebd->off = ebd->len = 0;
608 QDEBUG checkb(bp, "adjustblock 4");
612 /* Helper: removes and returns the first block from q */
613 static struct block *pop_first_block(struct queue *q)
615 struct block *b = q->bfirst;
618 q->bytes_read += BLEN(b);
624 /* Helper: copies up to copy_amt from a buf to a block's main body (b->wp) */
625 static size_t copy_to_block_body(struct block *to, void *from, size_t copy_amt)
627 copy_amt = MIN(to->lim - to->wp, copy_amt);
628 memcpy(to->wp, from, copy_amt);
633 /* Accounting helper. Block b in q lost amt extra_data */
634 static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
638 q->bytes_read += amt;
641 /* Helper: moves ebd from a block (in from_q) to another block. The *ebd is
642 * fixed in 'from', so we move its contents and zero it out in 'from'.
644 * Returns the length moved (0 on failure). */
645 static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
646 struct block *from, struct queue *from_q)
648 size_t ret = ebd->len;
650 if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
652 block_and_q_lost_extra(from, from_q, ebd->len);
653 ebd->base = ebd->len = ebd->off = 0;
657 /* Copy up to len bytes from q->bfirst to @to, leaving the block in place. May
658 * return with less than len, but greater than 0, even if there is more
661 * At any moment that we have copied anything and things are tricky, we can just
662 * return. The trickiness comes from a bunch of variables: is the main body
663 * empty? How do we split the ebd? If our alloc fails, then we can fall back
664 * to @to's main body, but only if we haven't used it yet. */
665 static size_t copy_from_first_block(struct queue *q, struct block *to,
668 struct block *from = q->bfirst;
669 size_t copy_amt, amt;
670 struct extra_bdata *ebd;
672 assert(len < BLEN(from)); /* sanity */
673 /* Try to extract from the main body */
674 copy_amt = MIN(BHLEN(from), len);
676 copy_amt = copy_to_block_body(to, from->rp, copy_amt);
677 from->rp += copy_amt;
678 /* We only change dlen (data len), not q->len, since the q still has
679 * the same block memory allocation (no kfrees happened) */
681 q->bytes_read += copy_amt;
683 /* Try to extract the remainder from the extra data */
685 for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
686 ebd = &from->extra_data[i];
687 if (!ebd->base || !ebd->len)
689 if (len >= ebd->len) {
690 amt = move_ebd(ebd, to, from, q);
692 /* our internal alloc could have failed. this ebd is now the
693 * last one we'll consider. let's handle it separately and put
694 * it in the main body. */
697 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
699 block_and_q_lost_extra(from, q, copy_amt);
706 /* If we're here, we reached our final ebd, which we'll need to
707 * split to get anything from it. */
710 copy_amt = copy_to_block_body(to, (void*)ebd->base + ebd->off,
712 ebd->off += copy_amt;
713 ebd->len -= copy_amt;
714 block_and_q_lost_extra(from, q, copy_amt);
719 assert(copy_amt); /* sanity */
723 /* Return codes for __qbread and __try_qbread. */
727 QBR_SPARE, /* we need a spare block */
728 QBR_AGAIN, /* do it again, we are coalescing blocks */
731 /* Helper and back-end for __qbread: extracts and returns a list of blocks
732 * containing up to len bytes. It may contain less than len even if q has more
735 * Returns a code interpreted by __qbread, and the returned blist in ret. */
736 static int __try_qbread(struct queue *q, size_t len, int qio_flags,
737 struct block **real_ret, struct block *spare)
739 struct block *ret, *ret_last, *first;
741 bool was_unwritable = FALSE;
743 if (qio_flags & QIO_CAN_ERR_SLEEP) {
744 if (!qwait_and_ilock(q, qio_flags)) {
745 spin_unlock_irqsave(&q->lock);
748 /* we qwaited and still hold the lock, so the q is not empty */
751 spin_lock_irqsave(&q->lock);
754 spin_unlock_irqsave(&q->lock);
758 /* We need to check before adjusting q->len. We're checking the writer's
759 * sleep condition / tap condition. When set, we *might* be making an edge
760 * transition (from unwritable to writable), which needs to wake and fire
761 * taps. But, our read might not drain the queue below q->lim. We'll check
762 * again later to see if we should really wake them. */
763 was_unwritable = !qwritable(q);
765 if ((q->state & Qcoalesce) && (blen == 0)) {
766 freeb(pop_first_block(q));
767 spin_unlock_irqsave(&q->lock);
768 /* Need to retry to make sure we have a first block */
771 /* Qmsg: just return the first block. Be careful, since our caller might
772 * not read all of the block and thus drop bytes. Similar to SOCK_DGRAM. */
773 if (q->state & Qmsg) {
774 ret = pop_first_block(q);
777 /* Let's get at least something first - makes the code easier. This way,
778 * we'll only ever split the block once. */
780 ret = pop_first_block(q);
783 /* need to split the block. we won't actually take the first block out
784 * of the queue - we're just extracting a little bit. */
786 /* We have nothing and need a spare block. Retry! */
787 spin_unlock_irqsave(&q->lock);
790 copy_from_first_block(q, spare, len);
794 /* At this point, we just grabbed the first block. We can try to grab some
795 * more, up to len (if they want). */
796 if (qio_flags & QIO_JUST_ONE_BLOCK)
799 while (q->bfirst && (len > 0)) {
800 blen = BLEN(q->bfirst);
801 if ((q->state & Qcoalesce) && (blen == 0)) {
802 /* remove the intermediate 0 blocks */
803 freeb(pop_first_block(q));
807 /* We could try to split the block, but that's a huge pain. For
808 * instance, we might need to move the main body of b into an
809 * extra_data of ret_last. lots of ways for that to fail, and lots
810 * of cases to consider. Easier to just bail out. This is why I
811 * did the first block above: we don't need to worry about this. */
814 ret_last->next = pop_first_block(q);
815 ret_last = ret_last->next;
819 /* Don't wake them up or fire tap if we didn't drain enough. */
821 was_unwritable = FALSE;
822 spin_unlock_irqsave(&q->lock);
823 if (was_unwritable) {
824 if (q->kick && !(qio_flags & QIO_DONT_KICK))
826 rendez_wakeup(&q->wr);
827 qwake_cb(q, FDTAP_FILT_WRITABLE);
833 /* Helper and front-end for __try_qbread: extracts and returns a list of blocks
834 * containing up to len bytes. It may contain less than len even if q has more
837 * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
838 * if it required a spare and the memory allocation failed.
840 * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
841 * could get a zero length block back. */
842 static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
846 struct block *ret = 0;
847 struct block *volatile spare = 0; /* volatile for the waserror */
849 /* __try_qbread can throw, based on qio flags. */
850 if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
856 switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
859 if (spare && (ret != spare))
864 /* Due to some nastiness, we need a fresh block so we can read out
865 * anything from the queue. 'len' seems like a reasonable amount.
866 * Maybe we can get away with less. */
867 spare = block_alloc(len, mem_flags);
869 /* Careful here: a memory failure (possible with MEM_ATOMIC)
870 * could look like 'no data in the queue' (QBR_FAIL). The only
871 * caller that passes MEM_ATOMIC is this qget(), which happens to know that we
872 * won't need a spare, due to the len argument. Spares are only
873 * needed when we need to split a block. */
879 /* if the first block is 0 and we are Qcoalesce, then we'll need to
880 * try again. We bounce out of __try so we can perform the "is
881 * there a block" logic again from the top. */
887 if (qio_flags & QIO_CAN_ERR_SLEEP)
893 * get next block from a queue, return null if nothing there
895 struct block *qget(struct queue *q)
897 /* since len == SIZE_MAX, we should never need to do a mem alloc */
898 return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
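
/* Illustrative sketch, not part of the original qio code: draining a queue
 * block-by-block with qget(), which hands ownership of each block to the
 * caller and returns NULL once the queue is empty.  'process_block' is a
 * hypothetical consumer that does not free the block itself. */
static void example_drain(struct queue *q,
			  void (*process_block)(struct block *b))
{
	struct block *b;

	while ((b = qget(q)) != NULL) {
		process_block(b);
		freeb(b);
	}
}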
901 /* Throw away the next 'len' bytes in the queue returning the number actually
904 * If the bytes are in the queue, then they must be discarded. The only time to
905 * return less than len is if the q itself has less than len bytes.
907 * This won't trigger a kick when waking up any sleepers. This seems to be Plan
908 * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
909 size_t qdiscard(struct queue *q, size_t len)
915 /* This is racy. There could be multiple qdiscarders or other consumers,
916 * whose consumption could be interleaved. */
917 while (qlen(q) && len) {
918 blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
919 removed_amt = freeblist(blist);
920 sofar += removed_amt;
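
/* Illustrative sketch, not part of the original qio code: a transport-style
 * caller retiring bytes it no longer needs, e.g. a hypothetical ACK handler
 * dropping 'acked' bytes from its unacked-data queue.  As noted above,
 * qdiscard() does not kick, so it is usable from paths where a kick would
 * recurse into the stack. */
static void example_ack_bytes(struct queue *unacked_q, size_t acked)
{
	size_t dropped = qdiscard(unacked_q, acked);

	/* dropped can only come up short if the queue ran out of data */
	assert(dropped <= acked);
}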
926 ssize_t qpass(struct queue *q, struct block *b)
928 return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
931 ssize_t qpassnolim(struct queue *q, struct block *b)
933 return __qbwrite(q, b, 0);
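
/* Illustrative sketch, not part of the original qio code: a receive path
 * (e.g. a hypothetical NIC rx handler) handing a block up with qpass(), which
 * respects q->limit and drops on overflow instead of blocking.  This assumes,
 * as the drop path of __qbwrite() suggests, that a rejected block is consumed
 * by the queue code rather than returned to the caller. */
static void example_rx_deliver(struct queue *proto_q, struct block *b)
{
	if (qpass(proto_q, b) < 0) {
		/* queue was full or closed; b is assumed to already be freed.
		 * A real driver might bump a drop counter here. */
	}
}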
937 * if the allocated space is way out of line with the used
938 * space, reallocate to a smaller block
940 struct block *packblock(struct block *bp)
942 struct block **l, *nbp;
947 for (l = &bp; *l; l = &(*l)->next) {
950 if ((n << 2) < BALLOC(nbp)) {
951 *l = block_alloc(n, MEM_WAIT);
952 memmove((*l)->wp, nbp->rp, n);
954 (*l)->next = nbp->next;
962 /* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
963 * body_rp, for up to len. Returns the len consumed.
965 * The base is 'b', so that we can kfree it later. This currently ties us to
966 * using kfree for the release method for all extra_data.
968 * It is possible to have a body size that is 0, if there is no offset, and
969 * b->wp == b->rp. This will have an extra data entry of 0 length. */
970 static size_t point_to_body(struct block *b, uint8_t *body_rp,
971 struct block *newb, unsigned int newb_idx,
974 struct extra_bdata *ebd = &newb->extra_data[newb_idx];
976 assert(newb_idx < newb->nr_extra_bufs);
979 ebd->base = (uintptr_t)b;
980 ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
981 ebd->len = MIN(b->wp - body_rp, len); /* think of body_rp as b->rp */
982 assert((int)ebd->len >= 0);
983 newb->extra_len += ebd->len;
987 /* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
988 * extra_data buf, at b_off within that buffer, for up to len. Returns the len
991 * We can have blocks with 0 length, but they are still refcnt'd. See above. */
992 static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
993 struct block *newb, unsigned int newb_idx,
996 struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
997 struct extra_bdata *b_ebd = &b->extra_data[b_idx];
999 assert(b_idx < b->nr_extra_bufs);
1000 assert(newb_idx < newb->nr_extra_bufs);
1002 kmalloc_incref((void*)b_ebd->base);
1003 n_ebd->base = b_ebd->base;
1004 n_ebd->off = b_ebd->off + b_off;
1005 n_ebd->len = MIN(b_ebd->len - b_off, len);
1006 newb->extra_len += n_ebd->len;
1010 /* given a string of blocks, sets up the new block's extra_data such that it
1011 * *points* to the contents of the blist [offset, len + offset). This does not
1012 * make a separate copy of the contents of the blist.
1014 * returns 0 on success. the only failure is if the extra_data array was too
1015 * small, so this returns a positive integer saying how big the extra_data needs to be.
1018 * callers are responsible for protecting the list structure. */
1019 static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1022 struct block *b, *first;
1023 unsigned int nr_bufs = 0;
1024 unsigned int b_idx, newb_idx = 0;
1025 uint8_t *first_main_body = 0;
1027 /* find the first block; keep offset relative to the latest b in the list */
1028 for (b = blist; b; b = b->next) {
1029 if (BLEN(b) > offset)
1033 /* qcopy semantics: if you asked for an offset outside the block list, you
1034 * get an empty block back */
1038 /* upper bound for how many buffers we'll need in newb */
1039 for (/* b is set*/; b; b = b->next) {
1040 nr_bufs += 1 + b->nr_extra_bufs; /* 1 for the main body */
1042 /* we might be holding a spinlock here, so we won't wait for kmalloc */
1043 if (block_add_extd(newb, nr_bufs, 0) != 0) {
1044 /* caller will need to alloc these, then re-call us */
1047 for (b = first; b && len; b = b->next) {
1050 if (offset < BHLEN(b)) {
1051 /* off is in the main body */
1052 len -= point_to_body(b, b->rp + offset, newb, newb_idx, len);
1055 /* off is in one of the buffers (or just past the last one).
1056 * we're not going to point to b's main body at all. */
1058 assert(b->extra_data);
1059 /* assuming these extrabufs are packed, or at least that len
1060 * isn't gibberish */
1061 while (b->extra_data[b_idx].len <= offset) {
1062 offset -= b->extra_data[b_idx].len;
1065 /* now offset is set to our offset in the b_idx'th buf */
1066 len -= point_to_buf(b, b_idx, offset, newb, newb_idx, len);
1072 len -= point_to_body(b, b->rp, newb, newb_idx, len);
1075 /* knock out all remaining bufs. we only did one point_to_ op by now,
1076 * and any point_to_ could be our last if it consumed all of len. */
1077 for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1078 len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1085 struct block *blist_clone(struct block *blist, int header_len, int len,
1089 struct block *newb = block_alloc(header_len, MEM_WAIT);
1091 ret = __blist_clone_to(blist, newb, len, offset);
1093 block_add_extd(newb, ret, MEM_WAIT);
1098 /* given a queue, makes a single block with header_len reserved space in the
1099 * block main body, and the contents of [offset, len + offset) pointed to in the
1100 * new block's extra_data. This does not make a copy of the q's contents, though
1101 * you do have a ref count on the memory. */
1102 struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1105 struct block *newb = block_alloc(header_len, MEM_WAIT);
1106 /* the while loop should rarely be used: it would require someone
1107 * concurrently adding to the queue. */
1109 /* TODO: RCU: protecting the q list (b->next) (need read lock) */
1110 spin_lock_irqsave(&q->lock);
1111 ret = __blist_clone_to(q->bfirst, newb, len, offset);
1112 spin_unlock_irqsave(&q->lock);
1114 block_add_extd(newb, ret, MEM_WAIT);
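
/* Illustrative sketch, not part of the original qio code: a retransmit-style
 * path using qclone() to get a block whose extra_data points at (and holds
 * references on) the queue's payload, with 'hdr_room' bytes of fresh main
 * body reserved so a hypothetical header can be prepended later. */
static struct block *example_clone_for_retransmit(struct queue *q, int hdr_room,
						  int len, uint32_t offset)
{
	struct block *b = qclone(q, hdr_room, len, offset);

	/* b shares payload memory with q; only the header space is new */
	return b;
}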
1120 * copy from offset in the queue
1122 struct block *qcopy_old(struct queue *q, int len, uint32_t offset)
1126 struct block *b, *nb;
1129 nb = block_alloc(len, MEM_WAIT);
1131 spin_lock_irqsave(&q->lock);
1135 for (sofar = 0;; sofar += n) {
1137 spin_unlock_irqsave(&q->lock);
1141 if (sofar + n > offset) {
1142 p = b->rp + offset - sofar;
1143 n -= offset - sofar;
1146 QDEBUG checkb(b, "qcopy");
1150 /* copy bytes from there */
1151 for (sofar = 0; sofar < len;) {
1152 if (n > len - sofar)
1155 memmove(nb->wp, p, n);
1165 spin_unlock_irqsave(&q->lock);
1170 struct block *qcopy(struct queue *q, int len, uint32_t offset)
1172 #ifdef CONFIG_BLOCK_EXTRAS
1173 return qclone(q, 0, len, offset);
1175 return qcopy_old(q, len, offset);
1179 static void qinit_common(struct queue *q)
1181 spinlock_init_irqsave(&q->lock);
1182 rendez_init(&q->rr);
1183 rendez_init(&q->wr);
1187 * called by non-interrupt code
1189 struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1193 q = kzmalloc(sizeof(struct queue), 0);
1198 q->limit = q->inilim = limit;
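
/* Illustrative sketch, not part of the original qio code: creating a simple
 * stream queue and pushing bytes through it.  The 64 KiB limit and buffer
 * sizes are hypothetical; passing 0 for 'msg' gives stream (non-Qmsg)
 * semantics.  A real user would keep the queue in a long-lived structure
 * rather than drop it on the floor as this toy does. */
static size_t example_loopback(void *in, int in_len, void *out, size_t out_len)
{
	struct queue *q = qopen(64 * 1024, 0, NULL, NULL);

	if (q == NULL)
		return 0;
	qwrite(q, in, in_len);		/* may block on flow control */
	return qread(q, out, out_len);	/* bytes actually copied out */
}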
1207 /* open a queue to be bypassed */
1208 struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1212 q = kzmalloc(sizeof(struct queue), 0);
1225 static int notempty(void *a)
1227 struct queue *q = a;
1229 return (q->state & Qclosed) || q->bfirst != 0;
1232 /* Block, waiting for the queue to be non-empty or closed. Returns with
1233 * the spinlock held. Returns TRUE when the queue is not empty, FALSE if it
1234 * was naturally closed. Throws an error o/w. */
1235 static bool qwait_and_ilock(struct queue *q, int qio_flags)
1238 spin_lock_irqsave(&q->lock);
1239 if (q->bfirst != NULL)
1241 if (q->state & Qclosed) {
1243 spin_unlock_irqsave(&q->lock);
1244 error(EPIPE, "multiple reads on a closed queue");
1247 spin_unlock_irqsave(&q->lock);
1248 error(EPIPE, q->err);
1252 if (qio_flags & QIO_NON_BLOCK) {
1253 spin_unlock_irqsave(&q->lock);
1254 error(EAGAIN, "queue empty");
1256 spin_unlock_irqsave(&q->lock);
1257 /* As with the producer side, we check for a condition while holding the
1258 * q->lock, decide to sleep, then unlock. It's like the "check, signal,
1259 * check again" pattern, but we do it conditionally. Both sides agree
1260 * synchronously to do it, and those decisions are made while holding
1261 * q->lock. I think this is OK.
1263 * The invariant is that no reader sleeps when the queue has data.
1264 * While holding the rendez lock, if we see there's no data, we'll
1265 * sleep. Since we saw there was no data, the next writer will see (or
1266 * already saw) no data, and then the writer decides to rendez_wake,
1267 * which will grab the rendez lock. If the writer already did that,
1268 * then we'll see notempty when we do our check-again. */
1269 rendez_sleep(&q->rr, notempty, q);
1274 * add a block list to a queue
1275 * XXX basically the same as enqueue blist, and has no locking!
1277 void qaddlist(struct queue *q, struct block *b)
1280 /* queue the block */
1285 q->dlen += blocklen(b);
1291 static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1293 size_t copy_amt, retval = 0;
1294 struct extra_bdata *ebd;
1296 copy_amt = MIN(BHLEN(b), amt);
1297 memcpy(to, b->rp, copy_amt);
1298 /* advance the rp, since this block might not be completely consumed and future
1299 * reads need to know where to pick up from */
1304 for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1305 ebd = &b->extra_data[i];
1306 /* skip empty entries. if we track this in the struct block, we can
1307 * just start the for loop early */
1308 if (!ebd->base || !ebd->len)
1310 copy_amt = MIN(ebd->len, amt);
1311 memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1312 /* we're actually consuming the entries, just like how we advance rp up
1313 * above, and might only consume part of one. */
1314 ebd->len -= copy_amt;
1315 ebd->off += copy_amt;
1316 b->extra_len -= copy_amt;
1318 /* we don't actually have to decref here. it's also done in
1319 * freeb(). this is the earliest we can free. */
1320 kfree((void*)ebd->base);
1321 ebd->base = ebd->off = 0;
1331 * copy the contents of a string of blocks into
1332 * memory. emptied blocks are freed. return
1333 * pointer to first unconsumed block.
1335 struct block *bl2mem(uint8_t * p, struct block *b, int n)
1340 /* could be slicker here, since read_from_block is smart */
1341 for (; b != NULL; b = next) {
1344 /* partial block, consume some */
1345 read_from_block(b, p, n);
1348 /* full block, consume all and move on */
1349 i = read_from_block(b, p, i);
1358 /* Extract the contents of all blocks and copy to va, up to len. Returns the
1359 * actual amount copied. */
1360 static size_t read_all_blocks(struct block *b, void *va, size_t len)
1366 /* We should be draining every block completely. */
1367 assert(BLEN(b) <= len - sofar);
1371 sofar += read_from_block(b, va + sofar, len - sofar);
1380 * copy the contents of memory into a string of blocks.
1381 * return NULL on error.
1383 struct block *mem2bl(uint8_t * p, int len)
1387 struct block *b, *first, **l;
1400 *l = b = block_alloc(n, MEM_WAIT);
1401 /* TODO consider extra_data */
1402 memmove(b->wp, p, n);
1414 * put a block back to the front of the queue
1415 * called with q ilocked
1417 void qputback(struct queue *q, struct block *b)
1419 b->next = q->bfirst;
1420 if (q->bfirst == NULL)
1424 /* qputback seems to undo a read, so we can undo the accounting too. */
1425 q->bytes_read -= BLEN(b);
1429 * get next block from a queue (up to a limit)
1432 struct block *qbread(struct queue *q, size_t len)
1434 return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP, MEM_WAIT);
1437 struct block *qbread_nonblock(struct queue *q, size_t len)
1439 return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1440 QIO_NON_BLOCK, MEM_WAIT);
1443 /* read up to len from a queue into va. */
1444 size_t qread(struct queue *q, void *va, size_t len)
1446 struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1450 return read_all_blocks(blist, va, len);
1453 size_t qread_nonblock(struct queue *q, void *va, size_t len)
1455 struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP | QIO_NON_BLOCK,
1460 return read_all_blocks(blist, va, len);
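
/* Illustrative sketch, not part of the original qio code: the non-blocking
 * variants report failure by throwing (EAGAIN when the queue is empty, EPIPE
 * when it is hung up with an error string), so a caller is assumed to wrap
 * them in the kernel's waserror()/poperror() idiom, just as the qio internals
 * do above. */
static size_t example_try_read(struct queue *q, void *va, size_t len)
{
	size_t ret;

	if (waserror()) {
		/* EAGAIN: nothing available right now; EPIPE: hung up */
		poperror();
		return 0;
	}
	ret = qread_nonblock(q, va, len);
	poperror();
	return ret;
}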
1463 /* This is the rendez wake condition for writers. */
1464 static int qwriter_should_wake(void *a)
1466 struct queue *q = a;
1468 return qwritable(q) || (q->state & Qclosed);
1471 /* Helper: enqueues a list of blocks to a queue. Returns the total length. */
1472 static size_t enqueue_blist(struct queue *q, struct block *b)
1490 /* Adds block (which can be a list of blocks) to the queue, subject to
1491 * qio_flags. Returns the length written on success or -1 on non-throwable
1492 * error. Adjust qio_flags to control the value-added features. */
1493 static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1496 bool was_unreadable;
1500 (*q->bypass) (q->arg, b);
1503 spin_lock_irqsave(&q->lock);
1504 was_unreadable = q->dlen == 0;
1505 if (q->state & Qclosed) {
1506 spin_unlock_irqsave(&q->lock);
1508 if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1511 error(EPIPE, q->err);
1513 error(EPIPE, "connection closed");
1515 if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1516 /* drop overflow takes priority over regular non-blocking */
1517 if ((qio_flags & QIO_DROP_OVERFLOW) || (q->state & Qdropoverflow)) {
1518 spin_unlock_irqsave(&q->lock);
1522 /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be nice
1524 if ((qio_flags & QIO_CAN_ERR_SLEEP) && (qio_flags & QIO_NON_BLOCK)) {
1525 spin_unlock_irqsave(&q->lock);
1527 error(EAGAIN, "queue full");
1530 ret = enqueue_blist(q, b);
1531 QDEBUG checkb(b, "__qbwrite");
1532 spin_unlock_irqsave(&q->lock);
1533 /* TODO: not sure if the usage of a kick is mutually exclusive with a
1534 * wakeup, meaning that actual users either want a kick or have qreaders. */
1535 if (q->kick && (was_unreadable || (q->state & Qkick)))
1537 if (was_unreadable) {
1538 /* Unlike the read side, there's no double-check to make sure the queue
1539 * transitioned across an edge. We know we added something, so that's
1540 * enough. We wake if the queue was empty. Both sides are the same, in
1541 * that the condition for which we do the rendez_wakeup() is the same as
1542 * the condition done for the rendez_sleep(). */
1543 rendez_wakeup(&q->rr);
1544 qwake_cb(q, FDTAP_FILT_READABLE);
1547 * flow control, wait for queue to get below the limit
1548 * before allowing the process to continue and queue
1549 * more. We do this here so that postnote can only
1550 * interrupt us after the data has been queued. This
1551 * means that things like 9p flushes and ssl messages
1552 * will not be disrupted by software interrupts.
1554 * Note - this is moderately dangerous since a process
1555 * that keeps getting interrupted and rewriting will
1556 * queue infinite crud.
1558 if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1559 !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1560 /* This is a racy peek at the q status. If we accidentally block, our
1561 * rendez will return. The rendez's peek (qwriter_should_wake) is also
1562 * racy w.r.t. the q's spinlock (that lock protects writes, but not
1565 * Here's the deal: when holding the rendez lock, if we see the sleep
1566 * condition, the consumer will wake us. The condition will only ever
1567 * be changed by the next qbread() (consumer, changes q->dlen). That
1568 * code will do a rendez wake, which will spin on the rendez lock,
1569 * meaning it won't proceed until we either see the new state (and
1570 * return) or put ourselves on the rendez, and wake up.
1572 * The pattern is one side writes mem, then signals. Our side checks
1573 * the signal, then reads the mem. The goal is to not miss seeing the
1574 * signal AND missing the memory write. In this specific case, the
1575 * signal is actually synchronous (the rendez lock) and not basic shared
1578 * Oh, and we spin in case we woke early and someone else filled the
1579 * queue, mesa-style. */
1580 while (!qwriter_should_wake(q))
1581 rendez_sleep(&q->wr, qwriter_should_wake, q);
1587 * add a block to a queue obeying flow control
1589 ssize_t qbwrite(struct queue *q, struct block *b)
1591 return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1594 ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1596 return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1599 ssize_t qibwrite(struct queue *q, struct block *b)
1601 return __qbwrite(q, b, 0);
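
/* Illustrative sketch, not part of the original qio code: hand-building a
 * block and enqueueing it with flow control via qbwrite().  The payload is
 * hypothetical.  qbwrite() passes QIO_CAN_ERR_SLEEP, so it may block and may
 * throw; callers that need to survive an error would wrap this in waserror().
 * The queue takes ownership of the block either way. */
static ssize_t example_send_block(struct queue *q, void *payload, size_t len)
{
	struct block *b = block_alloc(len, MEM_WAIT);

	memmove(b->wp, payload, len);
	b->wp += len;			/* commit the bytes into the block */
	return qbwrite(q, b);
}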
1604 /* Helper, allocs a block and copies [from, from + len) into it. Returns the
1605 * block on success, 0 on failure. */
1606 static struct block *build_block(void *from, size_t len, int mem_flags)
1611 /* If len is small, we don't need to bother with the extra_data. But until
1612 * the whole stack can handle extd blocks, we'll use them unconditionally.
1614 #ifdef CONFIG_BLOCK_EXTRAS
1615 /* allocb builds in 128 bytes of header space to all blocks, but this is
1616 * only available via padblock (to the left). we also need some space
1617 * for pullupblock for some basic headers (like icmp) that get written
1619 b = block_alloc(64, mem_flags);
1622 ext_buf = kmalloc(len, mem_flags);
1627 memcpy(ext_buf, from, len);
1628 if (block_add_extd(b, 1, mem_flags)) {
1633 b->extra_data[0].base = (uintptr_t)ext_buf;
1634 b->extra_data[0].off = 0;
1635 b->extra_data[0].len = len;
1636 b->extra_len += len;
1638 b = block_alloc(len, mem_flags);
1641 memmove(b->wp, from, len);
1647 static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1652 volatile size_t sofar = 0; /* volatile for the waserror */
1657 /* Only some callers can throw. Others might be in a context where waserror
1659 if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1660 /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE) after
1661 * some data has been sent should be treated as a partial write. */
1668 /* This is 64K, the max amount per single block. Still a good value? */
1671 b = build_block(p + sofar, n, mem_flags);
1674 if (__qbwrite(q, b, qio_flags) < 0)
1677 } while ((sofar < len) && (q->state & Qmsg) == 0);
1679 if (qio_flags & QIO_CAN_ERR_SLEEP)
1684 ssize_t qwrite(struct queue *q, void *vp, int len)
1686 return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1689 ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1691 return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1695 ssize_t qiwrite(struct queue *q, void *vp, int len)
1697 return __qwrite(q, vp, len, MEM_ATOMIC, 0);
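
/* Illustrative sketch, not part of the original qio code: picking a write
 * variant by context.  The 'from_irq_ctx' flag is hypothetical; the point is
 * that qiwrite() uses MEM_ATOMIC and passes no QIO flags, so it neither
 * sleeps nor throws and is the variant assumed safe from interrupt handlers,
 * while qwrite() honors q->limit and may block or throw. */
static ssize_t example_write_any_ctx(struct queue *q, void *buf, int len,
				     bool from_irq_ctx)
{
	if (from_irq_ctx)
		return qiwrite(q, buf, len);
	return qwrite(q, buf, len);
}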
1701 * be extremely careful when calling this,
1702 * as there is no reference accounting
1704 void qfree(struct queue *q)
1711 * Mark a queue as closed. No further IO is permitted.
1712 * All blocks are released.
1714 void qclose(struct queue *q)
1716 struct block *bfirst;
1722 spin_lock_irqsave(&q->lock);
1723 q->state |= Qclosed;
1724 q->state &= ~Qdropoverflow;
1729 spin_unlock_irqsave(&q->lock);
1731 /* free queued blocks */
1734 /* wake up readers/writers */
1735 rendez_wakeup(&q->rr);
1736 rendez_wakeup(&q->wr);
1737 qwake_cb(q, FDTAP_FILT_HANGUP);
1740 /* Mark a queue as closed. Wakeup any readers. Don't remove queued blocks.
1742 * msg will be the errstr received by any waiters (qread, qbread, etc). If
1743 * there is no message, which is what also happens during a natural qclose(),
1744 * those waiters will simply return 0. qwriters will always error() on a
1745 * closed/hungup queue. */
1746 void qhangup(struct queue *q, char *msg)
1749 spin_lock_irqsave(&q->lock);
1750 q->state |= Qclosed;
1751 if (msg == 0 || *msg == 0)
1754 strlcpy(q->err, msg, ERRMAX);
1755 spin_unlock_irqsave(&q->lock);
1757 /* wake up readers/writers */
1758 rendez_wakeup(&q->rr);
1759 rendez_wakeup(&q->wr);
1760 qwake_cb(q, FDTAP_FILT_HANGUP);
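
/* Illustrative sketch, not part of the original qio code: a device-detach
 * path hanging up its read queue so blocked readers wake up and later
 * reads/writes see the errstr.  The message text is hypothetical. */
static void example_device_detach(struct queue *rx_q)
{
	qhangup(rx_q, "device detached");
	/* queued data stays in place; qclose() would have freed it instead */
}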
1764 * return non-zero if the q is hungup
1766 int qisclosed(struct queue *q)
1768 return q->state & Qclosed;
1772 * mark a queue as no longer hung up. resets the wake_cb.
1774 void qreopen(struct queue *q)
1776 spin_lock_irqsave(&q->lock);
1777 q->state &= ~Qclosed;
1779 q->limit = q->inilim;
1782 spin_unlock_irqsave(&q->lock);
1786 * return bytes queued
1788 int qlen(struct queue *q)
1793 size_t q_bytes_read(struct queue *q)
1795 return q->bytes_read;
1799 * return space remaining before flow control
1802 * This used to be q->len < q->limit/2
1803 * but it slows down tcp too much for certain write sizes.
1804 * I really don't understand it completely. It may be
1805 * due to the queue draining so fast that the transmission
1806 * stalls waiting for the app to produce more data. - presotto
1808 * q->len was the amount of bytes, which is no longer used. we now use
1809 * q->dlen, the amount of usable data. a.k.a. qlen()... - brho
1811 int qwindow(struct queue *q)
1815 l = q->limit - q->dlen;
1822 * return true if we can read without blocking
1824 int qcanread(struct queue *q)
1826 return q->bfirst != 0;
1830 * change queue limit
1832 void qsetlimit(struct queue *q, size_t limit)
1834 bool was_writable = qwritable(q);
1837 if (!was_writable && qwritable(q)) {
1838 rendez_wakeup(&q->wr);
1839 qwake_cb(q, FDTAP_FILT_WRITABLE);
1843 size_t qgetlimit(struct queue *q)
1849 * set whether writes drop overflowing blocks, or if we sleep
1851 void qdropoverflow(struct queue *q, bool onoff)
1853 spin_lock_irqsave(&q->lock);
1855 q->state |= Qdropoverflow;
1857 q->state &= ~Qdropoverflow;
1858 spin_unlock_irqsave(&q->lock);
1861 /* Be careful: this can affect concurrent reads/writes and code that might have
1862 * built-in expectations of the q's type. */
1863 void q_toggle_qmsg(struct queue *q, bool onoff)
1865 spin_lock_irqsave(&q->lock);
1870 spin_unlock_irqsave(&q->lock);
1873 /* Be careful: this can affect concurrent reads/writes and code that might have
1874 * built-in expectations of the q's type. */
1875 void q_toggle_qcoalesce(struct queue *q, bool onoff)
1877 spin_lock_irqsave(&q->lock);
1879 q->state |= Qcoalesce;
1881 q->state &= ~Qcoalesce;
1882 spin_unlock_irqsave(&q->lock);
1886 * flush the output queue
1888 void qflush(struct queue *q)
1890 struct block *bfirst;
1893 spin_lock_irqsave(&q->lock);
1897 spin_unlock_irqsave(&q->lock);
1899 /* free queued blocks */
1902 /* wake up writers */
1903 rendez_wakeup(&q->wr);
1904 qwake_cb(q, FDTAP_FILT_WRITABLE);
1907 int qfull(struct queue *q)
1909 return !qwritable(q);
1912 int qstate(struct queue *q)
1917 void qdump(struct queue *q)
1920 printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1921 q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1924 /* On certain wakeup events, qio will call func(q, data, filter), where filter
1925 * marks the type of wakeup event (flags from FDTAP).
1927 * There's no sync protection. If you change the CB while the qio is running,
1928 * you might get a CB with the data or func from a previous set_wake_cb. You
1929 * should set this once per queue and forget it.
1931 * You can remove the CB by passing in 0 for the func. Alternatively, you can
1932 * just make sure that the func(data) pair are valid until the queue is freed or
1934 void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1936 q->wake_data = data;
1937 wmb(); /* if we see func, we'll also see the data for it */
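
/* Illustrative sketch, not part of the original qio code: registering a wake
 * callback in the spirit of the FD taps mentioned above.  'struct my_waiter'
 * is hypothetical, and the callback signature is inferred from how wake_cb is
 * invoked in qwake_cb(): (queue, data, filter). */
struct my_waiter {
	struct queue *q;
	int last_filter;	/* hypothetical: most recent FDTAP_FILT_* mask */
};

static void my_qio_wake(struct queue *q, void *data, int filter)
{
	struct my_waiter *w = data;

	assert(w->q == q);
	/* a real waiter would poke whoever is sleeping/selecting on w->q */
	w->last_filter = filter;
}

static void example_register_wake(struct queue *q, struct my_waiter *w)
{
	w->q = q;
	w->last_filter = 0;
	qio_set_wake_cb(q, my_qio_wake, w);
}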
1941 /* Helper for detecting whether we'll block on a read at this instant. */
1942 bool qreadable(struct queue *q)
1947 /* Helper for detecting whether we'll block on a write at this instant. */
1948 bool qwritable(struct queue *q)
1950 return !q->limit || qwindow(q) > 0;