Fixes potential livelock in preemption handling
[akaros.git] / user / parlib / mcs.c
1 #include <vcore.h>
2 #include <mcs.h>
3 #include <arch/atomic.h>
4 #include <string.h>
5 #include <stdlib.h>
6 #include <uthread.h>
7 #include <parlib.h>
8
9 // MCS locks
10 void mcs_lock_init(struct mcs_lock *lock)
11 {
12         memset(lock,0,sizeof(mcs_lock_t));
13 }
14
15 static inline mcs_lock_qnode_t *mcs_qnode_swap(mcs_lock_qnode_t **addr,
16                                                mcs_lock_qnode_t *val)
17 {
18         return (mcs_lock_qnode_t*)atomic_swap_ptr((void**)addr, val);
19 }
20
21 void mcs_lock_lock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
22 {
23         qnode->next = 0;
24         cmb();  /* swap provides a CPU mb() */
25         mcs_lock_qnode_t *predecessor = mcs_qnode_swap(&lock->lock, qnode);
26         if (predecessor) {
27                 qnode->locked = 1;
28                 wmb();
29                 predecessor->next = qnode;
30                 /* no need for a wrmb(), since this will only get unlocked after they
31                  * read our previous write */
32                 while (qnode->locked)
33                         cpu_relax();
34         }
35         cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
36 }
37
38 void mcs_lock_unlock(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
39 {
40         /* Check if someone is already waiting on us to unlock */
41         if (qnode->next == 0) {
42                 cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
43                 /* Unlock it */
44                 mcs_lock_qnode_t *old_tail = mcs_qnode_swap(&lock->lock,0);
45                 /* no one else was already waiting, so we successfully unlocked and can
46                  * return */
47                 if (old_tail == qnode)
48                         return;
49                 /* someone else was already waiting on the lock (last one on the list),
50                  * and we accidentally took them off.  Try and put it back. */
51                 mcs_lock_qnode_t *usurper = mcs_qnode_swap(&lock->lock,old_tail);
52                 /* since someone else was waiting, they should have made themselves our
53                  * next.  spin (very briefly!) til it happens. */
54                 while (qnode->next == 0)
55                         cpu_relax();
56                 if (usurper) {
57                         /* an usurper is someone who snuck in before we could put the old
58                          * tail back.  They now have the lock.  Let's put whoever is
59                          * supposed to be next as their next one. */
60                         usurper->next = qnode->next;
61                 } else {
62                         /* No usurper meant we put things back correctly, so we should just
63                          * pass the lock / unlock whoever is next */
64                         qnode->next->locked = 0;
65                 }
66         } else {
67                 /* mb()s necessary since we didn't call an atomic_swap() */
68                 wmb();  /* need to make sure any previous writes don't pass unlocking */
69                 rwmb(); /* need to make sure any reads happen before the unlocking */
70                 /* simply unlock whoever is next */
71                 qnode->next->locked = 0;
72         }
73 }
74
75 /* CAS style mcs locks, kept around til we use them.  We're using the
76  * usurper-style, since RISCV and SPARC both don't have a real CAS. */
77 void mcs_lock_unlock_cas(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
78 {
79         /* Check if someone is already waiting on us to unlock */
80         if (qnode->next == 0) {
81                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
82                 /* If we're still the lock, just swap it with 0 (unlock) and return */
83                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
84                         return;
85                 /* We failed, someone is there and we are some (maybe a different)
86                  * thread's pred.  Since someone else was waiting, they should have made
87                  * themselves our next.  Spin (very briefly!) til it happens. */
88                 while (qnode->next == 0)
89                         cpu_relax();
90                 /* Alpha wants a read_barrier_depends() here */
91                 /* Now that we have a next, unlock them */
92                 qnode->next->locked = 0;
93         } else {
94                 /* mb()s necessary since we didn't call an atomic_swap() */
95                 wmb();  /* need to make sure any previous writes don't pass unlocking */
96                 rwmb(); /* need to make sure any reads happen before the unlocking */
97                 /* simply unlock whoever is next */
98                 qnode->next->locked = 0;
99         }
100 }
101
102 /* We don't bother saving the state, like we do with irqsave, since we can use
103  * whether or not we are in vcore context to determine that.  This means you
104  * shouldn't call this from those moments when you fake being in vcore context
105  * (when switching into the TLS, etc). */
106 void mcs_lock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
107 {
108         uth_disable_notifs();
109         mcs_lock_lock(lock, qnode);
110 }
111
112 /* Note we turn off the DONT_MIGRATE flag before enabling notifs.  This is fine,
113  * since we wouldn't receive any notifs that could lead to us migrating after we
114  * set DONT_MIGRATE but before enable_notifs().  We need it to be in this order,
115  * since we need to check messages after ~DONT_MIGRATE. */
116 void mcs_unlock_notifsafe(struct mcs_lock *lock, struct mcs_lock_qnode *qnode)
117 {
118         mcs_lock_unlock(lock, qnode);
119         uth_enable_notifs();
120 }
121
122 // MCS dissemination barrier!
123 int mcs_barrier_init(mcs_barrier_t* b, size_t np)
124 {
125         if(np > max_vcores())
126                 return -1;
127         b->allnodes = (mcs_dissem_flags_t*)malloc(np*sizeof(mcs_dissem_flags_t));
128         memset(b->allnodes,0,np*sizeof(mcs_dissem_flags_t));
129         b->nprocs = np;
130
131         b->logp = (np & (np-1)) != 0;
132         while(np >>= 1)
133                 b->logp++;
134
135         size_t i,k;
136         for(i = 0; i < b->nprocs; i++)
137         {
138                 b->allnodes[i].parity = 0;
139                 b->allnodes[i].sense = 1;
140
141                 for(k = 0; k < b->logp; k++)
142                 {
143                         size_t j = (i+(1<<k)) % b->nprocs;
144                         b->allnodes[i].partnerflags[0][k] = &b->allnodes[j].myflags[0][k];
145                         b->allnodes[i].partnerflags[1][k] = &b->allnodes[j].myflags[1][k];
146                 } 
147         }
148
149         return 0;
150 }
151
152 void mcs_barrier_wait(mcs_barrier_t* b, size_t pid)
153 {
154         mcs_dissem_flags_t* localflags = &b->allnodes[pid];
155         size_t i;
156         for(i = 0; i < b->logp; i++)
157         {
158                 *localflags->partnerflags[localflags->parity][i] = localflags->sense;
159                 while(localflags->myflags[localflags->parity][i] != localflags->sense);
160         }
161         if(localflags->parity)
162                 localflags->sense = 1-localflags->sense;
163         localflags->parity = 1-localflags->parity;
164 }
165
166 /* Preemption detection and recovering MCS locks.  These are memory safe ones.
167  * In the future, we can make ones that you pass the qnode to, so long as you
168  * never free the qnode storage (stacks) */
169 void mcs_pdr_init(struct mcs_pdr_lock *lock)
170 {
171         lock->lock = 0;
172         lock->lock_holder = 0;
173         lock->vc_qnodes = malloc(sizeof(struct mcs_pdr_qnode) * max_vcores());
174         assert(lock->vc_qnodes);
175         memset(lock->vc_qnodes, 0, sizeof(struct mcs_pdr_qnode) * max_vcores());
176         for (int i = 0; i < max_vcores(); i++)
177                 lock->vc_qnodes[i].vcoreid = i;
178 }
179
180 void mcs_pdr_fini(struct mcs_pdr_lock *lock)
181 {
182         assert(lock->vc_qnodes);
183         free(lock->vc_qnodes);
184 }
185
186 /* Helper, will make sure the vcore owning qnode is running.  If we change to
187  * that vcore, we'll continue when our vcore gets restarted.  If the change
188  * fails, it is because the vcore is running, and we'll continue.
189  *
190  * It's worth noting that changing to another vcore won't hurt correctness.
191  * Even if they are no longer the lockholder, they will be checking preemption
192  * messages and will help break out of the deadlock.  So long as we don't
193  * wastefully spin, we're okay. */
194 static void __ensure_qnode_runs(struct mcs_pdr_qnode *qnode)
195 {
196         /* This assert is somewhat useless.  __lock will compile it out, since it is
197          * impossible.  If you have a PF around here in __lock, odds are your stack
198          * is getting gently clobbered (syscall finished twice?). */
199         assert(qnode);
200         ensure_vcore_runs(qnode->vcoreid);
201 }
202
203 /* Internal version of the locking function, doesn't care about storage of qnode
204  * or if notifs are disabled. */
205 void __mcs_pdr_lock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
206 {
207         struct mcs_pdr_qnode *predecessor;
208         /* Now the actual lock */
209         qnode->next = 0;
210         cmb();  /* swap provides a CPU mb() */
211         predecessor = atomic_swap_ptr((void**)&lock->lock, qnode);
212         if (predecessor) {
213                 qnode->locked = 1;
214                 wmb();
215                 predecessor->next = qnode;
216                 /* no need for a wrmb(), since this will only get unlocked after they
217                  * read our previous write */
218                 while (qnode->locked) {
219                         /* Ideally, we know who the lock holder is, and we'll make sure they
220                          * run.  If not, we'll make sure our pred is running, which trickles
221                          * up to the lock holder, if it isn't them. */
222                         if (lock->lock_holder)
223                                 __ensure_qnode_runs(lock->lock_holder);
224                         else
225                                 __ensure_qnode_runs(predecessor);
226                         cpu_relax();
227                 }
228         }
229         cmb();  /* just need a cmb, the swap handles the CPU wmb/wrmb() */
230         /* publish ourselves as the lock holder (optimization) */
231         lock->lock_holder = qnode;      /* mbs() handled by the cmb/swap */
232 }
233
234 /* Using the CAS style unlocks, since the usurper recovery is a real pain in the
235  * ass */
236 void __mcs_pdr_unlock(struct mcs_pdr_lock *lock, struct mcs_pdr_qnode *qnode)
237 {
238         struct mcs_pdr_qnode *a_tail;
239         /* Clear us from being the lock holder */
240         lock->lock_holder = 0;  /* mbs() are covered by the cmb/cas and the wmb */
241         /* Check if someone is already waiting on us to unlock */
242         if (qnode->next == 0) {
243                 cmb();  /* no need for CPU mbs, since there's an atomic_cas() */
244                 /* If we're still the lock, just swap it with 0 (unlock) and return */
245                 if (atomic_cas_ptr((void**)&lock->lock, qnode, 0))
246                         goto out;
247                 cmb();  /* need to read a fresh tail.  the CAS was a CPU mb */
248                 /* Read in the tail (or someone who recent was the tail, but could now
249                  * be farther up the chain), in prep for our spinning. */
250                 a_tail = lock->lock;
251                 /* We failed, someone is there and we are some (maybe a different)
252                  * thread's pred.  Since someone else was waiting, they should have made
253                  * themselves our next.  Spin (very briefly!) til it happens. */
254                 while (qnode->next == 0) {
255                         /* We need to get our next to run, but we don't know who they are.
256                          * If we make sure a tail is running, that will percolate up to make
257                          * sure our qnode->next is running */
258                         __ensure_qnode_runs(a_tail);
259                         /* Arguably, that reads new tails each time, but it'll still work
260                          * for this rare case */
261                         cpu_relax();
262                 }
263                 /* Alpha wants a read_barrier_depends() here */
264                 /* Now that we have a next, unlock them */
265                 qnode->next->locked = 0;
266         } else {
267                 /* mb()s necessary since we didn't call an atomic_swap() */
268                 wmb();  /* need to make sure any previous writes don't pass unlocking */
269                 rwmb(); /* need to make sure any reads happen before the unlocking */
270                 /* simply unlock whoever is next */
271                 qnode->next->locked = 0;
272         }
273 out:
274         /* ease of debugging */
275         qnode->next = 0;
276 }
277
278 /* Actual MCS-PDR locks.  These use memory in the lock for their qnodes, though
279  * the internal locking code doesn't care where the qnodes come from: so long as
280  * they are not freed and can stand a random read of vcoreid. */
281 void mcs_pdr_lock(struct mcs_pdr_lock *lock)
282 {
283         struct mcs_pdr_qnode *qnode;
284         /* Disable notifs, if we're an _M uthread */
285         uth_disable_notifs();
286         /* Get our qnode from the array.  vcoreid was preset, and the other fields
287          * get handled by the lock */
288         qnode = &lock->vc_qnodes[vcore_id()];
289         assert(qnode->vcoreid == vcore_id());   /* sanity */
290         __mcs_pdr_lock(lock, qnode);
291 }
292
293 /* CAS-less unlock, not quite as efficient and will make sure every vcore runs
294  * (since we don't have a convenient way to make sure our qnode->next runs
295  * yet, other than making sure everyone runs).
296  *
297  * To use this without ensuring all vcores run, you'll need the unlock code to
298  * save pred to a specific field in the qnode and check both its initial pred
299  * as well as its run time pred (who could be an usurper).  It's all possible,
300  * but a little more difficult to follow.  Also, I'm adjusting this comment
301  * months after writing it originally, so it is probably not sufficient, but
302  * necessary. */
303 void __mcs_pdr_unlock_no_cas(struct mcs_pdr_lock *lock,
304                              struct mcs_pdr_qnode *qnode)
305 {
306         struct mcs_pdr_qnode *old_tail, *usurper;
307         /* Check if someone is already waiting on us to unlock */
308         if (qnode->next == 0) {
309                 cmb();  /* no need for CPU mbs, since there's an atomic_swap() */
310                 /* Unlock it */
311                 old_tail = atomic_swap_ptr((void**)&lock->lock, 0);
312                 /* no one else was already waiting, so we successfully unlocked and can
313                  * return */
314                 if (old_tail == qnode)
315                         return;
316                 /* someone else was already waiting on the lock (last one on the list),
317                  * and we accidentally took them off.  Try and put it back. */
318                 usurper = atomic_swap_ptr((void*)&lock->lock, old_tail);
319                 /* since someone else was waiting, they should have made themselves our
320                  * next.  spin (very briefly!) til it happens. */
321                 while (qnode->next == 0) {
322                         /* make sure old_tail isn't preempted.  best we can do for now is
323                          * to make sure all vcores run, and thereby get our next. */
324                         for (int i = 0; i < max_vcores(); i++)
325                                 ensure_vcore_runs(i);
326                         cpu_relax();
327                 }
328                 if (usurper) {
329                         /* an usurper is someone who snuck in before we could put the old
330                          * tail back.  They now have the lock.  Let's put whoever is
331                          * supposed to be next as their next one. 
332                          *
333                          * First, we need to change our next's pred.  There's a slight race
334                          * here, so our next will need to make sure both us and pred are
335                          * done */
336                         /* I was trying to do something so we didn't need to ensure all
337                          * vcores run, using more space in the qnode to figure out who our
338                          * pred was a lock time (guessing actually, since there's a race,
339                          * etc). */
340                         //qnode->next->pred = usurper;
341                         //wmb();
342                         usurper->next = qnode->next;
343                         /* could imagine another wmb() and a flag so our next knows to no
344                          * longer check us too. */
345                 } else {
346                         /* No usurper meant we put things back correctly, so we should just
347                          * pass the lock / unlock whoever is next */
348                         qnode->next->locked = 0;
349                 }
350         } else {
351                 /* mb()s necessary since we didn't call an atomic_swap() */
352                 wmb();  /* need to make sure any previous writes don't pass unlocking */
353                 rwmb(); /* need to make sure any reads happen before the unlocking */
354                 /* simply unlock whoever is next */
355                 qnode->next->locked = 0;
356         }
357 }
358
359 void mcs_pdr_unlock(struct mcs_pdr_lock *lock)
360 {
361         struct mcs_pdr_qnode *qnode = &lock->vc_qnodes[vcore_id()];
362         assert(qnode->vcoreid == vcore_id());   /* sanity */
363 #ifndef __riscv__
364         __mcs_pdr_unlock(lock, qnode);
365 #else
366         __mcs_pdr_unlock_no_cas(lock, qnode);
367 #endif
368         /* Enable notifs, if we're an _M uthread */
369         uth_enable_notifs();
370 }