parlib: Add uth_cond_var_wait_recurse()
[akaros.git] / user / parlib / mutex.c
1 /* Copyright (c) 2016-2017 Google, Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details. */
4
5 /* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
6  * functions.  2LSs implement their own sync objects (bottom of the file). */
7
8 #include <parlib/uthread.h>
9 #include <sys/queue.h>
10 #include <parlib/spinlock.h>
11 #include <malloc.h>
12
13
14 /************** Semaphores and Mutexes **************/
15
16 static void __uth_semaphore_init(void *arg)
17 {
18         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
19
20         spin_pdr_init(&sem->lock);
21         sem->sync_obj = __uth_sync_alloc();
22         /* If we used a static initializer for a semaphore, count is already set.
23          * o/w it will be set by _alloc() or _init() (via uth_semaphore_init()). */
24 }
25
26 /* Initializes a sem acquired from somewhere else.  POSIX's sem_init() needs
27  * this. */
28 void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count)
29 {
30         __uth_semaphore_init(sem);
31         sem->count = count;
32         /* The once is to make sure the object is initialized. */
33         parlib_set_ran_once(&sem->once_ctl);
34 }
35
36 /* Undoes whatever was done in init. */
37 void uth_semaphore_destroy(uth_semaphore_t *sem)
38 {
39         __uth_sync_free(sem->sync_obj);
40 }
41
42 uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
43 {
44         struct uth_semaphore *sem;
45
46         sem = malloc(sizeof(struct uth_semaphore));
47         assert(sem);
48         uth_semaphore_init(sem, count);
49         return sem;
50 }
51
52 void uth_semaphore_free(uth_semaphore_t *sem)
53 {
54         uth_semaphore_destroy(sem);
55         free(sem);
56 }
57
58 static void __semaphore_cb(struct uthread *uth, void *arg)
59 {
60         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
61
62         /* We need to tell the 2LS that its thread blocked.  We need to do this
63          * before unlocking the sem, since as soon as we unlock, the sem could be
64          * released and our thread restarted.
65          *
66          * Also note the lock-ordering rule.  The sem lock is grabbed before any
67          * locks the 2LS might grab. */
68         uthread_has_blocked(uth, sem->sync_obj, UTH_EXT_BLK_MUTEX);
69         spin_pdr_unlock(&sem->lock);
70 }
71
72 void uth_semaphore_down(uth_semaphore_t *sem)
73 {
74         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
75         spin_pdr_lock(&sem->lock);
76         if (sem->count > 0) {
77                 /* Only down if we got one.  This means a sem with no more counts is 0,
78                  * not negative (where -count == nr_waiters).  Doing it this way means
79                  * our timeout function works for sems and CVs. */
80                 sem->count--;
81                 spin_pdr_unlock(&sem->lock);
82                 return;
83         }
84         /* the unlock and sync enqueuing is done in the yield callback.  as always,
85          * we need to do this part in vcore context, since as soon as we unlock the
86          * uthread could restart.  (atomically yield and unlock). */
87         uthread_yield(TRUE, __semaphore_cb, sem);
88 }
89
90 bool uth_semaphore_trydown(uth_semaphore_t *sem)
91 {
92         bool ret = FALSE;
93
94         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
95         spin_pdr_lock(&sem->lock);
96         if (sem->count > 0) {
97                 sem->count--;
98                 ret = TRUE;
99         }
100         spin_pdr_unlock(&sem->lock);
101         return ret;
102 }
103
104 void uth_semaphore_up(uth_semaphore_t *sem)
105 {
106         struct uthread *uth;
107
108         /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special. */
109         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
110         spin_pdr_lock(&sem->lock);
111         uth = __uth_sync_get_next(sem->sync_obj);
112         /* If there was a waiter, we pass our resource/count to them. */
113         if (!uth)
114                 sem->count++;
115         spin_pdr_unlock(&sem->lock);
116         if (uth)
117                 uthread_runnable(uth);
118 }
119
120 /* Takes a void * since it's called by parlib_run_once(), which enables us to
121  * statically initialize the mutex.  This init does everything not done by the
122  * static initializer.  Note we do not allow 'static' destruction.  (No one
123  * calls free). */
124 static void __uth_mutex_init(void *arg)
125 {
126         struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
127
128         __uth_semaphore_init(mtx);
129         mtx->count = 1;
130 }
131
132 void uth_mutex_init(uth_mutex_t *mtx)
133 {
134         __uth_mutex_init(mtx);
135         parlib_set_ran_once(&mtx->once_ctl);
136 }
137
138 void uth_mutex_destroy(uth_mutex_t *mtx)
139 {
140         uth_semaphore_destroy(mtx);
141 }
142
143 uth_mutex_t *uth_mutex_alloc(void)
144 {
145         struct uth_semaphore *mtx;
146
147         mtx = malloc(sizeof(struct uth_semaphore));
148         assert(mtx);
149         uth_mutex_init(mtx);
150         return mtx;
151 }
152
153 void uth_mutex_free(uth_mutex_t *mtx)
154 {
155         uth_semaphore_free(mtx);
156 }
157
158 void uth_mutex_lock(uth_mutex_t *mtx)
159 {
160         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
161         uth_semaphore_down(mtx);
162 }
163
164 bool uth_mutex_trylock(uth_mutex_t *mtx)
165 {
166         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
167         return uth_semaphore_trydown(mtx);
168 }
169
170 void uth_mutex_unlock(uth_mutex_t *mtx)
171 {
172         uth_semaphore_up(mtx);
173 }
174
175 /************** Recursive mutexes **************/
176
177 static void __uth_recurse_mutex_init(void *arg)
178 {
179         struct uth_recurse_mutex *r_mtx = (struct uth_recurse_mutex*)arg;
180
181         __uth_mutex_init(&r_mtx->mtx);
182         /* Since we always manually call __uth_mutex_init(), there's no reason to
183          * mess with the regular mutex's static initializer.  Just say it's been
184          * done. */
185         parlib_set_ran_once(&r_mtx->mtx.once_ctl);
186         r_mtx->lockholder = NULL;
187         r_mtx->count = 0;
188 }
189
190 void uth_recurse_mutex_init(uth_recurse_mutex_t *r_mtx)
191 {
192         __uth_recurse_mutex_init(r_mtx);
193         parlib_set_ran_once(&r_mtx->once_ctl);
194 }
195
196 void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_mtx)
197 {
198         uth_semaphore_destroy(&r_mtx->mtx);
199 }
200
201 uth_recurse_mutex_t *uth_recurse_mutex_alloc(void)
202 {
203         struct uth_recurse_mutex *r_mtx = malloc(sizeof(struct uth_recurse_mutex));
204
205         assert(r_mtx);
206         uth_recurse_mutex_init(r_mtx);
207         return r_mtx;
208 }
209
210 void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
211 {
212         uth_recurse_mutex_destroy(r_mtx);
213         free(r_mtx);
214 }
215
216 void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
217 {
218         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
219         assert(!in_vcore_context());
220         assert(current_uthread);
221         /* We don't have to worry about races on current_uthread or count.  They are
222          * only written by the initial lockholder, and this check will only be true
223          * for the initial lockholder, which cannot concurrently call this function
224          * twice (a thread is single-threaded).
225          *
226          * A signal handler running for a thread should not attempt to grab a
227          * recursive mutex (that's probably a bug).  If we need to support that,
228          * we'll have to disable notifs temporarily. */
229         if (r_mtx->lockholder == current_uthread) {
230                 r_mtx->count++;
231                 return;
232         }
233         uth_mutex_lock(&r_mtx->mtx);
234         r_mtx->lockholder = current_uthread;
235         r_mtx->count = 1;
236 }
237
238 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
239 {
240         bool ret;
241
242         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
243         assert(!in_vcore_context());
244         assert(current_uthread);
245         if (r_mtx->lockholder == current_uthread) {
246                 r_mtx->count++;
247                 return TRUE;
248         }
249         ret = uth_mutex_trylock(&r_mtx->mtx);
250         if (ret) {
251                 r_mtx->lockholder = current_uthread;
252                 r_mtx->count = 1;
253         }
254         return ret;
255 }
256
257 void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_mtx)
258 {
259         r_mtx->count--;
260         if (!r_mtx->count) {
261                 r_mtx->lockholder = NULL;
262                 uth_mutex_unlock(&r_mtx->mtx);
263         }
264 }
265
266
267 /************** Condition Variables **************/
268
269
270 static void __uth_cond_var_init(void *arg)
271 {
272         struct uth_cond_var *cv = (struct uth_cond_var*)arg;
273
274         spin_pdr_init(&cv->lock);
275         cv->sync_obj = __uth_sync_alloc();
276 }
277
278 void uth_cond_var_init(uth_cond_var_t *cv)
279 {
280         __uth_cond_var_init(cv);
281         parlib_set_ran_once(&cv->once_ctl);
282 }
283
284 void uth_cond_var_destroy(uth_cond_var_t *cv)
285 {
286         __uth_sync_free(cv->sync_obj);
287 }
288
289 uth_cond_var_t *uth_cond_var_alloc(void)
290 {
291         struct uth_cond_var *cv;
292
293         cv = malloc(sizeof(struct uth_cond_var));
294         assert(cv);
295         uth_cond_var_init(cv);
296         return cv;
297 }
298
299 void uth_cond_var_free(uth_cond_var_t *cv)
300 {
301         uth_cond_var_destroy(cv);
302         free(cv);
303 }
304
305 struct uth_cv_link {
306         struct uth_cond_var                     *cv;
307         struct uth_semaphore            *mtx;
308 };
309
310 static void __cv_wait_cb(struct uthread *uth, void *arg)
311 {
312         struct uth_cv_link *link = (struct uth_cv_link*)arg;
313         struct uth_cond_var *cv = link->cv;
314         struct uth_semaphore *mtx = link->mtx;
315
316         /* We need to tell the 2LS that its thread blocked.  We need to do this
317          * before unlocking the cv, since as soon as we unlock, the cv could be
318          * signalled and our thread restarted.
319          *
320          * Also note the lock-ordering rule.  The cv lock is grabbed before any
321          * locks the 2LS might grab. */
322         uthread_has_blocked(uth, cv->sync_obj, UTH_EXT_BLK_MUTEX);
323         spin_pdr_unlock(&cv->lock);
324         /* This looks dangerous, since both the CV and MTX could use the
325          * uth->sync_next TAILQ_ENTRY (or whatever the 2LS uses), but the uthread
326          * never sleeps on both at the same time.  We *hold* the mtx - we aren't
327          * *sleeping* on it.  Sleeping uses the sync_next.  Holding it doesn't.
328          *
329          * Next, consider what happens as soon as we unlock the CV.  Our thread
330          * could get woken up, and then immediately try to grab the mtx and go to
331          * sleep! (see below).  If that happens, the uthread is no longer sleeping
332          * on the CV, and the sync_next is free.  The invariant is that a uthread
333          * can only sleep on one sync_object at a time. */
334         uth_mutex_unlock(mtx);
335 }
336
337 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
338  * caller holds mtx again.  Once our uth is on the CV's list, we can release the
339  * mtx without fear of missing a signal.
340  *
341  * POSIX refers to atomicity in this context as "atomically with respect to
342  * access by another thread to the mutex and then the condition variable"
343  *
344  * The idea is that we hold the mutex to protect some invariant; we check it,
345  * and decide to sleep.  Now we get on the list before releasing so that any
346  * changes to that invariant (e.g. a flag is now TRUE) happen after we're on the
347  * list, and so that we don't miss the signal.  To be more clear, the invariant
348  * in a basic wake-up flag scenario is: "whenever a flag is set from FALSE to
349  * TRUE, all waiters that saw FALSE are on the CV's waitqueue."  The mutex is
350  * required for this invariant.
351  *
352  * Note that signal/broadcasters do not *need* to hold the mutex, in general,
353  * but they do in the basic wake-up flag scenario.  If not, the race is this:
354  *
355  * Sleeper:                                                             Waker:
356  * -----------------------------------------------------------------
357  * Hold mutex
358  *   See flag is False
359  *   Decide to sleep
360  *                                                                              Set flag True
361  * PAUSE!                                                               Grab CV lock
362  *                                                                              See list is empty, unlock
363  *
364  *   Grab CV lock
365  *     Get put on list
366  *   Unlock CV lock
367  * Unlock mutex
368  * (Never wake up; we missed the signal)
369  *
370  * For those familiar with the kernel's CVs, we don't couple mutexes with CVs.
371  * cv_lock() actually grabs the spinlock inside the CV and uses *that* to
372  * protect the invariant.  The signallers always grab that lock, so the sleeper
373  * is not in danger of missing the signal.  The tradeoff is that the kernel CVs
374  * use a spinlock instead of a mutex for protecting its invariant; there might
375  * be some case that preferred blocking sync.
376  *
377  * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
378  * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
379  *
380  * As far as lock ordering goes, once the sleeper holds the mutex and is on the
381  * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
382  * actually requires grabbing its spinlock.  So as to not have a lock ordering
383  * between *spinlocks*, we let go of the CV's spinlock before unlocking the
384  * mutex.  There is an ordering between the mutex and the CV spinlock (mutex->cv
385  * spin), but there is no ordering between the mutex spin and cv spin.  And of
386  * course, we need to unlock the CV spinlock in the yield callback.
387  *
388  * Also note that we use the external API for the mutex operations.  A 2LS could
389  * have their own mutex ops but still use the generic cv ops. */
390 void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
391 {
392         struct uth_cv_link link;
393
394         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
395         link.cv = cv;
396         link.mtx = mtx;
397         spin_pdr_lock(&cv->lock);
398         uthread_yield(TRUE, __cv_wait_cb, &link);
399         uth_mutex_lock(mtx);
400 }
401
402 /* GCC wants this function, though its semantics are a little unclear.  I
403  * imagine you'd want to completely unlock it (say you locked it 3 times), and
404  * when you get it back, that you have your three locks back. */
405 void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
406 {
407         unsigned int old_count = r_mtx->count;
408
409         /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
410          * prep-work for that now.  (invariant is that an unlocked r_mtx has no
411          * lockholder and count == 0. */
412         r_mtx->lockholder = NULL;
413         r_mtx->count = 0;
414         uth_cond_var_wait(cv, &r_mtx->mtx);
415         /* Now we hold the internal mutex again.  Need to restore the tracking. */
416         r_mtx->lockholder = current_uthread;
417         r_mtx->count = old_count;
418 }
419
420 void uth_cond_var_signal(uth_cond_var_t *cv)
421 {
422         struct uthread *uth;
423
424         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
425         spin_pdr_lock(&cv->lock);
426         uth = __uth_sync_get_next(cv->sync_obj);
427         spin_pdr_unlock(&cv->lock);
428         if (uth)
429                 uthread_runnable(uth);
430 }
431
432 void uth_cond_var_broadcast(uth_cond_var_t *cv)
433 {
434         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
435         struct uthread *i, *safe;
436
437         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
438         spin_pdr_lock(&cv->lock);
439         /* If this turns out to be slow or painful for 2LSs, we can implement a
440          * get_all or something (default used to use TAILQ_SWAP). */
441         while ((i = __uth_sync_get_next(cv->sync_obj))) {
442                 /* Once the uth is out of the sync obj, we can reuse sync_next. */
443                 TAILQ_INSERT_TAIL(&restartees, i, sync_next);
444         }
445         spin_pdr_unlock(&cv->lock);
446         /* Need the SAFE, since we can't touch the linkage once the uth could run */
447         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
448                 uthread_runnable(i);
449 }
450
451
452 /************** Default Sync Obj Implementation **************/
453
454 static uth_sync_t uth_default_sync_alloc(void)
455 {
456         struct uth_tailq *tq;
457
458         tq = malloc(sizeof(struct uth_tailq));
459         assert(tq);
460         TAILQ_INIT(tq);
461         return (uth_sync_t)tq;
462 }
463
464 static void uth_default_sync_free(uth_sync_t sync)
465 {
466         struct uth_tailq *tq = (struct uth_tailq*)sync;
467
468         assert(TAILQ_EMPTY(tq));
469         free(tq);
470 }
471
472 static struct uthread *uth_default_sync_get_next(uth_sync_t sync)
473 {
474         struct uth_tailq *tq = (struct uth_tailq*)sync;
475         struct uthread *first;
476
477         first = TAILQ_FIRST(tq);
478         if (first)
479                 TAILQ_REMOVE(tq, first, sync_next);
480         return first;
481 }
482
483 static bool uth_default_sync_get_uth(uth_sync_t sync, struct uthread *uth)
484 {
485         struct uth_tailq *tq = (struct uth_tailq*)sync;
486         struct uthread *i;
487
488         TAILQ_FOREACH(i, tq, sync_next) {
489                 if (i == uth) {
490                         TAILQ_REMOVE(tq, i, sync_next);
491                         return TRUE;
492                 }
493         }
494         return FALSE;
495 }
496
497 /************** External uthread sync interface **************/
498
499 /* Called by the 2LS->has_blocked op, if they are using the default sync.*/
500 void __uth_default_sync_enqueue(struct uthread *uth, uth_sync_t sync)
501 {
502         struct uth_tailq *tq = (struct uth_tailq*)sync;
503
504         TAILQ_INSERT_TAIL(tq, uth, sync_next);
505 }
506
507 /* Called by 2LS-independent sync code when a sync object is created. */
508 uth_sync_t __uth_sync_alloc(void)
509 {
510         if (sched_ops->sync_alloc)
511                 return sched_ops->sync_alloc();
512         return uth_default_sync_alloc();
513 }
514
515 /* Called by 2LS-independent sync code when a sync object is destroyed. */
516 void __uth_sync_free(uth_sync_t sync)
517 {
518         if (sched_ops->sync_free) {
519                 sched_ops->sync_free(sync);
520                 return;
521         }
522         uth_default_sync_free(sync);
523 }
524
525 /* Called by 2LS-independent sync code when a thread needs to be woken. */
526 struct uthread *__uth_sync_get_next(uth_sync_t sync)
527 {
528         if (sched_ops->sync_get_next)
529                 return sched_ops->sync_get_next(sync);
530         return uth_default_sync_get_next(sync);
531 }
532
533 /* Called by 2LS-independent sync code when a specific thread needs to be woken.
534  * Returns TRUE if the uthread was blocked on the object, FALSE o/w. */
535 bool __uth_sync_get_uth(uth_sync_t sync, struct uthread *uth)
536 {
537         if (sched_ops->sync_get_uth)
538                 return sched_ops->sync_get_uth(sync, uth);
539         return uth_default_sync_get_uth(sync, uth);
540 }