parlib: Add 'timed' functions for sems/mtxs/cvs
authorBarret Rhoden <brho@cs.berkeley.edu>
Fri, 7 Apr 2017 15:23:58 +0000 (11:23 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Wed, 3 May 2017 16:13:02 +0000 (12:13 -0400)
These allow waiters to timeout while waiting on their sync objects.  GCC's
threading interface needs them, and in general they are fairly common
operations.

One nice thing is that all of the waiting operations can use the same
mechanism.  Thanks to the generic sync_object, CVs and semaphores (and thus
mutexes) use the same alarm handler + callbacks.

We can use the timed versions to implement the untimed versions.  The
differences are very minor (e.g., just branches in the contended mutex
case).  GCC didn't need a CV-timed-recurse-wait function, but it was easy
to do, and stylistically the same as the others.

Note that the timeouts for CVs apply only to the CV, not the mutex.  If the
CV times out, you will still have the mutex locked when you return.  This
is also the expected pthread semantics:

When such timeouts occur, pthread_cond_timedwait() shall
nonetheless release and re-acquire the mutex referenced by mutex.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
user/parlib/include/parlib/uthread.h
user/parlib/mutex.c
user/utest/cv.c

index c0a7c0a..03ec741 100644 (file)
@@ -6,6 +6,7 @@
 #include <parlib/parlib.h>
 #include <ros/syscall.h>
 #include <sys/queue.h>
+#include <time.h>
 
 __BEGIN_DECLS
 
@@ -201,6 +202,8 @@ void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count);
 void uth_semaphore_destroy(uth_semaphore_t *sem);
 uth_semaphore_t *uth_semaphore_alloc(unsigned int count);
 void uth_semaphore_free(uth_semaphore_t *sem);
+bool uth_semaphore_timed_down(uth_semaphore_t *sem,
+                              const struct timespec *abs_timeout);
 void uth_semaphore_down(uth_semaphore_t *sem);
 bool uth_semaphore_trydown(uth_semaphore_t *sem);
 void uth_semaphore_up(uth_semaphore_t *sem);
@@ -209,6 +212,7 @@ void uth_mutex_init(uth_mutex_t *m);
 void uth_mutex_destroy(uth_mutex_t *m);
 uth_mutex_t *uth_mutex_alloc(void);
 void uth_mutex_free(uth_mutex_t *m);
+bool uth_mutex_timed_lock(uth_mutex_t *m, const struct timespec *abs_timeout);
 void uth_mutex_lock(uth_mutex_t *m);
 bool uth_mutex_trylock(uth_mutex_t *m);
 void uth_mutex_unlock(uth_mutex_t *m);
@@ -217,6 +221,8 @@ void uth_recurse_mutex_init(uth_recurse_mutex_t *r_m);
 void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_m);
 uth_recurse_mutex_t *uth_recurse_mutex_alloc(void);
 void uth_recurse_mutex_free(uth_recurse_mutex_t *r_m);
+bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *m,
+                                  const struct timespec *abs_timeout);
 void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_m);
 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_m);
 void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_m);
@@ -228,7 +234,12 @@ void uth_cond_var_init(uth_cond_var_t *cv);
 void uth_cond_var_destroy(uth_cond_var_t *cv);
 uth_cond_var_t *uth_cond_var_alloc(void);
 void uth_cond_var_free(uth_cond_var_t *cv);
+bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *m,
+                             const struct timespec *abs_timeout);
 void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *m);
+bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
+                                     uth_recurse_mutex_t *r_mtx,
+                                     const struct timespec *abs_timeout);
 void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx);
 void uth_cond_var_signal(uth_cond_var_t *cv);
 void uth_cond_var_broadcast(uth_cond_var_t *cv);
index dea1d65..7152340 100644 (file)
@@ -8,8 +8,50 @@
 #include <parlib/uthread.h>
 #include <sys/queue.h>
 #include <parlib/spinlock.h>
