UCQs (XCC)
authorBarret Rhoden <brho@cs.berkeley.edu>
Mon, 25 Jul 2011 23:27:44 +0000 (16:27 -0700)
committerKevin Klues <klueska@cs.berkeley.edu>
Thu, 3 Nov 2011 00:36:05 +0000 (17:36 -0700)
Unbounded concurrent queues, will eventually replace the BCQs in event
queues.  Don't manually call tests/ucq; it won't work.  If you want to
use them, you'll need to rebuild your cross compiler / reinstall your
kernel headers.  Future commits will actually use these.

Check the Documentation for specifics.

Documentation/ucq.txt [new file with mode: 0644]
kern/include/ros/ucq.h [new file with mode: 0644]
kern/include/testing.h
kern/include/ucq.h [new file with mode: 0644]
kern/src/Makefrag
kern/src/testing.c
kern/src/ucq.c [new file with mode: 0644]
tests/ucq.c [new file with mode: 0644]
user/parlib/include/ucq.h [new file with mode: 0644]
user/parlib/ucq.c [new file with mode: 0644]

diff --git a/Documentation/ucq.txt b/Documentation/ucq.txt
new file mode 100644 (file)
index 0000000..9e3fb96
--- /dev/null
@@ -0,0 +1,206 @@
+ucq.txt
+Barret Rhoden
+
+1. Overview
+====================
+1.1 What are they?
+------------------------------------------------------------------
+UCQs are a tool to send messages, with payloads, from the kernel to a process
+through shared memory.  The depth of the queue is unlimited, barring running
+out of memory.  They should be used in closed loop or low rate scenarios that
+require a payload (or that building a system that can handle missing a message
+is unwieldy).
+
+The old BCQs were designed for a shared memory segment, such as procdata.
+Since the kernel is sending the messages, we can use a more customized
+solution.  The problems in this area are running out of memory, having the
+kernel use user-provided-pointers, and synchronizing within the kernel (like a
+spinlock for a TAILQ). 
+
+The basic plan for these "unbounded concurrent queues" (UCQ) is to have linked
+mmap'd pages of arrays of event messages (the BCQ is a circular array of event
+messages, roughly).  As the kernel over-produces, it mmaps more pages and links
+them together (via a header at the beginning of the page).  Userspace munmaps
+when it is done with a page.  To avoid excessive mmap/munmap, we double-buffer
+with two pages, one current and one spare.  We only need to mmap new ones when
+the kernel gets far ahead of the user.
+
+- When we run out of room, the kernel will implicitly mmap another page,
+  solving the memory allocation issue.  The worst thing userspace can do is
+leak its memory, or cause an OOM condition, which we have to deal with anyway.
+
+- Using user-pointers isn't conceptually a problem any more, so long as the
+  kernel reads it in and verifies the address is in userspace (and that it can
+handle page faults).  This wasn't the case a couple years ago when I made the
+BCQs.  We still are careful about pointers - we only use them when messing with
+which page is current, etc, and not when atomically adding items.
+
+- Swapping pages/buffers requires locking, but we can't put the lock in the UCQ
+  structure, since userspace could muck with it.  Instead, we lock at the
+process level.  And instead of grabbing the process lock, we'll grab a hash
+lock (hash table of locks, hashed on the pointer of the UCQ).  This will happen
+every ~170 events or so.  Synchronization for normal ops (not buffer swaps) are
+done with atomics.
+
+Another option instead of allocating more memory would be to have the kernel
+block kthreads until the queue empties.  I really dislike that for a couple
+reasons.  It's easier to handle running out of memory than spawning too many
+kthreads, esp in critical parts of the code (where we can't sleep on memory
+allocation).  More importantly, it's a real pain to block on a user's actions.
+For instance, I'm not about to put a semaphore in user-writable memory.
+
+1.2 Why do we have them?
+------------------------------------------------------------------
+Enqueuing messages in BCQs could fail, due to either a misbehaving process, an
+unlucky circumstance, or most commonly because the queue was full.  Event
+queues, which used BCQs as a building block, would handle the failure as an
+'overflow' of events, and signal the process with a bit.  This means the
+program needs to know how to handle overflow, which becomes painful.
+
+A specific case was syscall completion events.  When a syscall was done, the
+kernel would send a message that a particular syscall was done.  Userspace
+needed to know exactly which one was done, under normal circumstances.  With
+BCQs, the 2LS needed to know how to handle overflow, which means it needs to
+track every outstanding syscall so that it can poll to see which call
+completed.  To do this in a scalable manner required each vcore to have its own
+TAILQ of oustanding uthreads (blocked on syscalls).  The problem with this is
+that blocked syscalls are now tied to vcores, and that vcore might not want to
+yield even if it has no work to do (since it needs to receive its message).
+
+Ultimately, the issue is that complicated systems could benefit from not
+needing to handle overflow.  For certain event queue usages, the unbounded
+nature of UCQs make the system far simpler.  When we build other systems in
+userspace, such as using ev_qs for syscalls and uthread blocking, then we can
+leverage the UCQs.
+
+Note the main downfall of UCQs is that a process can die if it the kernel runs
+out of mememory while trying to send it messages.  If the kernel sends messages
+faster than the process can handle them (for whatever reason, including being
+swapped out), eventually we'll run out of memory.  UCQs need to be used
+carefully, such that the process isn't requesting an *unbounded* amount of
+messages at one time.
+
+The main benefit of UCQs in this situation is that they can handle spikes of
+messages and they remove the need to handle overflow.  Using BCQs, we'd need to
+handle overflow even if it was unlikely, and the impact of this extended beyond
+actually handling overflow.  Check out the old overflow handling code (and
+maybe some notes in the Documentation) for details about how we needed to do
+work for every syscall, in the off chance we had to handle overflow.
+
+2. How do they work?
+====================
+2.1 Producer (kernel)
+------------------------------------------------------------------
+Producers atomically fight for slots in prod_idx, which encode both the page
+and the msg number within the page.  If the slot is good, we just write our
+message.  If it isn't, things get interesting.  We need to synchronize, so we
+lock externally to the ucq.  This is where the hash lock comes in.  A bad slot
+is one in which there is no page or the message slot is greater than the array
+of msgs in the page.
+
+Whoever grabs the lock first will be responsible for getting a new page and
+resetting the counter (which tracks which slot is next).  All future lockers
+can tell that the work is done by examining the counter and trying to get a
+slot.  If they got a good one, everything is okay and it is just like they got
+a good slot in the first place (with the exception of a lock/unlock pair).
+
+One minor problem is that with sufficient producers and a delayed fixer (who
+holds the lock), we could overflow the prod_idx (around 3900 would overflow
+into a new page on the 0th slot).  To avoid this, the first producer to detect
+the slot/index is no good will set an overflow flag.  All future producers will
+check this flag before attempting to get a slot, and if we're overflowing, they
+will jump to the "grab lock" phase.  This limits the window of vulnerability.
+In theory, you could have 3900 producers at exactly the same time all fetch and
+add, before any of the others have a chance to set the overflow.  Not only does
+this require 3900 cores, but they all have to be writing to the exact same UCQ.
+If this happens, we have a warning that will go off and I'll fix it.
+
+So the last part to deal with is getting a new page and linking it with the old
+one.  We simply atomic_swap on the spare_pg.  If there is a spare already
+mmaped, we use it.  If there isn't, we need to mmap a new page.  Either way, we
+tell the old page to follow to the new page.  We set the index to a good value,
+then unlock (and clear the overflow flag).
+
+When we set the counter, we set it to 1, instead of 0, thereby reserving slot 0
+for ourselves.  This prevents a DoS from the program.  The user could muck with
+the prod_idx endlessly, in a way that seems benign.  To prevent this, we make
+sure that any time the process grabs the hashlock that it gets a good slot.
+Whenever we fail to get a slot we lock, and whenever we lock we get a good
+slot.
+
+Side note: another related DoS involves using CAS.  You can't CAS with
+userspace unless you're willing to fail.  O/W, you could be DoS'd.
+
+When we have a slot, we write in the ev_msg, and then toggle the 'ready' flag
+in the message container.  This is in case the consumer is trying to
+concurrently read our message while we are writing.
+
+And finally, every time the kernel reads or writes to something, we need to
+verify the address (which is user-supplied).  Eventually, the kernel can handle
+a page fault on a user address, but we can't tolerate KVAs in UCQs.  Note that
+old versions allowed UCQs/BCQs to be at KVAs (procdata), but that behavior will
+probably go away soon (error prone, we now handle user pointers anyway, and we
+usually load 'current' when dealing with a proc).
+
+
+2.2 Consumer
+------------------------------------------------------------------
+Consumers CAS (compare-and-swap) on claiming a slot.  We need to use CAS, since
+we don't want to advance unless the queue is not empty.  If they detect the
+slot is bad, they fight among themselves for a lock, which is embedded in the
+UCQ, and the winner sets the counter to track the next page, frees extra pages,
+etc.
+
+Emptiness is defined as the prod_idx == cons_idx.  Both the producer's and the
+consumer's *next* slot is the same, so there is no items to be consumed.  If
+the prod_idx is bad, the consumer needs to wait til there is a good next page
+(the kernel is in the process of finding and filling a good slot).  If the
+cons_idx is bad, it means we need to go to the next page (lock first).
+
+When locking, the consumer does the same trick as the producer: try to get a
+slot again, in case someone else got the lock before you and fixed things up.
+
+So once we have the next page (it has been posted by the kernel), we can set up
+the cons_idx so consumers can grab slots.  We don't need to reserve a slot for
+ourselves (no DoS risk).
+
+After setting up the counter, our next job is to *free* the old page of
+consumed items.  The trick here is that there may be consumers still using it.
+We statically know how many items there are on the page, so we have all
+consumers increment another counter when they are done with the slot.  When
+that counter is the max, we know everyone is done with the page and it can be
+*freed*.  That counter is like an inverted reference count.  I don't care when
+it is 0, I care when it is the max.  Alternatively, we could have initialized
+it to MAX and decremented, but this way felt more natural.  When the page is
+done, we don't actually free it.  We'll atomic_swap it with the spare_pg,
+making it the spare.  If there already was a spare, then we have too many pages
+and need to munmap the old spare.
+
+So we finally get our good slot, and we spin til the kernel has loaded it with
+a message.  Then we just copy it out, increment the 'number consumed' counter,
+and we're done.
+
+3. Misc Other Notes:
+====================
+I think most of these things were discussed inline.  There are a few random
+things worth mentioning, some of which are obvious in hindsight:
+- The kernel can't CAS and also not fail, o/w it can be DoSed.
+
+- Without CAS, the kernel can't atomically change the page and the index unless
+  they are the same long / modifiable in the same atomic statement.  This is
+why the page and the counter/slot number are put together in prod_idx.
+
+- Kernel writers/producers need to stop/delay while another producer fixes
+  things up.  The only acceptable place to delay/spin is on the proc's
+hashlock.  Any other scheme will probably fail, even if all you want to do is
+swap the spare page and reset the counter.It doesn't work for the same reason
+we have the current page and the index in one memory location.
+
+- The prod_idx and cons_idx are both protected by the lock and atomically
+  incremented.  The lock protects who resets them, with the understanding that
+it should only be done when everyone agrees the counter is bad (too large), but
+the normal atomics happen lock-free.
+
+- Userspace should mmap a huge chunk of memory and then pass in page addresses
+  to the ucq_init() function.  I made the init function so that this would be
+really easy.
diff --git a/kern/include/ros/ucq.h b/kern/include/ros/ucq.h
new file mode 100644 (file)
index 0000000..11e9fd5
--- /dev/null
@@ -0,0 +1,80 @@
+/* Copyright (c) 2011 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Unbounded concurrent queues.  Linked buffers/arrays of elements, in page
+ * size chunks.  The pages/buffers are linked together by an info struct at the
+ * beginning of the page.  Producers and consumers sync on the idxes when
+ * operating in a page, and page swaps are synced via the proc* for the kernel
+ * and via the ucq's u_lock for the user. 
+ *
+ * There's a bunch of details and issues discussed in the Documentation.
+ *
+ * This header contains the stuff that the kernel and userspace need to agree
+ * on.  Each side of the implementation will have their own .c and .h files.
+ * The kernel's implementation is in kern/src/ucq.c, and the user's is in
+ * user/parlib/ucq.c. */
+
+#ifndef ROS_INC_UCQ_H
+#define ROS_INC_UCQ_H
+
+#include <ros/common.h>
+#include <ros/event.h>
+#include <ros/atomic.h>
+#include <ros/arch/mmu.h>
+
+/* The main UCQ structure, contains indexes and start points (for the indexes),
+ * etc. */
+struct ucq {
+       atomic_t                                        prod_idx;               /* both pg and slot nr */
+       bool                                            prod_overflow;  /* flag to prevent wraparound */
+       atomic_t                                        spare_pg;               /* mmaped, unused page */
+       atomic_t                                        nr_extra_pgs;   /* nr pages mmaped */
+       atomic_t                                        cons_idx;               /* cons pg and slot nr */
+       bool                                            ucq_ready;              /* ucq is ready to be used */
+       /* Userspace lock for modifying the UCQ */
+       uint64_t                                        u_lock;
+};
+
+/* Struct at the beginning of every page/buffer, tracking consumers and
+ * pointing to the next one, so that the consumer can follow. */
+struct ucq_page_header {
+       uintptr_t                                       cons_next_pg;   /* next page to consume */
+       atomic_t                                        nr_cons;                /* like an inverted refcnt */
+};
+
+struct msg_container {
+       struct event_msg                        ev_msg;
+       bool                                            ready;                  /* kernel has written */
+};
+
+struct ucq_page {
+       struct ucq_page_header          header;
+       struct msg_container            msgs[];
+};
+
+#define UCQ_WARN_THRESH                        1000                    /* nr pages befor warning */
+
+#define NR_MSG_PER_PAGE ((PGSIZE - ROUNDUP(sizeof(struct ucq_page_header),     \
+                                           __alignof__(struct msg_container))) \
+                         / sizeof(struct msg_container))
+
+/* A slot encodes both the page addr and the count within the page */
+static bool slot_is_good(uintptr_t slot)
+{
+       uintptr_t counter = PGOFF(slot);
+       uintptr_t pg_addr = PTE_ADDR(slot);
+       return ((counter < NR_MSG_PER_PAGE) && pg_addr) ? TRUE : FALSE;
+}
+
+/* Helper: converts a slot/index into a msg container.  The ucq_page is the
+ * PPN/PTE_ADDR of 'slot', and the specific slot *number* is the PGOFF.  Assumes
+ * the slot is good.  If it isn't, you're going to get random memory.
+ *
+ * Note that this doesn't actually read the memory, just computes an address. */
+static inline struct msg_container *slot2msg(uintptr_t slot)
+{
+       return &((struct ucq_page*)PTE_ADDR(slot))->msgs[PGOFF(slot)];
+}
+
+#endif /* ROS_INC_UCQ_H */
index 51d3856..a7fb688 100644 (file)
@@ -28,6 +28,7 @@ void test_slab(void);
 void test_kmalloc(void);
 void test_hashtable(void);
 void test_bcq(void);
+void test_ucq(void);
 void test_vm_regions(void);
 void test_radix_tree(void);
 void test_random_fs(void);
diff --git a/kern/include/ucq.h b/kern/include/ucq.h
new file mode 100644 (file)
index 0000000..41adbac
--- /dev/null
@@ -0,0 +1,16 @@
+/* Copyright (c) 2011 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Unbounded concurrent queues, kernel side.  Check k/i/r/ucq.h or the
+ * Documentation for more info. */
+
+#ifndef ROS_KERN_UCQ_H
+#define ROS_KERN_UCQ_H
+
+#include <ros/ucq.h>
+#include <process.h>
+
+void send_ucq_msg(struct ucq *ucq, struct proc *p, struct event_msg *msg);
+
+#endif /* ROS_KERN_UCQ_H */
index c4db827..67bd0a3 100644 (file)
@@ -54,6 +54,7 @@ KERN_SRCFILES := $(KERN_ARCH_SRCFILES) \
                  $(KERN_SRC_DIR)/event.c \
                  $(KERN_SRC_DIR)/alarm.c \
                  $(KERN_SRC_DIR)/kdebug.c \
+                 $(KERN_SRC_DIR)/ucq.c \
                  $(KERN_SRC_DIR)/arsc.c
 
 # Only build files if they exist.
