Figure out where to install-libs
[akaros.git] / user / pthread / futex.c
index 381a9ef..3ada36d 100644 (file)
@@ -1,24 +1,32 @@
 #include <ros/common.h>
 #include <ros/common.h>
-#include <ros/futex.h>
+#include <futex.h>
 #include <sys/queue.h>
 #include <pthread.h>
 #include <sys/queue.h>
 #include <pthread.h>
+#include <parlib.h>
 #include <assert.h>
 #include <stdio.h>
 #include <errno.h>
 #include <slab.h>
 #include <mcs.h>
 #include <assert.h>
 #include <stdio.h>
 #include <errno.h>
 #include <slab.h>
 #include <mcs.h>
+#include <alarm.h>
+
+static inline int futex_wake(int *uaddr, int count);
+static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
+static void *timer_thread(void *arg);
 
 struct futex_element {
   TAILQ_ENTRY(futex_element) link;
   pthread_t pthread;
   int *uaddr;
 
 struct futex_element {
   TAILQ_ENTRY(futex_element) link;
   pthread_t pthread;
   int *uaddr;
+  uint64_t us_timeout;
+  struct alarm_waiter awaiter;
+  bool timedout;
 };
 TAILQ_HEAD(futex_queue, futex_element);
 
 struct futex_data {
   struct mcs_pdr_lock lock;
   struct futex_queue queue;
 };
 TAILQ_HEAD(futex_queue, futex_element);
 
 struct futex_data {
   struct mcs_pdr_lock lock;
   struct futex_queue queue;
-  struct kmem_cache *element_cache;
 };
 static struct futex_data __futex;
 
 };
 static struct futex_data __futex;
 
@@ -26,37 +34,113 @@ static inline void futex_init()
 {
   mcs_pdr_init(&__futex.lock);
   TAILQ_INIT(&__futex.queue);
 {
   mcs_pdr_init(&__futex.lock);
   TAILQ_INIT(&__futex.queue);
-  __futex.element_cache = kmem_cache_create("futex_element_cache", 
-    sizeof(struct futex_element), __alignof__(struct futex_element),
-    0, NULL, NULL);
+}
+
+static void __futex_timeout(struct alarm_waiter *awaiter) {
+  struct futex_element *__e = NULL;
+  struct futex_element *e = (struct futex_element*)awaiter->data;
+  //printf("timeout fired: %p\n", e->uaddr);
+
+  // Atomically remove the timed-out element from the futex queue if we won the
+  // race against actually completing.
+  mcs_pdr_lock(&__futex.lock);
+  TAILQ_FOREACH(__e, &__futex.queue, link)
+    if (__e == e) break;
+  if (__e != NULL)
+    TAILQ_REMOVE(&__futex.queue, e, link);
+  mcs_pdr_unlock(&__futex.lock);
+
+  // If we removed it, restart it outside the lock
+  if (__e != NULL) {
+    e->timedout = true;
+    //printf("timeout: %p\n", e->uaddr);
+    uthread_runnable((struct uthread*)e->pthread);
+  }
+  // Set this as the very last thing we do whether we successfully woke the
+  // thread blocked on the futex or not.  Either we set this or wake() sets
+  // this, not both.  Spin on this in the bottom-half of the wait() code to
+  // ensure there are no more references to awaiter before freeing the memory
+  // for it.
+  e->awaiter.data = NULL;
 }
 
 static void __futex_block(struct uthread *uthread, void *arg) {
 }
 
 static void __futex_block(struct uthread *uthread, void *arg) {
+  pthread_t pthread = (pthread_t)uthread;
   struct futex_element *e = (struct futex_element*)arg;
   struct futex_element *e = (struct futex_element*)arg;
-  e->pthread = (pthread_t)uthread;
-  e->pthread->state = PTH_BLK_MUTEX;
+
+  // Set the remaining properties of the futex element
+  e->pthread = pthread;
+  e->timedout = false;
+
+  // Insert the futex element into the queue
   TAILQ_INSERT_TAIL(&__futex.queue, e, link);
   TAILQ_INSERT_TAIL(&__futex.queue, e, link);
+
+  // Set an alarm for the futex timeout if applicable
+  if(e->us_timeout != (uint64_t)-1) {
+    e->awaiter.data = e;
+    init_awaiter(&e->awaiter, __futex_timeout);
+    set_awaiter_rel(&e->awaiter, e->us_timeout);
+    //printf("timeout set: %p\n", e->uaddr);
+    set_alarm(&e->awaiter);
+  }
+
+  // Notify the scheduler of the type of yield we did
+  __pthread_generic_yield(pthread);
+  pthread->state = PTH_BLK_MUTEX;
+
+  // Unlock the pdr_lock 
   mcs_pdr_unlock(&__futex.lock);
 }
 
   mcs_pdr_unlock(&__futex.lock);
 }
 
-static inline int futex_wait(int *uaddr, int val)
+static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout)
 {
 {
+  // Atomically do the following...
   mcs_pdr_lock(&__futex.lock);
   mcs_pdr_lock(&__futex.lock);
+  // If the value of *uaddr matches val
   if(*uaddr == val) {
   if(*uaddr == val) {
-    // We unlock in the body of __futex_block
-    struct futex_element *e = kmem_cache_alloc(__futex.element_cache, 0); 
-    e->uaddr = uaddr;
-    uthread_yield(TRUE, __futex_block, e);
-  }
-  else {
-    mcs_pdr_unlock(&__futex.lock);
+    //printf("wait: %p, %d\n", uaddr, us_timeout);
+    // Create a new futex element and initialize it.
+    struct futex_element e;
+    e.uaddr = uaddr;
+    e.us_timeout = us_timeout;
+    // Yield the uthread...
+    // We set the remaining properties of the futex element, set the timeout
+    // timer, and unlock the pdr lock on the other side.  It is important that
+    // we do the unlock on the other side, because (unlike linux, etc.) its
+    // possible to get interrupted and drop into vcore context right after
+    // releasing the lock.  If that vcore code then calls futex_wake(), we
+    // would be screwed.  Doing things this way means we have to hold the lock
+    // longer, but its necessary for correctness.
+    uthread_yield(TRUE, __futex_block, &e);
+    // We are unlocked here!
+
+    // If this futex had a timeout, spin briefly to make sure that all
+    // references to e are gone between the wake() and the timeout() code. We
+    // use e.awaiter.data to do this.
+    if(e.us_timeout != (uint64_t)-1)
+      while (e.awaiter.data != NULL)
+        cpu_relax();
+
+    // After waking, if we timed out, set the error
+    // code appropriately and return
+    if(e.timedout) {
+      errno = ETIMEDOUT;
+      return -1;
+    }
+  } else {
+      mcs_pdr_unlock(&__futex.lock);
   }
   return 0;
 }
 
 static inline int futex_wake(int *uaddr, int count)
 {
   }
   return 0;
 }
 
 static inline int futex_wake(int *uaddr, int count)
 {
+  int max = count;
   struct futex_element *e,*n = NULL;
   struct futex_element *e,*n = NULL;
+  struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
+
+  // Atomically grab all relevant futex blockers
+  // from the global futex queue
   mcs_pdr_lock(&__futex.lock);
   e = TAILQ_FIRST(&__futex.queue);
   while(e != NULL) {
   mcs_pdr_lock(&__futex.lock);
   e = TAILQ_FIRST(&__futex.queue);
   while(e != NULL) {
@@ -64,8 +148,7 @@ static inline int futex_wake(int *uaddr, int count)
       n = TAILQ_NEXT(e, link);
       if(e->uaddr == uaddr) {
         TAILQ_REMOVE(&__futex.queue, e, link);
       n = TAILQ_NEXT(e, link);
       if(e->uaddr == uaddr) {
         TAILQ_REMOVE(&__futex.queue, e, link);
-        uthread_runnable((struct uthread*)e->pthread);
-        kmem_cache_free(__futex.element_cache, e); 
+        TAILQ_INSERT_TAIL(&q, e, link);
         count--;
       }
       e = n;
         count--;
       }
       e = n;
@@ -73,20 +156,51 @@ static inline int futex_wake(int *uaddr, int count)
     else break;
   }
   mcs_pdr_unlock(&__futex.lock);
     else break;
   }
   mcs_pdr_unlock(&__futex.lock);
-  return 0;
+
+  // Unblock them outside the lock
+  e = TAILQ_FIRST(&q);
+  while(e != NULL) {
+    n = TAILQ_NEXT(e, link);
+    TAILQ_REMOVE(&q, e, link);
+    // Cancel the timeout if one was set
+    if(e->us_timeout != (uint64_t)-1) {
+      // Try and unset the alarm.  If this fails, then we have already
+      // started running the alarm callback.  If it succeeds, then we can
+      // set awaiter->data to NULL so that the bottom half of wake can
+      // proceed. Either we set awaiter->data to NULL or __futex_timeout
+      // does. The fact that we made it here though, means that WE are the
+      // one who removed e from the queue, so we are basically just
+      // deciding who should set awaiter->data to NULL to indicate that
+      // there are no more references to it.
+      if(unset_alarm(&e->awaiter)) {
+        //printf("timeout canceled: %p\n", e->uaddr);
+        e->awaiter.data = NULL;
+      }
+    }
+    //printf("wake: %p\n", uaddr);
+    uthread_runnable((struct uthread*)e->pthread);
+    e = n;
+  }
+  return max-count;
 }
 
 }
 
-int futex(int *uaddr, int op, int val, const struct timespec *timeout,
-                 int *uaddr2, int val3)
+int futex(int *uaddr, int op, int val,
+          const struct timespec *timeout,
+          int *uaddr2, int val3)
 {
 {
-  assert(timeout == NULL);
+  // Round to the nearest micro-second
+  uint64_t us_timeout = (uint64_t)-1;
   assert(uaddr2 == NULL);
   assert(val3 == 0);
   assert(uaddr2 == NULL);
   assert(val3 == 0);
+  if(timeout != NULL) {
+    us_timeout = timeout->tv_sec*1000000L + timeout->tv_nsec/1000L;
+    assert(us_timeout > 0);
+  }
 
 
-  run_once_safe(futex_init());
+  run_once(futex_init());
   switch(op) {
     case FUTEX_WAIT:
   switch(op) {
     case FUTEX_WAIT:
-      return futex_wait(uaddr, val);
+      return futex_wait(uaddr, val, us_timeout);
     case FUTEX_WAKE:
       return futex_wake(uaddr, val);
     default:
     case FUTEX_WAKE:
       return futex_wake(uaddr, val);
     default: