MCS PDR locks use memalign
[akaros.git] / user / parlib / mcs.c
1 #include <vcore.h>
2 #include <mcs.h>
3 #include <arch/atomic.h>
4 #include <string.h>
5 #include <stdlib.h>
6 #include <uthread.h>
7 #include <parlib.h>
8 #include <malloc.h>
9
10 // MCS locks
11 void mcs_lock_init(struct mcs_lock *lock)
12 {
13         memset(lock,0,sizeof(mcs_lock_t));
14 }
15
16 static inline mcs_lock_qnode_t *mcs_qnode_swap(mcs_lock_qnode_t **addr,
17                                                mcs_lock_qnode_t *val)
18 {
19         return (mcs_lock_qnode_t*)atomic_swap_ptr((void**)addr, val);
20 }
21
22 void mcs_lock_lock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
23 {
24         qnode->next = 0;
25         cmb();  /* swap provides a CPU mb() */
26         mcs_lock_qnode_t *predecessor = mcs_qnode_swap(&lock->lock, qnode);
27         if (predecessor) {
28                 qnode->locked = 1;
29                 wmb();
30                 predecessor->next = qnode;
31                 /* no need for a wrmb(), since this will only get unlocked after they
32                  * read our previous write */
33                 while (qnode->locked)
34                         cpu_relax();
35         }
36         cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
37 }
38
39 void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
40 {
41         /* Check if someone is already waiting on us to unlock */
42         if (qnode->next == 0) {
43                 cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
44                 /* Unlock it */
45                 mcs_lock_qnode_t *old_tail = mcs_qnode_swap(&lock->lock,0);
46                 /* no one else was already waiting, so we successfully unlocked and can
47                  * return */
48                 if (old_tail == qnode)
49                         return;
50                 /* someone else was already waiting on the lock (last one on the list),
51                  * and we accidentally took them off.  Try and put it back. */
52                 mcs_lock_qnode_t *usurper = mcs_qnode_swap(&lock->lock,old_tail);
53                 /* since someone else was waiting, they should have made themselves our
54                  * next.  spin (very briefly!) til it happens. */
55                 while (qnode->next == 0)
56                         cpu_relax();
57                 if (usurper) {
58                         /* an usurper is someone who snuck in before we could put the old
59                          * tail back.  They now have the lock.  Let's put whoever is
60                          * supposed to be next as their next one. */
61                         usurper->next = qnode->next;
62                 } else {
63                         /* No usurper meant we put things back correctly, so we should just
64                          * pass the lock / unlock whoever is next */
65                         qnode->next->locked = 0;
66                 }
67         } else {
68                 /* mb()s necessary since we didn't call an atomic_swap() */
69                 wmb();  /* need to make sure any previous writes don't pass unlocking */
70                 rwmb(); /* need to make sure any reads happen before the unlocking */
71                 /* simply unlock whoever is next */
72                 qnode->next->locked = 0;
73         }
74 }
75
76 /* CAS style mcs locks, kept around til we use them.  We're using the
77  * usurper-style, since RISCV and SPARC both don't have a real CAS. */
78 void mcs_lock_unlock_cas(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
79 {
80         /* Check if someone is already waiting on us to unlock */
81         if (qnode->next == 0) {
82                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
83                 /* If we're still the lock, just swap it with 0 (unlock) and return */
84                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
85                         return;
86                 /* We failed, someone is there and we are some (maybe a different)
87                  * thread's pred.  Since someone else was waiting, they should have made
88                  * themselves our next.  Spin (very briefly!) til it happens. */
89                 while (qnode->next == 0)
90                         cpu_relax();
91                 /* Alpha wants a read_barrier_depends() here */
92                 /* Now that we have a next, unlock them */
93                 qnode->next->locked = 0;
94         } else {
95                 /* mb()s necessary since we didn't call an atomic_swap() */
96                 wmb();  /* need to make sure any previous writes don't pass unlocking */
97                 rwmb(); /* need to make sure any reads happen before the unlocking */
98                 /* simply unlock whoever is next */
99                 qnode->next->locked = 0;
100         }
101 }
102
103 /* We don't bother saving the state, like we do with irqsave, since we can use
104  * whether or not we are in vcore context to determine that.  This means you
105  * shouldn't call this from those moments when you fake being in vcore context
106  * (when switching into the TLS, etc). */
107 void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
108 {
109         uth_disable_notifs();
110         mcs_lock_lock(lock, qnode);
111 }
112
113 /* Note we turn off the DONT_MIGRATE flag before enabling notifs.  This is fine,
114  * since we wouldn't receive any notifs that could lead to us migrating after we
115  * set DONT_MIGRATE but before enable_notifs().  We need it to be in this order,
116  * since we need to check messages after ~DONT_MIGRATE. */
117 void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
118 {
119         mcs_lock_unlock(lock, qnode);
120         uth_enable_notifs();
121 }
122
123 // MCS dissemination barrier!
124 int mcs_barrier_init(mcs_barrier_t* b, size_t np)
125 {
126         if(np > max_vcores())
127                 return -1;
128         b->allnodes = (mcs_dissem_flags_t*)malloc(np*sizeof(mcs_dissem_flags_t));
129         memset(b->allnodes,0,np*sizeof(mcs_dissem_flags_t));
130         b->nprocs = np;
131
132         b->logp = (np & (np-1)) != 0;
133         while(np >>= 1)
134                 b->logp++;
135
136         size_t i,k;
137         for(i = 0; i < b->nprocs; i++)
138         {
139                 b->allnodes[i].parity = 0;
140                 b->allnodes[i].sense = 1;
141
142                 for(k = 0; k < b->logp; k++)
143                 {
144                         size_t j = (i+(1<<k)) % b->nprocs;
145                         b->allnodes[i].partnerflags[0][k] = &b->allnodes[j].myflags[0][k];
146                         b->allnodes[i].partnerflags[1][k] = &b->allnodes[j].myflags[1][k];
147                 } 
148         }
149
150         return 0;
151 }
152
153 void mcs_barrier_wait(mcs_barrier_t* b, size_t pid)
154 {
155         mcs_dissem_flags_t* localflags = &b->allnodes[pid];
156         size_t i;
157         for(i = 0; i < b->logp; i++)
158         {
159                 *localflags->partnerflags[localflags->parity][i] = localflags->sense;
160                 while(localflags->myflags[localflags->parity][i] != localflags->sense);
161         }
162         if(localflags->parity)
163                 localflags->sense = 1-localflags->sense;
164         localflags->parity = 1-localflags->parity;
165 }
166
167 /* Preemption detection and recovering MCS locks.  These are memory safe ones.
168  * In the future, we can make ones that you pass the qnode to, so long as you
169  * never free the qnode storage (stacks) */
170 void mcs_pdr_init(struct mcs_pdr_lock *lock)
171 {
172         lock->lock = 0;
173         lock->lock_holder = 0;
174         lock->vc_qnodes = memalign(__alignof(struct mcs_pdr_qnode),
175                                    sizeof(struct mcs_pdr_qnode) * max_vcores());
176         assert(lock->vc_qnodes);
177         memset(lock->vc_qnodes, 0, sizeof(struct mcs_pdr_qnode) * max_vcores());
178         for (int i = 0; i < max_vcores(); i++)
179                 lock->vc_qnodes[i].vcoreid = i;
180 }
181
182 void mcs_pdr_fini(struct mcs_pdr_lock *lock)
183 {
184         assert(lock->vc_qnodes);
185         free(lock->vc_qnodes);
186 }
187
188 /* Helper, will make sure the vcore owning qnode is running.  If we change to
189  * that vcore, we'll continue when our vcore gets restarted.  If the change
190  * fails, it is because the vcore is running, and we'll continue.
191  *
192  * It's worth noting that changing to another vcore won't hurt correctness.
193  * Even if they are no longer the lockholder, they will be checking preemption
194  * messages and will help break out of the deadlock.  So long as we don't
195  * wastefully spin, we're okay. */
196 static void __ensure_qnode_runs(struct mcs_pdr_qnode *qnode)
197 {
198         /* This assert is somewhat useless.  __lock will compile it out, since it is
199          * impossible.  If you have a PF around here in __lock, odds are your stack
200          * is getting gently clobbered (syscall finished twice?). */
201         assert(qnode);
202         ensure_vcore_runs(qnode->vcoreid);
203 }
204
205 /* Internal version of the locking function, doesn't care about storage of qnode
206  * or if notifs are disabled. */
207 void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
208 {
209         struct mcs_pdr_qnode *predecessor;
210         /* Now the actual lock */
211         qnode->next = 0;
212         cmb();  /* swap provides a CPU mb() */
213         predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
214         if (predecessor) {
215                 qnode->locked = 1;
216                 wmb();
217                 predecessor->next = qnode;
218                 /* no need for a wrmb(), since this will only get unlocked after they
219                  * read our previous write */
220                 while (qnode->locked) {
221                         /* Ideally, we know who the lock holder is, and we'll make sure they
222                          * run.  If not, we'll make sure our pred is running, which trickles
223                          * up to the lock holder, if it isn't them. */
224                         if (lock->lock_holder)
225                                 __ensure_qnode_runs(lock->lock_holder);
226                         else
227                                 __ensure_qnode_runs(predecessor);
228                         cpu_relax();
229                 }
230         }
231         cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
232         /* publish ourselves as the lock holder (optimization) */
233         lock->lock_holder = qnode;      /* mbs() handled by the cmb/swap */
234 }
235
236 /* Using the CAS style unlocks, since the usurper recovery is a real pain in the
237  * ass */
238 void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
239 {
240         struct mcs_pdr_qnode *a_tail;
241         /* Clear us from being the lock holder */
242         lock->lock_holder = 0;  /* mbs() are covered by the cmb/cas and the wmb */
243         /* Check if someone is already waiting on us to unlock */
244         if (qnode->next == 0) {
245                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
246                 /* If we're still the lock, just swap it with 0 (unlock) and return */
247                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
248                         goto out;
249                 cmb();  /* need to read a fresh tail.  the CAS was a CPU mb */
250                 /* Read in the tail (or someone who recent was the tail, but could now
251                  * be farther up the chain), in prep for our spinning. */
252                 a_tail = lock->lock;
253                 /* We failed, someone is there and we are some (maybe a different)
254                  * thread's pred.  Since someone else was waiting, they should have made
255                  * themselves our next.  Spin (very briefly!) til it happens. */
256                 while (qnode->next == 0) {
257                         /* We need to get our next to run, but we don't know who they are.
258                          * If we make sure a tail is running, that will percolate up to make
259                          * sure our qnode->next is running */
260                         __ensure_qnode_runs(a_tail);
261                         /* Arguably, that reads new tails each time, but it'll still work
262                          * for this rare case */
263                         cpu_relax();
264                 }
265                 /* Alpha wants a read_barrier_depends() here */
266                 /* Now that we have a next, unlock them */
267                 qnode->next->locked = 0;
268         } else {
269                 /* mb()s necessary since we didn't call an atomic_swap() */
270                 wmb();  /* need to make sure any previous writes don't pass unlocking */
271                 rwmb(); /* need to make sure any reads happen before the unlocking */
272                 /* simply unlock whoever is next */
273                 qnode->next->locked = 0;
274         }
275 out:
276         /* ease of debugging */
277         qnode->next = 0;
278 }
279
280 /* Actual MCS-PDR locks.  These use memory in the lock for their qnodes, though
281  * the internal locking code doesn't care where the qnodes come from: so long as
282  * they are not freed and can stand a random read of vcoreid. */
283 void mcs_pdr_lock(struct mcs_pdr_lock *lock)
284 {
285         struct mcs_pdr_qnode *qnode;
286         /* Disable notifs, if we're an _M uthread */
287         uth_disable_notifs();
288         /* Get our qnode from the array.  vcoreid was preset, and the other fields
289          * get handled by the lock */
290         qnode = &lock->vc_qnodes[vcore_id()];
291         assert(qnode->vcoreid == vcore_id());   /* sanity */
292         __mcs_pdr_lock(lock, qnode);
293 }
294
295 /* CAS-less unlock, not quite as efficient and will make sure every vcore runs
296  * (since we don't have a convenient way to make sure our qnode->next runs
297  * yet, other than making sure everyone runs).
298  *
299  * To use this without ensuring all vcores run, you'll need the unlock code to
300  * save pred to a specific field in the qnode and check both its initial pred
301  * as well as its run time pred (who could be an usurper).  It's all possible,
302  * but a little more difficult to follow.  Also, I'm adjusting this comment
303  * months after writing it originally, so it is probably not sufficient, but
304  * necessary. */
305 void __mcs_pdr_unlock_no_cas(struct mcs_pdr_lock *lock,
306                              struct mcs_pdr_qnode *qnode)
307 {
308         struct mcs_pdr_qnode *old_tail, *usurper;
309         /* Check if someone is already waiting on us to unlock */
310         if (qnode->next == 0) {
311                 cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
312                 /* Unlock it */
313                 old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
314                 /* no one else was already waiting, so we successfully unlocked and can
315                  * return */
316                 if (old_tail == qnode)
317                         return;
318                 /* someone else was already waiting on the lock (last one on the list),
319                  * and we accidentally took them off.  Try and put it back. */
320                 usurper = atomic_swap_ptr((void*)&lock->lock, old_tail);
321                 /* since someone else was waiting, they should have made themselves our
322                  * next.  spin (very briefly!) til it happens. */
323                 while (qnode->next == 0) {
324                         /* make sure old_tail isn't preempted.  best we can do for now is
325                          * to make sure all vcores run, and thereby get our next. */
326                         for (int i = 0; i < max_vcores(); i++)
327                                 ensure_vcore_runs(i);
328                         cpu_relax();
329                 }
330                 if (usurper) {
331                         /* an usurper is someone who snuck in before we could put the old
332                          * tail back.  They now have the lock.  Let's put whoever is
333                          * supposed to be next as their next one. 
334                          *
335                          * First, we need to change our next's pred.  There's a slight race
336                          * here, so our next will need to make sure both us and pred are
337                          * done */
338                         /* I was trying to do something so we didn't need to ensure all
339                          * vcores run, using more space in the qnode to figure out who our
340                          * pred was a lock time (guessing actually, since there's a race,
341                          * etc). */
342                         //qnode->next->pred = usurper;
343                         //wmb();
344                         usurper->next = qnode->next;
345                         /* could imagine another wmb() and a flag so our next knows to no
346                          * longer check us too. */
347                 } else {
348                         /* No usurper meant we put things back correctly, so we should just
349                          * pass the lock / unlock whoever is next */
350                         qnode->next->locked = 0;
351                 }
352         } else {
353                 /* mb()s necessary since we didn't call an atomic_swap() */
354                 wmb();  /* need to make sure any previous writes don't pass unlocking */
355                 rwmb(); /* need to make sure any reads happen before the unlocking */
356                 /* simply unlock whoever is next */
357                 qnode->next->locked = 0;
358         }
359 }
360
361 void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
362 {
363         struct mcs_pdr_qnode *qnode = &lock->vc_qnodes[vcore_id()];
364         assert(qnode->vcoreid == vcore_id());   /* sanity */
365 #ifndef __riscv__
366         __mcs_pdr_unlock(lock, qnode);
367 #else
368         __mcs_pdr_unlock_no_cas(lock, qnode);
369 #endif
370         /* Enable notifs, if we're an _M uthread */
371         uth_enable_notifs();
372 }