Add memory clobber to RISC-V set_stack_pointer
[akaros.git] / user / parlib / mcs.c
index 6a961f7..03c8dca 100644 (file)
@@ -4,6 +4,7 @@
 #include <string.h>
 #include <stdlib.h>
 #include <uthread.h>
+#include <parlib.h>
 
 // MCS locks
 void mcs_lock_init(struct mcs_lock *lock)
@@ -162,3 +163,208 @@ void mcs_barrier_wait(mcs_barrier_t* b, size_t pid)
        localflags->parity = 1-localflags->parity;
 }
 
+/* Preemption detection and recovering MCS locks.  These are memory safe ones.
+ * In the future, we can make ones that you pass the qnode to, so long as you
+ * never free the qnode storage (stacks) */
+void mcs_pdr_init(struct mcs_pdr_lock *lock)
+{
+       lock->lock = 0;
+       lock->lock_holder = 0;
+       lock->vc_qnodes = malloc(sizeof(struct mcs_pdr_qnode) * max_vcores());
+       assert(lock->vc_qnodes);
+       memset(lock->vc_qnodes, 0, sizeof(struct mcs_pdr_qnode) * max_vcores());
+       for (int i = 0; i < max_vcores(); i++)
+               lock->vc_qnodes[i].vcoreid = i;
+}
+
+void mcs_pdr_fini(struct mcs_pdr_lock *lock)
+{
+       assert(lock->vc_qnodes);
+       free(lock->vc_qnodes);
+}
+
+/* Helper, will make sure the vcore owning qnode is running.  If we change to
+ * that vcore, we'll continue when our vcore gets restarted.  If the change
+ * fails, it is because the vcore is running, and we'll continue.
+ *
+ * It's worth noting that changing to another vcore won't hurt correctness.
+ * Even if they are no longer the lockholder, they will be checking preemption
+ * messages and will help break out of the deadlock.  So long as we don't
+ * wastefully spin, we're okay. */
+static void __ensure_qnode_runs(struct mcs_pdr_qnode *qnode)
+{
+       /* This assert is somewhat useless.  __lock will compile it out, since it is
+        * impossible.  If you have a PF around here in __lock, odds are your stack
+        * is getting gently clobbered (syscall finished twice?). */
+       assert(qnode);
+       ensure_vcore_runs(qnode->vcoreid);
+}
+
+/* Internal version of the locking function, doesn't care about storage of qnode
+ * or if notifs are disabled. */
+void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
+{
+       struct mcs_pdr_qnode *predecessor;
+       /* Now the actual lock */
+       qnode->next = 0;
+       cmb();  /* swap provides a CPU mb() */
+       predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
+       if (predecessor) {
+               qnode->locked = 1;
+               wmb();
+               predecessor->next = qnode;
+               /* no need for a wrmb(), since this will only get unlocked after they
+                * read our previous write */
+               while (qnode->locked) {
+                       /* Ideally, we know who the lock holder is, and we'll make sure they
+                        * run.  If not, we'll make sure our pred is running, which trickles
+                        * up to the lock holder, if it isn't them. */
+                       if (lock->lock_holder)
+                               __ensure_qnode_runs(lock->lock_holder);
+                       else
+                               __ensure_qnode_runs(predecessor);
+                       cpu_relax();
+               }
+       }
+       cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
+       /* publish ourselves as the lock holder (optimization) */
+       lock->lock_holder = qnode;      /* mbs() handled by the cmb/swap */
+}
+
+/* Using the CAS style unlocks, since the usurper recovery is a real pain in the
+ * ass */
+void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
+{
+       struct mcs_pdr_qnode *a_tail;
+       /* Clear us from being the lock holder */
+       lock->lock_holder = 0;  /* mbs() are covered by the cmb/cas and the wmb */
+       /* Check if someone is already waiting on us to unlock */
+       if (qnode->next == 0) {
+               cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
+               /* If we're still the lock, just swap it with 0 (unlock) and return */
+               if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
+                       goto out;
+               cmb();  /* need to read a fresh tail.  the CAS was a CPU mb */
+               /* Read in the tail (or someone who recent was the tail, but could now
+                * be farther up the chain), in prep for our spinning. */
+               a_tail = lock->lock;
+               /* We failed, someone is there and we are some (maybe a different)
+                * thread's pred.  Since someone else was waiting, they should have made
+                * themselves our next.  Spin (very briefly!) til it happens. */
+               while (qnode->next == 0) {
+                       /* We need to get our next to run, but we don't know who they are.
+                        * If we make sure a tail is running, that will percolate up to make
+                        * sure our qnode->next is running */
+                       __ensure_qnode_runs(a_tail);
+                       /* Arguably, that reads new tails each time, but it'll still work
+                        * for this rare case */
+                       cpu_relax();
+               }
+               /* Alpha wants a read_barrier_depends() here */
+               /* Now that we have a next, unlock them */
+               qnode->next->locked = 0;
+       } else {
+               /* mb()s necessary since we didn't call an atomic_swap() */
+               wmb();  /* need to make sure any previous writes don't pass unlocking */
+               rwmb(); /* need to make sure any reads happen before the unlocking */
+               /* simply unlock whoever is next */
+               qnode->next->locked = 0;
+       }
+out:
+       /* ease of debugging */
+       qnode->next = 0;
+}
+
+/* Actual MCS-PDR locks.  These use memory in the lock for their qnodes, though
+ * the internal locking code doesn't care where the qnodes come from: so long as
+ * they are not freed and can stand a random read of vcoreid. */
+void mcs_pdr_lock(struct mcs_pdr_lock *lock)
+{
+       struct mcs_pdr_qnode *qnode;
+       /* Disable notifs, if we're an _M uthread */
+       uth_disable_notifs();
+       /* Get our qnode from the array.  vcoreid was preset, and the other fields
+        * get handled by the lock */
+       qnode = &lock->vc_qnodes[vcore_id()];
+       assert(qnode->vcoreid == vcore_id());   /* sanity */
+       __mcs_pdr_lock(lock, qnode);
+}
+
+/* CAS-less unlock, not quite as efficient and will make sure every vcore runs
+ * (since we don't have a convenient way to make sure our qnode->next runs
+ * yet, other than making sure everyone runs).
+ *
+ * To use this without ensuring all vcores run, you'll need the unlock code to
+ * save pred to a specific field in the qnode and check both its initial pred
+ * as well as its run time pred (who could be an usurper).  It's all possible,
+ * but a little more difficult to follow.  Also, I'm adjusting this comment
+ * months after writing it originally, so it is probably not sufficient, but
+ * necessary. */
+void __mcs_pdr_unlock_no_cas(struct mcs_pdr_lock *lock,
+                             struct mcs_pdr_qnode *qnode)
+{
+       struct mcs_pdr_qnode *old_tail, *usurper;
+       /* Check if someone is already waiting on us to unlock */
+       if (qnode->next == 0) {
+               cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
+               /* Unlock it */
+               old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
+               /* no one else was already waiting, so we successfully unlocked and can
+                * return */
+               if (old_tail == qnode)
+                       return;
+               /* someone else was already waiting on the lock (last one on the list),
+                * and we accidentally took them off.  Try and put it back. */
+               usurper = atomic_swap_ptr((void*)&lock->lock, old_tail);
+               /* since someone else was waiting, they should have made themselves our
+                * next.  spin (very briefly!) til it happens. */
+               while (qnode->next == 0) {
+                       /* make sure old_tail isn't preempted.  best we can do for now is
+                        * to make sure all vcores run, and thereby get our next. */
+                       for (int i = 0; i < max_vcores(); i++)
+                               ensure_vcore_runs(i);
+                       cpu_relax();
+               }
+               if (usurper) {
+                       /* an usurper is someone who snuck in before we could put the old
+                        * tail back.  They now have the lock.  Let's put whoever is
+                        * supposed to be next as their next one. 
+                        *
+                        * First, we need to change our next's pred.  There's a slight race
+                        * here, so our next will need to make sure both us and pred are
+                        * done */
+                       /* I was trying to do something so we didn't need to ensure all
+                        * vcores run, using more space in the qnode to figure out who our
+                        * pred was a lock time (guessing actually, since there's a race,
+                        * etc). */
+                       //qnode->next->pred = usurper;
+                       //wmb();
+                       usurper->next = qnode->next;
+                       /* could imagine another wmb() and a flag so our next knows to no
+                        * longer check us too. */
+               } else {
+                       /* No usurper meant we put things back correctly, so we should just
+                        * pass the lock / unlock whoever is next */
+                       qnode->next->locked = 0;
+               }
+       } else {
+               /* mb()s necessary since we didn't call an atomic_swap() */
+               wmb();  /* need to make sure any previous writes don't pass unlocking */
+               rwmb(); /* need to make sure any reads happen before the unlocking */
+               /* simply unlock whoever is next */
+               qnode->next->locked = 0;
+       }
+}
+
+void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
+{
+       struct mcs_pdr_qnode *qnode = &lock->vc_qnodes[vcore_id()];
+       assert(qnode->vcoreid == vcore_id());   /* sanity */
+#ifndef __riscv__
+       __mcs_pdr_unlock(lock, qnode);
+#else
+       __mcs_pdr_unlock_no_cas(lock, qnode);
+#endif
+       /* Enable notifs, if we're an _M uthread */
+       uth_enable_notifs();
+}