Add support for uthreads blocking on event queues
authorBarret Rhoden <brho@cs.berkeley.edu>
Tue, 11 Aug 2015 21:12:44 +0000 (17:12 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 28 Sep 2015 19:14:00 +0000 (15:14 -0400)
The generic problem is that we have M uthreads and want to block on a
subset of N event queues.  This is similar to Go's select call.  The
uthread will eventually return with a message from one of the queues.
In the case that multiple evqs fire before the uthread checks the
queues, it'll attempt to extract a message in the order the queues were
listed.

We do this by bypassing the default event handler, which means event
queues that are used for uthreads must be used exclusively for uthreads.
Though technically you can read from the mbox concurrently, a
handle_events() call won't handle any of the messages: that's up to the
uthread.

tests/evq_block.c [new file with mode: 0644]
user/parlib/event.c
user/parlib/include/event.h
user/parlib/include/uthread.h

diff --git a/tests/evq_block.c b/tests/evq_block.c
new file mode 100644 (file)
index 0000000..1de66a7
--- /dev/null
@@ -0,0 +1,70 @@
+/* Copyright (c) 2015 Google, Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Very basic test for blocking a uthread on event queues. */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <parlib/event.h>
+#include <parlib/timing.h>
+#include <parlib/uthread.h>
+#include <benchutil/alarm.h>
+
+static struct event_queue *get_ectlr_evq(void)
+{
+       struct event_queue *ev_q = get_big_event_q();
+       evq_attach_wakeup_ctlr(ev_q);
+       return ev_q;
+}
+
+void trampoline_handler(struct event_queue *ev_q)
+{
+       printf("Got event on evp %p\n", ev_q);
+       evq_wakeup_handler(ev_q);
+}
+
+int main(int argc, char **argv)
+{
+       uint64_t now;
+       int ctlfd1, timerfd1;
+       int ctlfd2, timerfd2;
+       /* these need to just exist somewhere.  don't free them. */
+       struct event_queue *evq1 = get_ectlr_evq();
+       struct event_queue *evq2 = get_ectlr_evq();
+       evq1->ev_flags |= EVENT_JUSTHANDLEIT | EVENT_INDIR | EVENT_SPAM_INDIR |
+                         EVENT_WAKEUP;
+       evq2->ev_flags |= EVENT_JUSTHANDLEIT | EVENT_INDIR | EVENT_SPAM_INDIR |
+                         EVENT_WAKEUP;
+       /* hack in our own handler for debugging */
+       evq1->ev_handler = trampoline_handler;
+       evq2->ev_handler = trampoline_handler;
+
+       if (devalarm_get_fds(&ctlfd1, &timerfd1, 0))
+               return -1;
+       if (devalarm_get_fds(&ctlfd2, &timerfd2, 0))
+               return -1;
+       if (devalarm_set_evq(ctlfd1, evq1))
+               return -1;
+       if (devalarm_set_evq(ctlfd2, evq2))
+               return -1;
+       now = read_tsc();
+       /* with this setup and the early sleep, two fires, then one.  but we'll
+        * process one first, since that's the first one on the list passed to
+        * blockon */
+       if (devalarm_set_time(timerfd1, now + sec2tsc(4)))
+               return -1;
+       if (devalarm_set_time(timerfd2, now + sec2tsc(2)))
+               return -1;
+
+       /* if we remove this, two will fire first and wake us up.  if we don't exit
+        * right away, one will eventually fire and do nothing. */
+       uthread_sleep(5);
+       /* then the actual usage: */
+       struct event_msg msg;
+       struct event_queue *which;
+       uth_blockon_evqs(&msg, &which, 2, evq1, evq2);
+       printf("Got message type %d on evq %s (%p)\n", msg.ev_type,
+              which == evq1 ? "one" : "two", which);
+       return 0;
+}
index dc530cc..27f7d8d 100644 (file)
@@ -1,4 +1,5 @@
-/* Copyright (c) 2011 The Regents of the University of California
+/* Copyright (c) 2011-2014 The Regents of the University of California
+ * Copyright (c) 2015 Google Inc
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details.
  *
 #include <parlib/event.h>
 #include <parlib/uthread.h>
 #include <parlib/spinlock.h>
+#include <parlib/mcs.h>
+#include <parlib/poke.h>
+#include <sys/queue.h>
+#include <malloc.h>
 
 /* For remote VCPD mbox event handling */
 __thread bool __vc_handle_an_mbox = FALSE;
@@ -461,3 +466,303 @@ void print_ev_msg(struct event_msg *msg)
        printf("\targ3 (32): 0x%8x\n", msg->ev_arg3);
        printf("\targ4 (64): 0x%16x\n", msg->ev_arg4);
 }
