parlib: Catch illegal block attempts
[akaros.git] / user / parlib / mutex.c
index 0e4e7dd..d2a9c8e 100644 (file)
-/* Copyright (c) 2016 Google, Inc.
+/* Copyright (c) 2016-2017 Google, Inc.
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details. */
 
-/* Generic Uthread Mutexes and CVs.  2LSs implement their own methods, but we
- * need a 2LS-independent interface and default implementation. */
+/* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
+ * functions.  2LSs implement their own sync objects (bottom of the file). */
 
 #include <parlib/uthread.h>
 #include <sys/queue.h>
 #include <parlib/spinlock.h>
+#include <parlib/alarm.h>
+#include <parlib/assert.h>
 #include <malloc.h>
 
-/* The linkage structs are for the yield callbacks */
-struct uth_default_mtx;
-struct uth_mtx_link {
-       TAILQ_ENTRY(uth_mtx_link)       next;
-       struct uth_default_mtx          *mtx;
+struct timeout_blob {
+       bool                                            timed_out;
        struct uthread                          *uth;
+       uth_sync_t                                      *sync_ptr;
+       struct spin_pdr_lock            *lock_ptr;
 };
-TAILQ_HEAD(mtx_link_tq, uth_mtx_link);
 
-struct uth_default_mtx {
-       struct spin_pdr_lock            lock;
-       struct mtx_link_tq                      waiters;
-       bool                                            locked;
-};
+/* When sync primitives want to time out, they can use this alarm handler.  It
+ * needs a timeout_blob, which is independent of any particular sync method. */
+static void timeout_handler(struct alarm_waiter *waiter)
+{
+       struct timeout_blob *blob = (struct timeout_blob*)waiter->data;
+
+       spin_pdr_lock(blob->lock_ptr);
+       if (__uth_sync_get_uth(blob->sync_ptr, blob->uth))
+               blob->timed_out = TRUE;
+       spin_pdr_unlock(blob->lock_ptr);
+       if (blob->timed_out)
+               uthread_runnable(blob->uth);
+}
 
-struct uth_default_cv;
-struct uth_cv_link {
-       TAILQ_ENTRY(uth_cv_link)        next;
-       struct uth_default_cv           *cv;
-       struct uth_default_mtx          *mtx;
-       struct uthread                          *uth;
-};
-TAILQ_HEAD(cv_link_tq, uth_cv_link);
+/* Minor helper, sets a blob's fields */
+static void set_timeout_blob(struct timeout_blob *blob, uth_sync_t *sync_ptr,
+                             struct spin_pdr_lock *lock_ptr)
+{
+       blob->timed_out = FALSE;
+       blob->uth = current_uthread;
+       blob->sync_ptr = sync_ptr;
+       blob->lock_ptr = lock_ptr;
+}
 
-struct uth_default_cv {
-       struct spin_pdr_lock            lock;
-       struct cv_link_tq                       waiters;
-};
+/* Minor helper, sets an alarm for blob and a timespec */
+static void set_timeout_alarm(struct alarm_waiter *waiter,
+                              struct timeout_blob *blob,
+                              const struct timespec *abs_timeout)
+{
+       init_awaiter(waiter, timeout_handler);
+       waiter->data = blob;
+       set_awaiter_abs_unix(waiter, timespec_to_alarm_time(abs_timeout));
+       set_alarm(waiter);
+}
+
+/************** Semaphores and Mutexes **************/
 
+static void __uth_semaphore_init(void *arg)
+{
+       struct uth_semaphore *sem = (struct uth_semaphore*)arg;
 
-/************** Default Mutex Implementation **************/
+       spin_pdr_init(&sem->lock);
+       __uth_sync_init(&sem->sync_obj);
+       /* If we used a static initializer for a semaphore, count is already set.
+        * o/w it will be set by _alloc() or _init() (via uth_semaphore_init()). */
+}
 
+/* Initializes a sem acquired from somewhere else.  POSIX's sem_init() needs
+ * this. */
+void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count)
+{
+       __uth_semaphore_init(sem);
+       sem->count = count;
+       /* The once is to make sure the object is initialized. */
+       parlib_set_ran_once(&sem->once_ctl);
+}
 
