Use mutexes in epoll instead of spinlocks
[akaros.git] / user / parlib / event.c
index b4880c3..64dfaa0 100644 (file)
@@ -1,4 +1,5 @@
-/* Copyright (c) 2011 The Regents of the University of California
+/* Copyright (c) 2011-2014 The Regents of the University of California
+ * Copyright (c) 2015 Google Inc
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details.
  *
@@ -7,16 +8,22 @@
 
 #include <ros/event.h>
 #include <ros/procdata.h>
-#include <ucq.h>
-#include <bitmask.h>
-#include <vcore.h>
+#include <parlib/ucq.h>
+#include <parlib/evbitmap.h>
+#include <parlib/ceq.h>
+#include <parlib/vcore.h>
 #include <stdlib.h>
 #include <string.h>
-#include <assert.h>
+#include <parlib/assert.h>
 #include <errno.h>
-#include <parlib.h>
-#include <event.h>
-#include <uthread.h>
+#include <parlib/parlib.h>
+#include <parlib/event.h>
+#include <parlib/uthread.h>
+#include <parlib/spinlock.h>
+#include <parlib/mcs.h>
+#include <parlib/poke.h>
+#include <sys/queue.h>
+#include <malloc.h>
 
 /* For remote VCPD mbox event handling */
 __thread bool __vc_handle_an_mbox = FALSE;
@@ -29,13 +36,13 @@ __thread uint32_t __vc_rem_vcoreid;
  * these stitch up the big_q so its ev_mbox points to its internal mbox.  Never
  * access the internal mbox directly.
  *
- * Raw ones need to have their UCQs initialized.  If you're making a lot of
- * these, you can do one big mmap and init the ucqs on your own, which ought to
- * perform better.
+ * Raw ones need to have their mailboxes initialized.  If you're making a lot of
+ * these and they perform their own mmaps (e.g. UCQs), you can do one big mmap
+ * and init the ucqs on your own, which ought to perform better.
  *
- * Use the 'regular' one for big_qs if you don't want to worry about the ucq
+ * Use the 'regular' one for big_qs if you don't want to worry about the mbox
  * initalization */
-struct event_queue *get_big_event_q_raw(void)
+struct event_queue *get_eventq_raw(void)
 {
        /* TODO: (PIN) should be pinned memory */
        struct event_queue_big *big_q = malloc(sizeof(struct event_queue_big));
@@ -44,17 +51,39 @@ struct event_queue *get_big_event_q_raw(void)
        return (struct event_queue*)big_q;
 }
 
-struct event_queue *get_big_event_q(void)
+struct event_queue *get_eventq(int mbox_type)
 {
-       struct event_queue *big_q = get_big_event_q_raw();
-       /* uses the simpler, internally mmapping ucq_init() */
-       ucq_init(&big_q->ev_mbox->ev_msgs);
+       struct event_queue *big_q = get_eventq_raw();
+       event_mbox_init(big_q->ev_mbox, mbox_type);
        return big_q;
 }
 
+/* Basic initialization of a single mbox.  If you know the type, you can set up
+ * the mbox manually with possibly better performance.  For instance, ucq_init()
+ * calls mmap internally.  You could mmap a huge blob on your own and call
+ * ucq_raw_init (don't forget to set the mbox_type!) */
+void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type)
+{
+       ev_mbox->type = mbox_type;
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       ucq_init(&ev_mbox->ucq);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       evbitmap_init(&ev_mbox->evbm);
+                       break;
+               case (EV_MBOX_CEQ):
+                       ceq_init(&ev_mbox->ceq, CEQ_OR, CEQ_DEFAULT_SZ, CEQ_DEFAULT_SZ);
+                       break;
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       break;
+       }
+}
+
 /* Give it up.  I don't recommend calling these unless you're sure the queues
  * aren't in use (unregistered, etc). (TODO: consider some checks for this) */
-void put_big_event_q_raw(struct event_queue *ev_q)
+void put_eventq_raw(struct event_queue *ev_q)
 {
        /* if we use something other than malloc, we'll need to be aware that ev_q
         * is actually an event_queue_big.  One option is to use the flags, though
@@ -62,14 +91,32 @@ void put_big_event_q_raw(struct event_queue *ev_q)
        free(ev_q);
 }
 
-void put_big_event_q(struct event_queue *ev_q)
+void put_eventq(struct event_queue *ev_q)
 {
-       ucq_free_pgs(&ev_q->ev_mbox->ev_msgs);
-       put_big_event_q_raw(ev_q);
+       event_mbox_cleanup(ev_q->ev_mbox);
+       put_eventq_raw(ev_q);
+}
+
+void event_mbox_cleanup(struct event_mbox *ev_mbox)
+{
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       ucq_free_pgs(&ev_mbox->ucq);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       evbitmap_cleanup(&ev_mbox->evbm);
+                       break;
+               case (EV_MBOX_CEQ):
+                       ceq_cleanup(&ev_mbox->ceq);
+                       break;
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       break;
+       }
 }
 
 /* Need to point this event_q to an mbox - usually to a vcpd */
-struct event_queue *get_event_q(void)
+struct event_queue *get_eventq_slim(void)
 {
        /* TODO: (PIN) should be pinned memory */
        struct event_queue *ev_q = malloc(sizeof(struct event_queue));
@@ -80,9 +127,9 @@ struct event_queue *get_event_q(void)
 /* Gets a small ev_q, with ev_mbox pointing to the vcpd mbox of vcoreid.  If
  * ev_flags has EVENT_VCORE_PRIVATE set, it'll give you the private mbox.  o/w,
  * you'll get the public one. */
-struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
+struct event_queue *get_eventq_vcpd(uint32_t vcoreid, int ev_flags)
 {
-       struct event_queue *ev_q = get_event_q();
+       struct event_queue *ev_q = get_eventq_slim();
        if (ev_flags & EVENT_VCORE_PRIVATE)
                ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_private;
        else
@@ -90,13 +137,18 @@ struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
        return ev_q;
 }
 
-void put_event_q(struct event_queue *ev_q)
+void put_eventq_slim(struct event_queue *ev_q)
 {
        /* if we use something other than malloc, we'll need to be aware that ev_q
         * is not an event_queue_big. */
        free(ev_q);
 }
 
+void put_eventq_vcpd(struct event_queue *ev_q)
+{
+       put_eventq_slim(ev_q);
+}
+
 /* Sets ev_q to be the receiving end for kernel event ev_type */
 void register_kevent_q(struct event_queue *ev_q, unsigned int ev_type)
 {
@@ -121,7 +173,7 @@ struct event_queue *clear_kevent_q(unsigned int ev_type)
  * the other event functions together to get similar things done. */
 void enable_kevent(unsigned int ev_type, uint32_t vcoreid, int ev_flags)
 {
-       struct event_queue *ev_q = get_event_q_vcpd(vcoreid, ev_flags);
+       struct event_queue *ev_q = get_eventq_vcpd(vcoreid, ev_flags);
        ev_q->ev_flags = ev_flags;
        ev_q->ev_vcore = vcoreid;
        ev_q->ev_handler = 0;
@@ -147,88 +199,174 @@ struct event_queue *disable_kevent(unsigned int ev_type)
 unsigned int get_event_type(struct event_mbox *ev_mbox)
 {
        struct event_msg local_msg = {0};
-       /* UCQ returns 0 on success, so this will dequeue and return the type. */
-       if (!get_ucq_msg(&ev_mbox->ev_msgs, &local_msg)) {
+
+       if (extract_one_mbox_msg(ev_mbox, &local_msg))
                return local_msg.ev_type;
-       }
-       if (BITMASK_IS_CLEAR(&ev_mbox->ev_bitmap, MAX_NR_EVENT))
-               return EV_NONE; /* aka, 0 */
-       for (int i = 0; i < MAX_NR_EVENT; i++) {
-               if (GET_BITMASK_BIT(ev_mbox->ev_bitmap, i)) {
-                       CLR_BITMASK_BIT_ATOMIC(ev_mbox->ev_bitmap, i);
-                       return i;
-               }
-       }
        return EV_NONE;
 }
 
+/* Attempts to register ev_q with sysc, so long as sysc is not done/progress.
+ * Returns true if it succeeded, and false otherwise.  False means that the
+ * syscall is done, and does not need an event set (and should be handled
+ * accordingly).
+ *
+ * A copy of this is in glibc/sysdeps/akaros/syscall.c.  Keep them in sync. */
+bool register_evq(struct syscall *sysc, struct event_queue *ev_q)
+{
+       int old_flags;
+       sysc->ev_q = ev_q;
+       wrmb(); /* don't let that write pass any future reads (flags) */
+       /* Try and set the SC_UEVENT flag (so the kernel knows to look at ev_q) */
+       do {
+               /* no cmb() needed, the atomic_read will reread flags */
+               old_flags = atomic_read(&sysc->flags);
+               /* Spin if the kernel is mucking with syscall flags */
+               while (old_flags & SC_K_LOCK)
+                       old_flags = atomic_read(&sysc->flags);
+               /* If the kernel finishes while we are trying to sign up for an event,
+                * we need to bail out */
+               if (old_flags & (SC_DONE | SC_PROGRESS)) {
+                       sysc->ev_q = 0;         /* not necessary, but might help with bugs */
+                       return FALSE;
+               }
+       } while (!atomic_cas(&sysc->flags, old_flags, old_flags | SC_UEVENT));
+       return TRUE;
+}
+
+/* De-registers a syscall, so that the kernel will not send an event when it is
+ * done.  The call could already be SC_DONE, or could even finish while we try
+ * to unset SC_UEVENT.
+ *
+ * There is a chance the kernel sent an event if you didn't do this in time, but
+ * once this returns, the kernel won't send a message.
+ *
+ * If the kernel is trying to send a message right now, this will spin (on
+ * SC_K_LOCK).  We need to make sure we deregistered, and that if a message
+ * is coming, that it already was sent (and possibly overflowed), before
+ * returning. */
+void deregister_evq(struct syscall *sysc)
+{
+       int old_flags;
+       sysc->ev_q = 0;
+       wrmb(); /* don't let that write pass any future reads (flags) */
+       /* Try and unset the SC_UEVENT flag */
+       do {
+               /* no cmb() needed, the atomic_read will reread flags */
+               old_flags = atomic_read(&sysc->flags);
+               /* Spin if the kernel is mucking with syscall flags */
+               while (old_flags & SC_K_LOCK)
+                       old_flags = atomic_read(&sysc->flags);
+               /* Note we don't care if the SC_DONE flag is getting set.  We just need
+                * to avoid clobbering flags */
+       } while (!atomic_cas(&sysc->flags, old_flags, old_flags & ~SC_UEVENT));
+}
+
 /* Actual Event Handling */
 
-/* List of handlers, process-wide, that the 2LS should fill in.  They all must
- * return (don't context switch to a u_thread) */
-handle_event_t ev_handlers[MAX_NR_EVENT] = {[EV_EVENT] handle_ev_ev,
-                                            [EV_CHECK_MSGS] handle_check_msgs,
-                                            0};
+/* List of handler lists, process-wide.  They all must return (don't context
+ * switch to a u_thread) */
+struct ev_handler *ev_handlers[MAX_NR_EVENT] = {0};
+spinpdrlock_t ev_h_wlock = SPINPDR_INITIALIZER;
+
+int register_ev_handler(unsigned int ev_type, handle_event_t handler,
+                        void *data)
+{
+       struct ev_handler *new_h = malloc(sizeof(struct ev_handler));
+       if (!new_h)
+               return -1;
+       new_h->func = handler;
+       new_h->data = data;
+       spin_pdr_lock(&ev_h_wlock);
+       new_h->next = ev_handlers[ev_type];
+       wmb();  /* make sure new_h is done before publishing to readers */
+       ev_handlers[ev_type] = new_h;
+       spin_pdr_unlock(&ev_h_wlock);
+       return 0;
+}
+
+int deregister_ev_handler(unsigned int ev_type, handle_event_t handler,
+                          void *data)
+{
+       /* TODO: User-level RCU */
+       printf("Failed to dereg handler, not supported yet!\n");
+}
+
+static void run_ev_handlers(unsigned int ev_type, struct event_msg *ev_msg)
+{
+       struct ev_handler *handler;
+       /* TODO: RCU read lock */
+       handler = ev_handlers[ev_type];
+       while (handler) {
+               handler->func(ev_msg, ev_type, handler->data);
+               handler = handler->next;
+       }
+}
+
+/* Attempts to extract a message from an mbox, copying it into ev_msg.
+ * Returns TRUE on success. */
+bool extract_one_mbox_msg(struct event_mbox *ev_mbox, struct event_msg *ev_msg)
+{
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       return get_ucq_msg(&ev_mbox->ucq, ev_msg);
+               case (EV_MBOX_BITMAP):
+                       return get_evbitmap_msg(&ev_mbox->evbm, ev_msg);
+               case (EV_MBOX_CEQ):
+                       return get_ceq_msg(&ev_mbox->ceq, ev_msg);
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       return FALSE;
+       }
+}
 
 /* Attempts to handle a message.  Returns 1 if we dequeued a msg, 0 o/w. */
 int handle_one_mbox_msg(struct event_mbox *ev_mbox)
 {
        struct event_msg local_msg;
        unsigned int ev_type;
-       /* get_ucq returns 0 on success, -1 on empty */
-       if (get_ucq_msg(&ev_mbox->ev_msgs, &local_msg) == -1)
+       /* extract returns TRUE on success, we return 1. */
+       if (!extract_one_mbox_msg(ev_mbox, &local_msg))
                return 0;
        ev_type = local_msg.ev_type;
+       assert(ev_type < MAX_NR_EVENT);
        printd("[event] UCQ (mbox %08p), ev_type: %d\n", ev_mbox, ev_type);
-       if (ev_handlers[ev_type])
-               ev_handlers[ev_type](&local_msg, ev_type);
+       run_ev_handlers(ev_type, &local_msg);
        return 1;
 }
 
 /* Handle an mbox.  This is the receive-side processing of an event_queue.  It
- * takes an ev_mbox, since the vcpd mbox isn't a regular ev_q.  Returns the
- * number handled/attempted: counts if you don't have a handler. */
+ * takes an ev_mbox, since the vcpd mbox isn't a regular ev_q.  Returns 1 if we
+ * handled something, 0 o/w. */
 int handle_mbox(struct event_mbox *ev_mbox)
 {
        int retval = 0;
-       uint32_t vcoreid = vcore_id();
-       void bit_handler(unsigned int bit) {
-               printd("[event] Bit: ev_type: %d\n", bit);
-               if (ev_handlers[bit])
-                       ev_handlers[bit](0, bit);
-               retval++;
-               /* Consider checking the queue for incoming messages while we're here */
-       }
        printd("[event] handling ev_mbox %08p on vcore %d\n", ev_mbox, vcore_id());
        /* Some stack-smashing bugs cause this to fail */
        assert(ev_mbox);
-       /* Handle all full messages, tracking the number of attempts. */
+       /* Handle all full messages, tracking if we do at least one. */
        while (handle_one_mbox_msg(ev_mbox))
-               retval++;
-       /* Process all bits, if the kernel tells us any bit is set.  We don't clear
-        * the flag til after we check everything, in case one of the handlers
-        * doesn't return.  After we clear it, we recheck. */
-       if (ev_mbox->ev_check_bits) {
-               do {
-                       ev_mbox->ev_check_bits = TRUE;  /* in case we don't return */
-                       cmb();
-                       BITMASK_FOREACH_SET(ev_mbox->ev_bitmap, MAX_NR_EVENT, bit_handler,
-                                           TRUE);
-                       ev_mbox->ev_check_bits = FALSE;
-                       wrmb(); /* check_bits written before we check for it being clear */
-               } while (!BITMASK_IS_CLEAR(ev_mbox->ev_bitmap, MAX_NR_EVENT));
-       }
+               retval = 1;
        return retval;
 }
 
 /* Empty if the UCQ is empty and the bits don't need checked */
 bool mbox_is_empty(struct event_mbox *ev_mbox)
 {
-       return (ucq_is_empty(&ev_mbox->ev_msgs) && (!ev_mbox->ev_check_bits));
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       return ucq_is_empty(&ev_mbox->ucq);
+               case (EV_MBOX_BITMAP):
+                       return evbitmap_is_empty(&ev_mbox->evbm);
+               case (EV_MBOX_CEQ):
+                       return ceq_is_empty(&ev_mbox->ceq);
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       return FALSE;
+       }
 }
 
 /* The EV_EVENT handler - extract the ev_q from the message. */
-void handle_ev_ev(struct event_msg *ev_msg, unsigned int ev_type)
+void handle_ev_ev(struct event_msg *ev_msg, unsigned int ev_type, void *data)
 {
        struct event_queue *ev_q;
        /* EV_EVENT can't handle not having a message / being a bit.  If we got a
@@ -248,33 +386,20 @@ void handle_ev_ev(struct event_msg *ev_msg, unsigned int ev_type)
        handle_event_q(ev_q);
 }
 
-/* This handler tells us to check the public message box of a vcore. */
-void handle_check_msgs(struct event_msg *ev_msg, unsigned int ev_type)
-{
-       uint32_t rem_vcoreid;
-       assert(ev_msg);
-       rem_vcoreid = ev_msg->ev_arg2;
-       printd("[event] handle check msgs for VC %d on VC %d\n", rem_vcoreid,
-              vcore_id());
-       /* if it is a message for ourselves, then we can abort.  Vcores will check
-        * their own messages via handle_events() (which either we're doing now, or
-        * will do when we are done dealing with another vcore's mbox). */
-       if (rem_vcoreid == vcore_id())
-               return;
-       /* they should have had their can_rcv turned off at some point, though it is
-        * possible that it was turned back on by now.  we don't really care - our
-        * job is to make sure their messages get checked. */
-       handle_vcpd_mbox(rem_vcoreid);
-}
-
-/* 2LS will probably call this in vcore_entry and places where it wants to check
- * for / handle events.  This will process all the events for the given vcore.
- * Note, it probably should be the calling vcore you do this to...  Returns the
- * number of events handled. */
+/* Handles VCPD events (public and private).  The kernel always sets
+ * notif_pending after posting a message to either public or private mailbox.
+ * When this returns, as far as we are concerned, notif_pending is FALSE.
+ * However, a concurrent kernel writer could have reset it to true.  This is
+ * fine; whenever we leave VC ctx we double check notif_pending.  Returns 1 or 2
+ * if we actually handled a message, 0 o/w.
+ *
+ * WARNING: this might not return and/or current_uthread may change. */
 int handle_events(uint32_t vcoreid)
 {
        struct preempt_data *vcpd = vcpd_of(vcoreid);
        int retval = 0;
+       vcpd->notif_pending = FALSE;
+       wrmb(); /* prevent future reads from happening before notif_p write */
        retval += handle_mbox(&vcpd->ev_mbox_private);
        retval += handle_mbox(&vcpd->ev_mbox_public);
        return retval;
@@ -284,28 +409,16 @@ int handle_events(uint32_t vcoreid)
  * application specific, then this will dispatch/handle based on its flags. */
 void handle_event_q(struct event_queue *ev_q)
 {
+       printd("[event] handling ev_q %08p on vcore %d\n", ev_q, vcore_id());
        /* If the program wants to handle the ev_q on its own: */
-       if (ev_q->ev_flags & (EVENT_JUSTHANDLEIT | EVENT_THREAD)) {
-               if (!ev_q->ev_handler) {
-                       printf("No ev_handler installed for ev_q %08p, aborting!\n", ev_q);
-                       return;
-               }
-               if (ev_q->ev_flags & EVENT_JUSTHANDLEIT) {
-                       /* Remember this can't block or page fault */
-                       ev_q->ev_handler(ev_q);
-               } else if (ev_q->ev_flags & EVENT_THREAD) {
-                       /* 2LS sched op.  The 2LS can use an existing thread if it wants,
-                        * but do so inside spawn_thread() */
-                       if (sched_ops->spawn_thread)
-                               sched_ops->spawn_thread((uintptr_t)ev_q->ev_handler, ev_q);
-                       else
-                               printf("2LS can't spawn a thread for ev_q %08p\n", ev_q);
-               }
+       if (ev_q->ev_handler) {
+               /* Remember this can't block or page fault */
+               ev_q->ev_handler(ev_q);
                return;
        }
-       printd("[event] handling ev_q %08p on vcore %d\n", ev_q, vcore_id());
        /* Raw ev_qs that haven't been connected to an mbox, user bug: */
        assert(ev_q->ev_mbox);
+       /* The "default" ev_handler, common enough that I don't want a func ptr */
        handle_mbox(ev_q->ev_mbox);
 }
 
@@ -366,7 +479,7 @@ void handle_vcpd_mbox(uint32_t rem_vcoreid)
        __vc_handle_an_mbox = TRUE;
        __vc_rem_vcoreid = rem_vcoreid;
        /* Reset the stack and start over in vcore context */
-       set_stack_pointer((void*)vcpd->transition_stack);
+       set_stack_pointer((void*)vcpd->vcore_stack);
        vcore_entry();
        assert(0);
 }
@@ -423,3 +536,314 @@ void ev_we_returned(bool were_handling_remotes)
        if (were_handling_remotes)
                __vc_handle_an_mbox = TRUE;
 }
+
+/* Debugging */
+void print_ev_msg(struct event_msg *msg)
+{
+       printf("MSG at %08p\n", msg);
+       printf("\ttype: %d\n", msg->ev_type);
+       printf("\targ1 (16): 0x%4x\n", msg->ev_arg1);
+       printf("\targ2 (32): 0x%8x\n", msg->ev_arg2);
+       printf("\targ3 (32): 0x%8x\n", msg->ev_arg3);
+       printf("\targ4 (64): 0x%16x\n", msg->ev_arg4);
+}
+
+/* Uthreads blocking on event queues
+ *
+ * It'd be nice to have a uthread sleep until an event queue has some activity
+ * (e.g. a new message).  It'd also be nice to wake up early with a timer.  It
+ * is tempting to try something like an INDIR and have one evq multiplex two
+ * others (the real event and an alarm).  But then you can't separate the two
+ * streams; what if one thread sleeps on just the event at the same time?  What
+ * if we want to support something like Go's select: a thread wants to block
+ * until there is some activity on some channel?
+ *
+ * Ultimately, we want to allow M uthreads to block on possibly different
+ * subsets of N event queues.
+ *
+ * Every uthread will have a sleep controller, and every event queue will have a
+ * wakeup controller.  There are up to MxN linkage structures connecting these.
+ *
+ * We'll use the event_queue handler to override the default event processing.
+ * This means the event queues that are used for blocking uthreads can *only* be
+ * used for that; the regular event processing will not happen.  This is mostly
+ * true.  It is possible to extract events from an evq's mbox concurrently.
+ *
+ * I briefly considered having one global lock to protect all of the lists and
+ * structures.  That's lousy for the obvious scalability reason, but it seemed
+ * like it'd make things easier, especially when I thought I needed locks in
+ * both the ectlr and the uctlr (in early versions, I considered having the
+ * handler yank itself out of the ectlr, copying a message into that struct, or
+ * o/w needing protection).  On occasion, we run into the "I'd like to split my
+ * lock between two components and still somehow synchronize" issue (e.g. FD
+ * taps, with the FDT lock and the blocking/whatever that goes on in a device).
+ * Whenever that comes up, we usually can get some help from other shared memory
+ * techniques.  For FD taps, it's the kref.  For us, it's post-and-poke, though
+ * it didn't solve all of our problems - I use it as a tool with some basic
+ * shared memory signalling. */
+
+struct evq_wait_link;
+TAILQ_HEAD(wait_link_tailq, evq_wait_link);
+
+/* Bookkeeping for the uthread sleeping on a bunch of event queues.
+ *
+ * Notes on concurrency: most fields are not protected.  check_evqs is racy, and
+ * written to by handlers.  The tailq is only used by the uthread.  blocked is
+ * never concurrently *written*; see __uth_wakeup_poke() for details. */
+struct uth_sleep_ctlr {
+       struct uthread                          *uth;
+       struct spin_pdr_lock            in_use;
+       bool                                            check_evqs;
+       bool                                            blocked;
+       struct poke_tracker                     poker;
+       struct wait_link_tailq          evqs;
+};
+
+/* Attaches to an event_queue (ev_udata), tracks the uthreads for this evq */
+struct evq_wakeup_ctlr {
+       struct wait_link_tailq          waiters;
+       struct spin_pdr_lock            lock;
+};
+
+/* Up to MxN of these, N of them per uthread. */
+struct evq_wait_link {
+       struct uth_sleep_ctlr           *uth_ctlr;
+       TAILQ_ENTRY(evq_wait_link)      link_uth;
+       struct evq_wakeup_ctlr          *evq_ctlr;
+       TAILQ_ENTRY(evq_wait_link)      link_evq;
+};
+
+/* Poke function: ensures the uth managed by uctlr wakes up.  poke() ensures
+ * there is only one thread in this function at a time.  However, it could be
+ * called spuriously, which is why we check 'blocked.' */
+static void __uth_wakeup_poke(void *arg)
+{
+       struct uth_sleep_ctlr *uctlr = arg;
+       /* There are no concurrent writes to 'blocked'.  Blocked is only ever
+        * written when the uth sleeps and only ever cleared here.  Once the uth
+        * writes it, it does not write it again until after we clear it.
+        *
+        * This is still racy - we could see !blocked, then blocked gets set.  In
+        * that case, the poke failed, and that is harmless.  The uth will see
+        * 'check_evqs', which was set before poke, which would be before writing
+        * blocked, and the uth checks 'check_evqs' after writing. */
+       if (uctlr->blocked) {
+               uctlr->blocked = FALSE;
+               cmb();  /* clear blocked before starting the uth */
+               uthread_runnable(uctlr->uth);
+       }
+}
+
+static void uth_sleep_ctlr_init(struct uth_sleep_ctlr *uctlr,
+                                struct uthread *uth)
+{
+       uctlr->uth = uth;
+       spin_pdr_init(&uctlr->in_use);
+       uctlr->check_evqs = FALSE;
+       uctlr->blocked = FALSE;
+       poke_init(&uctlr->poker, __uth_wakeup_poke);
+       TAILQ_INIT(&uctlr->evqs);
+}
+
+/* This handler runs when the ev_q is checked.  Instead of doing anything with
+ * the ev_q, we make sure that every uthread that was waiting on us wakes up.
+ * The uthreads could be waiting on several evqs, so there could be multiple
+ * independent wake-up attempts, hence the poke.  Likewise, the uthread could be
+ * awake when we poke.  The uthread will check check_evqs after sleeping, in
+ * case we poke before it blocks (and the poke fails).
+ *
+ * Also, there could be concurrent callers of this handler, and other uthreads
+ * signing up for a wakeup. */
+void evq_wakeup_handler(struct event_queue *ev_q)
+{
+       struct evq_wakeup_ctlr *ectlr = ev_q->ev_udata;
+       struct evq_wait_link *i;
+       assert(ectlr);
+       spin_pdr_lock(&ectlr->lock);
+       /* Note we wake up all sleepers, even though only one is likely to get the
+        * message.  See the notes in unlink_ectlr() for more info. */
+       TAILQ_FOREACH(i, &ectlr->waiters, link_evq) {
+               i->uth_ctlr->check_evqs = TRUE;
+               cmb();  /* order check write before poke (poke has atomic) */
+               poke(&i->uth_ctlr->poker, i->uth_ctlr);
+       }
+       spin_pdr_unlock(&ectlr->lock);
+}
+
+/* Helper, attaches a wakeup controller to the event queue. */
+void evq_attach_wakeup_ctlr(struct event_queue *ev_q)
+{
+       struct evq_wakeup_ctlr *ectlr = malloc(sizeof(struct evq_wakeup_ctlr));
+       memset(ectlr, 0, sizeof(struct evq_wakeup_ctlr));
+       spin_pdr_init(&ectlr->lock);
+       TAILQ_INIT(&ectlr->waiters);
+       ev_q->ev_udata = ectlr;
+       ev_q->ev_handler = evq_wakeup_handler;
+}
+
+void evq_remove_wakeup_ctlr(struct event_queue *ev_q)
+{
+       free(ev_q->ev_udata);
+       ev_q->ev_udata = 0;
+       ev_q->ev_handler = 0;
+}
+
+static void link_uctlr_ectlr(struct uth_sleep_ctlr *uctlr,
+                             struct evq_wakeup_ctlr *ectlr,
+                             struct evq_wait_link *link)
+{
+       /* No lock needed for the uctlr; we're the only one modifying evqs */
+       link->uth_ctlr = uctlr;
+       TAILQ_INSERT_HEAD(&uctlr->evqs, link, link_uth);
+       /* Once we add ourselves to the ectrl list, we could start getting poked */
+       link->evq_ctlr = ectlr;
+       spin_pdr_lock(&ectlr->lock);
+       TAILQ_INSERT_HEAD(&ectlr->waiters, link, link_evq);
+       spin_pdr_unlock(&ectlr->lock);
+}
+
+/* Disconnects us from a wakeup controller.
+ *
+ * Our evq handlers wake up *all* uthreads that are waiting for activity
+ * (broadcast).  It's a tradeoff.  If the list of uthreads is long, then it is
+ * wasted effort.  An alternative is to wake up exactly one, with slightly
+ * greater overheads.  In the exactly-one case, multiple handlers could wake
+ * this uth up at once, but we can only extract one message.  If we do the
+ * single wake up, then when we detach from an ectlr, we need to peak in the
+ * mbox to see if it is not empty, and conditionally run its handler again, such
+ * that no uthread sits on a ectlr that has activity/pending messages (in
+ * essence, level triggered). */
+static void unlink_ectlr(struct evq_wait_link *link)
+{
+       struct evq_wakeup_ctlr *ectlr = link->evq_ctlr;
+       spin_pdr_lock(&ectlr->lock);
+       TAILQ_REMOVE(&ectlr->waiters, link, link_evq);
+       spin_pdr_unlock(&ectlr->lock);
+}
+
+/* Helper: polls all evqs once and extracts the first message available.  The
+ * message is copied into ev_msg, and the evq with the activity is copied into
+ * which_evq (if it is non-zero).  Returns TRUE on success. */
+static bool extract_evqs_msg(struct event_queue *evqs[], size_t nr_evqs,
+                             struct event_msg *ev_msg,
+                             struct event_queue **which_evq)
+{
+       struct event_queue *evq_i;
+       bool ret = FALSE;
+       /* We need to have notifs disabled when extracting messages from some
+        * mboxes.  Many mboxes have some form of busy waiting between consumers
+        * (userspace).  If we're just a uthread, we could wind up on a runqueue
+        * somewhere while someone else spins, possibly in VC ctx. */
+       uth_disable_notifs();
+       for (int i = 0; i < nr_evqs; i++) {
+               evq_i = evqs[i];
+               if (extract_one_mbox_msg(evq_i->ev_mbox, ev_msg)) {
+                       if (which_evq)
+                               *which_evq = evq_i;
+                       ret = TRUE;
+                       break;
+               }
+       }
+       uth_enable_notifs();
+       return ret;
+}
+
+/* Yield callback */
+static void __uth_blockon_evq_cb(struct uthread *uth, void *arg)
+{
+       struct uth_sleep_ctlr *uctlr = arg;
+       uthread_has_blocked(uth, UTH_EXT_BLK_EVENTQ);
+       cmb();  /* actually block before saying 'blocked' */
+       uctlr->blocked = TRUE;  /* can be woken up now */
+       wrmb(); /* write 'blocked' before read 'check_evqs' */
+       /* If someone set check_evqs, we should wake up.  We're competing with other
+        * wakers via poke (we may have already woken up!). */
+       if (uctlr->check_evqs)
+               poke(&uctlr->poker, uctlr);
+       /* Once we say we're blocked, we could be woken up (possibly by our poke
+        * here) and the uthread could run on another core.  Holding this lock
+        * prevents the uthread from quickly returning and freeing the memory of
+        * uctrl before we have a chance to check_evqs or poke. */
+       spin_pdr_unlock(&uctlr->in_use);
+}
+
+/* Direct version, with *evqs[]. */
+void uth_blockon_evqs_arr(struct event_msg *ev_msg,
+                          struct event_queue **which_evq,
+                          struct event_queue *evqs[], size_t nr_evqs)
+{
+       struct uth_sleep_ctlr uctlr;
+       struct evq_wait_link linkage[nr_evqs];
+
+       /* Catch user mistakes.  If they lack a handler, they didn't attach.  They
+        * are probably using our evq_wakeup_handler, but they might have their own
+        * wrapper function. */
+       for (int i = 0; i < nr_evqs; i++)
+               assert(evqs[i]->ev_handler);
+       /* Check for activity on the evqs before going through the hassle of
+        * sleeping.  ("check, signal, check again" pattern). */
+       if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
+               return;
+       uth_sleep_ctlr_init(&uctlr, current_uthread);
+       memset(linkage, 0, sizeof(struct evq_wait_link) * nr_evqs);
+       for (int i = 0; i < nr_evqs; i++)
+               link_uctlr_ectlr(&uctlr, (struct evq_wakeup_ctlr*)evqs[i]->ev_udata,
+                                &linkage[i]);
+       /* Mesa-style sleep until we get a message.  Mesa helps a bit here, since we
+        * can just deregister from them all when we're done.  o/w it is tempting to
+        * have us deregister from *the* one in the handler and extract the message
+        * there; which can be tricky and harder to reason about. */
+       while (1) {
+               /* We need to make sure only one 'version/ctx' of this thread is active
+                * at a time.  Later on, we'll unlock in vcore ctx on the other side of
+                * a yield.  We could restart from the yield, return, and free the uctlr
+                * before that ctx has a chance to finish. */
+               spin_pdr_lock(&uctlr.in_use);
+               /* We're signed up.  We might already have been told to check the evqs,
+                * or there could be messages still sitting in the evqs.  check_evqs is
+                * only ever cleared here, and only ever set in evq handlers. */
+               uctlr.check_evqs = FALSE;
+               cmb();  /* look for messages after clearing check_evqs */
+               if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
+                       break;
+               uthread_yield(TRUE, __uth_blockon_evq_cb, &uctlr);
+       }
+       /* On the one hand, it's not necessary to unlock, since the memory will be
+        * freed.  But we do need to go through the process to turn on notifs and
+        * adjust the notif_disabled_depth for the case where we don't yield. */
+       spin_pdr_unlock(&uctlr.in_use);
+       for (int i = 0; i < nr_evqs; i++)
+               unlink_ectlr(&linkage[i]);
+}
+
+/* ... are event_queue *s, nr_evqs of them.  This will block until it can
+ * extract some message from one of evqs.  The message will be placed in ev_msg,
+ * and the particular evq it extracted it from will be placed in which_evq, if
+ * which is non-zero. */
+void uth_blockon_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
+                      size_t nr_evqs, ...)
+{
+       struct event_queue *evqs[nr_evqs];
+       va_list va;
+       va_start(va, nr_evqs);
+       for (int i = 0; i < nr_evqs; i++)
+               evqs[i] = va_arg(va, struct event_queue *);
+       va_end(va);
+       uth_blockon_evqs_arr(ev_msg, which_evq, evqs, nr_evqs);
+}
+
+/* ... are event_queue *s, nr_evqs of them.  This will attempt to extract some
+ * message from one of evqs.  The message will be placed in ev_msg, and the
+ * particular evq it extracted it from will be placed in which_evq.  Returns
+ * TRUE if it extracted a message. */
+bool uth_check_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
+                    size_t nr_evqs, ...)
+{
+       struct event_queue *evqs[nr_evqs];
+       va_list va;
+       va_start(va, nr_evqs);
+       for (int i = 0; i < nr_evqs; i++)
+               evqs[i] = va_arg(va, struct event_queue *);
+       va_end(va);
+       return extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq);
+}