akaros/kern/src/ns/qio.c
<<
>>
Prefs
   1/* Copyright © 1994-1999 Lucent Technologies Inc.  All rights reserved.
   2 * Portions Copyright © 1997-1999 Vita Nuova Limited
   3 * Portions Copyright © 2000-2007 Vita Nuova Holdings Limited
   4 *                                (www.vitanuova.com)
   5 * Revisions Copyright © 2000-2007 Lucent Technologies Inc. and others
   6 *
   7 * Modified for the Akaros operating system:
   8 * Copyright (c) 2013-2014 The Regents of the University of California
   9 * Copyright (c) 2013-2015 Google Inc.
  10 *
  11 * Permission is hereby granted, free of charge, to any person obtaining a copy
  12 * of this software and associated documentation files (the "Software"), to deal
  13 * in the Software without restriction, including without limitation the rights
  14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15 * copies of the Software, and to permit persons to whom the Software is
  16 * furnished to do so, subject to the following conditions:
  17 *
  18 * The above copyright notice and this permission notice shall be included in
  19 * all copies or substantial portions of the Software.
  20 *
  21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL THE
  24 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  27 * SOFTWARE. */
  28
  29#include <slab.h>
  30#include <kmalloc.h>
  31#include <kref.h>
  32#include <string.h>
  33#include <stdio.h>
  34#include <assert.h>
  35#include <error.h>
  36#include <cpio.h>
  37#include <pmap.h>
  38#include <smp.h>
  39#include <net/ip.h>
  40
  41static uint32_t padblockcnt;
  42static uint32_t concatblockcnt;
  43static uint32_t pullupblockcnt;
  44static uint32_t copyblockcnt;
  45static uint32_t consumecnt;
  46static uint32_t producecnt;
  47static uint32_t qcopycnt;
  48
  49static int debugging;
  50
  51#define QDEBUG  if(0)
  52
  53/*
  54 *  IO queues
  55 */
  56
  57struct queue {
  58        spinlock_t lock;;
  59
  60        struct block *bfirst;           /* buffer */
  61        struct block *blast;
  62
  63        int dlen;                       /* data bytes in queue */
  64        int limit;                      /* max bytes in queue */
  65        int inilim;                     /* initial limit */
  66        int state;
  67        int eof;                        /* number of eofs read by user */
  68        size_t bytes_read;
  69
  70        void (*kick) (void *);          /* restart output */
  71        void (*bypass) (void *, struct block *); /* bypass queue altogether */
  72        void *arg;                      /* argument to kick */
  73
  74        struct rendez rr;               /* process waiting to read */
  75        struct rendez wr;               /* process waiting to write */
  76        qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
  77        void *wake_data;
  78
  79        char err[ERRMAX];
  80};
  81
  82enum {
  83        Maxatomic = 64 * 1024,
  84        QIO_CAN_ERR_SLEEP = (1 << 0),   /* can throw errors or block/sleep */
  85        QIO_LIMIT = (1 << 1),           /* respect q->limit */
  86        QIO_DROP_OVERFLOW = (1 << 2),   /* alternative to qdropoverflow */
  87        QIO_JUST_ONE_BLOCK = (1 << 3),  /* when qbreading, just get one block */
  88        QIO_NON_BLOCK = (1 << 4),       /* throw EAGAIN instead of blocking */
  89        QIO_DONT_KICK = (1 << 5),       /* don't kick when waking */
  90};
  91
  92unsigned int qiomaxatomic = Maxatomic;
  93
  94static ssize_t __qbwrite(struct queue *q, struct block *b, int flags);
  95static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
  96                              int mem_flags);
  97static bool qwait_and_ilock(struct queue *q, int qio_flags);
  98
  99/* Helper: fires a wake callback, sending 'filter' */
 100static void qwake_cb(struct queue *q, int filter)
 101{
 102        if (q->wake_cb)
 103                q->wake_cb(q, q->wake_data, filter);
 104}
 105
 106void ixsummary(void)
 107{
 108        debugging ^= 1;
 109        printd("pad %lu, concat %lu, pullup %lu, copy %lu\n",
 110                   padblockcnt, concatblockcnt, pullupblockcnt, copyblockcnt);
 111        printd("consume %lu, produce %lu, qcopy %lu\n",
 112                   consumecnt, producecnt, qcopycnt);
 113}
 114
 115/* Pad a block to the front (or the back if size is negative).  Returns the
 116 * block pointer that you must use instead of bp.  i.e. bp = padblock(bp, x);
 117 *
 118 * This space is in the main body / header space, not the extra data.  In
 119 * essence, the block has size bytes of uninitialized data placed in the front
 120 * (left) of the old header data.  The caller needs to fill in that data,
 121 * presumably with packet headers.  Any block[offset] in the old block is now at
 122 * block[offset + size].
 123 *
 124 * Negative padding applies at the end of the block.  This means that there is
 125 * space in block header for size bytes (in between wp and lim).  Given that all
 126 * of the block 'padding' needs to be in the main header, that means we need to
 127 * linearize the entire block, such that the end padding is in the main
 128 * body/header space.
 129 */
 130struct block *padblock(struct block *bp, int size)
 131{
 132        int n;
 133        struct block *nbp;
 134
 135        QDEBUG checkb(bp, "padblock 1");
 136        if (size >= 0) {
 137                if (bp->rp - bp->base >= size) {
 138                        block_add_to_offsets(bp, size);
 139                        bp->rp -= size;
 140                        return bp;
 141                }
 142                if (bp->next)
 143                        panic("%s %p had a next", __func__, bp);
 144                n = BHLEN(bp);
 145                padblockcnt++;
 146                nbp = block_alloc(size + n, MEM_WAIT);
 147                block_copy_metadata(nbp, bp);
 148                block_replace_extras(nbp, bp);
 149                /* This effectively copies the old block main body such that we
 150                 * know we have size bytes to the left of rp.  All of the
 151                 * metadata offsets (e.g. tx_csum) are relative from this blob
 152                 * of data: i.e. nbp->rp + size. */
 153                nbp->rp += size;
 154                nbp->wp = nbp->rp;
 155                memmove(nbp->wp, bp->rp, n);
 156                nbp->wp += n;
 157                freeb(bp);
 158                block_add_to_offsets(nbp, size);
 159                nbp->rp -= size;
 160        } else {
 161                /* No one I know of calls this yet, so this is untested.  Maybe
 162                 * we can remove it. */
 163                warn_once("pad attempt with negative size %d", size);
 164                size = -size;
 165                if (bp->next)
 166                        panic("%s %p had a next", __func__, bp);
 167                if (bp->lim - bp->wp >= size)
 168                        return bp;
 169                /* Negative padding goes after all data.  In essence, we'll need
 170                 * to pull all block extra data up into the headers.  The
 171                 * easiest thing is to linearize, then do the old algorithm.  We
 172                 * may do extra allocations - if we ever use this, we can fix it
 173                 * up.  Maybe make linearizeblock/copyblock take neg-padding. */
 174                if (bp->extra_len) {
 175                        bp = linearizeblock(bp);
 176                        if (bp->lim - bp->wp >= size)
 177                                return bp;
 178                }
 179                n = BLEN(bp);
 180                padblockcnt++;
 181                nbp = block_alloc(size + n, MEM_WAIT);
 182                block_copy_metadata(nbp, bp);
 183                memmove(nbp->wp, bp->rp, n);
 184                nbp->wp += n;
 185                freeb(bp);
 186        }
 187        QDEBUG checkb(nbp, "padblock 1");
 188        return nbp;
 189}
 190
 191/*
 192 *  return count of bytes in a string of blocks
 193 */
 194int blocklen(struct block *bp)
 195{
 196        int len;
 197
 198        len = 0;
 199        while (bp) {
 200                len += BLEN(bp);
 201                bp = bp->next;
 202        }
 203        return len;
 204}
 205
 206/*
 207 * return count of space in blocks
 208 */
 209int blockalloclen(struct block *bp)
 210{
 211        int len;
 212
 213        len = 0;
 214        while (bp) {
 215                len += BALLOC(bp);
 216                bp = bp->next;
 217        }
 218        return len;
 219}
 220
 221/*
 222 *  copy the  string of blocks into
 223 *  a single block and free the string
 224 */
 225struct block *concatblock(struct block *bp)
 226{
 227        if (bp->next == 0)
 228                return bp;
 229        /* If this is slow, we can get fancy and append a bunch of ebds to bp
 230         * for each subsequent block in the list. */
 231        return pullupblock(bp, blocklen(bp));
 232}
 233
 234/* Makes an identical copy of the block, collapsing all the data into the block
 235 * body.  It does not point to the contents of the original, it is a copy
 236 * (unlike qclone).  Since we're copying, we might as well put the memory into
 237 * one contiguous chunk. */
 238struct block *copyblock(struct block *bp, int mem_flags)
 239{
 240        struct block *newb;
 241        struct extra_bdata *ebd;
 242        size_t amt;
 243
 244        QDEBUG checkb(bp, "copyblock 0");
 245        newb = block_alloc(BLEN(bp), mem_flags);
 246        if (!newb)
 247                return 0;
 248        amt = block_copy_to_body(newb, bp->rp, BHLEN(bp));
 249        assert(amt == BHLEN(bp));
 250        for (int i = 0; i < bp->nr_extra_bufs; i++) {
 251                ebd = &bp->extra_data[i];
 252                if (!ebd->base || !ebd->len)
 253                        continue;
 254                amt = block_copy_to_body(newb, (void*)ebd->base + ebd->off,
 255                                         ebd->len);
 256                assert(amt == ebd->len);
 257        }
 258        block_copy_metadata(newb, bp);
 259        copyblockcnt++;
 260        QDEBUG checkb(newb, "copyblock 1");
 261        return newb;
 262}
 263
 264/* Returns a block with the remaining contents of b all in the main body of the
 265 * returned block.  Replace old references to b with the returned value (which
 266 * may still be 'b', if no change was needed. */
 267struct block *linearizeblock(struct block *b)
 268{
 269        struct block *newb;
 270
 271        if (!b->extra_len)
 272                return b;
 273        newb = copyblock(b, MEM_WAIT);
 274        freeb(b);
 275        return newb;
 276}
 277
 278/* Helper for bookkeeping: we removed amt bytes from block b's ebd, which may
 279 * have emptied it. */
 280static void block_consume_ebd_bytes(struct block *b, struct extra_bdata *ebd,
 281                                    size_t amt)
 282{
 283        ebd->len -= amt;
 284        ebd->off += amt;
 285        b->extra_len -= amt;
 286        if (!ebd->len) {
 287                /* we don't actually have to decref here.  it's also
 288                 * done in freeb().  this is the earliest we can free. */
 289                kfree((void*)ebd->base);
 290                ebd->base = ebd->off = 0;
 291        }
 292}
 293
 294/* Helper: Yanks up to size bytes worth of extra data blocks from 'from', puts
 295 * them in the main body of 'to'.  Returns the amount yanked.  Will panic if
 296 * there is not enough room in 'to'. */
 297static size_t block_yank_ebd_bytes(struct block *to, struct block *from,
 298                                   size_t amt)
 299{
 300        size_t copy_amt, total = 0;
 301        struct extra_bdata *ebd;
 302
 303        for (int i = 0; amt && i < from->nr_extra_bufs; i++) {
 304                ebd = &from->extra_data[i];
 305                if (!ebd->base || !ebd->len)
 306                        continue;
 307                copy_amt = MIN(amt, ebd->len);
 308                copy_amt = block_copy_to_body(to, (void*)ebd->base + ebd->off,
 309                                              copy_amt);
 310                if (copy_amt != MIN(amt, ebd->len))
 311                        panic("'to' block didn't have enough space! %d %d %d",
 312                              amt, ebd->len, copy_amt);
 313                block_consume_ebd_bytes(from, ebd, copy_amt);
 314                total += copy_amt;
 315                amt -= copy_amt;
 316        }
 317        return total;
 318}
 319
 320/* Make sure the first block has at least n bytes in its headers/main body.
 321 * Pulls up data from the *list* of blocks.  Returns 0 if there is not enough
 322 * data in the block list.
 323 *
 324 * Any data to the *left* of rp, such as old network headers that were popped
 325 * off when processing an inbound packet, may be discarded. */
 326struct block *pullupblock(struct block *bp, size_t n)
 327{
 328        struct block *i;
 329        struct extra_bdata *ebd;
 330        size_t copy_amt;
 331
 332        /*
 333         *  this should almost always be true, it's
 334         *  just to avoid every caller checking.
 335         */
 336        if (BHLEN(bp) >= n)
 337                return bp;
 338
 339        /* If there's no chance, just bail out now.  This might be slightly
 340         * wasteful if there's a long blist that does have enough data. */
 341        if (n > blocklen(bp))
 342                return 0;
 343
 344        /* Replace bp with a new block with enough size, and yank up enough of
 345         * its ebds. */
 346        bp = block_realloc(bp, n);
 347        pullupblockcnt++;
 348        n -= BHLEN(bp);
 349        n -= block_yank_ebd_bytes(bp, bp, n);
 350        /* Need to pull from the remainder of the list, both the main bodies and
 351         * the ebds. */
 352        while (n) {
 353                i = bp->next;
 354                if (!i)
 355                        panic("Not enough b's in the list; we checked the len");
 356                copy_amt = MIN(BHLEN(i), n);
 357                block_copy_to_body(bp, i->rp, copy_amt);
 358                /* That may or may not have consumed all of the main body */
 359                i->rp += copy_amt;
 360                /* Subsequent blocks with offsets shouldn't be subject to
 361                 * pullup. */
 362                warn_on(i->tx_csum_offset || i->network_offset ||
 363                        i->transport_offset);
 364                n -= copy_amt;
 365                n -= block_yank_ebd_bytes(bp, i, n);
 366                if (!BLEN(i)) {
 367                        bp->next = i->next;
 368                        i->next = NULL;
 369                        freeb(i);
 370                } else {
 371                        assert(!n);
 372                }
 373        }
 374        return bp;
 375}
 376
 377/*
 378 *  make sure the first block has at least n bytes in its main body
 379 */
 380struct block *pullupqueue(struct queue *q, size_t n)
 381{
 382        struct block *b;
 383
 384        /* TODO: lock to protect the queue links? */
 385        if ((BHLEN(q->bfirst) >= n))
 386                return q->bfirst;
 387        /* This is restoring qio metadata.  If pullupblock did any work, it
 388         * changed the list - at least the first block and possibly the last in
 389         * the blist. */
 390        q->bfirst = pullupblock(q->bfirst, n);
 391        for (b = q->bfirst; b != NULL && b->next != NULL; b = b->next) ;
 392        q->blast = b;
 393        return q->bfirst;
 394}
 395
 396/* throw away count bytes from the front of
 397 * block's extradata.  Returns count of bytes
 398 * thrown away
 399 */
 400
 401static int pullext(struct block *bp, int count)
 402{
 403        struct extra_bdata *ed;
 404        int i, rem, bytes = 0;
 405
 406        for (i = 0; bp->extra_len && count && i < bp->nr_extra_bufs; i++) {
 407                ed = &bp->extra_data[i];
 408                rem = MIN(count, ed->len);
 409                bp->extra_len -= rem;
 410                count -= rem;
 411                bytes += rem;
 412                ed->off += rem;
 413                ed->len -= rem;
 414                if (ed->len == 0) {
 415                        kfree((void *)ed->base);
 416                        ed->base = 0;
 417                        ed->off = 0;
 418                }
 419        }
 420        return bytes;
 421}
 422
 423/* throw away count bytes from the end of a
 424 * block's extradata.  Returns count of bytes
 425 * thrown away
 426 */
 427
 428static int dropext(struct block *bp, int count)
 429{
 430        struct extra_bdata *ed;
 431        int i, rem, bytes = 0;
 432
 433        for (i = bp->nr_extra_bufs - 1; bp->extra_len && count && i >= 0; i--) {
 434                ed = &bp->extra_data[i];
 435                rem = MIN(count, ed->len);
 436                bp->extra_len -= rem;
 437                count -= rem;
 438                bytes += rem;
 439                ed->len -= rem;
 440                if (ed->len == 0) {
 441                        kfree((void *)ed->base);
 442                        ed->base = 0;
 443                        ed->off = 0;
 444                }
 445        }
 446        return bytes;
 447}
 448
 449/*
 450 *  throw away up to count bytes from a
 451 *  list of blocks.  Return count of bytes
 452 *  thrown away.
 453 */
 454static int _pullblock(struct block **bph, int count, int free)
 455{
 456        struct block *bp;
 457        int n, bytes;
 458
 459        bytes = 0;
 460        if (bph == NULL)
 461                return 0;
 462
 463        while (*bph != NULL && count != 0) {
 464                bp = *bph;
 465
 466                n = MIN(BHLEN(bp), count);
 467                bytes += n;
 468                count -= n;
 469                bp->rp += n;
 470                n = pullext(bp, count);
 471                bytes += n;
 472                count -= n;
 473                QDEBUG checkb(bp, "pullblock ");
 474                if (BLEN(bp) == 0 && (free || count)) {
 475                        *bph = bp->next;
 476                        bp->next = NULL;
 477                        freeb(bp);
 478                }
 479        }
 480        return bytes;
 481}
 482
 483int pullblock(struct block **bph, int count)
 484{
 485        return _pullblock(bph, count, 1);
 486}
 487
 488/*
 489 *  trim to len bytes starting at offset
 490 */
 491struct block *trimblock(struct block *bp, int offset, int len)
 492{
 493        uint32_t l, trim;
 494        int olen = len;
 495
 496        QDEBUG checkb(bp, "trimblock 1");
 497        if (blocklen(bp) < offset + len) {
 498                freeblist(bp);
 499                return NULL;
 500        }
 501
 502        l =_pullblock(&bp, offset, 0);
 503        if (bp == NULL)
 504                return NULL;
 505        if (l != offset) {
 506                freeblist(bp);
 507                return NULL;
 508        }
 509
 510        while ((l = BLEN(bp)) < len) {
 511                len -= l;
 512                bp = bp->next;
 513        }
 514
 515        trim = BLEN(bp) - len;
 516        trim -= dropext(bp, trim);
 517        bp->wp -= trim;
 518
 519        if (bp->next) {
 520                freeblist(bp->next);
 521                bp->next = NULL;
 522        }
 523        return bp;
 524}
 525
 526/* Adjust block @bp so that its size is exactly @len.
 527 * If the size is increased, fill in the new contents with zeros.
 528 * If the size is decreased, discard some of the old contents at the tail. */
 529struct block *adjustblock(struct block *bp, int len)
 530{
 531        struct extra_bdata *ebd;
 532        void *buf;
 533        int i;
 534
 535        if (len < 0) {
 536                freeb(bp);
 537                return NULL;
 538        }
 539
 540        if (len == BLEN(bp))
 541                return bp;
 542
 543        /* Shrink within block main body. */
 544        if (len <= BHLEN(bp)) {
 545                free_block_extra(bp);
 546                bp->wp = bp->rp + len;
 547                QDEBUG checkb(bp, "adjustblock 1");
 548                return bp;
 549        }
 550
 551        /* Need to grow. */
 552        if (len > BLEN(bp)) {
 553                /* Grow within block main body. */
 554                if (bp->extra_len == 0 && bp->rp + len <= bp->lim) {
 555                        memset(bp->wp, 0, len - BLEN(bp));
 556                        bp->wp = bp->rp + len;
 557                        QDEBUG checkb(bp, "adjustblock 2");
 558                        return bp;
 559                }
 560                /* Grow with extra data buffers. */
 561                buf = kzmalloc(len - BLEN(bp), MEM_WAIT);
 562                block_append_extra(bp, (uintptr_t)buf, 0, len - BLEN(bp),
 563                                   MEM_WAIT);
 564                QDEBUG checkb(bp, "adjustblock 3");
 565                return bp;
 566        }
 567
 568        /* Shrink extra data buffers.
 569         * len is how much of ebd we need to keep.
 570         * extra_len is re-accumulated. */
 571        assert(bp->extra_len > 0);
 572        len -= BHLEN(bp);
 573        bp->extra_len = 0;
 574        for (i = 0; i < bp->nr_extra_bufs; i++) {
 575                ebd = &bp->extra_data[i];
 576                if (len <= ebd->len)
 577                        break;
 578                len -= ebd->len;
 579                bp->extra_len += ebd->len;
 580        }
 581        /* If len becomes zero, extra_data[i] should be freed. */
 582        if (len > 0) {
 583                ebd = &bp->extra_data[i];
 584                ebd->len = len;
 585                bp->extra_len += ebd->len;
 586                i++;
 587        }
 588        for (; i < bp->nr_extra_bufs; i++) {
 589                ebd = &bp->extra_data[i];
 590                if (ebd->base)
 591                        kfree((void*)ebd->base);
 592                ebd->base = ebd->off = ebd->len = 0;
 593        }
 594        QDEBUG checkb(bp, "adjustblock 4");
 595        return bp;
 596}
 597
 598/* Helper: removes and returns the first block from q */
 599static struct block *pop_first_block(struct queue *q)
 600{
 601        struct block *b = q->bfirst;
 602
 603        q->dlen -= BLEN(b);
 604        q->bytes_read += BLEN(b);
 605        q->bfirst = b->next;
 606        b->next = 0;
 607        return b;
 608}
 609
 610/* Accounting helper.  Block b in q lost amt extra_data */
 611static void block_and_q_lost_extra(struct block *b, struct queue *q, size_t amt)
 612{
 613        b->extra_len -= amt;
 614        q->dlen -= amt;
 615        q->bytes_read += amt;
 616}
 617
 618/* Helper: moves ebd from a block (in from_q) to another block.  The *ebd is
 619 * fixed in 'from', so we move its contents and zero it out in 'from'.
 620 *
 621 * Returns the length moved (0 on failure). */
 622static size_t move_ebd(struct extra_bdata *ebd, struct block *to,
 623                       struct block *from, struct queue *from_q)
 624{
 625        size_t ret = ebd->len;
 626
 627        if (block_append_extra(to, ebd->base, ebd->off, ebd->len, MEM_ATOMIC))
 628                return 0;
 629        block_and_q_lost_extra(from, from_q, ebd->len);
 630        ebd->base = ebd->len = ebd->off = 0;
 631        return ret;
 632}
 633
 634/* Copy up to len bytes from q->bfirst to @to, leaving the block in place.  May
 635 * return with less than len, but greater than 0, even if there is more
 636 * available in q.
 637 *
 638 * At any moment that we have copied anything and things are tricky, we can just
 639 * return.  The trickiness comes from a bunch of variables: is the main body
 640 * empty?  How do we split the ebd?  If our alloc fails, then we can fall back
 641 * to @to's main body, but only if we haven't used it yet. */
 642static size_t copy_from_first_block(struct queue *q, struct block *to,
 643                                    size_t len)
 644{
 645        struct block *from = q->bfirst;
 646        size_t copy_amt, amt;
 647        struct extra_bdata *ebd;
 648
 649        assert(len < BLEN(from));       /* sanity */
 650        /* Try to extract from the main body */
 651        copy_amt = MIN(BHLEN(from), len);
 652        if (copy_amt) {
 653                copy_amt = block_copy_to_body(to, from->rp, copy_amt);
 654                from->rp += copy_amt;
 655                /* We only change dlen, (data len), not q->len, since the q
 656                 * still has the same block memory allocation (no kfrees
 657                 * happened) */
 658                q->dlen -= copy_amt;
 659                q->bytes_read += copy_amt;
 660        }
 661        /* Try to extract the remainder from the extra data */
 662        len -= copy_amt;
 663        for (int i = 0; (i < from->nr_extra_bufs) && len; i++) {
 664                ebd = &from->extra_data[i];
 665                if (!ebd->base || !ebd->len)
 666                        continue;
 667                if (len >= ebd->len) {
 668                        amt = move_ebd(ebd, to, from, q);
 669                        if (!amt) {
 670                                /* our internal alloc could have failed.   this
 671                                 * ebd is now the last one we'll consider.
 672                                 * let's handle it separately and put it in the
 673                                 * main body. */
 674                                if (copy_amt)
 675                                        return copy_amt;
 676                                copy_amt = block_copy_to_body(to,
 677                                                              (void*)ebd->base +
 678                                                              ebd->off,
 679                                                              ebd->len);
 680                                block_and_q_lost_extra(from, q, copy_amt);
 681                                break;
 682                        }
 683                        len -= amt;
 684                        copy_amt += amt;
 685                        continue;
 686                } else {
 687                        /* If we're here, we reached our final ebd, which we'll
 688                         * need to split to get anything from it. */
 689                        if (copy_amt)
 690                                return copy_amt;
 691                        copy_amt = block_copy_to_body(to, (void*)ebd->base +
 692                                                      ebd->off, len);
 693                        ebd->off += copy_amt;
 694                        ebd->len -= copy_amt;
 695                        block_and_q_lost_extra(from, q, copy_amt);
 696                        break;
 697                }
 698        }
 699        if (len)
 700                assert(copy_amt);       /* sanity */
 701        return copy_amt;
 702}
 703
 704/* Return codes for __qbread and __try_qbread. */
 705enum {
 706        QBR_OK,
 707        QBR_FAIL,
 708        QBR_SPARE,      /* we need a spare block */
 709        QBR_AGAIN,      /* do it again, we are coalescing blocks */
 710};
 711
 712/* Helper and back-end for __qbread: extracts and returns a list of blocks
 713 * containing up to len bytes.  It may contain less than len even if q has more
 714 * data.
 715 *
 716 * Returns a code interpreted by __qbread, and the returned blist in ret. */
 717static int __try_qbread(struct queue *q, size_t len, int qio_flags,
 718                        struct block **real_ret, struct block *spare)
 719{
 720        struct block *ret, *ret_last, *first;
 721        size_t blen;
 722        bool was_unwritable = FALSE;
 723
 724        if (qio_flags & QIO_CAN_ERR_SLEEP) {
 725                if (!qwait_and_ilock(q, qio_flags)) {
 726                        spin_unlock_irqsave(&q->lock);
 727                        return QBR_FAIL;
 728                }
 729                /* we qwaited and still hold the lock, so the q is not empty */
 730                first = q->bfirst;
 731        } else {
 732                spin_lock_irqsave(&q->lock);
 733                first = q->bfirst;
 734                if (!first) {
 735                        spin_unlock_irqsave(&q->lock);
 736                        return QBR_FAIL;
 737                }
 738        }
 739        /* We need to check before adjusting q->len.  We're checking the
 740         * writer's sleep condition / tap condition.  When set, we *might* be
 741         * making an edge transition (from unwritable to writable), which needs
 742         * to wake and fire taps.  But, our read might not drain the queue below
 743         * q->lim.  We'll check again later to see if we should really wake
 744         * them.  */
 745        was_unwritable = !qwritable(q);
 746        blen = BLEN(first);
 747        if ((q->state & Qcoalesce) && (blen == 0)) {
 748                freeb(pop_first_block(q));
 749                spin_unlock_irqsave(&q->lock);
 750                /* Need to retry to make sure we have a first block */
 751                return QBR_AGAIN;
 752        }
 753        /* Qmsg: just return the first block.  Be careful, since our caller
 754         * might not read all of the block and thus drop bytes.  Similar to
 755         * SOCK_DGRAM. */
 756        if (q->state & Qmsg) {
 757                ret = pop_first_block(q);
 758                goto out_ok;
 759        }
 760        /* Let's get at least something first - makes the code easier.  This
 761         * way, we'll only ever split the block once. */
 762        if (blen <= len) {
 763                ret = pop_first_block(q);
 764                len -= blen;
 765        } else {
 766                /* need to split the block.  we won't actually take the first
 767                 * block out of the queue - we're just extracting a little bit.
 768                 */
 769                if (!spare) {
 770                        /* We have nothing and need a spare block.  Retry! */
 771                        spin_unlock_irqsave(&q->lock);
 772                        return QBR_SPARE;
 773                }
 774                copy_from_first_block(q, spare, len);
 775                ret = spare;
 776                goto out_ok;
 777        }
 778        /* At this point, we just grabbed the first block.  We can try to grab
 779         * some more, up to len (if they want). */
 780        if (qio_flags & QIO_JUST_ONE_BLOCK)
 781                goto out_ok;
 782        ret_last = ret;
 783        while (q->bfirst && (len > 0)) {
 784                blen = BLEN(q->bfirst);
 785                if ((q->state & Qcoalesce) && (blen == 0)) {
 786                        /* remove the intermediate 0 blocks */
 787                        freeb(pop_first_block(q));
 788                        continue;
 789                }
 790                if (blen > len) {
 791                        /* We could try to split the block, but that's a huge
 792                         * pain.  For instance, we might need to move the main
 793                         * body of b into an extra_data of ret_last.  lots of
 794                         * ways for that to fail, and lots of cases to consider.
 795                         * Easier to just bail out.  This is why I did the first
 796                         * block above: we don't need to worry about this. */
 797                         break;
 798                }
 799                ret_last->next = pop_first_block(q);
 800                ret_last = ret_last->next;
 801                len -= blen;
 802        }
 803out_ok:
 804        /* Don't wake them up or fire tap if we didn't drain enough. */
 805        if (!qwritable(q))
 806                was_unwritable = FALSE;
 807        spin_unlock_irqsave(&q->lock);
 808        if (was_unwritable) {
 809                if (q->kick && !(qio_flags & QIO_DONT_KICK))
 810                        q->kick(q->arg);
 811                rendez_wakeup(&q->wr);
 812                qwake_cb(q, FDTAP_FILT_WRITABLE);
 813        }
 814        *real_ret = ret;
 815        return QBR_OK;
 816}
 817
 818/* Helper and front-end for __try_qbread: extracts and returns a list of blocks
 819 * containing up to len bytes.  It may contain less than len even if q has more
 820 * data.
 821 *
 822 * Returns 0 if the q is closed, if it would require blocking and !CAN_BLOCK, or
 823 * if it required a spare and the memory allocation failed.
 824 *
 825 * Technically, there's a weird corner case with !Qcoalesce and Qmsg where you
 826 * could get a zero length block back. */
 827static struct block *__qbread(struct queue *q, size_t len, int qio_flags,
 828                              int mem_flags)
 829{
 830        ERRSTACK(1);
 831        struct block *ret = 0;
 832        struct block *volatile spare = 0;       /* volatile for the waserror */
 833
 834        /* __try_qbread can throw, based on qio flags. */
 835        if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
 836                if (spare)
 837                        freeb(spare);
 838                nexterror();
 839        }
 840        while (1) {
 841                switch (__try_qbread(q, len, qio_flags, &ret, spare)) {
 842                case QBR_OK:
 843                case QBR_FAIL:
 844                        if (spare && (ret != spare))
 845                                freeb(spare);
 846                        goto out_ret;
 847                case QBR_SPARE:
 848                        assert(!spare);
 849                        /* Due to some nastiness, we need a fresh block so we
 850                         * can read out anything from the queue.  'len' seems
 851                         * like a reasonable amount.  Maybe we can get away with
 852                         * less. */
 853                        spare = block_alloc(len, mem_flags);
 854                        if (!spare) {
 855                                /* Careful here: a memory failure (possible with
 856                                 * MEM_ATOMIC) could look like 'no data in the
 857                                 * queue' (QBR_FAIL).  The only one who does is
 858                                 * this qget(), who happens to know that we
 859                                 * won't need a spare, due to the len argument.
 860                                 * Spares are only needed when we need to split
 861                                 * a block. */
 862                                ret = 0;
 863                                goto out_ret;
 864                        }
 865                        break;
 866                case QBR_AGAIN:
 867                        /* if the first block is 0 and we are Qcoalesce, then
 868                         * we'll need to try again.  We bounce out of __try so
 869                         * we can perform the "is there a block" logic again
 870                         * from the top. */
 871                        break;
 872                }
 873        }
 874        assert(0);
 875out_ret:
 876        if (qio_flags & QIO_CAN_ERR_SLEEP)
 877                poperror();
 878        return ret;
 879}
 880
 881/*
 882 *  get next block from a queue, return null if nothing there
 883 */
 884struct block *qget(struct queue *q)
 885{
 886        /* since len == SIZE_MAX, we should never need to do a mem alloc */
 887        return __qbread(q, SIZE_MAX, QIO_JUST_ONE_BLOCK, MEM_ATOMIC);
 888}
 889
 890/* Throw away the next 'len' bytes in the queue returning the number actually
 891 * discarded.
 892 *
 893 * If the bytes are in the queue, then they must be discarded.  The only time to
 894 * return less than len is if the q itself has less than len bytes.
 895 *
 896 * This won't trigger a kick when waking up any sleepers.  This seems to be Plan
 897 * 9's intent, since the TCP stack will deadlock if qdiscard kicks. */
 898size_t qdiscard(struct queue *q, size_t len)
 899{
 900        struct block *blist;
 901        size_t removed_amt;
 902        size_t sofar = 0;
 903
 904        /* This is racy.  There could be multiple qdiscarders or other
 905         * consumers, where the consumption could be interleaved. */
 906        while (qlen(q) && len) {
 907                blist = __qbread(q, len, QIO_DONT_KICK, MEM_WAIT);
 908                removed_amt = freeblist(blist);
 909                sofar += removed_amt;
 910                len -= removed_amt;
 911        }
 912        return sofar;
 913}
 914
 915ssize_t qpass(struct queue *q, struct block *b)
 916{
 917        return __qbwrite(q, b, QIO_LIMIT | QIO_DROP_OVERFLOW);
 918}
 919
 920ssize_t qpassnolim(struct queue *q, struct block *b)
 921{
 922        return __qbwrite(q, b, 0);
 923}
 924
 925/*
 926 *  if the allocated space is way out of line with the used
 927 *  space, reallocate to a smaller block
 928 */
 929struct block *packblock(struct block *bp)
 930{
 931        struct block **l, *nbp;
 932        int n;
 933
 934        if (bp->extra_len)
 935                return bp;
 936        for (l = &bp; *l; l = &(*l)->next) {
 937                nbp = *l;
 938                n = BLEN(nbp);
 939                if ((n << 2) < BALLOC(nbp)) {
 940                        *l = block_alloc(n, MEM_WAIT);
 941                        memmove((*l)->wp, nbp->rp, n);
 942                        (*l)->wp += n;
 943                        (*l)->next = nbp->next;
 944                        nbp->next = NULL;
 945                        freeb(nbp);
 946                }
 947        }
 948
 949        return bp;
 950}
 951
 952/* Add an extra_data entry to newb at newb_idx pointing to b's body, starting at
 953 * body_rp, for up to len.  Returns the len consumed.
 954 *
 955 * The base is 'b', so that we can kfree it later.  This currently ties us to
 956 * using kfree for the release method for all extra_data.
 957 *
 958 * It is possible to have a body size that is 0, if there is no offset, and
 959 * b->wp == b->rp.  This will have an extra data entry of 0 length. */
 960static size_t point_to_body(struct block *b, uint8_t *body_rp,
 961                            struct block *newb, unsigned int newb_idx,
 962                            size_t len)
 963{
 964        struct extra_bdata *ebd = &newb->extra_data[newb_idx];
 965
 966        assert(newb_idx < newb->nr_extra_bufs);
 967
 968        kmalloc_incref(b);
 969        ebd->base = (uintptr_t)b;
 970        ebd->off = (uint32_t)(body_rp - (uint8_t*)b);
 971        ebd->len = MIN(b->wp - body_rp, len);   /* think of body_rp as b->rp */
 972        assert((int)ebd->len >= 0);
 973        newb->extra_len += ebd->len;
 974        return ebd->len;
 975}
 976
 977/* Add an extra_data entry to newb at newb_idx pointing to b's b_idx'th
 978 * extra_data buf, at b_off within that buffer, for up to len.  Returns the len
 979 * consumed.
 980 *
 981 * We can have blocks with 0 length, but they are still refcnt'd.  See above. */
 982static size_t point_to_buf(struct block *b, unsigned int b_idx, uint32_t b_off,
 983                           struct block *newb, unsigned int newb_idx,
 984                           size_t len)
 985{
 986        struct extra_bdata *n_ebd = &newb->extra_data[newb_idx];
 987        struct extra_bdata *b_ebd = &b->extra_data[b_idx];
 988
 989        assert(b_idx < b->nr_extra_bufs);
 990        assert(newb_idx < newb->nr_extra_bufs);
 991
 992        kmalloc_incref((void*)b_ebd->base);
 993        n_ebd->base = b_ebd->base;
 994        n_ebd->off = b_ebd->off + b_off;
 995        n_ebd->len = MIN(b_ebd->len - b_off, len);
 996        newb->extra_len += n_ebd->len;
 997        return n_ebd->len;
 998}
 999
1000/* given a string of blocks, sets up the new block's extra_data such that it
1001 * *points* to the contents of the blist [offset, len + offset).  This does not
1002 * make a separate copy of the contents of the blist.
1003 *
1004 * returns 0 on success.  the only failure is if the extra_data array was too
1005 * small, so this returns a positive integer saying how big the extra_data needs
1006 * to be.
1007 *
1008 * callers are responsible for protecting the list structure. */
1009static int __blist_clone_to(struct block *blist, struct block *newb, int len,
1010                            uint32_t offset)
1011{
1012        struct block *b, *first;
1013        unsigned int nr_bufs = 0;
1014        unsigned int b_idx, newb_idx = 0;
1015        uint8_t *first_main_body = 0;
1016        ssize_t sofar = 0;
1017
1018        /* find the first block; keep offset relative to the latest b in the
1019         * list */
1020        for (b = blist; b; b = b->next) {
1021                if (BLEN(b) > offset)
1022                        break;
1023                offset -= BLEN(b);
1024        }
1025        /* qcopy semantics: if you asked for an offset outside the block list,
1026         * you get an empty block back */
1027        if (!b)
1028                return 0;
1029        first = b;
1030        sofar -= offset; /* don't count the remaining offset in the first b */
1031        /* upper bound for how many buffers we'll need in newb */
1032        for (/* b is set*/; b; b = b->next) {
1033                nr_bufs += BHLEN(b) ? 1 : 0;
1034                /* still assuming nr_extra == nr_valid */
1035                nr_bufs += b->nr_extra_bufs;
1036                sofar += BLEN(b);
1037                if (sofar > len)
1038                        break;
1039        }
1040        /* we might be holding a spinlock here, so we won't wait for kmalloc */
1041        if (block_add_extd(newb, nr_bufs, 0) != 0) {
1042                /* caller will need to alloc these, then re-call us */
1043                return nr_bufs;
1044        }
1045        for (b = first; b && len; b = b->next) {
1046                b_idx = 0;
1047                if (offset) {
1048                        if (offset < BHLEN(b)) {
1049                                /* off is in the main body */
1050                                len -= point_to_body(b, b->rp + offset, newb,
1051                                                     newb_idx, len);
1052                                newb_idx++;
1053                        } else {
1054                                /* off is in one of the buffers (or just past
1055                                 * the last one).  we're not going to point to
1056                                 * b's main body at all. */
1057                                offset -= BHLEN(b);
1058                                assert(b->extra_data);
1059                                /* assuming these extrabufs are packed, or at
1060                                 * least that len isn't gibberish */
1061                                while (b->extra_data[b_idx].len <= offset) {
1062                                        offset -= b->extra_data[b_idx].len;
1063                                        b_idx++;
1064                                }
1065                                /* now offset is set to our offset in the
1066                                 * b_idx'th buf */
1067                                len -= point_to_buf(b, b_idx, offset, newb,
1068                                                    newb_idx, len);
1069                                newb_idx++;
1070                                b_idx++;
1071                        }
1072                        offset = 0;
1073                } else {
1074                        if (BHLEN(b)) {
1075                                len -= point_to_body(b, b->rp, newb, newb_idx,
1076                                                     len);
1077                                newb_idx++;
1078                        }
1079                }
1080                /* knock out all remaining bufs.  we only did one point_to_ op
1081                 * by now, and any point_to_ could be our last if it consumed
1082                 * all of len. */
1083                for (int i = b_idx; (i < b->nr_extra_bufs) && len; i++) {
1084                        len -= point_to_buf(b, i, 0, newb, newb_idx, len);
1085                        newb_idx++;
1086                }
1087        }
1088        return 0;
1089}
1090
1091struct block *blist_clone(struct block *blist, int header_len, int len,
1092                          uint32_t offset)
1093{
1094        int ret;
1095        struct block *newb = block_alloc(header_len, MEM_WAIT);
1096
1097        do {
1098                ret = __blist_clone_to(blist, newb, len, offset);
1099                if (ret)
1100                        block_add_extd(newb, ret, MEM_WAIT);
1101        } while (ret);
1102        return newb;
1103}
1104
1105/* given a queue, makes a single block with header_len reserved space in the
1106 * block main body, and the contents of [offset, len + offset) pointed to in the
1107 * new blocks ext_data.  This does not make a copy of the q's contents, though
1108 * you do have a ref count on the memory. */
1109struct block *qclone(struct queue *q, int header_len, int len, uint32_t offset)
1110{
1111        int ret;
1112        struct block *newb = block_alloc(header_len, MEM_WAIT);
1113        /* the while loop should rarely be used: it would require someone
1114         * concurrently adding to the queue. */
1115        do {
1116                /* TODO: RCU protect the q list (b->next) (need read lock) */
1117                spin_lock_irqsave(&q->lock);
1118                ret = __blist_clone_to(q->bfirst, newb, len, offset);
1119                spin_unlock_irqsave(&q->lock);
1120                if (ret)
1121                        block_add_extd(newb, ret, MEM_WAIT);
1122        } while (ret);
1123        return newb;
1124}
1125
1126struct block *qcopy(struct queue *q, int len, uint32_t offset)
1127{
1128        return qclone(q, 0, len, offset);
1129}
1130
1131static void qinit_common(struct queue *q)
1132{
1133        spinlock_init_irqsave(&q->lock);
1134        rendez_init(&q->rr);
1135        rendez_init(&q->wr);
1136}
1137
1138/*
1139 *  called by non-interrupt code
1140 */
1141struct queue *qopen(int limit, int msg, void (*kick) (void *), void *arg)
1142{
1143        struct queue *q;
1144
1145        q = kzmalloc(sizeof(struct queue), 0);
1146        if (q == 0)
1147                return 0;
1148        qinit_common(q);
1149
1150        q->limit = q->inilim = limit;
1151        q->kick = kick;
1152        q->arg = arg;
1153        q->state = msg;
1154        q->eof = 0;
1155
1156        return q;
1157}
1158
1159/* open a queue to be bypassed */
1160struct queue *qbypass(void (*bypass) (void *, struct block *), void *arg)
1161{
1162        struct queue *q;
1163
1164        q = kzmalloc(sizeof(struct queue), 0);
1165        if (q == 0)
1166                return 0;
1167        qinit_common(q);
1168
1169        q->limit = 0;
1170        q->arg = arg;
1171        q->bypass = bypass;
1172        q->state = 0;
1173
1174        return q;
1175}
1176
1177static int notempty(void *a)
1178{
1179        struct queue *q = a;
1180
1181        return (q->state & Qclosed) || q->bfirst != 0;
1182}
1183
1184/* Block, waiting for the queue to be non-empty or closed.  Returns with
1185 * the spinlock held.  Returns TRUE when there queue is not empty, FALSE if it
1186 * was naturally closed.  Throws an error o/w. */
1187static bool qwait_and_ilock(struct queue *q, int qio_flags)
1188{
1189        while (1) {
1190                spin_lock_irqsave(&q->lock);
1191                if (q->bfirst != NULL)
1192                        return TRUE;
1193                if (q->state & Qclosed) {
1194                        if (++q->eof > 3) {
1195                                spin_unlock_irqsave(&q->lock);
1196                                error(EPIPE,
1197                                      "multiple reads on a closed queue");
1198                        }
1199                        if (q->err[0]) {
1200                                spin_unlock_irqsave(&q->lock);
1201                                error(EPIPE, q->err);
1202                        }
1203                        return FALSE;
1204                }
1205                if (qio_flags & QIO_NON_BLOCK) {
1206                        spin_unlock_irqsave(&q->lock);
1207                        error(EAGAIN, "queue empty");
1208                }
1209                spin_unlock_irqsave(&q->lock);
1210                /* As with the producer side, we check for a condition while
1211                 * holding the q->lock, decide to sleep, then unlock.  It's like
1212                 * the "check, signal, check again" pattern, but we do it
1213                 * conditionally.  Both sides agree synchronously to do it, and
1214                 * those decisions are made while holding q->lock.  I think this
1215                 * is OK.
1216                 *
1217                 * The invariant is that no reader sleeps when the queue has
1218                 * data.  While holding the rendez lock, if we see there's no
1219                 * data, we'll sleep.  Since we saw there was no data, the next
1220                 * writer will see (or already saw) no data, and then the writer
1221                 * decides to rendez_wake, which will grab the rendez lock.  If
1222                 * the writer already did that, then we'll see notempty when we
1223                 * do our check-again. */
1224                rendez_sleep(&q->rr, notempty, q);
1225        }
1226}
1227
1228/*
1229 * add a block list to a queue
1230 * XXX basically the same as enqueue blist, and has no locking!
1231 */
1232void qaddlist(struct queue *q, struct block *b)
1233{
1234        /* TODO: q lock? */
1235        /* queue the block */
1236        if (q->bfirst)
1237                q->blast->next = b;
1238        else
1239                q->bfirst = b;
1240        q->dlen += blocklen(b);
1241        while (b->next)
1242                b = b->next;
1243        q->blast = b;
1244}
1245
1246static size_t read_from_block(struct block *b, uint8_t *to, size_t amt)
1247{
1248        size_t copy_amt, retval = 0;
1249        struct extra_bdata *ebd;
1250
1251        copy_amt = MIN(BHLEN(b), amt);
1252        memcpy(to, b->rp, copy_amt);
1253        /* advance the rp, since this block might not be completely consumed and
1254         * future reads need to know where to pick up from */
1255        b->rp += copy_amt;
1256        to += copy_amt;
1257        amt -= copy_amt;
1258        retval += copy_amt;
1259        for (int i = 0; (i < b->nr_extra_bufs) && amt; i++) {
1260                ebd = &b->extra_data[i];
1261                /* skip empty entires.  if we track this in the struct block, we
1262                 * can just start the for loop early */
1263                if (!ebd->base || !ebd->len)
1264                        continue;
1265                copy_amt = MIN(ebd->len, amt);
1266                memcpy(to, (void*)(ebd->base + ebd->off), copy_amt);
1267                /* we're actually consuming the entries, just like how we
1268                 * advance rp up above, and might only consume part of one. */
1269                block_consume_ebd_bytes(b, ebd, copy_amt);
1270                to += copy_amt;
1271                amt -= copy_amt;
1272                retval += copy_amt;
1273        }
1274        return retval;
1275}
1276
1277/*
1278 *  copy the contents of a string of blocks into
1279 *  memory.  emptied blocks are freed.  return
1280 *  pointer to first unconsumed block.
1281 */
1282struct block *bl2mem(uint8_t * p, struct block *b, int n)
1283{
1284        int i;
1285        struct block *next;
1286
1287        /* could be slicker here, since read_from_block is smart */
1288        for (; b != NULL; b = next) {
1289                i = BLEN(b);
1290                if (i > n) {
1291                        /* partial block, consume some */
1292                        read_from_block(b, p, n);
1293                        return b;
1294                }
1295                /* full block, consume all and move on */
1296                i = read_from_block(b, p, i);
1297                n -= i;
1298                p += i;
1299                next = b->next;
1300                b->next = NULL;
1301                freeb(b);
1302        }
1303        return NULL;
1304}
1305
1306/* Extract the contents of all blocks and copy to va, up to len.  Returns the
1307 * actual amount copied. */
1308static size_t read_all_blocks(struct block *b, void *va, size_t len)
1309{
1310        size_t sofar = 0;
1311        struct block *next;
1312
1313        do {
1314                assert(va);
1315                assert(b->rp);
1316                sofar += read_from_block(b, va + sofar, len - sofar);
1317                if (BLEN(b) && b->next)
1318                        panic("Failed to drain entire block (Qmsg?) but had a next!");
1319                next = b->next;
1320                b->next = NULL;
1321                freeb(b);
1322                b = next;
1323        } while (b);
1324        return sofar;
1325}
1326
1327/*
1328 *  copy the contents of memory into a string of blocks.
1329 *  return NULL on error.
1330 */
1331struct block *mem2bl(uint8_t * p, int len)
1332{
1333        ERRSTACK(1);
1334        int n;
1335        struct block *b, *first, **l;
1336
1337        first = NULL;
1338        l = &first;
1339        if (waserror()) {
1340                freeblist(first);
1341                nexterror();
1342        }
1343        do {
1344                n = len;
1345                if (n > Maxatomic)
1346                        n = Maxatomic;
1347
1348                *l = b = block_alloc(n, MEM_WAIT);
1349                /* TODO consider extra_data */
1350                memmove(b->wp, p, n);
1351                b->wp += n;
1352                p += n;
1353                len -= n;
1354                l = &b->next;
1355        } while (len > 0);
1356        poperror();
1357
1358        return first;
1359}
1360
1361/*
1362 *  put a block back to the front of the queue
1363 *  called with q ilocked
1364 */
1365void qputback(struct queue *q, struct block *b)
1366{
1367        b->next = q->bfirst;
1368        if (q->bfirst == NULL)
1369                q->blast = b;
1370        q->bfirst = b;
1371        q->dlen += BLEN(b);
1372        /* qputback seems to undo a read, so we can undo the accounting too. */
1373        q->bytes_read -= BLEN(b);
1374}
1375
1376/*
1377 *  get next block from a queue (up to a limit)
1378 *
1379 */
1380struct block *qbread(struct queue *q, size_t len)
1381{
1382        return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP,
1383                        MEM_WAIT);
1384}
1385
1386struct block *qbread_nonblock(struct queue *q, size_t len)
1387{
1388        return __qbread(q, len, QIO_JUST_ONE_BLOCK | QIO_CAN_ERR_SLEEP |
1389                        QIO_NON_BLOCK, MEM_WAIT);
1390}
1391
1392/* read up to len from a queue into vp. */
1393size_t qread(struct queue *q, void *va, size_t len)
1394{
1395        struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP, MEM_WAIT);
1396
1397        if (!blist)
1398                return 0;
1399        return read_all_blocks(blist, va, len);
1400}
1401
1402size_t qread_nonblock(struct queue *q, void *va, size_t len)
1403{
1404        struct block *blist = __qbread(q, len, QIO_CAN_ERR_SLEEP |
1405                                       QIO_NON_BLOCK, MEM_WAIT);
1406
1407        if (!blist)
1408                return 0;
1409        return read_all_blocks(blist, va, len);
1410}
1411
1412/* This is the rendez wake condition for writers. */
1413static int qwriter_should_wake(void *a)
1414{
1415        struct queue *q = a;
1416
1417        return qwritable(q) || (q->state & Qclosed);
1418}
1419
1420/* Helper: enqueues a list of blocks to a queue.  Returns the total length. */
1421static size_t enqueue_blist(struct queue *q, struct block *b)
1422{
1423        size_t dlen;
1424
1425        if (q->bfirst)
1426                q->blast->next = b;
1427        else
1428                q->bfirst = b;
1429        dlen = BLEN(b);
1430        while (b->next) {
1431                b = b->next;
1432                dlen += BLEN(b);
1433        }
1434        q->blast = b;
1435        q->dlen += dlen;
1436        return dlen;
1437}
1438
1439/* Adds block (which can be a list of blocks) to the queue, subject to
1440 * qio_flags.  Returns the length written on success or -1 on non-throwable
1441 * error.  Adjust qio_flags to control the value-added features. */
1442static ssize_t __qbwrite(struct queue *q, struct block *b, int qio_flags)
1443{
1444        ssize_t ret;
1445        bool was_unreadable;
1446
1447        if (q->bypass) {
1448                ret = blocklen(b);
1449                (*q->bypass) (q->arg, b);
1450                return ret;
1451        }
1452        spin_lock_irqsave(&q->lock);
1453        was_unreadable = q->dlen == 0;
1454        if (q->state & Qclosed) {
1455                spin_unlock_irqsave(&q->lock);
1456                freeblist(b);
1457                if (!(qio_flags & QIO_CAN_ERR_SLEEP))
1458                        return -1;
1459                if (q->err[0])
1460                        error(EPIPE, q->err);
1461                else
1462                        error(EPIPE, "connection closed");
1463        }
1464        if ((qio_flags & QIO_LIMIT) && (q->dlen >= q->limit)) {
1465                /* drop overflow takes priority over regular non-blocking */
1466                if ((qio_flags & QIO_DROP_OVERFLOW)
1467                    || (q->state & Qdropoverflow)) {
1468                        spin_unlock_irqsave(&q->lock);
1469                        freeb(b);
1470                        return -1;
1471                }
1472                /* People shouldn't set NON_BLOCK without CAN_ERR, but we can be
1473                 * nice and catch it. */
1474                if ((qio_flags & QIO_CAN_ERR_SLEEP)
1475                    && (qio_flags & QIO_NON_BLOCK)) {
1476                        spin_unlock_irqsave(&q->lock);
1477                        freeb(b);
1478                        error(EAGAIN, "queue full");
1479                }
1480        }
1481        ret = enqueue_blist(q, b);
1482        QDEBUG checkb(b, "__qbwrite");
1483        spin_unlock_irqsave(&q->lock);
1484        /* TODO: not sure if the usage of a kick is mutually exclusive with a
1485         * wakeup, meaning that actual users either want a kick or have
1486         * qreaders. */
1487        if (q->kick && (was_unreadable || (q->state & Qkick)))
1488                q->kick(q->arg);
1489        if (was_unreadable) {
1490                /* Unlike the read side, there's no double-check to make sure
1491                 * the queue transitioned across an edge.  We know we added
1492                 * something, so that's enough.  We wake if the queue was empty.
1493                 * Both sides are the same, in that the condition for which we
1494                 * do the rendez_wakeup() is the same as the condition used for
1495                 * the rendez_sleep(). */
1496                rendez_wakeup(&q->rr);
1497                qwake_cb(q, FDTAP_FILT_READABLE);
1498        }
1499        /*
1500         *  flow control, wait for queue to get below the limit
1501         *  before allowing the process to continue and queue
1502         *  more.  We do this here so that postnote can only
1503         *  interrupt us after the data has been queued.  This
1504         *  means that things like 9p flushes and ssl messages
1505         *  will not be disrupted by software interrupts.
1506         *
1507         *  Note - this is moderately dangerous since a process
1508         *  that keeps getting interrupted and rewriting will
1509         *  queue infinite crud.
1510         */
1511        if ((qio_flags & QIO_CAN_ERR_SLEEP) &&
1512            !(q->state & Qdropoverflow) && !(qio_flags & QIO_NON_BLOCK)) {
1513                /* This is a racy peek at the q status.  If we accidentally
1514                 * block, our rendez will return.  The rendez's peek
1515                 * (qwriter_should_wake) is also racy w.r.t. the q's spinlock
1516                 * (that lock protects writes, but not reads).
1517                 *
1518                 * Here's the deal: when holding the rendez lock, if we see the
1519                 * sleep condition, the consumer will wake us.  The condition
1520                 * will only ever be changed by the next qbread() (consumer,
1521                 * changes q->dlen).  That code will do a rendez wake, which
1522                 * will spin on the rendez lock, meaning it won't proceed until
1523                 * we either see the new state (and return) or put ourselves on
1524                 * the rendez, and wake up.
1525                 *
1526                 * The pattern is one side writes mem, then signals.  Our side
1527                 * checks the signal, then reads the mem.  The goal is to never
1528                 * both miss the signal and miss the memory write.  In this
1529                 * specific case, the signal is actually synchronous (the rendez
1530                 * lock) and not basic shared memory.
1531                 *
1532                 * Oh, and we spin in case we woke early and someone else filled
1533                 * the queue, mesa-style. */
1534                while (!qwriter_should_wake(q))
1535                        rendez_sleep(&q->wr, qwriter_should_wake, q);
1536        }
1537        return ret;
1538}
1539
1540/*
1541 *  add a block to a queue obeying flow control
1542 */
1543ssize_t qbwrite(struct queue *q, struct block *b)
1544{
1545        return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1546}
1547
1548ssize_t qbwrite_nonblock(struct queue *q, struct block *b)
1549{
1550        return __qbwrite(q, b, QIO_CAN_ERR_SLEEP | QIO_LIMIT | QIO_NON_BLOCK);
1551}
1552
1553ssize_t qibwrite(struct queue *q, struct block *b)
1554{
1555        return __qbwrite(q, b, 0);
1556}
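
/* Editor's note, summarizing the wrappers above: the flag combinations map
 * onto caller-visible behavior roughly as follows.
 *
 *	qbwrite(q, b);		// sleeps for flow control, may throw
 *	qbwrite_nonblock(q, b);	// throws EAGAIN instead of sleeping when full
 *	qibwrite(q, b);		// never sleeps or throws, ignores q->limit
 *				// (suitable for interrupt context)
 */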
1557
1558/* Helper, allocs a block and copies [from, from + len) into it.  Returns the
1559 * block on success, 0 on failure. */
1560static struct block *build_block(void *from, size_t len, int mem_flags)
1561{
1562        struct block *b;
1563        void *ext_buf;
1564
1565        /* If len is small, we don't need to bother with the extra_data.  But
1566         * until the whole stack can handle extd blocks, we'll use them
1567         * unconditionally.  */
1568
1569        /* allocb builds in 128 bytes of header space into all blocks, but this
1570         * is only available via padblock (to the left).  We also need some space
1571         * for pullupblock, for basic headers (like icmp) that get written in
1572         * directly. */
1573        b = block_alloc(64, mem_flags);
1574        if (!b)
1575                return 0;
1576        ext_buf = kmalloc(len, mem_flags);
1577        if (!ext_buf) {
1578                kfree(b);
1579                return 0;
1580        }
1581        memcpy(ext_buf, from, len);
1582        if (block_add_extd(b, 1, mem_flags)) {
1583                kfree(ext_buf);
1584                kfree(b);
1585                return 0;
1586        }
1587        b->extra_data[0].base = (uintptr_t)ext_buf;
1588        b->extra_data[0].off = 0;
1589        b->extra_data[0].len = len;
1590        b->extra_len += len;
1591        return b;
1592}
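
/* Editor's illustrative note (assuming BLEN() counts extra_len, as the rest
 * of this file does): a block built here carries its payload entirely in
 * extra_data[0]; the main buffer from block_alloc() is left as header space.
 *
 *	struct block *b = build_block(src, 100, MEM_WAIT);
 *
 *	// b->rp == b->wp, b->extra_len == 100, so BLEN(b) == 100
 */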
1593
1594static ssize_t __qwrite(struct queue *q, void *vp, size_t len, int mem_flags,
1595                        int qio_flags)
1596{
1597        ERRSTACK(1);
1598        size_t n;
1599        volatile size_t sofar = 0;      /* volatile for the waserror */
1600        struct block *b;
1601        uint8_t *p = vp;
1603
1604        /* Only some callers can throw.  Others might be in a context where
1605         * waserror isn't safe. */
1606        if ((qio_flags & QIO_CAN_ERR_SLEEP) && waserror()) {
1607                /* Any error (EAGAIN for nonblock, syscall aborted, even EPIPE)
1608                 * after some data has been sent should be treated as a partial
1609                 * write. */
1610                if (sofar)
1611                        goto out_ok;
1612                nexterror();
1613        }
1614        do {
1615                n = len - sofar;
1616                /* This is 64K, the max amount per single block.  Still a good
1617                 * value? */
1618                if (n > Maxatomic)
1619                        n = Maxatomic;
1620                b = build_block(p + sofar, n, mem_flags);
1621                if (!b)
1622                        break;
1623                if (__qbwrite(q, b, qio_flags) < 0)
1624                        break;
1625                sofar += n;
1626        } while ((sofar < len) && (q->state & Qmsg) == 0);
1627out_ok:
1628        if (qio_flags & QIO_CAN_ERR_SLEEP)
1629                poperror();
1630        return sofar;
1631}
1632
1633ssize_t qwrite(struct queue *q, void *vp, int len)
1634{
1635        return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT);
1636}
1637
1638ssize_t qwrite_nonblock(struct queue *q, void *vp, int len)
1639{
1640        return __qwrite(q, vp, len, MEM_WAIT, QIO_CAN_ERR_SLEEP | QIO_LIMIT |
1641                                              QIO_NON_BLOCK);
1642}
1643
1644ssize_t qiwrite(struct queue *q, void *vp, int len)
1645{
1646        return __qwrite(q, vp, len, MEM_ATOMIC, 0);
1647}
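
/* Editor's illustrative sketch (not original code): __qwrite() reports an
 * error after some data was queued as a partial write, so callers can detect
 * short writes by comparing the return value to the requested length.
 *
 *	ssize_t ret = qwrite(q, buf, len);
 *
 *	if (ret < len)
 *		printk("short write: %ld of %d bytes queued\n", ret, len);
 */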
1648
1649/*
1650 *  be extremely careful when calling this,
1651 *  as there is no reference accounting
1652 */
1653void qfree(struct queue *q)
1654{
1655        qclose(q);
1656        kfree(q);
1657}
1658
1659/*
1660 *  Mark a queue as closed.  No further IO is permitted.
1661 *  All blocks are released.
1662 */
1663void qclose(struct queue *q)
1664{
1665        struct block *bfirst;
1666
1667        if (q == NULL)
1668                return;
1669
1670        /* mark it */
1671        spin_lock_irqsave(&q->lock);
1672        q->state |= Qclosed;
1673        q->state &= ~Qdropoverflow;
1674        q->err[0] = 0;
1675        bfirst = q->bfirst;
1676        q->bfirst = 0;
1677        q->dlen = 0;
1678        spin_unlock_irqsave(&q->lock);
1679
1680        /* free queued blocks */
1681        freeblist(bfirst);
1682
1683        /* wake up readers/writers */
1684        rendez_wakeup(&q->rr);
1685        rendez_wakeup(&q->wr);
1686        qwake_cb(q, FDTAP_FILT_HANGUP);
1687}
1688
1689/* Mark a queue as closed.  Wakeup any readers.  Don't remove queued blocks.
1690 *
1691 * msg will be the errstr received by any waiters (qread, qbread, etc).  If
1692 * there is no message, which is what also happens during a natural qclose(),
1693 * those waiters will simply return 0.  qwriters will always error() on a
1694 * closed/hungup queue. */
1695void qhangup(struct queue *q, char *msg)
1696{
1697        /* mark it */
1698        spin_lock_irqsave(&q->lock);
1699        q->state |= Qclosed;
1700        if (msg == 0 || *msg == 0)
1701                q->err[0] = 0;
1702        else
1703                strlcpy(q->err, msg, ERRMAX);
1704        spin_unlock_irqsave(&q->lock);
1705
1706        /* wake up readers/writers */
1707        rendez_wakeup(&q->rr);
1708        rendez_wakeup(&q->wr);
1709        qwake_cb(q, FDTAP_FILT_HANGUP);
1710}
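
/* Editor's illustrative sketch (hypothetical queue names): a protocol tearing
 * down a conversation usually hangs up both queues, passing an errstr on the
 * read side so blocked readers see the reason.
 *
 *	qhangup(rq, "connection reset by peer");  // readers error with this
 *	qhangup(wq, NULL);  // readers return 0; writers see "connection closed"
 */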
1711
1712/*
1713 *  return non-zero if the q is hungup
1714 */
1715int qisclosed(struct queue *q)
1716{
1717        return q->state & Qclosed;
1718}
1719
1720/*
1721 *  mark a queue as no longer hung up.  resets the wake_cb.
1722 */
1723void qreopen(struct queue *q)
1724{
1725        spin_lock_irqsave(&q->lock);
1726        q->state &= ~Qclosed;
1727        q->eof = 0;
1728        q->limit = q->inilim;
1729        q->wake_cb = 0;
1730        q->wake_data = 0;
1731        spin_unlock_irqsave(&q->lock);
1732}
1733
1734/*
1735 *  return bytes queued
1736 */
1737int qlen(struct queue *q)
1738{
1739        return q->dlen;
1740}
1741
1742size_t q_bytes_read(struct queue *q)
1743{
1744        return q->bytes_read;
1745}
1746
1747/*
1748 * return space remaining before flow control
1749 *
1750 *  This used to be
1751 *  q->len < q->limit/2
1752 *  but it slows down tcp too much for certain write sizes.
1753 *  I really don't understand it completely.  It may be
1754 *  due to the queue draining so fast that the transmission
1755 *  stalls waiting for the app to produce more data.  - presotto
1756 *
1757 *  q->len was the amount of bytes, which is no longer used.  we now use
1758 *  q->dlen, the amount of usable data.  a.k.a. qlen()...  - brho
1759 */
1760int qwindow(struct queue *q)
1761{
1762        int l;
1763
1764        l = q->limit - q->dlen;
1765        if (l < 0)
1766                l = 0;
1767        return l;
1768}
1769
1770/*
1771 *  return true if we can read without blocking
1772 */
1773int qcanread(struct queue *q)
1774{
1775        return q->bfirst != 0;
1776}
1777
1778/*
1779 *  change queue limit
1780 */
1781void qsetlimit(struct queue *q, size_t limit)
1782{
1783        bool was_writable = qwritable(q);
1784
1785        q->limit = limit;
1786        if (!was_writable && qwritable(q)) {
1787                rendez_wakeup(&q->wr);
1788                qwake_cb(q, FDTAP_FILT_WRITABLE);
1789        }
1790}
1791
1792size_t qgetlimit(struct queue *q)
1793{
1794        return q->limit;
1795}
1796
1797/*
1798 *  set whether writes drop overflowing blocks, or if we sleep
1799 */
1800void qdropoverflow(struct queue *q, bool onoff)
1801{
1802        spin_lock_irqsave(&q->lock);
1803        if (onoff)
1804                q->state |= Qdropoverflow;
1805        else
1806                q->state &= ~Qdropoverflow;
1807        spin_unlock_irqsave(&q->lock);
1808}
1809
1810/* Be careful: this can affect concurrent reads/writes and code that might have
1811 * built-in expectations of the q's type. */
1812void q_toggle_qmsg(struct queue *q, bool onoff)
1813{
1814        spin_lock_irqsave(&q->lock);
1815        if (onoff)
1816                q->state |= Qmsg;
1817        else
1818                q->state &= ~Qmsg;
1819        spin_unlock_irqsave(&q->lock);
1820}
1821
1822/* Be careful: this can affect concurrent reads/writes and code that might have
1823 * built-in expectations of the q's type. */
1824void q_toggle_qcoalesce(struct queue *q, bool onoff)
1825{
1826        spin_lock_irqsave(&q->lock);
1827        if (onoff)
1828                q->state |= Qcoalesce;
1829        else
1830                q->state &= ~Qcoalesce;
1831        spin_unlock_irqsave(&q->lock);
1832}
1833
1834/*
1835 *  flush the output queue
1836 */
1837void qflush(struct queue *q)
1838{
1839        struct block *bfirst;
1840
1841        /* mark it */
1842        spin_lock_irqsave(&q->lock);
1843        bfirst = q->bfirst;
1844        q->bfirst = 0;
1845        q->dlen = 0;
1846        spin_unlock_irqsave(&q->lock);
1847
1848        /* free queued blocks */
1849        freeblist(bfirst);
1850
1851        /* wake up writers */
1852        rendez_wakeup(&q->wr);
1853        qwake_cb(q, FDTAP_FILT_WRITABLE);
1854}
1855
1856int qfull(struct queue *q)
1857{
1858        return !qwritable(q);
1859}
1860
1861int qstate(struct queue *q)
1862{
1863        return q->state;
1864}
1865
1866void qdump(struct queue *q)
1867{
1868        if (q)
1869                printk("q=%p bfirst=%p blast=%p dlen=%d limit=%d state=#%x\n",
1870                           q, q->bfirst, q->blast, q->dlen, q->limit, q->state);
1871}
1872
1873/* On certain wakeup events, qio will call func(q, data, filter), where filter
1874 * marks the type of wakeup event (flags from FDTAP).
1875 *
1876 * There's no sync protection.  If you change the CB while the qio is running,
1877 * you might get a CB with the data or func from a previous set_wake_cb.  You
1878 * should set this once per queue and forget it.
1879 *
1880 * You can remove the CB by passing in 0 for the func.  Alternatively, you can
1881 * just make sure that the func/data pair remains valid until the queue is freed
1882 * or reopened. */
1883void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
1884{
1885        q->wake_data = data;
1886        wmb();  /* if we see func, we'll also see the data for it */
1887        q->wake_cb = func;
1888}
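
/* Editor's illustrative sketch (hypothetical callback and data): wiring a
 * queue's wakeups into an FD-tap-style notifier.  The callback follows the
 * func(q, data, filter) convention described above.
 *
 *	static void my_tap_cb(struct queue *q, void *data, int filter)
 *	{
 *		if (filter & FDTAP_FILT_READABLE)
 *			;	// e.g. fire the tap tracked by 'data'
 *	}
 *
 *	qio_set_wake_cb(q, my_tap_cb, my_tap);
 */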
1889
1890/* Helper for detecting whether we'll block on a read at this instant. */
1891bool qreadable(struct queue *q)
1892{
1893        return qlen(q) > 0;
1894}
1895
1896/* Helper for detecting whether we'll block on a write at this instant. */
1897bool qwritable(struct queue *q)
1898{
1899        return !q->limit || qwindow(q) > 0;
1900}
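
/* Editor's illustrative note: qreadable()/qwritable() are unlocked,
 * instantaneous peeks.  They suit poll-style readiness checks but do not
 * guarantee that a subsequent read or write won't block.
 *
 *	int events = 0;
 *
 *	if (qreadable(q))
 *		events |= FDTAP_FILT_READABLE;
 *	if (qwritable(q))
 *		events |= FDTAP_FILT_WRITABLE;
 */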
1901