Use mutexes in epoll instead of spinlocks
[akaros.git] / user / parlib / event.c
index f565d1e..64dfaa0 100644 (file)
 #include <ros/procdata.h>
 #include <parlib/ucq.h>
 #include <parlib/evbitmap.h>
+#include <parlib/ceq.h>
 #include <parlib/vcore.h>
 #include <stdlib.h>
 #include <string.h>
-#include <assert.h>
+#include <parlib/assert.h>
 #include <errno.h>
 #include <parlib/parlib.h>
 #include <parlib/event.h>
@@ -41,7 +42,7 @@ __thread uint32_t __vc_rem_vcoreid;
  *
  * Use the 'regular' one for big_qs if you don't want to worry about the mbox
  * initalization */
-struct event_queue *get_big_event_q_raw(void)
+struct event_queue *get_eventq_raw(void)
 {
        /* TODO: (PIN) should be pinned memory */
        struct event_queue_big *big_q = malloc(sizeof(struct event_queue_big));
@@ -50,9 +51,9 @@ struct event_queue *get_big_event_q_raw(void)
        return (struct event_queue*)big_q;
 }
 
-struct event_queue *get_big_event_q(int mbox_type)
+struct event_queue *get_eventq(int mbox_type)
 {
-       struct event_queue *big_q = get_big_event_q_raw();
+       struct event_queue *big_q = get_eventq_raw();
        event_mbox_init(big_q->ev_mbox, mbox_type);
        return big_q;
 }
@@ -71,6 +72,9 @@ void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type)
                case (EV_MBOX_BITMAP):
                        evbitmap_init(&ev_mbox->evbm);
                        break;
+               case (EV_MBOX_CEQ):
+                       ceq_init(&ev_mbox->ceq, CEQ_OR, CEQ_DEFAULT_SZ, CEQ_DEFAULT_SZ);
+                       break;
                default:
                        printf("Unknown mbox type %d!\n", ev_mbox->type);
                        break;
@@ -79,7 +83,7 @@ void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type)
 
 /* Give it up.  I don't recommend calling these unless you're sure the queues
  * aren't in use (unregistered, etc). (TODO: consider some checks for this) */