+#include <parlib/alarm.h>
 #include <malloc.h>
 
+struct timeout_blob {
+       bool                                            timed_out;
+       struct uthread                          *uth;
+       uth_sync_t                                      *sync_ptr;
+       struct spin_pdr_lock            *lock_ptr;
+};
+
+/* When sync primitives want to time out, they can use this alarm handler.  It
+ * needs a timeout_blob, which is independent of any particular sync method. */
+static void timeout_handler(struct alarm_waiter *waiter)
+{
+       struct timeout_blob *blob = (struct timeout_blob*)waiter->data;
+
+       spin_pdr_lock(blob->lock_ptr);
+       if (__uth_sync_get_uth(*blob->sync_ptr, blob->uth))
+               blob->timed_out = TRUE;
+       spin_pdr_unlock(blob->lock_ptr);
+       if (blob->timed_out)
+               uthread_runnable(blob->uth);
+}
+
+/* Minor helper, sets a blob's fields */
+static void set_timeout_blob(struct timeout_blob *blob, uth_sync_t *sync_ptr,
+                             struct spin_pdr_lock *lock_ptr)
+{
+       blob->timed_out = FALSE;
+       blob->uth = current_uthread;
+       blob->sync_ptr = sync_ptr;
+       blob->lock_ptr = lock_ptr;
+}
+
+/* Minor helper, sets an alarm for blob and a timespec */
+static void set_timeout_alarm(struct alarm_waiter *waiter,
+                              struct timeout_blob *blob,
+                              const struct timespec *abs_timeout)
+{
+       init_awaiter(waiter, timeout_handler);
+       waiter->data = blob;
+       set_awaiter_abs_unix(waiter, timespec_to_alarm_time(abs_timeout));
+       set_alarm(waiter);
+}
 
 /************** Semaphores and Mutexes **************/
 
@@ -69,8 +111,12 @@ static void __semaphore_cb(struct uthread *uth, void *arg)
        spin_pdr_unlock(&sem->lock);
 }
 
-void uth_semaphore_down(uth_semaphore_t *sem)
+bool uth_semaphore_timed_down(uth_semaphore_t *sem,
+                              const struct timespec *abs_timeout)
 {
+       struct alarm_waiter waiter[1];
+       struct timeout_blob blob[1];
+
        parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
        spin_pdr_lock(&sem->lock);
        if (sem->count > 0) {
@@ -79,12 +125,28 @@ void uth_semaphore_down(uth_semaphore_t *sem)
                 * our timeout function works for sems and CVs. */
                sem->count--;
                spin_pdr_unlock(&sem->lock);
-               return;
+               return TRUE;
+       }
+       if (abs_timeout) {
+               set_timeout_blob(blob, &sem->sync_obj, &sem->lock);
+               set_timeout_alarm(waiter, blob, abs_timeout);
        }
        /* the unlock and sync enqueuing is done in the yield callback.  as always,
         * we need to do this part in vcore context, since as soon as we unlock the
         * uthread could restart.  (atomically yield and unlock). */
        uthread_yield(TRUE, __semaphore_cb, sem);
+       if (abs_timeout) {
+               /* We're guaranteed the alarm will either be cancelled or the handler
+                * complete when unset_alarm() returns. */
+               unset_alarm(waiter);
+               return blob->timed_out ? FALSE : TRUE;
+       }
+       return TRUE;
+}
+
+void uth_semaphore_down(uth_semaphore_t *sem)
+{
+       uth_semaphore_timed_down(sem, NULL);
 }
 
 bool uth_semaphore_trydown(uth_semaphore_t *sem)
@@ -155,6 +217,12 @@ void uth_mutex_free(uth_mutex_t *mtx)
        uth_semaphore_free(mtx);
 }
 
+bool uth_mutex_timed_lock(uth_mutex_t *mtx, const struct timespec *abs_timeout)
+{
+       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
+       return uth_semaphore_timed_down(mtx, abs_timeout);
+}
+
 void uth_mutex_lock(uth_mutex_t *mtx)
 {
        parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
@@ -213,7 +281,8 @@ void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
        free(r_mtx);
 }
 
-void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
+bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *r_mtx,
+                                  const struct timespec *abs_timeout)
 {
        parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
        assert(!in_vcore_context());
@@ -228,11 +297,18 @@ void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
         * we'll have to disable notifs temporarily. */
        if (r_mtx->lockholder == current_uthread) {
                r_mtx->count++;
-               return;
+               return TRUE;
        }
-       uth_mutex_lock(&r_mtx->mtx);
+       if (!uth_mutex_timed_lock(&r_mtx->mtx, abs_timeout))
+               return FALSE;
        r_mtx->lockholder = current_uthread;
        r_mtx->count = 1;
+       return TRUE;
+}
+
+void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
+{
+       uth_recurse_mutex_timed_lock(r_mtx, NULL);
 }
 
 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
@@ -387,34 +463,67 @@ static void __cv_wait_cb(struct uthread *uth, void *arg)
  *
  * Also note that we use the external API for the mutex operations.  A 2LS could
  * have their own mutex ops but still use the generic cv ops. */
-void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
+bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
+                             const struct timespec *abs_timeout)
 {
        struct uth_cv_link link;
+       struct alarm_waiter waiter[1];
+       struct timeout_blob blob[1];
+       bool ret = TRUE;
 
        parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
        link.cv = cv;
        link.mtx = mtx;
        spin_pdr_lock(&cv->lock);
+       if (abs_timeout) {
+               set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
+               set_timeout_alarm(waiter, blob, abs_timeout);
+       }
        uthread_yield(TRUE, __cv_wait_cb, &link);
+       if (abs_timeout) {
+               unset_alarm(waiter);
+               ret = blob->timed_out ? FALSE : TRUE;
+       }
        uth_mutex_lock(mtx);
+       return ret;
 }
 
-/* GCC wants this function, though its semantics are a little unclear.  I
- * imagine you'd want to completely unlock it (say you locked it 3 times), and
- * when you get it back, that you have your three locks back. */
-void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
+void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
+{
+       uth_cond_var_timed_wait(cv, mtx, NULL);
+}
+
+/* GCC doesn't list this as one of the C++0x functions, but it's easy to do and
+ * implement uth_cond_var_wait_recurse() with it, just like for all the other
+ * 'timed' functions.
+ *
+ * Note the timeout applies to getting the signal on the CV, not on reacquiring
+ * the mutex. */
+bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
+                                     uth_recurse_mutex_t *r_mtx,
+                                     const struct timespec *abs_timeout)
 {
        unsigned int old_count = r_mtx->count;
+       bool ret;
 
        /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
         * prep-work for that now.  (invariant is that an unlocked r_mtx has no
         * lockholder and count == 0. */
        r_mtx->lockholder = NULL;
        r_mtx->count = 0;
-       uth_cond_var_wait(cv, &r_mtx->mtx);
+       ret = uth_cond_var_timed_wait(cv, &r_mtx->mtx, abs_timeout);
        /* Now we hold the internal mutex again.  Need to restore the tracking. */
        r_mtx->lockholder = current_uthread;
        r_mtx->count = old_count;
+       return ret;
+}
+
+/* GCC wants this function, though its semantics are a little unclear.  I
+ * imagine you'd want to completely unlock it (say you locked it 3 times), and
+ * when you get it back, that you have your three locks back. */
+void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
+{
+       uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
 }
 
 void uth_cond_var_signal(uth_cond_var_t *cv)
index 6a159f8..822f703 100644 (file)
@@ -5,6 +5,7 @@
 #include <utest/utest.h>
 #include <parlib/uthread.h>
 #include <pthread.h>
+#include <time.h>
 
 TEST_SUITE("CV");
 
@@ -258,6 +259,84 @@ bool test_semaphore_static(void)
        return __test_semaphore(&static_semaphore, 5);
 }
 