+
+/* Uthreads blocking on event queues
+ *
+ * It'd be nice to have a uthread sleep until an event queue has some activity
+ * (e.g. a new message).  It'd also be nice to wake up early with a timer.  It
+ * is tempting to try something like an INDIR and have one evq multiplex two
+ * others (the real event and an alarm).  But then you can't separate the two
+ * streams; what if one thread sleeps on just the event at the same time?  What
+ * if we want to support something like Go's select: a thread wants to block
+ * until there is some activity on some channel?
+ *
+ * Ultimately, we want to allow M uthreads to block on possibly different
+ * subsets of N event queues.
+ *
+ * Every uthread will have a sleep controller, and every event queue will have a
+ * wakeup controller.  There are up to MxN linkage structures connecting these.
+ *
+ * We'll use the event_queue handler to override the default event processing.
+ * This means the event queues that are used for blocking uthreads can *only* be
+ * used for that; the regular event processing will not happen.  This is mostly
+ * true.  It is possible to extract events from an evq's mbox concurrently.
+ *
+ * I briefly considered having one global lock to protect all of the lists and
+ * structures.  That's lousy for the obvious scalability reason, but it seemed
+ * like it'd make things easier, especially when I thought I needed locks in
+ * both the ectlr and the uctlr (in early versions, I considered having the
+ * handler yank itself out of the ectlr, copying a message into that struct, or
+ * o/w needing protection).  On occasion, we run into the "I'd like to split my
+ * lock between two components and still somehow synchronize" issue (e.g. FD
+ * taps, with the FDT lock and the blocking/whatever that goes on in a device).
+ * Whenever that comes up, we usually can get some help from other shared memory
+ * techniques.  For FD taps, it's the kref.  For us, it's post-and-poke, though
+ * it didn't solve all of our problems - I use it as a tool with some basic
+ * shared memory signalling. */
+
+struct evq_wait_link;
+TAILQ_HEAD(wait_link_tailq, evq_wait_link);
+
+/* Bookkeeping for the uthread sleeping on a bunch of event queues.
+ *
+ * Notes on concurrency: most fields are not protected.  check_evqs is racy, and
+ * written to by handlers.  The tailq is only used by the uthread.  blocked is
+ * never concurrently *written*; see __uth_wakeup_poke() for details. */
+struct uth_sleep_ctlr {
+       struct uthread                          *uth;
+       struct spin_pdr_lock            in_use;
+       bool                                            check_evqs;
+       bool                                            blocked;
+       struct poke_tracker                     poker;
+       struct wait_link_tailq          evqs;
+};
+
+/* Attaches to an event_queue (ev_udata), tracks the uthreads for this evq */
+struct evq_wakeup_ctlr {
+       struct wait_link_tailq          waiters;
+       struct spin_pdr_lock            lock;
+};
+
+/* Up to MxN of these, N of them per uthread. */
+struct evq_wait_link {
+       struct uth_sleep_ctlr           *uth_ctlr;
+       TAILQ_ENTRY(evq_wait_link)      link_uth;
+       struct evq_wakeup_ctlr          *evq_ctlr;
+       TAILQ_ENTRY(evq_wait_link)      link_evq;
+};
+
+/* Poke function: ensures the uth managed by uctlr wakes up.  poke() ensures
+ * there is only one thread in this function at a time.  However, it could be
+ * called spuriously, which is why we check 'blocked.' */
+static void __uth_wakeup_poke(void *arg)
+{
+       struct uth_sleep_ctlr *uctlr = arg;
+       /* There are no concurrent writes to 'blocked'.  Blocked is only ever
+        * written when the uth sleeps and only ever cleared here.  Once the uth
+        * writes it, it does not write it again until after we clear it.
+        *
+        * This is still racy - we could see !blocked, then blocked gets set.  In
+        * that case, the poke failed, and that is harmless.  The uth will see
+        * 'check_evqs', which was set before poke, which would be before writing
+        * blocked, and the uth checks 'check_evqs' after writing. */
+       if (uctlr->blocked) {
+               uctlr->blocked = FALSE;
+               cmb();  /* clear blocked before starting the uth */
+               uthread_runnable(uctlr->uth);
+       }
+}
+
+static void uth_sleep_ctlr_init(struct uth_sleep_ctlr *uctlr,
+                                struct uthread *uth)
+{
+       uctlr->uth = uth;
+       spin_pdr_init(&uctlr->in_use);
+       uctlr->check_evqs = FALSE;
+       uctlr->blocked = FALSE;
+       poke_init(&uctlr->poker, __uth_wakeup_poke);
+       TAILQ_INIT(&uctlr->evqs);
+}
+
+/* This handler runs when the ev_q is checked.  Instead of doing anything with
+ * the ev_q, we make sure that every uthread that was waiting on us wakes up.
+ * The uthreads could be waiting on several evqs, so there could be multiple
+ * independent wake-up attempts, hence the poke.  Likewise, the uthread could be
+ * awake when we poke.  The uthread will check check_evqs after sleeping, in
+ * case we poke before it blocks (and the poke fails).
+ *
+ * Also, there could be concurrent callers of this handler, and other uthreads
+ * signing up for a wakeup. */
+void evq_wakeup_handler(struct event_queue *ev_q)
+{
+       struct evq_wakeup_ctlr *ectlr = ev_q->ev_udata;
+       struct evq_wait_link *i;
+       assert(ectlr);
+       spin_pdr_lock(&ectlr->lock);
+       /* Note we wake up all sleepers, even though only one is likely to get the
+        * message.  See the notes in unlink_ectlr() for more info. */
+       TAILQ_FOREACH(i, &ectlr->waiters, link_evq) {
+               i->uth_ctlr->check_evqs = TRUE;
+               cmb();  /* order check write before poke (poke has atomic) */
+               poke(&i->uth_ctlr->poker, i->uth_ctlr);
+       }
+       spin_pdr_unlock(&ectlr->lock);
+}
+
+/* Helper, attaches a wakeup controller to the event queue. */
+void evq_attach_wakeup_ctlr(struct event_queue *ev_q)
+{
+       struct evq_wakeup_ctlr *ectlr = malloc(sizeof(struct evq_wakeup_ctlr));
+       memset(ectlr, 0, sizeof(struct evq_wakeup_ctlr));
+       spin_pdr_init(&ectlr->lock);
+       TAILQ_INIT(&ectlr->waiters);
+       ev_q->ev_udata = ectlr;
+       ev_q->ev_handler = evq_wakeup_handler;
+}
+
+void evq_remove_wakeup_ctlr(struct event_queue *ev_q)
+{
+       free(ev_q->ev_udata);
+       ev_q->ev_udata = 0;
+       ev_q->ev_handler = 0;
+}
+
+static void link_uctlr_ectlr(struct uth_sleep_ctlr *uctlr,
+                             struct evq_wakeup_ctlr *ectlr,
+                             struct evq_wait_link *link)
+{
+       /* No lock needed for the uctlr; we're the only one modifying evqs */
+       link->uth_ctlr = uctlr;
+       TAILQ_INSERT_HEAD(&uctlr->evqs, link, link_uth);
+       /* Once we add ourselves to the ectrl list, we could start getting poked */
+       link->evq_ctlr = ectlr;
+       spin_pdr_lock(&ectlr->lock);
+       TAILQ_INSERT_HEAD(&ectlr->waiters, link, link_evq);
+       spin_pdr_unlock(&ectlr->lock);
+}
+
+/* Disconnects us from a wakeup controller.
+ *
+ * Our evq handlers wake up *all* uthreads that are waiting for activity
+ * (broadcast).  It's a tradeoff.  If the list of uthreads is long, then it is
+ * wasted effort.  An alternative is to wake up exactly one, with slightly
+ * greater overheads.  In the exactly-one case, multiple handlers could wake
+ * this uth up at once, but we can only extract one message.  If we do the
+ * single wake up, then when we detach from an ectlr, we need to peak in the
+ * mbox to see if it is not empty, and conditionally run its handler again, such
+ * that no uthread sits on a ectlr that has activity/pending messages (in
+ * essence, level triggered). */
+static void unlink_ectlr(struct evq_wait_link *link)
+{
+       struct evq_wakeup_ctlr *ectlr = link->evq_ctlr;
+       spin_pdr_lock(&ectlr->lock);
+       TAILQ_REMOVE(&ectlr->waiters, link, link_evq);
+       spin_pdr_unlock(&ectlr->lock);
+}
+
+/* Helper: polls all evqs once and extracts the first message available.  The
+ * message is copied into ev_msg, and the evq with the activity is copied into
+ * which_evq (if it is non-zero).  Returns TRUE on success. */
+static bool extract_evqs_msg(struct event_queue *evqs[], size_t nr_evqs,
+                             struct event_msg *ev_msg,
+                             struct event_queue **which_evq)
+{
+       struct event_queue *evq_i;
+       bool ret = FALSE;
+       /* We need to have notifs disabled when extracting messages from some
+        * mboxes.  Many mboxes have some form of busy waiting between consumers
+        * (userspace).  If we're just a uthread, we could wind up on a runqueue
+        * somewhere while someone else spins, possibly in VC ctx. */
+       uth_disable_notifs();
+       for (int i = 0; i < nr_evqs; i++) {
+               evq_i = evqs[i];
+               if (extract_one_mbox_msg(evq_i->ev_mbox, ev_msg)) {
+                       if (which_evq)
+                               *which_evq = evq_i;
+                       ret = TRUE;
+                       break;
+               }
+       }
+       uth_enable_notifs();
+       return ret;
+}
+
+/* Yield callback */
+static void __uth_blockon_evq_cb(struct uthread *uth, void *arg)
+{
+       struct uth_sleep_ctlr *uctlr = arg;
+       uthread_has_blocked(uth, UTH_EXT_BLK_EVENTQ);
+       cmb();  /* actually block before saying 'blocked' */
+       uctlr->blocked = TRUE;  /* can be woken up now */
+       wrmb(); /* write 'blocked' before read 'check_evqs' */
+       /* If someone set check_evqs, we should wake up.  We're competing with other
+        * wakers via poke (we may have already woken up!). */
+       if (uctlr->check_evqs)
+               poke(&uctlr->poker, uctlr);
+       /* Once we say we're blocked, we could be woken up (possibly by our poke
+        * here) and the uthread could run on another core.  Holding this lock
+        * prevents the uthread from quickly returning and freeing the memory of
+        * uctrl before we have a chance to check_evqs or poke. */
+       spin_pdr_unlock(&uctlr->in_use);
+}
+
+/* Direct version, with *evqs[]. */
+void uth_blockon_evqs_arr(struct event_msg *ev_msg,
+                          struct event_queue **which_evq,
+                          struct event_queue *evqs[], size_t nr_evqs)
+{
+       struct uth_sleep_ctlr uctlr;
+       struct evq_wait_link linkage[nr_evqs];
+
+       /* Catch user mistakes.  If they lack a handler, they didn't attach.  They
+        * are probably using our evq_wakeup_handler, but they might have their own
+        * wrapper function. */
+       for (int i = 0; i < nr_evqs; i++)
+               assert(evqs[i]->ev_handler);
+       /* Check for activity on the evqs before going through the hassle of
+        * sleeping.  ("check, signal, check again" pattern). */
+       if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
+               return;
+       uth_sleep_ctlr_init(&uctlr, current_uthread);
+       memset(linkage, 0, sizeof(struct evq_wait_link) * nr_evqs);
+       for (int i = 0; i < nr_evqs; i++)
+               link_uctlr_ectlr(&uctlr, (struct evq_wakeup_ctlr*)evqs[i]->ev_udata,
+                                &linkage[i]);
+       /* Mesa-style sleep until we get a message.  Mesa helps a bit here, since we
+        * can just deregister from them all when we're done.  o/w it is tempting to
+        * have us deregister from *the* one in the handler and extract the message
+        * there; which can be tricky and harder to reason about. */
+       while (1) {
+               /* We need to make sure only one 'version/ctx' of this thread is active
+                * at a time.  Later on, we'll unlock in vcore ctx on the other side of
+                * a yield.  We could restart from the yield, return, and free the uctlr
+                * before that ctx has a chance to finish. */
+               spin_pdr_lock(&uctlr.in_use);
+               /* We're signed up.  We might already have been told to check the evqs,
+                * or there could be messages still sitting in the evqs.  check_evqs is
+                * only ever cleared here, and only ever set in evq handlers. */
+               uctlr.check_evqs = FALSE;
+               cmb();  /* look for messages after clearing check_evqs */
+               if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
+                       break;
+               uthread_yield(TRUE, __uth_blockon_evq_cb, &uctlr);
+       }
+       /* On the one hand, it's not necessary to unlock, since the memory will be
+        * freed.  But we do need to go through the process to turn on notifs and
+        * adjust the notif_disabled_depth for the case where we don't yield. */
+       spin_pdr_unlock(&uctlr.in_use);
+       for (int i = 0; i < nr_evqs; i++)
+               unlink_ectlr(&linkage[i]);
+}
+
+/* ... are event_queue *s, nr_evqs of them.  This will block until it can
+ * extract some message from one of evqs.  The message will be placed in ev_msg,
+ * and the particular evq it extracted it from will be placed in which_evq, if
+ * which is non-zero. */
+void uth_blockon_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
+                      size_t nr_evqs, ...)
+{
+       struct event_queue *evqs[nr_evqs];
+       va_list va;
+       va_start(va, nr_evqs);
+       for (int i = 0; i < nr_evqs; i++)
+               evqs[i] = va_arg(va, struct event_queue *);
+       va_end(va);
+       uth_blockon_evqs_arr(ev_msg, which_evq, evqs, nr_evqs);
+}
+
+/* ... are event_queue *s, nr_evqs of them.  This will attempt to extract some
+ * message from one of evqs.  The message will be placed in ev_msg, and the
+ * particular evq it extracted it from will be placed in which_evq.  Returns
+ * TRUE if it extracted a message. */
+bool uth_check_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
+                    size_t nr_evqs, ...)
+{
+       struct event_queue *evqs[nr_evqs];
+       va_list va;
+       va_start(va, nr_evqs);
+       for (int i = 0; i < nr_evqs; i++)
+               evqs[i] = va_arg(va, struct event_queue *);
+       va_end(va);
+       return extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq);
+}
index 8478142..448bf6c 100644 (file)
@@ -1,4 +1,5 @@
-/* Copyright (c) 2011 The Regents of the University of California
+/* Copyright (c) 2011-2014 The Regents of the University of California
+ * Copyright (c) 2015 Google Inc
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details.
  *
@@ -61,6 +62,21 @@ void ev_we_returned(bool were_handling_remotes);
 /* Debugging */
 void print_ev_msg(struct event_msg *msg);
 
