parlib: Fix the use-after-func issue
[akaros.git] / user / iplib / epoll.c
index 1a1d03d..f415cb2 100644 (file)
@@ -11,7 +11,6 @@
  *     - there's no EPOLLONESHOT or level-triggered support.
  *     - you can only tap one FD at a time, so you can't add the same FD to
  *     multiple epoll sets.
- *     - there is no support for growing the epoll set.
  *     - closing the epoll is a little dangerous, if there are outstanding INDIR
  *     events.  this will only pop up if you're yielding cores, maybe getting
  *     preempted, and are unlucky.
@@ -34,6 +33,8 @@
 #include <parlib/ceq.h>
 #include <parlib/uthread.h>
 #include <parlib/timing.h>
+#include <parlib/slab.h>
+#include <parlib/assert.h>
 #include <sys/user_fd.h>
 #include <sys/close_cb.h>
 #include <stdio.h>
 #include <malloc.h>
 #include <sys/queue.h>
 #include <sys/plan9_helpers.h>
+#include <ros/fs.h>
 
 /* Sanity check, so we can ID our own FDs */
 #define EPOLL_UFD_MAGIC                0xe9011
 
+/* Each waiter that uses a timeout will have its own structure for dealing with
+ * its timeout.
+ *
+ * TODO: (RCU/SLAB) it's not safe to reap the objects, until we sort out
+ * INDIRs and RCU-style grace periods.  Not a big deal, since the number of
+ * these is the number of threads that concurrently do epoll timeouts. */
+struct ep_alarm {
+       struct event_queue                      *alarm_evq;
+       struct syscall                          sysc;
+};
+
+static struct kmem_cache *ep_alarms_cache;
+
 struct epoll_ctlr {
        TAILQ_ENTRY(epoll_ctlr)         link;
        struct event_queue                      *ceq_evq;
-       struct ceq                                      *ceq;   /* convenience pointer */
-       unsigned int                            size;
-       uth_mutex_t                                     mtx;
+       uth_mutex_t                                     *mtx;
        struct user_fd                          ufd;
 };
 
 TAILQ_HEAD(epoll_ctlrs, epoll_ctlr);
 static struct epoll_ctlrs all_ctlrs = TAILQ_HEAD_INITIALIZER(all_ctlrs);
-static uth_mutex_t ctlrs_mtx;
+static uth_mutex_t *ctlrs_mtx;
 
 /* There's some bookkeeping we need to maintain on every FD.  Right now, the FD
  * is the index into the CEQ event array, so we can just hook this into the user
@@ -110,6 +123,11 @@ static uint32_t taps_to_ep_events(int taps)
        return ep_ev;
 }
 
+static unsigned int ep_get_ceq_max_ever(struct epoll_ctlr *ep)
+{
+       return atomic_read(&ep->ceq_evq->ev_mbox->ceq.max_event_ever);
+}
+
 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
 {
        if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
@@ -130,11 +148,11 @@ static struct epoll_ctlr *fd_to_cltr(int fd)
 }
 
 /* Event queue helpers: */
-static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
+static struct event_queue *ep_get_ceq_evq(unsigned int ceq_ring_sz)
 {
        struct event_queue *ceq_evq = get_eventq_raw();
        ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
-       ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
+       ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, NR_FILE_DESC_MAX, ceq_ring_sz);
        ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
        evq_attach_wakeup_ctlr(ceq_evq);
        return ceq_evq;
@@ -178,11 +196,12 @@ static void epoll_close(struct user_fd *ufd)
        struct ep_fd_data *ep_fd_i;
        int nr_tap_req = 0;
        int nr_done = 0;
+       unsigned int max_ceq_events = ep_get_ceq_max_ever(ep);
 
-       tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
-       memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
+       tap_reqs = malloc(sizeof(struct fd_tap_req) * max_ceq_events);
+       memset(tap_reqs, 0, sizeof(struct fd_tap_req) * max_ceq_events);
        /* Slightly painful, O(n) with no escape hatch */
