parlib: Support mutex-less condition variables
authorBarret Rhoden <brho@cs.berkeley.edu>
Mon, 13 Aug 2018 22:26:19 +0000 (18:26 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 13 Aug 2018 22:26:19 +0000 (18:26 -0400)
If we want to signal or broadcast a condition variable, typically we also
want to have grabbed (and possibly released) the mutex that was protecting
whatever state the sleeper cares about.  See my notes in mutex.c for an
example.

Event handlers and other vcore context code cannot grab mutexes, however
they can grab spinlocks.  "Mutex-less" CVs use the CV's spinlock for
synchronization.  These locks can be grabbed in any context, and thus we
can kick the CVs from vcore context.

This is the same solution I came up with for the kernel's CVs, and probably
for the same reasons.  Both deal with interrupt context that can't block,
and both interact with threads that do block.

Whether or not mutex-less CVs are the best way to deal with the issue, they
work.  By comparison, the POSIX pthread_cond_var functions are not even
signal safe, let alone the mutexes that you'll often want to grab.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
user/parlib/include/parlib/uthread.h
user/parlib/mutex.c
user/utest/cv.c

index f1df807..01a4170 100644 (file)
@@ -321,6 +321,14 @@ void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx);
 void uth_cond_var_signal(uth_cond_var_t *cv);
 void uth_cond_var_broadcast(uth_cond_var_t *cv);
 
+/* Mutex-less cond vars: Uses the CV's spinlock for synchronization */
+void uth_cond_var_lock(uth_cond_var_t *cv);
+void uth_cond_var_unlock(uth_cond_var_t *cv);
+void __uth_cond_var_signal_and_unlock(uth_cond_var_t *cv);
+void __uth_cond_var_broadcast_and_unlock(uth_cond_var_t *cv);
+struct uthread *__uth_cond_var_wake_one(uth_cond_var_t *cv);
+bool __uth_cond_var_wake_all(uth_cond_var_t *cv, uth_sync_t *restartees);
+
 void uth_rwlock_init(uth_rwlock_t *rwl);
 void uth_rwlock_destroy(uth_rwlock_t *rwl);
 uth_rwlock_t *uth_rwlock_alloc(void);
index c391c03..95dc7aa 100644 (file)
@@ -410,7 +410,8 @@ static void __cv_wait_cb(struct uthread *uth, void *arg)
         * sleep! (see below).  If that happens, the uthread is no longer sleeping
         * on the CV, and the sync_next is free.  The invariant is that a uthread
         * can only sleep on one sync_object at a time. */
-       uth_mutex_unlock(mtx);
+       if (mtx)
+               uth_mutex_unlock(mtx);
 }
 
 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
@@ -455,6 +456,12 @@ static void __cv_wait_cb(struct uthread *uth, void *arg)
  *
  * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
  * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
+ * However, we need both approaces in userspace.  To that end, we also support
+ * mutex-less CVs, where the synchronization typically provided by the mutex is
+ * provided by the CV's spinlock.  Just pass NULL for the mutex.  This is
+ * primarily useful for CVs that are signalled from event handlers in vcore
+ * context, since that code cannot block on a mutex and thus cannot use the
+ * mutex to avoid the races mentioned above.
  *
  * As far as lock ordering goes, once the sleeper holds the mutex and is on the
  * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
@@ -474,11 +481,15 @@ bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
        struct timeout_blob blob[1];
        bool ret = TRUE;
 
-       assert_can_block();
+       /* We're holding the CV PDR lock, so we lose the ability to detect blocking
+        * violations. */
+       if (mtx)
+               assert_can_block();
        parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
        link.cv = cv;
        link.mtx = mtx;
-       spin_pdr_lock(&cv->lock);
+       if (mtx)
+               spin_pdr_lock(&cv->lock);
        if (abs_timeout) {
                set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
                set_timeout_alarm(waiter, blob, abs_timeout);
@@ -488,7 +499,10 @@ bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
                unset_alarm(waiter);
                ret = blob->timed_out ? FALSE : TRUE;
        }
-       uth_mutex_lock(mtx);
+       if (mtx)
+               uth_mutex_lock(mtx);
+       else
+               spin_pdr_lock(&cv->lock);
        return ret;
 }
 
@@ -530,34 +544,77 @@ void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
        uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
 }
 
-void uth_cond_var_signal(uth_cond_var_t *cv)
+/* Caller holds the CV lock.  Returns a uth that needs to be woken up (or NULL),
+ * which the caller needs to do with uthread_runnable(). */
+struct uthread *__uth_cond_var_wake_one(uth_cond_var_t *cv)
 {
-       struct uthread *uth;
+       return __uth_sync_get_next(&cv->sync_obj);
+}
+
+/* Caller holds the CV lock. */
+void __uth_cond_var_signal_and_unlock(uth_cond_var_t *cv)
+{
+       struct uthread *uth = __uth_cond_var_wake_one(cv);
 
-       parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
-       spin_pdr_lock(&cv->lock);
-       uth = __uth_sync_get_next(&cv->sync_obj);
        spin_pdr_unlock(&cv->lock);
        if (uth)
                uthread_runnable(uth);
 }
 
-void uth_cond_var_broadcast(uth_cond_var_t *cv)
+void uth_cond_var_signal(uth_cond_var_t *cv)
+{
+       parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
+
+       spin_pdr_lock(&cv->lock);
+       __uth_cond_var_signal_and_unlock(cv);
+}
+
+/* Caller holds the CV lock.  Returns true if the restartees need to be woken
+ * up, which the caller needs to do with __uth_sync_wake_all(). */
+bool __uth_cond_var_wake_all(uth_cond_var_t *cv, uth_sync_t *restartees)
+{
+       if (__uth_sync_is_empty(&cv->sync_obj))
+               return false;
+       __uth_sync_init(restartees);
+       __uth_sync_swap(restartees, &cv->sync_obj);
+       return true;
+}
+
+/* Caller holds the CV lock. */
+void __uth_cond_var_broadcast_and_unlock(uth_cond_var_t *cv)
 {
        uth_sync_t restartees;
+       bool wake;
 
+       wake = __uth_cond_var_wake_all(cv, &restartees);
+       spin_pdr_unlock(&cv->lock);
+       if (wake)
+               __uth_sync_wake_all(&restartees);
+}
+
+void uth_cond_var_broadcast(uth_cond_var_t *cv)
+{
        parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
+
        spin_pdr_lock(&cv->lock);
-       if (__uth_sync_is_empty(&cv->sync_obj)) {
-               spin_pdr_unlock(&cv->lock);
-               return;
-       }
-       __uth_sync_init(&restartees);
-       __uth_sync_swap(&restartees, &cv->sync_obj);
-       spin_pdr_unlock(&cv->lock);
-       __uth_sync_wake_all(&restartees);
+       __uth_cond_var_broadcast_and_unlock(cv);
 }
 
+/* Similar to the kernel, we can grab the CV's spinlock directly and use that
+ * for synchronization.  This is primarily so we can signal/broadcast from vcore
+ * context, and you typically need to hold some lock when changing state before
+ * signalling. */
+void uth_cond_var_lock(uth_cond_var_t *cv)
+{
+       parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
+
+       spin_pdr_lock(&cv->lock);
+}
+
+void uth_cond_var_unlock(uth_cond_var_t *cv)
+{
+       spin_pdr_unlock(&cv->lock);
+}
 
 /************** Reader-writer Sleeping Locks **************/
 
index f7fb3d7..5a56eb9 100644 (file)
@@ -150,6 +150,106 @@ bool test_broadcast(void)
        return TRUE;
 }
 
+static void *__cv_signaller_no_mutex(void *arg)
+{
+       struct common_args *args = (struct common_args*)arg;
+
+       uthread_usleep(args->sig_sleep);
+       uth_cond_var_lock(args->cv);
+       args->flag = TRUE;
+       __uth_cond_var_signal_and_unlock(args->cv);
+       return PTH_TEST_TRUE;
+}
+
+static void *__cv_broadcaster_no_mutex(void *arg)
+{
+       struct common_args *args = (struct common_args*)arg;
+
+       uthread_usleep(args->sig_sleep);
+       uth_cond_var_lock(args->cv);
+       args->flag = TRUE;
+       __uth_cond_var_broadcast_and_unlock(args->cv);
+       return PTH_TEST_TRUE;
+}
+
+static void *__cv_waiter_no_mutex(void *arg)
+{
+       struct common_args *args = (struct common_args*)arg;
+
+       uthread_usleep(args->wait_sleep);
+       uth_cond_var_lock(args->cv);
+       while (!args->flag)
+               uth_cond_var_wait(args->cv, NULL);
+       UT_ASSERT(args->flag);
+       uth_cond_var_unlock(args->cv);
+       return PTH_TEST_TRUE;
+}
+
+bool test_signal_no_mutex(void)
+{
+       struct common_args local_a, *args = &local_a;
+       pthread_t signaller, waiter;
+       void *sig_join, *wait_join;
+       int ret;
+       static uth_cond_var_t static_cv = UTH_COND_VAR_INIT;
+
+       args->cv = &static_cv;
+       args->mtx = NULL;
+
+       for (int i = 0; i < 1000; i += 10) {
+               args->flag = FALSE;
+               args->wait_sleep = i;
+               args->sig_sleep = 1000 - i;
+
+               ret = pthread_create(&waiter, 0, __cv_waiter_no_mutex, args);
+               UT_ASSERT(!ret);
+               ret = pthread_create(&signaller, 0, __cv_signaller_no_mutex, args);
+               UT_ASSERT(!ret);
+               ret = pthread_join(waiter, &wait_join);
+               UT_ASSERT(!ret);
+               ret = pthread_join(signaller, &sig_join);
+               UT_ASSERT(!ret);
+               UT_ASSERT_M("Waiter Failed", wait_join == PTH_TEST_TRUE);
+               UT_ASSERT_M("Signaller Failed", sig_join == PTH_TEST_TRUE);
+       }
+
+       return TRUE;
+}
+
+bool test_broadcast_no_mutex(void)
+{
+       #define NR_WAITERS 20
+       struct common_args local_a, *args = &local_a;
+       pthread_t bcaster, waiters[NR_WAITERS];
+       void *bcast_join, *wait_joins[NR_WAITERS];
+       int ret;
+
+       args->cv = uth_cond_var_alloc();
+       args->mtx = NULL;
+       args->flag = FALSE;
+       args->wait_sleep = 0;
+       args->sig_sleep = 1000;
+
+       for (int i = 0; i < NR_WAITERS; i++) {
+               ret = pthread_create(&waiters[i], 0, __cv_waiter_no_mutex, args);
+               UT_ASSERT(!ret);
+       }
+       ret = pthread_create(&bcaster, 0, __cv_broadcaster_no_mutex, args);
+       UT_ASSERT(!ret);
+
+       ret = pthread_join(bcaster, &bcast_join);
+       UT_ASSERT(!ret);
+       UT_ASSERT_M("Broadcaster Failed", bcast_join == PTH_TEST_TRUE);
+       for (int i = 0; i < NR_WAITERS; i++) {
+               ret = pthread_join(waiters[i], &wait_joins[i]);
+               UT_ASSERT(!ret);
+               UT_ASSERT_M("Waiter Failed", wait_joins[i] == PTH_TEST_TRUE);
+       }
+
+       uth_cond_var_free(args->cv);
+       return TRUE;
+}
+
 static bool __test_recurse(struct uth_recurse_mutex *r_mtx)
 {
        bool test;
@@ -337,6 +437,24 @@ bool test_cv_recurse_timeout(void)
        return TRUE;
 }
 
+bool test_cv_timeout_no_mutex(void)
+{
+       static uth_cond_var_t cv = UTH_COND_VAR_INIT;
+       struct timespec timeout[1];
+       int ret;
+       bool was_signalled;
+
+       uth_cond_var_lock(&cv);
+       ret = clock_gettime(CLOCK_REALTIME, timeout);
+       UT_ASSERT(!ret);
+       timeout->tv_nsec += 500000000;
+       was_signalled = uth_cond_var_timed_wait(&cv, NULL, timeout);
+       UT_ASSERT(!was_signalled);
+       uth_cond_var_unlock(&cv);
+
+       return TRUE;
+}
+
 static uth_rwlock_t rwl = UTH_RWLOCK_INIT;
 static int rw_value;
 
@@ -398,12 +516,15 @@ struct utest utests[] = {
        UTEST_REG(signal_no_wait),
        UTEST_REG(signal),
        UTEST_REG(broadcast),
+       UTEST_REG(signal_no_mutex),
+       UTEST_REG(broadcast_no_mutex),
        UTEST_REG(recurse),
        UTEST_REG(recurse_static),
        UTEST_REG(semaphore),
        UTEST_REG(semaphore_static),
        UTEST_REG(semaphore_timeout),
        UTEST_REG(cv_timeout),
+       UTEST_REG(cv_timeout_no_mutex),
        UTEST_REG(cv_recurse_timeout),
        UTEST_REG(rwlock),
 };