Add support for uthreads blocking on event queues
[akaros.git] / user / parlib / event.c
1 /* Copyright (c) 2011-2014 The Regents of the University of California
2  * Copyright (c) 2015 Google Inc
3  * Barret Rhoden <brho@cs.berkeley.edu>
4  * See LICENSE for details.
5  *
6  * Userspace utility functions for receiving events and notifications (IPIs).
7  * Some are higher level than others; just use what you need. */ 
8
9 #include <ros/event.h>
10 #include <ros/procdata.h>
11 #include <parlib/ucq.h>
12 #include <parlib/bitmask.h>
13 #include <parlib/vcore.h>
14 #include <stdlib.h>
15 #include <string.h>
16 #include <assert.h>
17 #include <errno.h>
18 #include <parlib/parlib.h>
19 #include <parlib/event.h>
20 #include <parlib/uthread.h>
21 #include <parlib/spinlock.h>
22 #include <parlib/mcs.h>
23 #include <parlib/poke.h>
24 #include <sys/queue.h>
25 #include <malloc.h>
26
27 /* For remote VCPD mbox event handling */
28 __thread bool __vc_handle_an_mbox = FALSE;
29 __thread uint32_t __vc_rem_vcoreid;
30
31 /********* Event_q Setup / Registration  ***********/
32
33 /* Get event_qs via these interfaces, since eventually we'll want to either
34  * allocate from pinned memory or use some form of a slab allocator.  Also,
35  * these stitch up the big_q so its ev_mbox points to its internal mbox.  Never
36  * access the internal mbox directly.
37  *
38  * Raw ones need to have their UCQs initialized.  If you're making a lot of
39  * these, you can do one big mmap and init the ucqs on your own, which ought to
40  * perform better.
41  *
42  * Use the 'regular' one for big_qs if you don't want to worry about the ucq
43  * initalization */
44 struct event_queue *get_big_event_q_raw(void)
45 {
46         /* TODO: (PIN) should be pinned memory */
47         struct event_queue_big *big_q = malloc(sizeof(struct event_queue_big));
48         memset(big_q, 0, sizeof(struct event_queue_big));
49         big_q->ev_mbox = &big_q->ev_imbox;
50         return (struct event_queue*)big_q;
51 }
52
53 struct event_queue *get_big_event_q(void)
54 {
55         struct event_queue *big_q = get_big_event_q_raw();
56         /* uses the simpler, internally mmapping ucq_init() */
57         ucq_init(&big_q->ev_mbox->ev_msgs);
58         return big_q;
59 }
60
61 /* Give it up.  I don't recommend calling these unless you're sure the queues
62  * aren't in use (unregistered, etc). (TODO: consider some checks for this) */
63 void put_big_event_q_raw(struct event_queue *ev_q)
64 {
65         /* if we use something other than malloc, we'll need to be aware that ev_q
66          * is actually an event_queue_big.  One option is to use the flags, though
67          * this could be error prone. */
68         free(ev_q);
69 }
70
71 void put_big_event_q(struct event_queue *ev_q)
72 {
73         ucq_free_pgs(&ev_q->ev_mbox->ev_msgs);
74         put_big_event_q_raw(ev_q);
75 }
76
77 /* Need to point this event_q to an mbox - usually to a vcpd */
78 struct event_queue *get_event_q(void)
79 {
80         /* TODO: (PIN) should be pinned memory */
81         struct event_queue *ev_q = malloc(sizeof(struct event_queue));
82         memset(ev_q, 0, sizeof(struct event_queue));
83         return ev_q;
84 }
85
86 /* Gets a small ev_q, with ev_mbox pointing to the vcpd mbox of vcoreid.  If
87  * ev_flags has EVENT_VCORE_PRIVATE set, it'll give you the private mbox.  o/w,
88  * you'll get the public one. */
89 struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
90 {
91         struct event_queue *ev_q = get_event_q();
92         if (ev_flags & EVENT_VCORE_PRIVATE)
93                 ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_private;
94         else
95                 ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_public;
96         return ev_q;
97 }
98
99 void put_event_q(struct event_queue *ev_q)
100 {
101         /* if we use something other than malloc, we'll need to be aware that ev_q
102          * is not an event_queue_big. */
103         free(ev_q);
104 }
105
106 /* Sets ev_q to be the receiving end for kernel event ev_type */
107 void register_kevent_q(struct event_queue *ev_q, unsigned int ev_type)
108 {
109         __procdata.kernel_evts[ev_type] = ev_q;
110 }
111
112 /* Clears the event, returning an ev_q if there was one there.  You'll need to
113  * free it. */
114 struct event_queue *clear_kevent_q(unsigned int ev_type)
115 {
116         struct event_queue *ev_q = __procdata.kernel_evts[ev_type];
117         __procdata.kernel_evts[ev_type] = 0;
118         return ev_q;
119 }
120
121 /* Enables an IPI/event combo for ev_type sent to vcoreid's default mbox.  IPI
122  * if you want one or not.  If you want the event to go to the vcore private
123  * mbox (meaning no other core should ever handle it), send in
124  * EVENT_VCORE_PRIVATE with ev_flags.
125  *
126  * This is the simplest thing applications may want, and shows how you can put
127  * the other event functions together to get similar things done. */
128 void enable_kevent(unsigned int ev_type, uint32_t vcoreid, int ev_flags)
129 {
130         struct event_queue *ev_q = get_event_q_vcpd(vcoreid, ev_flags);
131         ev_q->ev_flags = ev_flags;
132         ev_q->ev_vcore = vcoreid;
133         ev_q->ev_handler = 0;
134         wmb();  /* make sure ev_q is filled out before registering */
135         register_kevent_q(ev_q, ev_type);
136 }
137
138 /* Stop receiving the events (one could be on the way).  Caller needs to be
139  * careful, since the kernel might be sending an event to the ev_q.  Depending
140  * on the ev_q, it may be hard to know when it is done (for instance, if all
141  * syscalls you ever registered with the ev_q are done, then it would be okay).
142  * o/w, don't free it. */
143 struct event_queue *disable_kevent(unsigned int ev_type)
144 {
145         return clear_kevent_q(ev_type);
146 }
147
148 /********* Event Handling / Reception ***********/
149 /* Somewhat ghetto helper, for the lazy.  If all you care about is an event
150  * number, this will see if the event happened or not.  It will try for a
151  * message, but if there is none, it will go for a bit.  Note that multiple
152  * bit messages will turn into just one bit. */
153 unsigned int get_event_type(struct event_mbox *ev_mbox)
154 {
155         struct event_msg local_msg = {0};
156
157         if (extract_one_mbox_msg(ev_mbox, &local_msg))
158                 return local_msg.ev_type;
159         if (BITMASK_IS_CLEAR(&ev_mbox->ev_bitmap, MAX_NR_EVENT))
160                 return EV_NONE; /* aka, 0 */
161         for (int i = 0; i < MAX_NR_EVENT; i++) {
162                 if (GET_BITMASK_BIT(ev_mbox->ev_bitmap, i)) {
163                         CLR_BITMASK_BIT_ATOMIC(ev_mbox->ev_bitmap, i);
164                         return i;
165                 }
166         }
167         return EV_NONE;
168 }
169
170 /* Actual Event Handling */
171
172 /* List of handler lists, process-wide.  They all must return (don't context
173  * switch to a u_thread) */
174 struct ev_handler *ev_handlers[MAX_NR_EVENT] = {0};
175 spinpdrlock_t ev_h_wlock = SPINPDR_INITIALIZER;
176
177 int register_ev_handler(unsigned int ev_type, handle_event_t handler,
178                         void *data)
179 {
180         struct ev_handler *new_h = malloc(sizeof(struct ev_handler));
181         if (!new_h)
182                 return -1;
183         new_h->func = handler;
184         new_h->data = data;
185         spin_pdr_lock(&ev_h_wlock);
186         new_h->next = ev_handlers[ev_type];
187         wmb();  /* make sure new_h is done before publishing to readers */
188         ev_handlers[ev_type] = new_h;
189         spin_pdr_unlock(&ev_h_wlock);
190         return 0;
191 }
192
193 int deregister_ev_handler(unsigned int ev_type, handle_event_t handler,
194                           void *data)
195 {
196         /* TODO: User-level RCU */
197         printf("Failed to dereg handler, not supported yet!\n");
198 }
199
200 static void run_ev_handlers(unsigned int ev_type, struct event_msg *ev_msg)
201 {
202         struct ev_handler *handler;
203         /* TODO: RCU read lock */
204         handler = ev_handlers[ev_type];
205         while (handler) {
206                 handler->func(ev_msg, ev_type, handler->data);
207                 handler = handler->next;
208         }
209 }
210
211 /* Attempts to extract a message from an mbox, copying it into ev_msg.
212  * Returns TRUE on success. */
213 bool extract_one_mbox_msg(struct event_mbox *ev_mbox, struct event_msg *ev_msg)
214 {
215         /* get_ucq returns 0 on success, -1 on empty */
216         return get_ucq_msg(&ev_mbox->ev_msgs, ev_msg) == 0;
217 }
218
219 /* Attempts to handle a message.  Returns 1 if we dequeued a msg, 0 o/w. */
220 int handle_one_mbox_msg(struct event_mbox *ev_mbox)
221 {
222         struct event_msg local_msg;
223         unsigned int ev_type;
224         /* extract returns TRUE on success, we return 1. */
225         if (!extract_one_mbox_msg(ev_mbox, &local_msg))
226                 return 0;
227         ev_type = local_msg.ev_type;
228         assert(ev_type < MAX_NR_EVENT);
229         printd("[event] UCQ (mbox %08p), ev_type: %d\n", ev_mbox, ev_type);
230         run_ev_handlers(ev_type, &local_msg);
231         return 1;
232 }
233
234 /* Handle an mbox.  This is the receive-side processing of an event_queue.  It
235  * takes an ev_mbox, since the vcpd mbox isn't a regular ev_q.  Returns 1 if we
236  * handled something, 0 o/w. */
237 int handle_mbox(struct event_mbox *ev_mbox)
238 {
239         int retval = 0;
240         uint32_t vcoreid = vcore_id();
241         void bit_handler(unsigned int bit) {
242                 printd("[event] Bit: ev_type: %d\n", bit);
243                 run_ev_handlers(bit, 0);
244                 retval = 1;
245                 /* Consider checking the queue for incoming messages while we're here */
246         }
247         printd("[event] handling ev_mbox %08p on vcore %d\n", ev_mbox, vcore_id());
248         /* Some stack-smashing bugs cause this to fail */
249         assert(ev_mbox);
250         /* Handle all full messages, tracking if we do at least one. */
251         while (handle_one_mbox_msg(ev_mbox))
252                 retval = 1;
253         /* Process all bits, if the kernel tells us any bit is set.  We don't clear
254          * the flag til after we check everything, in case one of the handlers
255          * doesn't return.  After we clear it, we recheck. */
256         if (ev_mbox->ev_check_bits) {
257                 do {
258                         ev_mbox->ev_check_bits = TRUE;  /* in case we don't return */
259                         cmb();
260                         BITMASK_FOREACH_SET(ev_mbox->ev_bitmap, MAX_NR_EVENT, bit_handler,
261                                             TRUE);
262                         ev_mbox->ev_check_bits = FALSE;
263                         wrmb(); /* check_bits written before we check for it being clear */
264                 } while (!BITMASK_IS_CLEAR(ev_mbox->ev_bitmap, MAX_NR_EVENT));
265         }
266         return retval;
267 }
268
269 /* Empty if the UCQ is empty and the bits don't need checked */
270 bool mbox_is_empty(struct event_mbox *ev_mbox)
271 {
272         return (ucq_is_empty(&ev_mbox->ev_msgs) && (!ev_mbox->ev_check_bits));
273 }
274
275 /* The EV_EVENT handler - extract the ev_q from the message. */
276 void handle_ev_ev(struct event_msg *ev_msg, unsigned int ev_type, void *data)
277 {
278         struct event_queue *ev_q;
279         /* EV_EVENT can't handle not having a message / being a bit.  If we got a
280          * bit message, it's a bug somewhere */
281         assert(ev_msg);
282         ev_q = ev_msg->ev_arg3;
283         /* Same deal, a null ev_q is probably a bug, or someone being a jackass */
284         assert(ev_q);
285         /* Clear pending, so we can start getting INDIRs and IPIs again.  We must
286          * set this before (compared to handle_events, then set it, then handle
287          * again), since there is no guarantee handle_event_q() will return.  If
288          * there is a pending preemption, the vcore quickly yields and will deal
289          * with the remaining events in the future - meaning it won't return to
290          * here. */
291         ev_q->ev_alert_pending = FALSE;
292         wmb();  /* don't let the pending write pass the signaling of an ev recv */
293         handle_event_q(ev_q);
294 }
295
296 /* Handles VCPD events (public and private).  The kernel always sets
297  * notif_pending after posting a message to either public or private mailbox.
298  * When this returns, as far as we are concerned, notif_pending is FALSE.
299  * However, a concurrent kernel writer could have reset it to true.  This is
300  * fine; whenever we leave VC ctx we double check notif_pending.  Returns 1 or 2
301  * if we actually handled a message, 0 o/w.
302  *
303  * WARNING: this might not return and/or current_uthread may change. */
304 int handle_events(uint32_t vcoreid)
305 {
306         struct preempt_data *vcpd = vcpd_of(vcoreid);
307         int retval = 0;
308         vcpd->notif_pending = FALSE;
309         wrmb(); /* prevent future reads from happening before notif_p write */
310         retval += handle_mbox(&vcpd->ev_mbox_private);
311         retval += handle_mbox(&vcpd->ev_mbox_public);
312         return retval;
313 }
314
315 /* Handles the events on ev_q IAW the event_handlers[].  If the ev_q is
316  * application specific, then this will dispatch/handle based on its flags. */
317 void handle_event_q(struct event_queue *ev_q)
318 {
319         /* If the program wants to handle the ev_q on its own: */
320         if (ev_q->ev_flags & (EVENT_JUSTHANDLEIT | EVENT_THREAD)) {
321                 if (!ev_q->ev_handler) {
322                         printf("No ev_handler installed for ev_q %08p, aborting!\n", ev_q);
323                         return;
324                 }
325                 if (ev_q->ev_flags & EVENT_JUSTHANDLEIT) {
326                         /* Remember this can't block or page fault */
327                         ev_q->ev_handler(ev_q);
328                 } else if (ev_q->ev_flags & EVENT_THREAD) {
329                         /* 2LS sched op.  The 2LS can use an existing thread if it wants,
330                          * but do so inside spawn_thread() */
331                         if (sched_ops->spawn_thread)
332                                 sched_ops->spawn_thread((uintptr_t)ev_q->ev_handler, ev_q);
333                         else
334                                 printf("2LS can't spawn a thread for ev_q %08p\n", ev_q);
335                 }
336                 return;
337         }
338         printd("[event] handling ev_q %08p on vcore %d\n", ev_q, vcore_id());
339         /* Raw ev_qs that haven't been connected to an mbox, user bug: */
340         assert(ev_q->ev_mbox);
341         handle_mbox(ev_q->ev_mbox);
342 }
343
344 /* Sends the calling vcore a message to its public mbox.  This is purposefully
345  * limited to just the calling vcore, since in future versions, we can send via
346  * ucqs directly (in many cases).  That will require the caller to be the
347  * vcoreid, due to some preemption recovery issues (another ucq poller is
348  * waiting on us when we got preempted, and we never up nr_cons). */
349 void send_self_vc_msg(struct event_msg *ev_msg)
350 {
351         // TODO: try to use UCQs (requires additional support)
352         /* ev_type actually gets ignored currently.  ev_msg is what matters if it is
353          * non-zero.  FALSE means it's going to the public mbox */
354         sys_self_notify(vcore_id(), ev_msg->ev_type, ev_msg, FALSE);
355 }
356
357 /* Helper: makes the current core handle a remote vcore's VCPD public mbox events.
358  *
359  * Both cases (whether we are handling someone else's already or not) use some
360  * method of telling our future self what to do.  When we aren't already
361  * handling it, we use TLS, and jump to vcore entry.  When we are already
362  * handling, then we send a message to ourself, which we deal with when we
363  * handle our own events (which is later in vcore entry).
364  *
365  * We need to reset the stack and deal with it in vcore entry to avoid recursing
366  * deeply and running off the transition stack.  (handler calling handle event).
367  *
368  * Note that we might not be the one that gets the message we send.  If we pull
369  * a sys_change_to, someone else might be polling our public message box.  All
370  * we're doing is making sure that we don't forget to check rem_vcoreid's mbox.
371  *
372  * Finally, note that this function might not return.  However, it'll handle the
373  * details related to vcpd mboxes, so you don't use the ev_might_not_return()
374  * helpers with this. */
375 void handle_vcpd_mbox(uint32_t rem_vcoreid)
376 {
377         uint32_t vcoreid = vcore_id();
378         struct preempt_data *vcpd = vcpd_of(vcoreid);
379         struct event_msg local_msg = {0};
380         assert(vcoreid != rem_vcoreid);                 /* this shouldn't happen */
381         /* If they are empty, then we're done */
382         if (mbox_is_empty(&vcpd_of(rem_vcoreid)->ev_mbox_public))
383                 return;
384         if (__vc_handle_an_mbox) {
385                 /* we might be already handling them, in which case, abort */
386                 if (__vc_rem_vcoreid == rem_vcoreid)
387                         return;
388                 /* Already handling message for someone, need to send ourselves a
389                  * message to check rem_vcoreid, which we'll process later. */
390                 local_msg.ev_type = EV_CHECK_MSGS;
391                 local_msg.ev_arg2 = rem_vcoreid;        /* 32bit arg */
392                 send_self_vc_msg(&local_msg);
393                 return;
394         }
395         /* No return after here */
396         /* At this point, we aren't in the process of handling someone else's
397          * messages, so just tell our future self what to do */
398         __vc_handle_an_mbox = TRUE;
399         __vc_rem_vcoreid = rem_vcoreid;
400         /* Reset the stack and start over in vcore context */
401         set_stack_pointer((void*)vcpd->vcore_stack);
402         vcore_entry();
403         assert(0);
404 }
405
406 /* Handle remote vcpd public mboxes, if that's what we want to do.  Call this
407  * from vcore entry, pairs with handle_vcpd_mbox(). */
408 void try_handle_remote_mbox(void)
409 {
410         if (__vc_handle_an_mbox) {
411                 handle_mbox(&vcpd_of(__vc_rem_vcoreid)->ev_mbox_public);
412                 /* only clear the flag when we have returned from handling messages.  if
413                  * an event handler (like preempt_recover) doesn't return, we'll clear
414                  * this flag elsewhere. (it's actually not a big deal if we don't). */
415                 cmb();
416                 __vc_handle_an_mbox = FALSE;
417         }
418 }
419
420 /* Event handler helpers */
421
422 /* For event handlers that might not return, we need to call this before the
423  * command that might not return.  In the event we were handling a remote
424  * vcore's messages, it'll send ourselves a messages that we (or someone who
425  * polls us) will get so that someone finishes off that vcore's messages).
426  * Doesn't matter who does, so long as someone does.
427  *
428  * This returns whether or not we were handling someone's messages.  Pass the
429  * parameter to ev_we_returned() */
430 bool ev_might_not_return(void)
431 {
432         struct event_msg local_msg = {0};
433         bool were_handling_remotes = FALSE;
434         if (__vc_handle_an_mbox) {
435                 /* slight chance we finished with their mbox (were on the last one) */
436                 if (!mbox_is_empty(&vcpd_of(__vc_rem_vcoreid)->ev_mbox_public)) {
437                         /* But we aren't, so we'll need to send a message */
438                         local_msg.ev_type = EV_CHECK_MSGS;
439                         local_msg.ev_arg2 = __vc_rem_vcoreid;   /* 32bit arg */
440                         send_self_vc_msg(&local_msg);
441                 }
442                 /* Either way, we're not working on this one now.  Note this is more of
443                  * an optimization - it'd be harmless (I think) to poll another vcore's
444                  * pub mbox once when we pop up in vc_entry in the future */
445                 __vc_handle_an_mbox = FALSE;
446                 return TRUE;
447         }
448         return FALSE;
449 }
450
451 /* Call this when you return, paired up with ev_might_not_return().  If
452  * ev_might_not_return turned off uth_handle, we'll turn it back on. */
453 void ev_we_returned(bool were_handling_remotes)
454 {
455         if (were_handling_remotes)
456                 __vc_handle_an_mbox = TRUE;
457 }
458
459 /* Debugging */
460 void print_ev_msg(struct event_msg *msg)
461 {
462         printf("MSG at %08p\n", msg);
463         printf("\ttype: %d\n", msg->ev_type);
464         printf("\targ1 (16): 0x%4x\n", msg->ev_arg1);
465         printf("\targ2 (32): 0x%8x\n", msg->ev_arg2);
466         printf("\targ3 (32): 0x%8x\n", msg->ev_arg3);
467         printf("\targ4 (64): 0x%16x\n", msg->ev_arg4);
468 }
469
470 /* Uthreads blocking on event queues
471  *
472  * It'd be nice to have a uthread sleep until an event queue has some activity
473  * (e.g. a new message).  It'd also be nice to wake up early with a timer.  It
474  * is tempting to try something like an INDIR and have one evq multiplex two
475  * others (the real event and an alarm).  But then you can't separate the two
476  * streams; what if one thread sleeps on just the event at the same time?  What
477  * if we want to support something like Go's select: a thread wants to block
478  * until there is some activity on some channel?
479  *
480  * Ultimately, we want to allow M uthreads to block on possibly different
481  * subsets of N event queues.
482  *
483  * Every uthread will have a sleep controller, and every event queue will have a
484  * wakeup controller.  There are up to MxN linkage structures connecting these.
485  *
486  * We'll use the event_queue handler to override the default event processing.
487  * This means the event queues that are used for blocking uthreads can *only* be
488  * used for that; the regular event processing will not happen.  This is mostly
489  * true.  It is possible to extract events from an evq's mbox concurrently.
490  *
491  * I briefly considered having one global lock to protect all of the lists and
492  * structures.  That's lousy for the obvious scalability reason, but it seemed
493  * like it'd make things easier, especially when I thought I needed locks in
494  * both the ectlr and the uctlr (in early versions, I considered having the
495  * handler yank itself out of the ectlr, copying a message into that struct, or
496  * o/w needing protection).  On occasion, we run into the "I'd like to split my
497  * lock between two components and still somehow synchronize" issue (e.g. FD
498  * taps, with the FDT lock and the blocking/whatever that goes on in a device).
499  * Whenever that comes up, we usually can get some help from other shared memory
500  * techniques.  For FD taps, it's the kref.  For us, it's post-and-poke, though
501  * it didn't solve all of our problems - I use it as a tool with some basic
502  * shared memory signalling. */
503
504 struct evq_wait_link;
505 TAILQ_HEAD(wait_link_tailq, evq_wait_link);
506
507 /* Bookkeeping for the uthread sleeping on a bunch of event queues.
508  *
509  * Notes on concurrency: most fields are not protected.  check_evqs is racy, and
510  * written to by handlers.  The tailq is only used by the uthread.  blocked is
511  * never concurrently *written*; see __uth_wakeup_poke() for details. */
512 struct uth_sleep_ctlr {
513         struct uthread                          *uth;
514         struct spin_pdr_lock            in_use;
515         bool                                            check_evqs;
516         bool                                            blocked;
517         struct poke_tracker                     poker;
518         struct wait_link_tailq          evqs;
519 };
520
521 /* Attaches to an event_queue (ev_udata), tracks the uthreads for this evq */
522 struct evq_wakeup_ctlr {
523         struct wait_link_tailq          waiters;
524         struct spin_pdr_lock            lock;
525 };
526
527 /* Up to MxN of these, N of them per uthread. */
528 struct evq_wait_link {
529         struct uth_sleep_ctlr           *uth_ctlr;
530         TAILQ_ENTRY(evq_wait_link)      link_uth;
531         struct evq_wakeup_ctlr          *evq_ctlr;
532         TAILQ_ENTRY(evq_wait_link)      link_evq;
533 };
534
535 /* Poke function: ensures the uth managed by uctlr wakes up.  poke() ensures
536  * there is only one thread in this function at a time.  However, it could be
537  * called spuriously, which is why we check 'blocked.' */
538 static void __uth_wakeup_poke(void *arg)
539 {
540         struct uth_sleep_ctlr *uctlr = arg;
541         /* There are no concurrent writes to 'blocked'.  Blocked is only ever
542          * written when the uth sleeps and only ever cleared here.  Once the uth
543          * writes it, it does not write it again until after we clear it.
544          *
545          * This is still racy - we could see !blocked, then blocked gets set.  In
546          * that case, the poke failed, and that is harmless.  The uth will see
547          * 'check_evqs', which was set before poke, which would be before writing
548          * blocked, and the uth checks 'check_evqs' after writing. */
549         if (uctlr->blocked) {
550                 uctlr->blocked = FALSE;
551                 cmb();  /* clear blocked before starting the uth */
552                 uthread_runnable(uctlr->uth);
553         }
554 }
555
556 static void uth_sleep_ctlr_init(struct uth_sleep_ctlr *uctlr,
557                                 struct uthread *uth)
558 {
559         uctlr->uth = uth;
560         spin_pdr_init(&uctlr->in_use);
561         uctlr->check_evqs = FALSE;
562         uctlr->blocked = FALSE;
563         poke_init(&uctlr->poker, __uth_wakeup_poke);
564         TAILQ_INIT(&uctlr->evqs);
565 }
566
567 /* This handler runs when the ev_q is checked.  Instead of doing anything with
568  * the ev_q, we make sure that every uthread that was waiting on us wakes up.
569  * The uthreads could be waiting on several evqs, so there could be multiple
570  * independent wake-up attempts, hence the poke.  Likewise, the uthread could be
571  * awake when we poke.  The uthread will check check_evqs after sleeping, in
572  * case we poke before it blocks (and the poke fails).
573  *
574  * Also, there could be concurrent callers of this handler, and other uthreads
575  * signing up for a wakeup. */
576 void evq_wakeup_handler(struct event_queue *ev_q)
577 {
578         struct evq_wakeup_ctlr *ectlr = ev_q->ev_udata;
579         struct evq_wait_link *i;
580         assert(ectlr);
581         spin_pdr_lock(&ectlr->lock);
582         /* Note we wake up all sleepers, even though only one is likely to get the
583          * message.  See the notes in unlink_ectlr() for more info. */
584         TAILQ_FOREACH(i, &ectlr->waiters, link_evq) {
585                 i->uth_ctlr->check_evqs = TRUE;
586                 cmb();  /* order check write before poke (poke has atomic) */
587                 poke(&i->uth_ctlr->poker, i->uth_ctlr);
588         }
589         spin_pdr_unlock(&ectlr->lock);
590 }
591
592 /* Helper, attaches a wakeup controller to the event queue. */
593 void evq_attach_wakeup_ctlr(struct event_queue *ev_q)
594 {
595         struct evq_wakeup_ctlr *ectlr = malloc(sizeof(struct evq_wakeup_ctlr));
596         memset(ectlr, 0, sizeof(struct evq_wakeup_ctlr));
597         spin_pdr_init(&ectlr->lock);
598         TAILQ_INIT(&ectlr->waiters);
599         ev_q->ev_udata = ectlr;
600         ev_q->ev_handler = evq_wakeup_handler;
601 }
602
603 void evq_remove_wakeup_ctlr(struct event_queue *ev_q)
604 {
605         free(ev_q->ev_udata);
606         ev_q->ev_udata = 0;
607         ev_q->ev_handler = 0;
608 }
609
610 static void link_uctlr_ectlr(struct uth_sleep_ctlr *uctlr,
611                              struct evq_wakeup_ctlr *ectlr,
612                              struct evq_wait_link *link)
613 {
614         /* No lock needed for the uctlr; we're the only one modifying evqs */
615         link->uth_ctlr = uctlr;
616         TAILQ_INSERT_HEAD(&uctlr->evqs, link, link_uth);
617         /* Once we add ourselves to the ectrl list, we could start getting poked */
618         link->evq_ctlr = ectlr;
619         spin_pdr_lock(&ectlr->lock);
620         TAILQ_INSERT_HEAD(&ectlr->waiters, link, link_evq);
621         spin_pdr_unlock(&ectlr->lock);
622 }
623
624 /* Disconnects us from a wakeup controller.
625  *
626  * Our evq handlers wake up *all* uthreads that are waiting for activity
627  * (broadcast).  It's a tradeoff.  If the list of uthreads is long, then it is
628  * wasted effort.  An alternative is to wake up exactly one, with slightly
629  * greater overheads.  In the exactly-one case, multiple handlers could wake
630  * this uth up at once, but we can only extract one message.  If we do the
631  * single wake up, then when we detach from an ectlr, we need to peak in the
632  * mbox to see if it is not empty, and conditionally run its handler again, such
633  * that no uthread sits on a ectlr that has activity/pending messages (in
634  * essence, level triggered). */
635 static void unlink_ectlr(struct evq_wait_link *link)
636 {
637         struct evq_wakeup_ctlr *ectlr = link->evq_ctlr;
638         spin_pdr_lock(&ectlr->lock);
639         TAILQ_REMOVE(&ectlr->waiters, link, link_evq);
640         spin_pdr_unlock(&ectlr->lock);
641 }
642
643 /* Helper: polls all evqs once and extracts the first message available.  The
644  * message is copied into ev_msg, and the evq with the activity is copied into
645  * which_evq (if it is non-zero).  Returns TRUE on success. */
646 static bool extract_evqs_msg(struct event_queue *evqs[], size_t nr_evqs,
647                              struct event_msg *ev_msg,
648                              struct event_queue **which_evq)
649 {
650         struct event_queue *evq_i;
651         bool ret = FALSE;
652         /* We need to have notifs disabled when extracting messages from some
653          * mboxes.  Many mboxes have some form of busy waiting between consumers
654          * (userspace).  If we're just a uthread, we could wind up on a runqueue
655          * somewhere while someone else spins, possibly in VC ctx. */
656         uth_disable_notifs();
657         for (int i = 0; i < nr_evqs; i++) {
658                 evq_i = evqs[i];
659                 if (extract_one_mbox_msg(evq_i->ev_mbox, ev_msg)) {
660                         if (which_evq)
661                                 *which_evq = evq_i;
662                         ret = TRUE;
663                         break;
664                 }
665         }
666         uth_enable_notifs();
667         return ret;
668 }
669
670 /* Yield callback */
671 static void __uth_blockon_evq_cb(struct uthread *uth, void *arg)
672 {
673         struct uth_sleep_ctlr *uctlr = arg;
674         uthread_has_blocked(uth, UTH_EXT_BLK_EVENTQ);
675         cmb();  /* actually block before saying 'blocked' */
676         uctlr->blocked = TRUE;  /* can be woken up now */
677         wrmb(); /* write 'blocked' before read 'check_evqs' */
678         /* If someone set check_evqs, we should wake up.  We're competing with other
679          * wakers via poke (we may have already woken up!). */
680         if (uctlr->check_evqs)
681                 poke(&uctlr->poker, uctlr);
682         /* Once we say we're blocked, we could be woken up (possibly by our poke
683          * here) and the uthread could run on another core.  Holding this lock
684          * prevents the uthread from quickly returning and freeing the memory of
685          * uctrl before we have a chance to check_evqs or poke. */
686         spin_pdr_unlock(&uctlr->in_use);
687 }
688
689 /* Direct version, with *evqs[]. */
690 void uth_blockon_evqs_arr(struct event_msg *ev_msg,
691                           struct event_queue **which_evq,
692                           struct event_queue *evqs[], size_t nr_evqs)
693 {
694         struct uth_sleep_ctlr uctlr;
695         struct evq_wait_link linkage[nr_evqs];
696
697         /* Catch user mistakes.  If they lack a handler, they didn't attach.  They
698          * are probably using our evq_wakeup_handler, but they might have their own
699          * wrapper function. */
700         for (int i = 0; i < nr_evqs; i++)
701                 assert(evqs[i]->ev_handler);
702         /* Check for activity on the evqs before going through the hassle of
703          * sleeping.  ("check, signal, check again" pattern). */
704         if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
705                 return;
706         uth_sleep_ctlr_init(&uctlr, current_uthread);
707         memset(linkage, 0, sizeof(struct evq_wait_link) * nr_evqs);
708         for (int i = 0; i < nr_evqs; i++)
709                 link_uctlr_ectlr(&uctlr, (struct evq_wakeup_ctlr*)evqs[i]->ev_udata,
710                                  &linkage[i]);
711         /* Mesa-style sleep until we get a message.  Mesa helps a bit here, since we
712          * can just deregister from them all when we're done.  o/w it is tempting to
713          * have us deregister from *the* one in the handler and extract the message
714          * there; which can be tricky and harder to reason about. */
715         while (1) {
716                 /* We need to make sure only one 'version/ctx' of this thread is active
717                  * at a time.  Later on, we'll unlock in vcore ctx on the other side of
718                  * a yield.  We could restart from the yield, return, and free the uctlr
719                  * before that ctx has a chance to finish. */
720                 spin_pdr_lock(&uctlr.in_use);
721                 /* We're signed up.  We might already have been told to check the evqs,
722                  * or there could be messages still sitting in the evqs.  check_evqs is
723                  * only ever cleared here, and only ever set in evq handlers. */
724                 uctlr.check_evqs = FALSE;
725                 cmb();  /* look for messages after clearing check_evqs */
726                 if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
727                         break;
728                 uthread_yield(TRUE, __uth_blockon_evq_cb, &uctlr);
729         }
730         /* On the one hand, it's not necessary to unlock, since the memory will be
731          * freed.  But we do need to go through the process to turn on notifs and
732          * adjust the notif_disabled_depth for the case where we don't yield. */
733         spin_pdr_unlock(&uctlr.in_use);
734         for (int i = 0; i < nr_evqs; i++)
735                 unlink_ectlr(&linkage[i]);
736 }
737
738 /* ... are event_queue *s, nr_evqs of them.  This will block until it can
739  * extract some message from one of evqs.  The message will be placed in ev_msg,
740  * and the particular evq it extracted it from will be placed in which_evq, if
741  * which is non-zero. */
742 void uth_blockon_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
743                       size_t nr_evqs, ...)
744 {
745         struct event_queue *evqs[nr_evqs];
746         va_list va;
747         va_start(va, nr_evqs);
748         for (int i = 0; i < nr_evqs; i++)
749                 evqs[i] = va_arg(va, struct event_queue *);
750         va_end(va);
751         uth_blockon_evqs_arr(ev_msg, which_evq, evqs, nr_evqs);
752 }
753
754 /* ... are event_queue *s, nr_evqs of them.  This will attempt to extract some
755  * message from one of evqs.  The message will be placed in ev_msg, and the
756  * particular evq it extracted it from will be placed in which_evq.  Returns
757  * TRUE if it extracted a message. */
758 bool uth_check_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
759                     size_t nr_evqs, ...)
760 {
761         struct event_queue *evqs[nr_evqs];
762         va_list va;
763         va_start(va, nr_evqs);
764         for (int i = 0; i < nr_evqs; i++)
765                 evqs[i] = va_arg(va, struct event_queue *);
766         va_end(va);
767         return extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq);
768 }