Allow larger CEQs (XCC)
authorBarret Rhoden <brho@cs.berkeley.edu>
Fri, 6 Jan 2017 19:19:43 +0000 (14:19 -0500)
committerBarret Rhoden <brho@cs.berkeley.edu>
Tue, 10 Jan 2017 00:01:40 +0000 (19:01 -0500)
Previously, the user may have been punished for having a very large CEQ
event space.  This is the range of possible event IDs the CEQ would ever
see.  For FDs, technically the user might want NR_FILE_DESC_MAX (2^19).
That'd be about a 16 MB CEQ structure, and an O(2^19) recovery scan.

With this change, we only grow the CEQ's actual memory use on demand (soft
page fault) and only scan up to the max ever seen.  The memory tricks are
OK - the kernel needs to protect against PFs when accessing user memory
anyways, and it is safe for vcore context to have a soft PF on anonymous
memory.  (If we're OOM, we'd need to handle that regardless of whether or
not the user happened to be in vcore context).

Rebuild the world.  (At least glibc and dropbear.  Probably anything
multicore too).

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/include/ros/ceq.h
kern/src/ceq.c
user/parlib/ceq.c

index fa23e7e..4dce3de 100644 (file)
@@ -69,6 +69,7 @@ struct ceq {
        struct ceq_event                        *events;                /* consumer pointer */
        unsigned int                            nr_events;
        unsigned int                            last_recovered;
+       atomic_t                                        max_event_ever;
        int32_t                                         *ring;                  /* consumer pointer */
        uint32_t                                        ring_sz;                /* size (power of 2) */
        uint8_t                                         operation;              /* e.g. CEQ_OR */
index 1ae6156..6a3cdd9 100644 (file)
@@ -20,6 +20,17 @@ static void error_addr(struct ceq *ceq, struct proc *p, void *addr)
               addr, p->pid);
 }
 
+static void ceq_update_max_event(struct ceq *ceq, unsigned int new_max)
+{
+       unsigned int old_max;
+
+       do {
+               old_max = atomic_read(&ceq->max_event_ever);
+               if (new_max <= old_max)
+                       return;
+       } while (!atomic_cas(&ceq->max_event_ever, old_max, new_max));
+}
+
 void send_ceq_msg(struct ceq *ceq, struct proc *p, struct event_msg *msg)
 {
        struct ceq_event *ceq_ev;
@@ -35,6 +46,7 @@ void send_ceq_msg(struct ceq *ceq, struct proc *p, struct event_msg *msg)
                       msg->ev_type, ceq->nr_events);
                return;
        }
+       ceq_update_max_event(ceq, msg->ev_type);
        /* ACCESS_ONCE, prevent the compiler from rereading ceq->events later, and
         * possibly getting a new, illegal version after our check */
        ceq_ev = &(ACCESS_ONCE(ceq->events))[msg->ev_type];
@@ -58,7 +70,7 @@ void send_ceq_msg(struct ceq *ceq, struct proc *p, struct event_msg *msg)
                        atomic_add(&ceq_ev->coalesce, msg->ev_arg2);
                        break;
                default:
-                       printk("[kernel] CEQ %p invalid op%d\n", ceq, ceq->operation);
+                       printk("[kernel] CEQ %p invalid op %d\n", ceq, ceq->operation);
                        return;
        }
        /* write before checking if we need to post (covered by the atomic) */
index 0af89a5..ae5aedf 100644 (file)
@@ -8,15 +8,18 @@
  * User side (consumer).
  *
  * When initializing, the nr_events is the maximum count of events you are
- * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.
+ * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.  If you
+ * use a large number, don't worry about memory; the memory is reserved but only
+ * allocated on demand (i.e. mmap without MAP_POPULATE).
  *
  * The ring_sz is a rough guess of the number of concurrent events.  It's not a
  * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
  * will probably scribble over your memory.  If you pick a value that is too
  * small, then the ring may overflow, triggering an O(n) scan of the events
- * array.  You could make it the nearest power of 2 >= nr_events, for reasonable
- * behavior at the expense of memory.  It'll be very rare for the ring to have
- * more entries than the array has events. */
+ * array (where n is the largest event ID ever seen).  You could make it the
+ * nearest power of 2 >= nr_expected_events, for reasonable behavior at the
+ * expense of memory.  It'll be very rare for the ring to have more entries than
+ * the array has events. */
 
 #include <parlib/ceq.h>
 #include <parlib/arch/atomic.h>
@@ -25,6 +28,7 @@
 #include <parlib/spinlock.h>
 #include <stdlib.h>
 #include <stdio.h>
+#include <sys/mman.h>
 
 void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
               size_t ring_sz)
@@ -33,9 +37,12 @@ void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
         * so we don't leak memory.  They better not have asked for events before
         * doing this init call... */
        ceq_cleanup(ceq);
-       ceq->events = malloc(sizeof(struct ceq_event) * nr_events);
-       memset(ceq->events, 0, sizeof(struct ceq_event) * nr_events);
+       ceq->events = mmap(NULL, sizeof(struct ceq_event) * nr_events,
+                          PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS,
+                          -1, 0);
+       parlib_assert_perror(ceq->events != MAP_FAILED);
        ceq->nr_events = nr_events;
+       atomic_init(&ceq->max_event_ever, 0);
        assert(IS_PWR2(ring_sz));
        ceq->ring = malloc(sizeof(int32_t) * ring_sz);
        memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
@@ -151,7 +158,9 @@ bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
                        ceq->last_recovered = 0;
                        wrmb(); /* clear overflowed before reading event entries */
                }
-               for (int i = ceq->last_recovered; i < ceq->nr_events; i++) {
+               for (int i = ceq->last_recovered;
+                    i <= atomic_read(&ceq->max_event_ever);
+                    i++) {
                        /* Regardles of whether there's a msg here, we checked it. */
                        ceq->last_recovered++;
                        if (extract_ceq_msg(ceq, i, msg)) {
@@ -195,6 +204,6 @@ bool ceq_is_empty(struct ceq *ceq)
 
 void ceq_cleanup(struct ceq *ceq)
 {
-       free(ceq->events);
+       munmap(ceq->events, sizeof(struct ceq_event) * ceq->nr_events);
        free(ceq->ring);
 }