Provide select() on top of epoll()
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 6 Jan 2016 16:30:50 +0000 (11:30 -0500)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 14 Jan 2016 21:04:46 +0000 (16:04 -0500)
Our version of select will spuriously return, so your applications need
to handle being told an FD is ready even if it isn't.  If your operation
needs to not block, then use O_NONBLOCK.  This is somewhat similar to
Linux, where the man pages says:

Under Linux, select() may report a socket file descriptor as "ready
for reading", while nevertheless a subsequent read blocks.

Except in Akaros, this happens all the time.

For more caveats, see select.c.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
tests/select_server.c [new file with mode: 0644]
user/iplib/select.c [new file with mode: 0644]

diff --git a/tests/select_server.c b/tests/select_server.c
new file mode 100644 (file)
index 0000000..c03424d
--- /dev/null
@@ -0,0 +1,224 @@
+/* Copyright (c) 2014 The Regents of the University of California
+ * Copyright (c) 2015 Google, Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Echo server using select, runs on port 23.  Used for debugging select.  Based
+ * on epoll_server.
+ *
+ * If you want to build the BSD sockets version, you need to comment out the
+ * #define for PLAN9NET. */
+
+/* Comment this out for BSD sockets */
+//#define PLAN9NET
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <parlib/parlib.h>
+#include <unistd.h>
+#include <parlib/event.h>
+#include <benchutil/measure.h>
+#include <parlib/uthread.h>
+#include <parlib/timing.h>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#ifdef PLAN9NET
+
+#include <iplib/iplib.h>
+
+#else
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#endif
+
+int main(void)
+{
+       int ret;
+       int afd, dfd, lcfd, listen_fd;
+       char adir[40], ldir[40];
+       int n;
+       char buf[256];
+
+       /* We'll use this to see if we actually did a select instead of blocking
+        * calls.  It's not 100%, but with a human on the other end, it should be
+        * fine. */
+       bool has_selected = FALSE;
+
+#ifdef PLAN9NET
+       printf("Using Plan 9's networking stack\n");
+       /* This clones a conversation (opens /net/tcp/clone), then reads the cloned
+        * fd (which is the ctl) to givure out the conv number (the line), then
+        * writes "announce [addr]" into ctl.  This "announce" command often has a
+        * "bind" in it too.  plan9 bind just sets the local addr/port.  TCP
+        * announce also does this.  Returns the ctlfd. */
+       afd = announce9("tcp!*!23", adir, O_NONBLOCK);
+
+       if (afd < 0) {
+               perror("Announce failure");
+               return -1;
+       }
+       printf("Announced on line %s\n", adir);
+#else
+       printf("Using the BSD socket shims over Plan 9's networking stack\n");
+       int srv_socket;
+       struct sockaddr_in dest, srv = {0};
+
+       srv.sin_family = AF_INET;
+       srv.sin_addr.s_addr = htonl(INADDR_ANY);
+       srv.sin_port = htons(23);
+       socklen_t socksize = sizeof(struct sockaddr_in);
+
+       /* Equiv to cloning a converstation in plan 9.  The shim returns the data FD
+        * for the conversation. */
+       srv_socket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
+       if (srv_socket < 0) {
+               perror("Socket failure");
+               return -1;
+       }
+       /* bind + listen is equiv to announce() in plan 9.  Note that the "bind"
+        * command is used, unlike in the plan9 announce. */
+       /* Binds our socket to the given addr/port in srv. */
+       ret = bind(srv_socket, (struct sockaddr*)&srv, sizeof(struct sockaddr_in));
+       if (ret < 0) {
+               perror("Bind failure");
+               return -1;
+       }
+       /* marks the socket as a listener/server */
+       ret = listen(srv_socket, 1);
+       if (ret < 0) {
+               perror("Listen failure");
+               return -1;
+       }
+       printf("Listened on port %d\n", ntohs(srv.sin_port));
+#endif
+
+       /* at this point, the server has done all the prep necessary to be able to
+        * sleep/block/wait on an incoming connection. */
+       fd_set rfds;
+
+#ifdef PLAN9NET
+
+       snprintf(buf, sizeof(buf), "%s/listen", adir);
+       listen_fd = open(buf, O_PATH);
+       if (listen_fd < 0) {
+               perror("listen fd");
+               return -1;
+       }
+       FD_ZERO(&rfds);
+       FD_SET(listen_fd, &rfds);
+       has_selected = FALSE;
+       while (1) {
+               /* Opens the conversation's listen file.  This blocks til someone
+                * connects.  When they do, a new conversation is created, and that open
+                * returned an FD for the new conv's ctl.  listen() reads that to find
+                * out the conv number (the line) for this new conv.  listen() returns
+                * the ctl for this new conv.
+                *
+                * Non-block is for the new connection.  Not the act of listening. */
+               lcfd = listen9(adir, ldir, O_NONBLOCK);
+               if (lcfd >= 0)
+                       break;
+               if (errno != EAGAIN) {
+                       perror("Listen failure");
+                       return -1;
+               }
+               if (select(listen_fd + 1, &rfds, 0, 0, 0) < 0) {
+                       perror("select");
+                       return -1;
+               }
+               has_selected = TRUE;
+               assert(FD_ISSET(listen_fd, &rfds));
+       }
+       printf("Listened and got line %s\n", ldir);
+       assert(has_selected);
+       /* No longer need listen_fd. */
+       close(listen_fd);
+       /* Writes "accept [NUM]" into the ctlfd, then opens the conv's data file and
+        * returns that fd.  Writing "accept" is a noop for most of our protocols.
+        * */
+       dfd = accept9(lcfd, ldir);
+       if (dfd < 0) {
+               perror("Accept failure");
+               return -1;
+       }
+
+#else
+
+       FD_ZERO(&rfds);
+       FD_SET(srv_socket, &rfds);
+       has_selected = FALSE;
+       while (1) {
+               /* returns an FD for a new socket. */
+               dfd = accept(srv_socket, (struct sockaddr*)&dest, &socksize);
+               if (dfd >= 0)
+                       break;
+               if (errno != EAGAIN) {
+                       perror("Accept failure");
+                       return -1;
+               }
+               if (select(srv_socket + 1, &rfds, 0, 0, 0) < 0) {
+                       perror("select");
+                       return -1;
+               }
+               has_selected = TRUE;
+               assert(FD_ISSET(srv_socket, &rfds));
+       }
+       printf("Accepted and got dfd %d\n", dfd);
+       assert(has_selected);
+       /* In lieu of accept4, we set the new socket's nonblock status manually */
+       ret = fcntl(dfd, F_SETFL, O_NONBLOCK);
+       if (ret < 0) {
+               perror("setfl dfd");
+               exit(-1);
+       }
+
+#endif
+
+       FD_SET(dfd, &rfds);
+       /* echo until EOF */
+       has_selected = FALSE;
+       printf("Server read: ");
+       while (1) {
+               while ((n = read(dfd, buf, sizeof(buf))) > 0) {
+                       for (int i = 0; i < n; i++)
+                               printf("%c", buf[i]);
+                       fflush(stdout);
+                       /* Should select on this direction too. */
+                       if (write(dfd, buf, n) < 0) {
+                               perror("writing");
+                               exit(-1);
+                       }
+               }
+               if (n == 0)
+                       break;
+               if (select(dfd + 1, &rfds, 0, 0, 0) < 0) {
+                       perror("select 2");
+                       exit(-1);
+               }
+               has_selected = TRUE;
+               assert(FD_ISSET(dfd, &rfds));
+               /* you might get a HUP, but keep on reading! */
+       }
+       assert(has_selected);
+
+#ifdef PLAN9NET
+       close(dfd);             /* data fd for the new conv, from listen */
+       close(lcfd);    /* ctl fd for the new conv, from listen */
+       close(afd);             /* ctl fd for the listening conv */
+#else
+       close(dfd);             /* new connection socket, from accept */
+       close(srv_socket);
+#endif
+}
diff --git a/user/iplib/select.c b/user/iplib/select.c
new file mode 100644 (file)
index 0000000..2a5c52f
--- /dev/null
@@ -0,0 +1,207 @@
+/* Copyright (c) 2016 Google Inc.
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * select()
+ *
+ * Our select() is super spurious and will only work with apps that use
+ * non-blocking I/O.
+ *
+ * Under the hood, our select() is implemented with epoll (and under that, FD
+ * taps).  Those can only detect edges (e.g. a socket becomes readable).
+ *
+ * The problem is that we want to detect a level status (e.g. socket is
+ * readable) with an edge event (e.g. socket *becomes* readable).  To do this,
+ * when someone initially selects, the FD gets tracked with epoll and we
+ * immediately return saying the FD is ready for whatever they asked for.  This
+ * is usually not true, and the application will need to poll all of its FDs
+ * once after the initial select() call.  Subsequent selects() will still be
+ * tracking the FD in the epoll set.  If any edge events that come after the
+ * poll (which eventually returns EAGAIN) will be caught by epoll, and a
+ * subsequent select will wake up (or never block in the first place) due to the
+ * reception of that edge event.
+ *
+ * We maintain one FD set per program.  It tracks *any* FD being tracked by
+ * *any* select call.  Regardless of whether the user asked for
+ * read/write/except, the FD gets watched for anything until it closes.  This
+ * will result in spurious wakeups.
+ *
+ * One issue with the global FD set is that one thread may consume the epoll
+ * events intended for another thread.  To get around this, only one thread is
+ * the actual epoller, and the others block on a mutex.  An alternative is to
+ * use a per-thread FD set, using TLS, but not every 2LS uses TLS, and
+ * performance is not a concern for code using select().
+ *
+ * Notes:
+ * - pselect might be racy
+ * - if the user has no read/write/except sets, we won't wait.  some users of
+ *   select use it as a timer only.  if that comes up, we can expand this.
+ * - if you epoll or FD tap an FD, then try to use select on it, you'll get an
+ *   error (only one tap per FD).  select() only knows about the FDs in its set.
+ */
+
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include <ros/common.h>
+#include <parlib/uthread.h>
+#include <sys/close_cb.h>
+#include <sys/epoll.h>
+#include <malloc.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <signal.h>
+
+static int epoll_fd;
+static fd_set all_fds;
+static uth_mutex_t fdset_mtx;
+static struct uthread *owner;
+static uth_mutex_t sleep_mtx;
+
+static bool fd_is_set(unsigned int fd, fd_set *set)
+{
+       if (fd > FD_SETSIZE)
+               return FALSE;
+       if (!set)
+               return FALSE;
+       return FD_ISSET(fd, set);
+}
+
+static void select_fd_closed(int fd)
+{
+       /* Slightly racy, but anything concurrently added will be closed later, and
+        * after it is_set. */
+       if (!fd_is_set(fd, &all_fds))
+               return;
+       /* We just need to stop tracking FD.  We do not need to remove it from the
+        * epoll set, since that will happen automatically on close(). */
+       uth_mutex_lock(fdset_mtx);
+       FD_CLR(fd, &all_fds);
+       uth_mutex_unlock(fdset_mtx);
+}
+
+static void select_init(void)
+{
+       static struct close_cb select_close_cb = {.func = select_fd_closed};
+
+       register_close_cb(&select_close_cb);
+       epoll_fd = epoll_create(FD_SETSIZE);
+       if (epoll_fd < 0) {
+               perror("select failed epoll_create");
+               exit(-1);
+       }
+       fdset_mtx = uth_mutex_alloc();
+       sleep_mtx = uth_mutex_alloc();
+}
+
+static int select_tv_to_ep_timeout(struct timeval *tv)
+{
+       if (!tv)
+               return -1;
+       return tv->tv_sec * 1000 + DIV_ROUND_UP(tv->tv_usec, 1000);
+}
+
+int select(int nfds, fd_set *readfds, fd_set *writefds,
+           fd_set *exceptfds, struct timeval *timeout)
+{
+       bool changed_set = FALSE;
+       struct epoll_event ep_ev;
+       struct epoll_event *ep_results;
+       int ep_timeout = select_tv_to_ep_timeout(timeout);
+
+       run_once(select_init());
+       /* good thing nfds is a signed int... */
+       if (nfds < 0) {
+               errno = EINVAL;
+               return -1;
+       }
+       uth_mutex_lock(fdset_mtx);
+       for (int i = 0; i < nfds; i++) {
+               if ((fd_is_set(i, readfds) || fd_is_set(i, writefds) ||
+                   fd_is_set(i, exceptfds)) && !fd_is_set(i, &all_fds)) {
+
+                       changed_set = TRUE;
+                       FD_SET(i, &all_fds);
+                       /* FDs that we track for *any* reason with select will be
+                        * tracked for *all* reasons with epoll. */
+                       ep_ev.events = EPOLLET | EPOLLIN | EPOLLOUT | EPOLLHUP |
+                                      EPOLLERR;
+                       ep_ev.data.fd = i;
+                       if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, i, &ep_ev)) {
+                               /* We might have failed because we tried to set up too many
+                                * FD tap types.  Listen FDs, for instance, can only be
+                                * tapped for READABLE and HANGUP.  Let's try for one of
+                                * those. */
+                               if (errno == ENOSYS) {
+                                       ep_ev.events = EPOLLET | EPOLLIN | EPOLLHUP;
+                                       if (!epoll_ctl(epoll_fd, EPOLL_CTL_ADD, i, &ep_ev))
+                                               continue;
+                               }
+                               /* Careful to unlock before calling perror.  perror calls
+                                * close, which calls our CB, which grabs the lock. */
+                               uth_mutex_unlock(fdset_mtx);
+                               perror("select epoll_ctl failed");
+                               return -1;
+                       }
+               }
+       }
+       uth_mutex_unlock(fdset_mtx);
+       /* Since we just added some FD to our tracking set, we don't know if its
+        * readable or not.  We'll only catch edge-triggered changes in the future.
+        * We can spuriously tell the user all FDs are ready, and next time they
+        * can block until there is edge activity. */
+       if (changed_set)
+               return nfds;
+       /* Since there is a global epoll set, we could have multiple threads
+        * epolling at a time and one thread could consume the events that should
+        * wake another thread.  We don't know when the 'other' thread last polled,
+        * so we'll need to assume its event was consumed and just return.  If a
+        * thread selects again and no other thread has since selected, then we know
+        * no one consumed the events.  We use an 'owner' to track which thread most
+        * recently selected.  We use a mutex so that the extra threads sleep. */
+       uth_mutex_lock(sleep_mtx);
+       if (owner != current_uthread) {
+               /* Could thrash, if we fight with another uth for owner */
+               owner = current_uthread;
+               uth_mutex_unlock(sleep_mtx);
+               return nfds;
+       }
+       /* Need to check for up to FD_SETSIZE - nfds isn't the size of all FDs
+        * tracked; it's the size of only our current select call */
+       ep_results = malloc(sizeof(struct epoll_event) * FD_SETSIZE);
+       if (!ep_results) {
+               uth_mutex_unlock(sleep_mtx);
+               errno = ENOMEM;
+               return -1;
+       }
+       /* Don't care which ones were set; we'll just tell the user they all were
+        * set.  If they can't handle that, this whole plan won't work. */
+       epoll_wait(epoll_fd, ep_results, FD_SETSIZE, ep_timeout);
+       uth_mutex_unlock(sleep_mtx);
+       free(ep_results);
+       /* TODO: consider updating timeval.  It's not mandatory (POSIX). */
+       return nfds;
+}
+
+int pselect(int nfds, fd_set *readfds, fd_set *writefds,
+            fd_set *exceptfds, const struct timespec *timeout,
+            const sigset_t *sigmask)
+{
+       int ready;
+       sigset_t origmask;
+       struct timeval local_tv, *tv = &local_tv;
+
+       if (!timeout) {
+               tv = 0;
+       } else {
+               tv->tv_sec = timeout->tv_sec;
+               tv->tv_usec = DIV_ROUND_UP(timeout->tv_nsec, 1000);
+       }
+       /* TODO: this is probably racy */
+       sigprocmask(SIG_SETMASK, sigmask, &origmask);
+       ready = select(nfds, readfds, writefds, exceptfds, tv);
+       sigprocmask(SIG_SETMASK, &origmask, NULL);
+       return ready;
+}