314be10535b414e8cc666a167f164ce898dce187
[akaros.git] / user / parlib / mutex.c
1 /* Copyright (c) 2016-2017 Google, Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details. */
4
5 /* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
6  * functions.  2LSs implement their own sync objects (bottom of the file). */
7
8 #include <parlib/uthread.h>
9 #include <sys/queue.h>
10 #include <parlib/spinlock.h>
11 #include <parlib/alarm.h>
12 #include <parlib/assert.h>
13 #include <malloc.h>
14
15 struct timeout_blob {
16         bool                                            timed_out;
17         struct uthread                          *uth;
18         uth_sync_t                                      *sync_ptr;
19         struct spin_pdr_lock            *lock_ptr;
20 };
21
22 /* When sync primitives want to time out, they can use this alarm handler.  It
23  * needs a timeout_blob, which is independent of any particular sync method. */
24 static void timeout_handler(struct alarm_waiter *waiter)
25 {
26         struct timeout_blob *blob = (struct timeout_blob*)waiter->data;
27
28         spin_pdr_lock(blob->lock_ptr);
29         if (__uth_sync_get_uth(blob->sync_ptr, blob->uth))
30                 blob->timed_out = TRUE;
31         spin_pdr_unlock(blob->lock_ptr);
32         if (blob->timed_out)
33                 uthread_runnable(blob->uth);
34 }
35
36 /* Minor helper, sets a blob's fields */
37 static void set_timeout_blob(struct timeout_blob *blob, uth_sync_t *sync_ptr,
38                              struct spin_pdr_lock *lock_ptr)
39 {
40         blob->timed_out = FALSE;
41         blob->uth = current_uthread;
42         blob->sync_ptr = sync_ptr;
43         blob->lock_ptr = lock_ptr;
44 }
45
46 /* Minor helper, sets an alarm for blob and a timespec */
47 static void set_timeout_alarm(struct alarm_waiter *waiter,
48                               struct timeout_blob *blob,
49                               const struct timespec *abs_timeout)
50 {
51         init_awaiter(waiter, timeout_handler);
52         waiter->data = blob;
53         set_awaiter_abs_unix(waiter, timespec_to_alarm_time(abs_timeout));
54         set_alarm(waiter);
55 }
56
57 /************** Semaphores and Mutexes **************/
58
59 static void __uth_semaphore_init(void *arg)
60 {
61         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
62
63         spin_pdr_init(&sem->lock);
64         __uth_sync_init(&sem->sync_obj);
65         /* If we used a static initializer for a semaphore, count is already set.
66          * o/w it will be set by _alloc() or _init() (via uth_semaphore_init()). */
67 }
68
69 /* Initializes a sem acquired from somewhere else.  POSIX's sem_init() needs
70  * this. */
71 void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count)
72 {
73         __uth_semaphore_init(sem);
74         sem->count = count;
75         /* The once is to make sure the object is initialized. */
76         parlib_set_ran_once(&sem->once_ctl);
77 }
78
79 /* Undoes whatever was done in init. */
80 void uth_semaphore_destroy(uth_semaphore_t *sem)
81 {
82         __uth_sync_destroy(&sem->sync_obj);
83 }
84
85 uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
86 {
87         struct uth_semaphore *sem;
88
89         sem = malloc(sizeof(struct uth_semaphore));
90         assert(sem);
91         uth_semaphore_init(sem, count);
92         return sem;
93 }
94
95 void uth_semaphore_free(uth_semaphore_t *sem)
96 {
97         uth_semaphore_destroy(sem);
98         free(sem);
99 }
100
101 static void __semaphore_cb(struct uthread *uth, void *arg)
102 {
103         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
104
105         /* We need to tell the 2LS that its thread blocked.  We need to do this
106          * before unlocking the sem, since as soon as we unlock, the sem could be
107          * released and our thread restarted.
108          *
109          * Also note the lock-ordering rule.  The sem lock is grabbed before any
110          * locks the 2LS might grab. */
111         uthread_has_blocked(uth, &sem->sync_obj, UTH_EXT_BLK_MUTEX);
112         spin_pdr_unlock(&sem->lock);
113 }
114
115 bool uth_semaphore_timed_down(uth_semaphore_t *sem,
116                               const struct timespec *abs_timeout)
117 {
118         struct alarm_waiter waiter[1];
119         struct timeout_blob blob[1];
120
121         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
122         spin_pdr_lock(&sem->lock);
123         if (sem->count > 0) {
124                 /* Only down if we got one.  This means a sem with no more counts is 0,
125                  * not negative (where -count == nr_waiters).  Doing it this way means
126                  * our timeout function works for sems and CVs. */
127                 sem->count--;
128                 spin_pdr_unlock(&sem->lock);
129                 return TRUE;
130         }
131         if (abs_timeout) {
132                 set_timeout_blob(blob, &sem->sync_obj, &sem->lock);
133                 set_timeout_alarm(waiter, blob, abs_timeout);
134         }
135         /* the unlock and sync enqueuing is done in the yield callback.  as always,
136          * we need to do this part in vcore context, since as soon as we unlock the
137          * uthread could restart.  (atomically yield and unlock). */
138         uthread_yield(TRUE, __semaphore_cb, sem);
139         if (abs_timeout) {
140                 /* We're guaranteed the alarm will either be cancelled or the handler
141                  * complete when unset_alarm() returns. */
142                 unset_alarm(waiter);
143                 return blob->timed_out ? FALSE : TRUE;
144         }
145         return TRUE;
146 }
147
148 void uth_semaphore_down(uth_semaphore_t *sem)
149 {
150         uth_semaphore_timed_down(sem, NULL);
151 }
152
153 bool uth_semaphore_trydown(uth_semaphore_t *sem)
154 {
155         bool ret = FALSE;
156
157         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
158         spin_pdr_lock(&sem->lock);
159         if (sem->count > 0) {
160                 sem->count--;
161                 ret = TRUE;
162         }
163         spin_pdr_unlock(&sem->lock);
164         return ret;
165 }
166
167 void uth_semaphore_up(uth_semaphore_t *sem)
168 {
169         struct uthread *uth;
170
171         /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special. */
172         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
173         spin_pdr_lock(&sem->lock);
174         uth = __uth_sync_get_next(&sem->sync_obj);
175         /* If there was a waiter, we pass our resource/count to them. */
176         if (!uth)
177                 sem->count++;
178         spin_pdr_unlock(&sem->lock);
179         if (uth)
180                 uthread_runnable(uth);
181 }
182
183 /* Takes a void * since it's called by parlib_run_once(), which enables us to
184  * statically initialize the mutex.  This init does everything not done by the
185  * static initializer.  Note we do not allow 'static' destruction.  (No one
186  * calls free). */
187 static void __uth_mutex_init(void *arg)
188 {
189         struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
190
191         __uth_semaphore_init(mtx);
192         mtx->count = 1;
193 }
194
195 void uth_mutex_init(uth_mutex_t *mtx)
196 {
197         __uth_mutex_init(mtx);
198         parlib_set_ran_once(&mtx->once_ctl);
199 }
200
201 void uth_mutex_destroy(uth_mutex_t *mtx)
202 {
203         uth_semaphore_destroy(mtx);
204 }
205
206 uth_mutex_t *uth_mutex_alloc(void)
207 {
208         struct uth_semaphore *mtx;
209
210         mtx = malloc(sizeof(struct uth_semaphore));
211         assert(mtx);
212         uth_mutex_init(mtx);
213         return mtx;
214 }
215
216 void uth_mutex_free(uth_mutex_t *mtx)
217 {
218         uth_semaphore_free(mtx);
219 }
220
221 bool uth_mutex_timed_lock(uth_mutex_t *mtx, const struct timespec *abs_timeout)
222 {
223         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
224         return uth_semaphore_timed_down(mtx, abs_timeout);
225 }
226
227 void uth_mutex_lock(uth_mutex_t *mtx)
228 {
229         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
230         uth_semaphore_down(mtx);
231 }
232
233 bool uth_mutex_trylock(uth_mutex_t *mtx)
234 {
235         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
236         return uth_semaphore_trydown(mtx);
237 }
238
239 void uth_mutex_unlock(uth_mutex_t *mtx)
240 {
241         uth_semaphore_up(mtx);
242 }
243
244 /************** Recursive mutexes **************/
245
246 static void __uth_recurse_mutex_init(void *arg)
247 {
248         struct uth_recurse_mutex *r_mtx = (struct uth_recurse_mutex*)arg;
249
250         __uth_mutex_init(&r_mtx->mtx);
251         /* Since we always manually call __uth_mutex_init(), there's no reason to
252          * mess with the regular mutex's static initializer.  Just say it's been
253          * done. */
254         parlib_set_ran_once(&r_mtx->mtx.once_ctl);
255         r_mtx->lockholder = NULL;
256         r_mtx->count = 0;
257 }
258
259 void uth_recurse_mutex_init(uth_recurse_mutex_t *r_mtx)
260 {
261         __uth_recurse_mutex_init(r_mtx);
262         parlib_set_ran_once(&r_mtx->once_ctl);
263 }
264
265 void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_mtx)
266 {
267         uth_semaphore_destroy(&r_mtx->mtx);
268 }
269
270 uth_recurse_mutex_t *uth_recurse_mutex_alloc(void)
271 {
272         struct uth_recurse_mutex *r_mtx = malloc(sizeof(struct uth_recurse_mutex));
273
274         assert(r_mtx);
275         uth_recurse_mutex_init(r_mtx);
276         return r_mtx;
277 }
278
279 void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
280 {
281         uth_recurse_mutex_destroy(r_mtx);
282         free(r_mtx);
283 }
284
285 bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *r_mtx,
286                                   const struct timespec *abs_timeout)
287 {
288         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
289         assert(!in_vcore_context());
290         assert(current_uthread);
291         /* We don't have to worry about races on current_uthread or count.  They are
292          * only written by the initial lockholder, and this check will only be true
293          * for the initial lockholder, which cannot concurrently call this function
294          * twice (a thread is single-threaded).
295          *
296          * A signal handler running for a thread should not attempt to grab a
297          * recursive mutex (that's probably a bug).  If we need to support that,
298          * we'll have to disable notifs temporarily. */
299         if (r_mtx->lockholder == current_uthread) {
300                 r_mtx->count++;
301                 return TRUE;
302         }
303         if (!uth_mutex_timed_lock(&r_mtx->mtx, abs_timeout))
304                 return FALSE;
305         r_mtx->lockholder = current_uthread;
306         r_mtx->count = 1;
307         return TRUE;
308 }
309
310 void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
311 {
312         uth_recurse_mutex_timed_lock(r_mtx, NULL);
313 }
314
315 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
316 {
317         bool ret;
318
319         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
320         assert(!in_vcore_context());
321         assert(current_uthread);
322         if (r_mtx->lockholder == current_uthread) {
323                 r_mtx->count++;
324                 return TRUE;
325         }
326         ret = uth_mutex_trylock(&r_mtx->mtx);
327         if (ret) {
328                 r_mtx->lockholder = current_uthread;
329                 r_mtx->count = 1;
330         }
331         return ret;
332 }
333
334 void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_mtx)
335 {
336         r_mtx->count--;
337         if (!r_mtx->count) {
338                 r_mtx->lockholder = NULL;
339                 uth_mutex_unlock(&r_mtx->mtx);
340         }
341 }
342
343
344 /************** Condition Variables **************/
345
346
347 static void __uth_cond_var_init(void *arg)
348 {
349         struct uth_cond_var *cv = (struct uth_cond_var*)arg;
350
351         spin_pdr_init(&cv->lock);
352         __uth_sync_init(&cv->sync_obj);
353 }
354
355 void uth_cond_var_init(uth_cond_var_t *cv)
356 {
357         __uth_cond_var_init(cv);
358         parlib_set_ran_once(&cv->once_ctl);
359 }
360
361 void uth_cond_var_destroy(uth_cond_var_t *cv)
362 {
363         __uth_sync_destroy(&cv->sync_obj);
364 }
365
366 uth_cond_var_t *uth_cond_var_alloc(void)
367 {
368         struct uth_cond_var *cv;
369
370         cv = malloc(sizeof(struct uth_cond_var));
371         assert(cv);
372         uth_cond_var_init(cv);
373         return cv;
374 }
375
376 void uth_cond_var_free(uth_cond_var_t *cv)
377 {
378         uth_cond_var_destroy(cv);
379         free(cv);
380 }
381
382 struct uth_cv_link {
383         struct uth_cond_var                     *cv;
384         struct uth_semaphore            *mtx;
385 };
386
387 static void __cv_wait_cb(struct uthread *uth, void *arg)
388 {
389         struct uth_cv_link *link = (struct uth_cv_link*)arg;
390         struct uth_cond_var *cv = link->cv;
391         struct uth_semaphore *mtx = link->mtx;
392
393         /* We need to tell the 2LS that its thread blocked.  We need to do this
394          * before unlocking the cv, since as soon as we unlock, the cv could be
395          * signalled and our thread restarted.
396          *
397          * Also note the lock-ordering rule.  The cv lock is grabbed before any
398          * locks the 2LS might grab. */
399         uthread_has_blocked(uth, &cv->sync_obj, UTH_EXT_BLK_MUTEX);
400         spin_pdr_unlock(&cv->lock);
401         /* This looks dangerous, since both the CV and MTX could use the
402          * uth->sync_next TAILQ_ENTRY (or whatever the 2LS uses), but the uthread
403          * never sleeps on both at the same time.  We *hold* the mtx - we aren't
404          * *sleeping* on it.  Sleeping uses the sync_next.  Holding it doesn't.
405          *
406          * Next, consider what happens as soon as we unlock the CV.  Our thread
407          * could get woken up, and then immediately try to grab the mtx and go to
408          * sleep! (see below).  If that happens, the uthread is no longer sleeping
409          * on the CV, and the sync_next is free.  The invariant is that a uthread
410          * can only sleep on one sync_object at a time. */
411         uth_mutex_unlock(mtx);
412 }
413
414 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
415  * caller holds mtx again.  Once our uth is on the CV's list, we can release the
416  * mtx without fear of missing a signal.
417  *
418  * POSIX refers to atomicity in this context as "atomically with respect to
419  * access by another thread to the mutex and then the condition variable"
420  *
421  * The idea is that we hold the mutex to protect some invariant; we check it,
422  * and decide to sleep.  Now we get on the list before releasing so that any
423  * changes to that invariant (e.g. a flag is now TRUE) happen after we're on the
424  * list, and so that we don't miss the signal.  To be more clear, the invariant
425  * in a basic wake-up flag scenario is: "whenever a flag is set from FALSE to
426  * TRUE, all waiters that saw FALSE are on the CV's waitqueue."  The mutex is
427  * required for this invariant.
428  *
429  * Note that signal/broadcasters do not *need* to hold the mutex, in general,
430  * but they do in the basic wake-up flag scenario.  If not, the race is this:
431  *
432  * Sleeper:                                                             Waker:
433  * -----------------------------------------------------------------
434  * Hold mutex
435  *   See flag is False
436  *   Decide to sleep
437  *                                                                              Set flag True
438  * PAUSE!                                                               Grab CV lock
439  *                                                                              See list is empty, unlock
440  *
441  *   Grab CV lock
442  *     Get put on list
443  *   Unlock CV lock
444  * Unlock mutex
445  * (Never wake up; we missed the signal)
446  *
447  * For those familiar with the kernel's CVs, we don't couple mutexes with CVs.
448  * cv_lock() actually grabs the spinlock inside the CV and uses *that* to
449  * protect the invariant.  The signallers always grab that lock, so the sleeper
450  * is not in danger of missing the signal.  The tradeoff is that the kernel CVs
451  * use a spinlock instead of a mutex for protecting its invariant; there might
452  * be some case that preferred blocking sync.
453  *
454  * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
455  * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
456  *
457  * As far as lock ordering goes, once the sleeper holds the mutex and is on the
458  * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
459  * actually requires grabbing its spinlock.  So as to not have a lock ordering
460  * between *spinlocks*, we let go of the CV's spinlock before unlocking the
461  * mutex.  There is an ordering between the mutex and the CV spinlock (mutex->cv
462  * spin), but there is no ordering between the mutex spin and cv spin.  And of
463  * course, we need to unlock the CV spinlock in the yield callback.
464  *
465  * Also note that we use the external API for the mutex operations.  A 2LS could
466  * have their own mutex ops but still use the generic cv ops. */
467 bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
468                              const struct timespec *abs_timeout)
469 {
470         struct uth_cv_link link;
471         struct alarm_waiter waiter[1];
472         struct timeout_blob blob[1];
473         bool ret = TRUE;
474
475         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
476         link.cv = cv;
477         link.mtx = mtx;
478         spin_pdr_lock(&cv->lock);
479         if (abs_timeout) {
480                 set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
481                 set_timeout_alarm(waiter, blob, abs_timeout);
482         }
483         uthread_yield(TRUE, __cv_wait_cb, &link);
484         if (abs_timeout) {
485                 unset_alarm(waiter);
486                 ret = blob->timed_out ? FALSE : TRUE;
487         }
488         uth_mutex_lock(mtx);
489         return ret;
490 }
491
492 void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
493 {
494         uth_cond_var_timed_wait(cv, mtx, NULL);
495 }
496
497 /* GCC doesn't list this as one of the C++0x functions, but it's easy to do and
498  * implement uth_cond_var_wait_recurse() with it, just like for all the other
499  * 'timed' functions.
500  *
501  * Note the timeout applies to getting the signal on the CV, not on reacquiring
502  * the mutex. */
503 bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
504                                      uth_recurse_mutex_t *r_mtx,
505                                      const struct timespec *abs_timeout)
506 {
507         unsigned int old_count = r_mtx->count;
508         bool ret;
509
510         /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
511          * prep-work for that now.  (invariant is that an unlocked r_mtx has no
512          * lockholder and count == 0. */
513         r_mtx->lockholder = NULL;
514         r_mtx->count = 0;
515         ret = uth_cond_var_timed_wait(cv, &r_mtx->mtx, abs_timeout);
516         /* Now we hold the internal mutex again.  Need to restore the tracking. */
517         r_mtx->lockholder = current_uthread;
518         r_mtx->count = old_count;
519         return ret;
520 }
521
522 /* GCC wants this function, though its semantics are a little unclear.  I
523  * imagine you'd want to completely unlock it (say you locked it 3 times), and
524  * when you get it back, that you have your three locks back. */
525 void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
526 {
527         uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
528 }
529
530 void uth_cond_var_signal(uth_cond_var_t *cv)
531 {
532         struct uthread *uth;
533
534         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
535         spin_pdr_lock(&cv->lock);
536         uth = __uth_sync_get_next(&cv->sync_obj);
537         spin_pdr_unlock(&cv->lock);
538         if (uth)
539                 uthread_runnable(uth);
540 }
541
542 void uth_cond_var_broadcast(uth_cond_var_t *cv)
543 {
544         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
545         struct uthread *i, *safe;
546
547         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
548         spin_pdr_lock(&cv->lock);
549         /* If this turns out to be slow or painful for 2LSs, we can implement a
550          * get_all or something (default used to use TAILQ_SWAP). */
551         while ((i = __uth_sync_get_next(&cv->sync_obj))) {
552                 /* Once the uth is out of the sync obj, we can reuse sync_next. */
553                 TAILQ_INSERT_TAIL(&restartees, i, sync_next);
554         }
555         spin_pdr_unlock(&cv->lock);
556         /* Need the SAFE, since we can't touch the linkage once the uth could run */
557         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
558                 uthread_runnable(i);
559 }
560
561
562 /************** Reader-writer Sleeping Locks **************/
563
564
565 static void __uth_rwlock_init(void *arg)
566 {
567         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
568
569         spin_pdr_init(&rwl->lock);
570         rwl->nr_readers = 0;
571         rwl->has_writer = FALSE;
572         __uth_sync_init(&rwl->readers);
573         __uth_sync_init(&rwl->writers);
574 }
575
576 void uth_rwlock_init(uth_rwlock_t *rwl)
577 {
578         __uth_rwlock_init(rwl);
579         parlib_set_ran_once(&rwl->once_ctl);
580 }
581
582 void uth_rwlock_destroy(uth_rwlock_t *rwl)
583 {
584         __uth_sync_destroy(&rwl->readers);
585         __uth_sync_destroy(&rwl->writers);
586 }
587
588 uth_rwlock_t *uth_rwlock_alloc(void)
589 {
590         struct uth_rwlock *rwl;
591
592         rwl = malloc(sizeof(struct uth_rwlock));
593         assert(rwl);
594         uth_rwlock_init(rwl);
595         return rwl;
596 }
597
598 void uth_rwlock_free(uth_rwlock_t *rwl)
599 {
600         uth_rwlock_destroy(rwl);
601         free(rwl);
602 }
603
604 /* Readers and writers block until they have the lock.  The delicacies are dealt
605  * with by the unlocker. */
606 static void __rwlock_rd_cb(struct uthread *uth, void *arg)
607 {
608         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
609
610         uthread_has_blocked(uth, &rwl->readers, UTH_EXT_BLK_MUTEX);
611         spin_pdr_unlock(&rwl->lock);
612 }
613
614 void uth_rwlock_rdlock(uth_rwlock_t *rwl)
615 {
616         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
617         spin_pdr_lock(&rwl->lock);
618         /* Readers always make progress when there is no writer */
619         if (!rwl->has_writer) {
620                 rwl->nr_readers++;
621                 spin_pdr_unlock(&rwl->lock);
622                 return;
623         }
624         uthread_yield(TRUE, __rwlock_rd_cb, rwl);
625 }
626
627 bool uth_rwlock_try_rdlock(uth_rwlock_t *rwl)
628 {
629         bool ret = FALSE;
630
631         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
632         spin_pdr_lock(&rwl->lock);
633         if (!rwl->has_writer) {
634                 rwl->nr_readers++;
635                 ret = TRUE;
636         }
637         spin_pdr_unlock(&rwl->lock);
638         return ret;
639 }
640
641 static void __rwlock_wr_cb(struct uthread *uth, void *arg)
642 {
643         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
644
645         uthread_has_blocked(uth, &rwl->writers, UTH_EXT_BLK_MUTEX);
646         spin_pdr_unlock(&rwl->lock);
647 }
648
649 void uth_rwlock_wrlock(uth_rwlock_t *rwl)
650 {
651         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
652         spin_pdr_lock(&rwl->lock);
653         /* Writers require total mutual exclusion - no writers or readers */
654         if (!rwl->has_writer && !rwl->nr_readers) {
655                 rwl->has_writer = TRUE;
656                 spin_pdr_unlock(&rwl->lock);
657                 return;
658         }
659         uthread_yield(TRUE, __rwlock_wr_cb, rwl);
660 }
661
662 bool uth_rwlock_try_wrlock(uth_rwlock_t *rwl)
663 {
664         bool ret = FALSE;
665
666         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
667         spin_pdr_lock(&rwl->lock);
668         if (!rwl->has_writer && !rwl->nr_readers) {
669                 rwl->has_writer = TRUE;
670                 ret = TRUE;
671         }
672         spin_pdr_unlock(&rwl->lock);
673         return ret;
674 }
675
676 /* Let's try to wake writers (yes, this is a policy decision), and if none, wake
677  * all the readers.  The invariant there is that if there is no writer, then
678  * there are no waiting readers. */
679 static void __rw_unlock_writer(struct uth_rwlock *rwl,
680                                struct uth_tailq *restartees)
681 {
682         struct uthread *uth;
683
684         uth = __uth_sync_get_next(&rwl->writers);
685         if (uth) {
686                 TAILQ_INSERT_TAIL(restartees, uth, sync_next);
687         } else {
688                 rwl->has_writer = FALSE;
689                 while ((uth = __uth_sync_get_next(&rwl->readers))) {
690                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
691                         rwl->nr_readers++;
692                 }
693         }
694 }
695
696 static void __rw_unlock_reader(struct uth_rwlock *rwl,
697                                struct uth_tailq *restartees)
698 {
699         struct uthread *uth;
700
701         rwl->nr_readers--;
702         if (!rwl->nr_readers) {
703                 uth = __uth_sync_get_next(&rwl->writers);
704                 if (uth) {
705                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
706                         rwl->has_writer = TRUE;
707                 }
708         }
709 }
710
711 /* Unlock works for either readers or writer locks.  You can tell which you were
712  * based on whether has_writer is set or not. */
713 void uth_rwlock_unlock(uth_rwlock_t *rwl)
714 {
715         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
716         struct uthread *i, *safe;
717
718         spin_pdr_lock(&rwl->lock);
719         if (rwl->has_writer)
720                 __rw_unlock_writer(rwl, &restartees);
721         else
722                 __rw_unlock_reader(rwl, &restartees);
723         spin_pdr_unlock(&rwl->lock);
724         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
725                 uthread_runnable(i);
726 }
727
728
729 /************** Default Sync Obj Implementation **************/
730
731 static void uth_default_sync_init(uth_sync_t *sync)
732 {
733         struct uth_tailq *tq = (struct uth_tailq*)sync;
734
735         parlib_static_assert(sizeof(struct uth_tailq) <= sizeof(uth_sync_t));
736         TAILQ_INIT(tq);
737 }
738
739 static void uth_default_sync_destroy(uth_sync_t *sync)
740 {
741         struct uth_tailq *tq = (struct uth_tailq*)sync;
742
743         assert(TAILQ_EMPTY(tq));
744 }
745
746 static struct uthread *uth_default_sync_get_next(uth_sync_t *sync)
747 {
748         struct uth_tailq *tq = (struct uth_tailq*)sync;
749         struct uthread *first;
750
751         first = TAILQ_FIRST(tq);
752         if (first)
753                 TAILQ_REMOVE(tq, first, sync_next);
754         return first;
755 }
756
757 static bool uth_default_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
758 {
759         struct uth_tailq *tq = (struct uth_tailq*)sync;
760         struct uthread *i;
761
762         TAILQ_FOREACH(i, tq, sync_next) {
763                 if (i == uth) {
764                         TAILQ_REMOVE(tq, i, sync_next);
765                         return TRUE;
766                 }
767         }
768         return FALSE;
769 }
770
771 /************** External uthread sync interface **************/
772
773 /* Called by the 2LS->has_blocked op, if they are using the default sync.*/
774 void __uth_default_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
775 {
776         struct uth_tailq *tq = (struct uth_tailq*)sync;
777
778         TAILQ_INSERT_TAIL(tq, uth, sync_next);
779 }
780
781 /* Called by 2LS-independent sync code when a sync object needs initialized. */
782 void __uth_sync_init(uth_sync_t *sync)
783 {
784         if (sched_ops->sync_init) {
785                 sched_ops->sync_init(sync);
786                 return;
787         }
788         uth_default_sync_init(sync);
789 }
790
791 /* Called by 2LS-independent sync code when a sync object is destroyed. */
792 void __uth_sync_destroy(uth_sync_t *sync)
793 {
794         if (sched_ops->sync_destroy) {
795                 sched_ops->sync_destroy(sync);
796                 return;
797         }
798         uth_default_sync_destroy(sync);
799 }
800
801 /* Called by 2LS-independent sync code when a thread needs to be woken. */
802 struct uthread *__uth_sync_get_next(uth_sync_t *sync)
803 {
804         if (sched_ops->sync_get_next)
805                 return sched_ops->sync_get_next(sync);
806         return uth_default_sync_get_next(sync);
807 }
808
809 /* Called by 2LS-independent sync code when a specific thread needs to be woken.
810  * Returns TRUE if the uthread was blocked on the object, FALSE o/w. */
811 bool __uth_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
812 {
813         if (sched_ops->sync_get_uth)
814                 return sched_ops->sync_get_uth(sync, uth);
815         return uth_default_sync_get_uth(sync, uth);
816 }