Provide select() on top of epoll()
[akaros.git] / user / iplib / epoll.c
1 /* Copyright (c) 2015 Google Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Epoll, built on FD taps, CEQs, and blocking uthreads on event queues.
6  *
7  * TODO: There are a few incompatibilities with Linux's epoll, some of which are
8  * artifacts of the implementation, and other issues:
9  *      - you can't epoll on an epoll fd (or any user fd).  you can only epoll on a
10  *      kernel FD that accepts your FD taps.
11  *      - there's no EPOLLONESHOT or level-triggered support.
12  *      - you can only tap one FD at a time, so you can't add the same FD to
13  *      multiple epoll sets.
14  *      - there is no support for growing the epoll set.
15  *      - closing the epoll is a little dangerous, if there are outstanding INDIR
16  *      events.  this will only pop up if you're yielding cores, maybe getting
17  *      preempted, and are unlucky.
18  *      - epoll_create1 does not support CLOEXEC.  That'd need some work in glibc's
19  *      exec and flags in struct user_fd.
20  *      - EPOLL_CTL_MOD is just a DEL then an ADD.  There might be races associated
21  *      with that.
22  *      - epoll_pwait is probably racy.
23  *      - You can't dup an epoll fd (same as other user FDs).
24  *      - If you add a BSD socket FD to an epoll set before calling listen(), you'll
25  *      only epoll on the data (which is inactive) instead of on the accept().
26  *      - If you add the same BSD socket listener to multiple epoll sets, you will
27  *      likely fail.  This is in addition to being able to tap only one FD at a
28  *      time.
29  * */
30
31 #include <sys/epoll.h>
32 #include <parlib/parlib.h>
33 #include <parlib/event.h>
34 #include <parlib/ceq.h>
35 #include <parlib/uthread.h>
36 #include <parlib/timing.h>
37 #include <sys/user_fd.h>
38 #include <sys/close_cb.h>
39 #include <stdio.h>
40 #include <errno.h>
41 #include <unistd.h>
42 #include <malloc.h>
43 #include <sys/queue.h>
44
45 /* Sanity check, so we can ID our own FDs */
46 #define EPOLL_UFD_MAGIC                 0xe9011
47
48 struct epoll_ctlr {
49         TAILQ_ENTRY(epoll_ctlr)         link;
50         struct event_queue                      *ceq_evq;
51         struct ceq                                      *ceq;   /* convenience pointer */
52         unsigned int                            size;
53         uth_mutex_t                                     mtx;
54         struct user_fd                          ufd;
55 };
56
57 TAILQ_HEAD(epoll_ctlrs, epoll_ctlr);
58 static struct epoll_ctlrs all_ctlrs = TAILQ_HEAD_INITIALIZER(all_ctlrs);
59 static uth_mutex_t ctlrs_mtx;
60
61 /* There's some bookkeeping we need to maintain on every FD.  Right now, the FD
62  * is the index into the CEQ event array, so we can just hook this into the user
63  * data blob in the ceq_event.
64  *
65  * If we ever do not maintain a 1:1 mapping from FDs to CEQ IDs, we can use this
66  * to track the CEQ ID and FD. */
67 struct ep_fd_data {
68         struct epoll_event                      ep_event;
69         int                                                     fd;
70         int                                                     filter;
71 };
72
73 /* Converts epoll events to FD taps. */
74 static int ep_events_to_taps(uint32_t ep_ev)
75 {
76         int taps = 0;
77         if (ep_ev & EPOLLIN)
78                 taps |= FDTAP_FILT_READABLE;
79         if (ep_ev & EPOLLOUT)
80                 taps |= FDTAP_FILT_WRITABLE;
81         if (ep_ev & EPOLLRDHUP)
82                 taps |= FDTAP_FILT_RDHUP;
83         if (ep_ev & EPOLLPRI)
84                 taps |= FDTAP_FILT_PRIORITY;
85         if (ep_ev & EPOLLERR)
86                 taps |= FDTAP_FILT_ERROR;
87         if (ep_ev & EPOLLHUP)
88                 taps |= FDTAP_FILT_HANGUP;
89         return taps;
90 }
91
92 /* Converts corresponding FD Taps to epoll events.  There are other taps that do
93  * not make sense for epoll. */
94 static uint32_t taps_to_ep_events(int taps)
95 {
96         uint32_t ep_ev = 0;
97         if (taps & FDTAP_FILT_READABLE)
98                 ep_ev |= EPOLLIN;
99         if (taps & FDTAP_FILT_WRITABLE)
100                 ep_ev |= EPOLLOUT;
101         if (taps & FDTAP_FILT_RDHUP)
102                 ep_ev |= EPOLLRDHUP;
103         if (taps & FDTAP_FILT_PRIORITY)
104                 ep_ev |= EPOLLPRI;
105         if (taps & FDTAP_FILT_ERROR)
106                 ep_ev |= EPOLLERR;
107         if (taps & FDTAP_FILT_HANGUP)
108                 ep_ev |= EPOLLHUP;
109         return ep_ev;
110 }
111
112 static struct ceq_event *ep_get_ceq_ev(struct epoll_ctlr *ep, size_t idx)
113 {
114         if (ep->ceq_evq->ev_mbox->ceq.nr_events <= idx)
115                 return 0;
116         return &ep->ceq_evq->ev_mbox->ceq.events[idx];
117 }
118
119 static struct epoll_ctlr *fd_to_cltr(int fd)
120 {
121         struct user_fd *ufd = ufd_lookup(fd);
122         if (!ufd)
123                 return 0;
124         if (ufd->magic != EPOLL_UFD_MAGIC) {
125                 errno = EBADF;
126                 return 0;
127         }
128         return container_of(ufd, struct epoll_ctlr, ufd);
129 }
130
131 /* Event queue helpers: */
132 static struct event_queue *ep_get_ceq_evq(unsigned int ceq_size)
133 {
134         struct event_queue *ceq_evq = get_eventq_raw();
135         ceq_evq->ev_mbox->type = EV_MBOX_CEQ;
136         ceq_init(&ceq_evq->ev_mbox->ceq, CEQ_OR, ceq_size, ceq_size);
137         ceq_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
138         evq_attach_wakeup_ctlr(ceq_evq);
139         return ceq_evq;
140 }
141
142 static struct event_queue *ep_get_alarm_evq(void)
143 {
144         /* Don't care about the actual message, just using it for a wakeup */
145         struct event_queue *alarm_evq = get_eventq(EV_MBOX_BITMAP);
146         alarm_evq->ev_flags = EVENT_INDIR | EVENT_SPAM_INDIR | EVENT_WAKEUP;
147         evq_attach_wakeup_ctlr(alarm_evq);
148         return alarm_evq;
149 }
150
151 /* Once we've closed our sources of events, we can try to clean up the event
152  * queues.  These are actually dangerous, since there could be INDIRs floating
153  * around for these evqs still, which are basically pointers.  We'll need to run
154  * some sort of user deferred destruction. (TODO). */
155 static void ep_put_ceq_evq(struct event_queue *ceq_evq)
156 {
157 #if 0 /* TODO: EVQ/INDIR Cleanup */
158         ceq_cleanup(&ceq_evq->ev_mbox->ceq);
159         evq_remove_wakeup_ctlr(ceq_evq);
160         put_eventq_raw(ceq_evq);
161 #endif
162 }
163
164 static void ep_put_alarm_evq(struct event_queue *alarm_evq)
165 {
166 #if 0 /* TODO: EVQ/INDIR Cleanup */
167         evq_remove_wakeup_ctlr(alarm_evq);
168         put_eventq(alarm_evq);
169 #endif
170 }
171
172 static void epoll_close(struct user_fd *ufd)
173 {
174         struct epoll_ctlr *ep = container_of(ufd, struct epoll_ctlr, ufd);
175         struct fd_tap_req *tap_reqs, *tap_req_i;
176         struct ceq_event *ceq_ev_i;
177         struct ep_fd_data *ep_fd_i;
178         int nr_tap_req = 0;
179         int nr_done = 0;
180
181         tap_reqs = malloc(sizeof(struct fd_tap_req) * ep->size);
182         memset(tap_reqs, 0, sizeof(struct fd_tap_req) * ep->size);
183         /* Slightly painful, O(n) with no escape hatch */
184         for (int i = 0; i < ep->size; i++) {
185                 ceq_ev_i = ep_get_ceq_ev(ep, i);
186                 /* CEQ should have been big enough for our size */
187                 assert(ceq_ev_i);
188                 ep_fd_i = (struct ep_fd_data*)ceq_ev_i->user_data;
189                 if (!ep_fd_i)
190                         continue;
191                 tap_req_i = &tap_reqs[nr_tap_req++];
192                 tap_req_i->fd = i;
193                 tap_req_i->cmd = FDTAP_CMD_REM;
194                 free(ep_fd_i);
195         }
196         /* Requests could fail if the tapped files are already closed.  We need to
197          * skip the failed one (the +1) and untap the rest. */
198         do {
199                 nr_done += sys_tap_fds(tap_reqs + nr_done, nr_tap_req - nr_done);
200                 nr_done += 1;   /* nr_done could be more than nr_tap_req now */
201         } while (nr_done < nr_tap_req);
202         free(tap_reqs);
203         ep_put_ceq_evq(ep->ceq_evq);
204         uth_mutex_lock(ctlrs_mtx);
205         TAILQ_REMOVE(&all_ctlrs, ep, link);
206         uth_mutex_unlock(ctlrs_mtx);
207         uth_mutex_free(ep->mtx);
208         free(ep);
209 }
210
211 static int init_ep_ctlr(struct epoll_ctlr *ep, int size)
212 {
213         unsigned int ceq_size = ROUNDUPPWR2(size);
214         /* TODO: we don't grow yet.  Until then, we help out a little. */
215         if (size == 1)
216                 size = 128;
217         ep->size = ceq_size;
218         ep->mtx = uth_mutex_alloc();
219         ep->ufd.magic = EPOLL_UFD_MAGIC;
220         ep->ufd.close = epoll_close;
221         ep->ceq_evq = ep_get_ceq_evq(ceq_size);
222         return 0;
223 }
224
225 static void epoll_fd_closed(int fd)
226 {
227         struct epoll_ctlr *ep;
228
229         /* Lockless peek, avoid locking for every close() */
230         if (TAILQ_EMPTY(&all_ctlrs))
231                 return;
232         uth_mutex_lock(ctlrs_mtx);
233         TAILQ_FOREACH(ep, &all_ctlrs, link)
234                 epoll_ctl(ep->ufd.fd, EPOLL_CTL_DEL, fd, 0);
235         uth_mutex_unlock(ctlrs_mtx);
236 }
237
238 static void epoll_init(void)
239 {
240         static struct close_cb epoll_close_cb = {.func = epoll_fd_closed};
241
242         register_close_cb(&epoll_close_cb);
243         ctlrs_mtx = uth_mutex_alloc();
244 }
245
246 int epoll_create(int size)
247 {
248         int fd;
249         struct epoll_ctlr *ep;
250
251         run_once(epoll_init());
252         /* good thing the arg is a signed int... */
253         if (size < 0) {
254                 errno = EINVAL;
255                 return -1;
256         }
257         ep = malloc(sizeof(struct epoll_ctlr));
258         memset(ep, 0, sizeof(struct epoll_ctlr));
259         if (init_ep_ctlr(ep, size)) {
260                 free(ep);
261                 return -1;
262         }
263         fd = ufd_get_fd(&ep->ufd);
264         if (fd < 0)
265                 free(ep);
266         uth_mutex_lock(ctlrs_mtx);
267         TAILQ_INSERT_TAIL(&all_ctlrs, ep, link);
268         uth_mutex_unlock(ctlrs_mtx);
269         return fd;
270 }
271
272 int epoll_create1(int flags)
273 {
274         /* TODO: we're supposed to support CLOEXEC.  Our FD is a user_fd, so that'd
275          * require some support in glibc's exec to close our epoll ctlr. */
276         return epoll_create(1);
277 }
278
279 static int __epoll_ctl_add(struct epoll_ctlr *ep, int fd,
280                            struct epoll_event *event)
281 {
282         struct ceq_event *ceq_ev;
283         struct ep_fd_data *ep_fd;
284         struct fd_tap_req tap_req = {0};
285         int ret, filter, sock_listen_fd;
286
287         /* Only support ET.  Also, we just ignore EPOLLONESHOT.  That might work,
288          * logically, just with spurious events firing. */
289         if (!(event->events & EPOLLET)) {
290                 errno = EPERM;
291                 werrstr("Epoll level-triggered not supported");
292                 return -1;
293         }
294         /* The sockets-to-plan9 networking shims are a bit inconvenient.  The user
295          * asked us to epoll on an FD, but that FD is actually a Qdata FD.  We need
296          * to actually epoll on the listen_fd.
297          *
298          * As far as tracking the FD goes for epoll_wait() reporting, if the app
299          * wants to track the FD they think we are using, then they already passed
300          * that in event->data. */
301         extern int _sock_lookup_listen_fd(int sock_fd); /* in glibc */
302
303         sock_listen_fd = _sock_lookup_listen_fd(fd);
304         if (sock_listen_fd >= 0)
305                 fd = sock_listen_fd;
306         ceq_ev = ep_get_ceq_ev(ep, fd);
307         if (!ceq_ev) {
308                 errno = ENOMEM;
309                 werrstr("Epoll set cannot grow yet!");
310                 return -1;
311         }
312         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
313         if (ep_fd) {
314                 errno = EEXIST;
315                 return -1;
316         }
317         tap_req.fd = fd;
318         tap_req.cmd = FDTAP_CMD_ADD;
319         /* EPOLLHUP is implicitly set for all epolls. */
320         filter = ep_events_to_taps(event->events | EPOLLHUP);
321         tap_req.filter = filter;
322         tap_req.ev_q = ep->ceq_evq;
323         tap_req.ev_id = fd;     /* using FD as the CEQ ID */
324         ret = sys_tap_fds(&tap_req, 1);
325         if (ret != 1)
326                 return -1;
327         ep_fd = malloc(sizeof(struct ep_fd_data));
328         ep_fd->fd = fd;
329         ep_fd->filter = filter;
330         ep_fd->ep_event = *event;
331         ep_fd->ep_event.events |= EPOLLHUP;
332         ceq_ev->user_data = (uint64_t)ep_fd;
333         return 0;
334 }
335
336 static int __epoll_ctl_del(struct epoll_ctlr *ep, int fd,
337                            struct epoll_event *event)
338 {
339         struct ceq_event *ceq_ev;
340         struct ep_fd_data *ep_fd;
341         struct fd_tap_req tap_req = {0};
342         int ret, sock_listen_fd;
343
344         /* They could be asking to clear an epoll for a listener.  We need to remove
345          * the tap for the real FD we tapped */
346         extern int _sock_lookup_listen_fd(int sock_fd); /* in glibc */
347         sock_listen_fd = _sock_lookup_listen_fd(fd);
348         if (sock_listen_fd >= 0)
349                 fd = sock_listen_fd;
350         ceq_ev = ep_get_ceq_ev(ep, fd);
351         if (!ceq_ev) {
352                 errno = ENOENT;
353                 return -1;
354         }
355         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
356         if (!ep_fd) {
357                 errno = ENOENT;
358                 return -1;
359         }
360         assert(ep_fd->fd == fd);
361         tap_req.fd = fd;
362         tap_req.cmd = FDTAP_CMD_REM;
363         /* ignoring the return value; we could have failed to remove it if the FD
364          * has already closed and the kernel removed the tap. */
365         sys_tap_fds(&tap_req, 1);
366         ceq_ev->user_data = 0;
367         free(ep_fd);
368         return 0;
369 }
370
371 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
372 {
373         int ret;
374         struct epoll_ctlr *ep = fd_to_cltr(epfd);
375         if (!ep) {
376                 errno = EBADF;/* or EINVAL */
377                 return -1;
378         }
379         if (fd >= USER_FD_BASE) {
380                 errno = EINVAL;
381                 werrstr("Epoll can't track User FDs");
382                 return -1;
383         }
384         uth_mutex_lock(ep->mtx);
385         switch (op) {
386                 case (EPOLL_CTL_MOD):
387                         /* In lieu of a proper MOD, just remove and readd.  The errors might
388                          * not work out well, and there could be a missed event in the
389                          * middle.  Not sure what the guarantees are, but we can fake a
390                          * poke. (TODO). */
391                         ret = __epoll_ctl_del(ep, fd, 0);
392                         if (ret)
393                                 break;
394                         ret = __epoll_ctl_add(ep, fd, event);
395                         break;
396                 case (EPOLL_CTL_ADD):
397                         ret = __epoll_ctl_add(ep, fd, event);
398                         break;
399                 case (EPOLL_CTL_DEL):
400                         ret = __epoll_ctl_del(ep, fd, event);
401                         break;
402                 default:
403                         errno = EINVAL;
404                         ret = -1;
405         }
406         uth_mutex_unlock(ep->mtx);
407         return ret;
408 }
409
410 static bool get_ep_event_from_msg(struct epoll_ctlr *ep, struct event_msg *msg,
411                                   struct epoll_event *ep_ev)
412 {
413         struct ceq_event *ceq_ev;
414         struct ep_fd_data *ep_fd;
415
416         ceq_ev = ep_get_ceq_ev(ep, msg->ev_type);
417         /* should never get a tap FD > size of the epoll set */
418         assert(ceq_ev);
419         ep_fd = (struct ep_fd_data*)ceq_ev->user_data;
420         if (!ep_fd) {
421                 /* it's possible the FD was unregistered and this was an old
422                  * event sent to this epoll set. */
423                 return FALSE;
424         }
425         ep_ev->data = ep_fd->ep_event.data;
426         ep_ev->events = taps_to_ep_events(msg->ev_arg2);
427         return TRUE;
428 }
429
430 /* We should be able to have multiple waiters.  ep shouldn't be closed or
431  * anything, since we have the FD (that'd be bad programming on the user's
432  * behalf).  We could have concurrent ADD/MOD/DEL operations (which lock). */
433 static int __epoll_wait(struct epoll_ctlr *ep, struct epoll_event *events,
434                         int maxevents, int timeout)
435 {
436         struct event_msg msg = {0};
437         struct event_msg dummy_msg;
438         struct event_queue *which_evq;
439         struct event_queue *alarm_evq;
440         int nr_ret = 0;
441         int recurse_ret;
442         struct syscall sysc;
443
444         /* Locking to protect get_ep_event_from_msg, specifically that the ep_fd
445          * stored at ceq_ev->user_data does not get concurrently removed and
446          * freed. */
447         uth_mutex_lock(ep->mtx);
448         for (int i = 0; i < maxevents; i++) {
449                 if (uth_check_evqs(&msg, &which_evq, 1, ep->ceq_evq)) {
450                         if (get_ep_event_from_msg(ep, &msg, &events[i]))
451                                 nr_ret++;
452                 }
453         }
454         uth_mutex_unlock(ep->mtx);
455         if (nr_ret)
456                 return nr_ret;
457         if (timeout == 0)
458                 return 0;
459         if (timeout != -1) {
460                 alarm_evq = ep_get_alarm_evq();
461                 syscall_async(&sysc, SYS_block, timeout * 1000);
462                 if (!register_evq(&sysc, alarm_evq)) {
463                         /* timeout occurred before we could even block! */
464                         ep_put_alarm_evq(alarm_evq);
465                         return 0;
466                 }
467                 uth_blockon_evqs(&msg, &which_evq, 2, ep->ceq_evq, alarm_evq);
468                 if (which_evq != alarm_evq) {
469                         /* sysc may or may not have finished yet.  this will force it to
470                          * *start* to finish iff it is still a submitted syscall. */
471                         sys_abort_sysc(&sysc);
472                         /* But we still need to wait until the syscall completed.  Need a
473                          * dummy msg, since we don't want to clobber the real msg. */
474                         uth_blockon_evqs(&dummy_msg, 0, 1, alarm_evq);
475                 }
476                 /* TODO: Slightly dangerous, due to spammed INDIRs */
477                 ep_put_alarm_evq(alarm_evq);
478                 if (which_evq == alarm_evq)
479                         return 0;
480         } else {
481                 uth_blockon_evqs(&msg, &which_evq, 1, ep->ceq_evq);
482         }
483         uth_mutex_lock(ep->mtx);
484         if (get_ep_event_from_msg(ep, &msg, &events[0]))
485                 nr_ret++;
486         uth_mutex_unlock(ep->mtx);
487         /* We might not have gotten one yet.  And regardless, there might be more
488          * available.  Let's try again, with timeout == 0 to ensure no blocking.  We
489          * use nr_ret (0 or 1 now) to adjust maxevents and events accordingly. */
490         recurse_ret = __epoll_wait(ep, events + nr_ret, maxevents - nr_ret, 0);
491         if (recurse_ret > 0)
492                 nr_ret += recurse_ret;
493         return nr_ret;
494 }
495
496 int epoll_wait(int epfd, struct epoll_event *events, int maxevents,
497                int timeout)
498 {
499         struct epoll_ctlr *ep = fd_to_cltr(epfd);
500         int ret;
501         if (!ep) {
502                 errno = EBADF;/* or EINVAL */
503                 return -1;
504         }
505         if (maxevents <= 0) {
506                 errno = EINVAL;
507                 return -1;
508         }
509         ret = __epoll_wait(ep, events, maxevents, timeout);
510         return ret;
511 }
512
513 int epoll_pwait(int epfd, struct epoll_event *events, int maxevents,
514                 int timeout, const sigset_t *sigmask)
515 {
516         int ready;
517         sigset_t origmask;
518         /* TODO: this is probably racy */
519         sigprocmask(SIG_SETMASK, sigmask, &origmask);
520         ready = epoll_wait(epfd, events, maxevents, timeout);
521         sigprocmask(SIG_SETMASK, &origmask, NULL);
522         return ready;
523 }