MCS Preemption-Detection and Recovery locks
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 12 Oct 2011 01:06:54 +0000 (18:06 -0700)
committerKevin Klues <klueska@cs.berkeley.edu>
Thu, 3 Nov 2011 00:36:08 +0000 (17:36 -0700)
The basic idea is that if you are spinning, make sure someone else is
online/running that is on the way to making progress/unblocking you.
Normally, this is the lockholder, but it could also be someone else on a
chain of qnodes.  You make sure they are running, they make sure the
next one up is running, etc.  Even if we have only one pcore, this
should still unjam some preempted vcores involved in MCS locking.

Any MCS lock that is grabbed in vcore context will need to use these.
This includes all cases of notif_safe that needed to be called from
uthread context (though that needs a review, in case we still need
that).

Regardless the mcs_pdr locks can handle being called from uthread
context, and odds are that anyone desiring to use an MCS lock will need
to use these (even regular old uthread code).

user/parlib/include/mcs.h
user/parlib/include/vcore.h
user/parlib/mcs.c

index 9016302..57d332d 100644 (file)
@@ -50,6 +50,35 @@ void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode);
 void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode);
 void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode);
 
+/* Preemption detection and recovering MCS locks.
+ *
+ * The basic idea is that when spinning, vcores make sure someone else is
+ * making progress that will lead to them not spinning.  Usually, it'll be to
+ * make sure the lock holder (if known) is running.  If we don't know the lock
+ * holder, we nsure the end of whatever chain we can see is running, which will
+ * make sure its predecessor runs, which will eventually unjam the system.
+ *
+ * These are memory safe ones.  In the future, we can make ones that you pass
+ * the qnode to, so long as you never free the qnode storage (stacks).  */
+struct mcs_pdr_qnode
+{
+       struct mcs_pdr_qnode *next;
+       int locked;
+       uint32_t vcoreid;
+}__attribute__((aligned(ARCH_CL_SIZE)));
+
+struct mcs_pdr_lock
+{
+       struct mcs_pdr_qnode *lock;
+       struct mcs_pdr_qnode *lock_holder;
+       struct mcs_pdr_qnode *vc_qnodes;        /* malloc this at init time */
+};
+
+void mcs_pdr_init(struct mcs_pdr_lock *lock);
+void mcs_pdr_fini(struct mcs_pdr_lock *lock);
+void mcs_pdr_lock(struct mcs_pdr_lock *lock);
+void mcs_pdr_unlock(struct mcs_pdr_lock *lock);
+
 #ifdef __cplusplus
 }
 #endif
index 5b2be7a..3641658 100644 (file)
@@ -45,6 +45,7 @@ static inline bool in_multi_mode(void);
 static inline void __enable_notifs(uint32_t vcoreid);
 static inline void __disable_notifs(uint32_t vcoreid);
 static inline bool notif_is_enabled(uint32_t vcoreid);
+static inline bool vcore_is_mapped(uint32_t vcoreid);
 int vcore_init(void);
 int vcore_request(size_t k);
 void vcore_yield(bool preempt_pending);
@@ -95,6 +96,11 @@ static inline bool notif_is_enabled(uint32_t vcoreid)
        return __procdata.vcore_preempt_data[vcoreid].notif_enabled;
 }
 
+static inline bool vcore_is_mapped(uint32_t vcoreid)
+{
+       return __procinfo.vcoremap[vcoreid].valid;
+}
+
 #ifdef __cplusplus
 }
 #endif
index 6a961f7..8c303ec 100644 (file)
@@ -4,6 +4,7 @@
 #include <string.h>
 #include <stdlib.h>
 #include <uthread.h>
+#include <parlib.h>
 
 // MCS locks
 void mcs_lock_init(struct mcs_lock *lock)
@@ -162,3 +163,187 @@ void mcs_barrier_wait(mcs_barrier_t* b, size_t pid)
        localflags->parity = 1-localflags->parity;
 }
 
