Split ev_mbox into a union of mbox types (XCC)
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 12 Aug 2015 19:55:45 +0000 (15:55 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 28 Sep 2015 19:14:00 +0000 (15:14 -0400)
Previously, an event mailbox consisted of a UCQ for messages with
payloads and a bitmap for the "NOMSG" events.  In general, we want to
support different types of mailboxes, with the functionality switched on
by type.

The rule is that all mboxes must know how to handle an event_msg, in an
mbox-specific manner.  For bitmaps, it's to just send the type.  If the
user screws up and loses the payload, that's on them.

There is one usage of NOMSG at the moment - the simple_evq.  What's
happening now is that the mbox is being treated as a UCQ, and in
early-scp context may eventually overflow if there are enough syscalls
before initializing the uthread library.

Also note that you can still do the 'raw' initialization of mailboxes.
For instance, with UCQs, you may want to do one big mmap for the UCQ
pages, instead of two mmaps per initialization.  You can still do this,
if you know you're dealing with a UCQ.

Rebuild the world.

kern/include/ros/evbitmap.h [new file with mode: 0644]
kern/include/ros/event.h
kern/src/event.c
user/parlib/evbitmap.c [new file with mode: 0644]
user/parlib/event.c
user/parlib/include/evbitmap.h [new file with mode: 0644]
user/parlib/include/event.h
user/parlib/vcore.c
user/pthread/pthread.c

diff --git a/kern/include/ros/evbitmap.h b/kern/include/ros/evbitmap.h
new file mode 100644 (file)
index 0000000..5ccc111
--- /dev/null
@@ -0,0 +1,19 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Event bitmaps.  These are a type of event mailbox where the message type is
+ * translated to a bit, which is set in the bitmap. */
+
+/* Include this outside the ifndef, due to circular include concerns. */
+#include <ros/event.h>
+
+#ifndef ROS_INC_EVBITMAP_H
+#define ROS_INC_EVBITMAP_H
+
+struct evbitmap {
+       bool                                            check_bits;
+       uint8_t                                         bitmap[(MAX_NR_EVENT - 1) / 8 + 1];
+};
+
+#endif /* ROS_INC_EVBITMAP_H */
index dee54b0..d5c9afb 100644 (file)
@@ -11,6 +11,7 @@
 #include <ros/atomic.h>
 #include <ros/trapframe.h>
 /* #include <ros/ucq.h> included below */
+/* #include <ros/evbitmap.h> included below */
 
 /* Event Delivery Flags from the process to the kernel */
 #define EVENT_IPI                              0x00001 /* IPI the vcore (usually with INDIR) */
@@ -57,16 +58,22 @@ struct event_msg {
        uint64_t                                        ev_arg4;
 };
 
-/* Including here since ucq.h needs to know about struct event_msg */
+/* Include here since the mboxes need to know event.h basics (e.g. event_msg) */
 #include <ros/ucq.h>
+#include <ros/evbitmap.h>
+
+#define EV_MBOX_UCQ                            1
+#define EV_MBOX_BITMAP                 2
 
 /* Structure for storing / receiving event messages.  An overflow causes the
  * bit of the event to get set in the bitmap.  You can also have just the bit
  * sent (and no message). */
 struct event_mbox {
-       struct ucq                                      ev_msgs;
-       bool                                            ev_check_bits;
-       uint8_t                                         ev_bitmap[(MAX_NR_EVENT - 1) / 8 + 1];
+       int                                             type;
+       union {
+               struct ucq                              ucq;
+               struct evbitmap                 evbm;
+       };
 };
 
 /* The kernel sends messages to this structure, which describes how and where
index f98322d..6f69e98 100644 (file)
@@ -63,6 +63,13 @@ static void set_vcore_msgable(uint32_t vcoreid)
        atomic_or(&vcpd->flags, VC_CAN_RCV_MSG);
 }
 
+static void send_evbitmap_msg(struct evbitmap *evbm, struct event_msg *msg)
+{
+       SET_BITMASK_BIT_ATOMIC(evbm->bitmap, msg->ev_type);
+       wmb();
+       evbm->check_bits = TRUE;
+}
+
 /* Posts a message to the mbox, subject to flags.  Feel free to send 0 for the
  * flags if you don't want to give them the option of EVENT_NOMSG (which is what
  * we do when sending an indirection event).  Make sure that if mbox is a user
@@ -74,13 +81,15 @@ static void post_ev_msg(struct proc *p, struct event_mbox *mbox,
        printd("[kernel] Sending event type %d to mbox %p\n", msg->ev_type, mbox);
        /* Sanity check */
        assert(p);
-       /* If they just want a bit (NOMSG), just set the bit */
-       if (ev_flags & EVENT_NOMSG) {
-               SET_BITMASK_BIT_ATOMIC(mbox->ev_bitmap, msg->ev_type);
-               wmb();
-               mbox->ev_check_bits = TRUE;
-       } else {
-               send_ucq_msg(&mbox->ev_msgs, p, msg);
+       switch (mbox->type) {
+               case (EV_MBOX_UCQ):
+                       send_ucq_msg(&mbox->ucq, p, msg);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       send_evbitmap_msg(&mbox->evbm, msg);
+                       break;
+               default:
+                       printk("[kernel] Unknown mbox type %d!\n", mbox->type);
        }
 }
 
diff --git a/user/parlib/evbitmap.c b/user/parlib/evbitmap.c
new file mode 100644 (file)
index 0000000..1affd39
--- /dev/null
@@ -0,0 +1,48 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Event bitmaps.  These are a type of event mailbox where the message type is
+ * translated to a bit, which is set in the bitmap. */
+
+#include <parlib/evbitmap.h>
+#include <parlib/bitmask.h>
+#include <string.h>
+
+void evbitmap_init(struct evbitmap *evbm)
+{
+       memset(evbm, 0, sizeof(struct evbitmap));
+}
+
+void evbitmap_cleanup(struct evbitmap *evbm)
+{
+}
+
+bool evbitmap_is_empty(struct evbitmap *evbm)
+{
+       return !evbm->check_bits;
+}
+
+bool get_evbitmap_msg(struct evbitmap *evbm, struct event_msg *ev_msg)
+{
+       if (evbitmap_is_empty(evbm))
+               return FALSE;
+       while (1) {
+               for (int i = 0; i < MAX_NR_EVENT; i++) {
+                       if (GET_BITMASK_BIT(evbm->bitmap, i)) {
+                               CLR_BITMASK_BIT_ATOMIC(evbm->bitmap, i);
+                               /* bit messages are empty except for the type. */
+                               memset(ev_msg, 0, sizeof(struct event_msg));
+                               ev_msg->ev_type = i;
+                               return TRUE;
+                       }
+               }
+               /* If we made it here, then the bitmap might be empty. */
+               evbm->check_bits = FALSE;
+               wrmb(); /* check_bits written before we check for it being clear */
+               if (BITMASK_IS_CLEAR(evbm->bitmap, MAX_NR_EVENT))
+                       return FALSE;
+               cmb();
+               evbm->check_bits = TRUE;
+       }
+}
index 392f344..b113096 100644 (file)
@@ -9,7 +9,7 @@
 #include <ros/event.h>
 #include <ros/procdata.h>
 #include <parlib/ucq.h>
-#include <parlib/bitmask.h>
+#include <parlib/evbitmap.h>
 #include <parlib/vcore.h>
 #include <stdlib.h>
 #include <string.h>
@@ -35,11 +35,11 @@ __thread uint32_t __vc_rem_vcoreid;
  * these stitch up the big_q so its ev_mbox points to its internal mbox.  Never
  * access the internal mbox directly.
  *
- * Raw ones need to have their UCQs initialized.  If you're making a lot of
- * these, you can do one big mmap and init the ucqs on your own, which ought to
- * perform better.
+ * Raw ones need to have their mailboxes initialized.  If you're making a lot of
+ * these and they perform their own mmaps (e.g. UCQs), you can do one big mmap
+ * and init the ucqs on your own, which ought to perform better.
  *
- * Use the 'regular' one for big_qs if you don't want to worry about the ucq
+ * Use the 'regular' one for big_qs if you don't want to worry about the mbox
  * initalization */
 struct event_queue *get_big_event_q_raw(void)
 {
@@ -53,11 +53,30 @@ struct event_queue *get_big_event_q_raw(void)
 struct event_queue *get_big_event_q(void)
 {
        struct event_queue *big_q = get_big_event_q_raw();
-       /* uses the simpler, internally mmapping ucq_init() */
-       ucq_init(&big_q->ev_mbox->ev_msgs);
+       event_mbox_init(big_q->ev_mbox, EV_MBOX_UCQ);
        return big_q;
 }
 
+/* Basic initialization of a single mbox.  If you know the type, you can set up
+ * the mbox manually with possibly better performance.  For instance, ucq_init()
+ * calls mmap internally.  You could mmap a huge blob on your own and call
+ * ucq_raw_init (don't forget to set the mbox_type!) */
+void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type)
+{
+       ev_mbox->type = mbox_type;
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       ucq_init(&ev_mbox->ucq);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       evbitmap_init(&ev_mbox->evbm);
+                       break;
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       break;
+       }
+}
+
 /* Give it up.  I don't recommend calling these unless you're sure the queues
  * aren't in use (unregistered, etc). (TODO: consider some checks for this) */
 void put_big_event_q_raw(struct event_queue *ev_q)