-void put_big_event_q_raw(struct event_queue *ev_q)
+void put_eventq_raw(struct event_queue *ev_q)
 {
        /* if we use something other than malloc, we'll need to be aware that ev_q
         * is actually an event_queue_big.  One option is to use the flags, though
@@ -87,10 +91,10 @@ void put_big_event_q_raw(struct event_queue *ev_q)
        free(ev_q);
 }
 
-void put_big_event_q(struct event_queue *ev_q)
+void put_eventq(struct event_queue *ev_q)
 {
        event_mbox_cleanup(ev_q->ev_mbox);
-       put_big_event_q_raw(ev_q);
+       put_eventq_raw(ev_q);
 }
 
 void event_mbox_cleanup(struct event_mbox *ev_mbox)
@@ -102,6 +106,9 @@ void event_mbox_cleanup(struct event_mbox *ev_mbox)
                case (EV_MBOX_BITMAP):
                        evbitmap_cleanup(&ev_mbox->evbm);
                        break;
+               case (EV_MBOX_CEQ):
+                       ceq_cleanup(&ev_mbox->ceq);
+                       break;
                default:
                        printf("Unknown mbox type %d!\n", ev_mbox->type);
                        break;
@@ -109,7 +116,7 @@ void event_mbox_cleanup(struct event_mbox *ev_mbox)
 }
 
 /* Need to point this event_q to an mbox - usually to a vcpd */
-struct event_queue *get_event_q(void)
+struct event_queue *get_eventq_slim(void)
 {
        /* TODO: (PIN) should be pinned memory */
        struct event_queue *ev_q = malloc(sizeof(struct event_queue));
@@ -120,9 +127,9 @@ struct event_queue *get_event_q(void)
 /* Gets a small ev_q, with ev_mbox pointing to the vcpd mbox of vcoreid.  If
  * ev_flags has EVENT_VCORE_PRIVATE set, it'll give you the private mbox.  o/w,
  * you'll get the public one. */
-struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
+struct event_queue *get_eventq_vcpd(uint32_t vcoreid, int ev_flags)
 {
-       struct event_queue *ev_q = get_event_q();
+       struct event_queue *ev_q = get_eventq_slim();
        if (ev_flags & EVENT_VCORE_PRIVATE)
                ev_q->ev_mbox = &vcpd_of(vcoreid)->ev_mbox_private;
        else
@@ -130,13 +137,18 @@ struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags)
        return ev_q;
 }
 
-void put_event_q(struct event_queue *ev_q)
+void put_eventq_slim(struct event_queue *ev_q)
 {
        /* if we use something other than malloc, we'll need to be aware that ev_q
         * is not an event_queue_big. */
        free(ev_q);
 }
 
+void put_eventq_vcpd(struct event_queue *ev_q)
+{
+       put_eventq_slim(ev_q);
+}
+
 /* Sets ev_q to be the receiving end for kernel event ev_type */
 void register_kevent_q(struct event_queue *ev_q, unsigned int ev_type)
 {
@@ -161,7 +173,7 @@ struct event_queue *clear_kevent_q(unsigned int ev_type)
  * the other event functions together to get similar things done. */
 void enable_kevent(unsigned int ev_type, uint32_t vcoreid, int ev_flags)
 {
-       struct event_queue *ev_q = get_event_q_vcpd(vcoreid, ev_flags);
+       struct event_queue *ev_q = get_eventq_vcpd(vcoreid, ev_flags);
        ev_q->ev_flags = ev_flags;
        ev_q->ev_vcore = vcoreid;
        ev_q->ev_handler = 0;
@@ -193,6 +205,62 @@ unsigned int get_event_type(struct event_mbox *ev_mbox)
        return EV_NONE;
 }
 
+/* Attempts to register ev_q with sysc, so long as sysc is not done/progress.
+ * Returns true if it succeeded, and false otherwise.  False means that the
+ * syscall is done, and does not need an event set (and should be handled
+ * accordingly).
+ *
+ * A copy of this is in glibc/sysdeps/akaros/syscall.c.  Keep them in sync. */
+bool register_evq(struct syscall *sysc, struct event_queue *ev_q)
+{
+       int old_flags;
+       sysc->ev_q = ev_q;
+       wrmb(); /* don't let that write pass any future reads (flags) */
+       /* Try and set the SC_UEVENT flag (so the kernel knows to look at ev_q) */
+       do {
+               /* no cmb() needed, the atomic_read will reread flags */
+               old_flags = atomic_read(&sysc->flags);
+               /* Spin if the kernel is mucking with syscall flags */
+               while (old_flags & SC_K_LOCK)
+                       old_flags = atomic_read(&sysc->flags);
+               /* If the kernel finishes while we are trying to sign up for an event,
+                * we need to bail out */
+               if (old_flags & (SC_DONE | SC_PROGRESS)) {
+                       sysc->ev_q = 0;         /* not necessary, but might help with bugs */
+                       return FALSE;
+               }
+       } while (!atomic_cas(&sysc->flags, old_flags, old_flags | SC_UEVENT));
+       return TRUE;
+}
+
+/* De-registers a syscall, so that the kernel will not send an event when it is
+ * done.  The call could already be SC_DONE, or could even finish while we try
+ * to unset SC_UEVENT.
+ *
+ * There is a chance the kernel sent an event if you didn't do this in time, but
+ * once this returns, the kernel won't send a message.
+ *
+ * If the kernel is trying to send a message right now, this will spin (on
+ * SC_K_LOCK).  We need to make sure we deregistered, and that if a message
+ * is coming, that it already was sent (and possibly overflowed), before
+ * returning. */
+void deregister_evq(struct syscall *sysc)
+{
+       int old_flags;
+       sysc->ev_q = 0;
+       wrmb(); /* don't let that write pass any future reads (flags) */
+       /* Try and unset the SC_UEVENT flag */
+       do {
+               /* no cmb() needed, the atomic_read will reread flags */
+               old_flags = atomic_read(&sysc->flags);
+               /* Spin if the kernel is mucking with syscall flags */
+               while (old_flags & SC_K_LOCK)
+                       old_flags = atomic_read(&sysc->flags);
+               /* Note we don't care if the SC_DONE flag is getting set.  We just need
+                * to avoid clobbering flags */
+       } while (!atomic_cas(&sysc->flags, old_flags, old_flags & ~SC_UEVENT));
+}
+
 /* Actual Event Handling */
 
 /* List of handler lists, process-wide.  They all must return (don't context
@@ -240,10 +308,11 @@ bool extract_one_mbox_msg(struct event_mbox *ev_mbox, struct event_msg *ev_msg)
 {
        switch (ev_mbox->type) {
                case (EV_MBOX_UCQ):
-                       /* get_ucq returns 0 on success, -1 on empty */
-                       return get_ucq_msg(&ev_mbox->ucq, ev_msg) == 0;
+                       return get_ucq_msg(&ev_mbox->ucq, ev_msg);
                case (EV_MBOX_BITMAP):
                        return get_evbitmap_msg(&ev_mbox->evbm, ev_msg);
+               case (EV_MBOX_CEQ):
+                       return get_ceq_msg(&ev_mbox->ceq, ev_msg);
                default:
                        printf("Unknown mbox type %d!\n", ev_mbox->type);
                        return FALSE;
@@ -288,6 +357,8 @@ bool mbox_is_empty(struct event_mbox *ev_mbox)
                        return ucq_is_empty(&ev_mbox->ucq);
                case (EV_MBOX_BITMAP):
                        return evbitmap_is_empty(&ev_mbox->evbm);
+               case (EV_MBOX_CEQ):
+                       return ceq_is_empty(&ev_mbox->ceq);
                default:
                        printf("Unknown mbox type %d!\n", ev_mbox->type);
                        return FALSE;