index ab92055..be55257 100644 (file)
@@ -11,6 +11,7 @@
 #include <ros/memlayout.h>
 #include <ros/common.h>
 #include <ros/bcq.h>
+#include <ros/ucq.h>
 
 #include <atomic.h>
 #include <stdio.h>
@@ -33,6 +34,9 @@
 #include <radix.h>
 #include <monitor.h>
 #include <kthread.h>
+#include <schedule.h>
+#include <umem.h>
+#include <ucq.h>
 
 #define l1 (available_caches.l1)
 #define l2 (available_caches.l2)
@@ -959,6 +963,93 @@ void test_bcq(void)
        }
 }
 
+/* Test a simple concurrent send and receive (one prod, one cons).  We spawn a
+ * process that will go into _M mode on another core, and we'll do the test from
+ * an alarm handler run on our core.  When we start up the process, we won't
+ * return so we need to defer the work with an alarm. */
+void test_ucq(void)
+{
+       struct timer_chain *tchain = &per_cpu_info[core_id()].tchain;
+       struct alarm_waiter *waiter = kmalloc(sizeof(struct alarm_waiter), 0);
+
+       /* Alarm handler: what we want to do after the process is up */
+       void send_msgs(struct alarm_waiter *waiter)
+       {
+               struct timer_chain *tchain;
+               struct proc *p = waiter->data;
+               struct ucq *ucq = (struct ucq*)USTACKTOP;
+               physaddr_t old_cr3 = rcr3();
+               struct event_msg msg;
+
+               printk("Running the alarm handler!\n");
+               /* might not be mmaped yet, if not, abort */
+               if (!user_mem_check(p, ucq, PGSIZE, 1, PTE_USER_RW)) {
+                       printk("Not mmaped yet\n");
+                       goto abort;
+               }
+               /* load their address space */
+               lcr3(p->env_cr3);
+               /* So it's mmaped, see if it is ready (note that this is dangerous) */
+               if (!ucq->ucq_ready) {
+                       printk("Not ready yet\n");
+                       lcr3(old_cr3);
+                       goto abort;
+               }
+               /* So it's ready, time to finally do the tests... */
+               printk("[kernel] Finally starting the tests... \n");
+               /* 1: Send a simple message */
+               printk("[kernel] Sending simple message (7, deadbeef)\n");
+               msg.ev_type = 7;
+               msg.ev_arg2 = 0xdeadbeef;
+               send_ucq_msg(ucq, p, &msg);
+               /* 2: Send a bunch.  In a VM, this causes one swap, and then a bunch of
+                * mmaps. */
+               for (int i = 0; i < 5000; i++) {
+                       msg.ev_type = i;
+                       send_ucq_msg(ucq, p, &msg);
+               }
+               /* other things we could do:
+                *  - concurrent producers / consumers...  ugh.
+                */
+               /* done, switch back and free things */
+               lcr3(old_cr3);
+               proc_decref(p);
+               kfree(waiter); /* since it was kmalloc()d */
+               return;
+       abort:
+               tchain = &per_cpu_info[core_id()].tchain;
+               /* Set to run again */
+               set_awaiter_rel(waiter, 1000000);
+               set_alarm(tchain, waiter);
+       }
+       /* Set up a handler to run the real part of the test */
+       init_awaiter(waiter, send_msgs);
+       set_awaiter_rel(waiter, 1000000);       /* 1s should be long enough */
+       set_alarm(tchain, waiter);
+       /* Just spawn the program */
+       struct file *program;
+       program = do_file_open("/bin/ucq", 0, 0);
+       if (!program) {
+               printk("Unable to find /bin/ucq!\n");
+               return;
+       }
+       char *p_envp[] = {"LD_LIBRARY_PATH=/lib", 0};
+       struct proc *p = proc_create(program, 0, p_envp);
+       spin_lock(&p->proc_lock);
+       __proc_set_state(p, PROC_RUNNABLE_S);
+       schedule_proc(p);
+       spin_unlock(&p->proc_lock);
+       /* instead of getting rid of the reference created in proc_create, we'll put
+        * it in the awaiter */
+       waiter->data = p;
+       kref_put(&program->f_kref);
+       /* Should never return from schedule (env_pop in there) also note you may
+        * not get the process you created, in the event there are others floating
+        * around that are runnable */
+       schedule();
+       assert(0);
+}
+
 /* rudimentary tests.  does the basics, create, merge, split, etc.  Feel free to
  * add more, esp for the error conditions and finding free slots.  This is also
  * a bit lazy with setting the caller's fields (perm, flags, etc). */