+bool test_semaphore_timeout(void)
+{
+       static uth_semaphore_t sem = UTH_SEMAPHORE_INIT(1);
+       struct timespec timeout[1];
+       int ret;
+       bool got_it;
+
+       ret = clock_gettime(CLOCK_REALTIME, timeout);
+       UT_ASSERT(!ret);
+       timeout->tv_nsec += 500000000;
+       got_it = uth_semaphore_timed_down(&sem, timeout);
+       UT_ASSERT(got_it);
+
+       /* Second time we still hold the sem and would block and should time out. */
+       UT_ASSERT(sem.count == 0);
+       ret = clock_gettime(CLOCK_REALTIME, timeout);
+       UT_ASSERT(!ret);
+       timeout->tv_nsec += 500000000;
+       got_it = uth_semaphore_timed_down(&sem, timeout);
+       UT_ASSERT(!got_it);
+
+       return TRUE;
+}
+
+bool test_cv_timeout(void)
+{
+       static uth_mutex_t mtx = UTH_MUTEX_INIT;
+       static uth_cond_var_t cv = UTH_COND_VAR_INIT;
+       struct timespec timeout[1];
+       int ret;
+       bool was_signalled;
+
+       uth_mutex_lock(&mtx);
+       ret = clock_gettime(CLOCK_REALTIME, timeout);
+       UT_ASSERT(!ret);
+       timeout->tv_nsec += 500000000;
+       was_signalled = uth_cond_var_timed_wait(&cv, &mtx, timeout);
+       UT_ASSERT(!was_signalled);
+       UT_ASSERT(mtx.count == 0);      /* semaphore's count variable */
+       uth_mutex_unlock(&mtx);
+       UT_ASSERT(mtx.count == 1);
+
+       return TRUE;
+}
+
+bool test_cv_recurse_timeout(void)
+{
+       static uth_recurse_mutex_t r_mtx = UTH_RECURSE_MUTEX_INIT;
+       static uth_cond_var_t cv = UTH_COND_VAR_INIT;
+       struct timespec timeout[1];
+       int ret;
+       bool was_signalled;
+
+       /* Get three-deep locks, make sure the bookkeeping is right */
+       uth_recurse_mutex_lock(&r_mtx);
+       uth_recurse_mutex_lock(&r_mtx);
+       uth_recurse_mutex_lock(&r_mtx);
+       UT_ASSERT(r_mtx.count == 3);
+       UT_ASSERT(r_mtx.mtx.count == 0);
+
+       ret = clock_gettime(CLOCK_REALTIME, timeout);
+       UT_ASSERT(!ret);
+       timeout->tv_nsec += 500000000;
+       was_signalled = uth_cond_var_timed_wait_recurse(&cv, &r_mtx, timeout);
+       UT_ASSERT(!was_signalled);
+       UT_ASSERT(r_mtx.count == 3);
+       UT_ASSERT(r_mtx.mtx.count == 0);
+
+       /* Unlock our three locks, then make sure the semaphore/mtx is unlocked. */
+       uth_recurse_mutex_unlock(&r_mtx);
+       uth_recurse_mutex_unlock(&r_mtx);
+       uth_recurse_mutex_unlock(&r_mtx);
+       UT_ASSERT(r_mtx.count == 0);
+       UT_ASSERT(r_mtx.mtx.count == 1);
+
+       return TRUE;
+}
+
 /* <--- End definition of test cases ---> */
 
 struct utest utests[] = {
@@ -268,6 +347,9 @@ struct utest utests[] = {
        UTEST_REG(recurse_static),
        UTEST_REG(semaphore),
        UTEST_REG(semaphore_static),
+       UTEST_REG(semaphore_timeout),
+       UTEST_REG(cv_timeout),
+       UTEST_REG(cv_recurse_timeout),
 };
 int num_utests = sizeof(utests) / sizeof(struct utest);