epoll: Set up the alarm_evq at init time
[akaros.git] / user / iplib / epoll.c
1 /* Copyright (c) 2015 Google Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Epoll, built on FD taps, CEQs, and blocking uthreads on event queues.
6  *
7  * TODO: There are a few incompatibilities with Linux's epoll, some of which are
8  * artifacts of the implementation, and other issues:
9  *      - you can't epoll on an epoll fd (or any user fd).  you can only epoll on a
10  *      kernel FD that accepts your FD taps.
11  *      - there's no EPOLLONESHOT or level-triggered support.
12  *      - you can only tap one FD at a time, so you can't add the same FD to
13  *      multiple epoll sets.
14  *      - there is no support for growing the epoll set.
15  *      - closing the epoll is a little dangerous, if there are outstanding INDIR
16  *      events.  this will only pop up if you're yielding cores, maybe getting
17  *      preempted, and are unlucky.
18  *      - epoll_create1 does not support CLOEXEC.  That'd need some work in glibc's
19  *      exec and flags in struct user_fd.
20  *      - EPOLL_CTL_MOD is just a DEL then an ADD.  There might be races associated
21  *      with that.
22  *      - epoll_pwait is probably racy.
23  *      - You can't dup an epoll fd (same as other user FDs).
24  *      - If you add a BSD socket FD to an epoll set, you'll get taps on both the
25  *      data FD and the listen FD.
26  *      - If you add the same BSD socket listener to multiple epoll sets, you will
27  *      likely fail.  This is in addition to being able to tap only one FD at a
28  *      time.
29  * */
30
31 #include <sys/epoll.h>
32 #include <parlib/parlib.h>
33 #include <parlib/event.h>
34 #include <parlib/ceq.h>
35 #include <parlib/uthread.h>
36 #include <parlib/timing.h>
37 #include <sys/user_fd.h>
38 #include <sys/close_cb.h>
39 #include <stdio.h>
40 #include <errno.h>
41 #include <unistd.h>
42 #include <malloc.h>
43 #include <sys/queue.h>
44 #include <sys/plan9_helpers.h>
45
46 /* Sanity check, so we can ID our own FDs */
47 #define EPOLL_UFD_MAGIC                 0xe9011
48
49 struct epoll_ctlr {
50         TAILQ_ENTRY(epoll_ctlr)         link;
51         struct event_queue                      *ceq_evq;
52         struct event_queue                      *alarm_evq;
53         struct ceq                                      *ceq;   /* convenience pointer */
54         unsigned int                            size;
55         uth_mutex_t                                     mtx;
56         struct user_fd                          ufd;
57 };
58
59 TAILQ_HEAD(epoll_ctlrs, epoll_ctlr);
60 static struct epoll_ctlrs all_ctlrs = TAILQ_HEAD_INITIALIZER(all_ctlrs);
61 static uth_mutex_t ctlrs_mtx;
62
63 /* There's some bookkeeping we need to maintain on every FD.  Right now, the FD
64  * is the index into the CEQ event array, so we can just hook this into the user
65  * data blob in the ceq_event.
66  *
67  * If we ever do not maintain a 1:1 mapping from FDs to CEQ IDs, we can use this
68  * to track the CEQ ID and FD. */
69 struct ep_fd_data {
70         struct epoll_event                      ep_event;
71         int                                                     fd;
72         int                                                     filter;
73 };
74
75 /* Converts epoll events to FD taps. */
76 static int ep_events_to_taps(uint32_t ep_ev)
77 {
78         int taps = 0;
79         if (ep_ev & EPOLLIN)
80                 taps |= FDTAP_FILT_READABLE;
81         if (ep_ev & EPOLLOUT)
82                 taps |= FDTAP_FILT_WRITABLE;
83         if (ep_ev & EPOLLRDHUP)
84                 taps |= FDTAP_FILT_RDHUP;
85         if (ep_ev & EPOLLPRI)
86                 taps |= FDTAP_FILT_PRIORITY;
87         if (ep_ev & EPOLLERR)
88                 taps |= FDTAP_FILT_ERROR;
89         if (ep_ev & EPOLLHUP)
90                 taps |= FDTAP_FILT_HANGUP;
91         return taps;
92 }
93
94 /* Converts corresponding FD Taps to epoll events.  There are other taps that do
95  * not make sense for epoll. */
96 static uint32_t taps_to_ep_events(int taps)
97 {
98         uint32_t ep_ev = 0;
99         if (taps & FDTAP_FILT_READABLE)
100                 ep_ev |= EPOLLIN;
101         if (taps & FDTAP_FILT_WRITABLE)
102                 ep_ev |= EPOLLOUT;
103         if (taps & FDTAP_FILT_RDHUP)
104                 ep_ev |= EPOLLRDHUP;
105         if (taps & FDTAP_FILT_PRIORITY)
106                 ep_ev |= EPOLLPRI;
107         if (taps & FDTAP_FILT_ERROR)
108                 ep_ev |= EPOLLERR;
109         if (taps & FDTAP_FILT_HANGUP)
110                 ep_ev |= EPOLLHUP;
111         return ep_ev;
112 }
113
114 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
115 {
116         if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
117                 return 0;
118         return &ep->ceq_evq->ev_mbox->ceq.events[idx];
119 }
120
121 static struct epoll_ctlr *fd_to_cltr(int fd)
122 {
123         struct user_fd *ufd = ufd_lookup(fd);
124         if (!ufd)
125                 return 0;
126         if (ufd->magic != EPOLL_UFD_MAGIC) {
127                 errno = EBADF;
128                 return 0;
129         }
130         return container_of(ufd, struct epoll_ctlr, ufd);
131 }
132
133 /* Event queue helpers: */
134 static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
135 {
136         struct event_queue *ceq_evq = get_eventq_raw();
137         ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
138         ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
139         ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
140         evq_attach_wakeup_ctlr(ceq_evq);
141         return ceq_evq;
142 }
143
144 static struct event_queue *ep_get_alarm_evq(void)
145 {
146         /* Don't care about the actual message, just using it for a wakeup */
147         struct event_queue *alarm_evq = get_eventq(EV_MBOX_BITMAP);
148         alarm_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
149         evq_attach_wakeup_ctlr(alarm_evq);
150         return alarm_evq;
151 }
152
153 /* Once we've closed our sources of events, we can try to clean up the event
154  * queues.  These are actually dangerous, since there could be INDIRs floating
155  * around for these evqs still, which are basically pointers.  We'll need to run
156  * some sort of user deferred destruction. (TODO). */
157 static void ep_put_ceq_evq(struct event_queue *ceq_evq)
158 {
159 #if 0 /* TODO: EVQ/INDIR Cleanup */
160         ceq_cleanup(&ceq_evq->ev_mbox->ceq);
161         evq_remove_wakeup_ctlr(ceq_evq);
162         put_eventq_raw(ceq_evq);
163 #endif
164 }
165
166 static void ep_put_alarm_evq(struct event_queue *alarm_evq)
167 {
168 #if 0 /* TODO: EVQ/INDIR Cleanup */
169         evq_remove_wakeup_ctlr(alarm_evq);
170         put_eventq(alarm_evq);
171 #endif
172 }
173
174 static void epoll_close(struct user_fd *ufd)
175 {
176         struct epoll_ctlr *ep = container_of(ufd, struct epoll_ctlr, ufd);
177         struct fd_tap_req *tap_reqs, *tap_req_i;
178         struct ceq_event *ceq_ev_i;
179         struct ep_fd_data *ep_fd_i;
180         int nr_tap_req = 0;
181         int nr_done = 0;
182
183         tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
184         memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
185         /* Slightly painful, O(n) with no escape hatch */
186         for (int i = 0; i < ep->size; i++) {
187                 ceq_ev_i = ep_get_ceq_ev(ep, i);
188                 /* CEQ should have been big enough for our size */
189                 assert(ceq_ev_i);
190                 ep_fd_i = (struct ep_fd_data*)ceq_ev_i->user_data;
191                 if (!ep_fd_i)
192                         continue;
193                 tap_req_i = &tap_reqs[nr_tap_req++];
194                 tap_req_i->fd = i;
195                 tap_req_i->cmd = FDTAP_CMD_REM;
196                 free(ep_fd_i);
197         }
198         /* Requests could fail if the tapped files are already closed.  We need to
199          * skip the failed one (the +1) and untap the rest. */
200         do {
201                 nr_done += sys_tap_fds(tap_reqs + nr_done, nr_tap_req - nr_done);
202                 nr_done += 1;   /* nr_done could be more than nr_tap_req now */
203         } while (nr_done < nr_tap_req);
204         free(tap_reqs);
205         ep_put_ceq_evq(ep->ceq_evq);
206         ep_put_alarm_evq(ep->alarm_evq);
207         uth_mutex_lock(ctlrs_mtx);
208         TAILQ_REMOVE(&all_ctlrs, ep, link);
209         uth_mutex_unlock(ctlrs_mtx);
210         uth_mutex_free(ep->mtx);
211         free(ep);
212 }
213
214 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
215 {
216         unsigned int ceq_size;
217
218         /* TODO: we don't grow yet.  Until then, we help out a little. */
219         if (size == 1)
220                 size = 128;
221         ceq_size = ROUNDUPPWR2(size);
222         ep->size = ceq_size;
223         ep->mtx = uth_mutex_alloc();
224         ep->ufd.magic = EPOLL_UFD_MAGIC;
225         ep->ufd.close = epoll_close;
226         ep->ceq_evq = ep_get_ceq_evq(ceq_size);
227         ep->alarm_evq = ep_get_alarm_evq();
228         return 0;
229 }
230
231 static void epoll_fd_closed(int fd)
232 {
233         struct epoll_ctlr *ep;
234
235         /* Lockless peek, avoid locking for every close() */
236         if (TAILQ_EMPTY(&all_ctlrs))
237                 return;
238         uth_mutex_lock(ctlrs_mtx);
239         TAILQ_FOREACH(ep, &all_ctlrs, link)
240                 epoll_ctl(ep->ufd.fd, EPOLL_CTL_DEL, fd, 0);
241         uth_mutex_unlock(ctlrs_mtx);
242 }
243
244 static void epoll_init(void)
245 {
246         static struct close_cb epoll_close_cb = {.func = epoll_fd_closed};
247
248         register_close_cb(&epoll_close_cb);
249         ctlrs_mtx = uth_mutex_alloc();
250 }
251
252 int epoll_create(int size)
253 {
254         int fd;
255         struct epoll_ctlr *ep;
256
257         run_once(epoll_init());
258         /* good thing the arg is a signed int... */
259         if (size < 0) {
260                 errno = EINVAL;
261                 return -1;
262         }
263         ep = malloc(sizeof(struct epoll_ctlr));
264         memset(ep, 0, sizeof(struct epoll_ctlr));
265         if (init_ep_ctlr(ep, size)) {
266                 free(ep);
267                 return -1;
268         }
269         fd = ufd_get_fd(&ep->ufd);
270         if (fd < 0)
271                 free(ep);
272         uth_mutex_lock(ctlrs_mtx);
273         TAILQ_INSERT_TAIL(&all_ctlrs, ep, link);
274         uth_mutex_unlock(ctlrs_mtx);
275         return fd;
276 }
277
278 int epoll_create1(int flags)
279 {
280         /* TODO: we're supposed to support CLOEXEC.  Our FD is a user_fd, so that'd
281          * require some support in glibc's exec to close our epoll ctlr. */
282         return epoll_create(1);
283 }
284
285 static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
286                            struct epoll_event *event)
287 {
288         struct ceq_event *ceq_ev;
289         struct ep_fd_data *ep_fd;
290         struct fd_tap_req tap_req = {0};
291         int ret, filter, sock_listen_fd;
292         struct epoll_event listen_event;
293
294         /* Only support ET.  Also, we just ignore EPOLLONESHOT.  That might work,
295          * logically, just with spurious events firing. */
296         if (!(event->events & EPOLLET)) {
297                 errno = EPERM;
298                 werrstr("Epoll level-triggered not supported");
299                 return -1;
300         }
301         /* The sockets-to-plan9 networking shims are a bit inconvenient.  The user
302          * asked us to epoll on an FD, but that FD is actually a Qdata FD.  We might
303          * need to actually epoll on the listen_fd.  Further, we don't know yet
304          * whether or not they want the listen FD.  They could epoll on the socket,
305          * then listen later and want to wake up on the listen.
306          *
307          * So in the case we have a socket FD, we'll actually open the listen FD
308          * regardless (glibc handles this), and we'll epoll on both FDs.
309          * Technically, either FD could fire and they'd get an epoll event for it,
310          * but I think socket users will use only listen or data.
311          *
312          * As far as tracking the FD goes for epoll_wait() reporting, if the app
313          * wants to track the FD they think we are using, then they already passed
314          * that in event->data. */
315         sock_listen_fd = _sock_lookup_listen_fd(fd);
316         if (sock_listen_fd >= 0) {
317                 listen_event.events = EPOLLET | EPOLLIN | EPOLLHUP;
318                 listen_event.data = event->data;
319                 ret = __epoll_ctl_add(ep, sock_listen_fd, &listen_event);
320                 if (ret < 0)
321                         return ret;
322         }
323         ceq_ev = ep_get_ceq_ev(ep, fd);
324         if (!ceq_ev) {
325                 errno = ENOMEM;
326                 werrstr("Epoll set cannot grow yet!");
327                 return -1;
328         }
329         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
330         if (ep_fd) {
331                 errno = EEXIST;
332                 return -1;
333         }
334         tap_req.fd = fd;
335         tap_req.cmd = FDTAP_CMD_ADD;
336         /* EPOLLHUP is implicitly set for all epolls. */
337         filter = ep_events_to_taps(event->events | EPOLLHUP);
338         tap_req.filter = filter;
339         tap_req.ev_q = ep->ceq_evq;
340         tap_req.ev_id = fd;     /* using FD as the CEQ ID */
341         ret = sys_tap_fds(&tap_req, 1);
342         if (ret != 1)
343                 return -1;
344         ep_fd = malloc(sizeof(struct ep_fd_data));
345         ep_fd->fd = fd;
346         ep_fd->filter = filter;
347         ep_fd->ep_event = *event;
348         ep_fd->ep_event.events |= EPOLLHUP;
349         ceq_ev->user_data = (uint64_t)ep_fd;
350         return 0;
351 }
352
353 static int __epoll_ctl_del(struct epoll_ctlr *ep, int fd,
354                            struct epoll_event *event)
355 {
356         struct ceq_event *ceq_ev;
357         struct ep_fd_data *ep_fd;
358         struct fd_tap_req tap_req = {0};
359         int ret, sock_listen_fd;
360
361         /* If we were dealing with a socket shim FD, we tapped both the listen and
362          * the data file and need to untap both of them. */
363         sock_listen_fd = _sock_lookup_listen_fd(fd);
364         if (sock_listen_fd >= 0) {
365                 /* It's possible to fail here.  Even though we tapped it already, if the
366                  * deletion was triggered from close callbacks, it's possible for the
367                  * sock_listen_fd to be closed first, which would have triggered an
368                  * epoll_ctl_del.  When we get around to closing the Rock FD, the listen
369                  * FD was already closed. */
370                 __epoll_ctl_del(ep, sock_listen_fd, event);
371         }
372         ceq_ev = ep_get_ceq_ev(ep, fd);
373         if (!ceq_ev) {
374                 errno = ENOENT;
375                 return -1;
376         }
377         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
378         if (!ep_fd) {
379                 errno = ENOENT;
380                 return -1;
381         }
382         assert(ep_fd->fd == fd);
383         tap_req.fd = fd;
384         tap_req.cmd = FDTAP_CMD_REM;
385         /* ignoring the return value; we could have failed to remove it if the FD
386          * has already closed and the kernel removed the tap. */
387         sys_tap_fds(&tap_req, 1);
388         ceq_ev->user_data = 0;
389         free(ep_fd);
390         return 0;
391 }
392
393 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
394 {
395         int ret;
396         struct epoll_ctlr *ep = fd_to_cltr(epfd);
397         if (!ep) {
398                 errno = EBADF;/* or EINVAL */
399                 return -1;
400         }
401         if (fd >= USER_FD_BASE) {
402                 errno = EINVAL;
403                 werrstr("Epoll can't track User FDs");
404                 return -1;
405         }
406         uth_mutex_lock(ep->mtx);
407         switch (op) {
408                 case (EPOLL_CTL_MOD):
409                         /* In lieu of a proper MOD, just remove and readd.  The errors might
410                          * not work out well, and there could be a missed event in the
411                          * middle.  Not sure what the guarantees are, but we can fake a
412                          * poke. (TODO). */
413                         ret = __epoll_ctl_del(ep, fd, 0);
414                         if (ret)
415                                 break;
416                         ret = __epoll_ctl_add(ep, fd, event);
417                         break;
418                 case (EPOLL_CTL_ADD):
419                         ret = __epoll_ctl_add(ep, fd, event);
420                         break;
421                 case (EPOLL_CTL_DEL):
422                         ret = __epoll_ctl_del(ep, fd, event);
423                         break;
424                 default:
425                         errno = EINVAL;
426                         ret = -1;
427         }
428         uth_mutex_unlock(ep->mtx);
429         return ret;
430 }
431
432 static bool get_ep_event_from_msg(struct epoll_ctlr *ep, struct event_msg *msg,
433                                   struct epoll_event *ep_ev)
434 {
435         struct ceq_event *ceq_ev;
436         struct ep_fd_data *ep_fd;
437
438         ceq_ev = ep_get_ceq_ev(ep, msg->ev_type);
439         /* should never get a tap FD > size of the epoll set */
440         assert(ceq_ev);
441         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
442         if (!ep_fd) {
443                 /* it's possible the FD was unregistered and this was an old
444                  * event sent to this epoll set. */
445                 return FALSE;
446         }
447         ep_ev->data = ep_fd->ep_event.data;
448         /* The events field was initialized to 0 in epoll_wait() */
449         ep_ev->events |= taps_to_ep_events(msg->ev_arg2);
450         return TRUE;
451 }
452
453 /* Helper: extracts as many epoll_events as possible from the ep. */
454 static int __epoll_wait_poll(struct epoll_ctlr *ep, struct epoll_event *events,
455                              int maxevents)
456 {
457         struct event_msg msg = {0};
458         int nr_ret = 0;
459
460         if (maxevents <= 0)
461                 return 0;
462         /* Locking to protect get_ep_event_from_msg, specifically that the ep_fd
463          * stored at ceq_ev->user_data does not get concurrently removed and
464          * freed. */
465         uth_mutex_lock(ep->mtx);
466         for (int i = 0; i < maxevents; i++) {
467 retry:
468                 if (!uth_check_evqs(&msg, NULL, 1, ep->ceq_evq))
469                         break;
470                 if (!get_ep_event_from_msg(ep, &msg, &events[i]))
471                         goto retry;
472                 nr_ret++;
473         }
474         uth_mutex_unlock(ep->mtx);
475         return nr_ret;
476 }
477
478 /* We should be able to have multiple waiters.  ep shouldn't be closed or
479  * anything, since we have the FD (that'd be bad programming on the user's
480  * behalf).  We could have concurrent ADD/MOD/DEL operations (which lock). */
481 static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
482                         int maxevents, int timeout)
483 {
484         struct event_msg msg = {0};
485         struct event_msg dummy_msg;
486         struct event_queue *which_evq;
487         int nr_ret;
488         struct syscall sysc;
489
490         nr_ret = __epoll_wait_poll(ep, events, maxevents);
491         if (nr_ret)
492                 return nr_ret;
493         if (timeout == 0)
494                 return 0;
495         /* From here on down, we're going to block until there is some activity */
496         if (timeout != -1) {
497                 syscall_async_evq(&sysc, ep->alarm_evq, SYS_block, timeout * 1000);
498                 uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, ep->alarm_evq);
499                 if (which_evq == ep->alarm_evq)
500                         return 0;
501                 /* The alarm sysc may or may not have finished yet.  This will force it
502                  * to *start* to finish iff it is still a submitted syscall. */
503                 sys_abort_sysc(&sysc);
504                 /* But we still need to wait until the syscall completed.  Need a
505                  * dummy msg, since we don't want to clobber the real msg. */
506                 uth_blockon_evqs(&dummy_msg, 0, 1, ep->alarm_evq);
507         } else {
508                 uth_blockon_evqs(&msg, &which_evq, 1, ep->ceq_evq);
509         }
510         uth_mutex_lock(ep->mtx);
511         if (get_ep_event_from_msg(ep, &msg, &events[0]))
512                 nr_ret = 1;
513         uth_mutex_unlock(ep->mtx);
514         /* We had to extract one message already as part of the blocking process.
515          * We might be able to get more. */
516         nr_ret += __epoll_wait_poll(ep, events + nr_ret, maxevents - nr_ret);
517         /* This is a little nasty and hopefully a rare race.  We still might not
518          * have a ret, but we expected to block until we had something.  We didn't
519          * time out yet, but we spuriously woke up.  We need to try again (ideally,
520          * we'd subtract the time left from the original timeout). */
521         if (!nr_ret)
522                 return __epoll_wait(ep, events, maxevents, timeout);
523         return nr_ret;
524 }
525
526 int epoll_wait(int epfd, struct epoll_event *events, int maxevents,
527                int timeout)
528 {
529         struct epoll_ctlr *ep = fd_to_cltr(epfd);
530
531         if (!ep) {
532                 errno = EBADF;/* or EINVAL */
533                 return -1;
534         }
535         if (maxevents <= 0) {
536                 errno = EINVAL;
537                 return -1;
538         }
539         for (int i = 0; i < maxevents; i++)
540                 events[i].events = 0;
541         return __epoll_wait(ep, events, maxevents, timeout);
542 }
543
544 int epoll_pwait(int epfd, struct epoll_event *events, int maxevents,
545                 int timeout, const sigset_t *sigmask)
546 {
547         int ready;
548         sigset_t origmask;
549         /* TODO: this is probably racy */
550         sigprocmask(SIG_SETMASK, sigmask, &origmask);
551         ready = epoll_wait(epfd, events, maxevents, timeout);
552         sigprocmask(SIG_SETMASK, &origmask, NULL);
553         return ready;
554 }