@@ -70,10 +89,25 @@ void put_big_event_q_raw(struct event_queue *ev_q)
 
 void put_big_event_q(struct event_queue *ev_q)
 {
-       ucq_free_pgs(&ev_q->ev_mbox->ev_msgs);
+       event_mbox_cleanup(ev_q->ev_mbox);
        put_big_event_q_raw(ev_q);
 }
 
+void event_mbox_cleanup(struct event_mbox *ev_mbox)
+{
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       ucq_free_pgs(&ev_mbox->ucq);
+                       break;
+               case (EV_MBOX_BITMAP):
+                       evbitmap_cleanup(&ev_mbox->evbm);
+                       break;
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       break;
+       }
+}
+
 /* Need to point this event_q to an mbox - usually to a vcpd */
 struct event_queue *get_event_q(void)
 {
@@ -156,14 +190,6 @@ unsigned int get_event_type(struct event_mbox *ev_mbox)
 
        if (extract_one_mbox_msg(ev_mbox, &local_msg))
                return local_msg.ev_type;
-       if (BITMASK_IS_CLEAR(&ev_mbox->ev_bitmap, MAX_NR_EVENT))
-               return EV_NONE; /* aka, 0 */
-       for (int i = 0; i < MAX_NR_EVENT; i++) {
-               if (GET_BITMASK_BIT(ev_mbox->ev_bitmap, i)) {
-                       CLR_BITMASK_BIT_ATOMIC(ev_mbox->ev_bitmap, i);
-                       return i;
-               }
-       }
        return EV_NONE;
 }
 
@@ -212,8 +238,16 @@ static void run_ev_handlers(unsigned int ev_type, struct event_msg *ev_msg)
  * Returns TRUE on success. */
 bool extract_one_mbox_msg(struct event_mbox *ev_mbox, struct event_msg *ev_msg)
 {
-       /* get_ucq returns 0 on success, -1 on empty */
-       return get_ucq_msg(&ev_mbox->ev_msgs, ev_msg) == 0;
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       /* get_ucq returns 0 on success, -1 on empty */
+                       return get_ucq_msg(&ev_mbox->ucq, ev_msg) == 0;
+               case (EV_MBOX_BITMAP):
+                       return get_evbitmap_msg(&ev_mbox->evbm, ev_msg);
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       return FALSE;
+       }
 }
 
 /* Attempts to handle a message.  Returns 1 if we dequeued a msg, 0 o/w. */
@@ -237,39 +271,27 @@ int handle_one_mbox_msg(struct event_mbox *ev_mbox)
 int handle_mbox(struct event_mbox *ev_mbox)
 {
        int retval = 0;
-       uint32_t vcoreid = vcore_id();
-       void bit_handler(unsigned int bit) {
-               printd("[event] Bit: ev_type: %d\n", bit);
-               run_ev_handlers(bit, 0);
-               retval = 1;
-               /* Consider checking the queue for incoming messages while we're here */
-       }
        printd("[event] handling ev_mbox %08p on vcore %d\n", ev_mbox, vcore_id());
        /* Some stack-smashing bugs cause this to fail */
        assert(ev_mbox);
        /* Handle all full messages, tracking if we do at least one. */
        while (handle_one_mbox_msg(ev_mbox))
                retval = 1;
-       /* Process all bits, if the kernel tells us any bit is set.  We don't clear
-        * the flag til after we check everything, in case one of the handlers
-        * doesn't return.  After we clear it, we recheck. */
-       if (ev_mbox->ev_check_bits) {
-               do {
-                       ev_mbox->ev_check_bits = TRUE;  /* in case we don't return */
-                       cmb();
-                       BITMASK_FOREACH_SET(ev_mbox->ev_bitmap, MAX_NR_EVENT, bit_handler,
-                                           TRUE);
-                       ev_mbox->ev_check_bits = FALSE;
-                       wrmb(); /* check_bits written before we check for it being clear */
-               } while (!BITMASK_IS_CLEAR(ev_mbox->ev_bitmap, MAX_NR_EVENT));
-       }
        return retval;
 }
 
 /* Empty if the UCQ is empty and the bits don't need checked */
 bool mbox_is_empty(struct event_mbox *ev_mbox)
 {
-       return (ucq_is_empty(&ev_mbox->ev_msgs) && (!ev_mbox->ev_check_bits));
+       switch (ev_mbox->type) {
+               case (EV_MBOX_UCQ):
+                       return ucq_is_empty(&ev_mbox->ucq);
+               case (EV_MBOX_BITMAP):
+                       return evbitmap_is_empty(&ev_mbox->evbm);
+               default:
+                       printf("Unknown mbox type %d!\n", ev_mbox->type);
+                       return FALSE;
+       }
 }
 
 /* The EV_EVENT handler - extract the ev_q from the message. */
