* - there's no EPOLLONESHOT or level-triggered support.
* - you can only tap one FD at a time, so you can't add the same FD to
* multiple epoll sets.
- * - there is no support for growing the epoll set.
* - closing the epoll is a little dangerous, if there are outstanding INDIR
* events. this will only pop up if you're yielding cores, maybe getting
* preempted, and are unlucky.
* exec and flags in struct user_fd.
* - EPOLL_CTL_MOD is just a DEL then an ADD. There might be races associated
* with that.
- * - If you close a tracked FD without removing it from the epoll set, the
- * kernel will turn off the FD tap. You may still have an epoll event that was
- * concurrently sent. Likewise, that FD may be used again by your program, and
- * if you add *that* one to another epoll set before removing it from the
- * current one, weird things may happen (like having two epoll ctlrs turning on
- * and off taps).
* - epoll_pwait is probably racy.
- * - Using spin locks instead of mutexes during syscalls that could block. The
- * process won't deadlock, but it will busy wait on something like an RPC,
- * depending on the device being tapped.
* - You can't dup an epoll fd (same as other user FDs).
- * - If you add a BSD socket FD to an epoll set before calling listen(), you'll
- * only epoll on the data (which is inactive) instead of on the accept().
+ * - If you add a BSD socket FD to an epoll set, you'll get taps on both the
+ * data FD and the listen FD.
* - If you add the same BSD socket listener to multiple epoll sets, you will
* likely fail. This is in addition to being able to tap only one FD at a
* time.
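 *
 * For reference, a rough usage sketch (names are illustrative; these are the
 * standard epoll calls, but EPOLLET is required, fd must be a kernel-backed
 * FD, and epoll_wait's timeout is in msec, with -1 meaning block):
 *
 *	struct epoll_event ev, evs[16];
 *	int epfd = epoll_create(1);
 *
 *	ev.events = EPOLLIN | EPOLLET;
 *	ev.data.fd = fd;
 *	epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
 *	nr = epoll_wait(epfd, evs, 16, -1);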
#include <parlib/event.h>
#include <parlib/ceq.h>
#include <parlib/uthread.h>
-#include <parlib/spinlock.h>
#include <parlib/timing.h>
+#include <parlib/slab.h>
+#include <parlib/assert.h>
#include <sys/user_fd.h>
+#include <sys/close_cb.h>
#include <stdio.h>
#include <errno.h>
+#include <unistd.h>
+#include <malloc.h>
+#include <sys/queue.h>
+#include <sys/plan9_helpers.h>
+#include <ros/fs.h>
/* Sanity check, so we can ID our own FDs */
#define EPOLL_UFD_MAGIC 0xe9011
+/* Each waiter that uses a timeout gets its own ep_alarm for managing that
+ * timeout.
+ *
+ * TODO: (RCU/SLAB) it's not safe to reap these objects until we sort out
+ * INDIRs and RCU-style grace periods. Not a big deal, since the number of
+ * these is the number of threads that concurrently do epoll timeouts. */
+struct ep_alarm {
+ struct event_queue *alarm_evq;
+ struct syscall sysc;
+};
+
+static struct kmem_cache *ep_alarms_cache;
+
struct epoll_ctlr {
+ TAILQ_ENTRY(epoll_ctlr) link;
struct event_queue *ceq_evq;
- struct ceq *ceq; /* convenience pointer */
- unsigned int size;
- struct spin_pdr_lock lock;
+ uth_mutex_t *mtx;
struct user_fd ufd;
};
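+/* All ctlrs are kept on a global list so that the close callback can remove a
+ * closed FD from every epoll set. */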
+TAILQ_HEAD(epoll_ctlrs, epoll_ctlr);
+static struct epoll_ctlrs all_ctlrs = TAILQ_HEAD_INITIALIZER(all_ctlrs);
+static uth_mutex_t *ctlrs_mtx;
+
/* There's some bookkeeping we need to maintain on every FD. Right now, the FD
* is the index into the CEQ event array, so we can just hook this into the user
* data blob in the ceq_event.
struct epoll_event ep_event;
int fd;
int filter;
- int sock_listen_fd;
};
/* Converts epoll events to FD taps. */
return ep_ev;
}
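+/* Helper: the highest CEQ index (i.e. FD) the kernel has ever posted to us.
+ * epoll_close() uses this to bound its O(n) scan. */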
+static unsigned int ep_get_ceq_max_ever(struct epoll_ctlr *ep)
+{
+ return atomic_read(&ep->ceq_evq->ev_mbox->ceq.max_event_ever);
+}
+
static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
{
if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
}
/* Event queue helpers: */
-static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
+static struct event_queue *ep_get_ceq_evq(unsigned int ceq_ring_sz)
{
struct event_queue *ceq_evq = get_eventq_raw();
ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
- ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
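+ /* One CEQ event slot per possible kernel FD; ceq_ring_sz is only a
+ * concurrency hint (see init_ep_ctlr()). */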
+ ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, NR_FILE_DESC_MAX, ceq_ring_sz);
ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
evq_attach_wakeup_ctlr(ceq_evq);
return ceq_evq;
* some sort of user deferred destruction. (TODO). */
static void ep_put_ceq_evq(struct event_queue *ceq_evq)
{
+#if 0 /* TODO: EVQ/INDIR Cleanup */
ceq_cleanup(&ceq_evq->ev_mbox->ceq);
evq_remove_wakeup_ctlr(ceq_evq);
put_eventq_raw(ceq_evq);
+#endif
}
static void ep_put_alarm_evq(struct event_queue *alarm_evq)
{
+#if 0 /* TODO: EVQ/INDIR Cleanup */
evq_remove_wakeup_ctlr(alarm_evq);
put_eventq(alarm_evq);
+#endif
}
static void epoll_close(struct user_fd *ufd)
struct ep_fd_data *ep_fd_i;
int nr_tap_req = 0;
int nr_done = 0;
+ unsigned int max_ceq_events = ep_get_ceq_max_ever(ep);
- tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
- memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
+ tap_reqs = malloc(sizeof(struct fd_tap_req) * max_ceq_events);
+ memset(tap_reqs, 0, sizeof(struct fd_tap_req) * max_ceq_events);
/* Slightly painful, O(n) with no escape hatch */
- for (int i = 0; i < ep->size; i++) {
+ for (int i = 0; i < max_ceq_events; i++) {
ceq_ev_i = ep_get_ceq_ev(ep, i);
- /* CEQ should have been big enough for our size */
+ /* The CEQ covers every possible FD, so this should always exist */
assert(ceq_ev_i);
ep_fd_i = (struct ep_fd_data*)ceq_ev_i->user_data;
if (!ep_fd_i)
continue;
- if (ep_fd_i->sock_listen_fd >= 0) {
- /* This tap is using a listen_fd, opened by __epoll_ctl_add, so the
- * user doesn't know about this FD. We need to remove the tap and
- * close the FD; the kernel will remove the tap when we close it. */
- close(ep_fd_i->sock_listen_fd);
- free(ep_fd_i);
- continue;
- }
tap_req_i = &tap_reqs[nr_tap_req++];
tap_req_i->fd = i;
tap_req_i->cmd = FDTAP_CMD_REM;
} while (nr_done < nr_tap_req);
free(tap_reqs);
ep_put_ceq_evq(ep->ceq_evq);
+ uth_mutex_lock(ctlrs_mtx);
+ TAILQ_REMOVE(&all_ctlrs, ep, link);
+ uth_mutex_unlock(ctlrs_mtx);
+ uth_mutex_free(ep->mtx);
+ free(ep);
}
static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
{
- unsigned int ceq_size = ROUNDUPPWR2(size);
- /* TODO: we don't grow yet. Until then, we help out a little. */
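+ /* Apps commonly pass size == 1 (Linux mostly ignores the size); bump it so
+ * our CEQ ring hint is more useful. */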
if (size == 1)
size = 128;
- ep->size = ceq_size;
- spin_pdr_init(&ep->lock);
+ ep->mtx = uth_mutex_alloc();
ep->ufd.magic = EPOLL_UFD_MAGIC;
ep->ufd.close = epoll_close;
- ep->ceq_evq = ep_get_ceq_evq(ceq_size);
+ /* The size arg is only a hint for the CEQ's concurrency (its ring size);
+ * the CEQ itself can handle any kernel FD, up to NR_FILE_DESC_MAX. */
+ ep->ceq_evq = ep_get_ceq_evq(ROUNDUPPWR2(size));
+ return 0;
+}
+
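+/* Registered as a close_cb (see epoll_init()): runs whenever the program
+ * closes an FD, so we can drop that FD from every epoll set. */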
+static void epoll_fd_closed(int fd)
+{
+ struct epoll_ctlr *ep;
+
+ /* Lockless peek, avoid locking for every close() */
+ if (TAILQ_EMPTY(&all_ctlrs))
+ return;
+ uth_mutex_lock(ctlrs_mtx);
+ TAILQ_FOREACH(ep, &all_ctlrs, link)
+ epoll_ctl(ep->ufd.fd, EPOLL_CTL_DEL, fd, 0);
+ uth_mutex_unlock(ctlrs_mtx);
+}
+
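+/* Slab ctor: each ep_alarm owns an alarm evq, set up once and then reused
+ * across timed epoll_wait() calls. */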
+static int ep_alarm_ctor(void *obj, void *priv, int flags)
+{
+ struct ep_alarm *ep_a = (struct ep_alarm*)obj;
+
+ ep_a->alarm_evq = ep_get_alarm_evq();
return 0;
}
+static void ep_alarm_dtor(void *obj, void *priv)
+{
+ struct ep_alarm *ep_a = (struct ep_alarm*)obj;
+
+ /* TODO: (RCU/SLAB). Somehow the slab allocator is trying to reap our
+ * objects. Note that when we update userspace to use magazines, the dtor
+ * will fire earlier (when the object is given to the slab layer). We'll
+ * need to be careful about the final freeing of the ev_q. */
+ panic("Epoll alarms should never be destroyed!");
+ ep_put_alarm_evq(ep_a->alarm_evq);
+}
+
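+/* Run once, at the first epoll_create(): hooks the close callback and sets up
+ * the ep_alarm slab. */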
+static void epoll_init(void *arg)
+{
+ static struct close_cb epoll_close_cb = {.func = epoll_fd_closed};
+
+ register_close_cb(&epoll_close_cb);
+ ctlrs_mtx = uth_mutex_alloc();
+ ep_alarms_cache = kmem_cache_create("epoll alarms",
+ sizeof(struct ep_alarm),
+ __alignof__(struct ep_alarm), 0,
+ ep_alarm_ctor, ep_alarm_dtor, NULL);
+ assert(ep_alarms_cache);
+}
+
int epoll_create(int size)
{
int fd;
struct epoll_ctlr *ep;
+ static parlib_once_t once = PARLIB_ONCE_INIT;
+
+ parlib_run_once(&once, epoll_init, NULL);
/* good thing the arg is a signed int... */
if (size < 0) {
errno = EINVAL;
fd = ufd_get_fd(&ep->ufd);
- if (fd < 0)
- free(ep);
+ if (fd < 0) {
+ free(ep);
+ return fd;
+ }
+ uth_mutex_lock(ctlrs_mtx);
+ TAILQ_INSERT_TAIL(&all_ctlrs, ep, link);
+ uth_mutex_unlock(ctlrs_mtx);
return fd;
}
return epoll_create(1);
}
+/* Linux's epoll will check for events, even if edge-triggered, during
+ * additions (and probably modifications) to the epoll set. It's a questionable
+ * policy, since it can hide user bugs.
+ *
+ * We can do the same, though only for EPOLLIN and EPOLLOUT, and only for FDs
+ * that can report their status via stat (same as select()).
+ *
+ * Note that this could result in spurious events, which should be fine. */
+static void fire_existing_events(int fd, int ep_events,
+ struct event_queue *ev_q)
+{
+ struct stat stat_buf[1];
+ struct event_msg ev_msg[1];
+ int ret;
+ int synth_ep_events = 0;
+
+ ret = fstat(fd, stat_buf);
+ assert(!ret);
+ if ((ep_events & EPOLLIN) && S_READABLE(stat_buf->st_mode))
+ synth_ep_events |= EPOLLIN;
+ if ((ep_events & EPOLLOUT) && S_WRITABLE(stat_buf->st_mode))
+ synth_ep_events |= EPOLLOUT;
+ if (synth_ep_events) {
+ ev_msg->ev_type = fd;
+ ev_msg->ev_arg2 = ep_events_to_taps(synth_ep_events);
+ ev_msg->ev_arg3 = 0; /* tap->data is unused for epoll. */
+ sys_send_event(ev_q, ev_msg, vcore_id());
+ }
+}
+
static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
struct epoll_event *event)
{
struct ep_fd_data *ep_fd;
struct fd_tap_req tap_req = {0};
int ret, filter, sock_listen_fd;
+ struct epoll_event listen_event;
- /* Only support ET. Also, we just ignore EPOLLONESHOT. That might work,
- * logically, just with spurious events firing. */
+ /* Only support ET. EPOLLONESHOT is rejected below as well; it might work
+ * logically, just with spurious events firing, but we don't support it. */
werrstr("Epoll level-triggered not supported");
return -1;
}
+ if (event->events & EPOLLONESHOT) {
+ errno = EPERM;
+ werrstr("Epoll one-shot not supported");
+ return -1;
+ }
/* The sockets-to-plan9 networking shims are a bit inconvenient. The user
- * asked us to epoll on an FD, but that FD is actually a Qdata FD. We need
- * to actually epoll on the listen_fd. We'll store this in the ep_fd, so
- * that later on we can close it.
+ * asked us to epoll on an FD, but that FD is actually a Qdata FD. We might
+ * need to actually epoll on the listen_fd. Further, we don't know yet
+ * whether or not they want the listen FD. They could epoll on the socket,
+ * then listen later and want to wake up on the listen.
+ *
+ * So in the case we have a socket FD, we'll actually open the listen FD
+ * regardless (glibc handles this), and we'll epoll on both FDs.
+ * Technically, either FD could fire and they'd get an epoll event for it,
+ * but I think socket users will use only listen or data.
*
* As far as tracking the FD goes for epoll_wait() reporting, if the app
* wants to track the FD they think we are using, then they already passed
- * that in event->data.
- *
- * But before we get too far, we need to make sure we aren't already tapping
- * this FD's listener (hence the lookup).
- *
- * This all assumes that this socket is only added to one epoll set at a
- * time. The _sock calls are racy, and once one epoller set up a listen_fd
- * in the Rock, we'll think that it was us. */
- extern int _sock_lookup_listen_fd(int sock_fd); /* in glibc */
- extern int _sock_get_listen_fd(int sock_fd);
- if (_sock_lookup_listen_fd(fd) >= 0) {
- errno = EEXIST;
- return -1;
+ * that in event->data. */
+ sock_listen_fd = _sock_lookup_listen_fd(fd);
+ if (sock_listen_fd >= 0) {
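+ /* For the listener, we only care about incoming connections, i.e.
+ * readability. */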
+ listen_event.events = EPOLLET | EPOLLIN | EPOLLHUP;
+ listen_event.data = event->data;
+ ret = __epoll_ctl_add(ep, sock_listen_fd, &listen_event);
+ if (ret < 0)
+ return ret;
}
- sock_listen_fd = _sock_get_listen_fd(fd);
- if (sock_listen_fd >= 0)
- fd = sock_listen_fd;
ceq_ev = ep_get_ceq_ev(ep, fd);
if (!ceq_ev) {
errno = ENOMEM;
ep_fd->filter = filter;
ep_fd->ep_event = *event;
ep_fd->ep_event.events |= EPOLLHUP;
- ep_fd->sock_listen_fd = sock_listen_fd;
ceq_ev->user_data = (uint64_t)ep_fd;
+ fire_existing_events(fd, ep_fd->ep_event.events, ep->ceq_evq);
return 0;
}
struct fd_tap_req tap_req = {0};
int ret, sock_listen_fd;
- /* They could be asking to clear an epoll for a listener. We need to remove
- * the tap for the real FD we tapped */
- extern int _sock_lookup_listen_fd(int sock_fd); /* in glibc */
+ /* If we were dealing with a socket shim FD, we tapped both the listen and
+ * the data file and need to untap both of them. */
sock_listen_fd = _sock_lookup_listen_fd(fd);
- if (sock_listen_fd >= 0)
- fd = sock_listen_fd;
+ if (sock_listen_fd >= 0) {
+ /* It's possible to fail here. Even though we tapped it already, if the
+ * deletion was triggered from close callbacks, it's possible for the
+ * sock_listen_fd to be closed first, which would have triggered an
+ * epoll_ctl_del. By the time we get around to closing the Rock FD, the
+ * listen FD will already have been closed. */
+ __epoll_ctl_del(ep, sock_listen_fd, event);
+ }
ceq_ev = ep_get_ceq_ev(ep, fd);
if (!ceq_ev) {
errno = ENOENT;
* has already closed and the kernel removed the tap. */
sys_tap_fds(&tap_req, 1);
ceq_ev->user_data = 0;
- assert(ep_fd->sock_listen_fd == sock_listen_fd);
- if (ep_fd->sock_listen_fd >= 0) {
- assert(ep_fd->sock_listen_fd == sock_listen_fd);
- close(ep_fd->sock_listen_fd);
- }
free(ep_fd);
return 0;
}
werrstr("Epoll can't track User FDs");
return -1;
}
- /* TODO: don't use a spinlock, use a mutex. sys_tap_fds can block. */
- spin_pdr_lock(&ep->lock);
+ uth_mutex_lock(ep->mtx);
switch (op) {
case (EPOLL_CTL_MOD):
/* In lieu of a proper MOD, just remove and readd. The errors might
errno = EINVAL;
ret = -1;
}
- spin_pdr_unlock(&ep->lock);
+ uth_mutex_unlock(ep->mtx);
return ret;
}
return FALSE;
}
ep_ev->data = ep_fd->ep_event.data;
- ep_ev->events = taps_to_ep_events(msg->ev_arg2);
+ /* The events field was initialized to 0 in epoll_wait() */
+ ep_ev->events |= taps_to_ep_events(msg->ev_arg2);
return TRUE;
}
+/* Helper: extracts as many epoll_events as possible from the ep. */
+static int __epoll_wait_poll(struct epoll_ctlr *ep, struct epoll_event *events,
+ int maxevents)
+{
+ struct event_msg msg = {0};
+ int nr_ret = 0;
+
+ if (maxevents <= 0)
+ return 0;
+ /* Locking to protect get_ep_event_from_msg, specifically that the ep_fd
+ * stored at ceq_ev->user_data does not get concurrently removed and
+ * freed. */
+ uth_mutex_lock(ep->mtx);
+ for (int i = 0; i < maxevents; i++) {
+retry:
+ if (!uth_check_evqs(&msg, NULL, 1, ep->ceq_evq))
+ break;
+ if (!get_ep_event_from_msg(ep, &msg, &events[i]))
+ goto retry;
+ nr_ret++;
+ }
+ uth_mutex_unlock(ep->mtx);
+ return nr_ret;
+}
+
/* We should be able to have multiple waiters. ep shouldn't be closed or
* anything, since we have the FD (that'd be bad programming on the user's
* behalf). We could have concurrent ADD/MOD/DEL operations (which lock). */
struct event_msg msg = {0};
struct event_msg dummy_msg;
struct event_queue *which_evq;
- struct event_queue *alarm_evq;
- int nr_ret = 0;
- int recurse_ret;
- struct syscall sysc;
+ struct ep_alarm *ep_a;
+ int nr_ret;
- /* Locking to protect get_ep_event_from_msg, specifically that the ep_fd
- * stored at ceq_ev->user_data does not get concurrently removed and
- * freed. */
- spin_pdr_lock(&ep->lock);
- for (int i = 0; i < maxevents; i++) {
- if (uth_check_evqs(&msg, &which_evq, 1, ep->ceq_evq)) {
- if (get_ep_event_from_msg(ep, &msg, &events[i]))
- nr_ret++;
- }
- }
- spin_pdr_unlock(&ep->lock);
+ nr_ret = __epoll_wait_poll(ep, events, maxevents);
if (nr_ret)
return nr_ret;
if (timeout == 0)
return 0;
+ /* From here on down, we're going to block until there is some activity */
if (timeout != -1) {
- alarm_evq = ep_get_alarm_evq();
- syscall_async(&sysc, SYS_block, timeout * 1000);
- if (!register_evq(&sysc, alarm_evq)) {
- /* timeout occurred before we could even block! */
- ep_put_alarm_evq(alarm_evq);
+ ep_a = kmem_cache_alloc(ep_alarms_cache, 0);
+ assert(ep_a);
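+ /* epoll's timeout is in msec; SYS_block takes usec, hence the * 1000. */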
+ syscall_async_evq(&ep_a->sysc, ep_a->alarm_evq, SYS_block,
+ timeout * 1000);
+ uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, ep_a->alarm_evq);
+ if (which_evq == ep_a->alarm_evq) {
+ kmem_cache_free(ep_alarms_cache, ep_a);
return 0;
}
- uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, alarm_evq);
- if (which_evq != alarm_evq) {
- /* sysc may or may not have finished yet. this will force it to
- * *start* to finish iff it is still a submitted syscall. */
- sys_abort_sysc(&sysc);
- /* But we still need to wait until the syscall completed. Need a
- * dummy msg, since we don't want to clobber the real msg. */
- uth_blockon_evqs(&dummy_msg, 0, 1, alarm_evq);
- }
- /* TODO: Slightly dangerous, due to spammed INDIRs */
- ep_put_alarm_evq(alarm_evq);
- if (which_evq == alarm_evq)
- return 0;
+ /* The alarm sysc may or may not have finished yet. This will force it
+ * to *start* to finish iff it is still a submitted syscall. */
+ sys_abort_sysc(&ep_a->sysc);
+ /* But we still need to wait until the syscall completed. Need a
+ * dummy msg, since we don't want to clobber the real msg. */
+ uth_blockon_evqs(&dummy_msg, 0, 1, ep_a->alarm_evq);
+ kmem_cache_free(ep_alarms_cache, ep_a);
} else {
uth_blockon_evqs(&msg, &which_evq, 1, ep->ceq_evq);
}
- spin_pdr_lock(&ep->lock);
+ uth_mutex_lock(ep->mtx);
if (get_ep_event_from_msg(ep, &msg, &events[0]))
- nr_ret++;
- spin_pdr_unlock(&ep->lock);
- /* We might not have gotten one yet. And regardless, there might be more
- * available. Let's try again, with timeout == 0 to ensure no blocking. We
- * use nr_ret (0 or 1 now) to adjust maxevents and events accordingly. */
- recurse_ret = __epoll_wait(ep, events + nr_ret, maxevents - nr_ret, 0);
- if (recurse_ret > 0)
- nr_ret += recurse_ret;
+ nr_ret = 1;
+ uth_mutex_unlock(ep->mtx);
+ /* We had to extract one message already as part of the blocking process.
+ * We might be able to get more. */
+ nr_ret += __epoll_wait_poll(ep, events + nr_ret, maxevents - nr_ret);
+ /* This is a little nasty and hopefully a rare race. We still might not
+ * have a ret, but we expected to block until we had something. We didn't
+ * time out yet, but we spuriously woke up. We need to try again (ideally,
+ * we'd subtract the elapsed time from the original timeout and wait only for
+ * the remainder). */
+ if (!nr_ret)
+ return __epoll_wait(ep, events, maxevents, timeout);
return nr_ret;
}
int timeout)
{
struct epoll_ctlr *ep = fd_to_cltr(epfd);
- int ret;
+
if (!ep) {
errno = EBADF;/* or EINVAL */
return -1;
errno = EINVAL;
return -1;
}
- ret = __epoll_wait(ep, events, maxevents, timeout);
- return ret;
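+ /* get_ep_event_from_msg() ORs events in, so each slot must start at 0. */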
+ for (int i = 0; i < maxevents; i++)
+ events[i].events = 0;
+ return __epoll_wait(ep, events, maxevents, timeout);
}
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents,