Use mutexes in epoll instead of spinlocks
[akaros.git] / user / parlib / event.c
index e3c58e8..64dfaa0 100644 (file)
@@ -9,11 +9,12 @@
 #include <ros/event.h>
 #include <ros/procdata.h>
 #include <parlib/ucq.h>
-#include <parlib/bitmask.h>
+#include <parlib/evbitmap.h>
+#include <parlib/ceq.h>
 #include <parlib/vcore.h>
 #include <stdlib.h>
 #include <string.h>
-#include <assert.h>
+#include <parlib/assert.h>
 #include <errno.h>
 #include <parlib/parlib.h>
 #include <parlib/event.h>
@@ -35,13 +36,13 @@ __thread uint32_t __vc_rem_vcoreid;
  * these stitch up the big_q so its ev_mbox points to its internal mbox.  Never
  * access the internal mbox directly.
  *
- * Raw ones need to have their UCQs initialized.  If you're making a lot of
- * these, you can do one big mmap and init the ucqs on your own, which ought to
- * perform better.
+ * Raw ones need to have their mailboxes initialized.  If you're making a lot of
+ * these and they perform their own mmaps (e.g. UCQs), you can do one big mmap
+ * and init the ucqs on your own, which ought to perform better.
  *
- * Use the 'regular' one for big_qs if you don't want to worry about the ucq
+ * Use the 'regular' one for big_qs if you don't want to worry about the mbox
  * initalization */
-struct event_queue *get_big_event_q_raw(void)
+struct event_queue *get_eventq_raw(void)
 {
        /* TODO: (PIN) should be pinned memory */
        struct event_queue_big *big_q = malloc(sizeof(struct event_queue_big));
@@ -50,17 +51,39 @@ struct event_queue *get_big_event_q_raw(void)
        return (struct event_queue*)big_q;
 }
 
-struct event_queue *get_big_event_q(void)
+struct event_queue *get_eventq(int mbox_type)
 {
-       struct event_queue *big_q = get_big_event_q_raw();
-       /* uses the simpler, internally mmapping ucq_init() */
-       ucq_init(&big_q->ev_mbox->ev_msgs);
+       struct event_queue *big_q = get_eventq_raw();
+       event_mbox_init(big_q->ev_mbox, mbox_type);
        return big_q;
 }
 
+/* Basic initialization of a single mbox.  If you know the type, you can set up
+ * the mbox manually with possibly better performance.  For instance, ucq_init()
+ * calls mmap internally.  You could mmap a huge blob on your own and call
+ * ucq_raw_init (don't forget to set the mbox_type!) */
+void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type)
+{
+       ev_mbox->type = mbox_type;
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       ucq_init(&ev_mbox->ucq);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       evbitmap_init(&ev_mbox->evbm);
+                       break;
+               case (EV_MBOX_CEQ):
+                       ceq_init(&ev_mbox->ceq, CEQ_OR, CEQ_DEFAULT_SZ, CEQ_DEFAULT_SZ);
+                       break;
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       break;
+       }
+}
+
 /* Give it up.  I don't recommend calling these unless you're sure the queues
  * aren't in use (unregistered, etc). (TODO: consider some checks for this) */
