akaros/user/parlib/ucq.c
/* Copyright (c) 2011 The Regents of the University of California
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Unbounded concurrent queues, user side.  Check k/i/r/ucq.h or the
 * Documentation for more info. */

#include <ros/arch/membar.h>
#include <parlib/arch/atomic.h>
#include <parlib/arch/arch.h>
#include <parlib/ucq.h>
#include <parlib/spinlock.h>
#include <sys/mman.h>
#include <parlib/assert.h>
#include <parlib/stdio.h>
#include <stdlib.h>
#include <parlib/vcore.h>
#include <parlib/ros_debug.h> /* for printd() */

/* Initializes a ucq.  You pass in addresses of mmaped pages for the main page
 * (prod_idx) and the spare page.  I recommend mmaping a big chunk and breaking
 * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
{
        printd("[user] initializing ucq %08p for proc %d\n", ucq, getpid());
        assert(!PGOFF(pg1));
        assert(!PGOFF(pg2));
        /* Prod and cons both start on the first page, slot 0.  When they are
         * equal, the ucq is empty. */
        atomic_set(&ucq->prod_idx, pg1);
        atomic_set(&ucq->cons_idx, pg1);
        ucq->prod_overflow = FALSE;
        atomic_set(&ucq->nr_extra_pgs, 0);
        atomic_set(&ucq->spare_pg, pg2);
        parlib_static_assert(sizeof(struct spin_pdr_lock) <=
                             sizeof(ucq->u_lock));
        spin_pdr_init((struct spin_pdr_lock*)(&ucq->u_lock));
        ucq->ucq_ready = TRUE;
}
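
/* Illustrative sketch, not part of the original file: the comment above
 * ucq_init_raw() recommends mmaping one big chunk and splitting it across a
 * bunch of ucqs.  This is one way that bulk setup could look; the function
 * name and parameters are hypothetical, and it keeps the two-pages-per-ucq
 * layout used below in ucq_init(). */
static void example_bulk_ucq_init(struct ucq *ucqs, size_t nr_ucqs)
{
        uintptr_t pages = (uintptr_t)mmap(0, PGSIZE * 2 * nr_ucqs,
                                          PROT_WRITE | PROT_READ,
                                          MAP_POPULATE | MAP_ANONYMOUS |
                                          MAP_PRIVATE, -1, 0);

        assert(pages != (uintptr_t)MAP_FAILED);
        /* Each ucq gets a main page and a spare page out of the one mapping */
        for (size_t i = 0; i < nr_ucqs; i++)
                ucq_init_raw(&ucqs[i], pages + (2 * i) * PGSIZE,
                             pages + (2 * i + 1) * PGSIZE);
}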

/* Inits a ucq, where you don't have to bother with the memory allocation.  This
 * would be appropriate for one or two UCQs, though if you're allocating in
 * bulk, use the raw version. */
void ucq_init(struct ucq *ucq)
{
        uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2,
                                              PROT_WRITE | PROT_READ,
                                              MAP_POPULATE | MAP_ANONYMOUS |
                                              MAP_PRIVATE, -1, 0);

        /* mmap() reports failure with MAP_FAILED, not 0 */
        assert(two_pages != (uintptr_t)MAP_FAILED);
        ucq_init_raw(ucq, two_pages, two_pages + PGSIZE);
}

/* Only call this on ucq's made with the simple ucq_init().  And be sure the ucq
 * is no longer in use. */
void ucq_free_pgs(struct ucq *ucq)
{
        uintptr_t pg1 = atomic_read(&ucq->prod_idx);
        uintptr_t pg2 = atomic_read(&ucq->spare_pg);

        assert(pg1 && pg2);
        munmap((void*)pg1, PGSIZE);
        munmap((void*)pg2, PGSIZE);
}
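
/* Illustrative sketch, not part of the original file: the intended pairing of
 * ucq_init() and ucq_free_pgs() described in the comments above.  The function
 * name is hypothetical; in practice the ucq itself lives in memory shared with
 * the kernel (e.g. inside an event mbox), which the caller passes in here. */
static void example_ucq_lifecycle(struct ucq *ucq)
{
        ucq_init(ucq);
        /* ... the kernel produces messages, the consumer drains them ... */
        ucq_free_pgs(ucq);      /* only after the ucq is no longer in use */
}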

/* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
 * the ucq appears empty, it will return FALSE.  Messages that arrive after we
 * start checking may not be received on this call. */
bool get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
{
        uintptr_t my_idx;
        struct ucq_page *old_page, *other_page;
        struct msg_container *my_msg;
        struct spin_pdr_lock *ucq_lock = (struct spin_pdr_lock*)(&ucq->u_lock);

        do {
loop_top:
                cmb();
                my_idx = atomic_read(&ucq->cons_idx);
                /* The ucq is empty if the consumer and producer are on the same
                 * 'next' slot. */
                if (my_idx == atomic_read(&ucq->prod_idx))
                        return FALSE;
                /* Is the slot we want good?  If not, we're going to need to try
                 * and move on to the next page.  If it is, we bypass all of
                 * this and try to CAS on us getting my_idx. */
                if (slot_is_good(my_idx))
                        goto claim_slot;
                /* Slot is bad, let's try and fix it */
                spin_pdr_lock(ucq_lock);
                /* Reread the idx, in case someone else fixed things up while we
                 * were waiting/fighting for the lock */
                my_idx = atomic_read(&ucq->cons_idx);
                if (slot_is_good(my_idx)) {
                        /* Someone else fixed it already, let's just try to get
                         * out */
                        spin_pdr_unlock(ucq_lock);
                        /* Make sure this new slot has a producer (ucq isn't
                         * empty) */
                        if (my_idx == atomic_read(&ucq->prod_idx))
                                return FALSE;
                        goto claim_slot;
                }
                /* At this point, the slot is bad, and all other possible
                 * consumers are spinning on the lock.  Time to fix things up:
                 * Set the counter to the next page, and free the old one. */
                /* First, we need to wait and make sure the kernel has posted
                 * the next page.  Worst case, we know that the kernel is
                 * working on it, since prod_idx != cons_idx */
                old_page = (struct ucq_page*)PTE_ADDR(my_idx);
                while (!old_page->header.cons_next_pg)
                        cpu_relax();
                /* Now set the counter to the next page */
                assert(!PGOFF(old_page->header.cons_next_pg));
                atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
                /* Side note: at this point, any *new* consumers coming in will
                 * grab slots based off the new counter index (cons_idx) */
                /* Now free up the old page.  Need to make sure all other
                 * consumers are done.  We spin til enough are done, like an
                 * inverted refcnt. */
                while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
                {
                        /* spinning on userspace here, specifically, another
                         * vcore and we don't know who it is.  This will spin a
                         * bit, then make sure they aren't preempted */
                        cpu_relax_any();
                }
                /* Now the page is done.  Zero its metadata and give it up. */
                old_page->header.cons_next_pg = 0;
                atomic_set(&old_page->header.nr_cons, 0);
                /* We want to "free" the page.  We'll try and set it as the
                 * spare.  If there is already a spare, we'll free that one. */
                other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
                                                           (long)old_page);
                assert(!PGOFF(other_page));
                if (other_page) {
                        munmap(other_page, PGSIZE);
                        atomic_dec(&ucq->nr_extra_pgs);
                }
                /* All fixed up, unlock.  Other consumers may lock and check to
                 * make sure things are done. */
                spin_pdr_unlock(ucq_lock);
                /* Now that everything is fixed, try again from the top */
                goto loop_top;
claim_slot:
                cmb();  /* so we can goto claim_slot */
                /* If we're still here, my_idx is good, and we'll try to claim
                 * it.  If we fail, we need to repeat the whole process. */
        } while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
        assert(slot_is_good(my_idx));
        /* Now we have a good slot that we can consume */
        my_msg = slot2msg(my_idx);
        /* linux would put an rmb_depends() here */
        /* Wait til the msg is ready (kernel sets this flag) */
        while (!my_msg->ready)
                cpu_relax();
        rmb();  /* order the ready read before the contents */
        /* Copy out */
        *msg = my_msg->ev_msg;
        /* Unset this for the next usage of the container */
        my_msg->ready = FALSE;
        wmb();  /* post the ready write before incrementing */
        /* Increment nr_cons, showing we're done */
        atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
        return TRUE;
}
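
/* Illustrative sketch, not part of the original file: draining a ucq from the
 * consumer side using the contract described above get_ucq_msg().  The
 * function name is hypothetical; ev_type is assumed to be a field of struct
 * event_msg. */
static void example_drain_ucq(struct ucq *ucq)
{
        struct event_msg msg;

        /* get_ucq_msg() returns FALSE once the ucq appears empty; anything
         * that arrives afterwards is picked up on a later call. */
        while (get_ucq_msg(ucq, &msg))
                printd("[user] got event type %d\n", msg.ev_type);
}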

bool ucq_is_empty(struct ucq *ucq)
{
        /* The ucq is empty if the consumer and producer are on the same 'next'
         * slot. */
        return (atomic_read(&ucq->cons_idx) == atomic_read(&ucq->prod_idx));
}