-static struct uth_default_mtx *uth_default_mtx_alloc(void)
+/* Undoes whatever was done in init. */
+void uth_semaphore_destroy(uth_semaphore_t *sem)
 {
-       struct uth_default_mtx *mtx;
+       __uth_sync_destroy(&sem->sync_obj);
+}
 
-       mtx = malloc(sizeof(struct uth_default_mtx));
-       assert(mtx);
-       spin_pdr_init(&mtx->lock);
-       TAILQ_INIT(&mtx->waiters);
-       mtx->locked = FALSE;
-       return mtx;
+uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
+{
+       struct uth_semaphore *sem;
+
+       sem = malloc(sizeof(struct uth_semaphore));
+       assert(sem);
+       uth_semaphore_init(sem, count);
+       return sem;
 }
 
-static void uth_default_mtx_free(struct uth_default_mtx *mtx)
+void uth_semaphore_free(uth_semaphore_t *sem)
 {
-       assert(TAILQ_EMPTY(&mtx->waiters));
-       free(mtx);
+       uth_semaphore_destroy(sem);
+       free(sem);
 }
 
-static void __mutex_cb(struct uthread *uth, void *arg)
+static void __semaphore_cb(struct uthread *uth, void *arg)
 {
-       struct uth_mtx_link *link = (struct uth_mtx_link*)arg;
-       struct uth_default_mtx *mtx = link->mtx;
+       struct uth_semaphore *sem = (struct uth_semaphore*)arg;
 
        /* We need to tell the 2LS that its thread blocked.  We need to do this
-        * before unlocking the mtx, since as soon as we unlock, the mtx could be
+        * before unlocking the sem, since as soon as we unlock, the sem could be
         * released and our thread restarted.
         *
-        * Also note the lock-ordering rule.  The mtx lock is grabbed before any
+        * Also note the lock-ordering rule.  The sem lock is grabbed before any
         * locks the 2LS might grab. */
-       uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
-       spin_pdr_unlock(&mtx->lock);
+       uthread_has_blocked(uth, &sem->sync_obj, UTH_EXT_BLK_MUTEX);
+       spin_pdr_unlock(&sem->lock);
 }
 
-static void uth_default_mtx_lock(struct uth_default_mtx *mtx)
+bool uth_semaphore_timed_down(uth_semaphore_t *sem,
+                              const struct timespec *abs_timeout)
 {
-       struct uth_mtx_link link;
+       struct alarm_waiter waiter[1];
+       struct timeout_blob blob[1];
+
+       assert_can_block();
+       parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
+       spin_pdr_lock(&sem->lock);
+       if (sem->count > 0) {
+               /* Only down if we got one.  This means a sem with no more counts is 0,
+                * not negative (where -count == nr_waiters).  Doing it this way means
+                * our timeout function works for sems and CVs. */
+               sem->count--;
+               spin_pdr_unlock(&sem->lock);
+               return TRUE;
+       }
+       if (abs_timeout) {
+               set_timeout_blob(blob, &sem->sync_obj, &sem->lock);
+               set_timeout_alarm(waiter, blob, abs_timeout);
+       }
+       /* the unlock and sync enqueuing is done in the yield callback.  as always,
+        * we need to do this part in vcore context, since as soon as we unlock the
+        * uthread could restart.  (atomically yield and unlock). */
+       uthread_yield(TRUE, __semaphore_cb, sem);
+       if (abs_timeout) {
+               /* We're guaranteed the alarm will either be cancelled or the handler
+                * complete when unset_alarm() returns. */
+               unset_alarm(waiter);
+               return blob->timed_out ? FALSE : TRUE;
+       }
+       return TRUE;
+}
 
-       spin_pdr_lock(&mtx->lock);
-       if (!mtx->locked) {
-               mtx->locked = TRUE;
-               spin_pdr_unlock(&mtx->lock);
-               return;
+void uth_semaphore_down(uth_semaphore_t *sem)
+{
+       uth_semaphore_timed_down(sem, NULL);
+}
+
+bool uth_semaphore_trydown(uth_semaphore_t *sem)
+{
+       bool ret = FALSE;
+
+       assert_can_block();
+       parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
+       spin_pdr_lock(&sem->lock);
+       if (sem->count > 0) {
+               sem->count--;
+               ret = TRUE;
        }
-       link.mtx = mtx;
-       link.uth = current_uthread;
-       TAILQ_INSERT_TAIL(&mtx->waiters, &link, next);
-       /* the unlock is done in the yield callback.  as always, we need to do this
-        * part in vcore context, since as soon as we unlock the uthread could
-        * restart.  (atomically yield and unlock). */
-       uthread_yield(TRUE, __mutex_cb, &link);
+       spin_pdr_unlock(&sem->lock);
+       return ret;
 }
 
-static void uth_default_mtx_unlock(struct uth_default_mtx *mtx)
+void uth_semaphore_up(uth_semaphore_t *sem)
 {
-       struct uth_mtx_link *first;
+       struct uthread *uth;
+
+       /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special. */
+       parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
+       spin_pdr_lock(&sem->lock);
+       uth = __uth_sync_get_next(&sem->sync_obj);
+       /* If there was a waiter, we pass our resource/count to them. */
+       if (!uth)
+               sem->count++;
+       spin_pdr_unlock(&sem->lock);
+       if (uth)
+               uthread_runnable(uth);
+}
 
-       spin_pdr_lock(&mtx->lock);
-       first = TAILQ_FIRST(&mtx->waiters);
-       if (first)
-               TAILQ_REMOVE(&mtx->waiters, first, next);
-       else
-               mtx->locked = FALSE;
-       spin_pdr_unlock(&mtx->lock);
-       if (first)
-               uthread_runnable(first->uth);
+/* Takes a void * since it's called by parlib_run_once(), which enables us to
+ * statically initialize the mutex.  This init does everything not done by the
+ * static initializer.  Note we do not allow 'static' destruction.  (No one
+ * calls free). */
+static void __uth_mutex_init(void *arg)
+{
+       struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
+
+       __uth_semaphore_init(mtx);
+       mtx->count = 1;
 }
 
+void uth_mutex_init(uth_mutex_t *mtx)
+{
+       __uth_mutex_init(mtx);
+       parlib_set_ran_once(&mtx->once_ctl);
+}
 
-/************** Wrappers for the uthread mutex interface **************/
+void uth_mutex_destroy(uth_mutex_t *mtx)
+{
+       uth_semaphore_destroy(mtx);
+}
 
+uth_mutex_t *uth_mutex_alloc(void)
+{
+       struct uth_semaphore *mtx;
 
-uth_mutex_t uth_mutex_alloc(void)
+       mtx = malloc(sizeof(struct uth_semaphore));
+       assert(mtx);
+       uth_mutex_init(mtx);
+       return mtx;
+}
+
+void uth_mutex_free(uth_mutex_t *mtx)
 {
-       if (sched_ops->mutex_alloc)
-               return sched_ops->mutex_alloc();
-       return (uth_mutex_t)uth_default_mtx_alloc();
+       uth_semaphore_free(mtx);
 }
 
-void uth_mutex_free(uth_mutex_t m)
+bool uth_mutex_timed_lock(uth_mutex_t *mtx, const struct timespec *abs_timeout)
 {
-       if (sched_ops->mutex_free) {
-               sched_ops->mutex_free(m);
-               return;
+       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
+       return uth_semaphore_timed_down(mtx, abs_timeout);
+}
+
+void uth_mutex_lock(uth_mutex_t *mtx)
+{
+       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
+       uth_semaphore_down(mtx);
+}
+
+bool uth_mutex_trylock(uth_mutex_t *mtx)
+{
+       parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
+       return uth_semaphore_trydown(mtx);
+}
+
+void uth_mutex_unlock(uth_mutex_t *mtx)
+{
+       uth_semaphore_up(mtx);
+}
+
+/************** Recursive mutexes **************/
+
+static void __uth_recurse_mutex_init(void *arg)
+{
+       struct uth_recurse_mutex *r_mtx = (struct uth_recurse_mutex*)arg;
+
+       __uth_mutex_init(&r_mtx->mtx);
+       /* Since we always manually call __uth_mutex_init(), there's no reason to
+        * mess with the regular mutex's static initializer.  Just say it's been
+        * done. */
+       parlib_set_ran_once(&r_mtx->mtx.once_ctl);
+       r_mtx->lockholder = NULL;
+       r_mtx->count = 0;
+}
+
+void uth_recurse_mutex_init(uth_recurse_mutex_t *r_mtx)
+{
+       __uth_recurse_mutex_init(r_mtx);
+       parlib_set_ran_once(&r_mtx->once_ctl);
+}
+
+void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_mtx)
+{
+       uth_semaphore_destroy(&r_mtx->mtx);
+}
+
+uth_recurse_mutex_t *uth_recurse_mutex_alloc(void)
+{
+       struct uth_recurse_mutex *r_mtx = malloc(sizeof(struct uth_recurse_mutex));
+
+       assert(r_mtx);
+       uth_recurse_mutex_init(r_mtx);
+       return r_mtx;
+}
+
+void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
+{
+       uth_recurse_mutex_destroy(r_mtx);
+       free(r_mtx);
+}
+
+bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *r_mtx,
+                                  const struct timespec *abs_timeout)
+{
+       assert_can_block();
+       parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
+       /* We don't have to worry about races on current_uthread or count.  They are
+        * only written by the initial lockholder, and this check will only be true
+        * for the initial lockholder, which cannot concurrently call this function
+        * twice (a thread is single-threaded).
+        *
+        * A signal handler running for a thread should not attempt to grab a
+        * recursive mutex (that's probably a bug).  If we need to support that,
+        * we'll have to disable notifs temporarily. */
+       if (r_mtx->lockholder == current_uthread) {
+               r_mtx->count++;
+               return TRUE;
        }
-       uth_default_mtx_free((struct uth_default_mtx*)m);
+       if (!uth_mutex_timed_lock(&r_mtx->mtx, abs_timeout))
+               return FALSE;
+       r_mtx->lockholder = current_uthread;
+       r_mtx->count = 1;
+       return TRUE;
 }
 