-       for (int i = 0; i < ep->size; i++) {
+       for (int i = 0; i < max_ceq_events; i++) {
                ceq_ev_i = ep_get_ceq_ev(ep, i);
                /* CEQ should have been big enough for our size */
                assert(ceq_ev_i);
@@ -211,17 +230,14 @@ static void epoll_close(struct user_fd *ufd)
 
 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
 {
-       unsigned int ceq_size;
-
-       /* TODO: we don't grow yet.  Until then, we help out a little. */
        if (size == 1)
                size = 128;
-       ceq_size = ROUNDUPPWR2(size);
-       ep->size = ceq_size;
        ep->mtx = uth_mutex_alloc();
        ep->ufd.magic = EPOLL_UFD_MAGIC;
        ep->ufd.close = epoll_close;
-       ep->ceq_evq = ep_get_ceq_evq(ceq_size);
+       /* Size is a hint for the CEQ concurrency.  We can actually handle as many
+        * kernel FDs as is possible. */
+       ep->ceq_evq = ep_get_ceq_evq(ROUNDUPPWR2(size));
        return 0;
 }
 
@@ -238,20 +254,46 @@ static void epoll_fd_closed(int fd)
        uth_mutex_unlock(ctlrs_mtx);
 }
 
-static void epoll_init(void)
+static int ep_alarm_ctor(void *obj, void *priv, int flags)
+{
+       struct ep_alarm *ep_a = (struct ep_alarm*)obj;
+
+       ep_a->alarm_evq = ep_get_alarm_evq();
+       return 0;
+}
+
+static void ep_alarm_dtor(void *obj, void *priv)
+{
+       struct ep_alarm *ep_a = (struct ep_alarm*)obj;
+
+       /* TODO: (RCU/SLAB).  Somehow the slab allocator is trying to reap our
+        * objects.  Note that when we update userspace to use magazines, the dtor
+        * will fire earlier (when the object is given to the slab layer).  We'll
+        * need to be careful about the final freeing of the ev_q. */
+       panic("Epoll alarms should never be destroyed!");
+       ep_put_alarm_evq(ep_a->alarm_evq);
+}
+
+static void epoll_init(void *arg)
 {
        static struct close_cb epoll_close_cb = {.func = epoll_fd_closed};
 
        register_close_cb(&epoll_close_cb);
        ctlrs_mtx = uth_mutex_alloc();
+       ep_alarms_cache = kmem_cache_create("epoll alarms",
+                                           sizeof(struct ep_alarm),
+                                           __alignof__(sizeof(struct ep_alarm)), 0,
+                                           ep_alarm_ctor, ep_alarm_dtor, NULL);
+       assert(ep_alarms_cache);
 }
 
 int epoll_create(int size)
 {
        int fd;
        struct epoll_ctlr *ep;
+       static parlib_once_t once = PARLIB_ONCE_INIT;
 
-       run_once(epoll_init());
+       parlib_run_once(&once, epoll_init, NULL);
        /* good thing the arg is a signed int... */
        if (size < 0) {
                errno = EINVAL;
@@ -279,44 +321,44 @@ int epoll_create1(int flags)
        return epoll_create(1);
 }
 
-static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
-                           struct epoll_event *event)
+/* Linux's epoll will check for events, even if edge-triggered, during
+ * additions (and probably modifications) to the epoll set.  It's a questionable
+ * policy, since it can hide user bugs.
+ *
+ * We can do the same, though only for EPOLLIN and EPOLLOUT for FDs that can
+ * report their status via stat.  (same as select()).
+ *
+ * Note that this could result in spurious events, which should be fine. */
+static void fire_existing_events(int fd, int ep_events,
+                                 struct event_queue *ev_q)
+{
+       struct stat stat_buf[1];
+       struct event_msg ev_msg[1];
+       int ret;
+       int synth_ep_events = 0;
+
+       ret = fstat(fd, stat_buf);
+       assert(!ret);
+       if ((ep_events & EPOLLIN) && S_READABLE(stat_buf->st_mode))
+               synth_ep_events |= EPOLLIN;
+       if ((ep_events & EPOLLOUT) && S_WRITABLE(stat_buf->st_mode))
+               synth_ep_events |= EPOLLOUT;
+       if (synth_ep_events) {
+               ev_msg->ev_type = fd;
+               ev_msg->ev_arg2 = ep_events_to_taps(synth_ep_events);
+               ev_msg->ev_arg3 = 0; /* tap->data is unused for epoll. */
+               sys_send_event(ev_q, ev_msg, vcore_id());
+       }
+}
+
+static int __epoll_ctl_add_raw(struct epoll_ctlr *ep, int fd,
+                               struct epoll_event *event)
 {
        struct ceq_event *ceq_ev;
        struct ep_fd_data *ep_fd;
        struct fd_tap_req tap_req = {0};
-       int ret, filter, sock_listen_fd;
-       struct epoll_event listen_event;
+       int ret, filter;
 
-       /* Only support ET.  Also, we just ignore EPOLLONESHOT.  That might work,
-        * logically, just with spurious events firing. */
-       if (!(event->events & EPOLLET)) {
-               errno = EPERM;
-               werrstr("Epoll level-triggered not supported");
-               return -1;
-       }
-       /* The sockets-to-plan9 networking shims are a bit inconvenient.  The user
-        * asked us to epoll on an FD, but that FD is actually a Qdata FD.  We might
-        * need to actually epoll on the listen_fd.  Further, we don't know yet
-        * whether or not they want the listen FD.  They could epoll on the socket,
-        * then listen later and want to wake up on the listen.
-        *
-        * So in the case we have a socket FD, we'll actually open the listen FD
-        * regardless (glibc handles this), and we'll epoll on both FDs.
-        * Technically, either FD could fire and they'd get an epoll event for it,
-        * but I think socket users will use only listen or data.
-        *
-        * As far as tracking the FD goes for epoll_wait() reporting, if the app
-        * wants to track the FD they think we are using, then they already passed
-        * that in event->data. */
-       sock_listen_fd = _sock_lookup_listen_fd(fd);
-       if (sock_listen_fd >= 0) {
-               listen_event.events = EPOLLET | EPOLLIN | EPOLLHUP;
-               listen_event.data = event->data;
-               ret = __epoll_ctl_add(ep, sock_listen_fd, &listen_event);
-               if (ret < 0)
-                       return ret;
-       }
        ceq_ev = ep_get_ceq_ev(ep, fd);
        if (!ceq_ev) {
                errno = ENOMEM;
@@ -344,28 +386,61 @@ static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
        ep_fd->ep_event = *event;
        ep_fd->ep_event.events |= EPOLLHUP;
        ceq_ev->user_data = (uint64_t)ep_fd;
+       fire_existing_events(fd, ep_fd->ep_event.events, ep->ceq_evq);
        return 0;
 }
 
-static int __epoll_ctl_del(struct epoll_ctlr *ep, int fd,
+static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
                            struct epoll_event *event)
 {
-       struct ceq_event *ceq_ev;
-       struct ep_fd_data *ep_fd;
        struct fd_tap_req tap_req = {0};
-       int ret, sock_listen_fd;
+       int ret, sock_listen_fd, sock_ctl_fd;
+       struct epoll_event listen_event;
 
-       /* If we were dealing with a socket shim FD, we tapped both the listen and
-        * the data file and need to untap both of them. */
-       sock_listen_fd = _sock_lookup_listen_fd(fd);
+       /* Only support ET.  Also, we just ignore EPOLLONESHOT.  That might work,
+        * logically, just with spurious events firing. */
+       if (!(event->events & EPOLLET)) {
+               errno = EPERM;
+               werrstr("Epoll level-triggered not supported");
+               return -1;
+       }
+       if (event->events & EPOLLONESHOT) {
+               errno = EPERM;
+               werrstr("Epoll one-shot not supported");
+               return -1;
+       }
+       /* The sockets-to-plan9 networking shims are a bit inconvenient.  The user
+        * asked us to epoll on an FD, but that FD is actually a Qdata FD.  We might
+        * need to actually epoll on the listen_fd.  Further, we don't know yet
+        * whether or not they want the listen FD.  They could epoll on the socket,
+        * then listen later and want to wake up on the listen.
+        *
+        * So in the case we have a socket FD, we'll actually open the listen FD
+        * regardless (glibc handles this), and we'll epoll on both FDs.
+        * Technically, either FD could fire and they'd get an epoll event for it,
+        * but I think socket users will use only listen or data.
+        *
+        * As far as tracking the FD goes for epoll_wait() reporting, if the app
+        * wants to track the FD they think we are using, then they already passed
+        * that in event->data. */
+       _sock_lookup_rock_fds(fd, TRUE, &sock_listen_fd, &sock_ctl_fd);
        if (sock_listen_fd >= 0) {
-               /* It's possible to fail here.  Even though we tapped it already, if the
-                * deletion was triggered from close callbacks, it's possible for the
-                * sock_listen_fd to be closed first, which would have triggered an
-                * epoll_ctl_del.  When we get around to closing the Rock FD, the listen
-                * FD was already closed. */
-               __epoll_ctl_del(ep, sock_listen_fd, event);
+               listen_event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+               listen_event.data = event->data;
+               ret = __epoll_ctl_add_raw(ep, sock_listen_fd, &listen_event);
+               if (ret < 0)
+                       return ret;
        }
+       return __epoll_ctl_add_raw(ep, fd, event);
+}
+
+static int __epoll_ctl_del_raw(struct epoll_ctlr *ep, int fd,
+                               struct epoll_event *event)
+{
+       struct ceq_event *ceq_ev;
+       struct ep_fd_data *ep_fd;
+       struct fd_tap_req tap_req = {0};
+
        ceq_ev = ep_get_ceq_ev(ep, fd);
        if (!ceq_ev) {
                errno = ENOENT;
@@ -387,6 +462,30 @@ static int __epoll_ctl_del(struct epoll_ctlr *ep, int fd,
        return 0;
 }
 
+static int __epoll_ctl_del(struct epoll_ctlr *ep, int fd,
+                           struct epoll_event *event)
+{
+       int sock_listen_fd, sock_ctl_fd;
+
+       /* If we were dealing with a socket shim FD, we tapped both the listen and
+        * the data file and need to untap both of them.
+        *
+        * We could be called from a close_cb, and we already closed the listen FD.
+        * In that case, we don't want to try and open it.  If the listen FD isn't
+        * open, then we know it isn't in an epoll set.  We also know the data FD
+        * isn't epolled either, since we always epoll both FDs for rocks. */
+       _sock_lookup_rock_fds(fd, FALSE, &sock_listen_fd, &sock_ctl_fd);
+       if (sock_listen_fd >= 0) {
+               /* It's possible to fail here.  Even though we tapped it already, if the
+                * deletion was triggered from close callbacks, it's possible for the
+                * sock_listen_fd to be closed first, which would have triggered an
+                * epoll_ctl_del.  When we get around to closing the Rock FD, the listen
+                * FD was already closed. */
+               __epoll_ctl_del_raw(ep, sock_listen_fd, event);
+       }
+       return __epoll_ctl_del_raw(ep, fd, event);
+}
+
 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
 {
        int ret;
@@ -442,73 +541,87 @@ static bool get_ep_event_from_msg(struct epoll_ctlr *ep, struct event_msg *msg,
                return FALSE;
        }
        ep_ev->data = ep_fd->ep_event.data;
-       ep_ev->events = taps_to_ep_events(msg->ev_arg2);
+       /* The events field was initialized to 0 in epoll_wait() */
+       ep_ev->events |= taps_to_ep_events(msg->ev_arg2);
        return TRUE;
 }
 
-/* We should be able to have multiple waiters.  ep shouldn't be closed or
- * anything, since we have the FD (that'd be bad programming on the user's
- * behalf).  We could have concurrent ADD/MOD/DEL operations (which lock). */
-static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
-                        int maxevents, int timeout)
+/* Helper: extracts as many epoll_events as possible from the ep. */
+static int __epoll_wait_poll(struct epoll_ctlr *ep, struct epoll_event *events,
+                             int maxevents)
 {
        struct event_msg msg = {0};
-       struct event_msg dummy_msg;
-       struct event_queue *which_evq;
-       struct event_queue *alarm_evq;
        int nr_ret = 0;
-       int recurse_ret;
-       struct syscall sysc;
 
+       if (maxevents <= 0)
+               return 0;
        /* Locking to protect get_ep_event_from_msg, specifically that the ep_fd
         * stored at ceq_ev->user_data does not get concurrently removed and
         * freed. */
        uth_mutex_lock(ep->mtx);
        for (int i = 0; i < maxevents; i++) {
-               if (uth_check_evqs(&msg, &which_evq, 1, ep->ceq_evq)) {
-                       if (get_ep_event_from_msg(ep, &msg, &events[i]))
-                               nr_ret++;
-               }
+retry:
+               if (!uth_check_evqs(&msg, NULL, 1, ep->ceq_evq))
+                       break;
+               if (!get_ep_event_from_msg(ep, &msg, &events[i]))
+                       goto retry;
+               nr_ret++;
        }
        uth_mutex_unlock(ep->mtx);
+       return nr_ret;
+}
+
+/* We should be able to have multiple waiters.  ep shouldn't be closed or
+ * anything, since we have the FD (that'd be bad programming on the user's
+ * behalf).  We could have concurrent ADD/MOD/DEL operations (which lock). */
+static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
+                        int maxevents, int timeout)
+{
+       struct event_msg msg = {0};
+       struct event_msg dummy_msg;
+       struct event_queue *which_evq;
+       struct ep_alarm *ep_a;
+       int nr_ret;
+
+       nr_ret = __epoll_wait_poll(ep, events, maxevents);
        if (nr_ret)
                return nr_ret;
        if (timeout == 0)
                return 0;
+       /* From here on down, we're going to block until there is some activity */
        if (timeout != -1) {
-               alarm_evq = ep_get_alarm_evq();
-               syscall_async(&sysc, SYS_block, timeout * 1000);
-               if (!register_evq(&sysc, alarm_evq)) {
-                       /* timeout occurred before we could even block! */
-                       ep_put_alarm_evq(alarm_evq);
+               ep_a = kmem_cache_alloc(ep_alarms_cache, 0);
+               assert(ep_a);
+               syscall_async_evq(&ep_a->sysc, ep_a->alarm_evq, SYS_block,
+                                 timeout * 1000);
+               uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, ep_a->alarm_evq);
+               if (which_evq == ep_a->alarm_evq) {
+                       kmem_cache_free(ep_alarms_cache, ep_a);
                        return 0;
                }
-               uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, alarm_evq);
-               if (which_evq != alarm_evq) {
-                       /* sysc may or may not have finished yet.  this will force it to
-                        * *start* to finish iff it is still a submitted syscall. */
-                       sys_abort_sysc(&sysc);
-                       /* But we still need to wait until the syscall completed.  Need a
-                        * dummy msg, since we don't want to clobber the real msg. */
-                       uth_blockon_evqs(&dummy_msg, 0, 1, alarm_evq);
-               }
-               /* TODO: Slightly dangerous, due to spammed INDIRs */
-               ep_put_alarm_evq(alarm_evq);
-               if (which_evq == alarm_evq)
-                       return 0;
+               /* The alarm sysc may or may not have finished yet.  This will force it
+                * to *start* to finish iff it is still a submitted syscall. */
+               sys_abort_sysc(&ep_a->sysc);
+               /* But we still need to wait until the syscall completed.  Need a
+                * dummy msg, since we don't want to clobber the real msg. */
+               uth_blockon_evqs(&dummy_msg, 0, 1, ep_a->alarm_evq);
+               kmem_cache_free(ep_alarms_cache, ep_a);
        } else {
                uth_blockon_evqs(&msg, &which_evq, 1, ep->ceq_evq);
        }
        uth_mutex_lock(ep->mtx);
        if (get_ep_event_from_msg(ep, &msg, &events[0]))
-               nr_ret++;
+               nr_ret = 1;
        uth_mutex_unlock(ep->mtx);
-       /* We might not have gotten one yet.  And regardless, there might be more
-        * available.  Let's try again, with timeout == 0 to ensure no blocking.  We
-        * use nr_ret (0 or 1 now) to adjust maxevents and events accordingly. */
-       recurse_ret = __epoll_wait(ep, events + nr_ret, maxevents - nr_ret, 0);
-       if (recurse_ret > 0)
-               nr_ret += recurse_ret;
+       /* We had to extract one message already as part of the blocking process.
+        * We might be able to get more. */
+       nr_ret += __epoll_wait_poll(ep, events + nr_ret, maxevents - nr_ret);
+       /* This is a little nasty and hopefully a rare race.  We still might not
+        * have a ret, but we expected to block until we had something.  We didn't
+        * time out yet, but we spuriously woke up.  We need to try again (ideally,
+        * we'd subtract the time left from the original timeout). */
+       if (!nr_ret)
+               return __epoll_wait(ep, events, maxevents, timeout);
        return nr_ret;
 }
 
@@ -516,7 +629,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents,
                int timeout)
 {
        struct epoll_ctlr *ep = fd_to_cltr(epfd);
-       int ret;
+
        if (!ep) {
                errno = EBADF;/* or EINVAL */
                return -1;
@@ -525,8 +638,9 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents,
                errno = EINVAL;
                return -1;
        }
-       ret = __epoll_wait(ep, events, maxevents, timeout);
-       return ret;
+       for (int i = 0; i < maxevents; i++)
+               events[i].events = 0;
+       return __epoll_wait(ep, events, maxevents, timeout);
 }
 
 int epoll_pwait(int epfd, struct epoll_event *events, int maxevents,