Add basic timeout functionality to pthread futexes
authorKevin Klues <klueska@cs.berkeley.edu>
Tue, 30 Jul 2013 03:28:56 +0000 (20:28 -0700)
committerKevin Klues <klueska@cs.berkeley.edu>
Tue, 30 Jul 2013 04:32:45 +0000 (21:32 -0700)
In the current implementation we simply launch a helper pthread that
calls sys_block(1000) in a tight loop.  It wakes up every millisecond
and checks if there are any futexes whose timeouts have expired and it
wakes them up. If there are no futexes waiting on a timer, we park the
pthread and wait until another futex is waited on that has timeout.

user/pthread/futex.c

index 2d459a0..1cf3bb1 100644 (file)
@@ -2,22 +2,32 @@
 #include <futex.h>
 #include <sys/queue.h>
 #include <pthread.h>
+#include <parlib.h>
 #include <assert.h>
 #include <stdio.h>
 #include <errno.h>
 #include <slab.h>
 #include <mcs.h>
 
+static inline int futex_wake(int *uaddr, int count);
+static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
+static void *timer_thread(void *arg);
+
 struct futex_element {
   TAILQ_ENTRY(futex_element) link;
   pthread_t pthread;
   int *uaddr;
+  uint64_t ms_timeout;
+  bool timedout;
 };
 TAILQ_HEAD(futex_queue, futex_element);
 
 struct futex_data {
   struct mcs_pdr_lock lock;
   struct futex_queue queue;
+  int timer_enabled;
+  pthread_t timer;
+  long time;
 };
 static struct futex_data __futex;
 
@@ -25,6 +35,9 @@ static inline void futex_init()
 {
   mcs_pdr_init(&__futex.lock);
   TAILQ_INIT(&__futex.queue);
+  __futex.timer_enabled = false;
+  pthread_create(&__futex.timer, NULL, timer_thread, NULL);
+  __futex.time = 0;
 }
 
 static void __futex_block(struct uthread *uthread, void *arg) {
@@ -35,16 +48,45 @@ static void __futex_block(struct uthread *uthread, void *arg) {
   e->pthread = pthread;
 }
 
-static inline int futex_wait(int *uaddr, int val)
+static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout)
 {
+  // Atomically do the following...
   mcs_pdr_lock(&__futex.lock);
+  // If the value of *uaddr matches val
   if(*uaddr == val) {
+    // Create a new futex element and initialize it.
     struct futex_element e;
+    bool enable_timer = false;
     e.uaddr = uaddr;
     e.pthread = NULL;
+    e.ms_timeout = ms_timeout;
+    e.timedout = false;
+    if(e.ms_timeout != (uint64_t)-1) {
+      e.ms_timeout += __futex.time;
+         // If we are setting the timeout, get ready to
+         // enable the timer if it is currently disabled.
+      if(__futex.timer_enabled == false) {
+        __futex.timer_enabled = true;
+        enable_timer = true;
+      }
+    }
+    // Insert the futex element into the queue
     TAILQ_INSERT_TAIL(&__futex.queue, &e, link);
     mcs_pdr_unlock(&__futex.lock);
+
+    // Enable the timer if we need to outside the lock
+    if(enable_timer)
+      futex_wake(&__futex.timer_enabled, 1);
+
+    // Yield the current uthread
     uthread_yield(TRUE, __futex_block, &e);
+
+       // After waking, if we timed out, set the error
+       // code appropriately and return
+    if(e.timedout) {
+      errno = ETIMEDOUT;
+      return -1;
+    }
   }
   else {
     mcs_pdr_unlock(&__futex.lock);
@@ -88,17 +130,75 @@ static inline int futex_wake(int *uaddr, int count)
   return 0;
 }
 
-int futex(int *uaddr, int op, int val, const struct timespec *timeout,
-                 int *uaddr2, int val3)
+static void *timer_thread(void *arg)
 {
-  assert(timeout == NULL);
+  struct futex_element *e,*n = NULL;
+  struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
+
+  // Do this forever...
+  for(;;) {
+    // Block for 1 millisecond
+    sys_block(1000);
+
+    // Then atomically do the following...
+    mcs_pdr_lock(&__futex.lock);
+    // Up the time
+    __futex.time++;
+
+       // Find all futexes that have timed out on this iteration,
+       // and count those still waiting
+    int waiting = 0;
+    e = TAILQ_FIRST(&__futex.queue);
+    while(e != NULL) {
+      n = TAILQ_NEXT(e, link);
+      if(e->ms_timeout == __futex.time) {
+        e->timedout = true;
+        TAILQ_REMOVE(&__futex.queue, e, link);
+        TAILQ_INSERT_TAIL(&q, e, link);
+      }
+      else if(e->ms_timeout != (uint64_t)-1)
+        waiting++;
+      e = n;
+    }
+    // If there are no more waiting, disable the timer
+    if(waiting == 0) {
+      __futex.time = 0;
+      __futex.timer_enabled = false;
+    }
+    mcs_pdr_unlock(&__futex.lock);
+
+    // Unblock any futexes that have timed out outside the lock
+    e = TAILQ_FIRST(&q);
+    while(e != NULL) {
+      n = TAILQ_NEXT(e, link);
+      TAILQ_REMOVE(&q, e, link);
+      while(e->pthread == NULL)
+        cpu_relax();
+      uthread_runnable((struct uthread*)e->pthread);
+      e = n;
+    }
+
+    // If we have disabled the timer, park this thread
+    futex_wait(&__futex.timer_enabled, false, -1);
+  }
+}
+
+int futex(int *uaddr, int op, int val,
+          const struct timespec *timeout,
+          int *uaddr2, int val3)
+{
+  uint64_t ms_timeout = (uint64_t)-1;
   assert(uaddr2 == NULL);
   assert(val3 == 0);
+  if(timeout != NULL) {
+    ms_timeout = timeout->tv_sec*1000 + timeout->tv_nsec/1000000L;
+    assert(ms_timeout > 0);
+  }
 
   run_once(futex_init());
   switch(op) {
     case FUTEX_WAIT:
-      return futex_wait(uaddr, val);
+      return futex_wait(uaddr, val, ms_timeout);
     case FUTEX_WAKE:
       return futex_wake(uaddr, val);
     default: