epoll: Support very large CEQ sets
authorBarret Rhoden <brho@cs.berkeley.edu>
Fri, 6 Jan 2017 19:24:18 +0000 (14:24 -0500)
committerBarret Rhoden <brho@cs.berkeley.edu>
Tue, 10 Jan 2017 00:01:40 +0000 (19:01 -0500)
We now NR_FILE_DESC_MAX for the CEQ.  This allows the user to tap as many
(kernel) FDs as they want, without us worrying about the ceq size.  We'll
take the epoll size parameter as a hint for the ring size (which is the
expected max number of unique events in the CEQ).

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
user/iplib/epoll.c

index d4f97a0..6d0e1f7 100644 (file)
@@ -11,7 +11,6 @@
  *     - there's no EPOLLONESHOT or level-triggered support.
  *     - you can only tap one FD at a time, so you can't add the same FD to
  *     multiple epoll sets.
- *     - there is no support for growing the epoll set.
  *     - closing the epoll is a little dangerous, if there are outstanding INDIR
  *     events.  this will only pop up if you're yielding cores, maybe getting
  *     preempted, and are unlucky.
@@ -51,7 +50,6 @@ struct epoll_ctlr {
        struct event_queue                      *ceq_evq;
        struct event_queue                      *alarm_evq;
        struct ceq                                      *ceq;   /* convenience pointer */
-       unsigned int                            size;
        uth_mutex_t                                     mtx;
        struct user_fd                          ufd;
 };
@@ -111,6 +109,11 @@ static uint32_t taps_to_ep_events(int taps)
        return ep_ev;
 }
 
+static unsigned int ep_get_ceq_max_ever(struct epoll_ctlr *ep)
+{
+       return atomic_read(&ep->ceq_evq->ev_mbox->ceq.max_event_ever);
+}
+
 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
 {
        if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
@@ -131,11 +134,11 @@ static struct epoll_ctlr *fd_to_cltr(int fd)
 }
 
 /* Event queue helpers: */
-static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
+static struct event_queue *ep_get_ceq_evq(unsigned int ceq_ring_sz)
 {
        struct event_queue *ceq_evq = get_eventq_raw();
        ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
-       ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
+       ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, NR_FILE_DESC_MAX, ceq_ring_sz);
        ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
        evq_attach_wakeup_ctlr(ceq_evq);
        return ceq_evq;
@@ -179,11 +182,12 @@ static void epoll_close(struct user_fd *ufd)
        struct ep_fd_data *ep_fd_i;
        int nr_tap_req = 0;
        int nr_done = 0;
+       unsigned int max_ceq_events = ep_get_ceq_max_ever(ep);
 
-       tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
-       memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
+       tap_reqs = malloc(sizeof(struct fd_tap_req) * max_ceq_events);
+       memset(tap_reqs, 0, sizeof(struct fd_tap_req) * max_ceq_events);
        /* Slightly painful, O(n) with no escape hatch */
-       for (int i = 0; i < ep->size; i++) {
+       for (int i = 0; i < max_ceq_events; i++) {
                ceq_ev_i = ep_get_ceq_ev(ep, i);
                /* CEQ should have been big enough for our size */
                assert(ceq_ev_i);
@@ -213,17 +217,14 @@ static void epoll_close(struct user_fd *ufd)
 
 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
 {
-       unsigned int ceq_size;
-
-       /* TODO: we don't grow yet.  Until then, we help out a little. */
        if (size == 1)
                size = 128;
-       ceq_size = ROUNDUPPWR2(size);
-       ep->size = ceq_size;
        ep->mtx = uth_mutex_alloc();
        ep->ufd.magic = EPOLL_UFD_MAGIC;
        ep->ufd.close = epoll_close;
-       ep->ceq_evq = ep_get_ceq_evq(ceq_size);
+       /* Size is a hint for the CEQ concurrency.  We can actually handle as many
+        * kernel FDs as is possible. */
+       ep->ceq_evq = ep_get_ceq_evq(ROUNDUPPWR2(size));
        ep->alarm_evq = ep_get_alarm_evq();
        return 0;
 }