#include <parlib/vcore.h>
#include <parlib/mcs.h>
#include <parlib/arch/atomic.h>
#include <string.h>
#include <stdlib.h>
#include <parlib/uthread.h>
#include <parlib/parlib.h>
void mcs_lock_init(struct mcs_lock *lock)
{
	memset(lock, 0, sizeof(mcs_lock_t));
}

static inline mcs_lock_qnode_t *mcs_qnode_swap(mcs_lock_qnode_t **addr,
                                               mcs_lock_qnode_t *val)
{
	return (mcs_lock_qnode_t*)atomic_swap_ptr((void**)addr, val);
}
void mcs_lock_lock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
	qnode->next = 0;
	cmb();	/* swap provides a CPU mb() */
	mcs_lock_qnode_t *predecessor = mcs_qnode_swap(&lock->lock, qnode);
	if (predecessor) {
		qnode->locked = 1;
		wmb();	/* order the locked write before the next write */
		predecessor->next = qnode;
		/* no need for a wrmb(), since this will only get unlocked after they
		 * read our previous write */
		while (qnode->locked)
			cpu_relax();
	}
	cmb();	/* just need a cmb, the swap handles the CPU wmb/wrmb() */
}
void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
	/* Check if someone is already waiting on us to unlock */
	if (qnode->next == 0) {
		cmb();	/* no need for CPU mbs, since there's an atomic_swap() */
		/* Unlock it */
		mcs_lock_qnode_t *old_tail = mcs_qnode_swap(&lock->lock, 0);
		/* no one else was already waiting, so we successfully unlocked and can
		 * return */
		if (old_tail == qnode)
			return;
		/* someone else was already waiting on the lock (last one on the list),
		 * and we accidentally took them off.  Try and put it back. */
		mcs_lock_qnode_t *usurper = mcs_qnode_swap(&lock->lock, old_tail);
		/* since someone else was waiting, they should have made themselves our
		 * next.  spin (very briefly!) til it happens. */
		while (qnode->next == 0)
			cpu_relax();
		if (usurper) {
			/* a usurper is someone who snuck in before we could put the old
			 * tail back.  They now have the lock.  Let's put whoever is
			 * supposed to be next as their next one. */
			usurper->next = qnode->next;
		} else {
			/* No usurper meant we put things back correctly, so we should just
			 * pass the lock / unlock whoever is next */
			qnode->next->locked = 0;
		}
	} else {
		/* mb()s necessary since we didn't call an atomic_swap() */
		wmb();	/* need to make sure any previous writes don't pass unlocking */
		rwmb();	/* need to make sure any reads happen before the unlocking */
		/* simply unlock whoever is next */
		qnode->next->locked = 0;
	}
}
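
/* Illustrative usage sketch (not part of the original lock code): callers
 * supply their own qnode, typically on the stack, and must pass the same
 * qnode to the matching unlock.  The 'example_*' names below are hypothetical
 * and exist only for this sketch; the lock is assumed to have been set up
 * with mcs_lock_init(). */
static struct mcs_lock example_lock;
static unsigned long example_counter;

static void example_counter_inc(void)
{
	struct mcs_lock_qnode qnode;	/* one qnode per acquisition */

	mcs_lock_lock(&example_lock, &qnode);
	example_counter++;
	mcs_lock_unlock(&example_lock, &qnode);
}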
/* CAS style mcs locks, kept around til we use them.  We're using the
 * usurper-style, since RISCV doesn't have a real CAS (yet?). */
void mcs_lock_unlock_cas(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
	/* Check if someone is already waiting on us to unlock */
	if (qnode->next == 0) {
		cmb();	/* no need for CPU mbs, since there's an atomic_cas() */
		/* If we're still the lock, just swap it with 0 (unlock) and return */
		if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
			return;
		/* We failed, someone is there and we are some (maybe a different)
		 * thread's pred.  Since someone else was waiting, they should have made
		 * themselves our next.  Spin (very briefly!) til it happens. */
		while (qnode->next == 0)
			cpu_relax();
		/* Alpha wants a read_barrier_depends() here */
		/* Now that we have a next, unlock them */
		qnode->next->locked = 0;
	} else {
		/* mb()s necessary since we didn't call an atomic_swap() */
		wmb();	/* need to make sure any previous writes don't pass unlocking */
		rwmb();	/* need to make sure any reads happen before the unlocking */
		/* simply unlock whoever is next */
		qnode->next->locked = 0;
	}
}
/* We don't bother saving the state, like we do with irqsave, since we can use
 * whether or not we are in vcore context to determine that.  This means you
 * shouldn't call this from those moments when you fake being in vcore context
 * (when switching into the TLS, etc). */
void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
	uth_disable_notifs();
	mcs_lock_lock(lock, qnode);
}
/* Note we turn off the DONT_MIGRATE flag before enabling notifs.  This is fine,
 * since we wouldn't receive any notifs that could lead to us migrating after we
 * set DONT_MIGRATE but before enable_notifs().  We need it to be in this order,
 * since we need to check messages after ~DONT_MIGRATE. */
void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
{
	mcs_lock_unlock(lock, qnode);
	uth_enable_notifs();
}
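
/* Illustrative usage sketch (not part of the original lock code): the
 * notif-safe variants wrap the plain MCS lock in uth_disable_notifs() /
 * uth_enable_notifs(), so a uthread holding the lock won't be interrupted by
 * notifications during the critical section.  The 'example_*' names below are
 * hypothetical and exist only for this sketch. */
static struct mcs_lock example_ns_lock;

static void example_notifsafe_section(void (*critical_work)(void))
{
	struct mcs_lock_qnode qnode;

	mcs_lock_notifsafe(&example_ns_lock, &qnode);
	critical_work();	/* runs with notifs disabled */
	mcs_unlock_notifsafe(&example_ns_lock, &qnode);
}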
/* Preemption detection and recovering MCS locks. */
/* Old style.  Has trouble getting out of 'preempt/change-to storms' under
 * heavy contention and with preemption. */
void mcs_pdro_init(struct mcs_pdro_lock *lock)
{
	lock->lock = 0;
}

void mcs_pdro_fini(struct mcs_pdro_lock *lock)
{
}
/* Internal version of the locking function, doesn't care if notifs are
 * disabled.  While spinning, we'll check to see if other vcores involved in the
 * locking are running.  If we change to that vcore, we'll continue when our
 * vcore gets restarted.  If the change fails, it is because the vcore is
 * running, and we'll continue.
 *
 * It's worth noting that changing to another vcore won't hurt correctness.
 * Even if they are no longer the lockholder, they will be checking preemption
 * messages and will help break out of the deadlock.  So long as we don't
 * spin uncontrollably, we're okay. */
void __mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
	struct mcs_pdro_qnode *predecessor;
	uint32_t pred_vcoreid;
	/* Now the actual lock */
	qnode->next = 0;
	cmb();	/* swap provides a CPU mb() */
	predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
	if (predecessor) {
		qnode->locked = 1;
		/* Read-in the vcoreid before releasing them.  We won't need to worry
		 * about their qnode memory being freed/reused (they can't til we fill
		 * in the 'next' slot), which is a bit of a performance win.  This also
		 * cuts down on cache-line contention when we ensure they run, which
		 * helps a lot too. */
		pred_vcoreid = ACCESS_ONCE(predecessor->vcoreid);
		wmb();	/* order the locked write before the next write */
		predecessor->next = qnode;
		/* no need for a wrmb(), since this will only get unlocked after they
		 * read our previous write */
		while (qnode->locked) {
			/* We don't know who the lock holder is (it hurts performance via
			 * 'true' sharing to track it).  Instead we'll make sure our pred is
			 * running, which trickles up to the lock holder. */
			ensure_vcore_runs(pred_vcoreid);
			cpu_relax();
		}
	}
}
/* Using the CAS style unlocks, since the usurper recovery is a real pain in the
 * ass */
void __mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
	uint32_t a_tail_vcoreid;
	/* Check if someone is already waiting on us to unlock */
	if (qnode->next == 0) {
		cmb();	/* no need for CPU mbs, since there's an atomic_cas() */
		/* If we're still the lock, just swap it with 0 (unlock) and return */
		if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
			return;
		/* Read in the tail (or someone who recently was the tail, but could now
		 * be farther up the chain), in prep for our spinning. */
		a_tail_vcoreid = ACCESS_ONCE(lock->lock->vcoreid);
		/* We failed, someone is there and we are some (maybe a different)
		 * thread's pred.  Since someone else was waiting, they should have made
		 * themselves our next.  Spin (very briefly!) til it happens. */
		while (qnode->next == 0) {
			/* We need to get our next to run, but we don't know who they are.
			 * If we make sure a tail is running, that will percolate up to make
			 * sure our qnode->next is running */
			ensure_vcore_runs(a_tail_vcoreid);
			cpu_relax();
		}
		/* Alpha wants a read_barrier_depends() here */
		/* Now that we have a next, unlock them */
		qnode->next->locked = 0;
	} else {
		/* mb()s necessary since we didn't call an atomic_swap() */
		wmb();	/* need to make sure any previous writes don't pass unlocking */
		rwmb();	/* need to make sure any reads happen before the unlocking */
		/* simply unlock whoever is next */
		qnode->next->locked = 0;
	}
}
/* Actual MCS-PDRO locks.  Don't worry about initializing any fields of qnode.
 * We'll do vcoreid here, and the locking code deals with the other fields */
void mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
	/* Disable notifs, if we're an _M uthread */
	uth_disable_notifs();
	cmb();	/* in the off-chance the compiler wants to read vcoreid early */
	qnode->vcoreid = vcore_id();
	__mcs_pdro_lock(lock, qnode);
}
/* CAS-less unlock, not quite as efficient and will make sure every vcore runs
 * (since we don't have a convenient way to make sure our qnode->next runs
 * yet, other than making sure everyone runs).
 *
 * To use this without ensuring all vcores run, you'll need the unlock code to
 * save pred to a specific field in the qnode and check both its initial pred
 * as well as its run time pred (who could be a usurper).  It's all possible,
 * but a little more difficult to follow.  Also, I'm adjusting this comment
 * months after writing it originally, so it is probably not sufficient, but
 * necessary. */
void __mcs_pdro_unlock_no_cas(struct mcs_pdro_lock *lock,
                              struct mcs_pdro_qnode *qnode)
{
	struct mcs_pdro_qnode *old_tail, *usurper;
	/* Check if someone is already waiting on us to unlock */
	if (qnode->next == 0) {
		cmb();	/* no need for CPU mbs, since there's an atomic_swap() */
		/* Unlock it */
		old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
		/* no one else was already waiting, so we successfully unlocked and can
		 * return */
		if (old_tail == qnode)
			return;
		/* someone else was already waiting on the lock (last one on the list),
		 * and we accidentally took them off.  Try and put it back. */
		usurper = atomic_swap_ptr((void**)&lock->lock, old_tail);
		/* since someone else was waiting, they should have made themselves our
		 * next.  spin (very briefly!) til it happens. */
		while (qnode->next == 0) {
			/* make sure old_tail isn't preempted.  best we can do for now is
			 * to make sure all vcores run, and thereby get our next. */
			for (int i = 0; i < max_vcores(); i++)
				ensure_vcore_runs(i);
			cpu_relax();
		}
		if (usurper) {
			/* a usurper is someone who snuck in before we could put the old
			 * tail back.  They now have the lock.  Let's put whoever is
			 * supposed to be next as their next one.
			 *
			 * First, we need to change our next's pred.  There's a slight race
			 * here, so our next will need to make sure both us and pred are
			 * done. */
			/* I was trying to do something so we didn't need to ensure all
			 * vcores run, using more space in the qnode to figure out who our
			 * pred was at lock time (guessing actually, since there's a race,
			 * etc). */
			//qnode->next->pred = usurper;
			//wmb();
			usurper->next = qnode->next;
			/* could imagine another wmb() and a flag so our next knows to no
			 * longer check us too. */
		} else {
			/* No usurper meant we put things back correctly, so we should just
			 * pass the lock / unlock whoever is next */
			qnode->next->locked = 0;
		}
	} else {
		/* mb()s necessary since we didn't call an atomic_swap() */
		wmb();	/* need to make sure any previous writes don't pass unlocking */
		rwmb();	/* need to make sure any reads happen before the unlocking */
		/* simply unlock whoever is next */
		qnode->next->locked = 0;
	}
}
void mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
{
	__mcs_pdro_unlock(lock, qnode);
	/* Enable notifs, if we're an _M uthread */
	uth_enable_notifs();
}
/* New style: under heavy contention with preemption, they won't enter the
 * 'preempt/change_to storm' that can happen to PDRs, at the cost of some
 * performance.  This is the default. */
void mcs_pdr_init(struct mcs_pdr_lock *lock)
{
	int ret;

	lock->lock = 0;
	lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
	ret = posix_memalign((void**)&lock->qnodes,
	                     __alignof__(struct mcs_pdr_qnode),
	                     sizeof(struct mcs_pdr_qnode) * max_vcores());
	assert(!ret);
}

void mcs_pdr_fini(struct mcs_pdr_lock *lock)
{
	free(lock->qnodes);
}
/* Similar to the original PDR lock, this tracks the lockholder for better
 * recovery from preemptions.  Under heavy contention, changing to the
 * lockholder instead of pred makes it more likely to have a vcore outside the
 * MCS chain handle the preemption.  If that never happens, performance will
 * suffer.
 *
 * Simply checking the lockholder causes a lot of unnecessary traffic, so we
 * first look for signs of preemption in read-mostly locations (by comparison,
 * the lockholder changes on every lock/unlock).
 *
 * We also use the "qnodes are in the lock" style, which is slightly slower than
 * using the stack in regular MCS/MCSPDR locks, but it speeds PDR up a bit by
 * not having to read other qnodes' memory to determine their vcoreid.  The
 * slowdown may be due to some weird caching/prefetch settings (like Adjacent
 * Cacheline Prefetch).
 *
 * Note that these locks, like all PDR locks, have opportunities to accidentally
 * ensure some vcore runs that isn't in the chain.  Whenever we read lockholder
 * or even pred, that particular vcore might subsequently unlock and then get
 * preempted (or change_to someone else) before we ensure they run.  If this
 * happens and there is another VC in the MCS chain, it will make sure the right
 * cores run.  If there are no other vcores in the chain, it is up to the rest
 * of the vcore/event handling system to deal with this, which should happen
 * when one of the other vcores handles the preemption message generated by our
 * preemption. */
void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
{
	struct mcs_pdr_qnode *predecessor;
	uint32_t pred_vcoreid;
	struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();
	seq_ctr_t seq;

	qnode->next = 0;
	cmb();	/* swap provides a CPU mb() */
	predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
	if (predecessor) {
		qnode->locked = 1;
		pred_vcoreid = predecessor - qnode0;	/* can compute this whenever */
		wmb();	/* order the locked write before the next write */
		predecessor->next = qnode;
		seq = ACCESS_ONCE(__procinfo.coremap_seqctr);
		/* no need for a wrmb(), since this will only get unlocked after they
		 * read our pred->next write */
		while (qnode->locked) {
			/* Check to see if anything is amiss.  If someone in the chain is
			 * preempted, then someone will notice.  Simply checking our pred
			 * isn't that great of an indicator of preemption.  The reason is
			 * that the offline vcore is most likely the lockholder (under heavy
			 * lock contention), and we want someone farther back in the chain
			 * to notice (someone that will stay preempted long enough for a
			 * vcore outside the chain to recover them).  Checking the seqctr
			 * will tell us of any preempts since we started, so if a storm
			 * starts while we're spinning, we can join in and try to save the
			 * lockholder before its successor gets it.
			 *
			 * Also, if we're the lockholder, then we need to let our pred run
			 * so they can hand us the lock. */
			if (vcore_is_preempted(pred_vcoreid) ||
			    seq != __procinfo.coremap_seqctr) {
				/* Note that we don't normally ensure our *pred* runs. */
				if (lock->lockholder_vcoreid == MCSPDR_NO_LOCKHOLDER ||
				    lock->lockholder_vcoreid == vcore_id())
					ensure_vcore_runs(pred_vcoreid);
				else
					ensure_vcore_runs(lock->lockholder_vcoreid);
			}
			cpu_relax();
		}
	} else {
		lock->lockholder_vcoreid = vcore_id();
	}
}
void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
{
	uint32_t a_tail_vcoreid;
	struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();

	/* Check if someone is already waiting on us to unlock */
	if (qnode->next == 0) {
		cmb();	/* no need for CPU mbs, since there's an atomic_cas() */
		/* If we're still the lock, just swap it with 0 (unlock) and return */
		if (atomic_cas_ptr((void**)&lock->lock, qnode, 0)) {
			/* This is racy with the new lockholder.  it's possible that we'll
			 * clobber their legit write, though it doesn't actually hurt
			 * correctness.  it'll get sorted out on the next unlock. */
			lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
			return;
		}
		/* Get the tail (or someone who recently was the tail, but could now
		 * be farther up the chain), in prep for our spinning.  Could do an
		 * ACCESS_ONCE on lock->lock */
		a_tail_vcoreid = lock->lock - qnode0;
		/* We failed, someone is there and we are some (maybe a different)
		 * thread's pred.  Since someone else was waiting, they should have made
		 * themselves our next.  Spin (very briefly!) til it happens. */
		while (qnode->next == 0) {
			/* We need to get our next to run, but we don't know who they are.
			 * If we make sure a tail is running, that will percolate up to make
			 * sure our qnode->next is running.
			 *
			 * But first, we need to tell everyone that there is no specific
			 * lockholder.  lockholder_vcoreid is a short-circuit on the "walk
			 * the chain" PDR.  Normally, that's okay.  But now we need to make
			 * sure everyone is walking the chain from a_tail up to our pred. */
			lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
			ensure_vcore_runs(a_tail_vcoreid);
			cpu_relax();
		}
		/* Alpha wants a read_barrier_depends() here */
		lock->lockholder_vcoreid = qnode->next - qnode0;
		wmb();	/* order the vcoreid write before the unlock */
		qnode->next->locked = 0;
	} else {
		/* Note we're saying someone else is the lockholder, though we still are
		 * the lockholder until we unlock the next qnode.  Our next knows that
		 * if it sees itself as the lockholder, it needs to make sure we
		 * run. */
		lock->lockholder_vcoreid = qnode->next - qnode0;
		/* mb()s necessary since we didn't call an atomic_swap() */
		wmb();	/* need to make sure any previous writes don't pass unlocking */
		rwmb();	/* need to make sure any reads happen before the unlocking */
		/* simply unlock whoever is next */
		qnode->next->locked = 0;
	}
}
void mcs_pdr_lock(struct mcs_pdr_lock *lock)
{
	uth_disable_notifs();
	cmb();	/* in the off-chance the compiler wants to read vcoreid early */
	__mcs_pdr_lock(lock, &lock->qnodes[vcore_id()]);
}

void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
{
	__mcs_pdr_unlock(lock, &lock->qnodes[vcore_id()]);
	uth_enable_notifs();
}
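
/* Illustrative usage sketch (not part of the original lock code): unlike the
 * basic MCS and MCS-PDRO locks, the PDR lock owns its qnodes (one per vcore,
 * allocated in mcs_pdr_init()), so callers never pass a qnode.  The
 * 'example_*' names below are hypothetical and exist only for this sketch. */
static struct mcs_pdr_lock example_pdr_lock;

static void example_pdr_setup(void)
{
	mcs_pdr_init(&example_pdr_lock);
}

static void example_pdr_section(void (*critical_work)(void))
{
	mcs_pdr_lock(&example_pdr_lock);
	critical_work();	/* continues making progress even if other vcores get preempted */
	mcs_pdr_unlock(&example_pdr_lock);
}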