vmm: Use VMM_CTL to set VMM flags (XCC)
[akaros.git] / user / iplib / epoll.c
index d4f97a0..9339a58 100644 (file)
@@ -11,7 +11,6 @@
  *     - there's no EPOLLONESHOT or level-triggered support.
  *     - you can only tap one FD at a time, so you can't add the same FD to
  *     multiple epoll sets.
- *     - there is no support for growing the epoll set.
  *     - closing the epoll is a little dangerous, if there are outstanding INDIR
  *     events.  this will only pop up if you're yielding cores, maybe getting
  *     preempted, and are unlucky.
@@ -34,6 +33,8 @@
 #include <parlib/ceq.h>
 #include <parlib/uthread.h>
 #include <parlib/timing.h>
+#include <parlib/slab.h>
+#include <parlib/assert.h>
 #include <sys/user_fd.h>
 #include <sys/close_cb.h>
 #include <stdio.h>
 #include <malloc.h>
 #include <sys/queue.h>
 #include <sys/plan9_helpers.h>
+#include <ros/fs.h>
 
 /* Sanity check, so we can ID our own FDs */
 #define EPOLL_UFD_MAGIC                0xe9011
 
+/* Each waiter that uses a timeout will have its own structure for dealing with
+ * its timeout.
+ *
+ * TODO: (RCU/SLAB) it's not safe to reap the objects, until we sort out
+ * INDIRs and RCU-style grace periods.  Not a big deal, since the number of
+ * these is the number of threads that concurrently do epoll timeouts. */
+struct ep_alarm {
+       struct event_queue                      *alarm_evq;
+       struct syscall                          sysc;
+};
+
+static struct kmem_cache *ep_alarms_cache;
+
 struct epoll_ctlr {
        TAILQ_ENTRY(epoll_ctlr)         link;
        struct event_queue                      *ceq_evq;
-       struct event_queue                      *alarm_evq;
-       struct ceq                                      *ceq;   /* convenience pointer */
-       unsigned int                            size;
-       uth_mutex_t                                     mtx;
+       uth_mutex_t                                     *mtx;
        struct user_fd                          ufd;
 };
 
 TAILQ_HEAD(epoll_ctlrs, epoll_ctlr);
 static struct epoll_ctlrs all_ctlrs = TAILQ_HEAD_INITIALIZER(all_ctlrs);
-static uth_mutex_t ctlrs_mtx;
+static uth_mutex_t *ctlrs_mtx;
 
 /* There's some bookkeeping we need to maintain on every FD.  Right now, the FD
  * is the index into the CEQ event array, so we can just hook this into the user
@@ -111,6 +123,11 @@ static uint32_t taps_to_ep_events(int taps)
        return ep_ev;
 }
 
+static unsigned int ep_get_ceq_max_ever(struct epoll_ctlr *ep)
+{
+       return atomic_read(&ep->ceq_evq->ev_mbox->ceq.max_event_ever);
+}
+
 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
 {
        if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
@@ -131,11 +148,11 @@ static struct epoll_ctlr *fd_to_cltr(int fd)
 }
 
 /* Event queue helpers: */
-static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
+static struct event_queue *ep_get_ceq_evq(unsigned int ceq_ring_sz)
 {
        struct event_queue *ceq_evq = get_eventq_raw();
        ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
-       ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
+       ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, NR_FILE_DESC_MAX, ceq_ring_sz);
        ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
        evq_attach_wakeup_ctlr(ceq_evq);
        return ceq_evq;
@@ -179,11 +196,12 @@ static void epoll_close(struct user_fd *ufd)
        struct ep_fd_data *ep_fd_i;
        int nr_tap_req = 0;
        int nr_done = 0;
+       unsigned int max_ceq_events = ep_get_ceq_max_ever(ep);
 
-       tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
-       memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
+       tap_reqs = malloc(sizeof(struct fd_tap_req) * max_ceq_events);
+       memset(tap_reqs, 0, sizeof(struct fd_tap_req) * max_ceq_events);
        /* Slightly painful, O(n) with no escape hatch */