-void put_big_event_q_raw(struct event_queue *ev_q)
+void put_eventq_raw(struct event_queue *ev_q)
 {
        /* if we use something other than malloc, we'll need to be aware that ev_q
         * is actually an event_queue_big.  One option is to use the flags, though
@@ -68,14 +91,32 @@ void put_big_event_q_raw(struct event_queue *ev_q)
        free(ev_q);
 }
 
-void put_big_event_q(struct event_queue *ev_q)
+void put_eventq(struct event_queue *ev_q)
+{
+       event_mbox_cleanup(ev_q->ev_mbox);
+       put_eventq_raw(ev_q);
+}
+
+void event_mbox_cleanup(struct event_mbox *ev_mbox)
 {
-       ucq_free_pgs(&ev_q->ev_mbox->ev_msgs);
-       put_big_event_q_raw(ev_q);
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       ucq_free_pgs(&ev_mbox->ucq);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       evbitmap_cleanup(&ev_mbox->evbm);
+                       break;
+               case (EV_MBOX_CEQ):
+                       ceq_cleanup(&ev_mbox->ceq);
+                       break;
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       break;
+       }
 }
 
 /* Need to point this event_q to an mbox - usually to a vcpd */
-struct event_queue *get_event_q(void)
+struct event_queue *get_eventq_slim(void)
 {
        /* TODO: (PIN) should be pinned memory */
        struct event_queue *ev_q = malloc(sizeof(struct event_queue));
@@ -86,9 +127,9 @@ struct event_queue *get_event_q(void)
 /* Gets a small ev_q, with ev_mbox pointing to the vcpd mbox of vcoreid.  If
  * ev_flags has EVENT_VCORE_PRIVATE set, it'll give you the private mbox.  o/w,
  * you'll get the public one. */
-struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
+struct event_queue *get_eventq_vcpd(uint32_t vcoreid, int ev_flags)
 {
-       struct event_queue *ev_q = get_event_q();
+       struct event_queue *ev_q = get_eventq_slim();
        if (ev_flags & EVENT_VCORE_PRIVATE)
                ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_private;
        else
@@ -96,13 +137,18 @@ struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
        return ev_q;
 }
 
-void put_event_q(struct event_queue *ev_q)
+void put_eventq_slim(struct event_queue *ev_q)
 {
        /* if we use something other than malloc, we'll need to be aware that ev_q
         * is not an event_queue_big. */
        free(ev_q);
 }
 
+void put_eventq_vcpd(struct event_queue *ev_q)
+{
+       put_eventq_slim(ev_q);
+}
+
 /* Sets ev_q to be the receiving end for kernel event ev_type */
 void register_kevent_q(struct event_queue *ev_q, unsigned int ev_type)
 {
@@ -127,7 +173,7 @@ struct event_queue *clear_kevent_q(unsigned int ev_type)
  * the other event functions together to get similar things done. */
 void enable_kevent(unsigned int ev_type, uint32_t vcoreid, int ev_flags)
 {
-       struct event_queue *ev_q = get_event_q_vcpd(vcoreid, ev_flags);
+       struct event_queue *ev_q = get_eventq_vcpd(vcoreid, ev_flags);
        ev_q->ev_flags = ev_flags;
        ev_q->ev_vcore = vcoreid;
        ev_q->ev_handler = 0;
@@ -156,17 +202,65 @@ unsigned int get_event_type(struct event_mbox *ev_mbox)
 
        if (extract_one_mbox_msg(ev_mbox, &local_msg))
                return local_msg.ev_type;
-       if (BITMASK_IS_CLEAR(&ev_mbox->ev_bitmap, MAX_NR_EVENT))
-               return EV_NONE; /* aka, 0 */
-       for (int i = 0; i < MAX_NR_EVENT; i++) {
-               if (GET_BITMASK_BIT(ev_mbox->ev_bitmap, i)) {
-                       CLR_BITMASK_BIT_ATOMIC(ev_mbox->ev_bitmap, i);
-                       return i;
-               }
-       }
        return EV_NONE;
 }
 
+/* Attempts to register ev_q with sysc, so long as sysc is not done/progress.
+ * Returns true if it succeeded, and false otherwise.  False means that the
+ * syscall is done, and does not need an event set (and should be handled
+ * accordingly).
+ *
+ * A copy of this is in glibc/sysdeps/akaros/syscall.c.  Keep them in sync. */
+bool register_evq(struct syscall *sysc, struct event_queue *ev_q)
+{
+       int old_flags;
+       sysc->ev_q = ev_q;
+       wrmb(); /* don't let that write pass any future reads (flags) */
+       /* Try and set the SC_UEVENT flag (so the kernel knows to look at ev_q) */
+       do {
+               /* no cmb() needed, the atomic_read will reread flags */
+               old_flags = atomic_read(&sysc->flags);
+               /* Spin if the kernel is mucking with syscall flags */
+               while (old_flags & SC_K_LOCK)
+                       old_flags = atomic_read(&sysc->flags);
+               /* If the kernel finishes while we are trying to sign up for an event,
+                * we need to bail out */
+               if (old_flags & (SC_DONE | SC_PROGRESS)) {
+                       sysc->ev_q = 0;         /* not necessary, but might help with bugs */
+                       return FALSE;
+               }
+       } while (!atomic_cas(&sysc->flags, old_flags, old_flags | SC_UEVENT));
+       return TRUE;
+}
+
+/* De-registers a syscall, so that the kernel will not send an event when it is
+ * done.  The call could already be SC_DONE, or could even finish while we try
+ * to unset SC_UEVENT.
+ *
+ * There is a chance the kernel sent an event if you didn't do this in time, but
+ * once this returns, the kernel won't send a message.
+ *
+ * If the kernel is trying to send a message right now, this will spin (on
+ * SC_K_LOCK).  We need to make sure we deregistered, and that if a message
+ * is coming, that it already was sent (and possibly overflowed), before
+ * returning. */
+void deregister_evq(struct syscall *sysc)
+{
+       int old_flags;
+       sysc->ev_q = 0;
+       wrmb(); /* don't let that write pass any future reads (flags) */
+       /* Try and unset the SC_UEVENT flag */
+       do {
+               /* no cmb() needed, the atomic_read will reread flags */
+               old_flags = atomic_read(&sysc->flags);
+               /* Spin if the kernel is mucking with syscall flags */
+               while (old_flags & SC_K_LOCK)
+                       old_flags = atomic_read(&sysc->flags);
+               /* Note we don't care if the SC_DONE flag is getting set.  We just need
+                * to avoid clobbering flags */
+       } while (!atomic_cas(&sysc->flags, old_flags, old_flags & ~SC_UEVENT));
+}
+
 /* Actual Event Handling */
 
 /* List of handler lists, process-wide.  They all must return (don't context
@@ -212,8 +306,17 @@ static void run_ev_handlers(unsigned int ev_type, struct event_msg *ev_msg)
  * Returns TRUE on success. */
 bool extract_one_mbox_msg(struct event_mbox *ev_mbox, struct event_msg *ev_msg)
 {
-       /* get_ucq returns 0 on success, -1 on empty */
-       return get_ucq_msg(&ev_mbox->ev_msgs, ev_msg) == 0;
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       return get_ucq_msg(&ev_mbox->ucq, ev_msg);
+               case (EV_MBOX_BITMAP):
+                       return get_evbitmap_msg(&ev_mbox->evbm, ev_msg);
+               case (EV_MBOX_CEQ):
+                       return get_ceq_msg(&ev_mbox->ceq, ev_msg);
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       return FALSE;
+       }
 }
 
 /* Attempts to handle a message.  Returns 1 if we dequeued a msg, 0 o/w. */
@@ -237,39 +340,29 @@ int handle_one_mbox_msg(struct event_mbox *ev_mbox)
 int handle_mbox(struct event_mbox *ev_mbox)
 {
        int retval = 0;
-       uint32_t vcoreid = vcore_id();
-       void bit_handler(unsigned int bit) {
-               printd("[event] Bit: ev_type: %d\n", bit);
-               run_ev_handlers(bit, 0);
-               retval = 1;
-               /* Consider checking the queue for incoming messages while we're here */
-       }
        printd("[event] handling ev_mbox %08p on vcore %d\n", ev_mbox, vcore_id());
        /* Some stack-smashing bugs cause this to fail */
        assert(ev_mbox);
        /* Handle all full messages, tracking if we do at least one. */
        while (handle_one_mbox_msg(ev_mbox))
                retval = 1;
-       /* Process all bits, if the kernel tells us any bit is set.  We don't clear
-        * the flag til after we check everything, in case one of the handlers
-        * doesn't return.  After we clear it, we recheck. */
-       if (ev_mbox->ev_check_bits) {
-               do {
-                       ev_mbox->ev_check_bits = TRUE;  /* in case we don't return */
-                       cmb();
-                       BITMASK_FOREACH_SET(ev_mbox->ev_bitmap, MAX_NR_EVENT, bit_handler,
-                                           TRUE);
-                       ev_mbox->ev_check_bits = FALSE;
-                       wrmb(); /* check_bits written before we check for it being clear */
-               } while (!BITMASK_IS_CLEAR(ev_mbox->ev_bitmap, MAX_NR_EVENT));
-       }
        return retval;
 }
 
 /* Empty if the UCQ is empty and the bits don't need checked */
 bool mbox_is_empty(struct event_mbox *ev_mbox)
 {
-       return (ucq_is_empty(&ev_mbox->ev_msgs) && (!ev_mbox->ev_check_bits));
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       return ucq_is_empty(&ev_mbox->ucq);
+               case (EV_MBOX_BITMAP):
+                       return evbitmap_is_empty(&ev_mbox->evbm);
+               case (EV_MBOX_CEQ):
+                       return ceq_is_empty(&ev_mbox->ceq);
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       return FALSE;
+       }
 }
 
 /* The EV_EVENT handler - extract the ev_q from the message. */
@@ -316,19 +409,16 @@ int handle_events(uint32_t vcoreid)
  * application specific, then this will dispatch/handle based on its flags. */
 void handle_event_q(struct event_queue *ev_q)
 {
+       printd("[event] handling ev_q %08p on vcore %d\n", ev_q, vcore_id());
        /* If the program wants to handle the ev_q on its own: */
-       if (ev_q->ev_flags & EVENT_JUSTHANDLEIT) {
-               if (!ev_q->ev_handler) {
-                       printf("No ev_handler installed for ev_q %08p, aborting!\n", ev_q);
-                       return;
-               }
+       if (ev_q->ev_handler) {
                /* Remember this can't block or page fault */
                ev_q->ev_handler(ev_q);
                return;
        }
-       printd("[event] handling ev_q %08p on vcore %d\n", ev_q, vcore_id());
        /* Raw ev_qs that haven't been connected to an mbox, user bug: */
        assert(ev_q->ev_mbox);
+       /* The "default" ev_handler, common enough that I don't want a func ptr */
        handle_mbox(ev_q->ev_mbox);
 }