futex: Implement futexes with CVs
authorBarret Rhoden <brho@cs.berkeley.edu>
Mon, 15 Oct 2018 20:24:50 +0000 (16:24 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 22 Oct 2018 23:35:50 +0000 (19:35 -0400)
This version is similar to the old one in that we still have a single
futex_element per futex_wait call, and there is a linked list of these
elements.  Future versions could use a hash table or something.

You could even consider having M:1 waiters to addresses, with those
structs in a hash table.  If you do this, consider keeping the futex
elements forever.

The thing to be careful of is the lifetime of the futex element.  This
is something the old code had a hard time with, and this new code is no
exception.

The old version had hand-rolled the timeout.  Although the new code
avoids the duplicated issues there, the one benefit of the timeout was
that it yanked 'e' off the list, such that if the futex_wait() uthread
was running, then we knew it was off the list.  In the current version,
we must yank it off the list, and then wait for any users (futex_wake())
to be done with it.  Fun with shared memory synchronization!  The
current approach is similar to the old awaiter.data = NULL approach, but
this is more explicit.

Here are a few other things I considered:
- Put the futex element in the uthread/pthread: our thread could exit,
  instead of just unwinding the stack, so this is broken.  It's still a
use-after-free issue.

- Have a custom timeout with the CV, and have the timeout just do
  futex_wake: Not a bad idea.  I wanted to use the CV infrastructure
instead of rolling our own.  We'd also need a bit to signal that the
wakeup was a timeout.  Also, in future versions, I want to use a single
CV for multiple waiters, and this would break that.

- Never have the waker yank the CV element, just kick the CV: We still
  have the lifetime issues.  Even if we know we didn't timeout, we still
could have another waker (consider multiple threads calling
futex_wake()).  The root cause here is that the waker kicks the CVs
outside the futex lock, which was an attempt to cut down on the number
of locks held across 2LS ops.  Though note the CV lock *is* held across
some 2LS ops (has_blocked).

- Have the waker grab the CV lock while holding the futex lock, in an
  attempt to transfer the lock.  The rule is to hold both locks when
adding/removing an element from the list: That would work, but would
require the waiter to do a trylock (lock ordering), and handling the
fallback (unlock CV, then lock for real).  I'd like to avoid gratuitous
global locking.

- Don't have elements at all, use a single CV: we want to wake based on
  uaddr, which would require knowing the waiters that waited on a
specific addr.  A CV per addr works.  (Right now, we have a CV per
waiter).

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
user/pthread/futex.c

index 662b53b..12d4355 100644 (file)
 #include <parlib/mcs.h>
 #include <parlib/alarm.h>
 
-static inline int futex_wake(int *uaddr, int count);
-static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
-static void *timer_thread(void *arg);
-
 struct futex_element {
-  TAILQ_ENTRY(futex_element) link;
-  struct uthread *uthread;
-  int *uaddr;
-  uint64_t us_timeout;
-  struct alarm_waiter awaiter;
-  bool timedout;
+       TAILQ_ENTRY(futex_element) link;
+       int *uaddr;
+       bool on_list;
+       bool waker_using;
+       uth_cond_var_t cv;
 };
 TAILQ_HEAD(futex_queue, futex_element);
 
 struct futex_data {
-  struct mcs_pdr_lock lock;
-  struct futex_queue queue;
+       struct mcs_pdr_lock lock;
+       struct futex_queue queue;
 };
 static struct futex_data __futex;
 
 static inline void futex_init(void *arg)
 {
-  mcs_pdr_init(&__futex.lock);
-  TAILQ_INIT(&__futex.queue);
-}
-
-static void __futex_timeout(struct alarm_waiter *awaiter) {
-  struct futex_element *__e = NULL;
-  struct futex_element *e = (struct futex_element*)awaiter->data;
-  //printf("timeout fired: %p\n", e->uaddr);
-
-  // Atomically remove the timed-out element from the futex queue if we won the
-  // race against actually completing.
-  mcs_pdr_lock(&__futex.lock);
-  TAILQ_FOREACH(__e, &__futex.queue, link)
-    if (__e == e) break;
-  if (__e != NULL)
-    TAILQ_REMOVE(&__futex.queue, e, link);
-  mcs_pdr_unlock(&__futex.lock);
-
-  // If we removed it, restart it outside the lock
-  if (__e != NULL) {
-    e->timedout = true;
-    //printf("timeout: %p\n", e->uaddr);
-    uthread_runnable(e->uthread);
-  }
+       mcs_pdr_init(&__futex.lock);
+       TAILQ_INIT(&__futex.queue);
 }
 
-static void __futex_block(struct uthread *uthread, void *arg) {
-  struct futex_element *e = (struct futex_element*)arg;
-
-  // Set the remaining properties of the futex element
-  e->uthread = uthread;
-  e->timedout = false;
-
-  // Insert the futex element into the queue
-  TAILQ_INSERT_TAIL(&__futex.queue, e, link);
-
-  // Set an alarm for the futex timeout if applicable
-  if(e->us_timeout != (uint64_t)-1) {
-    e->awaiter.data = e;
-    init_awaiter(&e->awaiter, __futex_timeout);
-    set_awaiter_rel(&e->awaiter, e->us_timeout);
-    //printf("timeout set: %p\n", e->uaddr);
-    set_alarm(&e->awaiter);
-  }
-
-  // Notify the scheduler of the type of yield we did
-  uthread_has_blocked(uthread, UTH_EXT_BLK_MUTEX);
-
-  // Unlock the pdr_lock 
-  mcs_pdr_unlock(&__futex.lock);
-}
-
-static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout)
+static inline int futex_wait(int *uaddr, int val,
+                             const struct timespec *abs_timeout)
 {
-  // Atomically do the following...
-  mcs_pdr_lock(&__futex.lock);
-  // If the value of *uaddr matches val
-  if(*uaddr == val) {
-    //printf("wait: %p, %d\n", uaddr, us_timeout);
-    // Create a new futex element and initialize it.
-    struct futex_element e;
-    e.uaddr = uaddr;
-    e.us_timeout = us_timeout;
-    // Yield the uthread...
-    // We set the remaining properties of the futex element, set the timeout
-    // timer, and unlock the pdr lock on the other side.  It is important that
-    // we do the unlock on the other side, because (unlike linux, etc.) its
-    // possible to get interrupted and drop into vcore context right after
-    // releasing the lock.  If that vcore code then calls futex_wake(), we
-    // would be screwed.  Doing things this way means we have to hold the lock
-    // longer, but its necessary for correctness.
-    uthread_yield(TRUE, __futex_block, &e);
-    // We are unlocked here!
-
-       // Unset ensures the timeout won't happen, and if it did, that the alarm
-       // service is done with the awaiter
-    if(e.us_timeout != (uint64_t)-1)
-         unset_alarm(&e.awaiter);
-
-    // After waking, if we timed out, set the error
-    // code appropriately and return
-    if(e.timedout) {
-      errno = ETIMEDOUT;
-      return -1;
-    }
-  } else {
-      mcs_pdr_unlock(&__futex.lock);
-  }
-  return 0;
+       struct futex_element e[1];
+       bool timed_out;
+
+       mcs_pdr_lock(&__futex.lock);
+       if (*uaddr != val) {
+               mcs_pdr_unlock(&__futex.lock);
+               return 0;
+       }
+       e->uaddr = uaddr;
+       uth_cond_var_init(&e->cv);
+       e->waker_using = false;
+       e->on_list = true;
+       TAILQ_INSERT_TAIL(&__futex.queue, e, link);
+       /* Lock switch.  Any waker will grab the global lock, then grab ours.
+        * We're downgrading to the CV lock, which still protects us from
+        * missing the signal (which is someone calling Wake after changing
+        * *uaddr).  The CV code will atomically block (with timeout) and unlock
+        * the CV lock.
+        *
+        * Ordering is __futex.lock -> CV lock, but you can have the inner lock
+        * without holding the outer lock. */
+       uth_cond_var_lock(&e->cv);
+       mcs_pdr_unlock(&__futex.lock);
+
+       timed_out = !uth_cond_var_timed_wait(&e->cv, NULL, abs_timeout);
+       /* CV wait returns with the lock held, which is unneccessary for
+        * futexes.  We could use this cv lock and maybe a trylock on the futex
+        * to sync with futex_wake, instead of the current synchronization
+        * techniques with the bools. */
+       uth_cond_var_unlock(&e->cv);
+
+       /* In the common case, the waker woke us and already cleared on_list,
+        * and we'd rather not grab the __futex lock again.  Note the outer
+        * on_list check is an optimization, and we need the lock to be sure.
+        * Also note the waker sets waker_using before on_list, so if we happen
+        * to see !on_list (while the waker is mucking with the list), we'll see
+        * waker_using and spin below. */
+       if (e->on_list) {
+               mcs_pdr_lock(&__futex.lock);
+               if (e->on_list)
+                       TAILQ_REMOVE(&__futex.queue, e, link);
+               mcs_pdr_unlock(&__futex.lock);
+       }
+       rmb();  /* read on_list before waker_using */
+       /* The waker might have yanked us and is about to kick the CV.  Need to
+        * wait til they are done before freeing e. */
+       while (e->waker_using)
+               cpu_relax_any();
+
+       if (timed_out) {
+               errno = ETIMEDOUT;
+               return -1;
+       }
+       return 0;
 }
 
 static inline int futex_wake(int *uaddr, int count)
 {
-  int max = count;
-  struct futex_element *e,*n = NULL;
-  struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
-
-  // Atomically grab all relevant futex blockers
-  // from the global futex queue
-  mcs_pdr_lock(&__futex.lock);
-  e = TAILQ_FIRST(&__futex.queue);
-  while(e != NULL) {
-    if(count > 0) {
-      n = TAILQ_NEXT(e, link);
-      if(e->uaddr == uaddr) {
-        TAILQ_REMOVE(&__futex.queue, e, link);
-        TAILQ_INSERT_TAIL(&q, e, link);
-        count--;
-      }
-      e = n;
-    }
-    else break;
-  }
-  mcs_pdr_unlock(&__futex.lock);
-
-  // Unblock them outside the lock
-  e = TAILQ_FIRST(&q);
-  while(e != NULL) {
-    n = TAILQ_NEXT(e, link);
-    TAILQ_REMOVE(&q, e, link);
-    uthread_runnable(e->uthread);
-    e = n;
-  }
-  return max-count;
+       int max = count;
+       struct futex_element *e, *temp;
+       struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
+
+       mcs_pdr_lock(&__futex.lock);
+       TAILQ_FOREACH_SAFE(e, &__futex.queue, link, temp) {
+               if (count <= 0)
+                       break;
+               if (e->uaddr == uaddr) {
+                       e->waker_using = true;
+                       /* flag waker_using before saying !on_list */
+                       wmb();
+                       e->on_list = false;
+                       TAILQ_REMOVE(&__futex.queue, e, link);
+                       TAILQ_INSERT_TAIL(&q, e, link);
+                       count--;
+               }
+       }
+       mcs_pdr_unlock(&__futex.lock);
+
+       TAILQ_FOREACH_SAFE(e, &q, link, temp) {
+               TAILQ_REMOVE(&q, e, link);
+               uth_cond_var_signal(&e->cv);
+               /* Do not touch e after marking it. */
+               e->waker_using = false;
+       }
+
+       return max - count;
 }
 
-int futex(int *uaddr, int op, int val,
-          const struct timespec *timeout,
+int futex(int *uaddr, int op, int val, const struct timespec *timeout,
           int *uaddr2, int val3)
 {
-  static parlib_once_t once = PARLIB_ONCE_INIT;
-
-  parlib_run_once(&once, futex_init, NULL);
-  // Round to the nearest micro-second
-  uint64_t us_timeout = (uint64_t)-1;
-  assert(uaddr2 == NULL);
-  assert(val3 == 0);
-  if(timeout != NULL) {
-    us_timeout = timeout->tv_sec*1000000L + timeout->tv_nsec/1000L;
-    assert(us_timeout > 0);
-  }
-  switch(op) {
-    case FUTEX_WAIT:
-      return futex_wait(uaddr, val, us_timeout);
-    case FUTEX_WAKE:
-      return futex_wake(uaddr, val);
-    default:
-      errno = ENOSYS;
-      return -1;
-  }
-  return -1;
+       static parlib_once_t once = PARLIB_ONCE_INIT;
+       struct timespec abs_timeout[1];
+
+       parlib_run_once(&once, futex_init, NULL);
+       assert(uaddr2 == NULL);
+       assert(val3 == 0);
+
+       /* futex timeouts are relative.  Internally, we use absolute timeouts */
+       if (timeout) {
+               clock_gettime(CLOCK_MONOTONIC, abs_timeout);
+               /* timespec_add is available inside glibc, but not out here. */
+               abs_timeout->tv_sec += timeout->tv_sec;
+               abs_timeout->tv_nsec += timeout->tv_nsec;
+               if (abs_timeout->tv_nsec >= 1000000000) {
+                       abs_timeout->tv_nsec -= 1000000000;
+                       abs_timeout->tv_sec++;
+               }
+       }
+
+       switch (op) {
+       case FUTEX_WAIT:
+               return futex_wait(uaddr, val, abs_timeout);
+       case FUTEX_WAKE:
+               return futex_wake(uaddr, val);
+       default:
+               errno = ENOSYS;
+               return -1;
+       }
+       return -1;
 }
-