pth: Use barriers in pthread tests
[akaros.git] / user / iplib / epoll.c
1 /* Copyright (c) 2015 Google Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Epoll, built on FD taps, CEQs, and blocking uthreads on event queues.
6  *
7  * TODO: There are a few incompatibilities with Linux's epoll, some of which are
8  * artifacts of the implementation, and other issues:
9  *      - you can't epoll on an epoll fd (or any user fd).  you can only epoll on a
10  *      kernel FD that accepts your FD taps.
11  *      - there's no EPOLLONESHOT or level-triggered support.
12  *      - you can only tap one FD at a time, so you can't add the same FD to
13  *      multiple epoll sets.
14  *      - closing the epoll is a little dangerous, if there are outstanding INDIR
15  *      events.  this will only pop up if you're yielding cores, maybe getting
16  *      preempted, and are unlucky.
17  *      - epoll_create1 does not support CLOEXEC.  That'd need some work in glibc's
18  *      exec and flags in struct user_fd.
19  *      - EPOLL_CTL_MOD is just a DEL then an ADD.  There might be races associated
20  *      with that.
21  *      - epoll_pwait is probably racy.
22  *      - You can't dup an epoll fd (same as other user FDs).
23  *      - If you add a BSD socket FD to an epoll set, you'll get taps on both the
24  *      data FD and the listen FD.
25  *      - If you add the same BSD socket listener to multiple epoll sets, you will
26  *      likely fail.  This is in addition to being able to tap only one FD at a
27  *      time.
28  * */
29
30 #include <sys/epoll.h>
31 #include <parlib/parlib.h>
32 #include <parlib/event.h>
33 #include <parlib/ceq.h>
34 #include <parlib/uthread.h>
35 #include <parlib/timing.h>
36 #include <sys/user_fd.h>
37 #include <sys/close_cb.h>
38 #include <stdio.h>
39 #include <errno.h>
40 #include <unistd.h>
41 #include <malloc.h>
42 #include <sys/queue.h>
43 #include <sys/plan9_helpers.h>
44
45 /* Sanity check, so we can ID our own FDs */
46 #define EPOLL_UFD_MAGIC                 0xe9011
47
48 struct epoll_ctlr {
49         TAILQ_ENTRY(epoll_ctlr)         link;
50         struct event_queue                      *ceq_evq;
51         struct event_queue                      *alarm_evq;
52         struct ceq                                      *ceq;   /* convenience pointer */
53         uth_mutex_t                                     mtx;
54         struct user_fd                          ufd;
55 };
56
57 TAILQ_HEAD(epoll_ctlrs, epoll_ctlr);
58 static struct epoll_ctlrs all_ctlrs = TAILQ_HEAD_INITIALIZER(all_ctlrs);
59 static uth_mutex_t ctlrs_mtx;
60
61 /* There's some bookkeeping we need to maintain on every FD.  Right now, the FD
62  * is the index into the CEQ event array, so we can just hook this into the user
63  * data blob in the ceq_event.
64  *
65  * If we ever do not maintain a 1:1 mapping from FDs to CEQ IDs, we can use this
66  * to track the CEQ ID and FD. */
67 struct ep_fd_data {
68         struct epoll_event                      ep_event;
69         int                                                     fd;
70         int                                                     filter;
71 };
72
73 /* Converts epoll events to FD taps. */
74 static int ep_events_to_taps(uint32_t ep_ev)
75 {
76         int taps = 0;
77         if (ep_ev & EPOLLIN)
78                 taps |= FDTAP_FILT_READABLE;
79         if (ep_ev & EPOLLOUT)
80                 taps |= FDTAP_FILT_WRITABLE;
81         if (ep_ev & EPOLLRDHUP)
82                 taps |= FDTAP_FILT_RDHUP;
83         if (ep_ev & EPOLLPRI)
84                 taps |= FDTAP_FILT_PRIORITY;
85         if (ep_ev & EPOLLERR)
86                 taps |= FDTAP_FILT_ERROR;
87         if (ep_ev & EPOLLHUP)
88                 taps |= FDTAP_FILT_HANGUP;
89         return taps;
90 }
91
92 /* Converts corresponding FD Taps to epoll events.  There are other taps that do
93  * not make sense for epoll. */
94 static uint32_t taps_to_ep_events(int taps)
95 {
96         uint32_t ep_ev = 0;
97         if (taps & FDTAP_FILT_READABLE)
98                 ep_ev |= EPOLLIN;
99         if (taps & FDTAP_FILT_WRITABLE)
100                 ep_ev |= EPOLLOUT;
101         if (taps & FDTAP_FILT_RDHUP)
102                 ep_ev |= EPOLLRDHUP;
103         if (taps & FDTAP_FILT_PRIORITY)
104                 ep_ev |= EPOLLPRI;
105         if (taps & FDTAP_FILT_ERROR)
106                 ep_ev |= EPOLLERR;
107         if (taps & FDTAP_FILT_HANGUP)
108                 ep_ev |= EPOLLHUP;
109         return ep_ev;
110 }
111
112 static unsigned int ep_get_ceq_max_ever(struct epoll_ctlr *ep)
113 {
114         return atomic_read(&ep->ceq_evq->ev_mbox->ceq.max_event_ever);
115 }
116
117 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
118 {
119         if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
120                 return 0;
121         return &ep->ceq_evq->ev_mbox->ceq.events[idx];
122 }
123
124 static struct epoll_ctlr *fd_to_cltr(int fd)
125 {
126         struct user_fd *ufd = ufd_lookup(fd);
127         if (!ufd)
128                 return 0;
129         if (ufd->magic != EPOLL_UFD_MAGIC) {
130                 errno = EBADF;
131                 return 0;
132         }
133         return container_of(ufd, struct epoll_ctlr, ufd);
134 }
135
136 /* Event queue helpers: */
137 static struct event_queue *ep_get_ceq_evq(unsigned int ceq_ring_sz)
138 {
139         struct event_queue *ceq_evq = get_eventq_raw();
140         ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
141         ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, NR_FILE_DESC_MAX, ceq_ring_sz);
142         ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
143         evq_attach_wakeup_ctlr(ceq_evq);
144         return ceq_evq;
145 }
146
147 static struct event_queue *ep_get_alarm_evq(void)
148 {
149         /* Don't care about the actual message, just using it for a wakeup */
150         struct event_queue *alarm_evq = get_eventq(EV_MBOX_BITMAP);
151         alarm_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
152         evq_attach_wakeup_ctlr(alarm_evq);
153         return alarm_evq;
154 }
155
156 /* Once we've closed our sources of events, we can try to clean up the event
157  * queues.  These are actually dangerous, since there could be INDIRs floating
158  * around for these evqs still, which are basically pointers.  We'll need to run
159  * some sort of user deferred destruction. (TODO). */
160 static void ep_put_ceq_evq(struct event_queue *ceq_evq)
161 {
162 #if 0 /* TODO: EVQ/INDIR Cleanup */
163         ceq_cleanup(&ceq_evq->ev_mbox->ceq);
164         evq_remove_wakeup_ctlr(ceq_evq);
165         put_eventq_raw(ceq_evq);
166 #endif
167 }
168
169 static void ep_put_alarm_evq(struct event_queue *alarm_evq)
170 {
171 #if 0 /* TODO: EVQ/INDIR Cleanup */
172         evq_remove_wakeup_ctlr(alarm_evq);
173         put_eventq(alarm_evq);
174 #endif
175 }
176
177 static void epoll_close(struct user_fd *ufd)
178 {
179         struct epoll_ctlr *ep = container_of(ufd, struct epoll_ctlr, ufd);
180         struct fd_tap_req *tap_reqs, *tap_req_i;
181         struct ceq_event *ceq_ev_i;
182         struct ep_fd_data *ep_fd_i;
183         int nr_tap_req = 0;
184         int nr_done = 0;
185         unsigned int max_ceq_events = ep_get_ceq_max_ever(ep);
186
187         tap_reqs = malloc(sizeof(struct fd_tap_req) * max_ceq_events);
188         memset(tap_reqs, 0, sizeof(struct fd_tap_req) * max_ceq_events);
189         /* Slightly painful, O(n) with no escape hatch */
190         for (int i = 0; i < max_ceq_events; i++) {
191                 ceq_ev_i = ep_get_ceq_ev(ep, i);
192                 /* CEQ should have been big enough for our size */
193                 assert(ceq_ev_i);
194                 ep_fd_i = (struct ep_fd_data*)ceq_ev_i->user_data;
195                 if (!ep_fd_i)
196                         continue;
197                 tap_req_i = &tap_reqs[nr_tap_req++];
198                 tap_req_i->fd = i;
199                 tap_req_i->cmd = FDTAP_CMD_REM;
200                 free(ep_fd_i);
201         }
202         /* Requests could fail if the tapped files are already closed.  We need to
203          * skip the failed one (the +1) and untap the rest. */
204         do {
205                 nr_done += sys_tap_fds(tap_reqs + nr_done, nr_tap_req - nr_done);
206                 nr_done += 1;   /* nr_done could be more than nr_tap_req now */
207         } while (nr_done < nr_tap_req);
208         free(tap_reqs);
209         ep_put_ceq_evq(ep->ceq_evq);
210         ep_put_alarm_evq(ep->alarm_evq);
211         uth_mutex_lock(ctlrs_mtx);
212         TAILQ_REMOVE(&all_ctlrs, ep, link);
213         uth_mutex_unlock(ctlrs_mtx);
214         uth_mutex_free(ep->mtx);
215         free(ep);
216 }
217
218 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
219 {
220         if (size == 1)
221                 size = 128;
222         ep->mtx = uth_mutex_alloc();
223         ep->ufd.magic = EPOLL_UFD_MAGIC;
224         ep->ufd.close = epoll_close;
225         /* Size is a hint for the CEQ concurrency.  We can actually handle as many
226          * kernel FDs as is possible. */
227         ep->ceq_evq = ep_get_ceq_evq(ROUNDUPPWR2(size));
228         ep->alarm_evq = ep_get_alarm_evq();
229         return 0;
230 }
231
232 static void epoll_fd_closed(int fd)
233 {
234         struct epoll_ctlr *ep;
235
236         /* Lockless peek, avoid locking for every close() */
237         if (TAILQ_EMPTY(&all_ctlrs))
238                 return;
239         uth_mutex_lock(ctlrs_mtx);
240         TAILQ_FOREACH(ep, &all_ctlrs, link)
241                 epoll_ctl(ep->ufd.fd, EPOLL_CTL_DEL, fd, 0);
242         uth_mutex_unlock(ctlrs_mtx);
243 }
244
245 static void epoll_init(void)
246 {
247         static struct close_cb epoll_close_cb = {.func = epoll_fd_closed};
248
249         register_close_cb(&epoll_close_cb);
250         ctlrs_mtx = uth_mutex_alloc();
251 }
252
253 int epoll_create(int size)
254 {
255         int fd;
256         struct epoll_ctlr *ep;
257
258         run_once(epoll_init());
259         /* good thing the arg is a signed int... */
260         if (size < 0) {
261                 errno = EINVAL;
262                 return -1;
263         }
264         ep = malloc(sizeof(struct epoll_ctlr));
265         memset(ep, 0, sizeof(struct epoll_ctlr));
266         if (init_ep_ctlr(ep, size)) {
267                 free(ep);
268                 return -1;
269         }
270         fd = ufd_get_fd(&ep->ufd);
271         if (fd < 0)
272                 free(ep);
273         uth_mutex_lock(ctlrs_mtx);
274         TAILQ_INSERT_TAIL(&all_ctlrs, ep, link);
275         uth_mutex_unlock(ctlrs_mtx);
276         return fd;
277 }
278
279 int epoll_create1(int flags)
280 {
281         /* TODO: we're supposed to support CLOEXEC.  Our FD is a user_fd, so that'd
282          * require some support in glibc's exec to close our epoll ctlr. */
283         return epoll_create(1);
284 }
285
286 static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
287                            struct epoll_event *event)
288 {
289         struct ceq_event *ceq_ev;
290         struct ep_fd_data *ep_fd;
291         struct fd_tap_req tap_req = {0};
292         int ret, filter, sock_listen_fd;
293         struct epoll_event listen_event;
294
295         /* Only support ET.  Also, we just ignore EPOLLONESHOT.  That might work,
296          * logically, just with spurious events firing. */
297         if (!(event->events & EPOLLET)) {
298                 errno = EPERM;
299                 werrstr("Epoll level-triggered not supported");
300                 return -1;
301         }
302         /* The sockets-to-plan9 networking shims are a bit inconvenient.  The user
303          * asked us to epoll on an FD, but that FD is actually a Qdata FD.  We might
304          * need to actually epoll on the listen_fd.  Further, we don't know yet
305          * whether or not they want the listen FD.  They could epoll on the socket,
306          * then listen later and want to wake up on the listen.
307          *
308          * So in the case we have a socket FD, we'll actually open the listen FD
309          * regardless (glibc handles this), and we'll epoll on both FDs.
310          * Technically, either FD could fire and they'd get an epoll event for it,
311          * but I think socket users will use only listen or data.
312          *
313          * As far as tracking the FD goes for epoll_wait() reporting, if the app
314          * wants to track the FD they think we are using, then they already passed
315          * that in event->data. */
316         sock_listen_fd = _sock_lookup_listen_fd(fd);
317         if (sock_listen_fd >= 0) {
318                 listen_event.events = EPOLLET | EPOLLIN | EPOLLHUP;
319                 listen_event.data = event->data;
320                 ret = __epoll_ctl_add(ep, sock_listen_fd, &listen_event);
321                 if (ret < 0)
322                         return ret;
323         }
324         ceq_ev = ep_get_ceq_ev(ep, fd);
325         if (!ceq_ev) {
326                 errno = ENOMEM;
327                 werrstr("Epoll set cannot grow yet!");
328                 return -1;
329         }
330         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
331         if (ep_fd) {
332                 errno = EEXIST;
333                 return -1;
334         }
335         tap_req.fd = fd;
336         tap_req.cmd = FDTAP_CMD_ADD;
337         /* EPOLLHUP is implicitly set for all epolls. */
338         filter = ep_events_to_taps(event->events | EPOLLHUP);
339         tap_req.filter = filter;
340         tap_req.ev_q = ep->ceq_evq;
341         tap_req.ev_id = fd;     /* using FD as the CEQ ID */
342         ret = sys_tap_fds(&tap_req, 1);
343         if (ret != 1)
344                 return -1;
345         ep_fd = malloc(sizeof(struct ep_fd_data));
346         ep_fd->fd = fd;
347         ep_fd->filter = filter;
348         ep_fd->ep_event = *event;
349         ep_fd->ep_event.events |= EPOLLHUP;
350         ceq_ev->user_data = (uint64_t)ep_fd;
351         return 0;
352 }
353
354 static int __epoll_ctl_del(struct epoll_ctlr *ep, int fd,
355                            struct epoll_event *event)
356 {
357         struct ceq_event *ceq_ev;
358         struct ep_fd_data *ep_fd;
359         struct fd_tap_req tap_req = {0};
360         int ret, sock_listen_fd;
361
362         /* If we were dealing with a socket shim FD, we tapped both the listen and
363          * the data file and need to untap both of them. */
364         sock_listen_fd = _sock_lookup_listen_fd(fd);
365         if (sock_listen_fd >= 0) {
366                 /* It's possible to fail here.  Even though we tapped it already, if the
367                  * deletion was triggered from close callbacks, it's possible for the
368                  * sock_listen_fd to be closed first, which would have triggered an
369                  * epoll_ctl_del.  When we get around to closing the Rock FD, the listen
370                  * FD was already closed. */
371                 __epoll_ctl_del(ep, sock_listen_fd, event);
372         }
373         ceq_ev = ep_get_ceq_ev(ep, fd);
374         if (!ceq_ev) {
375                 errno = ENOENT;
376                 return -1;
377         }
378         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
379         if (!ep_fd) {
380                 errno = ENOENT;
381                 return -1;
382         }
383         assert(ep_fd->fd == fd);
384         tap_req.fd = fd;
385         tap_req.cmd = FDTAP_CMD_REM;
386         /* ignoring the return value; we could have failed to remove it if the FD
387          * has already closed and the kernel removed the tap. */
388         sys_tap_fds(&tap_req, 1);
389         ceq_ev->user_data = 0;
390         free(ep_fd);
391         return 0;
392 }
393
394 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
395 {
396         int ret;
397         struct epoll_ctlr *ep = fd_to_cltr(epfd);
398         if (!ep) {
399                 errno = EBADF;/* or EINVAL */
400                 return -1;
401         }
402         if (fd >= USER_FD_BASE) {
403                 errno = EINVAL;
404                 werrstr("Epoll can't track User FDs");
405                 return -1;
406         }
407         uth_mutex_lock(ep->mtx);
408         switch (op) {
409                 case (EPOLL_CTL_MOD):
410                         /* In lieu of a proper MOD, just remove and readd.  The errors might
411                          * not work out well, and there could be a missed event in the
412                          * middle.  Not sure what the guarantees are, but we can fake a
413                          * poke. (TODO). */
414                         ret = __epoll_ctl_del(ep, fd, 0);
415                         if (ret)
416                                 break;
417                         ret = __epoll_ctl_add(ep, fd, event);
418                         break;
419                 case (EPOLL_CTL_ADD):
420                         ret = __epoll_ctl_add(ep, fd, event);
421                         break;
422                 case (EPOLL_CTL_DEL):
423                         ret = __epoll_ctl_del(ep, fd, event);
424                         break;
425                 default:
426                         errno = EINVAL;
427                         ret = -1;
428         }
429         uth_mutex_unlock(ep->mtx);
430         return ret;
431 }
432
433 static bool get_ep_event_from_msg(struct epoll_ctlr *ep, struct event_msg *msg,
434                                   struct epoll_event *ep_ev)
435 {
436         struct ceq_event *ceq_ev;
437         struct ep_fd_data *ep_fd;
438
439         ceq_ev = ep_get_ceq_ev(ep, msg->ev_type);
440         /* should never get a tap FD > size of the epoll set */
441         assert(ceq_ev);
442         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
443         if (!ep_fd) {
444                 /* it's possible the FD was unregistered and this was an old
445                  * event sent to this epoll set. */
446                 return FALSE;
447         }
448         ep_ev->data = ep_fd->ep_event.data;
449         /* The events field was initialized to 0 in epoll_wait() */
450         ep_ev->events |= taps_to_ep_events(msg->ev_arg2);
451         return TRUE;
452 }
453
454 /* Helper: extracts as many epoll_events as possible from the ep. */
455 static int __epoll_wait_poll(struct epoll_ctlr *ep, struct epoll_event *events,
456                              int maxevents)
457 {
458         struct event_msg msg = {0};
459         int nr_ret = 0;
460
461         if (maxevents <= 0)
462                 return 0;
463         /* Locking to protect get_ep_event_from_msg, specifically that the ep_fd
464          * stored at ceq_ev->user_data does not get concurrently removed and
465          * freed. */
466         uth_mutex_lock(ep->mtx);
467         for (int i = 0; i < maxevents; i++) {
468 retry:
469                 if (!uth_check_evqs(&msg, NULL, 1, ep->ceq_evq))
470                         break;
471                 if (!get_ep_event_from_msg(ep, &msg, &events[i]))
472                         goto retry;
473                 nr_ret++;
474         }
475         uth_mutex_unlock(ep->mtx);
476         return nr_ret;
477 }
478
479 /* We should be able to have multiple waiters.  ep shouldn't be closed or
480  * anything, since we have the FD (that'd be bad programming on the user's
481  * behalf).  We could have concurrent ADD/MOD/DEL operations (which lock). */
482 static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
483                         int maxevents, int timeout)
484 {
485         struct event_msg msg = {0};
486         struct event_msg dummy_msg;
487         struct event_queue *which_evq;
488         int nr_ret;
489         struct syscall sysc;
490
491         nr_ret = __epoll_wait_poll(ep, events, maxevents);
492         if (nr_ret)
493                 return nr_ret;
494         if (timeout == 0)
495                 return 0;
496         /* From here on down, we're going to block until there is some activity */
497         if (timeout != -1) {
498                 syscall_async_evq(&sysc, ep->alarm_evq, SYS_block, timeout * 1000);
499                 uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, ep->alarm_evq);
500                 if (which_evq == ep->alarm_evq)
501                         return 0;
502                 /* The alarm sysc may or may not have finished yet.  This will force it
503                  * to *start* to finish iff it is still a submitted syscall. */
504                 sys_abort_sysc(&sysc);
505                 /* But we still need to wait until the syscall completed.  Need a
506                  * dummy msg, since we don't want to clobber the real msg. */
507                 uth_blockon_evqs(&dummy_msg, 0, 1, ep->alarm_evq);
508         } else {
509                 uth_blockon_evqs(&msg, &which_evq, 1, ep->ceq_evq);
510         }
511         uth_mutex_lock(ep->mtx);
512         if (get_ep_event_from_msg(ep, &msg, &events[0]))
513                 nr_ret = 1;
514         uth_mutex_unlock(ep->mtx);
515         /* We had to extract one message already as part of the blocking process.
516          * We might be able to get more. */
517         nr_ret += __epoll_wait_poll(ep, events + nr_ret, maxevents - nr_ret);
518         /* This is a little nasty and hopefully a rare race.  We still might not
519          * have a ret, but we expected to block until we had something.  We didn't
520          * time out yet, but we spuriously woke up.  We need to try again (ideally,
521          * we'd subtract the time left from the original timeout). */
522         if (!nr_ret)
523                 return __epoll_wait(ep, events, maxevents, timeout);
524         return nr_ret;
525 }
526
527 int epoll_wait(int epfd, struct epoll_event *events, int maxevents,
528                int timeout)
529 {
530         struct epoll_ctlr *ep = fd_to_cltr(epfd);
531
532         if (!ep) {
533                 errno = EBADF;/* or EINVAL */
534                 return -1;
535         }
536         if (maxevents <= 0) {
537                 errno = EINVAL;
538                 return -1;
539         }
540         for (int i = 0; i < maxevents; i++)
541                 events[i].events = 0;
542         return __epoll_wait(ep, events, maxevents, timeout);
543 }
544
545 int epoll_pwait(int epfd, struct epoll_event *events, int maxevents,
546                 int timeout, const sigset_t *sigmask)
547 {
548         int ready;
549         sigset_t origmask;
550         /* TODO: this is probably racy */
551         sigprocmask(SIG_SETMASK, sigmask, &origmask);
552         ready = epoll_wait(epfd, events, maxevents, timeout);
553         sigprocmask(SIG_SETMASK, &origmask, NULL);
554         return ready;
555 }