parlib: Implement uthread mutexes with semaphores
authorBarret Rhoden <brho@cs.berkeley.edu>
Thu, 6 Apr 2017 16:43:13 +0000 (12:43 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Wed, 3 May 2017 16:13:02 +0000 (12:13 -0400)
Mutexes are now wrappers around semaphores of value 1.  For the most part,
this commit moves the old mutex logic into semaphore functions (renaming
mtx->sem), and then changes the locking logic from 'locked' to 'count'.
Note the mutexes and semaphores use different once_ctl functions, and the
semaphore static initializer sets count directly.

I probably should have done this from the start.  There's a good chance
we'll implement user/pthread/semaphore.h with uthread semaphores, in an
effort to not have a dozen different synchronization functions.  Though
that one might end up being a kernel #device if we need inter-process
semaphores.  (All the uthread sync is intra-process.)

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
user/parlib/include/parlib/uthread.h
user/parlib/mutex.c
user/utest/cv.c

index 723bce1..6607b5f 100644 (file)
@@ -168,16 +168,18 @@ static inline struct user_context *get_cur_uth_ctx(void)
 
 /* Uthread Mutexes / CVs / etc. */
 
 
 /* Uthread Mutexes / CVs / etc. */
 
-typedef struct uth_mutex uth_mutex_t;
+typedef struct uth_semaphore uth_semaphore_t;
+typedef struct uth_semaphore uth_mutex_t;
 typedef struct uth_recurse_mutex uth_recurse_mutex_t;
 typedef struct uth_cond_var uth_cond_var_t;
 
 typedef struct uth_recurse_mutex uth_recurse_mutex_t;
 typedef struct uth_cond_var uth_cond_var_t;
 
-struct uth_mutex {
+struct uth_semaphore {
        struct spin_pdr_lock            lock;
        uth_sync_t                                      sync_obj;
        struct spin_pdr_lock            lock;
        uth_sync_t                                      sync_obj;
-       bool                                            locked;
+       unsigned int                            count;
        parlib_once_t                           once_ctl;
 };
        parlib_once_t                           once_ctl;
 };
+#define UTH_SEMAPHORE_INIT(n) { .once_ctl = PARLIB_ONCE_INIT, .count = (n) }
 #define UTH_MUTEX_INIT { .once_ctl = PARLIB_ONCE_INIT }
 
 struct uth_recurse_mutex {
 #define UTH_MUTEX_INIT { .once_ctl = PARLIB_ONCE_INIT }
 
 struct uth_recurse_mutex {
@@ -195,6 +197,12 @@ struct uth_cond_var {
 };
 #define UTH_COND_VAR_INIT { .once_ctl = PARLIB_ONCE_INIT }
 
 };
 #define UTH_COND_VAR_INIT { .once_ctl = PARLIB_ONCE_INIT }
 
+uth_semaphore_t *uth_semaphore_alloc(unsigned int count);
+void uth_semaphore_free(uth_semaphore_t *sem);
+void uth_semaphore_down(uth_semaphore_t *sem);
+bool uth_semaphore_trydown(uth_semaphore_t *sem);
+void uth_semaphore_up(uth_semaphore_t *sem);
+
 uth_mutex_t *uth_mutex_alloc(void);
 void uth_mutex_free(uth_mutex_t *m);
 void uth_mutex_lock(uth_mutex_t *m);
 uth_mutex_t *uth_mutex_alloc(void);
 void uth_mutex_free(uth_mutex_t *m);
 void uth_mutex_lock(uth_mutex_t *m);
index a383ab3..221221a 100644 (file)
@@ -2,8 +2,8 @@
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details. */
 
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details. */
 
-/* Generic Uthread Mutexes and CVs.  2LSs implement their own methods, but we
- * need a 2LS-independent interface and default implementation. */
+/* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
+ * functions.  2LSs implement their own sync objects (bottom of the file). */
 
 #include <parlib/uthread.h>
 #include <sys/queue.h>
 
 #include <parlib/uthread.h>
 #include <sys/queue.h>
 #include <malloc.h>
 
 
 #include <malloc.h>
 
 
-/************** Mutexes **************/
+/************** Semaphores and Mutexes **************/
 
 
-/* Takes a void * since it's called by parlib_run_once(), which enables us to
- * statically initialize the mutex.  This init does everything not done by the
- * static initializer.  Note we do not allow 'static' destruction.  (No one
- * calls free). */
-static void __uth_mutex_init(void *arg)
+static void __uth_semaphore_init(void *arg)
 {
 {
-       struct uth_mutex *mtx = (struct uth_mutex*)arg;
+       struct uth_semaphore *sem = (struct uth_semaphore*)arg;
 
 
-       spin_pdr_init(&mtx->lock);
-       mtx->sync_obj = __uth_sync_alloc();
-       mtx->locked = FALSE;
+       spin_pdr_init(&sem->lock);
+       sem->sync_obj = __uth_sync_alloc();
+       /* If we used a static initializer for a semaphore, count is already set.
+        * o/w it will be set by _alloc(). */
 }
 
 }
 
-uth_mutex_t *uth_mutex_alloc(void)
+uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
 {
 {
-       struct uth_mutex *mtx;
+       struct uth_semaphore *sem;
 
 
-       mtx = malloc(sizeof(struct uth_mutex));
-       assert(mtx);
-       __uth_mutex_init(mtx);
+       sem = malloc(sizeof(struct uth_semaphore));
+       assert(sem);
+       __uth_semaphore_init(sem);
+       sem->count = count;
        /* The once is to make sure the object is initialized. */
        /* The once is to make sure the object is initialized. */
-       parlib_set_ran_once(&mtx->once_ctl);
-       return mtx;
+       parlib_set_ran_once(&sem->once_ctl);
+       return sem;
 }
 
 }
 
-void uth_mutex_free(uth_mutex_t *mtx)
+void uth_semaphore_free(uth_semaphore_t *sem)
 {
        /* Keep this in sync with uth_recurse_mutex_free(). */
 {
        /* Keep this in sync with uth_recurse_mutex_free(). */
-       __uth_sync_free(mtx->sync_obj);
-       free(mtx);
+       __uth_sync_free(sem->sync_obj);
+       free(sem);
 }
 
 }
 
-static void __mutex_cb(struct uthread *uth, void *arg)
+static void __semaphore_cb(struct uthread *uth, void *arg)
 {
 {
-       struct uth_mutex *mtx = (struct uth_mutex*)arg;
+       struct uth_semaphore *sem = (struct uth_semaphore*)arg;
 
        /* We need to tell the 2LS that its thread blocked.  We need to do this
 
        /* We need to tell the 2LS that its thread blocked.  We need to do this
-        * before unlocking the mtx, since as soon as we unlock, the mtx could be
+        * before unlocking the sem, since as soon as we unlock, the sem could be
         * released and our thread restarted.
         *
         * released and our thread restarted.
         *
-        * Also note the lock-ordering rule.  The mtx lock is grabbed before any
+        * Also note the lock-ordering rule.  The sem lock is grabbed before any
         * locks the 2LS might grab. */
         * locks the 2LS might grab. */
-       uthread_has_blocked(uth, mtx->sync_obj, UTH_EXT_BLK_MUTEX);
-       spin_pdr_unlock(&mtx->lock);
+       uthread_has_blocked(uth, sem->sync_obj, UTH_EXT_BLK_MUTEX);
+       spin_pdr_unlock(&sem->lock);
 }
 
 }
 
-void uth_mutex_lock(uth_mutex_t *mtx)
+void uth_semaphore_down(uth_semaphore_t *sem)
 {
 {
-       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
-       spin_pdr_lock(&mtx->lock);
-       if (!mtx->locked) {
-               mtx->locked = TRUE;
-               spin_pdr_unlock(&mtx->lock);
+       parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
+       spin_pdr_lock(&sem->lock);
+       if (sem->count > 0) {
+               /* Only down if we got one.  This means a sem with no more counts is 0,
+                * not negative (where -count == nr_waiters).  Doing it this way means
+                * our timeout function works for sems and CVs. */
+               sem->count--;
+               spin_pdr_unlock(&sem->lock);
                return;
        }
        /* the unlock and sync enqueuing is done in the yield callback.  as always,
         * we need to do this part in vcore context, since as soon as we unlock the
         * uthread could restart.  (atomically yield and unlock). */
                return;
        }
        /* the unlock and sync enqueuing is done in the yield callback.  as always,
         * we need to do this part in vcore context, since as soon as we unlock the
         * uthread could restart.  (atomically yield and unlock). */
-       uthread_yield(TRUE, __mutex_cb, mtx);
+       uthread_yield(TRUE, __semaphore_cb, sem);
 }
 
 }
 
-bool uth_mutex_trylock(uth_mutex_t *mtx)
+bool uth_semaphore_trydown(uth_semaphore_t *sem)
 {
        bool ret = FALSE;
 
 {
        bool ret = FALSE;
 
-       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
-       spin_pdr_lock(&mtx->lock);
-       if (!mtx->locked) {
-               mtx->locked = TRUE;
+       parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
+       spin_pdr_lock(&sem->lock);
+       if (sem->count > 0) {
+               sem->count--;
                ret = TRUE;
        }
                ret = TRUE;
        }
-       spin_pdr_unlock(&mtx->lock);
+       spin_pdr_unlock(&sem->lock);
        return ret;
 }
 
        return ret;
 }
 
-void uth_mutex_unlock(uth_mutex_t *mtx)
+void uth_semaphore_up(uth_semaphore_t *sem)
 {
        struct uthread *uth;
 
 {
        struct uthread *uth;
 
-       spin_pdr_lock(&mtx->lock);
-       uth = __uth_sync_get_next(mtx->sync_obj);
+       /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special. */
+       parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
+       spin_pdr_lock(&sem->lock);
+       uth = __uth_sync_get_next(sem->sync_obj);
+       /* If there was a waiter, we pass our resource/count to them. */
        if (!uth)
        if (!uth)
-               mtx->locked = FALSE;
-       spin_pdr_unlock(&mtx->lock);
+               sem->count++;
+       spin_pdr_unlock(&sem->lock);
        if (uth)
                uthread_runnable(uth);
 }
 
        if (uth)
                uthread_runnable(uth);
 }
 
+/* Takes a void * since it's called by parlib_run_once(), which enables us to
+ * statically initialize the mutex.  This init does everything not done by the
+ * static initializer.  Note we do not allow 'static' destruction.  (No one
+ * calls free). */
+static void __uth_mutex_init(void *arg)
+{
+       struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
+
+       __uth_semaphore_init(mtx);
+       mtx->count = 1;
+}
+
+uth_mutex_t *uth_mutex_alloc(void)
+{
+       struct uth_semaphore *mtx;
+
+       mtx = malloc(sizeof(struct uth_semaphore));
+       assert(mtx);
+       __uth_mutex_init(mtx);
+       parlib_set_ran_once(&mtx->once_ctl);
+       return mtx;
+}
+
+void uth_mutex_free(uth_mutex_t *mtx)
+{
+       uth_semaphore_free(mtx);
+}
+
+void uth_mutex_lock(uth_mutex_t *mtx)
+{
+       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
+       uth_semaphore_down(mtx);
+}
+
+bool uth_mutex_trylock(uth_mutex_t *mtx)
+{
+       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
+       return uth_semaphore_trydown(mtx);
+}
+
+void uth_mutex_unlock(uth_mutex_t *mtx)
+{
+       uth_semaphore_up(mtx);
+}
 
 /************** Recursive mutexes **************/
 
 
 /************** Recursive mutexes **************/
 
@@ -214,14 +262,14 @@ void uth_cond_var_free(uth_cond_var_t *cv)
 
 struct uth_cv_link {
        struct uth_cond_var                     *cv;
 
 struct uth_cv_link {
        struct uth_cond_var                     *cv;
-       struct uth_mutex                        *mtx;
+       struct uth_semaphore            *mtx;
 };
 
 static void __cv_wait_cb(struct uthread *uth, void *arg)
 {
        struct uth_cv_link *link = (struct uth_cv_link*)arg;
        struct uth_cond_var *cv = link->cv;
 };
 
 static void __cv_wait_cb(struct uthread *uth, void *arg)
 {
        struct uth_cv_link *link = (struct uth_cv_link*)arg;
        struct uth_cond_var *cv = link->cv;
-       struct uth_mutex *mtx = link->mtx;
+       struct uth_semaphore *mtx = link->mtx;
 
        /* We need to tell the 2LS that its thread blocked.  We need to do this
         * before unlocking the cv, since as soon as we unlock, the cv could be
 
        /* We need to tell the 2LS that its thread blocked.  We need to do this
         * before unlocking the cv, since as soon as we unlock, the cv could be
index 7ed39c6..6a159f8 100644 (file)
@@ -207,6 +207,57 @@ bool test_recurse_static(void)
        return __test_recurse(&static_recurse_mtx);
 }
 
        return __test_recurse(&static_recurse_mtx);
 }
 
+static bool __test_semaphore(struct uth_semaphore *sem, int count)
+{
+       bool can_down;
+
+       UT_ASSERT(count > 2);   /* for our own use */
+       UT_ASSERT(sem->count == count);
+       /* should be able to down it count times without blocking */
+       for (int i = 0; i < count; i++) {
+               uth_semaphore_down(sem);
+               UT_ASSERT(sem->count == count - (i + 1));
+       }
+
+       /* shouldn't be able to down, since we grabbed them all */
+       can_down = uth_semaphore_trydown(sem);
+       UT_ASSERT(!can_down);
+       UT_ASSERT(sem->count == 0);
+
+       uth_semaphore_up(sem);
+       UT_ASSERT(sem->count == 1);
+       can_down = uth_semaphore_trydown(sem);
+       UT_ASSERT(can_down);
+       UT_ASSERT(sem->count == 0);
+
+       for (int i = 0; i < count; i++) {
+               uth_semaphore_up(sem);
+               UT_ASSERT(sem->count == i + 1);
+       }
+
+       return TRUE;
+}
+
+bool test_semaphore(void)
+{
+       uth_semaphore_t *sem;
+
+       sem = uth_semaphore_alloc(5);
+       UT_ASSERT(sem);
+       if (!__test_semaphore(sem, 5))
+               return FALSE;
+       uth_semaphore_free(sem);
+
+       return TRUE;
+}
+
+bool test_semaphore_static(void)
+{
+       static uth_semaphore_t static_semaphore = UTH_SEMAPHORE_INIT(5);
+
+       return __test_semaphore(&static_semaphore, 5);
+}
+
 /* <--- End definition of test cases ---> */
 
 struct utest utests[] = {
 /* <--- End definition of test cases ---> */
 
 struct utest utests[] = {
@@ -215,6 +266,8 @@ struct utest utests[] = {
        UTEST_REG(broadcast),
        UTEST_REG(recurse),
        UTEST_REG(recurse_static),
        UTEST_REG(broadcast),
        UTEST_REG(recurse),
        UTEST_REG(recurse_static),
+       UTEST_REG(semaphore),
+       UTEST_REG(semaphore_static),
 };
 int num_utests = sizeof(utests) / sizeof(struct utest);
 
 };
 int num_utests = sizeof(utests) / sizeof(struct utest);