akaros/user/parlib/event.c
   1/* Copyright (c) 2011-2014 The Regents of the University of California
   2 * Copyright (c) 2015 Google Inc
   3 * Barret Rhoden <brho@cs.berkeley.edu>
   4 * See LICENSE for details.
   5 *
   6 * Userspace utility functions for receiving events and notifications (IPIs).
   7 * Some are higher level than others; just use what you need. */ 
   8
   9#include <ros/event.h>
  10#include <ros/procdata.h>
  11#include <parlib/ucq.h>
  12#include <parlib/evbitmap.h>
  13#include <parlib/ceq.h>
  14#include <parlib/vcore.h>
  15#include <stdlib.h>
  16#include <string.h>
  17#include <parlib/assert.h>
  18#include <parlib/stdio.h>
  19#include <errno.h>
  20#include <parlib/parlib.h>
  21#include <parlib/event.h>
  22#include <parlib/uthread.h>
  23#include <parlib/spinlock.h>
  24#include <parlib/mcs.h>
  25#include <parlib/poke.h>
  26#include <sys/queue.h>
  27#include <malloc.h>
  28
  29/* For remote VCPD mbox event handling */
  30__thread bool __vc_handle_an_mbox = FALSE;
  31__thread uint32_t __vc_rem_vcoreid;
  32
  33/********* Event_q Setup / Registration  ***********/
  34
  35/* Get event_qs via these interfaces, since eventually we'll want to either
  36 * allocate from pinned memory or use some form of a slab allocator.  Also,
  37 * these stitch up the big_q so its ev_mbox points to its internal mbox.  Never
  38 * access the internal mbox directly.
  39 *
  40 * Raw ones need to have their mailboxes initialized.  If you're making a lot of
  41 * these and they perform their own mmaps (e.g. UCQs), you can do one big mmap
  42 * and init the ucqs on your own, which ought to perform better.
  43 *
  44 * Use the 'regular' one for big_qs if you don't want to worry about the mbox
   45 * initialization */
  46struct event_queue *get_eventq_raw(void)
  47{
  48        /* TODO: (PIN) should be pinned memory */
  49        struct event_queue_big *big_q = malloc(sizeof(struct event_queue_big));
  50        memset(big_q, 0, sizeof(struct event_queue_big));
  51        big_q->ev_mbox = &big_q->ev_imbox;
  52        return (struct event_queue*)big_q;
  53}
  54
  55struct event_queue *get_eventq(int mbox_type)
  56{
  57        struct event_queue *big_q = get_eventq_raw();
  58        event_mbox_init(big_q->ev_mbox, mbox_type);
  59        return big_q;
  60}
  61
  62/* Basic initialization of a single mbox.  If you know the type, you can set up
  63 * the mbox manually with possibly better performance.  For instance, ucq_init()
  64 * calls mmap internally.  You could mmap a huge blob on your own and call
  65 * ucq_raw_init (don't forget to set the mbox_type!) */
  66void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type)
  67{
  68        ev_mbox->type = mbox_type;
  69        switch (ev_mbox->type) {
  70        case (EV_MBOX_UCQ):
  71                ucq_init(&ev_mbox->ucq);
  72                break;
  73        case (EV_MBOX_BITMAP):
  74                evbitmap_init(&ev_mbox->evbm);
  75                break;
  76        case (EV_MBOX_CEQ):
  77                ceq_init(&ev_mbox->ceq, CEQ_OR, CEQ_DEFAULT_SZ, CEQ_DEFAULT_SZ);
  78                break;
  79        default:
  80                printf("Unknown mbox type %d!\n", ev_mbox->type);
  81                break;
  82        }
  83}
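     /* Illustrative sketch (not compiled in): the two allocation paths described
      * above, using only functions from this file.
      *
      * Simple path, where the mbox is initialized for you:
      *
      *         struct event_queue *ev_q = get_eventq(EV_MBOX_UCQ);
      *
      * Raw path, where you set up the mbox yourself (here via event_mbox_init();
      * heavy users could instead do one big mmap and ucq_raw_init(), per above):
      *
      *         struct event_queue *raw_q = get_eventq_raw();
      *
      *         event_mbox_init(raw_q->ev_mbox, EV_MBOX_CEQ);
      *
      * Teardown, only once you're sure the kernel no longer targets the queues:
      *
      *         put_eventq(ev_q);
      *         put_eventq(raw_q);
      */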
  84
  85/* Give it up.  I don't recommend calling these unless you're sure the queues
  86 * aren't in use (unregistered, etc). (TODO: consider some checks for this) */
  87void put_eventq_raw(struct event_queue *ev_q)
  88{
  89        /* if we use something other than malloc, we'll need to be aware that
  90         * ev_q is actually an event_queue_big.  One option is to use the flags,
  91         * though this could be error prone. */
  92        free(ev_q);
  93}
  94
  95void put_eventq(struct event_queue *ev_q)
  96{
  97        event_mbox_cleanup(ev_q->ev_mbox);
  98        put_eventq_raw(ev_q);
  99}
 100
 101void event_mbox_cleanup(struct event_mbox *ev_mbox)
 102{
 103        switch (ev_mbox->type) {
 104        case (EV_MBOX_UCQ):
 105                ucq_free_pgs(&ev_mbox->ucq);
 106                break;
 107        case (EV_MBOX_BITMAP):
 108                evbitmap_cleanup(&ev_mbox->evbm);
 109                break;
 110        case (EV_MBOX_CEQ):
 111                ceq_cleanup(&ev_mbox->ceq);
 112                break;
 113        default:
 114                printf("Unknown mbox type %d!\n", ev_mbox->type);
 115                break;
 116        }
 117}
 118
 119/* Need to point this event_q to an mbox - usually to a vcpd */
 120struct event_queue *get_eventq_slim(void)
 121{
 122        /* TODO: (PIN) should be pinned memory */
 123        struct event_queue *ev_q = malloc(sizeof(struct event_queue));
 124        memset(ev_q, 0, sizeof(struct event_queue));
 125        return ev_q;
 126}
 127
 128/* Gets a small ev_q, with ev_mbox pointing to the vcpd mbox of vcoreid.  If
 129 * ev_flags has EVENT_VCORE_PRIVATE set, it'll give you the private mbox.  o/w,
 130 * you'll get the public one. */
 131struct event_queue *get_eventq_vcpd(uint32_t vcoreid, int ev_flags)
 132{
 133        struct event_queue *ev_q = get_eventq_slim();
 134        if (ev_flags & EVENT_VCORE_PRIVATE)
 135                ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_private;
 136        else
 137                ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_public;
 138        return ev_q;
 139}
 140
 141void put_eventq_slim(struct event_queue *ev_q)
 142{
 143        /* if we use something other than malloc, we'll need to be aware that
 144         * ev_q is not an event_queue_big. */
 145        free(ev_q);
 146}
 147
 148void put_eventq_vcpd(struct event_queue *ev_q)
 149{
 150        put_eventq_slim(ev_q);
 151}
 152
 153/* Sets ev_q to be the receiving end for kernel event ev_type */
 154void register_kevent_q(struct event_queue *ev_q, unsigned int ev_type)
 155{
 156        __procdata.kernel_evts[ev_type] = ev_q;
 157}
 158
 159/* Clears the event, returning an ev_q if there was one there.  You'll need to
 160 * free it. */
 161struct event_queue *clear_kevent_q(unsigned int ev_type)
 162{
 163        struct event_queue *ev_q = __procdata.kernel_evts[ev_type];
 164
 165        __procdata.kernel_evts[ev_type] = 0;
 166        return ev_q;
 167}
 168
  169/* Enables an IPI/event combo for ev_type, sent to vcoreid's default mbox.
  170 * Set EVENT_IPI in ev_flags if you want an IPI along with the event.  If you
  171 * want the event to go to the vcore private mbox (meaning no other core
  172 * should ever handle it), also send in EVENT_VCORE_PRIVATE with ev_flags.
 173 *
 174 * This is the simplest thing applications may want, and shows how you can put
 175 * the other event functions together to get similar things done. */
 176void enable_kevent(unsigned int ev_type, uint32_t vcoreid, int ev_flags)
 177{
 178        struct event_queue *ev_q = get_eventq_vcpd(vcoreid, ev_flags);
 179
 180        ev_q->ev_flags = ev_flags;
 181        ev_q->ev_vcore = vcoreid;
 182        ev_q->ev_handler = 0;
 183        wmb();  /* make sure ev_q is filled out before registering */
 184        register_kevent_q(ev_q, ev_type);
 185}
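     /* Illustrative sketch (not compiled in): turn on kernel events of one type,
      * delivered to this vcore's private VCPD mbox with an IPI.  EV_USER_IPI and
      * EVENT_IPI are assumed to be the usual names from ros/event.h; the messages
      * are later consumed by handle_events() from vcore context.
      *
      *         enable_kevent(EV_USER_IPI, vcore_id(),
      *                       EVENT_IPI | EVENT_VCORE_PRIVATE);
      */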
 186
  187/* Stop receiving the events (one could be on the way).  Caller needs to be
  188 * careful, since the kernel might still be sending an event to the ev_q.
  189 * Depending on the ev_q, it may be hard to know when the kernel is done with
  190 * it (for instance, if all syscalls you ever registered with the ev_q are
  191 * done, then it would be okay to free it).  o/w, don't free it. */
 192struct event_queue *disable_kevent(unsigned int ev_type)
 193{
 194        return clear_kevent_q(ev_type);
 195}
 196
 197/********* Event Handling / Reception ***********/
 198/* Somewhat ghetto helper, for the lazy.  If all you care about is an event
 199 * number, this will see if the event happened or not.  It will try for a
 200 * message, but if there is none, it will go for a bit.  Note that multiple
 201 * bit messages will turn into just one bit. */
 202unsigned int get_event_type(struct event_mbox *ev_mbox)
 203{
 204        struct event_msg local_msg = {0};
 205
 206        if (extract_one_mbox_msg(ev_mbox, &local_msg))
 207                return local_msg.ev_type;
 208        return EV_NONE;
 209}
 210
 211/* Attempts to register ev_q with sysc, so long as sysc is not done/progress.
 212 * Returns true if it succeeded, and false otherwise.  False means that the
 213 * syscall is done, and does not need an event set (and should be handled
 214 * accordingly).
 215 *
 216 * A copy of this is in glibc/sysdeps/akaros/syscall.c.  Keep them in sync. */
 217bool register_evq(struct syscall *sysc, struct event_queue *ev_q)
 218{
 219        int old_flags;
 220
 221        sysc->ev_q = ev_q;
 222        wrmb(); /* don't let that write pass any future reads (flags) */
 223        /* Try and set the SC_UEVENT flag (so the kernel knows to look at ev_q)
 224         */
 225        do {
 226                /* no cmb() needed, the atomic_read will reread flags */
 227                old_flags = atomic_read(&sysc->flags);
 228                /* Spin if the kernel is mucking with syscall flags */
 229                while (old_flags & SC_K_LOCK)
 230                        old_flags = atomic_read(&sysc->flags);
 231                /* If the kernel finishes while we are trying to sign up for an
 232                 * event, we need to bail out */
 233                if (old_flags & (SC_DONE | SC_PROGRESS)) {
 234                        /* not necessary, but might help with bugs */
 235                        sysc->ev_q = 0;
 236                        return FALSE;
 237                }
 238        } while (!atomic_cas(&sysc->flags, old_flags, old_flags | SC_UEVENT));
 239        return TRUE;
 240}
 241
 242/* De-registers a syscall, so that the kernel will not send an event when it is
 243 * done.  The call could already be SC_DONE, or could even finish while we try
 244 * to unset SC_UEVENT.
 245 *
 246 * There is a chance the kernel sent an event if you didn't do this in time, but
 247 * once this returns, the kernel won't send a message.
 248 *
 249 * If the kernel is trying to send a message right now, this will spin (on
 250 * SC_K_LOCK).  We need to make sure we deregistered, and that if a message
 251 * is coming, that it already was sent (and possibly overflowed), before
 252 * returning. */
 253void deregister_evq(struct syscall *sysc)
 254{
 255        int old_flags;
 256
 257        sysc->ev_q = 0;
 258        wrmb(); /* don't let that write pass any future reads (flags) */
 259        /* Try and unset the SC_UEVENT flag */
 260        do {
 261                /* no cmb() needed, the atomic_read will reread flags */
 262                old_flags = atomic_read(&sysc->flags);
 263                /* Spin if the kernel is mucking with syscall flags */
 264                while (old_flags & SC_K_LOCK)
 265                        old_flags = atomic_read(&sysc->flags);
 266                /* Note we don't care if the SC_DONE flag is getting set.  We
 267                 * just need to avoid clobbering flags */
 268        } while (!atomic_cas(&sysc->flags, old_flags, old_flags & ~SC_UEVENT));
 269}
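     /* Illustrative sketch (not compiled in): the usual pattern around an async
      * syscall, mirroring the copy in glibc's syscall.c.  'finish_sysc' stands in
      * for whatever completion handling the caller wants and is hypothetical.
      *
      *         if (!register_evq(sysc, ev_q)) {
      *                 finish_sysc(sysc);      // done already; no event coming
      *                 return;
      *         }
      *         // ... block on ev_q; the kernel posts an event when sysc is done.
      *         // If we stop waiting before that event arrives:
      *         deregister_evq(sysc);
      */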
 270
 271/* Actual Event Handling */
 272
 273/* List of handler lists, process-wide.  They all must return (don't context
 274 * switch to a u_thread) */
 275struct ev_handler *ev_handlers[MAX_NR_EVENT] = {0};
 276spinpdrlock_t ev_h_wlock = SPINPDR_INITIALIZER;
 277
 278int register_ev_handler(unsigned int ev_type, handle_event_t handler,
 279                        void *data)
 280{
 281        /* Nasty uthread code assumes this was malloced */
 282        struct ev_handler *new_h = malloc(sizeof(struct ev_handler));
 283
 284        if (!new_h)
 285                return -1;
 286        new_h->func = handler;
 287        new_h->data = data;
 288        spin_pdr_lock(&ev_h_wlock);
 289        new_h->next = ev_handlers[ev_type];
 290        wmb();  /* make sure new_h is done before publishing to readers */
 291        ev_handlers[ev_type] = new_h;
 292        spin_pdr_unlock(&ev_h_wlock);
 293        return 0;
 294}
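     /* Illustrative sketch (not compiled in): process-wide handlers have the
      * handle_event_t signature used by run_ev_handlers() below.  They must
      * return (no blocking or context switching) and may run in vcore context.
      * The event type is assumed, as in the earlier sketch.
      *
      *         static void my_ev_handler(struct event_msg *ev_msg,
      *                                   unsigned int ev_type, void *data)
      *         {
      *                 printf("event %d, arg2 0x%x\n", ev_type, ev_msg->ev_arg2);
      *         }
      *
      *         register_ev_handler(EV_USER_IPI, my_ev_handler, NULL);
      */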
 295
 296int deregister_ev_handler(unsigned int ev_type, handle_event_t handler,
 297                          void *data)
 298{
 299        /* TODO: User-level RCU */
 300        printf("Failed to dereg handler, not supported yet!\n");
 301        return -1;
 302}
 303
 304static void run_ev_handlers(unsigned int ev_type, struct event_msg *ev_msg)
 305{
 306        struct ev_handler *handler;
 307
 308        /* TODO: RCU read lock */
 309        handler = ev_handlers[ev_type];
 310        while (handler) {
 311                handler->func(ev_msg, ev_type, handler->data);
 312                handler = handler->next;
 313        }
 314}
 315
 316/* Attempts to extract a message from an mbox, copying it into ev_msg.
 317 * Returns TRUE on success. */
 318bool extract_one_mbox_msg(struct event_mbox *ev_mbox, struct event_msg *ev_msg)
 319{
 320        switch (ev_mbox->type) {
 321        case (EV_MBOX_UCQ):
 322                return get_ucq_msg(&ev_mbox->ucq, ev_msg);
 323        case (EV_MBOX_BITMAP):
 324                return get_evbitmap_msg(&ev_mbox->evbm, ev_msg);
 325        case (EV_MBOX_CEQ):
 326                return get_ceq_msg(&ev_mbox->ceq, ev_msg);
 327        default:
 328                printf("Unknown mbox type %d!\n", ev_mbox->type);
 329                return FALSE;
 330        }
 331}
 332
 333/* Attempts to handle a message.  Returns 1 if we dequeued a msg, 0 o/w. */
 334int handle_one_mbox_msg(struct event_mbox *ev_mbox)
 335{
 336        struct event_msg local_msg;
 337        unsigned int ev_type;
 338
 339        /* extract returns TRUE on success, we return 1. */
 340        if (!extract_one_mbox_msg(ev_mbox, &local_msg))
 341                return 0;
 342        ev_type = local_msg.ev_type;
 343        assert(ev_type < MAX_NR_EVENT);
 344        printd("[event] UCQ (mbox %08p), ev_type: %d\n", ev_mbox, ev_type);
 345        run_ev_handlers(ev_type, &local_msg);
 346        return 1;
 347}
 348
 349/* Handle an mbox.  This is the receive-side processing of an event_queue.  It
 350 * takes an ev_mbox, since the vcpd mbox isn't a regular ev_q.  Returns 1 if we
 351 * handled something, 0 o/w. */
 352int handle_mbox(struct event_mbox *ev_mbox)
 353{
 354        int retval = 0;
 355        printd("[event] handling ev_mbox %08p on vcore %d\n", ev_mbox,
 356               vcore_id());
 357        /* Some stack-smashing bugs cause this to fail */
 358        assert(ev_mbox);
 359        /* Handle all full messages, tracking if we do at least one. */
 360        while (handle_one_mbox_msg(ev_mbox))
 361                retval = 1;
 362        return retval;
 363}
 364
  365/* Empty if the UCQ is empty and the bits don't need to be checked */
 366bool mbox_is_empty(struct event_mbox *ev_mbox)
 367{
 368        switch (ev_mbox->type) {
 369        case (EV_MBOX_UCQ):
 370                return ucq_is_empty(&ev_mbox->ucq);
 371        case (EV_MBOX_BITMAP):
 372                return evbitmap_is_empty(&ev_mbox->evbm);
 373        case (EV_MBOX_CEQ):
 374                return ceq_is_empty(&ev_mbox->ceq);
 375        default:
 376                printf("Unknown mbox type %d!\n", ev_mbox->type);
 377                return FALSE;
 378        }
 379}
 380
 381/* The EV_EVENT handler - extract the ev_q from the message. */
 382void handle_ev_ev(struct event_msg *ev_msg, unsigned int ev_type, void *data)
 383{
 384        struct event_queue *ev_q;
 385
 386        /* EV_EVENT can't handle not having a message / being a bit.  If we got
 387         * a bit message, it's a bug somewhere */
 388        assert(ev_msg);
 389        ev_q = ev_msg->ev_arg3;
 390        /* Same deal, a null ev_q is probably a bug, or someone being a jackass
 391         */
 392        assert(ev_q);
 393        /* Clear pending, so we can start getting INDIRs and IPIs again.  We
 394         * must set this before (compared to handle_events, then set it, then
 395         * handle again), since there is no guarantee handle_event_q() will
 396         * return.  If there is a pending preemption, the vcore quickly yields
 397         * and will deal with the remaining events in the future - meaning it
 398         * won't return to here. */
 399        ev_q->ev_alert_pending = FALSE;
 400        wmb();/* don't let the pending write pass the signaling of an ev recv */
 401        handle_event_q(ev_q);
 402}
 403
 404/* Handles VCPD events (public and private).  The kernel always sets
 405 * notif_pending after posting a message to either public or private mailbox.
 406 * When this returns, as far as we are concerned, notif_pending is FALSE.
 407 * However, a concurrent kernel writer could have reset it to true.  This is
 408 * fine; whenever we leave VC ctx we double check notif_pending.  Returns 1 or 2
 409 * if we actually handled a message, 0 o/w.
 410 *
 411 * WARNING: this might not return and/or current_uthread may change. */
 412int handle_events(uint32_t vcoreid)
 413{
 414        struct preempt_data *vcpd = vcpd_of(vcoreid);
 415        int retval = 0;
 416
 417        vcpd->notif_pending = FALSE;
 418        wrmb(); /* prevent future reads from happening before notif_p write */
 419        retval += handle_mbox(&vcpd->ev_mbox_private);
 420        retval += handle_mbox(&vcpd->ev_mbox_public);
 421        return retval;
 422}
 423
  424/* Handles the events on ev_q in accordance with ev_handlers[].  If the ev_q
  425 * is application specific, this will dispatch/handle based on its flags. */
 426void handle_event_q(struct event_queue *ev_q)
 427{
 428        printd("[event] handling ev_q %08p on vcore %d\n", ev_q, vcore_id());
 429        /* If the program wants to handle the ev_q on its own: */
 430        if (ev_q->ev_handler) {
 431                /* Remember this can't block or page fault */
 432                ev_q->ev_handler(ev_q);
 433                return;
 434        }
 435        /* Raw ev_qs that haven't been connected to an mbox, user bug: */
 436        assert(ev_q->ev_mbox);
 437        /* The "default" ev_handler, common enough that I don't want a func ptr
 438         */
 439        handle_mbox(ev_q->ev_mbox);
 440}
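     /* Illustrative sketch (not compiled in): an ev_q with its own ev_handler,
      * which handle_event_q() calls instead of draining the mbox.  'my_counter'
      * is a hypothetical int; the handler must return and not block or fault.
      *
      *         static void count_events(struct event_queue *ev_q)
      *         {
      *                 struct event_msg msg;
      *
      *                 while (extract_one_mbox_msg(ev_q->ev_mbox, &msg))
      *                         __sync_fetch_and_add((int*)ev_q->ev_udata, 1);
      *         }
      *
      *         ev_q->ev_handler = count_events;
      *         ev_q->ev_udata = &my_counter;
      */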
 441
 442/* Sends the calling vcore a message to its public mbox.  This is purposefully
 443 * limited to just the calling vcore, since in future versions, we can send via
 444 * ucqs directly (in many cases).  That will require the caller to be the
 445 * vcoreid, due to some preemption recovery issues (another ucq poller is
 446 * waiting on us when we got preempted, and we never up nr_cons). */
 447void send_self_vc_msg(struct event_msg *ev_msg)
 448{
  449        /* TODO: try to use UCQs (requires additional support) */
 450        /* ev_type actually gets ignored currently.  ev_msg is what matters if
 451         * it is non-zero.  FALSE means it's going to the public mbox */
 452        sys_self_notify(vcore_id(), ev_msg->ev_type, ev_msg, FALSE);
 453}
 454
 455/* Helper: makes the current core handle a remote vcore's VCPD public mbox
 456 * events.
 457 *
 458 * Both cases (whether we are handling someone else's already or not) use some
 459 * method of telling our future self what to do.  When we aren't already
 460 * handling it, we use TLS, and jump to vcore entry.  When we are already
 461 * handling, then we send a message to ourself, which we deal with when we
 462 * handle our own events (which is later in vcore entry).
 463 *
 464 * We need to reset the stack and deal with it in vcore entry to avoid recursing
 465 * deeply and running off the transition stack.  (handler calling handle event).
 466 *
 467 * Note that we might not be the one that gets the message we send.  If we pull
 468 * a sys_change_to, someone else might be polling our public message box.  All
 469 * we're doing is making sure that we don't forget to check rem_vcoreid's mbox.
 470 *
  471 * Finally, note that this function might not return.  However, it'll handle
  472 * the details related to vcpd mboxes, so you don't need to use the
  473 * ev_might_not_return() helpers with this. */
 474void handle_vcpd_mbox(uint32_t rem_vcoreid)
 475{
 476        uint32_t vcoreid = vcore_id();
 477        struct preempt_data *vcpd = vcpd_of(vcoreid);
 478        struct event_msg local_msg = {0};
 479        assert(vcoreid != rem_vcoreid);
 480        /* If they are empty, then we're done */
 481        if (mbox_is_empty(&vcpd_of(rem_vcoreid)->ev_mbox_public))
 482                return;
 483        if (__vc_handle_an_mbox) {
 484                /* we might be already handling them, in which case, abort */
 485                if (__vc_rem_vcoreid == rem_vcoreid)
 486                        return;
 487                /* Already handling message for someone, need to send ourselves
 488                 * a message to check rem_vcoreid, which we'll process later. */
 489                local_msg.ev_type = EV_CHECK_MSGS;
 490                local_msg.ev_arg2 = rem_vcoreid;        /* 32bit arg */
 491                send_self_vc_msg(&local_msg);
 492                return;
 493        }
 494        /* No return after here */
 495        /* At this point, we aren't in the process of handling someone else's
 496         * messages, so just tell our future self what to do */
 497        __vc_handle_an_mbox = TRUE;
 498        __vc_rem_vcoreid = rem_vcoreid;
 499        /* Reset the stack and start over in vcore context */
 500        set_stack_pointer((void*)vcpd->vcore_stack);
 501        vcore_entry();
 502        assert(0);
 503}
 504
 505/* Handle remote vcpd public mboxes, if that's what we want to do.  Call this
 506 * from vcore entry, pairs with handle_vcpd_mbox(). */
 507void try_handle_remote_mbox(void)
 508{
 509        if (__vc_handle_an_mbox) {
 510                handle_mbox(&vcpd_of(__vc_rem_vcoreid)->ev_mbox_public);
 511                /* only clear the flag when we have returned from handling
 512                 * messages.  if an event handler (like preempt_recover) doesn't
 513                 * return, we'll clear this flag elsewhere. (it's actually not a
 514                 * big deal if we don't). */
 515                cmb();
 516                __vc_handle_an_mbox = FALSE;
 517        }
 518}
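     /* Illustrative sketch (not compiled in): where the mbox calls sit in a 2LS's
      * vcore entry path.  Scheduling details are omitted and vary per 2LS; this
      * only shows the event-related ordering implied by handle_vcpd_mbox() and
      * try_handle_remote_mbox().
      *
      *         void example_vcore_entry(void)
      *         {
      *                 uint32_t vcoreid = vcore_id();
      *
      *                 handle_events(vcoreid);         // our own VCPD mboxes
      *                 try_handle_remote_mbox();       // remote public mbox, if
      *                                                 // handle_vcpd_mbox() sent us
      *                 // ... pick and run a uthread ...
      *         }
      */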
 519
 520/* Event handler helpers */
 521
  522/* For event handlers that might not return, we need to call this before the
  523 * command that might not return.  If we were handling a remote vcore's
  524 * messages, this sends ourselves a message that we (or someone who polls us)
  525 * will get, so that someone finishes off that vcore's messages.  It doesn't
  526 * matter who does, so long as someone does.
  527 *
  528 * This returns whether or not we were handling someone's messages.  Pass the
  529 * return value to ev_we_returned(). */
 530bool ev_might_not_return(void)
 531{
 532        struct event_msg local_msg = {0};
 533        bool were_handling_remotes = FALSE;
 534        if (__vc_handle_an_mbox) {
 535                /* slight chance we finished with their mbox (were on the last
 536                 * one) */
 537                if (!mbox_is_empty(&vcpd_of(__vc_rem_vcoreid)->ev_mbox_public))
 538                {
 539                        /* But we aren't, so we'll need to send a message */
 540                        local_msg.ev_type = EV_CHECK_MSGS;
 541                        local_msg.ev_arg2 = __vc_rem_vcoreid;   /* 32bit arg */
 542                        send_self_vc_msg(&local_msg);
 543                }
 544                /* Either way, we're not working on this one now.  Note this is
 545                 * more of an optimization - it'd be harmless (I think) to poll
 546                 * another vcore's pub mbox once when we pop up in vc_entry in
 547                 * the future */
 548                __vc_handle_an_mbox = FALSE;
 549                return TRUE;
 550        }
 551        return FALSE;
 552}
 553
  554/* Call this when you return, paired up with ev_might_not_return().  If it
  555 * turned off __vc_handle_an_mbox, we'll turn it back on. */
 556void ev_we_returned(bool were_handling_remotes)
 557{
 558        if (were_handling_remotes)
 559                __vc_handle_an_mbox = TRUE;
 560}
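     /* Illustrative sketch (not compiled in): how an event handler brackets an
      * operation that might not return (e.g. one that can trigger a change_to or
      * otherwise restart in vcore entry).  'op_that_might_not_return' is
      * hypothetical.
      *
      *         bool were_handling_remotes = ev_might_not_return();
      *
      *         op_that_might_not_return();
      *         ev_we_returned(were_handling_remotes);
      */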
 561
 562/* Debugging */
 563void print_ev_msg(struct event_msg *msg)
 564{
 565        printf("MSG at %08p\n", msg);
 566        printf("\ttype: %d\n", msg->ev_type);
 567        printf("\targ1 (16): 0x%4x\n", msg->ev_arg1);
 568        printf("\targ2 (32): 0x%8x\n", msg->ev_arg2);
  569        printf("\targ3 (ptr): %p\n", msg->ev_arg3);
  570        printf("\targ4 (64): 0x%16llx\n", msg->ev_arg4);
 571}
 572
 573/* Uthreads blocking on event queues
 574 *
 575 * It'd be nice to have a uthread sleep until an event queue has some activity
 576 * (e.g. a new message).  It'd also be nice to wake up early with a timer.  It
 577 * is tempting to try something like an INDIR and have one evq multiplex two
 578 * others (the real event and an alarm).  But then you can't separate the two
 579 * streams; what if one thread sleeps on just the event at the same time?  What
 580 * if we want to support something like Go's select: a thread wants to block
 581 * until there is some activity on some channel?
 582 *
 583 * Ultimately, we want to allow M uthreads to block on possibly different
 584 * subsets of N event queues.
 585 *
 586 * Every uthread will have a sleep controller, and every event queue will have a
 587 * wakeup controller.  There are up to MxN linkage structures connecting these.
 588 *
 589 * We'll use the event_queue handler to override the default event processing.
 590 * This means the event queues that are used for blocking uthreads can *only* be
 591 * used for that; the regular event processing will not happen.  This is mostly
 592 * true.  It is possible to extract events from an evq's mbox concurrently.
 593 *
 594 * I briefly considered having one global lock to protect all of the lists and
 595 * structures.  That's lousy for the obvious scalability reason, but it seemed
 596 * like it'd make things easier, especially when I thought I needed locks in
 597 * both the ectlr and the uctlr (in early versions, I considered having the
 598 * handler yank itself out of the ectlr, copying a message into that struct, or
 599 * o/w needing protection).  On occasion, we run into the "I'd like to split my
 600 * lock between two components and still somehow synchronize" issue (e.g. FD
 601 * taps, with the FDT lock and the blocking/whatever that goes on in a device).
 602 * Whenever that comes up, we usually can get some help from other shared memory
 603 * techniques.  For FD taps, it's the kref.  For us, it's post-and-poke, though
 604 * it didn't solve all of our problems - I use it as a tool with some basic
 605 * shared memory signalling. */
 606
 607struct evq_wait_link;
 608TAILQ_HEAD(wait_link_tailq, evq_wait_link);
 609
 610/* Bookkeeping for the uthread sleeping on a bunch of event queues.
 611 *
 612 * Notes on concurrency: most fields are not protected.  check_evqs is racy, and
 613 * written to by handlers.  The tailq is only used by the uthread.  blocked is
 614 * never concurrently *written*; see __uth_wakeup_poke() for details. */
 615struct uth_sleep_ctlr {
 616        struct uthread                  *uth;
 617        struct spin_pdr_lock            in_use;
 618        bool                            check_evqs;
 619        bool                            blocked;
 620        struct poke_tracker             poker;
 621        struct wait_link_tailq          evqs;
 622};
 623
 624/* Attaches to an event_queue (ev_udata), tracks the uthreads for this evq */
 625struct evq_wakeup_ctlr {
 626        /* If we ever use a sync_obj, that would replace waiters.  But also note
 627         * that we want a pointer to something other than the uthread, and currently
 628         * we also wake all threads - there's no scheduling decision. */
 629        struct wait_link_tailq          waiters;
 630        struct spin_pdr_lock            lock;
 631};
 632
 633/* Up to MxN of these, N of them per uthread. */
 634struct evq_wait_link {
 635        struct uth_sleep_ctlr           *uth_ctlr;
 636        TAILQ_ENTRY(evq_wait_link)      link_uth;
 637        struct evq_wakeup_ctlr          *evq_ctlr;
 638        TAILQ_ENTRY(evq_wait_link)      link_evq;
 639};
 640
 641/* Poke function: ensures the uth managed by uctlr wakes up.  poke() ensures
 642 * there is only one thread in this function at a time.  However, it could be
 643 * called spuriously, which is why we check 'blocked.' */
 644static void __uth_wakeup_poke(void *arg)
 645{
 646        struct uth_sleep_ctlr *uctlr = arg;
 647
 648        /* There are no concurrent writes to 'blocked'.  Blocked is only ever
 649         * written when the uth sleeps and only ever cleared here.  Once the uth
 650         * writes it, it does not write it again until after we clear it.
 651         *
 652         * This is still racy - we could see !blocked, then blocked gets set.
 653         * In that case, the poke failed, and that is harmless.  The uth will
 654         * see 'check_evqs', which was set before poke, which would be before
 655         * writing blocked, and the uth checks 'check_evqs' after writing. */
 656        if (uctlr->blocked) {
 657                uctlr->blocked = FALSE;
 658                cmb();  /* clear blocked before starting the uth */
 659                uthread_runnable(uctlr->uth);
 660        }
 661}
 662
 663static void uth_sleep_ctlr_init(struct uth_sleep_ctlr *uctlr,
 664                                struct uthread *uth)
 665{
 666        uctlr->uth = uth;
 667        spin_pdr_init(&uctlr->in_use);
 668        uctlr->check_evqs = FALSE;
 669        uctlr->blocked = FALSE;
 670        poke_init(&uctlr->poker, __uth_wakeup_poke);
 671        TAILQ_INIT(&uctlr->evqs);
 672}
 673
 674/* This handler runs when the ev_q is checked.  Instead of doing anything with
 675 * the ev_q, we make sure that every uthread that was waiting on us wakes up.
 676 * The uthreads could be waiting on several evqs, so there could be multiple
 677 * independent wake-up attempts, hence the poke.  Likewise, the uthread could be
 678 * awake when we poke.  The uthread will check check_evqs after sleeping, in
 679 * case we poke before it blocks (and the poke fails).
 680 *
 681 * Also, there could be concurrent callers of this handler, and other uthreads
 682 * signing up for a wakeup. */
 683void evq_wakeup_handler(struct event_queue *ev_q)
 684{
 685        struct evq_wakeup_ctlr *ectlr = ev_q->ev_udata;
 686        struct evq_wait_link *i;
 687
 688        assert(ectlr);
 689        spin_pdr_lock(&ectlr->lock);
 690        /* Note we wake up all sleepers, even though only one is likely to get
 691         * the message.  See the notes in unlink_ectlr() for more info. */
 692        TAILQ_FOREACH(i, &ectlr->waiters, link_evq) {
 693                i->uth_ctlr->check_evqs = TRUE;
 694                cmb();  /* order check write before poke (poke has atomic) */
 695                poke(&i->uth_ctlr->poker, i->uth_ctlr);
 696        }
 697        spin_pdr_unlock(&ectlr->lock);
 698}
 699
 700/* Helper, attaches a wakeup controller to the event queue. */
 701void evq_attach_wakeup_ctlr(struct event_queue *ev_q)
 702{
 703        struct evq_wakeup_ctlr *ectlr = malloc(sizeof(struct evq_wakeup_ctlr));
 704
 705        memset(ectlr, 0, sizeof(struct evq_wakeup_ctlr));
 706        spin_pdr_init(&ectlr->lock);
 707        TAILQ_INIT(&ectlr->waiters);
 708        ev_q->ev_udata = ectlr;
 709        ev_q->ev_handler = evq_wakeup_handler;
 710}
 711
 712void evq_remove_wakeup_ctlr(struct event_queue *ev_q)
 713{
 714        free(ev_q->ev_udata);
 715        ev_q->ev_udata = 0;
 716        ev_q->ev_handler = 0;
 717}
 718
 719static void link_uctlr_ectlr(struct uth_sleep_ctlr *uctlr,
 720                             struct evq_wakeup_ctlr *ectlr,
 721                             struct evq_wait_link *link)
 722{
 723        /* No lock needed for the uctlr; we're the only one modifying evqs */
 724        link->uth_ctlr = uctlr;
 725        TAILQ_INSERT_HEAD(&uctlr->evqs, link, link_uth);
 726        /* Once we add ourselves to the ectrl list, we could start getting poked
 727         */
 728        link->evq_ctlr = ectlr;
 729        spin_pdr_lock(&ectlr->lock);
 730        TAILQ_INSERT_HEAD(&ectlr->waiters, link, link_evq);
 731        spin_pdr_unlock(&ectlr->lock);
 732}
 733
 734/* Disconnects us from a wakeup controller.
 735 *
 736 * Our evq handlers wake up *all* uthreads that are waiting for activity
 737 * (broadcast).  It's a tradeoff.  If the list of uthreads is long, then it is
 738 * wasted effort.  An alternative is to wake up exactly one, with slightly
 739 * greater overheads.  In the exactly-one case, multiple handlers could wake
 740 * this uth up at once, but we can only extract one message.  If we do the
  741 * single wake up, then when we detach from an ectlr, we need to peek in the
  742 * mbox to see if it is not empty, and conditionally run its handler again, so
  743 * that no uthread sits on an ectlr that has activity/pending messages (in
 744 * essence, level triggered). */
 745static void unlink_ectlr(struct evq_wait_link *link)
 746{
 747        struct evq_wakeup_ctlr *ectlr = link->evq_ctlr;
 748
 749        spin_pdr_lock(&ectlr->lock);
 750        TAILQ_REMOVE(&ectlr->waiters, link, link_evq);
 751        spin_pdr_unlock(&ectlr->lock);
 752}
 753
 754/* Helper: polls all evqs once and extracts the first message available.  The
 755 * message is copied into ev_msg, and the evq with the activity is copied into
 756 * which_evq (if it is non-zero).  Returns TRUE on success. */
 757static bool extract_evqs_msg(struct event_queue *evqs[], size_t nr_evqs,
 758                             struct event_msg *ev_msg,
 759                             struct event_queue **which_evq)
 760{
 761        struct event_queue *evq_i;
 762        bool ret = FALSE;
 763
 764        /* We need to have notifs disabled when extracting messages from some
 765         * mboxes.  Many mboxes have some form of busy waiting between consumers
 766         * (userspace).  If we're just a uthread, we could wind up on a runqueue
 767         * somewhere while someone else spins, possibly in VC ctx. */
 768        uth_disable_notifs();
 769        for (int i = 0; i < nr_evqs; i++) {
 770                evq_i = evqs[i];
 771                if (extract_one_mbox_msg(evq_i->ev_mbox, ev_msg)) {
 772                        if (which_evq)
 773                                *which_evq = evq_i;
 774                        ret = TRUE;
 775                        break;
 776                }
 777        }
 778        uth_enable_notifs();
 779        return ret;
 780}
 781
 782/* Yield callback */
 783static void __uth_blockon_evq_cb(struct uthread *uth, void *arg)
 784{
 785        struct uth_sleep_ctlr *uctlr = arg;
 786
 787        uthread_has_blocked(uth, UTH_EXT_BLK_EVENTQ);
 788        cmb();  /* actually block before saying 'blocked' */
 789        uctlr->blocked = TRUE;  /* can be woken up now */
 790        wrmb(); /* write 'blocked' before read 'check_evqs' */
 791        /* If someone set check_evqs, we should wake up.  We're competing with
 792         * other wakers via poke (we may have already woken up!). */
 793        if (uctlr->check_evqs)
 794                poke(&uctlr->poker, uctlr);
 795        /* Once we say we're blocked, we could be woken up (possibly by our poke
 796         * here) and the uthread could run on another core.  Holding this lock
 797         * prevents the uthread from quickly returning and freeing the memory of
 798         * uctrl before we have a chance to check_evqs or poke. */
 799        spin_pdr_unlock(&uctlr->in_use);
 800}
 801
 802/* Direct version, with *evqs[]. */
 803void uth_blockon_evqs_arr(struct event_msg *ev_msg,
 804                          struct event_queue **which_evq,
 805                          struct event_queue *evqs[], size_t nr_evqs)
 806{
 807        struct uth_sleep_ctlr uctlr;
 808        struct evq_wait_link linkage[nr_evqs];
 809
 810        /* Catch user mistakes.  If they lack a handler, they didn't attach.
 811         * They are probably using our evq_wakeup_handler, but they might have
 812         * their own wrapper function. */
 813        for (int i = 0; i < nr_evqs; i++)
 814                assert(evqs[i]->ev_handler);
 815        /* Check for activity on the evqs before going through the hassle of
 816         * sleeping.  ("check, signal, check again" pattern). */
 817        if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
 818                return;
 819        uth_sleep_ctlr_init(&uctlr, current_uthread);
 820        memset(linkage, 0, sizeof(struct evq_wait_link) * nr_evqs);
 821        for (int i = 0; i < nr_evqs; i++)
 822                link_uctlr_ectlr(&uctlr,
 823                                 (struct evq_wakeup_ctlr*)evqs[i]->ev_udata,
 824                                 &linkage[i]);
 825        /* Mesa-style sleep until we get a message.  Mesa helps a bit here,
 826         * since we can just deregister from them all when we're done.  o/w it
 827         * is tempting to have us deregister from *the* one in the handler and
 828         * extract the message there; which can be tricky and harder to reason
 829         * about. */
 830        while (1) {
 831                /* We need to make sure only one 'version/ctx' of this thread is
 832                 * active at a time.  Later on, we'll unlock in vcore ctx on the
 833                 * other side of a yield.  We could restart from the yield,
 834                 * return, and free the uctlr before that ctx has a chance to
 835                 * finish. */
 836                spin_pdr_lock(&uctlr.in_use);
 837                /* We're signed up.  We might already have been told to check
 838                 * the evqs, or there could be messages still sitting in the
 839                 * evqs.  check_evqs is only ever cleared here, and only ever
 840                 * set in evq handlers. */
 841                uctlr.check_evqs = FALSE;
 842                cmb();  /* look for messages after clearing check_evqs */
 843                if (extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq))
 844                        break;
 845                uthread_yield(TRUE, __uth_blockon_evq_cb, &uctlr);
 846        }
 847        /* On the one hand, it's not necessary to unlock, since the memory will
 848         * be freed.  But we do need to go through the process to turn on notifs
 849         * and adjust the notif_disabled_depth for the case where we don't
 850         * yield. */
 851        spin_pdr_unlock(&uctlr.in_use);
 852        for (int i = 0; i < nr_evqs; i++)
 853                unlink_ectlr(&linkage[i]);
 854}
 855
 856/* ... are event_queue *s, nr_evqs of them.  This will block until it can
 857 * extract some message from one of evqs.  The message will be placed in ev_msg,
 858 * and the particular evq it extracted it from will be placed in which_evq, if
  859 * which_evq is non-zero. */
 860void uth_blockon_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
 861                      size_t nr_evqs, ...)
 862{
 863        struct event_queue *evqs[nr_evqs];
 864        va_list va;
 865
 866        va_start(va, nr_evqs);
 867        for (int i = 0; i < nr_evqs; i++)
 868                evqs[i] = va_arg(va, struct event_queue *);
 869        va_end(va);
 870        uth_blockon_evqs_arr(ev_msg, which_evq, evqs, nr_evqs);
 871}
 872
 873/* ... are event_queue *s, nr_evqs of them.  This will attempt to extract some
 874 * message from one of evqs.  The message will be placed in ev_msg, and the
 875 * particular evq it extracted it from will be placed in which_evq.  Returns
 876 * TRUE if it extracted a message. */
 877bool uth_check_evqs(struct event_msg *ev_msg, struct event_queue **which_evq,
 878                    size_t nr_evqs, ...)
 879{
 880        struct event_queue *evqs[nr_evqs];
 881        va_list va;
 882
 883        va_start(va, nr_evqs);
 884        for (int i = 0; i < nr_evqs; i++)
 885                evqs[i] = va_arg(va, struct event_queue *);
 886        va_end(va);
 887        return extract_evqs_msg(evqs, nr_evqs, ev_msg, which_evq);
 888}
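     /* Illustrative sketch (not compiled in): blocking a uthread on an event
      * queue.  The evq must have a wakeup controller attached so its handler
      * wakes sleepers; how events get routed into the evq (kernel registration,
      * FD taps, etc.) is up to the caller and omitted here.
      *
      *         struct event_queue *evq = get_eventq(EV_MBOX_UCQ);
      *         struct event_msg msg;
      *         struct event_queue *which;
      *
      *         evq_attach_wakeup_ctlr(evq);
      *         // ... arrange for events to be sent to evq ...
      *         uth_blockon_evqs(&msg, &which, 1, evq);
      *
      * uth_check_evqs() is the non-blocking variant with the same calling
      * convention. */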
 889