diff --git a/user/parlib/include/evbitmap.h b/user/parlib/include/evbitmap.h
new file mode 100644 (file)
index 0000000..1e783cc
--- /dev/null
@@ -0,0 +1,23 @@
+/* Copyright (c) 2015 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Event bitmaps.  These are a type of event mailbox where the message type is
+ * translated to a bit, which is set in the bitmap. */
+
+#ifndef PARLIB_EVBITMAP_H
+#define PARLIB_EVBITMAP_H
+
+#include <ros/evbitmap.h>
+
+__BEGIN_DECLS
+
+void evbitmap_init(struct evbitmap *evbm);
+void evbitmap_cleanup(struct evbitmap *evbm);
+bool evbitmap_is_empty(struct evbitmap *evbm);
+void evbitmap_init(struct evbitmap *evbm);
+bool get_evbitmap_msg(struct evbitmap *evbm, struct event_msg *ev_msg);
+
+__END_DECLS
+
+#endif /* PARLIB_EVBITMAP_H */
index 448bf6c..3b4b1eb 100644 (file)
@@ -17,8 +17,10 @@ __BEGIN_DECLS
 /********* Event_q Setup / Registration  ***********/
 struct event_queue *get_big_event_q_raw(void);
 struct event_queue *get_big_event_q(void);
+void event_mbox_init(struct event_mbox *ev_mbox, int mbox_type);
 void put_big_event_q_raw(struct event_queue *ev_q);
 void put_big_event_q(struct event_queue *ev_q);
+void event_mbox_cleanup(struct event_mbox *ev_mbox);
 struct event_queue *get_event_q(void);
 struct event_queue *get_event_q_vcpd(uint32_t vcoreid, int ev_flags);
 void put_event_q(struct event_queue *ev_q);
index b440c74..680aee2 100644 (file)
@@ -174,10 +174,12 @@ void __attribute__((constructor)) vcore_lib_init(void)
         * separate ev_q for that. */
        for (int i = 0; i < max_vcores(); i++) {
                /* four pages total for both ucqs from the big block (2 pages each) */
-               ucq_init_raw(&vcpd_of(i)->ev_mbox_public.ev_msgs,
+               vcpd_of(i)->ev_mbox_public.type = EV_MBOX_UCQ;
+               ucq_init_raw(&vcpd_of(i)->ev_mbox_public.ucq,
                             mmap_block + (4 * i    ) * PGSIZE,
                             mmap_block + (4 * i + 1) * PGSIZE);
-               ucq_init_raw(&vcpd_of(i)->ev_mbox_private.ev_msgs,
+               vcpd_of(i)->ev_mbox_private.type = EV_MBOX_UCQ;
+               ucq_init_raw(&vcpd_of(i)->ev_mbox_private.ucq,
                             mmap_block + (4 * i + 2) * PGSIZE,
                             mmap_block + (4 * i + 3) * PGSIZE);
                /* Set the lowest level entry point for each vcore. */
index f471c78..b52554c 100644 (file)
@@ -640,7 +640,8 @@ void __attribute__((constructor)) pthread_lib_init(void)
                sysc_mgmt[i].ev_q->ev_flags = EVENT_IPI | EVENT_INDIR |
                                              EVENT_SPAM_INDIR | EVENT_WAKEUP;
                sysc_mgmt[i].ev_q->ev_vcore = i;
-               ucq_init_raw(&sysc_mgmt[i].ev_q->ev_mbox->ev_msgs, 
+               sysc_mgmt[i].ev_q->ev_mbox->type = EV_MBOX_UCQ;
+               ucq_init_raw(&sysc_mgmt[i].ev_q->ev_mbox->ucq,
                             mmap_block + (2 * i    ) * PGSIZE, 
                             mmap_block + (2 * i + 1) * PGSIZE); 
        }
@@ -655,7 +656,8 @@ void __attribute__((constructor)) pthread_lib_init(void)
        assert(sysc_mbox);
        assert(two_pages);
        memset(sysc_mbox, 0, sizeof(struct event_mbox));
-       ucq_init_raw(&sysc_mbox->ev_msgs, two_pages, two_pages + PGSIZE);
+       sysc_mbox->type = EV_MBOX_UCQ;
+       ucq_init_raw(&sysc_mbox->ucq, two_pages, two_pages + PGSIZE);
        for (int i = 0; i < max_vcores(); i++) {
                sysc_mgmt[i].ev_q = get_event_q();
                sysc_mgmt[i].ev_q->ev_flags = EVENT_IPI | EVENT_INDIR |