-       for (int i = 0; i < ep->size; i++) {
+       for (int i = 0; i < max_ceq_events; i++) {
                ceq_ev_i = ep_get_ceq_ev(ep, i);
                /* CEQ should have been big enough for our size */
                assert(ceq_ev_i);
@@ -203,7 +221,6 @@ static void epoll_close(struct user_fd *ufd)
        } while (nr_done < nr_tap_req);
        free(tap_reqs);
        ep_put_ceq_evq(ep->ceq_evq);
-       ep_put_alarm_evq(ep->alarm_evq);
        uth_mutex_lock(ctlrs_mtx);
        TAILQ_REMOVE(&all_ctlrs, ep, link);
        uth_mutex_unlock(ctlrs_mtx);
@@ -213,18 +230,14 @@ static void epoll_close(struct user_fd *ufd)
 
 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
 {
-       unsigned int ceq_size;
-
-       /* TODO: we don't grow yet.  Until then, we help out a little. */
        if (size == 1)
                size = 128;
-       ceq_size = ROUNDUPPWR2(size);
-       ep->size = ceq_size;
        ep->mtx = uth_mutex_alloc();
        ep->ufd.magic = EPOLL_UFD_MAGIC;
        ep->ufd.close = epoll_close;
-       ep->ceq_evq = ep_get_ceq_evq(ceq_size);
-       ep->alarm_evq = ep_get_alarm_evq();
+       /* Size is a hint for the CEQ concurrency.  We can actually handle as many
+        * kernel FDs as is possible. */
+       ep->ceq_evq = ep_get_ceq_evq(ROUNDUPPWR2(size));
        return 0;
 }
 
@@ -241,20 +254,46 @@ static void epoll_fd_closed(int fd)
        uth_mutex_unlock(ctlrs_mtx);
 }
 
-static void epoll_init(void)
+static int ep_alarm_ctor(void *obj, void *priv, int flags)
+{
+       struct ep_alarm *ep_a = (struct ep_alarm*)obj;
+
+       ep_a->alarm_evq = ep_get_alarm_evq();
+       return 0;
+}
+
+static void ep_alarm_dtor(void *obj, void *priv)
+{
+       struct ep_alarm *ep_a = (struct ep_alarm*)obj;
+
+       /* TODO: (RCU/SLAB).  Somehow the slab allocator is trying to reap our
+        * objects.  Note that when we update userspace to use magazines, the dtor
+        * will fire earlier (when the object is given to the slab layer).  We'll
+        * need to be careful about the final freeing of the ev_q. */
+       panic("Epoll alarms should never be destroyed!");
+       ep_put_alarm_evq(ep_a->alarm_evq);
+}
+
+static void epoll_init(void *arg)
 {
        static struct close_cb epoll_close_cb = {.func = epoll_fd_closed};
 
        register_close_cb(&epoll_close_cb);
        ctlrs_mtx = uth_mutex_alloc();
+       ep_alarms_cache = kmem_cache_create("epoll alarms",
+                                           sizeof(struct ep_alarm),
+                                           __alignof__(sizeof(struct ep_alarm)), 0,
+                                           ep_alarm_ctor, ep_alarm_dtor, NULL);
+       assert(ep_alarms_cache);
 }
 
 int epoll_create(int size)
 {
        int fd;
        struct epoll_ctlr *ep;
+       static parlib_once_t once = PARLIB_ONCE_INIT;
 
-       run_once(epoll_init());
+       parlib_run_once(&once, epoll_init, NULL);
        /* good thing the arg is a signed int... */
        if (size < 0) {
                errno = EINVAL;
@@ -282,6 +321,36 @@ int epoll_create1(int flags)
        return epoll_create(1);
 }
 
+/* Linux's epoll will check for events, even if edge-triggered, during
+ * additions (and probably modifications) to the epoll set.  It's a questionable
+ * policy, since it can hide user bugs.
+ *
+ * We can do the same, though only for EPOLLIN and EPOLLOUT for FDs that can
+ * report their status via stat.  (same as select()).
+ *
+ * Note that this could result in spurious events, which should be fine. */
+static void fire_existing_events(int fd, int ep_events,
+                                 struct event_queue *ev_q)
+{
+       struct stat stat_buf[1];
+       struct event_msg ev_msg[1];
+       int ret;
+       int synth_ep_events = 0;
+
+       ret = fstat(fd, stat_buf);
+       assert(!ret);
+       if ((ep_events & EPOLLIN) && S_READABLE(stat_buf->st_mode))
+               synth_ep_events |= EPOLLIN;
+       if ((ep_events & EPOLLOUT) && S_WRITABLE(stat_buf->st_mode))
+               synth_ep_events |= EPOLLOUT;
+       if (synth_ep_events) {
+               ev_msg->ev_type = fd;
+               ev_msg->ev_arg2 = ep_events_to_taps(synth_ep_events);
+               ev_msg->ev_arg3 = 0; /* tap->data is unused for epoll. */
+               sys_send_event(ev_q, ev_msg, vcore_id());
+       }
+}
+
 static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
                            struct epoll_event *event)
 {
@@ -298,6 +367,11 @@ static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
                werrstr("Epoll level-triggered not supported");
                return -1;
        }
+       if (event->events & EPOLLONESHOT) {
+               errno = EPERM;
+               werrstr("Epoll one-shot not supported");
+               return -1;
+       }
        /* The sockets-to-plan9 networking shims are a bit inconvenient.  The user
         * asked us to epoll on an FD, but that FD is actually a Qdata FD.  We might
         * need to actually epoll on the listen_fd.  Further, we don't know yet
@@ -347,6 +421,7 @@ static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
        ep_fd->ep_event = *event;
        ep_fd->ep_event.events |= EPOLLHUP;
        ceq_ev->user_data = (uint64_t)ep_fd;
+       fire_existing_events(fd, ep_fd->ep_event.events, ep->ceq_evq);
        return 0;
 }
 
@@ -484,8 +559,8 @@ static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
        struct event_msg msg = {0};
        struct event_msg dummy_msg;
        struct event_queue *which_evq;
+       struct ep_alarm *ep_a;
        int nr_ret;
-       struct syscall sysc;
 
        nr_ret = __epoll_wait_poll(ep, events, maxevents);
        if (nr_ret)
@@ -494,16 +569,22 @@ static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
                return 0;
        /* From here on down, we're going to block until there is some activity */
        if (timeout != -1) {
-               syscall_async_evq(&sysc, ep->alarm_evq, SYS_block, timeout * 1000);
-               uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, ep->alarm_evq);
-               if (which_evq == ep->alarm_evq)
+               ep_a = kmem_cache_alloc(ep_alarms_cache, 0);
+               assert(ep_a);
+               syscall_async_evq(&ep_a->sysc, ep_a->alarm_evq, SYS_block,
+                                 timeout * 1000);
+               uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, ep_a->alarm_evq);
+               if (which_evq == ep_a->alarm_evq) {
+                       kmem_cache_free(ep_alarms_cache, ep_a);
                        return 0;
+               }
                /* The alarm sysc may or may not have finished yet.  This will force it
                 * to *start* to finish iff it is still a submitted syscall. */
-               sys_abort_sysc(&sysc);
+               sys_abort_sysc(&ep_a->sysc);
                /* But we still need to wait until the syscall completed.  Need a
                 * dummy msg, since we don't want to clobber the real msg. */
-               uth_blockon_evqs(&dummy_msg, 0, 1, ep->alarm_evq);
+               uth_blockon_evqs(&dummy_msg, 0, 1, ep_a->alarm_evq);
+               kmem_cache_free(ep_alarms_cache, ep_a);
        } else {
                uth_blockon_evqs(&msg, &which_evq, 1, ep->ceq_evq);
        }