parlib: Add __uth_sync_swap and __uth_sync_is_empty
[akaros.git] / user / parlib / mcs.c
1 #include <parlib/vcore.h>
2 #include <parlib/mcs.h>
3 #include <parlib/arch/atomic.h>
4 #include <string.h>
5 #include <stdlib.h>
6 #include <parlib/uthread.h>
7 #include <parlib/parlib.h>
8 #include <malloc.h>
9
10 // MCS locks
11 void mcs_lock_init(struct mcs_lock *lock)
12 {
13         memset(lock,0,sizeof(mcs_lock_t));
14 }
15
16 static inline mcs_lock_qnode_t *mcs_qnode_swap(mcs_lock_qnode_t **addr,
17                                                mcs_lock_qnode_t *val)
18 {
19         return (mcs_lock_qnode_t*)atomic_swap_ptr((void**)addr, val);
20 }
21
22 void mcs_lock_lock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
23 {
24         qnode->next = 0;
25         cmb();  /* swap provides a CPU mb() */
26         mcs_lock_qnode_t *predecessor = mcs_qnode_swap(&lock->lock, qnode);
27         if (predecessor) {
28                 qnode->locked = 1;
29                 wmb();
30                 predecessor->next = qnode;
31                 /* no need for a wrmb(), since this will only get unlocked after they
32                  * read our previous write */
33                 while (qnode->locked)
34                         cpu_relax();
35         }
36         cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
37 }
38
39 void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
40 {
41         /* Check if someone is already waiting on us to unlock */
42         if (qnode->next == 0) {
43                 cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
44                 /* Unlock it */
45                 mcs_lock_qnode_t *old_tail = mcs_qnode_swap(&lock->lock,0);
46                 /* no one else was already waiting, so we successfully unlocked and can
47                  * return */
48                 if (old_tail == qnode)
49                         return;
50                 /* someone else was already waiting on the lock (last one on the list),
51                  * and we accidentally took them off.  Try and put it back. */
52                 mcs_lock_qnode_t *usurper = mcs_qnode_swap(&lock->lock,old_tail);
53                 /* since someone else was waiting, they should have made themselves our
54                  * next.  spin (very briefly!) til it happens. */
55                 while (qnode->next == 0)
56                         cpu_relax();
57                 if (usurper) {
58                         /* an usurper is someone who snuck in before we could put the old
59                          * tail back.  They now have the lock.  Let's put whoever is
60                          * supposed to be next as their next one. */
61                         usurper->next = qnode->next;
62                 } else {
63                         /* No usurper meant we put things back correctly, so we should just
64                          * pass the lock / unlock whoever is next */
65                         qnode->next->locked = 0;
66                 }
67         } else {
68                 /* mb()s necessary since we didn't call an atomic_swap() */
69                 wmb();  /* need to make sure any previous writes don't pass unlocking */
70                 rwmb(); /* need to make sure any reads happen before the unlocking */
71                 /* simply unlock whoever is next */
72                 qnode->next->locked = 0;
73         }
74 }
75
76 /* CAS style mcs locks, kept around til we use them.  We're using the
77  * usurper-style, since RISCV doesn't have a real CAS (yet?). */
78 void mcs_lock_unlock_cas(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
79 {
80         /* Check if someone is already waiting on us to unlock */
81         if (qnode->next == 0) {
82                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
83                 /* If we're still the lock, just swap it with 0 (unlock) and return */
84                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
85                         return;
86                 /* We failed, someone is there and we are some (maybe a different)
87                  * thread's pred.  Since someone else was waiting, they should have made
88                  * themselves our next.  Spin (very briefly!) til it happens. */
89                 while (qnode->next == 0)
90                         cpu_relax();
91                 /* Alpha wants a read_barrier_depends() here */
92                 /* Now that we have a next, unlock them */
93                 qnode->next->locked = 0;
94         } else {
95                 /* mb()s necessary since we didn't call an atomic_swap() */
96                 wmb();  /* need to make sure any previous writes don't pass unlocking */
97                 rwmb(); /* need to make sure any reads happen before the unlocking */
98                 /* simply unlock whoever is next */
99                 qnode->next->locked = 0;
100         }
101 }
102
103 /* We don't bother saving the state, like we do with irqsave, since we can use
104  * whether or not we are in vcore context to determine that.  This means you
105  * shouldn't call this from those moments when you fake being in vcore context
106  * (when switching into the TLS, etc). */
107 void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
108 {
109         uth_disable_notifs();
110         mcs_lock_lock(lock, qnode);
111 }
112
113 /* Note we turn off the DONT_MIGRATE flag before enabling notifs.  This is fine,
114  * since we wouldn't receive any notifs that could lead to us migrating after we
115  * set DONT_MIGRATE but before enable_notifs().  We need it to be in this order,
116  * since we need to check messages after ~DONT_MIGRATE. */
117 void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
118 {
119         mcs_lock_unlock(lock, qnode);
120         uth_enable_notifs();
121 }
122
123 /* Preemption detection and recovering MCS locks. */
124 /* Old style.  Has trouble getting out of 'preempt/change-to storms' under
125  * heavy contention and with preemption. */
126 void mcs_pdro_init(struct mcs_pdro_lock *lock)
127 {
128         lock->lock = 0;
129 }
130
131 void mcs_pdro_fini(struct mcs_pdro_lock *lock)
132 {
133 }
134
135 /* Internal version of the locking function, doesn't care if notifs are
136  * disabled.  While spinning, we'll check to see if other vcores involved in the
137  * locking are running.  If we change to that vcore, we'll continue when our
138  * vcore gets restarted.  If the change fails, it is because the vcore is
139  * running, and we'll continue.
140  *
141  * It's worth noting that changing to another vcore won't hurt correctness.
142  * Even if they are no longer the lockholder, they will be checking preemption
143  * messages and will help break out of the deadlock.  So long as we don't
144  * spin uncontrollably, we're okay. */
145 void __mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
146 {
147         struct mcs_pdro_qnode *predecessor;
148         uint32_t pred_vcoreid;
149         /* Now the actual lock */
150         qnode->next = 0;
151         cmb();  /* swap provides a CPU mb() */
152         predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
153         if (predecessor) {
154                 qnode->locked = 1;
155                 /* Read-in the vcoreid before releasing them.  We won't need to worry
156                  * about their qnode memory being freed/reused (they can't til we fill
157                  * in the 'next' slot), which is a bit of a performance win.  This also
158                  * cuts down on cache-line contention when we ensure they run, which
159                  * helps a lot too. */
160                 pred_vcoreid = ACCESS_ONCE(predecessor->vcoreid);
161                 wmb();  /* order the locked write before the next write */
162                 predecessor->next = qnode;
163                 /* no need for a wrmb(), since this will only get unlocked after they
164                  * read our previous write */
165                 while (qnode->locked) {
166                         /* We don't know who the lock holder is (it hurts performance via
167                          * 'true' sharing to track it)  Instead we'll make sure our pred is
168                          * running, which trickles up to the lock holder. */
169                         ensure_vcore_runs(pred_vcoreid);
170                         cpu_relax();
171                 }
172         }
173 }
174
175 /* Using the CAS style unlocks, since the usurper recovery is a real pain in the
176  * ass */
177 void __mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
178 {
179         uint32_t a_tail_vcoreid;
180         /* Check if someone is already waiting on us to unlock */
181         if (qnode->next == 0) {
182                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
183                 /* If we're still the lock, just swap it with 0 (unlock) and return */
184                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
185                         return;
186                 /* Read in the tail (or someone who recently was the tail, but could now
187                  * be farther up the chain), in prep for our spinning. */
188                 a_tail_vcoreid = ACCESS_ONCE(lock->lock->vcoreid);
189                 /* We failed, someone is there and we are some (maybe a different)
190                  * thread's pred.  Since someone else was waiting, they should have made
191                  * themselves our next.  Spin (very briefly!) til it happens. */
192                 while (qnode->next == 0) {
193                         /* We need to get our next to run, but we don't know who they are.
194                          * If we make sure a tail is running, that will percolate up to make
195                          * sure our qnode->next is running */
196                         ensure_vcore_runs(a_tail_vcoreid);
197                         cpu_relax();
198                 }
199                 /* Alpha wants a read_barrier_depends() here */
200                 /* Now that we have a next, unlock them */
201                 qnode->next->locked = 0;
202         } else {
203                 /* mb()s necessary since we didn't call an atomic_swap() */
204                 wmb();  /* need to make sure any previous writes don't pass unlocking */
205                 rwmb(); /* need to make sure any reads happen before the unlocking */
206                 /* simply unlock whoever is next */
207                 qnode->next->locked = 0;
208         }
209 }
210
211 /* Actual MCS-PDRO locks.  Don't worry about initializing any fields of qnode.
212  * We'll do vcoreid here, and the locking code deals with the other fields */
213 void mcs_pdro_lock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
214 {
215         /* Disable notifs, if we're an _M uthread */
216         uth_disable_notifs();
217         cmb();  /* in the off-chance the compiler wants to read vcoreid early */
218         qnode->vcoreid = vcore_id();
219         __mcs_pdro_lock(lock, qnode);
220 }
221
222 /* CAS-less unlock, not quite as efficient and will make sure every vcore runs
223  * (since we don't have a convenient way to make sure our qnode->next runs
224  * yet, other than making sure everyone runs).
225  *
226  * To use this without ensuring all vcores run, you'll need the unlock code to
227  * save pred to a specific field in the qnode and check both its initial pred
228  * as well as its run time pred (who could be an usurper).  It's all possible,
229  * but a little more difficult to follow.  Also, I'm adjusting this comment
230  * months after writing it originally, so it is probably not sufficient, but
231  * necessary. */
232 void __mcs_pdro_unlock_no_cas(struct mcs_pdro_lock *lock,
233                              struct mcs_pdro_qnode *qnode)
234 {
235         struct mcs_pdro_qnode *old_tail, *usurper;
236         /* Check if someone is already waiting on us to unlock */
237         if (qnode->next == 0) {
238                 cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
239                 /* Unlock it */
240                 old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
241                 /* no one else was already waiting, so we successfully unlocked and can
242                  * return */
243                 if (old_tail == qnode)
244                         return;
245                 /* someone else was already waiting on the lock (last one on the list),
246                  * and we accidentally took them off.  Try and put it back. */
247                 usurper = atomic_swap_ptr((void*)&lock->lock, old_tail);
248                 /* since someone else was waiting, they should have made themselves our
249                  * next.  spin (very briefly!) til it happens. */
250                 while (qnode->next == 0) {
251                         /* make sure old_tail isn't preempted.  best we can do for now is
252                          * to make sure all vcores run, and thereby get our next. */
253                         for (int i = 0; i < max_vcores(); i++)
254                                 ensure_vcore_runs(i);
255                         cpu_relax();
256                 }
257                 if (usurper) {
258                         /* an usurper is someone who snuck in before we could put the old
259                          * tail back.  They now have the lock.  Let's put whoever is
260                          * supposed to be next as their next one. 
261                          *
262                          * First, we need to change our next's pred.  There's a slight race
263                          * here, so our next will need to make sure both us and pred are
264                          * done */
265                         /* I was trying to do something so we didn't need to ensure all
266                          * vcores run, using more space in the qnode to figure out who our
267                          * pred was a lock time (guessing actually, since there's a race,
268                          * etc). */
269                         //qnode->next->pred = usurper;
270                         //wmb();
271                         usurper->next = qnode->next;
272                         /* could imagine another wmb() and a flag so our next knows to no
273                          * longer check us too. */
274                 } else {
275                         /* No usurper meant we put things back correctly, so we should just
276                          * pass the lock / unlock whoever is next */
277                         qnode->next->locked = 0;
278                 }
279         } else {
280                 /* mb()s necessary since we didn't call an atomic_swap() */
281                 wmb();  /* need to make sure any previous writes don't pass unlocking */
282                 rwmb(); /* need to make sure any reads happen before the unlocking */
283                 /* simply unlock whoever is next */
284                 qnode->next->locked = 0;
285         }
286 }
287
288 void mcs_pdro_unlock(struct mcs_pdro_lock *lock, struct mcs_pdro_qnode *qnode)
289 {
290         __mcs_pdro_unlock(lock, qnode);
291         /* Enable notifs, if we're an _M uthread */
292         uth_enable_notifs();
293 }
294
295 /* New style: under heavy contention with preemption, they won't enter the
296  * 'preempt/change_to storm' that can happen to PDRs, at the cost of some
297  * performance.  This is the default. */
298 void mcs_pdr_init(struct mcs_pdr_lock *lock)
299 {
300         int ret;
301         lock->lock = 0;
302         lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
303         ret = posix_memalign((void**)&lock->qnodes,
304                              __alignof__(struct mcs_pdr_qnode),
305                              sizeof(struct mcs_pdr_qnode) * max_vcores());
306         assert(!ret);
307 }
308
309 void mcs_pdr_fini(struct mcs_pdr_lock *lock)
310 {
311         free(lock->qnodes);
312 }
313
314 /* Similar to the original PDR lock, this tracks the lockholder for better
315  * recovery from preemptions.  Under heavy contention, changing to the
316  * lockholder instead of pred makes it more likely to have a vcore outside the
317  * MCS chain handle the preemption.  If that never happens, performance will
318  * suffer.
319  *
320  * Simply checking the lockholder causes a lot of unnecessary traffic, so we
321  * first look for signs of preemption in read-mostly locations (by comparison,
322  * the lockholder changes on every lock/unlock).
323  *
324  * We also use the "qnodes are in the lock" style, which is slightly slower than
325  * using the stack in regular MCS/MCSPDR locks, but it speeds PDR up a bit by
326  * not having to read other qnodes' memory to determine their vcoreid.  The
327  * slowdown may be due to some weird caching/prefetch settings (like Adjacent
328  * Cacheline Prefetch).
329  *
330  * Note that these locks, like all PDR locks, have opportunities to accidentally
331  * ensure some vcore runs that isn't in the chain.  Whenever we read lockholder
332  * or even pred, that particular vcore might subsequently unlock and then get
333  * preempted (or change_to someone else) before we ensure they run.  If this
334  * happens and there is another VC in the MCS chain, it will make sure the right
335  * cores run.  If there are no other vcores in the chain, it is up to the rest
336  * of the vcore/event handling system to deal with this, which should happen
337  * when one of the other vcores handles the preemption message generated by our
338  * change_to. */
339 void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
340 {
341         struct mcs_pdr_qnode *predecessor;
342         uint32_t pred_vcoreid;
343         struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();
344         seq_ctr_t seq;
345         qnode->next = 0;
346         cmb();  /* swap provides a CPU mb() */
347         predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
348         if (predecessor) {
349                 qnode->locked = 1;
350                 pred_vcoreid = predecessor - qnode0;    /* can compute this whenever */
351                 wmb();  /* order the locked write before the next write */
352                 predecessor->next = qnode;
353                 seq = ACCESS_ONCE(__procinfo.coremap_seqctr);
354                 /* no need for a wrmb(), since this will only get unlocked after they
355                  * read our pred->next write */
356                 while (qnode->locked) {
357                         /* Check to see if anything is amiss.  If someone in the chain is
358                          * preempted, then someone will notice.  Simply checking our pred
359                          * isn't that great of an indicator of preemption.  The reason is
360                          * that the offline vcore is most likely the lockholder (under heavy
361                          * lock contention), and we want someone farther back in the chain
362                          * to notice (someone that will stay preempted long enough for a
363                          * vcore outside the chain to recover them).  Checking the seqctr
364                          * will tell us of any preempts since we started, so if a storm
365                          * starts while we're spinning, we can join in and try to save the
366                          * lockholder before its successor gets it.
367                          *
368                          * Also, if we're the lockholder, then we need to let our pred run
369                          * so they can hand us the lock. */
370                         if (vcore_is_preempted(pred_vcoreid) ||
371                             seq != __procinfo.coremap_seqctr) {
372                                 /* Note that we don't normally ensure our *pred* runs. */
373                                 if (lock->lockholder_vcoreid == MCSPDR_NO_LOCKHOLDER ||
374                                     lock->lockholder_vcoreid == vcore_id())
375                                         ensure_vcore_runs(pred_vcoreid);
376                                 else
377                                         ensure_vcore_runs(lock->lockholder_vcoreid);
378                         }
379                         cpu_relax();
380                 }
381         } else {
382                 lock->lockholder_vcoreid = vcore_id();
383         }
384 }
385
386 void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
387 {
388         uint32_t a_tail_vcoreid;
389         struct mcs_pdr_qnode *qnode0 = qnode - vcore_id();
390         /* Check if someone is already waiting on us to unlock */
391         if (qnode->next == 0) {
392                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
393                 /* If we're still the lock, just swap it with 0 (unlock) and return */
394                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0)) {
395                         /* This is racy with the new lockholder.  it's possible that we'll
396                          * clobber their legit write, though it doesn't actually hurt
397                          * correctness.  it'll get sorted out on the next unlock. */
398                         lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
399                         return;
400                 }
401                 /* Get the tail (or someone who recently was the tail, but could now
402                  * be farther up the chain), in prep for our spinning.  Could do an
403                  * ACCESS_ONCE on lock->lock */
404                 a_tail_vcoreid = lock->lock - qnode0;
405                 /* We failed, someone is there and we are some (maybe a different)
406                  * thread's pred.  Since someone else was waiting, they should have made
407                  * themselves our next.  Spin (very briefly!) til it happens. */
408                 while (qnode->next == 0) {
409                         /* We need to get our next to run, but we don't know who they are.
410                          * If we make sure a tail is running, that will percolate up to make
411                          * sure our qnode->next is running.
412                          *
413                          * But first, we need to tell everyone that there is no specific
414                          * lockholder.  lockholder_vcoreid is a short-circuit on the "walk
415                          * the chain" PDR.  Normally, that's okay.  But now we need to make
416                          * sure everyone is walking the chain from a_tail up to our pred. */
417                         lock->lockholder_vcoreid = MCSPDR_NO_LOCKHOLDER;
418                         ensure_vcore_runs(a_tail_vcoreid);
419                         cpu_relax();
420                 }
421                 /* Alpha wants a read_barrier_depends() here */
422                 lock->lockholder_vcoreid = qnode->next - qnode0;
423                 wmb();  /* order the vcoreid write before the unlock */
424                 qnode->next->locked = 0;
425         } else {
426                 /* Note we're saying someone else is the lockholder, though we still are
427                  * the lockholder until we unlock the next qnode.  Our next knows that
428                  * if it sees itself is the lockholder, that it needs to make sure we
429                  * run. */
430                 lock->lockholder_vcoreid = qnode->next - qnode0;
431                 /* mb()s necessary since we didn't call an atomic_swap() */
432                 wmb();  /* need to make sure any previous writes don't pass unlocking */
433                 rwmb(); /* need to make sure any reads happen before the unlocking */
434                 /* simply unlock whoever is next */
435                 qnode->next->locked = 0;
436         }
437 }
438
439 void mcs_pdr_lock(struct mcs_pdr_lock *lock)
440 {
441         uth_disable_notifs();
442         cmb();  /* in the off-chance the compiler wants to read vcoreid early */
443         __mcs_pdr_lock(lock, &lock->qnodes[vcore_id()]);
444 }
445
446 void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
447 {
448         __mcs_pdr_unlock(lock, &lock->qnodes[vcore_id()]);
449         uth_enable_notifs();
450 }