parlib: Catch illegal block attempts
[akaros.git] / user / parlib / mutex.c
1 /* Copyright (c) 2016-2017 Google, Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details. */
4
5 /* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
6  * functions.  2LSs implement their own sync objects (bottom of the file). */
7
8 #include <parlib/uthread.h>
9 #include <sys/queue.h>
10 #include <parlib/spinlock.h>
11 #include <parlib/alarm.h>
12 #include <parlib/assert.h>
13 #include <malloc.h>
14
15 struct timeout_blob {
16         bool                                            timed_out;
17         struct uthread                          *uth;
18         uth_sync_t                                      *sync_ptr;
19         struct spin_pdr_lock            *lock_ptr;
20 };
21
22 /* When sync primitives want to time out, they can use this alarm handler.  It
23  * needs a timeout_blob, which is independent of any particular sync method. */
24 static void timeout_handler(struct alarm_waiter *waiter)
25 {
26         struct timeout_blob *blob = (struct timeout_blob*)waiter->data;
27
28         spin_pdr_lock(blob->lock_ptr);
29         if (__uth_sync_get_uth(blob->sync_ptr, blob->uth))
30                 blob->timed_out = TRUE;
31         spin_pdr_unlock(blob->lock_ptr);
32         if (blob->timed_out)
33                 uthread_runnable(blob->uth);
34 }
35
36 /* Minor helper, sets a blob's fields */
37 static void set_timeout_blob(struct timeout_blob *blob, uth_sync_t *sync_ptr,
38                              struct spin_pdr_lock *lock_ptr)
39 {
40         blob->timed_out = FALSE;
41         blob->uth = current_uthread;
42         blob->sync_ptr = sync_ptr;
43         blob->lock_ptr = lock_ptr;
44 }
45
46 /* Minor helper, sets an alarm for blob and a timespec */
47 static void set_timeout_alarm(struct alarm_waiter *waiter,
48                               struct timeout_blob *blob,
49                               const struct timespec *abs_timeout)
50 {
51         init_awaiter(waiter, timeout_handler);
52         waiter->data = blob;
53         set_awaiter_abs_unix(waiter, timespec_to_alarm_time(abs_timeout));
54         set_alarm(waiter);
55 }
56
57 /************** Semaphores and Mutexes **************/
58
59 static void __uth_semaphore_init(void *arg)
60 {
61         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
62
63         spin_pdr_init(&sem->lock);
64         __uth_sync_init(&sem->sync_obj);
65         /* If we used a static initializer for a semaphore, count is already set.
66          * o/w it will be set by _alloc() or _init() (via uth_semaphore_init()). */
67 }
68
69 /* Initializes a sem acquired from somewhere else.  POSIX's sem_init() needs
70  * this. */
71 void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count)
72 {
73         __uth_semaphore_init(sem);
74         sem->count = count;
75         /* The once is to make sure the object is initialized. */
76         parlib_set_ran_once(&sem->once_ctl);
77 }
78
79 /* Undoes whatever was done in init. */
80 void uth_semaphore_destroy(uth_semaphore_t *sem)
81 {
82         __uth_sync_destroy(&sem->sync_obj);
83 }
84
85 uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
86 {
87         struct uth_semaphore *sem;
88
89         sem = malloc(sizeof(struct uth_semaphore));
90         assert(sem);
91         uth_semaphore_init(sem, count);
92         return sem;
93 }
94
95 void uth_semaphore_free(uth_semaphore_t *sem)
96 {
97         uth_semaphore_destroy(sem);
98         free(sem);
99 }
100
101 static void __semaphore_cb(struct uthread *uth, void *arg)
102 {
103         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
104
105         /* We need to tell the 2LS that its thread blocked.  We need to do this
106          * before unlocking the sem, since as soon as we unlock, the sem could be
107          * released and our thread restarted.
108          *
109          * Also note the lock-ordering rule.  The sem lock is grabbed before any
110          * locks the 2LS might grab. */
111         uthread_has_blocked(uth, &sem->sync_obj, UTH_EXT_BLK_MUTEX);
112         spin_pdr_unlock(&sem->lock);
113 }
114
115 bool uth_semaphore_timed_down(uth_semaphore_t *sem,
116                               const struct timespec *abs_timeout)
117 {
118         struct alarm_waiter waiter[1];
119         struct timeout_blob blob[1];
120
121         assert_can_block();
122         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
123         spin_pdr_lock(&sem->lock);
124         if (sem->count > 0) {
125                 /* Only down if we got one.  This means a sem with no more counts is 0,
126                  * not negative (where -count == nr_waiters).  Doing it this way means
127                  * our timeout function works for sems and CVs. */
128                 sem->count--;
129                 spin_pdr_unlock(&sem->lock);
130                 return TRUE;
131         }
132         if (abs_timeout) {
133                 set_timeout_blob(blob, &sem->sync_obj, &sem->lock);
134                 set_timeout_alarm(waiter, blob, abs_timeout);
135         }
136         /* the unlock and sync enqueuing is done in the yield callback.  as always,
137          * we need to do this part in vcore context, since as soon as we unlock the
138          * uthread could restart.  (atomically yield and unlock). */
139         uthread_yield(TRUE, __semaphore_cb, sem);
140         if (abs_timeout) {
141                 /* We're guaranteed the alarm will either be cancelled or the handler
142                  * complete when unset_alarm() returns. */
143                 unset_alarm(waiter);
144                 return blob->timed_out ? FALSE : TRUE;
145         }
146         return TRUE;
147 }
148
149 void uth_semaphore_down(uth_semaphore_t *sem)
150 {
151         uth_semaphore_timed_down(sem, NULL);
152 }
153
154 bool uth_semaphore_trydown(uth_semaphore_t *sem)
155 {
156         bool ret = FALSE;
157
158         assert_can_block();
159         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
160         spin_pdr_lock(&sem->lock);
161         if (sem->count > 0) {
162                 sem->count--;
163                 ret = TRUE;
164         }
165         spin_pdr_unlock(&sem->lock);
166         return ret;
167 }
168
169 void uth_semaphore_up(uth_semaphore_t *sem)
170 {
171         struct uthread *uth;
172
173         /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special. */
174         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
175         spin_pdr_lock(&sem->lock);
176         uth = __uth_sync_get_next(&sem->sync_obj);
177         /* If there was a waiter, we pass our resource/count to them. */
178         if (!uth)
179                 sem->count++;
180         spin_pdr_unlock(&sem->lock);
181         if (uth)
182                 uthread_runnable(uth);
183 }
184
185 /* Takes a void * since it's called by parlib_run_once(), which enables us to
186  * statically initialize the mutex.  This init does everything not done by the
187  * static initializer.  Note we do not allow 'static' destruction.  (No one
188  * calls free). */
189 static void __uth_mutex_init(void *arg)
190 {
191         struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
192
193         __uth_semaphore_init(mtx);
194         mtx->count = 1;
195 }
196
197 void uth_mutex_init(uth_mutex_t *mtx)
198 {
199         __uth_mutex_init(mtx);
200         parlib_set_ran_once(&mtx->once_ctl);
201 }
202
203 void uth_mutex_destroy(uth_mutex_t *mtx)
204 {
205         uth_semaphore_destroy(mtx);
206 }
207
208 uth_mutex_t *uth_mutex_alloc(void)
209 {
210         struct uth_semaphore *mtx;
211
212         mtx = malloc(sizeof(struct uth_semaphore));
213         assert(mtx);
214         uth_mutex_init(mtx);
215         return mtx;
216 }
217
218 void uth_mutex_free(uth_mutex_t *mtx)
219 {
220         uth_semaphore_free(mtx);
221 }
222
223 bool uth_mutex_timed_lock(uth_mutex_t *mtx, const struct timespec *abs_timeout)
224 {
225         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
226         return uth_semaphore_timed_down(mtx, abs_timeout);
227 }
228
229 void uth_mutex_lock(uth_mutex_t *mtx)
230 {
231         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
232         uth_semaphore_down(mtx);
233 }
234
235 bool uth_mutex_trylock(uth_mutex_t *mtx)
236 {
237         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
238         return uth_semaphore_trydown(mtx);
239 }
240
241 void uth_mutex_unlock(uth_mutex_t *mtx)
242 {
243         uth_semaphore_up(mtx);
244 }
245
246 /************** Recursive mutexes **************/
247
248 static void __uth_recurse_mutex_init(void *arg)
249 {
250         struct uth_recurse_mutex *r_mtx = (struct uth_recurse_mutex*)arg;
251
252         __uth_mutex_init(&r_mtx->mtx);
253         /* Since we always manually call __uth_mutex_init(), there's no reason to
254          * mess with the regular mutex's static initializer.  Just say it's been
255          * done. */
256         parlib_set_ran_once(&r_mtx->mtx.once_ctl);
257         r_mtx->lockholder = NULL;
258         r_mtx->count = 0;
259 }
260
261 void uth_recurse_mutex_init(uth_recurse_mutex_t *r_mtx)
262 {
263         __uth_recurse_mutex_init(r_mtx);
264         parlib_set_ran_once(&r_mtx->once_ctl);
265 }
266
267 void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_mtx)
268 {
269         uth_semaphore_destroy(&r_mtx->mtx);
270 }
271
272 uth_recurse_mutex_t *uth_recurse_mutex_alloc(void)
273 {
274         struct uth_recurse_mutex *r_mtx = malloc(sizeof(struct uth_recurse_mutex));
275
276         assert(r_mtx);
277         uth_recurse_mutex_init(r_mtx);
278         return r_mtx;
279 }
280
281 void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
282 {
283         uth_recurse_mutex_destroy(r_mtx);
284         free(r_mtx);
285 }
286
287 bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *r_mtx,
288                                   const struct timespec *abs_timeout)
289 {
290         assert_can_block();
291         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
292         /* We don't have to worry about races on current_uthread or count.  They are
293          * only written by the initial lockholder, and this check will only be true
294          * for the initial lockholder, which cannot concurrently call this function
295          * twice (a thread is single-threaded).
296          *
297          * A signal handler running for a thread should not attempt to grab a
298          * recursive mutex (that's probably a bug).  If we need to support that,
299          * we'll have to disable notifs temporarily. */
300         if (r_mtx->lockholder == current_uthread) {
301                 r_mtx->count++;
302                 return TRUE;
303         }
304         if (!uth_mutex_timed_lock(&r_mtx->mtx, abs_timeout))
305                 return FALSE;
306         r_mtx->lockholder = current_uthread;
307         r_mtx->count = 1;
308         return TRUE;
309 }
310
311 void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
312 {
313         uth_recurse_mutex_timed_lock(r_mtx, NULL);
314 }
315
316 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
317 {
318         bool ret;
319
320         assert_can_block();
321         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
322         if (r_mtx->lockholder == current_uthread) {
323                 r_mtx->count++;
324                 return TRUE;
325         }
326         ret = uth_mutex_trylock(&r_mtx->mtx);
327         if (ret) {
328                 r_mtx->lockholder = current_uthread;
329                 r_mtx->count = 1;
330         }
331         return ret;
332 }
333
334 void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_mtx)
335 {
336         r_mtx->count--;
337         if (!r_mtx->count) {
338                 r_mtx->lockholder = NULL;
339                 uth_mutex_unlock(&r_mtx->mtx);
340         }
341 }
342
343
344 /************** Condition Variables **************/
345
346
347 static void __uth_cond_var_init(void *arg)
348 {
349         struct uth_cond_var *cv = (struct uth_cond_var*)arg;
350
351         spin_pdr_init(&cv->lock);
352         __uth_sync_init(&cv->sync_obj);
353 }
354
355 void uth_cond_var_init(uth_cond_var_t *cv)
356 {
357         __uth_cond_var_init(cv);
358         parlib_set_ran_once(&cv->once_ctl);
359 }
360
361 void uth_cond_var_destroy(uth_cond_var_t *cv)
362 {
363         __uth_sync_destroy(&cv->sync_obj);
364 }
365
366 uth_cond_var_t *uth_cond_var_alloc(void)
367 {
368         struct uth_cond_var *cv;
369
370         cv = malloc(sizeof(struct uth_cond_var));
371         assert(cv);
372         uth_cond_var_init(cv);
373         return cv;
374 }
375
376 void uth_cond_var_free(uth_cond_var_t *cv)
377 {
378         uth_cond_var_destroy(cv);
379         free(cv);
380 }
381
382 struct uth_cv_link {
383         struct uth_cond_var                     *cv;
384         struct uth_semaphore            *mtx;
385 };
386
387 static void __cv_wait_cb(struct uthread *uth, void *arg)
388 {
389         struct uth_cv_link *link = (struct uth_cv_link*)arg;
390         struct uth_cond_var *cv = link->cv;
391         struct uth_semaphore *mtx = link->mtx;
392
393         /* We need to tell the 2LS that its thread blocked.  We need to do this
394          * before unlocking the cv, since as soon as we unlock, the cv could be
395          * signalled and our thread restarted.
396          *
397          * Also note the lock-ordering rule.  The cv lock is grabbed before any
398          * locks the 2LS might grab. */
399         uthread_has_blocked(uth, &cv->sync_obj, UTH_EXT_BLK_MUTEX);
400         spin_pdr_unlock(&cv->lock);
401         /* This looks dangerous, since both the CV and MTX could use the
402          * uth->sync_next TAILQ_ENTRY (or whatever the 2LS uses), but the uthread
403          * never sleeps on both at the same time.  We *hold* the mtx - we aren't
404          * *sleeping* on it.  Sleeping uses the sync_next.  Holding it doesn't.
405          *
406          * Next, consider what happens as soon as we unlock the CV.  Our thread
407          * could get woken up, and then immediately try to grab the mtx and go to
408          * sleep! (see below).  If that happens, the uthread is no longer sleeping
409          * on the CV, and the sync_next is free.  The invariant is that a uthread
410          * can only sleep on one sync_object at a time. */
411         uth_mutex_unlock(mtx);
412 }
413
414 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
415  * caller holds mtx again.  Once our uth is on the CV's list, we can release the
416  * mtx without fear of missing a signal.
417  *
418  * POSIX refers to atomicity in this context as "atomically with respect to
419  * access by another thread to the mutex and then the condition variable"
420  *
421  * The idea is that we hold the mutex to protect some invariant; we check it,
422  * and decide to sleep.  Now we get on the list before releasing so that any
423  * changes to that invariant (e.g. a flag is now TRUE) happen after we're on the
424  * list, and so that we don't miss the signal.  To be more clear, the invariant
425  * in a basic wake-up flag scenario is: "whenever a flag is set from FALSE to
426  * TRUE, all waiters that saw FALSE are on the CV's waitqueue."  The mutex is
427  * required for this invariant.
428  *
429  * Note that signal/broadcasters do not *need* to hold the mutex, in general,
430  * but they do in the basic wake-up flag scenario.  If not, the race is this:
431  *
432  * Sleeper:                                                             Waker:
433  * -----------------------------------------------------------------
434  * Hold mutex
435  *   See flag is False
436  *   Decide to sleep
437  *                                                                              Set flag True
438  * PAUSE!                                                               Grab CV lock
439  *                                                                              See list is empty, unlock
440  *
441  *   Grab CV lock
442  *     Get put on list
443  *   Unlock CV lock
444  * Unlock mutex
445  * (Never wake up; we missed the signal)
446  *
447  * For those familiar with the kernel's CVs, we don't couple mutexes with CVs.
448  * cv_lock() actually grabs the spinlock inside the CV and uses *that* to
449  * protect the invariant.  The signallers always grab that lock, so the sleeper
450  * is not in danger of missing the signal.  The tradeoff is that the kernel CVs
451  * use a spinlock instead of a mutex for protecting its invariant; there might
452  * be some case that preferred blocking sync.
453  *
454  * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
455  * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
456  *
457  * As far as lock ordering goes, once the sleeper holds the mutex and is on the
458  * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
459  * actually requires grabbing its spinlock.  So as to not have a lock ordering
460  * between *spinlocks*, we let go of the CV's spinlock before unlocking the
461  * mutex.  There is an ordering between the mutex and the CV spinlock (mutex->cv
462  * spin), but there is no ordering between the mutex spin and cv spin.  And of
463  * course, we need to unlock the CV spinlock in the yield callback.
464  *
465  * Also note that we use the external API for the mutex operations.  A 2LS could
466  * have their own mutex ops but still use the generic cv ops. */
467 bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
468                              const struct timespec *abs_timeout)
469 {
470         struct uth_cv_link link;
471         struct alarm_waiter waiter[1];
472         struct timeout_blob blob[1];
473         bool ret = TRUE;
474
475         assert_can_block();
476         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
477         link.cv = cv;
478         link.mtx = mtx;
479         spin_pdr_lock(&cv->lock);
480         if (abs_timeout) {
481                 set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
482                 set_timeout_alarm(waiter, blob, abs_timeout);
483         }
484         uthread_yield(TRUE, __cv_wait_cb, &link);
485         if (abs_timeout) {
486                 unset_alarm(waiter);
487                 ret = blob->timed_out ? FALSE : TRUE;
488         }
489         uth_mutex_lock(mtx);
490         return ret;
491 }
492
493 void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
494 {
495         uth_cond_var_timed_wait(cv, mtx, NULL);
496 }
497
498 /* GCC doesn't list this as one of the C++0x functions, but it's easy to do and
499  * implement uth_cond_var_wait_recurse() with it, just like for all the other
500  * 'timed' functions.
501  *
502  * Note the timeout applies to getting the signal on the CV, not on reacquiring
503  * the mutex. */
504 bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
505                                      uth_recurse_mutex_t *r_mtx,
506                                      const struct timespec *abs_timeout)
507 {
508         unsigned int old_count = r_mtx->count;
509         bool ret;
510
511         /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
512          * prep-work for that now.  (invariant is that an unlocked r_mtx has no
513          * lockholder and count == 0. */
514         r_mtx->lockholder = NULL;
515         r_mtx->count = 0;
516         ret = uth_cond_var_timed_wait(cv, &r_mtx->mtx, abs_timeout);
517         /* Now we hold the internal mutex again.  Need to restore the tracking. */
518         r_mtx->lockholder = current_uthread;
519         r_mtx->count = old_count;
520         return ret;
521 }
522
523 /* GCC wants this function, though its semantics are a little unclear.  I
524  * imagine you'd want to completely unlock it (say you locked it 3 times), and
525  * when you get it back, that you have your three locks back. */
526 void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
527 {
528         uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
529 }
530
531 void uth_cond_var_signal(uth_cond_var_t *cv)
532 {
533         struct uthread *uth;
534
535         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
536         spin_pdr_lock(&cv->lock);
537         uth = __uth_sync_get_next(&cv->sync_obj);
538         spin_pdr_unlock(&cv->lock);
539         if (uth)
540                 uthread_runnable(uth);
541 }
542
543 void uth_cond_var_broadcast(uth_cond_var_t *cv)
544 {
545         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
546         struct uthread *i, *safe;
547
548         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
549         spin_pdr_lock(&cv->lock);
550         /* If this turns out to be slow or painful for 2LSs, we can implement a
551          * get_all or something (default used to use TAILQ_SWAP). */
552         while ((i = __uth_sync_get_next(&cv->sync_obj))) {
553                 /* Once the uth is out of the sync obj, we can reuse sync_next. */
554                 TAILQ_INSERT_TAIL(&restartees, i, sync_next);
555         }
556         spin_pdr_unlock(&cv->lock);
557         /* Need the SAFE, since we can't touch the linkage once the uth could run */
558         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
559                 uthread_runnable(i);
560 }
561
562
563 /************** Reader-writer Sleeping Locks **************/
564
565
566 static void __uth_rwlock_init(void *arg)
567 {
568         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
569
570         spin_pdr_init(&rwl->lock);
571         rwl->nr_readers = 0;
572         rwl->has_writer = FALSE;
573         __uth_sync_init(&rwl->readers);
574         __uth_sync_init(&rwl->writers);
575 }
576
577 void uth_rwlock_init(uth_rwlock_t *rwl)
578 {
579         __uth_rwlock_init(rwl);
580         parlib_set_ran_once(&rwl->once_ctl);
581 }
582
583 void uth_rwlock_destroy(uth_rwlock_t *rwl)
584 {
585         __uth_sync_destroy(&rwl->readers);
586         __uth_sync_destroy(&rwl->writers);
587 }
588
589 uth_rwlock_t *uth_rwlock_alloc(void)
590 {
591         struct uth_rwlock *rwl;
592
593         rwl = malloc(sizeof(struct uth_rwlock));
594         assert(rwl);
595         uth_rwlock_init(rwl);
596         return rwl;
597 }
598
599 void uth_rwlock_free(uth_rwlock_t *rwl)
600 {
601         uth_rwlock_destroy(rwl);
602         free(rwl);
603 }
604
605 /* Readers and writers block until they have the lock.  The delicacies are dealt
606  * with by the unlocker. */
607 static void __rwlock_rd_cb(struct uthread *uth, void *arg)
608 {
609         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
610
611         uthread_has_blocked(uth, &rwl->readers, UTH_EXT_BLK_MUTEX);
612         spin_pdr_unlock(&rwl->lock);
613 }
614
615 void uth_rwlock_rdlock(uth_rwlock_t *rwl)
616 {
617         assert_can_block();
618         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
619         spin_pdr_lock(&rwl->lock);
620         /* Readers always make progress when there is no writer */
621         if (!rwl->has_writer) {
622                 rwl->nr_readers++;
623                 spin_pdr_unlock(&rwl->lock);
624                 return;
625         }
626         uthread_yield(TRUE, __rwlock_rd_cb, rwl);
627 }
628
629 bool uth_rwlock_try_rdlock(uth_rwlock_t *rwl)
630 {
631         bool ret = FALSE;
632
633         assert_can_block();
634         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
635         spin_pdr_lock(&rwl->lock);
636         if (!rwl->has_writer) {
637                 rwl->nr_readers++;
638                 ret = TRUE;
639         }
640         spin_pdr_unlock(&rwl->lock);
641         return ret;
642 }
643
644 static void __rwlock_wr_cb(struct uthread *uth, void *arg)
645 {
646         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
647
648         uthread_has_blocked(uth, &rwl->writers, UTH_EXT_BLK_MUTEX);
649         spin_pdr_unlock(&rwl->lock);
650 }
651
652 void uth_rwlock_wrlock(uth_rwlock_t *rwl)
653 {
654         assert_can_block();
655         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
656         spin_pdr_lock(&rwl->lock);
657         /* Writers require total mutual exclusion - no writers or readers */
658         if (!rwl->has_writer && !rwl->nr_readers) {
659                 rwl->has_writer = TRUE;
660                 spin_pdr_unlock(&rwl->lock);
661                 return;
662         }
663         uthread_yield(TRUE, __rwlock_wr_cb, rwl);
664 }
665
666 bool uth_rwlock_try_wrlock(uth_rwlock_t *rwl)
667 {
668         bool ret = FALSE;
669
670         assert_can_block();
671         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
672         spin_pdr_lock(&rwl->lock);
673         if (!rwl->has_writer && !rwl->nr_readers) {
674                 rwl->has_writer = TRUE;
675                 ret = TRUE;
676         }
677         spin_pdr_unlock(&rwl->lock);
678         return ret;
679 }
680
681 /* Let's try to wake writers (yes, this is a policy decision), and if none, wake
682  * all the readers.  The invariant there is that if there is no writer, then
683  * there are no waiting readers. */
684 static void __rw_unlock_writer(struct uth_rwlock *rwl,
685                                struct uth_tailq *restartees)
686 {
687         struct uthread *uth;
688
689         uth = __uth_sync_get_next(&rwl->writers);
690         if (uth) {
691                 TAILQ_INSERT_TAIL(restartees, uth, sync_next);
692         } else {
693                 rwl->has_writer = FALSE;
694                 while ((uth = __uth_sync_get_next(&rwl->readers))) {
695                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
696                         rwl->nr_readers++;
697                 }
698         }
699 }
700
701 static void __rw_unlock_reader(struct uth_rwlock *rwl,
702                                struct uth_tailq *restartees)
703 {
704         struct uthread *uth;
705
706         rwl->nr_readers--;
707         if (!rwl->nr_readers) {
708                 uth = __uth_sync_get_next(&rwl->writers);
709                 if (uth) {
710                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
711                         rwl->has_writer = TRUE;
712                 }
713         }
714 }
715
716 /* Unlock works for either readers or writer locks.  You can tell which you were
717  * based on whether has_writer is set or not. */
718 void uth_rwlock_unlock(uth_rwlock_t *rwl)
719 {
720         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
721         struct uthread *i, *safe;
722
723         spin_pdr_lock(&rwl->lock);
724         if (rwl->has_writer)
725                 __rw_unlock_writer(rwl, &restartees);
726         else
727                 __rw_unlock_reader(rwl, &restartees);
728         spin_pdr_unlock(&rwl->lock);
729         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
730                 uthread_runnable(i);
731 }
732
733
734 /************** Default Sync Obj Implementation **************/
735
736 static void uth_default_sync_init(uth_sync_t *sync)
737 {
738         struct uth_tailq *tq = (struct uth_tailq*)sync;
739
740         parlib_static_assert(sizeof(struct uth_tailq) <= sizeof(uth_sync_t));
741         TAILQ_INIT(tq);
742 }
743
744 static void uth_default_sync_destroy(uth_sync_t *sync)
745 {
746         struct uth_tailq *tq = (struct uth_tailq*)sync;
747
748         assert(TAILQ_EMPTY(tq));
749 }
750
751 static struct uthread *uth_default_sync_get_next(uth_sync_t *sync)
752 {
753         struct uth_tailq *tq = (struct uth_tailq*)sync;
754         struct uthread *first;
755
756         first = TAILQ_FIRST(tq);
757         if (first)
758                 TAILQ_REMOVE(tq, first, sync_next);
759         return first;
760 }
761
762 static bool uth_default_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
763 {
764         struct uth_tailq *tq = (struct uth_tailq*)sync;
765         struct uthread *i;
766
767         TAILQ_FOREACH(i, tq, sync_next) {
768                 if (i == uth) {
769                         TAILQ_REMOVE(tq, i, sync_next);
770                         return TRUE;
771                 }
772         }
773         return FALSE;
774 }
775
776 /************** External uthread sync interface **************/
777
778 /* Called by the 2LS->has_blocked op, if they are using the default sync.*/
779 void __uth_default_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
780 {
781         struct uth_tailq *tq = (struct uth_tailq*)sync;
782
783         TAILQ_INSERT_TAIL(tq, uth, sync_next);
784 }
785
786 /* Called by 2LS-independent sync code when a sync object needs initialized. */
787 void __uth_sync_init(uth_sync_t *sync)
788 {
789         if (sched_ops->sync_init) {
790                 sched_ops->sync_init(sync);
791                 return;
792         }
793         uth_default_sync_init(sync);
794 }
795
796 /* Called by 2LS-independent sync code when a sync object is destroyed. */
797 void __uth_sync_destroy(uth_sync_t *sync)
798 {
799         if (sched_ops->sync_destroy) {
800                 sched_ops->sync_destroy(sync);
801                 return;
802         }
803         uth_default_sync_destroy(sync);
804 }
805
806 /* Called by 2LS-independent sync code when a thread needs to be woken. */
807 struct uthread *__uth_sync_get_next(uth_sync_t *sync)
808 {
809         if (sched_ops->sync_get_next)
810                 return sched_ops->sync_get_next(sync);
811         return uth_default_sync_get_next(sync);
812 }
813
814 /* Called by 2LS-independent sync code when a specific thread needs to be woken.
815  * Returns TRUE if the uthread was blocked on the object, FALSE o/w. */
816 bool __uth_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
817 {
818         if (sched_ops->sync_get_uth)
819                 return sched_ops->sync_get_uth(sync, uth);
820         return uth_default_sync_get_uth(sync, uth);
821 }