/* akaros/user/parlib/mcs.c */
#include <parlib/vcore.h>
#include <parlib/mcs.h>
#include <parlib/arch/atomic.h>
#include <string.h>
#include <stdlib.h>
#include <parlib/uthread.h>
#include <parlib/parlib.h>
#include <malloc.h>

// MCS locks
void mcs_lock_init(struct mcs_lock *lock)
{
        memset(lock, 0, sizeof(mcs_lock_t));
}

static inline mcs_lock_qnode_t *mcs_qnode_swap(mcs_lock_qnode_t **addr,
                                               mcs_lock_qnode_t *val)
{
        return (mcs_lock_qnode_t*)atomic_swap_ptr((void**)addr, val);
}

void mcs_lock_lock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
        qnode->next = 0;
        cmb();  /* swap provides a CPU mb() */
        mcs_lock_qnode_t *predecessor = mcs_qnode_swap(&lock->lock, qnode);
        if (predecessor) {
                qnode->locked = 1;
                wmb();
                predecessor->next = qnode;
                /* no need for a wrmb(), since this will only get unlocked after
                 * they read our previous write */
                while (qnode->locked)
                        cpu_relax();
        }
        cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
}

void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
        /* Check if someone is already waiting on us to unlock */
        if (qnode->next == 0) {
                cmb(); /* no need for CPU mbs, since there's an atomic_swap() */
                /* Unlock it */
                mcs_lock_qnode_t *old_tail = mcs_qnode_swap(&lock->lock, 0);
                /* no one else was already waiting, so we successfully unlocked
                 * and can return */
                if (old_tail == qnode)
                        return;
                /* someone else was already waiting on the lock (last one on the
                 * list), and we accidentally took them off.  Try and put it
                 * back. */
                mcs_lock_qnode_t *usurper =
                        mcs_qnode_swap(&lock->lock, old_tail);
                /* since someone else was waiting, they should have made
                 * themselves our next.  spin (very briefly!) til it happens. */
                while (qnode->next == 0)
                        cpu_relax();
                if (usurper) {
                        /* a usurper is someone who snuck in before we could
                         * put the old tail back.  They now have the lock.
                         * Let's put whoever is supposed to be next as their
                         * next one. */
                        usurper->next = qnode->next;
                } else {
                        /* No usurper meant we put things back correctly, so we
                         * should just pass the lock / unlock whoever is next */
                        qnode->next->locked = 0;
                }
        } else {
                /* mb()s necessary since we didn't call an atomic_swap() */
                /* need to make sure any previous writes don't pass unlocking */
                wmb();
                /* need to make sure any reads happen before the unlocking */
                rwmb();
                /* simply unlock whoever is next */
                qnode->next->locked = 0;
        }
}
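
/* Usage sketch (illustrative, not part of the library): callers keep the
 * qnode on their own stack and pass the same qnode to lock and unlock.  The
 * names 'example_lock' and 'example_count' are hypothetical; the lock would
 * be set up with mcs_lock_init() before first use. */
#if 0
static struct mcs_lock example_lock;
static int example_count;

static void example_critical_section(void)
{
        struct mcs_lock_qnode qnode;

        mcs_lock_lock(&example_lock, &qnode);
        example_count++;
        mcs_lock_unlock(&example_lock, &qnode);
}
#endif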

/* CAS style mcs locks, kept around til we use them.  We're using the
 * usurper-style, since RISCV doesn't have a real CAS (yet?). */
void mcs_lock_unlock_cas(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
        /* Check if someone is already waiting on us to unlock */
        if (qnode->next == 0) {
                cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
                /* If we're still the lock, just swap it with 0 (unlock) and
                 * return */
                if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
                        return;
                /* We failed, someone is there and we are some (maybe a
                 * different) thread's pred.  Since someone else was waiting,
                 * they should have made themselves our next.  Spin (very
                 * briefly!) til it happens. */
                while (qnode->next == 0)
                        cpu_relax();
                /* Alpha wants a read_barrier_depends() here */
                /* Now that we have a next, unlock them */
                qnode->next->locked = 0;
        } else {
                /* mb()s necessary since we didn't call an atomic_swap() */
                /* need to make sure any previous writes don't pass unlocking */
                wmb();
                /* need to make sure any reads happen before the unlocking */
                rwmb();
                /* simply unlock whoever is next */
                qnode->next->locked = 0;
        }
}
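
/* For reference, the contract assumed of atomic_cas_ptr() above (atomically
 * swap *addr from exp_val to new_val and report whether the swap happened) is
 * roughly the sketch below, written with a GCC builtin.  This is not parlib's
 * implementation; it only illustrates the semantics the unlock path relies
 * on. */
#if 0
static inline int example_cas_ptr(void **addr, void *exp_val, void *new_val)
{
        return __sync_bool_compare_and_swap(addr, exp_val, new_val);
}
#endif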

/* We don't bother saving the state, like we do with irqsave, since we can use
 * whether or not we are in vcore context to determine that.  This means you
 * shouldn't call this from those moments when you fake being in vcore context
 * (when switching into the TLS, etc). */
void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
        uth_disable_notifs();
        mcs_lock_lock(lock, qnode);
}

/* Note we turn off the DONT_MIGRATE flag before enabling notifs.  This is fine,
 * since we wouldn't receive any notifs that could lead to us migrating after we
 * set DONT_MIGRATE but before enable_notifs().  We need it to be in this order,
 * since we need to check messages after ~DONT_MIGRATE. */
void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
        mcs_lock_unlock(lock, qnode);
        uth_enable_notifs();
}
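
/* Usage sketch (illustrative, not part of the library): the notif-safe pair
 * simply brackets the plain MCS lock with uth_disable_notifs() and
 * uth_enable_notifs().  'example_ns_lock' is hypothetical. */
#if 0
static struct mcs_lock example_ns_lock;

static void example_notifsafe_section(void)
{
        struct mcs_lock_qnode qnode;

        mcs_lock_notifsafe(&example_ns_lock, &qnode);
        /* ... critical section, with notifs disabled ... */
        mcs_unlock_notifsafe(&example_ns_lock, &qnode);
}
#endif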

/* Preemption detection and recovering MCS locks. */
/* Old style.  Has trouble getting out of 'preempt/change-to storms' under
 * heavy contention and with preemption. */
void mcs_pdro_init(struct mcs_pdro_lock *lock)
{
        lock->lock = 0;
}

void mcs_pdro_fini(struct mcs_pdro_lock *lock)
{
}

/* Internal version of the locking function, doesn't care if notifs are
 * disabled.  While spinning, we'll check to see if other vcores involved in the
 * locking are running.  If we change to that vcore, we'll continue when our
 * vcore gets restarted.  If the change fails, it is because the vcore is
 * running, and we'll continue.
 *
 * It's worth noting that changing to another vcore won't hurt correctness.
 * Even if they are no longer the lockholder, they will be checking preemption
 * messages and will help break out of the deadlock.  So long as we don't
 * spin uncontrollably, we're okay. */
void __mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
        struct mcs_pdro_qnode *predecessor;
        uint32_t pred_vcoreid;

        qnode->next = 0;
        cmb();  /* swap provides a CPU mb() */
        predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
        if (predecessor) {
                qnode->locked = 1;
                /* Read-in the vcoreid before releasing them.  We won't need to
                 * worry about their qnode memory being freed/reused (they can't
                 * til we fill in the 'next' slot), which is a bit of a
                 * performance win.  This also cuts down on cache-line
                 * contention when we ensure they run, which helps a lot too. */
                pred_vcoreid = ACCESS_ONCE(predecessor->vcoreid);
                wmb();  /* order the locked write before the next write */
                predecessor->next = qnode;
                /* no need for a wrmb(), since this will only get unlocked after
                 * they read our previous write */
                while (qnode->locked) {
                        /* We don't know who the lock holder is (it hurts
                         * performance via 'true' sharing to track it).  Instead
                         * we'll make sure our pred is running, which trickles
                         * up to the lock holder. */
                        ensure_vcore_runs(pred_vcoreid);
                        cpu_relax();
                }
        }
}

/* Using the CAS style unlocks, since the usurper recovery is a real pain in the
 * ass */
void __mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
        uint32_t a_tail_vcoreid;

        /* Check if someone is already waiting on us to unlock */
        if (qnode->next == 0) {
                cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
                /* If we're still the lock, just swap it with 0 (unlock) and
                 * return */
                if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
                        return;
                /* Read in the tail (or someone who recently was the tail, but
                 * could now be farther up the chain), in prep for our spinning.
                 */
                a_tail_vcoreid = ACCESS_ONCE(lock->lock->vcoreid);
                /* We failed, someone is there and we are some (maybe a
                 * different) thread's pred.  Since someone else was waiting,
                 * they should have made themselves our next.  Spin (very
                 * briefly!) til it happens. */
                while (qnode->next == 0) {
                        /* We need to get our next to run, but we don't know who
                         * they are.  If we make sure a tail is running, that
                         * will percolate up to make sure our qnode->next is
                         * running */
                        ensure_vcore_runs(a_tail_vcoreid);
                        cpu_relax();
                }
                /* Alpha wants a read_barrier_depends() here */
                /* Now that we have a next, unlock them */
                qnode->next->locked = 0;
        } else {
                /* mb()s necessary since we didn't call an atomic_swap() */
                /* need to make sure any previous writes don't pass unlocking */
                wmb();
                /* need to make sure any reads happen before the unlocking */
                rwmb();
                /* simply unlock whoever is next */
                qnode->next->locked = 0;
        }
}

/* Actual MCS-PDRO locks.  Don't worry about initializing any fields of qnode.
 * We'll do vcoreid here, and the locking code deals with the other fields */
void mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
        /* Disable notifs, if we're an _M uthread */
        uth_disable_notifs();
        cmb();  /* in the off-chance the compiler wants to read vcoreid early */
        qnode->vcoreid = vcore_id();
        __mcs_pdro_lock(lock, qnode);
}

/* CAS-less unlock, not quite as efficient and will make sure every vcore runs
 * (since we don't have a convenient way to make sure our qnode->next runs
 * yet, other than making sure everyone runs).
 *
 * To use this without ensuring all vcores run, you'll need the unlock code to
 * save pred to a specific field in the qnode and check both its initial pred
 * as well as its run time pred (who could be a usurper).  It's all possible,
 * but a little more difficult to follow.  Also, I'm adjusting this comment
 * months after writing it originally, so it is probably necessary but not
 * sufficient. */
void __mcs_pdro_unlock_no_cas(struct mcs_pdro_lock *lock,
                              struct mcs_pdro_qnode *qnode)
{
        struct mcs_pdro_qnode *old_tail, *usurper;

        /* Check if someone is already waiting on us to unlock */
        if (qnode->next == 0) {
                cmb(); /* no need for CPU mbs, since there's an atomic_swap() */
                /* Unlock it */
                old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
                /* no one else was already waiting, so we successfully unlocked
                 * and can return */
                if (old_tail == qnode)
                        return;
                /* someone else was already waiting on the lock (last one on the
                 * list), and we accidentally took them off.  Try and put it
                 * back. */
                usurper = atomic_swap_ptr((void**)&lock->lock, old_tail);
                /* since someone else was waiting, they should have made
                 * themselves our next.  spin (very briefly!) til it happens. */
                while (qnode->next == 0) {
                        /* make sure old_tail isn't preempted.  best we can do
                         * for now is to make sure all vcores run, and thereby
                         * get our next. */
                        for (int i = 0; i < max_vcores(); i++)
                                ensure_vcore_runs(i);
                        cpu_relax();
                }
                if (usurper) {
                        /* a usurper is someone who snuck in before we could
                         * put the old tail back.  They now have the lock.
                         * Let's put whoever is supposed to be next as their
                         * next one.
                         *
                         * First, we need to change our next's pred.  There's a
                         * slight race here, so our next will need to make sure
                         * both us and pred are done */
                        /* I was trying to do something so we didn't need to
                         * ensure all vcores run, using more space in the qnode
                         * to figure out who our pred was at lock time (guessing
                         * actually, since there's a race, etc). */
                        //qnode->next->pred = usurper;
                        //wmb();
                        usurper->next = qnode->next;
                        /* could imagine another wmb() and a flag so our next
                         * knows to no longer check us too. */
                } else {
                        /* No usurper meant we put things back correctly, so we
                         * should just pass the lock / unlock whoever is next */
                        qnode->next->locked = 0;
                }
        } else {
                /* mb()s necessary since we didn't call an atomic_swap() */
                /* need to make sure any previous writes don't pass unlocking */
                wmb();
                /* need to make sure any reads happen before the unlocking */
                rwmb();
                /* simply unlock whoever is next */
                qnode->next->locked = 0;
        }
}

void mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
        __mcs_pdro_unlock(lock, qnode);
        /* Enable notifs, if we're an _M uthread */
        uth_enable_notifs();
}
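
/* Usage sketch (illustrative, not part of the library): PDRO callers also use
 * a stack qnode; mcs_pdro_lock() fills in qnode->vcoreid and disables notifs
 * before queueing.  'example_pdro_lock' is hypothetical and would be set up
 * with mcs_pdro_init(). */
#if 0
static struct mcs_pdro_lock example_pdro_lock;

static void example_pdro_section(void)
{
        struct mcs_pdro_qnode qnode;

        mcs_pdro_lock(&example_pdro_lock, &qnode);
        /* ... critical section ... */
        mcs_pdro_unlock(&example_pdro_lock, &qnode);
}
#endif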

/* New style: under heavy contention with preemption, these locks won't enter
 * the 'preempt/change_to storm' that can happen to PDRs, at the cost of some
 * performance.  This is the default. */
void mcs_pdr_init(struct mcs_pdr_lock *lock)
{
        int ret;

        lock->lock = 0;
        lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
        ret = posix_memalign((void**)&lock->qnodes,
                             __alignof__(struct mcs_pdr_qnode),
                             sizeof(struct mcs_pdr_qnode) * max_vcores());
        assert(!ret);
}

void mcs_pdr_fini(struct mcs_pdr_lock *lock)
{
        free(lock->qnodes);
}
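
/* Lifecycle sketch (illustrative, not part of the library): unlike the
 * stack-qnode variants, a PDR lock owns an array of max_vcores() qnodes, so
 * it needs an explicit init/fini pair.  'example_pdr_lock' is hypothetical. */
#if 0
static struct mcs_pdr_lock example_pdr_lock;

static void example_pdr_setup(void)
{
        mcs_pdr_init(&example_pdr_lock);
}

static void example_pdr_teardown(void)
{
        mcs_pdr_fini(&example_pdr_lock);
}
#endif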

/* Similar to the original PDR lock, this tracks the lockholder for better
 * recovery from preemptions.  Under heavy contention, changing to the
 * lockholder instead of pred makes it more likely to have a vcore outside the
 * MCS chain handle the preemption.  If that never happens, performance will
 * suffer.
 *
 * Simply checking the lockholder causes a lot of unnecessary traffic, so we
 * first look for signs of preemption in read-mostly locations (by comparison,
 * the lockholder changes on every lock/unlock).
 *
 * We also use the "qnodes are in the lock" style, which is slightly slower than
 * using the stack in regular MCS/MCSPDR locks, but it speeds PDR up a bit by
 * not having to read other qnodes' memory to determine their vcoreid.  The
 * slowdown may be due to some weird caching/prefetch settings (like Adjacent
 * Cacheline Prefetch).
 *
 * Note that these locks, like all PDR locks, have opportunities to accidentally
 * ensure some vcore runs that isn't in the chain.  Whenever we read lockholder
 * or even pred, that particular vcore might subsequently unlock and then get
 * preempted (or change_to someone else) before we ensure they run.  If this
 * happens and there is another VC in the MCS chain, it will make sure the right
 * cores run.  If there are no other vcores in the chain, it is up to the rest
 * of the vcore/event handling system to deal with this, which should happen
 * when one of the other vcores handles the preemption message generated by our
 * change_to. */
void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
{
        struct mcs_pdr_qnode *predecessor;
        uint32_t pred_vcoreid;
        struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();
        seq_ctr_t seq;

        qnode->next = 0;
        cmb();  /* swap provides a CPU mb() */
        predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
        if (predecessor) {
                qnode->locked = 1;
                /* can compute this whenever */
                pred_vcoreid = predecessor - qnode0;
                wmb();  /* order the locked write before the next write */
                predecessor->next = qnode;
                seq = ACCESS_ONCE(__procinfo.coremap_seqctr);
                /* no need for a wrmb(), since this will only get unlocked after
                 * they read our pred->next write */
                while (qnode->locked) {
                        /* Check to see if anything is amiss.  If someone in the
                         * chain is preempted, then someone will notice.  Simply
                         * checking our pred isn't that great of an indicator of
                         * preemption.  The reason is that the offline vcore is
                         * most likely the lockholder (under heavy lock
                         * contention), and we want someone farther back in the
                         * chain to notice (someone that will stay preempted
                         * long enough for a vcore outside the chain to recover
                         * them).  Checking the seqctr will tell us of any
                         * preempts since we started, so if a storm starts while
                         * we're spinning, we can join in and try to save the
                         * lockholder before its successor gets it.
                         *
                         * Also, if we're the lockholder, then we need to let
                         * our pred run so they can hand us the lock. */
                        if (vcore_is_preempted(pred_vcoreid) ||
                            seq != __procinfo.coremap_seqctr) {
                                /* Note that we don't normally ensure our *pred*
                                 * runs. */
                                if (lock->lockholder_vcoreid ==
                                    MCSPDR_NO_LOCKHOLDER ||
                                    lock->lockholder_vcoreid == vcore_id())
                                        ensure_vcore_runs(pred_vcoreid);
                                else
                                        ensure_vcore_runs(
                                                lock->lockholder_vcoreid);
                        }
                        cpu_relax();
                }
        } else {
                lock->lockholder_vcoreid = vcore_id();
        }
}
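
/* The pointer arithmetic above relies on the qnodes living in the per-lock
 * array allocated by mcs_pdr_init(): subtracting the array base from a qnode
 * pointer yields that qnode's vcoreid.  A sketch of the same computation,
 * spelled out with the array base (illustrative only; the lock code avoids
 * reading lock->qnodes by computing qnode0 = qnode - vcore_id() instead): */
#if 0
static uint32_t example_qnode_to_vcoreid(struct mcs_pdr_lock *lock,
                                         struct mcs_pdr_qnode *qnode)
{
        /* &lock->qnodes[i] - lock->qnodes == i */
        return qnode - lock->qnodes;
}
#endif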

void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
{
        uint32_t a_tail_vcoreid;
        struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();

        /* Check if someone is already waiting on us to unlock */
        if (qnode->next == 0) {
                cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
                /* If we're still the lock, just swap it with 0 (unlock) and
                 * return */
                if (atomic_cas_ptr((void**)&lock->lock, qnode, 0)) {
                        /* This is racy with the new lockholder.  It's possible
                         * that we'll clobber their legit write, though it
                         * doesn't actually hurt correctness.  It'll get sorted
                         * out on the next unlock. */
                        lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
                        return;
                }
                /* Get the tail (or someone who recently was the tail, but could
                 * now be farther up the chain), in prep for our spinning.
                 * Could do an ACCESS_ONCE on lock->lock */
                a_tail_vcoreid = lock->lock - qnode0;
                /* We failed, someone is there and we are some (maybe a
                 * different) thread's pred.  Since someone else was waiting,
                 * they should have made themselves our next.  Spin (very
                 * briefly!) til it happens. */
                while (qnode->next == 0) {
                        /* We need to get our next to run, but we don't know who
                         * they are.  If we make sure a tail is running, that
                         * will percolate up to make sure our qnode->next is
                         * running.
                         *
                         * But first, we need to tell everyone that there is no
                         * specific lockholder.  lockholder_vcoreid is a
                         * short-circuit on the "walk the chain" PDR.  Normally,
                         * that's okay.  But now we need to make sure everyone
                         * is walking the chain from a_tail up to our pred. */
                        lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
                        ensure_vcore_runs(a_tail_vcoreid);
                        cpu_relax();
                }
                /* Alpha wants a read_barrier_depends() here */
                lock->lockholder_vcoreid = qnode->next - qnode0;
                wmb();  /* order the vcoreid write before the unlock */
                qnode->next->locked = 0;
        } else {
                /* Note we're saying someone else is the lockholder, though we
                 * still are the lockholder until we unlock the next qnode.  Our
                 * next knows that if it sees itself as the lockholder, it needs
                 * to make sure we run. */
                lock->lockholder_vcoreid = qnode->next - qnode0;
                /* mb()s necessary since we didn't call an atomic_swap() */
                /* need to make sure any previous writes don't pass unlocking */
                wmb();
                /* need to make sure any reads happen before the unlocking */
                rwmb();
                /* simply unlock whoever is next */
                qnode->next->locked = 0;
        }
}

void mcs_pdr_lock(struct mcs_pdr_lock *lock)
{
        uth_disable_notifs();
        cmb();  /* in the off-chance the compiler wants to read vcoreid early */
        __mcs_pdr_lock(lock, &lock->qnodes[vcore_id()]);
}

void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
{
        __mcs_pdr_unlock(lock, &lock->qnodes[vcore_id()]);
        uth_enable_notifs();
}
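
/* Usage sketch (illustrative, not part of the library): with the default PDR
 * lock the caller does not supply a qnode; the lock indexes its internal
 * per-vcore qnode array by vcore_id().  'example_default_lock' is
 * hypothetical and must already have been through mcs_pdr_init(). */
#if 0
static struct mcs_pdr_lock example_default_lock;

static void example_default_section(void)
{
        mcs_pdr_lock(&example_default_lock);
        /* ... critical section ... */
        mcs_pdr_unlock(&example_default_lock);
}
#endif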