+/* Uthreads blocking on event queues.  M uthreads can block on subsets of N
+ * event queues.  The structs and details are buried in event.c.  We can move
+ * some of them here if users need greater control over their evqs. */
+void evq_attach_wakeup_ctlr(struct event_queue *ev_q);
+void evq_remove_wakeup_ctlr(struct event_queue *ev_q);
+/* Handler, attaches to the ev_q.  Most people won't need this directly. */
+void evq_wakeup_handler(struct event_queue *ev_q);
+void uth_blockon_evqs_arr(struct event_msg *ev_msg,
+                          struct event_queue **which_evq,
+                          struct event_queue *evqs[], size_t nr_evqs);
+void uth_blockon_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
+                      size_t nr_evqs, ...);
+bool uth_check_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
+                    size_t nr_evqs, ...);
+
 __END_DECLS
 
 #endif /* PARLIB_EVENT_H */
index 2f3ca1e..9bb71f8 100644 (file)
@@ -17,7 +17,8 @@ __BEGIN_DECLS
 
 /* Externally blocked thread reasons (for uthread_has_blocked()) */
 #define UTH_EXT_BLK_MUTEX                      1
-#define UTH_EXT_BLK_JUSTICE                    2       /* whatever.  might need more options */
+#define UTH_EXT_BLK_EVENTQ                     2
+#define UTH_EXT_BLK_JUSTICE                    3       /* whatever.  might need more options */
 
 /* Bare necessities of a user thread.  2LSs should allocate a bigger struct and
  * cast their threads to uthreads when talking with vcore code.  Vcore/default