Add generic uthread condition variables
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 11 May 2016 22:48:48 +0000 (18:48 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Fri, 13 May 2016 15:08:12 +0000 (11:08 -0400)
Like with mutexes, apps will want a 2LS-independent way to perform
synchronization.  Although a 2LS can be closely coupled with an
application, it's conceivable that some apps and libraries don't care
which 2LS they link with.

Further, some 2LSs probably don't care about how exactly their mutexes
and CVs work.  In that case, having the generic implementation saves
the 2LS writers from the hassle of rolling their own CVs.

In the future, we might change things up a bit.  When a 2LS wants to
control its mutex and CV implementations, it's probably so that they can
pick which thread runs from the queue.  In that case, perhaps we only
call out to the 2LS for the structure and the decision.

For instance, the generic uthread implementation uses a TAILQ and a FIFO
policy.  The 2LS could have a different structure and a different
policy, where the operations are "here's a Sync object (cv/mtx) and a
uth, block on it" and "wake on from this sync object" and "wake all."
The sync object would likely be embedded in a larger object, which means
it is allocated by the 2LS (so there are two more ops).  There are a few
other details; we'll hold off until we actually have a use case.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
user/parlib/include/parlib/uthread.h
user/parlib/mutex.c

index cb1519a..f0b2b07 100644 (file)
@@ -39,7 +39,12 @@ struct uthread {
        char err_str[MAX_ERRSTR_LEN];
 };
 extern __thread struct uthread *current_uthread;
-typedef void* uth_mutex_t;
+
+/* These structs are undefined.  We use them instead of void * so we can get
+ * compiler warnings if someone passes the wrong pointer type.  Internally, we
+ * use another struct type for mtx and cvs. */
+typedef struct __uth_mtx_opaque * uth_mutex_t;
+typedef struct __uth_cv_opaque * uth_cond_var_t;
 
 /* 2L-Scheduler operations.  Examples in pthread.c. */
 struct schedule_ops {
@@ -52,11 +57,16 @@ struct schedule_ops {
        void (*thread_has_blocked)(struct uthread *, int);
        void (*thread_refl_fault)(struct uthread *, struct user_context *);
        /**** Defining these functions is optional. ****/
-       /* 2LSs can leave the mutex funcs empty for a default implementation */
+       /* 2LSs can leave the mutex/cv funcs empty for a default implementation */
        uth_mutex_t (*mutex_alloc)(void);
        void (*mutex_free)(uth_mutex_t);
        void (*mutex_lock)(uth_mutex_t);
        void (*mutex_unlock)(uth_mutex_t);
+       uth_cond_var_t (*cond_var_alloc)(void);
+       void (*cond_var_free)(uth_cond_var_t);
+       void (*cond_var_wait)(uth_cond_var_t, uth_mutex_t);
+       void (*cond_var_signal)(uth_cond_var_t);
+       void (*cond_var_broadcast)(uth_cond_var_t);
        /* Functions event handling wants */
        void (*preempt_pending)(void);
 };
@@ -136,4 +146,14 @@ void uth_mutex_free(uth_mutex_t m);
 void uth_mutex_lock(uth_mutex_t m);
 void uth_mutex_unlock(uth_mutex_t m);
 
+/* Generic Uthread Condition Variables.  2LSs can implement their own methods.
+ * Callers to cv_wait must hold the mutex, which it will atomically wait and
+ * unlock, then relock when it returns.  Callers to signal and broadcast may
+ * hold the mutex, if they choose. */
+uth_cond_var_t uth_cond_var_alloc(void);
+void uth_cond_var_free(uth_cond_var_t cv);
+void uth_cond_var_wait(uth_cond_var_t cv, uth_mutex_t m);
+void uth_cond_var_signal(uth_cond_var_t cv);
+void uth_cond_var_broadcast(uth_cond_var_t cv);
+
 __END_DECLS
index 7902c61..0e4e7dd 100644 (file)
@@ -2,27 +2,47 @@
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details. */
 
-/* Generic Uthread Mutexes.  2LSs implement their own methods, but we need a
- * 2LS-independent interface and default implementation. */
+/* Generic Uthread Mutexes and CVs.  2LSs implement their own methods, but we
+ * need a 2LS-independent interface and default implementation. */
 
 #include <parlib/uthread.h>
 #include <sys/queue.h>
 #include <parlib/spinlock.h>
 #include <malloc.h>
 
+/* The linkage structs are for the yield callbacks */
 struct uth_default_mtx;
 struct uth_mtx_link {
        TAILQ_ENTRY(uth_mtx_link)       next;
        struct uth_default_mtx          *mtx;
        struct uthread                          *uth;
 };
+TAILQ_HEAD(mtx_link_tq, uth_mtx_link);
 
 struct uth_default_mtx {
        struct spin_pdr_lock            lock;
-       TAILQ_HEAD(t, uth_mtx_link)     waiters;
+       struct mtx_link_tq                      waiters;
        bool                                            locked;
 };
 
+struct uth_default_cv;
+struct uth_cv_link {
+       TAILQ_ENTRY(uth_cv_link)        next;
+       struct uth_default_cv           *cv;
+       struct uth_default_mtx          *mtx;
+       struct uthread                          *uth;
+};
+TAILQ_HEAD(cv_link_tq, uth_cv_link);
+
+struct uth_default_cv {
+       struct spin_pdr_lock            lock;
+       struct cv_link_tq                       waiters;
+};
+
+
+/************** Default Mutex Implementation **************/
+
+
 static struct uth_default_mtx *uth_default_mtx_alloc(void)
 {
        struct uth_default_mtx *mtx;
@@ -90,6 +110,10 @@ static void uth_default_mtx_unlock(struct uth_default_mtx *mtx)
                uthread_runnable(first->uth);
 }
 
+
+/************** Wrappers for the uthread mutex interface **************/
+
+
 uth_mutex_t uth_mutex_alloc(void)
 {
        if (sched_ops->mutex_alloc)
@@ -123,3 +147,190 @@ void uth_mutex_unlock(uth_mutex_t m)
        }
        uth_default_mtx_unlock((struct uth_default_mtx*)m);
 }
+
+
+/************** Default Condition Variable Implementation **************/
+
+
+static struct uth_default_cv *uth_default_cv_alloc(void)
+{
+       struct uth_default_cv *cv;
+
+       cv = malloc(sizeof(struct uth_default_cv));
+       assert(cv);
+       spin_pdr_init(&cv->lock);
+       TAILQ_INIT(&cv->waiters);
+       return cv;
+}
+
+static void uth_default_cv_free(struct uth_default_cv *cv)
+{
+       assert(TAILQ_EMPTY(&cv->waiters));
+       free(cv);
+}
+
+static void __cv_wait_cb(struct uthread *uth, void *arg)
+{
+       struct uth_cv_link *link = (struct uth_cv_link*)arg;
+       struct uth_default_cv *cv = link->cv;
+       struct uth_default_mtx *mtx = link->mtx;
+
+       /* We need to tell the 2LS that its thread blocked.  We need to do this
+        * before unlocking the cv, since as soon as we unlock, the cv could be
+        * signalled and our thread restarted.
+        *
+        * Also note the lock-ordering rule.  The cv lock is grabbed before any
+        * locks the 2LS might grab. */
+       uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
+       spin_pdr_unlock(&cv->lock);
+       uth_mutex_unlock((uth_mutex_t)mtx);
+}
+
+/* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
+ * caller holds mtx again.  Once our uth is on the CV's list, we can release the
+ * mtx without fear of missing a signal.
+ *
+ * POSIX refers to atomicity in this context as "atomically with respect to
+ * access by another thread to the mutex and then the condition variable"
+ *
+ * The idea is that we hold the mutex to protect some invariant; we check it,
+ * and decide to sleep.  Now we get on the list before releasing so that any
+ * changes to that invariant (e.g. a flag is now TRUE) happen after we're on the
+ * list, and so that we don't miss the signal.  To be more clear, the invariant
+ * in a basic wake-up flag scenario is: "whenever a flag is set from FALSE to
+ * TRUE, all waiters that saw FALSE are on the CV's waitqueue."  The mutex is
+ * required for this invariant.
+ *
+ * Note that signal/broadcasters do not *need* to hold the mutex, in general,
+ * but they do in the basic wake-up flag scenario.  If not, the race is this:
+ *
+ * Sleeper:                                                            Waker:
+ * -----------------------------------------------------------------
+ * Hold mutex
+ *   See flag is False
+ *   Decide to sleep
+ *                                                                             Set flag True
+ * PAUSE!                                                              Grab CV lock
+ *                                                                             See list is empty, unlock
+ *
+ *   Grab CV lock
+ *     Get put on list
+ *   Unlock CV lock
+ * Unlock mutex
+ * (Never wake up; we missed the signal)
+ *
+ * For those familiar with the kernel's CVs, we don't couple mutexes with CVs.
+ * cv_lock() actually grabs the spinlock inside the CV and uses *that* to
+ * protect the invariant.  The signallers always grab that lock, so the sleeper
+ * is not in danger of missing the signal.  The tradeoff is that the kernel CVs
+ * use a spinlock instead of a mutex for protecting its invariant; there might
+ * be some case that preferred blocking sync.
+ *
+ * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
+ * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
+ *
+ * As far as lock ordering goes, once the sleeper holds the mutex and is on the
+ * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
+ * actually requires grabbing its spinlock.  So as to not have a lock ordering
+ * between *spinlocks*, we let go of the CV's spinlock before unlocking the
+ * mutex.  There is an ordering between the mutex and the CV spinlock (mutex->cv
+ * spin), but there is no ordering between the mutex spin and cv spin.  And of
+ * course, we need to unlock the CV spinlock in the yield callback.
+ *
+ * Also note that we use the external API for the mutex operations.  A 2LS could
+ * have their own mutex ops but still use the generic cv ops. */
+static void uth_default_cv_wait(struct uth_default_cv *cv,
+                                struct uth_default_mtx *mtx)
+{
+       struct uth_cv_link link;
+
+       link.cv = cv;
+       link.mtx = mtx;
+       link.uth = current_uthread;
+       spin_pdr_lock(&cv->lock);
+       TAILQ_INSERT_TAIL(&cv->waiters, &link, next);
+       uthread_yield(TRUE, __cv_wait_cb, &link);
+       uth_mutex_lock((uth_mutex_t)mtx);
+}
+
+static void uth_default_cv_signal(struct uth_default_cv *cv)
+{
+       struct uth_cv_link *first;
+
+       spin_pdr_lock(&cv->lock);
+       first = TAILQ_FIRST(&cv->waiters);
+       if (first)
+               TAILQ_REMOVE(&cv->waiters, first, next);
+       spin_pdr_unlock(&cv->lock);
+       if (first)
+               uthread_runnable(first->uth);
+}
+
+static void swap_cv_lists(struct cv_link_tq *a, struct cv_link_tq *b)
+{
+       struct cv_link_tq temp;
+
+       temp = *a;
+       *a = *b;
+       *b = temp;
+}
+
+static void uth_default_cv_broadcast(struct uth_default_cv *cv)
+{
+       struct cv_link_tq restartees = TAILQ_HEAD_INITIALIZER(restartees);
+       struct uth_cv_link *i, *safe;
+
+       spin_pdr_lock(&cv->lock);
+       swap_cv_lists(&cv->waiters, &restartees);
+       spin_pdr_unlock(&cv->lock);
+       /* Need the SAFE, since we can't touch the linkage once the uth could run */
+       TAILQ_FOREACH_SAFE(i, &restartees, next, safe)
+               uthread_runnable(i->uth);
+}
+
+
+/************** Wrappers for the uthread CV interface **************/
+
+
+uth_cond_var_t uth_cond_var_alloc(void)
+{
+       if (sched_ops->cond_var_alloc)
+               return sched_ops->cond_var_alloc();
+       return (uth_cond_var_t)uth_default_cv_alloc();
+}
+
+void uth_cond_var_free(uth_cond_var_t cv)
+{
+       if (sched_ops->cond_var_free) {
+               sched_ops->cond_var_free(cv);
+               return;
+       }
+       uth_default_cv_free((struct uth_default_cv*)cv);
+}
+
+void uth_cond_var_wait(uth_cond_var_t cv, uth_mutex_t m)
+{
+       if (sched_ops->cond_var_wait) {
+               sched_ops->cond_var_wait(cv, m);
+               return;
+       }
+       uth_default_cv_wait((struct uth_default_cv*)cv, (struct uth_default_mtx*)m);
+}
+
+void uth_cond_var_signal(uth_cond_var_t cv)
+{
+       if (sched_ops->cond_var_signal) {
+               sched_ops->cond_var_signal(cv);
+               return;
+       }
+       uth_default_cv_signal((struct uth_default_cv*)cv);
+}
+
+void uth_cond_var_broadcast(uth_cond_var_t cv)
+{
+       if (sched_ops->cond_var_broadcast) {
+               sched_ops->cond_var_broadcast(cv);
+               return;
+       }
+       uth_default_cv_broadcast((struct uth_default_cv*)cv);
+}