-void uth_mutex_lock(uth_mutex_t m)
+void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
 {
-       if (sched_ops->mutex_lock) {
-               sched_ops->mutex_lock(m);
-               return;
+       uth_recurse_mutex_timed_lock(r_mtx, NULL);
+}
+
+bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
+{
+       bool ret;
+
+       assert_can_block();
+       parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
+       if (r_mtx->lockholder == current_uthread) {
+               r_mtx->count++;
+               return TRUE;
+       }
+       ret = uth_mutex_trylock(&r_mtx->mtx);
+       if (ret) {
+               r_mtx->lockholder = current_uthread;
+               r_mtx->count = 1;
        }
-       uth_default_mtx_lock((struct uth_default_mtx*)m);
+       return ret;
 }
 
-void uth_mutex_unlock(uth_mutex_t m)
+void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_mtx)
 {
-       if (sched_ops->mutex_unlock) {
-               sched_ops->mutex_unlock(m);
-               return;
+       r_mtx->count--;
+       if (!r_mtx->count) {
+               r_mtx->lockholder = NULL;
+               uth_mutex_unlock(&r_mtx->mtx);
        }
-       uth_default_mtx_unlock((struct uth_default_mtx*)m);
 }
 
 
-/************** Default Condition Variable Implementation **************/
+/************** Condition Variables **************/
 
 
-static struct uth_default_cv *uth_default_cv_alloc(void)
+static void __uth_cond_var_init(void *arg)
 {
-       struct uth_default_cv *cv;
+       struct uth_cond_var *cv = (struct uth_cond_var*)arg;
 
-       cv = malloc(sizeof(struct uth_default_cv));
-       assert(cv);
        spin_pdr_init(&cv->lock);
-       TAILQ_INIT(&cv->waiters);
+       __uth_sync_init(&cv->sync_obj);
+}
+
+void uth_cond_var_init(uth_cond_var_t *cv)
+{
+       __uth_cond_var_init(cv);
+       parlib_set_ran_once(&cv->once_ctl);
+}
+
+void uth_cond_var_destroy(uth_cond_var_t *cv)
+{
+       __uth_sync_destroy(&cv->sync_obj);
+}
+
+uth_cond_var_t *uth_cond_var_alloc(void)
+{
+       struct uth_cond_var *cv;
+
+       cv = malloc(sizeof(struct uth_cond_var));
+       assert(cv);
+       uth_cond_var_init(cv);
        return cv;
 }
 
-static void uth_default_cv_free(struct uth_default_cv *cv)
+void uth_cond_var_free(uth_cond_var_t *cv)
 {
-       assert(TAILQ_EMPTY(&cv->waiters));
+       uth_cond_var_destroy(cv);
        free(cv);
 }
 
+struct uth_cv_link {
+       struct uth_cond_var                     *cv;
+       struct uth_semaphore            *mtx;
+};
+
 static void __cv_wait_cb(struct uthread *uth, void *arg)
 {
        struct uth_cv_link *link = (struct uth_cv_link*)arg;
-       struct uth_default_cv *cv = link->cv;
-       struct uth_default_mtx *mtx = link->mtx;
+       struct uth_cond_var *cv = link->cv;
+       struct uth_semaphore *mtx = link->mtx;
 
        /* We need to tell the 2LS that its thread blocked.  We need to do this
         * before unlocking the cv, since as soon as we unlock, the cv could be
@@ -181,9 +396,19 @@ static void __cv_wait_cb(struct uthread *uth, void *arg)
         *
         * Also note the lock-ordering rule.  The cv lock is grabbed before any
         * locks the 2LS might grab. */
-       uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
+       uthread_has_blocked(uth, &cv->sync_obj, UTH_EXT_BLK_MUTEX);
        spin_pdr_unlock(&cv->lock);
-       uth_mutex_unlock((uth_mutex_t)mtx);
+       /* This looks dangerous, since both the CV and MTX could use the
+        * uth->sync_next TAILQ_ENTRY (or whatever the 2LS uses), but the uthread
+        * never sleeps on both at the same time.  We *hold* the mtx - we aren't
+        * *sleeping* on it.  Sleeping uses the sync_next.  Holding it doesn't.
+        *
+        * Next, consider what happens as soon as we unlock the CV.  Our thread
+        * could get woken up, and then immediately try to grab the mtx and go to
+        * sleep! (see below).  If that happens, the uthread is no longer sleeping
+        * on the CV, and the sync_next is free.  The invariant is that a uthread
+        * can only sleep on one sync_object at a time. */
+       uth_mutex_unlock(mtx);
 }
 
 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
@@ -239,98 +464,358 @@ static void __cv_wait_cb(struct uthread *uth, void *arg)
  *
  * Also note that we use the external API for the mutex operations.  A 2LS could
  * have their own mutex ops but still use the generic cv ops. */
-static void uth_default_cv_wait(struct uth_default_cv *cv,
-                                struct uth_default_mtx *mtx)
+bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
+                             const struct timespec *abs_timeout)
 {
        struct uth_cv_link link;
+       struct alarm_waiter waiter[1];
+       struct timeout_blob blob[1];
+       bool ret = TRUE;
 
+       assert_can_block();
+       parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
        link.cv = cv;
        link.mtx = mtx;
-       link.uth = current_uthread;
        spin_pdr_lock(&cv->lock);
-       TAILQ_INSERT_TAIL(&cv->waiters, &link, next);
+       if (abs_timeout) {
+               set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
+               set_timeout_alarm(waiter, blob, abs_timeout);
+       }
        uthread_yield(TRUE, __cv_wait_cb, &link);
-       uth_mutex_lock((uth_mutex_t)mtx);
+       if (abs_timeout) {
+               unset_alarm(waiter);
+               ret = blob->timed_out ? FALSE : TRUE;
+       }
+       uth_mutex_lock(mtx);
+       return ret;
 }
 
-static void uth_default_cv_signal(struct uth_default_cv *cv)
+void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
 {
-       struct uth_cv_link *first;
+       uth_cond_var_timed_wait(cv, mtx, NULL);
+}
 
-       spin_pdr_lock(&cv->lock);
-       first = TAILQ_FIRST(&cv->waiters);
-       if (first)
-               TAILQ_REMOVE(&cv->waiters, first, next);
-       spin_pdr_unlock(&cv->lock);
-       if (first)
-               uthread_runnable(first->uth);
+/* GCC doesn't list this as one of the C++0x functions, but it's easy to do and
+ * implement uth_cond_var_wait_recurse() with it, just like for all the other
+ * 'timed' functions.
+ *
+ * Note the timeout applies to getting the signal on the CV, not on reacquiring
+ * the mutex. */
+bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
+                                     uth_recurse_mutex_t *r_mtx,
+                                     const struct timespec *abs_timeout)
+{
+       unsigned int old_count = r_mtx->count;
+       bool ret;
+
+       /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
+        * prep-work for that now.  (invariant is that an unlocked r_mtx has no
+        * lockholder and count == 0. */
+       r_mtx->lockholder = NULL;
+       r_mtx->count = 0;
+       ret = uth_cond_var_timed_wait(cv, &r_mtx->mtx, abs_timeout);
+       /* Now we hold the internal mutex again.  Need to restore the tracking. */
+       r_mtx->lockholder = current_uthread;
+       r_mtx->count = old_count;
+       return ret;
+}
+
+/* GCC wants this function, though its semantics are a little unclear.  I
+ * imagine you'd want to completely unlock it (say you locked it 3 times), and
+ * when you get it back, that you have your three locks back. */
+void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
+{
+       uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
 }
 
-static void swap_cv_lists(struct cv_link_tq *a, struct cv_link_tq *b)
+void uth_cond_var_signal(uth_cond_var_t *cv)
 {
-       struct cv_link_tq temp;
+       struct uthread *uth;
 
-       temp = *a;
-       *a = *b;
-       *b = temp;
+       parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
+       spin_pdr_lock(&cv->lock);
+       uth = __uth_sync_get_next(&cv->sync_obj);
+       spin_pdr_unlock(&cv->lock);
+       if (uth)
+               uthread_runnable(uth);
 }
 
-static void uth_default_cv_broadcast(struct uth_default_cv *cv)
+void uth_cond_var_broadcast(uth_cond_var_t *cv)
 {
-       struct cv_link_tq restartees = TAILQ_HEAD_INITIALIZER(restartees);
-       struct uth_cv_link *i, *safe;
+       struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
+       struct uthread *i, *safe;
 
+       parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
        spin_pdr_lock(&cv->lock);
-       swap_cv_lists(&cv->waiters, &restartees);
+       /* If this turns out to be slow or painful for 2LSs, we can implement a
+        * get_all or something (default used to use TAILQ_SWAP). */
+       while ((i = __uth_sync_get_next(&cv->sync_obj))) {
+               /* Once the uth is out of the sync obj, we can reuse sync_next. */
+               TAILQ_INSERT_TAIL(&restartees, i, sync_next);
+       }
        spin_pdr_unlock(&cv->lock);
        /* Need the SAFE, since we can't touch the linkage once the uth could run */
-       TAILQ_FOREACH_SAFE(i, &restartees, next, safe)
-               uthread_runnable(i->uth);
+       TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
+               uthread_runnable(i);
 }
 
 
-/************** Wrappers for the uthread CV interface **************/
+/************** Reader-writer Sleeping Locks **************/
 
 
-uth_cond_var_t uth_cond_var_alloc(void)
+static void __uth_rwlock_init(void *arg)
 {
-       if (sched_ops->cond_var_alloc)
-               return sched_ops->cond_var_alloc();
-       return (uth_cond_var_t)uth_default_cv_alloc();
+       struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
+
+       spin_pdr_init(&rwl->lock);
+       rwl->nr_readers = 0;
+       rwl->has_writer = FALSE;
+       __uth_sync_init(&rwl->readers);
+       __uth_sync_init(&rwl->writers);
 }
 
-void uth_cond_var_free(uth_cond_var_t cv)
+void uth_rwlock_init(uth_rwlock_t *rwl)
 {
-       if (sched_ops->cond_var_free) {
-               sched_ops->cond_var_free(cv);
+       __uth_rwlock_init(rwl);
+       parlib_set_ran_once(&rwl->once_ctl);
+}
+
+void uth_rwlock_destroy(uth_rwlock_t *rwl)
+{
+       __uth_sync_destroy(&rwl->readers);
+       __uth_sync_destroy(&rwl->writers);
+}
+
+uth_rwlock_t *uth_rwlock_alloc(void)
+{
+       struct uth_rwlock *rwl;
+
+       rwl = malloc(sizeof(struct uth_rwlock));
+       assert(rwl);
+       uth_rwlock_init(rwl);
+       return rwl;
+}
+
+void uth_rwlock_free(uth_rwlock_t *rwl)
+{
+       uth_rwlock_destroy(rwl);
+       free(rwl);
+}
+
+/* Readers and writers block until they have the lock.  The delicacies are dealt
+ * with by the unlocker. */
+static void __rwlock_rd_cb(struct uthread *uth, void *arg)
+{
+       struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
+
+       uthread_has_blocked(uth, &rwl->readers, UTH_EXT_BLK_MUTEX);
+       spin_pdr_unlock(&rwl->lock);
+}
+
+void uth_rwlock_rdlock(uth_rwlock_t *rwl)
+{
+       assert_can_block();
+       parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
+       spin_pdr_lock(&rwl->lock);
+       /* Readers always make progress when there is no writer */
+       if (!rwl->has_writer) {
+               rwl->nr_readers++;
+               spin_pdr_unlock(&rwl->lock);
                return;
        }
-       uth_default_cv_free((struct uth_default_cv*)cv);
+       uthread_yield(TRUE, __rwlock_rd_cb, rwl);
+}
+
+bool uth_rwlock_try_rdlock(uth_rwlock_t *rwl)
+{
+       bool ret = FALSE;
+
+       assert_can_block();
+       parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
+       spin_pdr_lock(&rwl->lock);
+       if (!rwl->has_writer) {
+               rwl->nr_readers++;
+               ret = TRUE;
+       }
+       spin_pdr_unlock(&rwl->lock);
+       return ret;
+}
+
+static void __rwlock_wr_cb(struct uthread *uth, void *arg)
+{
+       struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
+
+       uthread_has_blocked(uth, &rwl->writers, UTH_EXT_BLK_MUTEX);
+       spin_pdr_unlock(&rwl->lock);
 }
 
-void uth_cond_var_wait(uth_cond_var_t cv, uth_mutex_t m)
+void uth_rwlock_wrlock(uth_rwlock_t *rwl)
 {
-       if (sched_ops->cond_var_wait) {
-               sched_ops->cond_var_wait(cv, m);
+       assert_can_block();
+       parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
+       spin_pdr_lock(&rwl->lock);
+       /* Writers require total mutual exclusion - no writers or readers */
+       if (!rwl->has_writer && !rwl->nr_readers) {
+               rwl->has_writer = TRUE;
+               spin_pdr_unlock(&rwl->lock);
                return;
        }
-       uth_default_cv_wait((struct uth_default_cv*)cv, (struct uth_default_mtx*)m);
+       uthread_yield(TRUE, __rwlock_wr_cb, rwl);
 }
 
-void uth_cond_var_signal(uth_cond_var_t cv)
+bool uth_rwlock_try_wrlock(uth_rwlock_t *rwl)
 {
-       if (sched_ops->cond_var_signal) {
-               sched_ops->cond_var_signal(cv);
+       bool ret = FALSE;
+
+       assert_can_block();
+       parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
+       spin_pdr_lock(&rwl->lock);
+       if (!rwl->has_writer && !rwl->nr_readers) {
+               rwl->has_writer = TRUE;
+               ret = TRUE;
+       }
+       spin_pdr_unlock(&rwl->lock);
+       return ret;
+}
+
+/* Let's try to wake writers (yes, this is a policy decision), and if none, wake
+ * all the readers.  The invariant there is that if there is no writer, then
+ * there are no waiting readers. */
+static void __rw_unlock_writer(struct uth_rwlock *rwl,
+                               struct uth_tailq *restartees)
+{
+       struct uthread *uth;
+
+       uth = __uth_sync_get_next(&rwl->writers);
+       if (uth) {
+               TAILQ_INSERT_TAIL(restartees, uth, sync_next);
+       } else {
+               rwl->has_writer = FALSE;
+               while ((uth = __uth_sync_get_next(&rwl->readers))) {
+                       TAILQ_INSERT_TAIL(restartees, uth, sync_next);
+                       rwl->nr_readers++;
+               }
+       }
+}
+
+static void __rw_unlock_reader(struct uth_rwlock *rwl,
+                               struct uth_tailq *restartees)
+{
+       struct uthread *uth;
+
+       rwl->nr_readers--;
+       if (!rwl->nr_readers) {
+               uth = __uth_sync_get_next(&rwl->writers);
+               if (uth) {
+                       TAILQ_INSERT_TAIL(restartees, uth, sync_next);
+                       rwl->has_writer = TRUE;
+               }
+       }
+}
+
+/* Unlock works for either readers or writer locks.  You can tell which you were
+ * based on whether has_writer is set or not. */
+void uth_rwlock_unlock(uth_rwlock_t *rwl)
+{
+       struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
+       struct uthread *i, *safe;
+
+       spin_pdr_lock(&rwl->lock);
+       if (rwl->has_writer)
+               __rw_unlock_writer(rwl, &restartees);
+       else
+               __rw_unlock_reader(rwl, &restartees);
+       spin_pdr_unlock(&rwl->lock);
+       TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
+               uthread_runnable(i);
+}
+
+
+/************** Default Sync Obj Implementation **************/
+
+static void uth_default_sync_init(uth_sync_t *sync)
+{
+       struct uth_tailq *tq = (struct uth_tailq*)sync;
+
+       parlib_static_assert(sizeof(struct uth_tailq) <= sizeof(uth_sync_t));
+       TAILQ_INIT(tq);
+}
+
+static void uth_default_sync_destroy(uth_sync_t *sync)
+{
+       struct uth_tailq *tq = (struct uth_tailq*)sync;
+
+       assert(TAILQ_EMPTY(tq));
+}
+
+static struct uthread *uth_default_sync_get_next(uth_sync_t *sync)
+{
+       struct uth_tailq *tq = (struct uth_tailq*)sync;
+       struct uthread *first;
+
+       first = TAILQ_FIRST(tq);
+       if (first)
+               TAILQ_REMOVE(tq, first, sync_next);
+       return first;
+}
+
+static bool uth_default_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
+{
+       struct uth_tailq *tq = (struct uth_tailq*)sync;
+       struct uthread *i;
+
+       TAILQ_FOREACH(i, tq, sync_next) {
+               if (i == uth) {
+                       TAILQ_REMOVE(tq, i, sync_next);
+                       return TRUE;
+               }
+       }
+       return FALSE;
+}
+
+/************** External uthread sync interface **************/
+
+/* Called by the 2LS->has_blocked op, if they are using the default sync.*/
+void __uth_default_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
+{
+       struct uth_tailq *tq = (struct uth_tailq*)sync;
+
+       TAILQ_INSERT_TAIL(tq, uth, sync_next);
+}
+
+/* Called by 2LS-independent sync code when a sync object needs initialized. */
+void __uth_sync_init(uth_sync_t *sync)
+{
+       if (sched_ops->sync_init) {
+               sched_ops->sync_init(sync);
                return;
        }
-       uth_default_cv_signal((struct uth_default_cv*)cv);
+       uth_default_sync_init(sync);
 }
 
-void uth_cond_var_broadcast(uth_cond_var_t cv)
+/* Called by 2LS-independent sync code when a sync object is destroyed. */
+void __uth_sync_destroy(uth_sync_t *sync)
 {
-       if (sched_ops->cond_var_broadcast) {
-               sched_ops->cond_var_broadcast(cv);
+       if (sched_ops->sync_destroy) {
+               sched_ops->sync_destroy(sync);
                return;
        }
-       uth_default_cv_broadcast((struct uth_default_cv*)cv);
+       uth_default_sync_destroy(sync);
+}
+
+/* Called by 2LS-independent sync code when a thread needs to be woken. */
+struct uthread *__uth_sync_get_next(uth_sync_t *sync)
+{
+       if (sched_ops->sync_get_next)
+               return sched_ops->sync_get_next(sync);
+       return uth_default_sync_get_next(sync);
+}
+
+/* Called by 2LS-independent sync code when a specific thread needs to be woken.
+ * Returns TRUE if the uthread was blocked on the object, FALSE o/w. */
+bool __uth_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
+{
+       if (sched_ops->sync_get_uth)
+               return sched_ops->sync_get_uth(sync, uth);
+       return uth_default_sync_get_uth(sync, uth);
 }