diff --git a/kern/src/ucq.c b/kern/src/ucq.c
new file mode 100644 (file)
index 0000000..fe4e4c5
--- /dev/null
@@ -0,0 +1,122 @@
+/* Copyright (c) 2011 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Kernel side of ucqs. */
+
+#include <ucq.h>
+#include <umem.h>
+#include <assert.h>
+#include <mm.h>
+#include <atomic.h>
+
+/* Proc p needs to be current, and you should have checked that ucq is valid
+ * memory.  We'll assert it here, to catch any of your bugs.  =) */
+void send_ucq_msg(struct ucq *ucq, struct proc *p, struct event_msg *msg)
+{
+       uintptr_t my_slot = 0;
+       struct ucq_page *new_page, *old_page;
+       struct msg_container *my_msg;
+
+       assert(is_user_rwaddr(ucq, sizeof(struct ucq)));
+       /* So we can try to send ucqs to _Ss before they initialize */
+       if (!ucq->ucq_ready)
+               return;
+       /* Bypass fetching/incrementing the counter if we're overflowing, helps
+        * prevent wraparound issues on the counter (only 12 bits of counter) */
+       if (ucq->prod_overflow)
+               goto grab_lock;
+       /* Grab a potential slot */
+       my_slot = (uintptr_t)atomic_fetch_and_add(&ucq->prod_idx, 1);
+       if (slot_is_good(my_slot))
+               goto have_slot;
+       /* Warn others to not bother with the fetch_and_add */
+       ucq->prod_overflow = TRUE;
+       /* Sanity check */
+       if (PGOFF(my_slot) > 3000)
+               warn("Abnormally high counter, there's probably something wrong!");
+grab_lock:
+       /* TODO: use a hashlock, instead of the proc lock.  think about irqsave.  do
+        * we send events from IRQ context?  The proc_lock isn't irqsave */
+       spin_lock(&p->proc_lock);
+       /* Grab a potential slot (again, preventing another DoS) */
+       my_slot = (uintptr_t)atomic_fetch_and_add(&ucq->prod_idx, 1);
+       if (slot_is_good(my_slot))
+               goto unlock_lock;
+       /* Check to make sure the old_page was good before we do anything too
+        * intense (we deref it later).  Bad pages are likely due to
+        * user-malfeasance or neglect.
+        *
+        * The is_user_rwaddr() check on old_page might catch addresses below
+        * MMAP_LOWEST_VA, and we can also handle a PF, but we'll explicitly check
+        * for 0 just to be sure (and it's a likely error). */
+       old_page = (struct ucq_page*)PTE_ADDR(my_slot);
+       if (!is_user_rwaddr(old_page, PGSIZE) || !old_page)
+               goto error_addr_unlock;
+       /* Things still aren't fixed, so we need to reset everything */
+       /* Try to get the spare page, so we don't have to mmap a new one */
+       new_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg, 0);
+       if (!new_page) {
+               /* Warn if we have a ridiculous amount of pages in the ucq */
+               if (atomic_fetch_and_add(&ucq->nr_extra_pgs, 1) > UCQ_WARN_THRESH)
+                       warn("Over %d pages in ucq %08p!\n", UCQ_WARN_THRESH, ucq);
+               /* TODO: use a proc_lock/mm_lock or call the external one.  Need to do
+                * this for now since we don't have hash locks yet HASH LOCK */
+               new_page = (struct ucq_page*)__do_mmap(p, 0, PGSIZE,
+                                                      PROT_READ | PROT_WRITE,
+                                                      MAP_ANON | MAP_POPULATE, 0, 0);
+               assert(new_page);
+               assert(!PGOFF(new_page));
+       } else {
+               /* If we're using the user-supplied new_page, we need to check it */
+               if (!is_user_rwaddr(new_page, PGSIZE) || PGOFF(new_page))
+                       goto error_addr_unlock;
+       }
+       /* Now we have a page.  Lets make sure it's set up properly */
+       new_page->header.cons_next_pg = 0;
+       new_page->header.nr_cons = 0;
+       /* Link the old page to the new one, so consumers know how to follow */
+       old_page->header.cons_next_pg = (uintptr_t)new_page;
+       /* Set the prod_idx counter to 1 (and the new_page), reserving the first
+        * slot (number '0') for us (reservation prevents DoS). */
+       my_slot = (uintptr_t)new_page;
+       atomic_set(&ucq->prod_idx, my_slot + 1);
+       /* Fallthrough to clear overflow and unlock */
+unlock_lock:
+       /* Clear the overflow, so new producers will try to get a slot */
+       ucq->prod_overflow = FALSE;
+       /* At this point, any normal (non-locking) producers can succeed in getting
+        * a slot.  The ones that failed earlier will fight for the lock, then
+        * quickly proceed when they get a good slot */
+       spin_unlock(&p->proc_lock);     /* TODO HASH LOCK */
+       /* Fall through to having a slot */
+have_slot:
+       /* Sanity check on our slot. */
+       assert(slot_is_good(my_slot));
+       /* Convert slot to actual msg_container.  Note we never actually deref
+        * my_slot here (o/w we'd need a rw_addr check). */
+       my_msg = slot2msg(my_slot);
+       /* Make sure our msg is user RW */
+       if (!is_user_rwaddr(my_msg, sizeof(struct msg_container)))
+               goto error_addr;
+       /* Finally write the message */
+       my_msg->ev_msg = *msg;
+       wmb();
+       /* Now that the write is done, signal to the consumer that they can consume
+        * our message (they could have been spinning on it) */
+       my_msg->ready = TRUE;
+       return;
+error_addr_unlock:
+       /* Had a bad addr while holding the lock.  This is a bit more serious */
+       warn("Bad addr in ucq page management!");
+       ucq->prod_overflow = FALSE;
+       spin_unlock(&p->proc_lock);     /* TODO HASH LOCK */
+       /* Fall-through to normal error out */
+error_addr:
+       warn("Invalid user address, not sending a message");
+       /* TODO: consider killing the process here.  For now, just catch it.  For
+        * some cases, we have a slot that we never fill in, though if we had a bad
+        * addr, none of this will work out and the kernel just needs to protect
+        * itself. */
+       return;
+}
diff --git a/tests/ucq.c b/tests/ucq.c
new file mode 100644 (file)
index 0000000..43bd8d2
--- /dev/null
@@ -0,0 +1,37 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <parlib.h>
+#include <sys/mman.h>
+#include <ucq.h>
+
+int main(int argc, char** argv)
+{
+       /* this program should only be started from the kernel for tests */
+       printf("Attempting to read ucq messages from test_ucq().  "
+              "Don't call this manually.\n");
+       /* Map into a known, extremely ghetto location.  The kernel knows to look
+        * here. */
+       struct ucq *ucq = mmap((void*)USTACKTOP, PGSIZE, PROT_WRITE | PROT_READ,
+                              MAP_POPULATE, -1, 0);
+       assert((uintptr_t)ucq == USTACKTOP);
+       /* Now init it */
+       uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2, PROT_WRITE | PROT_READ,
+                                             MAP_POPULATE, -1, 0);
+       assert(two_pages);
+       ucq_init(ucq, two_pages, two_pages + PGSIZE);
+       /* try to get a simple message */
+       struct event_msg msg;
+       /* 1: Spin til we can get a message (0 on success breaks) */
+       while (get_ucq_msg(ucq, &msg))
+               cpu_relax();
+       printf("[user] Got simple message type %d(7) with A2 %08p(0xdeadbeef)\n",
+              msg.ev_type, msg.ev_arg2);
+       /* 2: get a bunch */
+       for (int i = 0; i < 5000; i++) {
+               while (get_ucq_msg(ucq, &msg))
+                       cpu_relax();
+               assert(msg.ev_type == i);
+       }
+       printf("Received a bunch!  Last one was %d(4999)\n", msg.ev_type);
+       return 0;
+}
diff --git a/user/parlib/include/ucq.h b/user/parlib/include/ucq.h
new file mode 100644 (file)
index 0000000..3d633a9
--- /dev/null
@@ -0,0 +1,16 @@
+/* Copyright (c) 2011 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Unbounded concurrent queues, user side.  Check k/i/r/ucq.h or the
+ * Documentation for more info. */
+
+#ifndef _UCQ_H
+#define _UCQ_H
+
+#include <ros/ucq.h>
+
+void ucq_init(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2);
+int get_ucq_msg(struct ucq *ucq, struct event_msg *msg);
+
+#endif /* _UCQ_H */
diff --git a/user/parlib/ucq.c b/user/parlib/ucq.c
new file mode 100644 (file)
index 0000000..860e850
--- /dev/null
@@ -0,0 +1,126 @@
+/* Copyright (c) 2011 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Unbounded concurrent queues, user side.  Check k/i/r/ucq.h or the
+ * Documentation for more info. */
+
+#include <ros/arch/membar.h>
+#include <arch/atomic.h>
+#include <arch/arch.h>
+#include <ucq.h>
+#include <mcs.h>
+#include <sys/mman.h>
+#include <assert.h>
+#include <stdio.h>
+#include <rassert.h> /* for the static_assert() */
+
+/* Initializes a ucq.  You pass in addresses of mmaped pages for the main page
+ * (prod_idx) and the spare page.  I recommend mmaping a big chunk and breaking
+ * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
+void ucq_init(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
+{
+       assert(!PGOFF(pg1));
+       assert(!PGOFF(pg2));
+       /* Prod and cons both start on the first page, slot 0.  When they are equal,
+        * the ucq is empty. */
+       atomic_set(&ucq->prod_idx, pg1);
+       atomic_set(&ucq->cons_idx, pg1);
+       ucq->prod_overflow = FALSE;
+       atomic_set(&ucq->nr_extra_pgs, 0);
+       atomic_set(&ucq->spare_pg, pg2);
+       static_assert(sizeof(struct mcs_lock) <= sizeof(ucq->u_lock));
+       mcs_lock_init((struct mcs_lock*)(&ucq->u_lock));
+       ucq->ucq_ready = TRUE;
+}
+
+/* Consumer side, returns 0 on success and fills *msg with the ev_msg.  If the
+ * ucq is empty, it will return -1. */
+int get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
+{
+       uintptr_t my_idx;
+       struct ucq_page *old_page, *other_page;
+       struct msg_container *my_msg;
+       /* Locking stuff.  Would be better with a spinlock, if we had them, since
+        * this should be lightly contested.  */
+       struct mcs_lock_qnode local_qn = {0};
+       struct mcs_lock *ucq_lock = (struct mcs_lock*)(&ucq->u_lock);
+
+       do {
+loop_top:
+               cmb();
+               my_idx = atomic_read(&ucq->cons_idx);
+               /* The ucq is empty if the consumer and producer are on the same 'next'
+                * slot. */
+               if (my_idx == atomic_read(&ucq->prod_idx))
+                       return -1;
+               /* Is the slot we want good?  If not, we're going to need to try and
+                * move on to the next page.  If it is, we bypass all of this and try to
+                * CAS on us getting my_idx. */
+               if (slot_is_good(my_idx))
+                       goto claim_slot;
+               /* Slot is bad, let's try and fix it */
+               mcs_lock_notifsafe(ucq_lock, &local_qn);
+               /* Reread the idx, in case someone else fixed things up while we
+                * were waiting/fighting for the lock */
+               my_idx = atomic_read(&ucq->cons_idx);
+               if (slot_is_good(my_idx)) {
+                       /* Someone else fixed it already, let's just try to get out */
+                       mcs_unlock_notifsafe(ucq_lock, &local_qn);
+                       goto claim_slot;
+               }
+               /* At this point, the slot is bad, and all other possible consumers are
+                * spinning on the lock.  Time to fix things up: Set the counter to the
+                * next page, and free the old one. */
+               /* First, we need to wait and make sure the kernel has posted the next
+                * page.  Worst case, we know that the kernel is working on it, since
+                * prod_idx != cons_idx */
+               old_page = (struct ucq_page*)PTE_ADDR(my_idx);
+               while (!old_page->header.cons_next_pg)
+                       cpu_relax();
+               /* Now set the counter to the next page */
+               assert(!PGOFF(old_page->header.cons_next_pg));
+               atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
+               /* Side note: at this point, any *new* consumers coming in will grab
+                * slots based off the new counter index (cons_idx) */
+               /* Now free up the old page.  Need to make sure all other consumers are
+                * done.  We spin til enough are done, like an inverted refcnt. */
+               while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
+                       cpu_relax();
+               /* Now the page is done.  0 its metadata and give it up. */
+               old_page->header.cons_next_pg = 0;
+               atomic_set(&old_page->header.nr_cons, 0);
+               /* We want to "free" the page.  We'll try and set it as the spare.  If
+                * there is already a spare, we'll free that one. */
+               other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
+                                                          (long)old_page);
+               assert(!PGOFF(other_page));
+               if (other_page) {
+                       munmap(other_page, PGSIZE);
+                       atomic_dec(&ucq->nr_extra_pgs);
+               }
+               /* All fixed up, unlock.  Other consumers may lock and check to make
+                * sure things are done. */
+               mcs_unlock_notifsafe(ucq_lock, &local_qn);
+               /* Now that everything is fixed, try again from the top */
+               goto loop_top;
+claim_slot:
+               cmb();  /* so we can goto claim_slot */
+               /* If we're still here, my_idx is good, and we'll try to claim it.  If
+                * we fail, we need to repeat the whole process. */
+       } while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
+       /* Now we have a good slot that we can consume */
+       my_msg = slot2msg(my_idx);
+       /* Wait til the msg is ready (kernel sets this flag) */
+       while (!my_msg->ready)
+               cpu_relax();
+       /* Copy out */
+       *msg = my_msg->ev_msg;
+       /* Unset this for the next usage of the container */
+       my_msg->ready = FALSE;
+       wmb();
+       /* Increment nr_cons, showing we're done */
+       atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
+       return 0;
+}
+