Update pthread futexes to use alarms for timeouts
authorKevin Klues <klueska@cs.berkeley.edu>
Fri, 21 Mar 2014 06:11:22 +0000 (23:11 -0700)
committerKevin Klues <klueska@cs.berkeley.edu>
Fri, 21 Mar 2014 06:11:22 +0000 (23:11 -0700)
One side effect of this is that the pthread library now depends on
benchutil.  We may want to revisit this dependency.

tests/futex_timeout.c [new file with mode: 0644]
user/pthread/futex.c

diff --git a/tests/futex_timeout.c b/tests/futex_timeout.c
new file mode 100644 (file)
index 0000000..fbbe46f
--- /dev/null
@@ -0,0 +1,32 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <parlib.h>
+#include <vcore.h>
+#include <futex.h>
+#include <pthread.h>
+
+#define NUM_THREADS 10
+pthread_t thandlers[NUM_THREADS];
+
+void *handler(void *arg) {
+       int id = pthread_self()->id;
+       int var = 0;
+    struct timespec timeout = {
+               .tv_sec = id,
+               .tv_nsec = 0
+       };
+       printf("Begin thread: %d\n", id);
+    futex(&var, FUTEX_WAIT, 0, &timeout, NULL, 0);
+       printf("End thread: %d\n", id);
+}
+
+int main(int argc, char **argv)
+{
+       for (int i=0; i<NUM_THREADS; i++) {
+               pthread_create(&thandlers[i], NULL, &handler, NULL);
+       }
+       for (int i=0; i<NUM_THREADS; i++) {
+               pthread_join(thandlers[i], NULL);
+       }
+       return 0;
+}
index 936ce9f..5dbcfad 100644 (file)
@@ -8,6 +8,7 @@
 #include <errno.h>
 #include <slab.h>
 #include <mcs.h>
+#include <alarm.h>
 
 static inline int futex_wake(int *uaddr, int count);
 static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
@@ -17,7 +18,8 @@ struct futex_element {
   TAILQ_ENTRY(futex_element) link;
   pthread_t pthread;
   int *uaddr;
-  uint64_t ms_timeout;
+  uint64_t us_timeout;
+  struct alarm_waiter awaiter;
   bool timedout;
 };
 TAILQ_HEAD(futex_queue, futex_element);
@@ -25,9 +27,6 @@ TAILQ_HEAD(futex_queue, futex_element);
 struct futex_data {
   struct mcs_pdr_lock lock;
   struct futex_queue queue;
-  int timer_enabled;
-  pthread_t timer;
-  long time;
 };
 static struct futex_data __futex;
 
@@ -35,9 +34,29 @@ static inline void futex_init()
 {
   mcs_pdr_init(&__futex.lock);
   TAILQ_INIT(&__futex.queue);
-  __futex.timer_enabled = false;
-  pthread_create(&__futex.timer, NULL, timer_thread, NULL);
-  __futex.time = 0;
+}
+
+static void __futex_timeout(struct alarm_waiter *awaiter) {
+  struct futex_element *__e = NULL;
+  struct futex_element *e = (struct futex_element*)awaiter->data;
+
+  // Atomically remove the timed-out element from the futex queue if we won the
+  // race against actually completing.
+  mcs_pdr_lock(&__futex.lock);
+  TAILQ_FOREACH(__e, &__futex.queue, link)
+    if (__e == e) break;
+  if (__e != NULL)
+    TAILQ_REMOVE(&__futex.queue, e, link);
+  mcs_pdr_unlock(&__futex.lock);
+
+  // If we removed it, restart it outside the lock
+  if (__e != NULL) {
+    uthread_runnable((struct uthread*)e->pthread);
+    e->timedout = true;
+  }
+  // Set this as the very last thing we do.  Spin on this in the wake code if
+  // we are trying to wake something that has already fired this timer.
+  awaiter->data = NULL;
 }
 
 static void __futex_block(struct uthread *uthread, void *arg) {
@@ -47,30 +66,27 @@ static void __futex_block(struct uthread *uthread, void *arg) {
   // Set the remaining properties of the futex element
   e->pthread = pthread;
   e->timedout = false;
-  // Adjust the timeout properties on the futex element
-  bool enable_timer = false;
-  if(e->ms_timeout != (uint64_t)-1) {
-    e->ms_timeout += __futex.time;
-       // If we are setting the timeout, get ready to
-       // enable the timer if it is currently disabled.
-    if(__futex.timer_enabled == false) {
-      __futex.timer_enabled = true;
-      enable_timer = true;
-    }
+
+  // Insert the futex element into the queue
+  TAILQ_INSERT_TAIL(&__futex.queue, e, link);
+
+  // Set an alarm for futex timeout if applicable
+  if(e->us_timeout != (uint64_t)-1) {
+    e->awaiter.data = e;
+    init_awaiter(&e->awaiter, __futex_timeout);
+    set_awaiter_rel(&e->awaiter, e->us_timeout);
+    set_alarm(&e->awaiter);
   }
+
   // Notify the scheduler of the type of yield we did
   __pthread_generic_yield(pthread);
   pthread->state = PTH_BLK_MUTEX;
-  // Insert the futex element into the queue
-  TAILQ_INSERT_TAIL(&__futex.queue, e, link);
+
   // Unlock the pdr_lock 
   mcs_pdr_unlock(&__futex.lock);
-  // Enable the timer if we need to outside the lock
-  if(enable_timer)
-    futex_wake(&__futex.timer_enabled, 1);
 }
 
-static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout)
+static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout)
 {
   // Atomically do the following...
   mcs_pdr_lock(&__futex.lock);
@@ -79,15 +95,15 @@ static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout)
     // Create a new futex element and initialize it.
     struct futex_element e;
     e.uaddr = uaddr;
-    e.ms_timeout = ms_timeout;
+    e.us_timeout = us_timeout;
     // Yield the uthread...
-    // We set the remaining properties of the futex element, adjust the
-    // timeout, set the timer, and unlock the pdr lock on the other side.
-    // It is important that we do the unlock on the other side, because (unlike
-    // linux, etc.) its possible to get interrupted and drop into vcore context
-    // right after releasing the lock.  If that vcore code then calls
-    // futex_wake(), we would be screwed.  Doing things this way means we have
-    // to hold the lock longer, but its necessary for correctness.
+    // We set the remaining properties of the futex element, set the timer, and
+    // unlock the pdr lock on the other side.  It is important that we do the
+    // unlock on the other side, because (unlike linux, etc.) its possible to
+    // get interrupted and drop into vcore context right after releasing the
+    // lock.  If that vcore code then calls futex_wake(), we would be screwed.
+    // Doing things this way means we have to hold the lock longer, but its
+    // necessary for correctness.
     uthread_yield(TRUE, __futex_block, &e);
     // We are unlocked here!
 
@@ -115,7 +131,23 @@ static inline int futex_wake(int *uaddr, int count)
   while(e != NULL) {
     if(count > 0) {
       n = TAILQ_NEXT(e, link);
+      // If this element is blocked on uaddr
       if(e->uaddr == uaddr) {
+        // Cancel the timeout if one was set
+        if(e->us_timeout != (uint64_t)-1) {
+          // Try and unset the alarm.  If this fails, then we have already
+          // started running the alarm callback, so spin on awaiter->data being
+          // set to NULL.  The fact that we made it here though, means that WE
+          // are the one who removed e from the queue, so we are basically
+          // spinning just to make sure that no more references to awaiter
+          // exist.
+          if(!unset_alarm(&e->awaiter)) {
+            while(&e->awaiter)
+              cpu_relax();
+          } else {
+          }
+        }
+        // Remove it from the queue
         TAILQ_REMOVE(&__futex.queue, e, link);
         TAILQ_INSERT_TAIL(&q, e, link);
         count--;
@@ -137,73 +169,23 @@ static inline int futex_wake(int *uaddr, int count)
   return 0;
 }
 
-static void *timer_thread(void *arg)
-{
-  struct futex_element *e,*n = NULL;
-  struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
-
-  // Do this forever...
-  for(;;) {
-    // Block for 1 millisecond
-    sys_block(1000);
-
-    // Then atomically do the following...
-    mcs_pdr_lock(&__futex.lock);
-    // Up the time
-    __futex.time++;
-
-       // Find all futexes that have timed out on this iteration,
-       // and count those still waiting
-    int waiting = 0;
-    e = TAILQ_FIRST(&__futex.queue);
-    while(e != NULL) {
-      n = TAILQ_NEXT(e, link);
-      if(e->ms_timeout == __futex.time) {
-        e->timedout = true;
-        TAILQ_REMOVE(&__futex.queue, e, link);
-        TAILQ_INSERT_TAIL(&q, e, link);
-      }
-      else if(e->ms_timeout != (uint64_t)-1)
-        waiting++;
-      e = n;
-    }
-    // If there are no more waiting, disable the timer
-    if(waiting == 0) {
-      __futex.time = 0;
-      __futex.timer_enabled = false;
-    }
-    mcs_pdr_unlock(&__futex.lock);
-
-    // Unblock any futexes that have timed out outside the lock
-    e = TAILQ_FIRST(&q);
-    while(e != NULL) {
-      n = TAILQ_NEXT(e, link);
-      TAILQ_REMOVE(&q, e, link);
-      uthread_runnable((struct uthread*)e->pthread);
-      e = n;
-    }
-
-    // If we have disabled the timer, park this thread
-    futex_wait(&__futex.timer_enabled, false, -1);
-  }
-}
-
 int futex(int *uaddr, int op, int val,
           const struct timespec *timeout,
           int *uaddr2, int val3)
 {
-  uint64_t ms_timeout = (uint64_t)-1;
+  // Round to the nearest micro-second
+  uint64_t us_timeout = (uint64_t)-1;
   assert(uaddr2 == NULL);
   assert(val3 == 0);
   if(timeout != NULL) {
-    ms_timeout = timeout->tv_sec*1000 + timeout->tv_nsec/1000000L;
-    assert(ms_timeout > 0);
+    us_timeout = timeout->tv_sec*1000000L + timeout->tv_nsec/1000L;
+    assert(us_timeout > 0);
   }
 
   run_once(futex_init());
   switch(op) {
     case FUTEX_WAIT:
-      return futex_wait(uaddr, val, ms_timeout);
+      return futex_wait(uaddr, val, us_timeout);
     case FUTEX_WAKE:
       return futex_wake(uaddr, val);
     default: