futex: Fix buggy timeout
[akaros.git] / user / pthread / futex.c
index 5dbcfad..300b310 100644 (file)
-#include <ros/common.h>
+#include <parlib/common.h>
 #include <futex.h>
 #include <sys/queue.h>
-#include <pthread.h>
-#include <parlib.h>
-#include <assert.h>
+#include <parlib/uthread.h>
+#include <parlib/parlib.h>
+#include <parlib/assert.h>
 #include <stdio.h>
 #include <errno.h>
-#include <slab.h>
-#include <mcs.h>
-#include <alarm.h>
-
-static inline int futex_wake(int *uaddr, int count);
-static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
-static void *timer_thread(void *arg);
+#include <parlib/slab.h>
+#include <parlib/mcs.h>
+#include <parlib/alarm.h>
 
 struct futex_element {
-  TAILQ_ENTRY(futex_element) link;
-  pthread_t pthread;
-  int *uaddr;
-  uint64_t us_timeout;
-  struct alarm_waiter awaiter;
-  bool timedout;
+       TAILQ_ENTRY(futex_element) link;
+       int *uaddr;
+       bool on_list;
+       bool waker_using;
+       uth_cond_var_t cv;
 };
 TAILQ_HEAD(futex_queue, futex_element);
 
 struct futex_data {
-  struct mcs_pdr_lock lock;
-  struct futex_queue queue;
+       struct mcs_pdr_lock lock;
+       struct futex_queue queue;
 };
 static struct futex_data __futex;
 
-static inline void futex_init()
+static inline void futex_init(void *arg)
 {
-  mcs_pdr_init(&__futex.lock);
-  TAILQ_INIT(&__futex.queue);
-}
-
-static void __futex_timeout(struct alarm_waiter *awaiter) {
-  struct futex_element *__e = NULL;
-  struct futex_element *e = (struct futex_element*)awaiter->data;
-
-  // Atomically remove the timed-out element from the futex queue if we won the
-  // race against actually completing.
-  mcs_pdr_lock(&__futex.lock);
-  TAILQ_FOREACH(__e, &__futex.queue, link)
-    if (__e == e) break;
-  if (__e != NULL)
-    TAILQ_REMOVE(&__futex.queue, e, link);
-  mcs_pdr_unlock(&__futex.lock);
-
-  // If we removed it, restart it outside the lock
-  if (__e != NULL) {
-    uthread_runnable((struct uthread*)e->pthread);
-    e->timedout = true;
-  }
-  // Set this as the very last thing we do.  Spin on this in the wake code if
-  // we are trying to wake something that has already fired this timer.
-  awaiter->data = NULL;
+       mcs_pdr_init(&__futex.lock);
+       TAILQ_INIT(&__futex.queue);
 }
 
-static void __futex_block(struct uthread *uthread, void *arg) {
-  pthread_t pthread = (pthread_t)uthread;
-  struct futex_element *e = (struct futex_element*)arg;
-
-  // Set the remaining properties of the futex element
-  e->pthread = pthread;
-  e->timedout = false;
-
-  // Insert the futex element into the queue
-  TAILQ_INSERT_TAIL(&__futex.queue, e, link);
-
-  // Set an alarm for futex timeout if applicable
-  if(e->us_timeout != (uint64_t)-1) {
-    e->awaiter.data = e;
-    init_awaiter(&e->awaiter, __futex_timeout);
-    set_awaiter_rel(&e->awaiter, e->us_timeout);
-    set_alarm(&e->awaiter);
-  }
-
-  // Notify the scheduler of the type of yield we did
-  __pthread_generic_yield(pthread);
-  pthread->state = PTH_BLK_MUTEX;
-
-  // Unlock the pdr_lock 
-  mcs_pdr_unlock(&__futex.lock);
-}
-
-static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout)
+static inline int futex_wait(int *uaddr, int val,
+                             const struct timespec *abs_timeout)
 {
-  // Atomically do the following...
-  mcs_pdr_lock(&__futex.lock);
-  // If the value of *uaddr matches val
-  if(*uaddr == val) {
-    // Create a new futex element and initialize it.
-    struct futex_element e;
-    e.uaddr = uaddr;
-    e.us_timeout = us_timeout;
-    // Yield the uthread...
-    // We set the remaining properties of the futex element, set the timer, and
-    // unlock the pdr lock on the other side.  It is important that we do the
-    // unlock on the other side, because (unlike linux, etc.) its possible to
-    // get interrupted and drop into vcore context right after releasing the
-    // lock.  If that vcore code then calls futex_wake(), we would be screwed.
-    // Doing things this way means we have to hold the lock longer, but its
-    // necessary for correctness.
-    uthread_yield(TRUE, __futex_block, &e);
-    // We are unlocked here!
-
-    // After waking, if we timed out, set the error
-    // code appropriately and return
-    if(e.timedout) {
-      errno = ETIMEDOUT;
-      return -1;
-    }
-  } else {
-      mcs_pdr_unlock(&__futex.lock);
-  }
-  return 0;
+       struct futex_element e[1];
+       bool timed_out;
+
+       mcs_pdr_lock(&__futex.lock);
+       if (*uaddr != val) {
+               mcs_pdr_unlock(&__futex.lock);
+               return 0;
+       }
+       e->uaddr = uaddr;
+       uth_cond_var_init(&e->cv);
+       e->waker_using = false;
+       e->on_list = true;
+       TAILQ_INSERT_TAIL(&__futex.queue, e, link);
+       /* Lock switch.  Any waker will grab the global lock, then grab ours.
+        * We're downgrading to the CV lock, which still protects us from
+        * missing the signal (which is someone calling Wake after changing
+        * *uaddr).  The CV code will atomically block (with timeout) and unlock
+        * the CV lock.
+        *
+        * Ordering is __futex.lock -> CV lock, but you can have the inner lock
+        * without holding the outer lock. */
+       uth_cond_var_lock(&e->cv);
+       mcs_pdr_unlock(&__futex.lock);
+
+       timed_out = !uth_cond_var_timed_wait(&e->cv, NULL, abs_timeout);
+       /* CV wait returns with the lock held, which is unneccessary for
+        * futexes.  We could use this cv lock and maybe a trylock on the futex
+        * to sync with futex_wake, instead of the current synchronization
+        * techniques with the bools. */
+       uth_cond_var_unlock(&e->cv);
+
+       /* In the common case, the waker woke us and already cleared on_list,
+        * and we'd rather not grab the __futex lock again.  Note the outer
+        * on_list check is an optimization, and we need the lock to be sure.
+        * Also note the waker sets waker_using before on_list, so if we happen
+        * to see !on_list (while the waker is mucking with the list), we'll see
+        * waker_using and spin below. */
+       if (e->on_list) {
+               mcs_pdr_lock(&__futex.lock);
+               if (e->on_list)
+                       TAILQ_REMOVE(&__futex.queue, e, link);
+               mcs_pdr_unlock(&__futex.lock);
+       }
+       rmb();  /* read on_list before waker_using */
+       /* The waker might have yanked us and is about to kick the CV.  Need to
+        * wait til they are done before freeing e. */
+       while (e->waker_using)
+               cpu_relax_any();
+
+       if (timed_out) {
+               errno = ETIMEDOUT;
+               return -1;
+       }
+       return 0;
 }
 
 static inline int futex_wake(int *uaddr, int count)
 {
-  struct futex_element *e,*n = NULL;
-  struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
-
-  // Atomically grab all relevant futex blockers
-  // from the global futex queue
-  mcs_pdr_lock(&__futex.lock);
-  e = TAILQ_FIRST(&__futex.queue);
-  while(e != NULL) {
-    if(count > 0) {
-      n = TAILQ_NEXT(e, link);
-      // If this element is blocked on uaddr
-      if(e->uaddr == uaddr) {
-        // Cancel the timeout if one was set
-        if(e->us_timeout != (uint64_t)-1) {
-          // Try and unset the alarm.  If this fails, then we have already
-          // started running the alarm callback, so spin on awaiter->data being
-          // set to NULL.  The fact that we made it here though, means that WE
-          // are the one who removed e from the queue, so we are basically
-          // spinning just to make sure that no more references to awaiter
-          // exist.
-          if(!unset_alarm(&e->awaiter)) {
-            while(&e->awaiter)
-              cpu_relax();
-          } else {
-          }
-        }
-        // Remove it from the queue
-        TAILQ_REMOVE(&__futex.queue, e, link);
-        TAILQ_INSERT_TAIL(&q, e, link);
-        count--;
-      }
-      e = n;
-    }
-    else break;
-  }
-  mcs_pdr_unlock(&__futex.lock);
-
-  // Unblock them outside the lock
-  e = TAILQ_FIRST(&q);
-  while(e != NULL) {
-    n = TAILQ_NEXT(e, link);
-    TAILQ_REMOVE(&q, e, link);
-    uthread_runnable((struct uthread*)e->pthread);
-    e = n;
-  }
-  return 0;
+       int max = count;
+       struct futex_element *e, *temp;
+       struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
+
+       /* The waiter spins on us with cpu_relax_any().  That code assumes the
+        * target of the wait/spin is in vcore context, or at least has notifs
+        * disabled. */
+       uth_disable_notifs();
+       mcs_pdr_lock(&__futex.lock);
+       TAILQ_FOREACH_SAFE(e, &__futex.queue, link, temp) {
+               if (count <= 0)
+                       break;
+               if (e->uaddr == uaddr) {
+                       e->waker_using = true;
+                       /* flag waker_using before saying !on_list */
+                       wmb();
+                       e->on_list = false;
+                       TAILQ_REMOVE(&__futex.queue, e, link);
+                       TAILQ_INSERT_TAIL(&q, e, link);
+                       count--;
+               }
+       }
+       mcs_pdr_unlock(&__futex.lock);
+       TAILQ_FOREACH_SAFE(e, &q, link, temp) {
+               TAILQ_REMOVE(&q, e, link);
+               uth_cond_var_signal(&e->cv);
+               /* Do not touch e after marking it. */
+               e->waker_using = false;
+       }
+       uth_enable_notifs();
+
+       return max - count;
 }
 
-int futex(int *uaddr, int op, int val,
-          const struct timespec *timeout,
+int futex(int *uaddr, int op, int val, const struct timespec *timeout,
           int *uaddr2, int val3)
 {
-  // Round to the nearest micro-second
-  uint64_t us_timeout = (uint64_t)-1;
-  assert(uaddr2 == NULL);
-  assert(val3 == 0);
-  if(timeout != NULL) {
-    us_timeout = timeout->tv_sec*1000000L + timeout->tv_nsec/1000L;
-    assert(us_timeout > 0);
-  }
-
-  run_once(futex_init());
-  switch(op) {
-    case FUTEX_WAIT:
-      return futex_wait(uaddr, val, us_timeout);
-    case FUTEX_WAKE:
-      return futex_wake(uaddr, val);
-    default:
-      errno = ENOSYS;
-      return -1;
-  }
-  return -1;
+       static parlib_once_t once = PARLIB_ONCE_INIT;
+       struct timespec abs_timeout[1];
+
+       parlib_run_once(&once, futex_init, NULL);
+       assert(uaddr2 == NULL);
+       assert(val3 == 0);
+
+       /* futex timeouts are relative.  Internally, we use absolute timeouts */
+       if (timeout) {
+               clock_gettime(CLOCK_MONOTONIC, abs_timeout);
+               /* timespec_add is available inside glibc, but not out here. */
+               abs_timeout->tv_sec += timeout->tv_sec;
+               abs_timeout->tv_nsec += timeout->tv_nsec;
+               if (abs_timeout->tv_nsec >= 1000000000) {
+                       abs_timeout->tv_nsec -= 1000000000;
+                       abs_timeout->tv_sec++;
+               }
+       }
+
+       switch (op) {
+       case FUTEX_WAIT:
+               return futex_wait(uaddr, val, timeout ? abs_timeout : NULL);
+       case FUTEX_WAKE:
+               return futex_wake(uaddr, val);
+       default:
+               errno = ENOSYS;
+               return -1;
+       }
+       return -1;
 }
-