+/* Preemption detection and recovering MCS locks.  These are memory safe ones.
+ * In the future, we can make ones that you pass the qnode to, so long as you
+ * never free the qnode storage (stacks) */
+void mcs_pdr_init(struct mcs_pdr_lock *lock)
+{
+       lock->lock = 0;
+       lock->lock_holder = 0;
+       lock->vc_qnodes = malloc(sizeof(struct mcs_pdr_qnode) * max_vcores());
+       assert(lock->vc_qnodes);
+       for (int i = 0; i < max_vcores(); i++)
+               lock->vc_qnodes[i].vcoreid = i;
+}
+
+void mcs_pdr_fini(struct mcs_pdr_lock *lock)
+{
+       assert(lock->vc_qnodes);
+       free(lock->vc_qnodes);
+}
+
+/* Helper, will make sure the vcore owning qnode is running.  If we change to
+ * that vcore, we'll continue when our vcore gets restarted.  If the change
+ * fails, it is because the vcore is running, and we'll continue. */
+void __ensure_qnode_runs(struct mcs_pdr_qnode *qnode)
+{
+       if (!vcore_is_mapped(qnode->vcoreid)) {
+               /* We want to recover them from preemption.  Since we know they have
+                * notifs disabled, they will need to be directly restarted, so we can
+                * skip the other logic and cut straight to the sys_change_vcore() */
+               sys_change_vcore(qnode->vcoreid, FALSE);
+               cmb();
+       }
+}
+
+/* Internal version of the locking function, doesn't care about storage of qnode
+ * or if notifs are disabled. */
+void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
+{
+       struct mcs_pdr_qnode *predecessor;
+       /* Now the actual lock */
+       qnode->next = 0;
+       cmb();  /* swap provides a CPU mb() */
+       predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
+       if (predecessor) {
+               qnode->locked = 1;
+               wmb();
+               predecessor->next = qnode;
+               /* no need for a wrmb(), since this will only get unlocked after they
+                * read our previous write */
+               while (qnode->locked) {
+                       /* Ideally, we know who the lock holder is, and we'll make sure they
+                        * run.  If not, we'll make sure our pred is running, which trickles
+                        * up to the lock holder, if it isn't them. */
+                       if (lock->lock_holder)
+                               __ensure_qnode_runs(lock->lock_holder);
+                       else
+                               __ensure_qnode_runs(predecessor);
+                       cpu_relax();
+               }
+       }
+       cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
+       /* publish ourselves as the lock holder (optimization) */
+       lock->lock_holder = qnode;      /* mbs() handled by the cmb/swap */
+}
+
+/* Using the CAS style unlocks, since the usurper recovery is a real pain in the
+ * ass */
+void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
+{
+       struct mcs_pdr_qnode *a_tail;
+       /* Clear us from being the lock holder */
+       lock->lock_holder = 0;  /* mbs() are covered by the cmb/cas and the wmb */
+       /* Check if someone is already waiting on us to unlock */
+       if (qnode->next == 0) {
+               cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
+               /* If we're still the lock, just swap it with 0 (unlock) and return */
+               if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
+                       return;
+               cmb();  /* need to read a fresh tail.  the CAS was a CPU mb */
+               /* Read in the tail (or someone who recent was the tail, but could now
+                * be farther up the chain), in prep for our spinning. */
+               a_tail = lock->lock;
+               /* We failed, someone is there and we are some (maybe a different)
+                * thread's pred.  Since someone else was waiting, they should have made
+                * themselves our next.  Spin (very briefly!) til it happens. */
+               while (qnode->next == 0) {
+                       /* We need to get our next to run, but we don't know who they are.
+                        * If we make sure a tail is running, that will percolate up to make
+                        * sure our qnode->next is running */
+                       __ensure_qnode_runs(a_tail);
+                       /* Arguably, that reads new tails each time, but it'll still work
+                        * for this rare case */
+                       cpu_relax();
+               }
+               /* Alpha wants a read_barrier_depends() here */
+               /* Now that we have a next, unlock them */
+               qnode->next->locked = 0;
+       } else {
+               /* mb()s necessary since we didn't call an atomic_swap() */
+               wmb();  /* need to make sure any previous writes don't pass unlocking */
+               rwmb(); /* need to make sure any reads happen before the unlocking */
+               /* simply unlock whoever is next */
+               qnode->next->locked = 0;
+       }
+}
+
+/* Actual MCS-PDR locks.  These use memory in the lock for their qnodes, though
+ * the internal locking code doesn't care where the qnodes come from: so long as
+ * they are not freed and can stand a random read of vcoreid. */
+void mcs_pdr_lock(struct mcs_pdr_lock *lock)
+{
+       struct mcs_pdr_qnode *qnode;
+       /* Disable notifs, if we're an _M uthread */
+       uth_disable_notifs();
+       /* Get our qnode from the array.  vcoreid was preset, and the other fields
+        * get handled by the lock */
+       qnode = &lock->vc_qnodes[vcore_id()];
+       assert(qnode->vcoreid == vcore_id());   /* sanity */
+       __mcs_pdr_lock(lock, qnode);
+}
+
+void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
+{
+       struct mcs_pdr_qnode *qnode = &lock->vc_qnodes[vcore_id()];
+       assert(qnode->vcoreid == vcore_id());   /* sanity */
+       __mcs_pdr_unlock(lock, qnode);
+       /* Enable notifs, if we're an _M uthread */
+       uth_enable_notifs();
+}
+
+#if 0
+/* We don't actually use this.  To use this, you'll need the unlock code to save
+ * pred to a specific field in the qnode and check both its initial pred as well
+ * as its run time pred (who could be an usurper).  It's all possible, but a
+ * little more difficult to follow. */
+void __mcs_pdr_unlock_no_cas(struct mcs_pdr_lock *lock,
+                             struct mcs_pdr_qnode *qnode)
+{
+       struct mcs_pdr_qnode *old_tail, *usurper;
+       /* Check if someone is already waiting on us to unlock */
+       if (qnode->next == 0) {
+               cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
+               /* Unlock it */
+               old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
+               /* no one else was already waiting, so we successfully unlocked and can
+                * return */
+               if (old_tail == qnode)
+                       return;
+               /* someone else was already waiting on the lock (last one on the list),
+                * and we accidentally took them off.  Try and put it back. */
+               usurper = atomic_swap_ptr((void*)&lock->lock, old_tail);
+               /* since someone else was waiting, they should have made themselves our
+                * next.  spin (very briefly!) til it happens. */
+               while (qnode->next == 0) {
+                       /* make sure old_tail isn't preempted */
+
+                       cpu_relax();
+               }
+               if (usurper) {
+                       /* an usurper is someone who snuck in before we could put the old
+                        * tail back.  They now have the lock.  Let's put whoever is
+                        * supposed to be next as their next one. 
+                        *
+                        * First, we need to change our next's pred.  There's a slight race
+                        * here, so our next will need to make sure both us and pred are
+                        * done */
+                       qnode->next->pred = usurper;
+                       wmb();
+                       usurper->next = qnode->next;
+                       /* could imagine another wmb() and a flag so our next knows to no
+                        * longer check us too. */
+               } else {
+                       /* No usurper meant we put things back correctly, so we should just
+                        * pass the lock / unlock whoever is next */
+                       qnode->next->locked = 0;
+               }
+       } else {
+               /* mb()s necessary since we didn't call an atomic_swap() */
+               wmb();  /* need to make sure any previous writes don't pass unlocking */
+               rwmb(); /* need to make sure any reads happen before the unlocking */
+               /* simply unlock whoever is next */
+               qnode->next->locked = 0;
+       }
+}
+#endif