tests/linux: use Akaros's CFLAGS
[akaros.git] / user / parlib / mcs.c
1 #include <parlib/vcore.h>
2 #include <parlib/mcs.h>
3 #include <parlib/arch/atomic.h>
4 #include <string.h>
5 #include <stdlib.h>
6 #include <parlib/uthread.h>
7 #include <parlib/parlib.h>
8 #include <malloc.h>
9
10 // MCS locks
11 void mcs_lock_init(struct mcs_lock *lock)
12 {
13         memset(lock,0,sizeof(mcs_lock_t));
14 }
15
16 static inline mcs_lock_qnode_t *mcs_qnode_swap(mcs_lock_qnode_t **addr,
17                                                mcs_lock_qnode_t *val)
18 {
19         return (mcs_lock_qnode_t*)atomic_swap_ptr((void**)addr, val);
20 }
21
22 void mcs_lock_lock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
23 {
24         qnode->next = 0;
25         cmb();  /* swap provides a CPU mb() */
26         mcs_lock_qnode_t *predecessor = mcs_qnode_swap(&lock->lock, qnode);
27         if (predecessor) {
28                 qnode->locked = 1;
29                 wmb();
30                 predecessor->next = qnode;
31                 /* no need for a wrmb(), since this will only get unlocked after
32                  * they read our previous write */
33                 while (qnode->locked)
34                         cpu_relax();
35         }
36         cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
37 }
38
39 void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
40 {
41         /* Check if someone is already waiting on us to unlock */
42         if (qnode->next == 0) {
43                 cmb(); /* no need for CPU mbs, since there's an atomic_swap() */
44                 /* Unlock it */
45                 mcs_lock_qnode_t *old_tail = mcs_qnode_swap(&lock->lock,0);
46                 /* no one else was already waiting, so we successfully unlocked
47                  * and can return */
48                 if (old_tail == qnode)
49                         return;
50                 /* someone else was already waiting on the lock (last one on the
51                  * list), and we accidentally took them off.  Try and put it
52                  * back. */
53                 mcs_lock_qnode_t *usurper = mcs_qnode_swap(&lock->lock,old_tail);
54                 /* since someone else was waiting, they should have made
55                  * themselves our next.  spin (very briefly!) til it happens. */
56                 while (qnode->next == 0)
57                         cpu_relax();
58                 if (usurper) {
59                         /* an usurper is someone who snuck in before we could
60                          * put the old tail back.  They now have the lock.
61                          * Let's put whoever is supposed to be next as their
62                          * next one. */
63                         usurper->next = qnode->next;
64                 } else {
65                         /* No usurper meant we put things back correctly, so we
66                          * should just pass the lock / unlock whoever is next */
67                         qnode->next->locked = 0;
68                 }
69         } else {
70                 /* mb()s necessary since we didn't call an atomic_swap() */
71                 /* need to make sure any previous writes don't pass unlocking */
72                 wmb();
73                 /* need to make sure any reads happen before the unlocking */
74                 rwmb();
75                 /* simply unlock whoever is next */
76                 qnode->next->locked = 0;
77         }
78 }
79
80 /* CAS style mcs locks, kept around til we use them.  We're using the
81  * usurper-style, since RISCV doesn't have a real CAS (yet?). */
82 void mcs_lock_unlock_cas(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
83 {
84         /* Check if someone is already waiting on us to unlock */
85         if (qnode->next == 0) {
86                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
87                 /* If we're still the lock, just swap it with 0 (unlock) and
88                  * return */
89                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
90                         return;
91                 /* We failed, someone is there and we are some (maybe a
92                  * different) thread's pred.  Since someone else was waiting,
93                  * they should have made themselves our next.  Spin (very
94                  * briefly!) til it happens. */
95                 while (qnode->next == 0)
96                         cpu_relax();
97                 /* Alpha wants a read_barrier_depends() here */
98                 /* Now that we have a next, unlock them */
99                 qnode->next->locked = 0;
100         } else {
101                 /* mb()s necessary since we didn't call an atomic_swap() */
102                 /* need to make sure any previous writes don't pass unlocking */
103                 wmb();
104                 /* need to make sure any reads happen before the unlocking */
105                 rwmb();
106                 /* simply unlock whoever is next */
107                 qnode->next->locked = 0;
108         }
109 }
110
111 /* We don't bother saving the state, like we do with irqsave, since we can use
112  * whether or not we are in vcore context to determine that.  This means you
113  * shouldn't call this from those moments when you fake being in vcore context
114  * (when switching into the TLS, etc). */
115 void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
116 {
117         uth_disable_notifs();
118         mcs_lock_lock(lock, qnode);
119 }
120
121 /* Note we turn off the DONT_MIGRATE flag before enabling notifs.  This is fine,
122  * since we wouldn't receive any notifs that could lead to us migrating after we
123  * set DONT_MIGRATE but before enable_notifs().  We need it to be in this order,
124  * since we need to check messages after ~DONT_MIGRATE. */
125 void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
126 {
127         mcs_lock_unlock(lock, qnode);
128         uth_enable_notifs();
129 }
130
131 /* Preemption detection and recovering MCS locks. */
132 /* Old style.  Has trouble getting out of 'preempt/change-to storms' under
133  * heavy contention and with preemption. */
134 void mcs_pdro_init(struct mcs_pdro_lock *lock)
135 {
136         lock->lock = 0;
137 }
138
139 void mcs_pdro_fini(struct mcs_pdro_lock *lock)
140 {
141 }
142
143 /* Internal version of the locking function, doesn't care if notifs are
144  * disabled.  While spinning, we'll check to see if other vcores involved in the
145  * locking are running.  If we change to that vcore, we'll continue when our
146  * vcore gets restarted.  If the change fails, it is because the vcore is
147  * running, and we'll continue.
148  *
149  * It's worth noting that changing to another vcore won't hurt correctness.
150  * Even if they are no longer the lockholder, they will be checking preemption
151  * messages and will help break out of the deadlock.  So long as we don't
152  * spin uncontrollably, we're okay. */
153 void __mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
154 {
155         struct mcs_pdro_qnode *predecessor;
156         uint32_t pred_vcoreid;
157
158         qnode->next = 0;
159         cmb();  /* swap provides a CPU mb() */
160         predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
161         if (predecessor) {
162                 qnode->locked = 1;
163                 /* Read-in the vcoreid before releasing them.  We won't need to
164                  * worry about their qnode memory being freed/reused (they can't
165                  * til we fill in the 'next' slot), which is a bit of a
166                  * performance win.  This also cuts down on cache-line
167                  * contention when we ensure they run, which helps a lot too. */
168                 pred_vcoreid = ACCESS_ONCE(predecessor->vcoreid);
169                 wmb();  /* order the locked write before the next write */
170                 predecessor->next = qnode;
171                 /* no need for a wrmb(), since this will only get unlocked after
172                  * they read our previous write */
173                 while (qnode->locked) {
174                         /* We don't know who the lock holder is (it hurts
175                          * performance via 'true' sharing to track it)  Instead
176                          * we'll make sure our pred is running, which trickles
177                          * up to the lock holder. */
178                         ensure_vcore_runs(pred_vcoreid);
179                         cpu_relax();
180                 }
181         }
182 }
183
184 /* Using the CAS style unlocks, since the usurper recovery is a real pain in the
185  * ass */
186 void __mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
187 {
188         uint32_t a_tail_vcoreid;
189         /* Check if someone is already waiting on us to unlock */
190         if (qnode->next == 0) {
191                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
192                 /* If we're still the lock, just swap it with 0 (unlock) and
193                  * return */
194                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
195                         return;
196                 /* Read in the tail (or someone who recently was the tail, but
197                  * could now be farther up the chain), in prep for our spinning.
198                  */
199                 a_tail_vcoreid = ACCESS_ONCE(lock->lock->vcoreid);
200                 /* We failed, someone is there and we are some (maybe a
201                  * different) thread's pred.  Since someone else was waiting,
202                  * they should have made themselves our next.  Spin (very
203                  * briefly!) til it happens. */
204                 while (qnode->next == 0) {
205                         /* We need to get our next to run, but we don't know who
206                          * they are.  If we make sure a tail is running, that
207                          * will percolate up to make sure our qnode->next is
208                          * running */
209                         ensure_vcore_runs(a_tail_vcoreid);
210                         cpu_relax();
211                 }
212                 /* Alpha wants a read_barrier_depends() here */
213                 /* Now that we have a next, unlock them */
214                 qnode->next->locked = 0;
215         } else {
216                 /* mb()s necessary since we didn't call an atomic_swap() */
217                 /* need to make sure any previous writes don't pass unlocking */
218                 wmb();
219                 /* need to make sure any reads happen before the unlocking */
220                 rwmb();
221                 /* simply unlock whoever is next */
222                 qnode->next->locked = 0;
223         }
224 }
225
226 /* Actual MCS-PDRO locks.  Don't worry about initializing any fields of qnode.
227  * We'll do vcoreid here, and the locking code deals with the other fields */
228 void mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
229 {
230         /* Disable notifs, if we're an _M uthread */
231         uth_disable_notifs();
232         cmb();  /* in the off-chance the compiler wants to read vcoreid early */
233         qnode->vcoreid = vcore_id();
234         __mcs_pdro_lock(lock, qnode);
235 }
236
237 /* CAS-less unlock, not quite as efficient and will make sure every vcore runs
238  * (since we don't have a convenient way to make sure our qnode->next runs
239  * yet, other than making sure everyone runs).
240  *
241  * To use this without ensuring all vcores run, you'll need the unlock code to
242  * save pred to a specific field in the qnode and check both its initial pred
243  * as well as its run time pred (who could be an usurper).  It's all possible,
244  * but a little more difficult to follow.  Also, I'm adjusting this comment
245  * months after writing it originally, so it is probably not sufficient, but
246  * necessary. */
247 void __mcs_pdro_unlock_no_cas(struct mcs_pdro_lock *lock,
248                              struct mcs_pdro_qnode *qnode)
249 {
250         struct mcs_pdro_qnode *old_tail, *usurper;
251
252         /* Check if someone is already waiting on us to unlock */
253         if (qnode->next == 0) {
254                 cmb(); /* no need for CPU mbs, since there's an atomic_swap() */
255                 /* Unlock it */
256                 old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
257                 /* no one else was already waiting, so we successfully unlocked
258                  * and can return */
259                 if (old_tail == qnode)
260                         return;
261                 /* someone else was already waiting on the lock (last one on the
262                  * list), and we accidentally took them off.  Try and put it
263                  * back. */
264                 usurper = atomic_swap_ptr((void*)&lock->lock, old_tail);
265                 /* since someone else was waiting, they should have made
266                  * themselves our next.  spin (very briefly!) til it happens. */
267                 while (qnode->next == 0) {
268                         /* make sure old_tail isn't preempted.  best we can do
269                          * for now is to make sure all vcores run, and thereby
270                          * get our next. */
271                         for (int i = 0; i < max_vcores(); i++)
272                                 ensure_vcore_runs(i);
273                         cpu_relax();
274                 }
275                 if (usurper) {
276                         /* an usurper is someone who snuck in before we could
277                          * put the old tail back.  They now have the lock.
278                          * Let's put whoever is supposed to be next as their
279                          * next one. 
280                          *
281                          * First, we need to change our next's pred.  There's a
282                          * slight race here, so our next will need to make sure
283                          * both us and pred are done */
284                         /* I was trying to do something so we didn't need to
285                          * ensure all vcores run, using more space in the qnode
286                          * to figure out who our pred was a lock time (guessing
287                          * actually, since there's a race, etc). */
288                         //qnode->next->pred = usurper;
289                         //wmb();
290                         usurper->next = qnode->next;
291                         /* could imagine another wmb() and a flag so our next
292                          * knows to no longer check us too. */
293                 } else {
294                         /* No usurper meant we put things back correctly, so we
295                          * should just pass the lock / unlock whoever is next */
296                         qnode->next->locked = 0;
297                 }
298         } else {
299                 /* mb()s necessary since we didn't call an atomic_swap() */
300                 /* need to make sure any previous writes don't pass unlocking */
301                 wmb();
302                 /* need to make sure any reads happen before the unlocking */
303                 rwmb();
304                 /* simply unlock whoever is next */
305                 qnode->next->locked = 0;
306         }
307 }
308
309 void mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
310 {
311         __mcs_pdro_unlock(lock, qnode);
312         /* Enable notifs, if we're an _M uthread */
313         uth_enable_notifs();
314 }
315
316 /* New style: under heavy contention with preemption, they won't enter the
317  * 'preempt/change_to storm' that can happen to PDRs, at the cost of some
318  * performance.  This is the default. */
319 void mcs_pdr_init(struct mcs_pdr_lock *lock)
320 {
321         int ret;
322
323         lock->lock = 0;
324         lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
325         ret = posix_memalign((void**)&lock->qnodes,
326                              __alignof__(struct mcs_pdr_qnode),
327                              sizeof(struct mcs_pdr_qnode) * max_vcores());
328         assert(!ret);
329 }
330
331 void mcs_pdr_fini(struct mcs_pdr_lock *lock)
332 {
333         free(lock->qnodes);
334 }
335
336 /* Similar to the original PDR lock, this tracks the lockholder for better
337  * recovery from preemptions.  Under heavy contention, changing to the
338  * lockholder instead of pred makes it more likely to have a vcore outside the
339  * MCS chain handle the preemption.  If that never happens, performance will
340  * suffer.
341  *
342  * Simply checking the lockholder causes a lot of unnecessary traffic, so we
343  * first look for signs of preemption in read-mostly locations (by comparison,
344  * the lockholder changes on every lock/unlock).
345  *
346  * We also use the "qnodes are in the lock" style, which is slightly slower than
347  * using the stack in regular MCS/MCSPDR locks, but it speeds PDR up a bit by
348  * not having to read other qnodes' memory to determine their vcoreid.  The
349  * slowdown may be due to some weird caching/prefetch settings (like Adjacent
350  * Cacheline Prefetch).
351  *
352  * Note that these locks, like all PDR locks, have opportunities to accidentally
353  * ensure some vcore runs that isn't in the chain.  Whenever we read lockholder
354  * or even pred, that particular vcore might subsequently unlock and then get
355  * preempted (or change_to someone else) before we ensure they run.  If this
356  * happens and there is another VC in the MCS chain, it will make sure the right
357  * cores run.  If there are no other vcores in the chain, it is up to the rest
358  * of the vcore/event handling system to deal with this, which should happen
359  * when one of the other vcores handles the preemption message generated by our
360  * change_to. */
361 void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
362 {
363         struct mcs_pdr_qnode *predecessor;
364         uint32_t pred_vcoreid;
365         struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();
366         seq_ctr_t seq;
367         qnode->next = 0;
368         cmb();  /* swap provides a CPU mb() */
369         predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
370         if (predecessor) {
371                 qnode->locked = 1;
372                 /* can compute this whenever */
373                 pred_vcoreid = predecessor - qnode0;
374                 wmb();  /* order the locked write before the next write */
375                 predecessor->next = qnode;
376                 seq = ACCESS_ONCE(__procinfo.coremap_seqctr);
377                 /* no need for a wrmb(), since this will only get unlocked after
378                  * they read our pred->next write */
379                 while (qnode->locked) {
380                         /* Check to see if anything is amiss.  If someone in the
381                          * chain is preempted, then someone will notice.  Simply
382                          * checking our pred isn't that great of an indicator of
383                          * preemption.  The reason is that the offline vcore is
384                          * most likely the lockholder (under heavy lock
385                          * contention), and we want someone farther back in the
386                          * chain to notice (someone that will stay preempted
387                          * long enough for a vcore outside the chain to recover
388                          * them).  Checking the seqctr will tell us of any
389                          * preempts since we started, so if a storm starts while
390                          * we're spinning, we can join in and try to save the
391                          * lockholder before its successor gets it.
392                          *
393                          * Also, if we're the lockholder, then we need to let
394                          * our pred run so they can hand us the lock. */
395                         if (vcore_is_preempted(pred_vcoreid) ||
396                             seq != __procinfo.coremap_seqctr) {
397                                 /* Note that we don't normally ensure our *pred*
398                                  * runs. */
399                                 if (lock->lockholder_vcoreid ==
400                                     MCSPDR_NO_LOCKHOLDER ||
401                                     lock->lockholder_vcoreid == vcore_id())
402                                         ensure_vcore_runs(pred_vcoreid);
403                                 else
404                                         ensure_vcore_runs(
405                                                 lock->lockholder_vcoreid);
406                         }
407                         cpu_relax();
408                 }
409         } else {
410                 lock->lockholder_vcoreid = vcore_id();
411         }
412 }
413
414 void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
415 {
416         uint32_t a_tail_vcoreid;
417         struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();
418
419         /* Check if someone is already waiting on us to unlock */
420         if (qnode->next == 0) {
421                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
422                 /* If we're still the lock, just swap it with 0 (unlock) and
423                  * return */
424                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0)) {
425                         /* This is racy with the new lockholder.  it's possible
426                          * that we'll clobber their legit write, though it
427                          * doesn't actually hurt correctness.  it'll get sorted
428                          * out on the next unlock. */
429                         lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
430                         return;
431                 }
432                 /* Get the tail (or someone who recently was the tail, but could
433                  * now be farther up the chain), in prep for our spinning.
434                  * Could do an ACCESS_ONCE on lock->lock */
435                 a_tail_vcoreid = lock->lock - qnode0;
436                 /* We failed, someone is there and we are some (maybe a
437                  * different) thread's pred.  Since someone else was waiting,
438                  * they should have made themselves our next.  Spin (very
439                  * briefly!) til it happens. */
440                 while (qnode->next == 0) {
441                         /* We need to get our next to run, but we don't know who
442                          * they are.  If we make sure a tail is running, that
443                          * will percolate up to make sure our qnode->next is
444                          * running.
445                          *
446                          * But first, we need to tell everyone that there is no
447                          * specific lockholder.  lockholder_vcoreid is a
448                          * short-circuit on the "walk the chain" PDR.  Normally,
449                          * that's okay.  But now we need to make sure everyone
450                          * is walking the chain from a_tail up to our pred. */
451                         lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
452                         ensure_vcore_runs(a_tail_vcoreid);
453                         cpu_relax();
454                 }
455                 /* Alpha wants a read_barrier_depends() here */
456                 lock->lockholder_vcoreid = qnode->next - qnode0;
457                 wmb();  /* order the vcoreid write before the unlock */
458                 qnode->next->locked = 0;
459         } else {
460                 /* Note we're saying someone else is the lockholder, though we
461                  * still are the lockholder until we unlock the next qnode.  Our
462                  * next knows that if it sees itself is the lockholder, that it
463                  * needs to make sure we run. */
464                 lock->lockholder_vcoreid = qnode->next - qnode0;
465                 /* mb()s necessary since we didn't call an atomic_swap() */
466                 /* need to make sure any previous writes don't pass unlocking */
467                 wmb();
468                 /* need to make sure any reads happen before the unlocking */
469                 rwmb();
470                 /* simply unlock whoever is next */
471                 qnode->next->locked = 0;
472         }
473 }
474
475 void mcs_pdr_lock(struct mcs_pdr_lock *lock)
476 {
477         uth_disable_notifs();
478         cmb();  /* in the off-chance the compiler wants to read vcoreid early */
479         __mcs_pdr_lock(lock, &lock->qnodes[vcore_id()]);
480 }
481
482 void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
483 {
484         __mcs_pdr_unlock(lock, &lock->qnodes[vcore_id()]);
485         uth_enable_notifs();
486 }