Overhaul lock_test.R
[akaros.git] / user / parlib / mutex.c
1 /* Copyright (c) 2016-2017 Google, Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details. */
4
5 /* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
6  * functions.  2LSs implement their own sync objects (bottom of the file). */
7
8 #include <parlib/uthread.h>
9 #include <sys/queue.h>
10 #include <parlib/spinlock.h>
11 #include <parlib/alarm.h>
12 #include <parlib/assert.h>
13 #include <malloc.h>
14
15 struct timeout_blob {
16         bool                            timed_out;
17         struct uthread                  *uth;
18         uth_sync_t                      *sync_ptr;
19         struct spin_pdr_lock            *lock_ptr;
20 };
21
22 /* When sync primitives want to time out, they can use this alarm handler.  It
23  * needs a timeout_blob, which is independent of any particular sync method. */
24 static void timeout_handler(struct alarm_waiter *waiter)
25 {
26         struct timeout_blob *blob = (struct timeout_blob*)waiter->data;
27
28         spin_pdr_lock(blob->lock_ptr);
29         if (__uth_sync_get_uth(blob->sync_ptr, blob->uth))
30                 blob->timed_out = TRUE;
31         spin_pdr_unlock(blob->lock_ptr);
32         if (blob->timed_out)
33                 uthread_runnable(blob->uth);
34 }
35
36 /* Minor helper, sets a blob's fields */
37 static void set_timeout_blob(struct timeout_blob *blob, uth_sync_t *sync_ptr,
38                              struct spin_pdr_lock *lock_ptr)
39 {
40         blob->timed_out = FALSE;
41         blob->uth = current_uthread;
42         blob->sync_ptr = sync_ptr;
43         blob->lock_ptr = lock_ptr;
44 }
45
46 /* Minor helper, sets an alarm for blob and a timespec */
47 static void set_timeout_alarm(struct alarm_waiter *waiter,
48                               struct timeout_blob *blob,
49                               const struct timespec *abs_timeout)
50 {
51         init_awaiter(waiter, timeout_handler);
52         waiter->data = blob;
53         set_awaiter_abs_unix(waiter, timespec_to_alarm_time(abs_timeout));
54         set_alarm(waiter);
55 }
56
57 /************** Semaphores and Mutexes **************/
58
59 static void __uth_semaphore_init(void *arg)
60 {
61         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
62
63         spin_pdr_init(&sem->lock);
64         __uth_sync_init(&sem->sync_obj);
65         /* If we used a static initializer for a semaphore, count is already
66          * set.  o/w it will be set by _alloc() or _init() (via
67          * uth_semaphore_init()). */
68 }
69
70 /* Initializes a sem acquired from somewhere else.  POSIX's sem_init() needs
71  * this. */
72 void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count)
73 {
74         __uth_semaphore_init(sem);
75         sem->count = count;
76         /* The once is to make sure the object is initialized. */
77         parlib_set_ran_once(&sem->once_ctl);
78 }
79
80 /* Undoes whatever was done in init. */
81 void uth_semaphore_destroy(uth_semaphore_t *sem)
82 {
83         __uth_sync_destroy(&sem->sync_obj);
84 }
85
86 uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
87 {
88         struct uth_semaphore *sem;
89
90         sem = malloc(sizeof(struct uth_semaphore));
91         assert(sem);
92         uth_semaphore_init(sem, count);
93         return sem;
94 }
95
96 void uth_semaphore_free(uth_semaphore_t *sem)
97 {
98         uth_semaphore_destroy(sem);
99         free(sem);
100 }
101
102 static void __semaphore_cb(struct uthread *uth, void *arg)
103 {
104         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
105
106         /* We need to tell the 2LS that its thread blocked.  We need to do this
107          * before unlocking the sem, since as soon as we unlock, the sem could
108          * be released and our thread restarted.
109          *
110          * Also note the lock-ordering rule.  The sem lock is grabbed before any
111          * locks the 2LS might grab. */
112         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
113         __uth_sync_enqueue(uth, &sem->sync_obj);
114         spin_pdr_unlock(&sem->lock);
115 }
116
117 bool uth_semaphore_timed_down(uth_semaphore_t *sem,
118                               const struct timespec *abs_timeout)
119 {
120         struct alarm_waiter waiter[1];
121         struct timeout_blob blob[1];
122
123         assert_can_block();
124         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
125         spin_pdr_lock(&sem->lock);
126         if (sem->count > 0) {
127                 /* Only down if we got one.  This means a sem with no more
128                  * counts is 0, not negative (where -count == nr_waiters).
129                  * Doing it this way means our timeout function works for sems
130                  * and CVs. */
131                 sem->count--;
132                 spin_pdr_unlock(&sem->lock);
133                 return TRUE;
134         }
135         if (abs_timeout) {
136                 set_timeout_blob(blob, &sem->sync_obj, &sem->lock);
137                 set_timeout_alarm(waiter, blob, abs_timeout);
138         }
139         /* the unlock and sync enqueuing is done in the yield callback.  as
140          * always, we need to do this part in vcore context, since as soon as we
141          * unlock the uthread could restart.  (atomically yield and unlock). */
142         uthread_yield(TRUE, __semaphore_cb, sem);
143         if (abs_timeout) {
144                 /* We're guaranteed the alarm will either be cancelled or the
145                  * handler complete when unset_alarm() returns. */
146                 unset_alarm(waiter);
147                 return blob->timed_out ? FALSE : TRUE;
148         }
149         return TRUE;
150 }
151
152 void uth_semaphore_down(uth_semaphore_t *sem)
153 {
154         uth_semaphore_timed_down(sem, NULL);
155 }
156
157 bool uth_semaphore_trydown(uth_semaphore_t *sem)
158 {
159         bool ret = FALSE;
160
161         assert_can_block();
162         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
163         spin_pdr_lock(&sem->lock);
164         if (sem->count > 0) {
165                 sem->count--;
166                 ret = TRUE;
167         }
168         spin_pdr_unlock(&sem->lock);
169         return ret;
170 }
171
172 void uth_semaphore_up(uth_semaphore_t *sem)
173 {
174         struct uthread *uth;
175
176         /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special.
177          */
178         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
179         spin_pdr_lock(&sem->lock);
180         uth = __uth_sync_get_next(&sem->sync_obj);
181         /* If there was a waiter, we pass our resource/count to them. */
182         if (!uth)
183                 sem->count++;
184         spin_pdr_unlock(&sem->lock);
185         if (uth)
186                 uthread_runnable(uth);
187 }
188
189 /* Takes a void * since it's called by parlib_run_once(), which enables us to
190  * statically initialize the mutex.  This init does everything not done by the
191  * static initializer.  Note we do not allow 'static' destruction.  (No one
192  * calls free). */
193 static void __uth_mutex_init(void *arg)
194 {
195         struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
196
197         __uth_semaphore_init(mtx);
198         mtx->count = 1;
199 }
200
201 void uth_mutex_init(uth_mutex_t *mtx)
202 {
203         __uth_mutex_init(mtx);
204         parlib_set_ran_once(&mtx->once_ctl);
205 }
206
207 void uth_mutex_destroy(uth_mutex_t *mtx)
208 {
209         uth_semaphore_destroy(mtx);
210 }
211
212 uth_mutex_t *uth_mutex_alloc(void)
213 {
214         struct uth_semaphore *mtx;
215
216         mtx = malloc(sizeof(struct uth_semaphore));
217         assert(mtx);
218         uth_mutex_init(mtx);
219         return mtx;
220 }
221
222 void uth_mutex_free(uth_mutex_t *mtx)
223 {
224         uth_semaphore_free(mtx);
225 }
226
227 bool uth_mutex_timed_lock(uth_mutex_t *mtx, const struct timespec *abs_timeout)
228 {
229         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
230         return uth_semaphore_timed_down(mtx, abs_timeout);
231 }
232
233 void uth_mutex_lock(uth_mutex_t *mtx)
234 {
235         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
236         uth_semaphore_down(mtx);
237 }
238
239 bool uth_mutex_trylock(uth_mutex_t *mtx)
240 {
241         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
242         return uth_semaphore_trydown(mtx);
243 }
244
245 void uth_mutex_unlock(uth_mutex_t *mtx)
246 {
247         uth_semaphore_up(mtx);
248 }
249
250 /************** Recursive mutexes **************/
251
252 static void __uth_recurse_mutex_init(void *arg)
253 {
254         struct uth_recurse_mutex *r_mtx = (struct uth_recurse_mutex*)arg;
255
256         __uth_mutex_init(&r_mtx->mtx);
257         /* Since we always manually call __uth_mutex_init(), there's no reason
258          * to mess with the regular mutex's static initializer.  Just say it's
259          * been done. */
260         parlib_set_ran_once(&r_mtx->mtx.once_ctl);
261         r_mtx->lockholder = NULL;
262         r_mtx->count = 0;
263 }
264
265 void uth_recurse_mutex_init(uth_recurse_mutex_t *r_mtx)
266 {
267         __uth_recurse_mutex_init(r_mtx);
268         parlib_set_ran_once(&r_mtx->once_ctl);
269 }
270
271 void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_mtx)
272 {
273         uth_semaphore_destroy(&r_mtx->mtx);
274 }
275
276 uth_recurse_mutex_t *uth_recurse_mutex_alloc(void)
277 {
278         struct uth_recurse_mutex *r_mtx =
279                 malloc(sizeof(struct uth_recurse_mutex));
280
281         assert(r_mtx);
282         uth_recurse_mutex_init(r_mtx);
283         return r_mtx;
284 }
285
286 void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
287 {
288         uth_recurse_mutex_destroy(r_mtx);
289         free(r_mtx);
290 }
291
292 bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *r_mtx,
293                                   const struct timespec *abs_timeout)
294 {
295         assert_can_block();
296         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
297         /* We don't have to worry about races on current_uthread or count.  They
298          * are only written by the initial lockholder, and this check will only
299          * be true for the initial lockholder, which cannot concurrently call
300          * this function twice (a thread is single-threaded).
301          *
302          * A signal handler running for a thread should not attempt to grab a
303          * recursive mutex (that's probably a bug).  If we need to support that,
304          * we'll have to disable notifs temporarily. */
305         if (r_mtx->lockholder == current_uthread) {
306                 r_mtx->count++;
307                 return TRUE;
308         }
309         if (!uth_mutex_timed_lock(&r_mtx->mtx, abs_timeout))
310                 return FALSE;
311         r_mtx->lockholder = current_uthread;
312         r_mtx->count = 1;
313         return TRUE;
314 }
315
316 void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
317 {
318         uth_recurse_mutex_timed_lock(r_mtx, NULL);
319 }
320
321 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
322 {
323         bool ret;
324
325         assert_can_block();
326         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
327         if (r_mtx->lockholder == current_uthread) {
328                 r_mtx->count++;
329                 return TRUE;
330         }
331         ret = uth_mutex_trylock(&r_mtx->mtx);
332         if (ret) {
333                 r_mtx->lockholder = current_uthread;
334                 r_mtx->count = 1;
335         }
336         return ret;
337 }
338
339 void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_mtx)
340 {
341         r_mtx->count--;
342         if (!r_mtx->count) {
343                 r_mtx->lockholder = NULL;
344                 uth_mutex_unlock(&r_mtx->mtx);
345         }
346 }
347
348
349 /************** Condition Variables **************/
350
351
352 static void __uth_cond_var_init(void *arg)
353 {
354         struct uth_cond_var *cv = (struct uth_cond_var*)arg;
355
356         spin_pdr_init(&cv->lock);
357         __uth_sync_init(&cv->sync_obj);
358 }
359
360 void uth_cond_var_init(uth_cond_var_t *cv)
361 {
362         __uth_cond_var_init(cv);
363         parlib_set_ran_once(&cv->once_ctl);
364 }
365
366 void uth_cond_var_destroy(uth_cond_var_t *cv)
367 {
368         __uth_sync_destroy(&cv->sync_obj);
369 }
370
371 uth_cond_var_t *uth_cond_var_alloc(void)
372 {
373         struct uth_cond_var *cv;
374
375         cv = malloc(sizeof(struct uth_cond_var));
376         assert(cv);
377         uth_cond_var_init(cv);
378         return cv;
379 }
380
381 void uth_cond_var_free(uth_cond_var_t *cv)
382 {
383         uth_cond_var_destroy(cv);
384         free(cv);
385 }
386
387 struct uth_cv_link {
388         struct uth_cond_var                     *cv;
389         struct uth_semaphore            *mtx;
390 };
391
392 static void __cv_wait_cb(struct uthread *uth, void *arg)
393 {
394         struct uth_cv_link *link = (struct uth_cv_link*)arg;
395         struct uth_cond_var *cv = link->cv;
396         struct uth_semaphore *mtx = link->mtx;
397
398         /* We need to tell the 2LS that its thread blocked.  We need to do this
399          * before unlocking the cv, since as soon as we unlock, the cv could be
400          * signalled and our thread restarted.
401          *
402          * Also note the lock-ordering rule.  The cv lock is grabbed before any
403          * locks the 2LS might grab. */
404         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
405         __uth_sync_enqueue(uth, &cv->sync_obj);
406         spin_pdr_unlock(&cv->lock);
407         /* This looks dangerous, since both the CV and MTX could use the
408          * uth->sync_next TAILQ_ENTRY (or whatever the 2LS uses), but the
409          * uthread never sleeps on both at the same time.  We *hold* the mtx -
410          * we aren't *sleeping* on it.  Sleeping uses the sync_next.  Holding it
411          * doesn't.
412          *
413          * Next, consider what happens as soon as we unlock the CV.  Our thread
414          * could get woken up, and then immediately try to grab the mtx and go
415          * to sleep! (see below).  If that happens, the uthread is no longer
416          * sleeping on the CV, and the sync_next is free.  The invariant is that
417          * a uthread can only sleep on one sync_object at a time. */
418         if (mtx)
419                 uth_mutex_unlock(mtx);
420 }
421
422 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
423  * caller holds mtx again.  Once our uth is on the CV's list, we can release the
424  * mtx without fear of missing a signal.
425  *
426  * POSIX refers to atomicity in this context as "atomically with respect to
427  * access by another thread to the mutex and then the condition variable"
428  *
429  * The idea is that we hold the mutex to protect some invariant; we check it,
430  * and decide to sleep.  Now we get on the list before releasing so that any
431  * changes to that invariant (e.g. a flag is now TRUE) happen after we're on the
432  * list, and so that we don't miss the signal.  To be more clear, the invariant
433  * in a basic wake-up flag scenario is: "whenever a flag is set from FALSE to
434  * TRUE, all waiters that saw FALSE are on the CV's waitqueue."  The mutex is
435  * required for this invariant.
436  *
437  * Note that signal/broadcasters do not *need* to hold the mutex, in general,
438  * but they do in the basic wake-up flag scenario.  If not, the race is this:
439  *
440  * Sleeper:                                     Waker:
441  * -----------------------------------------------------------------
442  * Hold mutex
443  *   See flag is False
444  *   Decide to sleep
445  *                                              Set flag True
446  * PAUSE!                                       Grab CV lock
447  *                                              See list is empty, unlock
448  *
449  *   Grab CV lock
450  *     Get put on list
451  *   Unlock CV lock
452  * Unlock mutex
453  * (Never wake up; we missed the signal)
454  *
455  * For those familiar with the kernel's CVs, we don't couple mutexes with CVs.
456  * cv_lock() actually grabs the spinlock inside the CV and uses *that* to
457  * protect the invariant.  The signallers always grab that lock, so the sleeper
458  * is not in danger of missing the signal.  The tradeoff is that the kernel CVs
459  * use a spinlock instead of a mutex for protecting its invariant; there might
460  * be some case that preferred blocking sync.
461  *
462  * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
463  * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
464  * However, we need both approaces in userspace.  To that end, we also support
465  * mutex-less CVs, where the synchronization typically provided by the mutex is
466  * provided by the CV's spinlock.  Just pass NULL for the mutex.  This is
467  * primarily useful for CVs that are signalled from event handlers in vcore
468  * context, since that code cannot block on a mutex and thus cannot use the
469  * mutex to avoid the races mentioned above.
470  *
471  * As far as lock ordering goes, once the sleeper holds the mutex and is on the
472  * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
473  * actually requires grabbing its spinlock.  So as to not have a lock ordering
474  * between *spinlocks*, we let go of the CV's spinlock before unlocking the
475  * mutex.  There is an ordering between the mutex and the CV spinlock (mutex->cv
476  * spin), but there is no ordering between the mutex spin and cv spin.  And of
477  * course, we need to unlock the CV spinlock in the yield callback.
478  *
479  * Also note that we use the external API for the mutex operations.  A 2LS could
480  * have their own mutex ops but still use the generic cv ops. */
481 bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
482                              const struct timespec *abs_timeout)
483 {
484         struct uth_cv_link link;
485         struct alarm_waiter waiter[1];
486         struct timeout_blob blob[1];
487         bool ret = TRUE;
488
489         /* We're holding the CV PDR lock, so we lose the ability to detect
490          * blocking violations. */
491         if (mtx)
492                 assert_can_block();
493         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
494         link.cv = cv;
495         link.mtx = mtx;
496         if (mtx)
497                 spin_pdr_lock(&cv->lock);
498         if (abs_timeout) {
499                 set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
500                 set_timeout_alarm(waiter, blob, abs_timeout);
501         }
502         uthread_yield(TRUE, __cv_wait_cb, &link);
503         if (abs_timeout) {
504                 unset_alarm(waiter);
505                 ret = blob->timed_out ? FALSE : TRUE;
506         }
507         if (mtx)
508                 uth_mutex_lock(mtx);
509         else
510                 spin_pdr_lock(&cv->lock);
511         return ret;
512 }
513
514 void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
515 {
516         uth_cond_var_timed_wait(cv, mtx, NULL);
517 }
518
519 /* GCC doesn't list this as one of the C++0x functions, but it's easy to do and
520  * implement uth_cond_var_wait_recurse() with it, just like for all the other
521  * 'timed' functions.
522  *
523  * Note the timeout applies to getting the signal on the CV, not on reacquiring
524  * the mutex. */
525 bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
526                                      uth_recurse_mutex_t *r_mtx,
527                                      const struct timespec *abs_timeout)
528 {
529         unsigned int old_count = r_mtx->count;
530         bool ret;
531
532         /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
533          * prep-work for that now.  (invariant is that an unlocked r_mtx has no
534          * lockholder and count == 0. */
535         r_mtx->lockholder = NULL;
536         r_mtx->count = 0;
537         ret = uth_cond_var_timed_wait(cv, &r_mtx->mtx, abs_timeout);
538         /* Now we hold the internal mutex again.  Need to restore the tracking.
539          */
540         r_mtx->lockholder = current_uthread;
541         r_mtx->count = old_count;
542         return ret;
543 }
544
545 /* GCC wants this function, though its semantics are a little unclear.  I
546  * imagine you'd want to completely unlock it (say you locked it 3 times), and
547  * when you get it back, that you have your three locks back. */
548 void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
549 {
550         uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
551 }
552
553 /* Caller holds the CV lock.  Returns a uth that needs to be woken up (or NULL),
554  * which the caller needs to do with uthread_runnable(). */
555 struct uthread *__uth_cond_var_wake_one(uth_cond_var_t *cv)
556 {
557         return __uth_sync_get_next(&cv->sync_obj);
558 }
559
560 /* Caller holds the CV lock. */
561 void __uth_cond_var_signal_and_unlock(uth_cond_var_t *cv)
562 {
563         struct uthread *uth = __uth_cond_var_wake_one(cv);
564
565         spin_pdr_unlock(&cv->lock);
566         if (uth)
567                 uthread_runnable(uth);
568 }
569
570 void uth_cond_var_signal(uth_cond_var_t *cv)
571 {
572         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
573
574         spin_pdr_lock(&cv->lock);
575         __uth_cond_var_signal_and_unlock(cv);
576 }
577
578 /* Caller holds the CV lock.  Returns true if the restartees need to be woken
579  * up, which the caller needs to do with __uth_sync_wake_all(). */
580 bool __uth_cond_var_wake_all(uth_cond_var_t *cv, uth_sync_t *restartees)
581 {
582         if (__uth_sync_is_empty(&cv->sync_obj))
583                 return false;
584         __uth_sync_init(restartees);
585         __uth_sync_swap(restartees, &cv->sync_obj);
586         return true;
587 }
588
589 /* Caller holds the CV lock. */
590 void __uth_cond_var_broadcast_and_unlock(uth_cond_var_t *cv)
591 {
592         uth_sync_t restartees;
593         bool wake;
594
595         wake = __uth_cond_var_wake_all(cv, &restartees);
596         spin_pdr_unlock(&cv->lock);
597         if (wake)
598                 __uth_sync_wake_all(&restartees);
599 }
600
601 void uth_cond_var_broadcast(uth_cond_var_t *cv)
602 {
603         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
604
605         spin_pdr_lock(&cv->lock);
606         __uth_cond_var_broadcast_and_unlock(cv);
607 }
608
609 /* Similar to the kernel, we can grab the CV's spinlock directly and use that
610  * for synchronization.  This is primarily so we can signal/broadcast from vcore
611  * context, and you typically need to hold some lock when changing state before
612  * signalling. */
613 void uth_cond_var_lock(uth_cond_var_t *cv)
614 {
615         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
616
617         spin_pdr_lock(&cv->lock);
618 }
619
620 void uth_cond_var_unlock(uth_cond_var_t *cv)
621 {
622         spin_pdr_unlock(&cv->lock);
623 }
624
625 /************** Reader-writer Sleeping Locks **************/
626
627
628 static void __uth_rwlock_init(void *arg)
629 {
630         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
631
632         spin_pdr_init(&rwl->lock);
633         rwl->nr_readers = 0;
634         rwl->has_writer = FALSE;
635         __uth_sync_init(&rwl->readers);
636         __uth_sync_init(&rwl->writers);
637 }
638
639 void uth_rwlock_init(uth_rwlock_t *rwl)
640 {
641         __uth_rwlock_init(rwl);
642         parlib_set_ran_once(&rwl->once_ctl);
643 }
644
645 void uth_rwlock_destroy(uth_rwlock_t *rwl)
646 {
647         __uth_sync_destroy(&rwl->readers);
648         __uth_sync_destroy(&rwl->writers);
649 }
650
651 uth_rwlock_t *uth_rwlock_alloc(void)
652 {
653         struct uth_rwlock *rwl;
654
655         rwl = malloc(sizeof(struct uth_rwlock));
656         assert(rwl);
657         uth_rwlock_init(rwl);
658         return rwl;
659 }
660
661 void uth_rwlock_free(uth_rwlock_t *rwl)
662 {
663         uth_rwlock_destroy(rwl);
664         free(rwl);
665 }
666
667 /* Readers and writers block until they have the lock.  The delicacies are dealt
668  * with by the unlocker. */
669 static void __rwlock_rd_cb(struct uthread *uth, void *arg)
670 {
671         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
672
673         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
674         __uth_sync_enqueue(uth, &rwl->readers);
675         spin_pdr_unlock(&rwl->lock);
676 }
677
678 void uth_rwlock_rdlock(uth_rwlock_t *rwl)
679 {
680         assert_can_block();
681         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
682         spin_pdr_lock(&rwl->lock);
683         /* Readers always make progress when there is no writer */
684         if (!rwl->has_writer) {
685                 rwl->nr_readers++;
686                 spin_pdr_unlock(&rwl->lock);
687                 return;
688         }
689         uthread_yield(TRUE, __rwlock_rd_cb, rwl);
690 }
691
692 bool uth_rwlock_try_rdlock(uth_rwlock_t *rwl)
693 {
694         bool ret = FALSE;
695
696         assert_can_block();
697         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
698         spin_pdr_lock(&rwl->lock);
699         if (!rwl->has_writer) {
700                 rwl->nr_readers++;
701                 ret = TRUE;
702         }
703         spin_pdr_unlock(&rwl->lock);
704         return ret;
705 }
706
707 static void __rwlock_wr_cb(struct uthread *uth, void *arg)
708 {
709         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
710
711         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
712         __uth_sync_enqueue(uth, &rwl->writers);
713         spin_pdr_unlock(&rwl->lock);
714 }
715
716 void uth_rwlock_wrlock(uth_rwlock_t *rwl)
717 {
718         assert_can_block();
719         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
720         spin_pdr_lock(&rwl->lock);
721         /* Writers require total mutual exclusion - no writers or readers */
722         if (!rwl->has_writer && !rwl->nr_readers) {
723                 rwl->has_writer = TRUE;
724                 spin_pdr_unlock(&rwl->lock);
725                 return;
726         }
727         uthread_yield(TRUE, __rwlock_wr_cb, rwl);
728 }
729
730 bool uth_rwlock_try_wrlock(uth_rwlock_t *rwl)
731 {
732         bool ret = FALSE;
733
734         assert_can_block();
735         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
736         spin_pdr_lock(&rwl->lock);
737         if (!rwl->has_writer && !rwl->nr_readers) {
738                 rwl->has_writer = TRUE;
739                 ret = TRUE;
740         }
741         spin_pdr_unlock(&rwl->lock);
742         return ret;
743 }
744
745 /* Let's try to wake writers (yes, this is a policy decision), and if none, wake
746  * all the readers.  The invariant there is that if there is no writer, then
747  * there are no waiting readers. */
748 static void __rw_unlock_writer(struct uth_rwlock *rwl,
749                                struct uth_tailq *restartees)
750 {
751         struct uthread *uth;
752
753         uth = __uth_sync_get_next(&rwl->writers);
754         if (uth) {
755                 TAILQ_INSERT_TAIL(restartees, uth, sync_next);
756         } else {
757                 rwl->has_writer = FALSE;
758                 while ((uth = __uth_sync_get_next(&rwl->readers))) {
759                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
760                         rwl->nr_readers++;
761                 }
762         }
763 }
764
765 static void __rw_unlock_reader(struct uth_rwlock *rwl,
766                                struct uth_tailq *restartees)
767 {
768         struct uthread *uth;
769
770         rwl->nr_readers--;
771         if (!rwl->nr_readers) {
772                 uth = __uth_sync_get_next(&rwl->writers);
773                 if (uth) {
774                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
775                         rwl->has_writer = TRUE;
776                 }
777         }
778 }
779
780 /* Unlock works for either readers or writer locks.  You can tell which you were
781  * based on whether has_writer is set or not. */
782 void uth_rwlock_unlock(uth_rwlock_t *rwl)
783 {
784         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
785         struct uthread *i, *safe;
786
787         spin_pdr_lock(&rwl->lock);
788         if (rwl->has_writer)
789                 __rw_unlock_writer(rwl, &restartees);
790         else
791                 __rw_unlock_reader(rwl, &restartees);
792         spin_pdr_unlock(&rwl->lock);
793         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
794                 uthread_runnable(i);
795 }
796
797
798 /************** Default Sync Obj Implementation **************/
799
800 static void uth_default_sync_init(uth_sync_t *sync)
801 {
802         struct uth_tailq *tq = (struct uth_tailq*)sync;
803
804         parlib_static_assert(sizeof(struct uth_tailq) <= sizeof(uth_sync_t));
805         TAILQ_INIT(tq);
806 }
807
808 static void uth_default_sync_destroy(uth_sync_t *sync)
809 {
810         struct uth_tailq *tq = (struct uth_tailq*)sync;
811
812         assert(TAILQ_EMPTY(tq));
813 }
814
815 static void uth_default_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
816 {
817         struct uth_tailq *tq = (struct uth_tailq*)sync;
818
819         TAILQ_INSERT_TAIL(tq, uth, sync_next);
820 }
821
822 static struct uthread *uth_default_sync_get_next(uth_sync_t *sync)
823 {
824         struct uth_tailq *tq = (struct uth_tailq*)sync;
825         struct uthread *first;
826
827         first = TAILQ_FIRST(tq);
828         if (first)
829                 TAILQ_REMOVE(tq, first, sync_next);
830         return first;
831 }
832
833 static bool uth_default_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
834 {
835         struct uth_tailq *tq = (struct uth_tailq*)sync;
836         struct uthread *i;
837
838         TAILQ_FOREACH(i, tq, sync_next) {
839                 if (i == uth) {
840                         TAILQ_REMOVE(tq, i, sync_next);
841                         return TRUE;
842                 }
843         }
844         return FALSE;
845 }
846
847 static void uth_default_sync_swap(uth_sync_t *a, uth_sync_t *b)
848 {
849         struct uth_tailq *tq_a = (struct uth_tailq*)a;
850         struct uth_tailq *tq_b = (struct uth_tailq*)b;
851
852         TAILQ_SWAP(tq_a, tq_b, uthread, sync_next);
853 }
854
855 static bool uth_default_sync_is_empty(uth_sync_t *sync)
856 {
857         struct uth_tailq *tq = (struct uth_tailq*)sync;
858
859         return TAILQ_EMPTY(tq);
860 }
861
862 /************** External uthread sync interface **************/
863
864 /* Called by 2LS-independent sync code when a sync object needs initialized. */
865 void __uth_sync_init(uth_sync_t *sync)
866 {
867         if (sched_ops->sync_init) {
868                 sched_ops->sync_init(sync);
869                 return;
870         }
871         uth_default_sync_init(sync);
872 }
873
874 /* Called by 2LS-independent sync code when a sync object is destroyed. */
875 void __uth_sync_destroy(uth_sync_t *sync)
876 {
877         if (sched_ops->sync_destroy) {
878                 sched_ops->sync_destroy(sync);
879                 return;
880         }
881         uth_default_sync_destroy(sync);
882 }
883
884 /* Called by 2LS-independent sync code when a thread blocks on sync */
885 void __uth_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
886 {
887         if (sched_ops->sync_enqueue) {
888                 sched_ops->sync_enqueue(uth, sync);
889                 return;
890         }
891         uth_default_sync_enqueue(uth, sync);
892 }
893
894 /* Called by 2LS-independent sync code when a thread needs to be woken. */
895 struct uthread *__uth_sync_get_next(uth_sync_t *sync)
896 {
897         if (sched_ops->sync_get_next)
898                 return sched_ops->sync_get_next(sync);
899         return uth_default_sync_get_next(sync);
900 }
901
902 /* Called by 2LS-independent sync code when a specific thread needs to be woken.
903  * Returns TRUE if the uthread was blocked on the object, FALSE o/w. */
904 bool __uth_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
905 {
906         if (sched_ops->sync_get_uth)
907                 return sched_ops->sync_get_uth(sync, uth);
908         return uth_default_sync_get_uth(sync, uth);
909 }
910
911 /* Called by 2LS-independent sync code to swap members of sync objects. */
912 void __uth_sync_swap(uth_sync_t *a, uth_sync_t *b)
913 {
914         if (sched_ops->sync_swap) {
915                 sched_ops->sync_swap(a, b);
916                 return;
917         }
918         uth_default_sync_swap(a, b);
919 }
920
921 /* Called by 2LS-independent sync code */
922 bool __uth_sync_is_empty(uth_sync_t *sync)
923 {
924         if (sched_ops->sync_is_empty)
925                 return sched_ops->sync_is_empty(sync);
926         return uth_default_sync_is_empty(sync);
927 }
928
929 /* Called by 2LS-independent sync code to wake up all uths on sync.  You should
930  * probably not hold locks while you do this - swap the items to a local sync
931  * object first. */
932 void __uth_sync_wake_all(uth_sync_t *wakees)
933 {
934         struct uthread *uth_i;
935
936         if (sched_ops->thread_bulk_runnable) {
937                 sched_ops->thread_bulk_runnable(wakees);
938         } else {
939                 while ((uth_i = __uth_sync_get_next(wakees)))
940                         uthread_runnable(uth_i);
941         }
942 }