parlib: Tease out uth_sync_t from has_blocked()
[akaros.git] / user / parlib / mutex.c
1 /* Copyright (c) 2016-2017 Google, Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details. */
4
5 /* Generic Uthread Semaphores, Mutexes, CVs, and other synchronization
6  * functions.  2LSs implement their own sync objects (bottom of the file). */
7
8 #include <parlib/uthread.h>
9 #include <sys/queue.h>
10 #include <parlib/spinlock.h>
11 #include <parlib/alarm.h>
12 #include <parlib/assert.h>
13 #include <malloc.h>
14
15 struct timeout_blob {
16         bool                                            timed_out;
17         struct uthread                          *uth;
18         uth_sync_t                                      *sync_ptr;
19         struct spin_pdr_lock            *lock_ptr;
20 };
21
22 /* When sync primitives want to time out, they can use this alarm handler.  It
23  * needs a timeout_blob, which is independent of any particular sync method. */
24 static void timeout_handler(struct alarm_waiter *waiter)
25 {
26         struct timeout_blob *blob = (struct timeout_blob*)waiter->data;
27
28         spin_pdr_lock(blob->lock_ptr);
29         if (__uth_sync_get_uth(blob->sync_ptr, blob->uth))
30                 blob->timed_out = TRUE;
31         spin_pdr_unlock(blob->lock_ptr);
32         if (blob->timed_out)
33                 uthread_runnable(blob->uth);
34 }
35
36 /* Minor helper, sets a blob's fields */
37 static void set_timeout_blob(struct timeout_blob *blob, uth_sync_t *sync_ptr,
38                              struct spin_pdr_lock *lock_ptr)
39 {
40         blob->timed_out = FALSE;
41         blob->uth = current_uthread;
42         blob->sync_ptr = sync_ptr;
43         blob->lock_ptr = lock_ptr;
44 }
45
46 /* Minor helper, sets an alarm for blob and a timespec */
47 static void set_timeout_alarm(struct alarm_waiter *waiter,
48                               struct timeout_blob *blob,
49                               const struct timespec *abs_timeout)
50 {
51         init_awaiter(waiter, timeout_handler);
52         waiter->data = blob;
53         set_awaiter_abs_unix(waiter, timespec_to_alarm_time(abs_timeout));
54         set_alarm(waiter);
55 }
56
57 /************** Semaphores and Mutexes **************/
58
59 static void __uth_semaphore_init(void *arg)
60 {
61         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
62
63         spin_pdr_init(&sem->lock);
64         __uth_sync_init(&sem->sync_obj);
65         /* If we used a static initializer for a semaphore, count is already set.
66          * o/w it will be set by _alloc() or _init() (via uth_semaphore_init()). */
67 }
68
69 /* Initializes a sem acquired from somewhere else.  POSIX's sem_init() needs
70  * this. */
71 void uth_semaphore_init(uth_semaphore_t *sem, unsigned int count)
72 {
73         __uth_semaphore_init(sem);
74         sem->count = count;
75         /* The once is to make sure the object is initialized. */
76         parlib_set_ran_once(&sem->once_ctl);
77 }
78
79 /* Undoes whatever was done in init. */
80 void uth_semaphore_destroy(uth_semaphore_t *sem)
81 {
82         __uth_sync_destroy(&sem->sync_obj);
83 }
84
85 uth_semaphore_t *uth_semaphore_alloc(unsigned int count)
86 {
87         struct uth_semaphore *sem;
88
89         sem = malloc(sizeof(struct uth_semaphore));
90         assert(sem);
91         uth_semaphore_init(sem, count);
92         return sem;
93 }
94
95 void uth_semaphore_free(uth_semaphore_t *sem)
96 {
97         uth_semaphore_destroy(sem);
98         free(sem);
99 }
100
101 static void __semaphore_cb(struct uthread *uth, void *arg)
102 {
103         struct uth_semaphore *sem = (struct uth_semaphore*)arg;
104
105         /* We need to tell the 2LS that its thread blocked.  We need to do this
106          * before unlocking the sem, since as soon as we unlock, the sem could be
107          * released and our thread restarted.
108          *
109          * Also note the lock-ordering rule.  The sem lock is grabbed before any
110          * locks the 2LS might grab. */
111         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
112         __uth_sync_enqueue(uth, &sem->sync_obj);
113         spin_pdr_unlock(&sem->lock);
114 }
115
116 bool uth_semaphore_timed_down(uth_semaphore_t *sem,
117                               const struct timespec *abs_timeout)
118 {
119         struct alarm_waiter waiter[1];
120         struct timeout_blob blob[1];
121
122         assert_can_block();
123         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
124         spin_pdr_lock(&sem->lock);
125         if (sem->count > 0) {
126                 /* Only down if we got one.  This means a sem with no more counts is 0,
127                  * not negative (where -count == nr_waiters).  Doing it this way means
128                  * our timeout function works for sems and CVs. */
129                 sem->count--;
130                 spin_pdr_unlock(&sem->lock);
131                 return TRUE;
132         }
133         if (abs_timeout) {
134                 set_timeout_blob(blob, &sem->sync_obj, &sem->lock);
135                 set_timeout_alarm(waiter, blob, abs_timeout);
136         }
137         /* the unlock and sync enqueuing is done in the yield callback.  as always,
138          * we need to do this part in vcore context, since as soon as we unlock the
139          * uthread could restart.  (atomically yield and unlock). */
140         uthread_yield(TRUE, __semaphore_cb, sem);
141         if (abs_timeout) {
142                 /* We're guaranteed the alarm will either be cancelled or the handler
143                  * complete when unset_alarm() returns. */
144                 unset_alarm(waiter);
145                 return blob->timed_out ? FALSE : TRUE;
146         }
147         return TRUE;
148 }
149
150 void uth_semaphore_down(uth_semaphore_t *sem)
151 {
152         uth_semaphore_timed_down(sem, NULL);
153 }
154
155 bool uth_semaphore_trydown(uth_semaphore_t *sem)
156 {
157         bool ret = FALSE;
158
159         assert_can_block();
160         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
161         spin_pdr_lock(&sem->lock);
162         if (sem->count > 0) {
163                 sem->count--;
164                 ret = TRUE;
165         }
166         spin_pdr_unlock(&sem->lock);
167         return ret;
168 }
169
170 void uth_semaphore_up(uth_semaphore_t *sem)
171 {
172         struct uthread *uth;
173
174         /* once-ing the 'up', unlike mtxs 'unlock', since sems can be special. */
175         parlib_run_once(&sem->once_ctl, __uth_semaphore_init, sem);
176         spin_pdr_lock(&sem->lock);
177         uth = __uth_sync_get_next(&sem->sync_obj);
178         /* If there was a waiter, we pass our resource/count to them. */
179         if (!uth)
180                 sem->count++;
181         spin_pdr_unlock(&sem->lock);
182         if (uth)
183                 uthread_runnable(uth);
184 }
185
186 /* Takes a void * since it's called by parlib_run_once(), which enables us to
187  * statically initialize the mutex.  This init does everything not done by the
188  * static initializer.  Note we do not allow 'static' destruction.  (No one
189  * calls free). */
190 static void __uth_mutex_init(void *arg)
191 {
192         struct uth_semaphore *mtx = (struct uth_semaphore*)arg;
193
194         __uth_semaphore_init(mtx);
195         mtx->count = 1;
196 }
197
198 void uth_mutex_init(uth_mutex_t *mtx)
199 {
200         __uth_mutex_init(mtx);
201         parlib_set_ran_once(&mtx->once_ctl);
202 }
203
204 void uth_mutex_destroy(uth_mutex_t *mtx)
205 {
206         uth_semaphore_destroy(mtx);
207 }
208
209 uth_mutex_t *uth_mutex_alloc(void)
210 {
211         struct uth_semaphore *mtx;
212
213         mtx = malloc(sizeof(struct uth_semaphore));
214         assert(mtx);
215         uth_mutex_init(mtx);
216         return mtx;
217 }
218
219 void uth_mutex_free(uth_mutex_t *mtx)
220 {
221         uth_semaphore_free(mtx);
222 }
223
224 bool uth_mutex_timed_lock(uth_mutex_t *mtx, const struct timespec *abs_timeout)
225 {
226         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
227         return uth_semaphore_timed_down(mtx, abs_timeout);
228 }
229
230 void uth_mutex_lock(uth_mutex_t *mtx)
231 {
232         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
233         uth_semaphore_down(mtx);
234 }
235
236 bool uth_mutex_trylock(uth_mutex_t *mtx)
237 {
238         parlib_run_once(&mtx->once_ctl, __uth_mutex_init, mtx);
239         return uth_semaphore_trydown(mtx);
240 }
241
242 void uth_mutex_unlock(uth_mutex_t *mtx)
243 {
244         uth_semaphore_up(mtx);
245 }
246
247 /************** Recursive mutexes **************/
248
249 static void __uth_recurse_mutex_init(void *arg)
250 {
251         struct uth_recurse_mutex *r_mtx = (struct uth_recurse_mutex*)arg;
252
253         __uth_mutex_init(&r_mtx->mtx);
254         /* Since we always manually call __uth_mutex_init(), there's no reason to
255          * mess with the regular mutex's static initializer.  Just say it's been
256          * done. */
257         parlib_set_ran_once(&r_mtx->mtx.once_ctl);
258         r_mtx->lockholder = NULL;
259         r_mtx->count = 0;
260 }
261
262 void uth_recurse_mutex_init(uth_recurse_mutex_t *r_mtx)
263 {
264         __uth_recurse_mutex_init(r_mtx);
265         parlib_set_ran_once(&r_mtx->once_ctl);
266 }
267
268 void uth_recurse_mutex_destroy(uth_recurse_mutex_t *r_mtx)
269 {
270         uth_semaphore_destroy(&r_mtx->mtx);
271 }
272
273 uth_recurse_mutex_t *uth_recurse_mutex_alloc(void)
274 {
275         struct uth_recurse_mutex *r_mtx = malloc(sizeof(struct uth_recurse_mutex));
276
277         assert(r_mtx);
278         uth_recurse_mutex_init(r_mtx);
279         return r_mtx;
280 }
281
282 void uth_recurse_mutex_free(uth_recurse_mutex_t *r_mtx)
283 {
284         uth_recurse_mutex_destroy(r_mtx);
285         free(r_mtx);
286 }
287
288 bool uth_recurse_mutex_timed_lock(uth_recurse_mutex_t *r_mtx,
289                                   const struct timespec *abs_timeout)
290 {
291         assert_can_block();
292         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
293         /* We don't have to worry about races on current_uthread or count.  They are
294          * only written by the initial lockholder, and this check will only be true
295          * for the initial lockholder, which cannot concurrently call this function
296          * twice (a thread is single-threaded).
297          *
298          * A signal handler running for a thread should not attempt to grab a
299          * recursive mutex (that's probably a bug).  If we need to support that,
300          * we'll have to disable notifs temporarily. */
301         if (r_mtx->lockholder == current_uthread) {
302                 r_mtx->count++;
303                 return TRUE;
304         }
305         if (!uth_mutex_timed_lock(&r_mtx->mtx, abs_timeout))
306                 return FALSE;
307         r_mtx->lockholder = current_uthread;
308         r_mtx->count = 1;
309         return TRUE;
310 }
311
312 void uth_recurse_mutex_lock(uth_recurse_mutex_t *r_mtx)
313 {
314         uth_recurse_mutex_timed_lock(r_mtx, NULL);
315 }
316
317 bool uth_recurse_mutex_trylock(uth_recurse_mutex_t *r_mtx)
318 {
319         bool ret;
320
321         assert_can_block();
322         parlib_run_once(&r_mtx->once_ctl, __uth_recurse_mutex_init, r_mtx);
323         if (r_mtx->lockholder == current_uthread) {
324                 r_mtx->count++;
325                 return TRUE;
326         }
327         ret = uth_mutex_trylock(&r_mtx->mtx);
328         if (ret) {
329                 r_mtx->lockholder = current_uthread;
330                 r_mtx->count = 1;
331         }
332         return ret;
333 }
334
335 void uth_recurse_mutex_unlock(uth_recurse_mutex_t *r_mtx)
336 {
337         r_mtx->count--;
338         if (!r_mtx->count) {
339                 r_mtx->lockholder = NULL;
340                 uth_mutex_unlock(&r_mtx->mtx);
341         }
342 }
343
344
345 /************** Condition Variables **************/
346
347
348 static void __uth_cond_var_init(void *arg)
349 {
350         struct uth_cond_var *cv = (struct uth_cond_var*)arg;
351
352         spin_pdr_init(&cv->lock);
353         __uth_sync_init(&cv->sync_obj);
354 }
355
356 void uth_cond_var_init(uth_cond_var_t *cv)
357 {
358         __uth_cond_var_init(cv);
359         parlib_set_ran_once(&cv->once_ctl);
360 }
361
362 void uth_cond_var_destroy(uth_cond_var_t *cv)
363 {
364         __uth_sync_destroy(&cv->sync_obj);
365 }
366
367 uth_cond_var_t *uth_cond_var_alloc(void)
368 {
369         struct uth_cond_var *cv;
370
371         cv = malloc(sizeof(struct uth_cond_var));
372         assert(cv);
373         uth_cond_var_init(cv);
374         return cv;
375 }
376
377 void uth_cond_var_free(uth_cond_var_t *cv)
378 {
379         uth_cond_var_destroy(cv);
380         free(cv);
381 }
382
383 struct uth_cv_link {
384         struct uth_cond_var                     *cv;
385         struct uth_semaphore            *mtx;
386 };
387
388 static void __cv_wait_cb(struct uthread *uth, void *arg)
389 {
390         struct uth_cv_link *link = (struct uth_cv_link*)arg;
391         struct uth_cond_var *cv = link->cv;
392         struct uth_semaphore *mtx = link->mtx;
393
394         /* We need to tell the 2LS that its thread blocked.  We need to do this
395          * before unlocking the cv, since as soon as we unlock, the cv could be
396          * signalled and our thread restarted.
397          *
398          * Also note the lock-ordering rule.  The cv lock is grabbed before any
399          * locks the 2LS might grab. */
400         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
401         __uth_sync_enqueue(uth, &cv->sync_obj);
402         spin_pdr_unlock(&cv->lock);
403         /* This looks dangerous, since both the CV and MTX could use the
404          * uth->sync_next TAILQ_ENTRY (or whatever the 2LS uses), but the uthread
405          * never sleeps on both at the same time.  We *hold* the mtx - we aren't
406          * *sleeping* on it.  Sleeping uses the sync_next.  Holding it doesn't.
407          *
408          * Next, consider what happens as soon as we unlock the CV.  Our thread
409          * could get woken up, and then immediately try to grab the mtx and go to
410          * sleep! (see below).  If that happens, the uthread is no longer sleeping
411          * on the CV, and the sync_next is free.  The invariant is that a uthread
412          * can only sleep on one sync_object at a time. */
413         uth_mutex_unlock(mtx);
414 }
415
416 /* Caller holds mtx.  We will 'atomically' release it and wait.  On return,
417  * caller holds mtx again.  Once our uth is on the CV's list, we can release the
418  * mtx without fear of missing a signal.
419  *
420  * POSIX refers to atomicity in this context as "atomically with respect to
421  * access by another thread to the mutex and then the condition variable"
422  *
423  * The idea is that we hold the mutex to protect some invariant; we check it,
424  * and decide to sleep.  Now we get on the list before releasing so that any
425  * changes to that invariant (e.g. a flag is now TRUE) happen after we're on the
426  * list, and so that we don't miss the signal.  To be more clear, the invariant
427  * in a basic wake-up flag scenario is: "whenever a flag is set from FALSE to
428  * TRUE, all waiters that saw FALSE are on the CV's waitqueue."  The mutex is
429  * required for this invariant.
430  *
431  * Note that signal/broadcasters do not *need* to hold the mutex, in general,
432  * but they do in the basic wake-up flag scenario.  If not, the race is this:
433  *
434  * Sleeper:                                                             Waker:
435  * -----------------------------------------------------------------
436  * Hold mutex
437  *   See flag is False
438  *   Decide to sleep
439  *                                                                              Set flag True
440  * PAUSE!                                                               Grab CV lock
441  *                                                                              See list is empty, unlock
442  *
443  *   Grab CV lock
444  *     Get put on list
445  *   Unlock CV lock
446  * Unlock mutex
447  * (Never wake up; we missed the signal)
448  *
449  * For those familiar with the kernel's CVs, we don't couple mutexes with CVs.
450  * cv_lock() actually grabs the spinlock inside the CV and uses *that* to
451  * protect the invariant.  The signallers always grab that lock, so the sleeper
452  * is not in danger of missing the signal.  The tradeoff is that the kernel CVs
453  * use a spinlock instead of a mutex for protecting its invariant; there might
454  * be some case that preferred blocking sync.
455  *
456  * The uthread CVs take a mutex, unlike the kernel CVs, to map more cleanly to
457  * POSIX CVs.  Maybe one approach or the other is a bad idea; we'll see.
458  *
459  * As far as lock ordering goes, once the sleeper holds the mutex and is on the
460  * CV's list, it can unlock in any order it wants.  However, unlocking a mutex
461  * actually requires grabbing its spinlock.  So as to not have a lock ordering
462  * between *spinlocks*, we let go of the CV's spinlock before unlocking the
463  * mutex.  There is an ordering between the mutex and the CV spinlock (mutex->cv
464  * spin), but there is no ordering between the mutex spin and cv spin.  And of
465  * course, we need to unlock the CV spinlock in the yield callback.
466  *
467  * Also note that we use the external API for the mutex operations.  A 2LS could
468  * have their own mutex ops but still use the generic cv ops. */
469 bool uth_cond_var_timed_wait(uth_cond_var_t *cv, uth_mutex_t *mtx,
470                              const struct timespec *abs_timeout)
471 {
472         struct uth_cv_link link;
473         struct alarm_waiter waiter[1];
474         struct timeout_blob blob[1];
475         bool ret = TRUE;
476
477         assert_can_block();
478         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
479         link.cv = cv;
480         link.mtx = mtx;
481         spin_pdr_lock(&cv->lock);
482         if (abs_timeout) {
483                 set_timeout_blob(blob, &cv->sync_obj, &cv->lock);
484                 set_timeout_alarm(waiter, blob, abs_timeout);
485         }
486         uthread_yield(TRUE, __cv_wait_cb, &link);
487         if (abs_timeout) {
488                 unset_alarm(waiter);
489                 ret = blob->timed_out ? FALSE : TRUE;
490         }
491         uth_mutex_lock(mtx);
492         return ret;
493 }
494
495 void uth_cond_var_wait(uth_cond_var_t *cv, uth_mutex_t *mtx)
496 {
497         uth_cond_var_timed_wait(cv, mtx, NULL);
498 }
499
500 /* GCC doesn't list this as one of the C++0x functions, but it's easy to do and
501  * implement uth_cond_var_wait_recurse() with it, just like for all the other
502  * 'timed' functions.
503  *
504  * Note the timeout applies to getting the signal on the CV, not on reacquiring
505  * the mutex. */
506 bool uth_cond_var_timed_wait_recurse(uth_cond_var_t *cv,
507                                      uth_recurse_mutex_t *r_mtx,
508                                      const struct timespec *abs_timeout)
509 {
510         unsigned int old_count = r_mtx->count;
511         bool ret;
512
513         /* In cond_wait, we're going to unlock the internal mutex.  We'll do the
514          * prep-work for that now.  (invariant is that an unlocked r_mtx has no
515          * lockholder and count == 0. */
516         r_mtx->lockholder = NULL;
517         r_mtx->count = 0;
518         ret = uth_cond_var_timed_wait(cv, &r_mtx->mtx, abs_timeout);
519         /* Now we hold the internal mutex again.  Need to restore the tracking. */
520         r_mtx->lockholder = current_uthread;
521         r_mtx->count = old_count;
522         return ret;
523 }
524
525 /* GCC wants this function, though its semantics are a little unclear.  I
526  * imagine you'd want to completely unlock it (say you locked it 3 times), and
527  * when you get it back, that you have your three locks back. */
528 void uth_cond_var_wait_recurse(uth_cond_var_t *cv, uth_recurse_mutex_t *r_mtx)
529 {
530         uth_cond_var_timed_wait_recurse(cv, r_mtx, NULL);
531 }
532
533 void uth_cond_var_signal(uth_cond_var_t *cv)
534 {
535         struct uthread *uth;
536
537         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
538         spin_pdr_lock(&cv->lock);
539         uth = __uth_sync_get_next(&cv->sync_obj);
540         spin_pdr_unlock(&cv->lock);
541         if (uth)
542                 uthread_runnable(uth);
543 }
544
545 void uth_cond_var_broadcast(uth_cond_var_t *cv)
546 {
547         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
548         struct uthread *i, *safe;
549
550         parlib_run_once(&cv->once_ctl, __uth_cond_var_init, cv);
551         spin_pdr_lock(&cv->lock);
552         /* If this turns out to be slow or painful for 2LSs, we can implement a
553          * get_all or something (default used to use TAILQ_SWAP). */
554         while ((i = __uth_sync_get_next(&cv->sync_obj))) {
555                 /* Once the uth is out of the sync obj, we can reuse sync_next. */
556                 TAILQ_INSERT_TAIL(&restartees, i, sync_next);
557         }
558         spin_pdr_unlock(&cv->lock);
559         /* Need the SAFE, since we can't touch the linkage once the uth could run */
560         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
561                 uthread_runnable(i);
562 }
563
564
565 /************** Reader-writer Sleeping Locks **************/
566
567
568 static void __uth_rwlock_init(void *arg)
569 {
570         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
571
572         spin_pdr_init(&rwl->lock);
573         rwl->nr_readers = 0;
574         rwl->has_writer = FALSE;
575         __uth_sync_init(&rwl->readers);
576         __uth_sync_init(&rwl->writers);
577 }
578
579 void uth_rwlock_init(uth_rwlock_t *rwl)
580 {
581         __uth_rwlock_init(rwl);
582         parlib_set_ran_once(&rwl->once_ctl);
583 }
584
585 void uth_rwlock_destroy(uth_rwlock_t *rwl)
586 {
587         __uth_sync_destroy(&rwl->readers);
588         __uth_sync_destroy(&rwl->writers);
589 }
590
591 uth_rwlock_t *uth_rwlock_alloc(void)
592 {
593         struct uth_rwlock *rwl;
594
595         rwl = malloc(sizeof(struct uth_rwlock));
596         assert(rwl);
597         uth_rwlock_init(rwl);
598         return rwl;
599 }
600
601 void uth_rwlock_free(uth_rwlock_t *rwl)
602 {
603         uth_rwlock_destroy(rwl);
604         free(rwl);
605 }
606
607 /* Readers and writers block until they have the lock.  The delicacies are dealt
608  * with by the unlocker. */
609 static void __rwlock_rd_cb(struct uthread *uth, void *arg)
610 {
611         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
612
613         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
614         __uth_sync_enqueue(uth, &rwl->readers);
615         spin_pdr_unlock(&rwl->lock);
616 }
617
618 void uth_rwlock_rdlock(uth_rwlock_t *rwl)
619 {
620         assert_can_block();
621         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
622         spin_pdr_lock(&rwl->lock);
623         /* Readers always make progress when there is no writer */
624         if (!rwl->has_writer) {
625                 rwl->nr_readers++;
626                 spin_pdr_unlock(&rwl->lock);
627                 return;
628         }
629         uthread_yield(TRUE, __rwlock_rd_cb, rwl);
630 }
631
632 bool uth_rwlock_try_rdlock(uth_rwlock_t *rwl)
633 {
634         bool ret = FALSE;
635
636         assert_can_block();
637         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
638         spin_pdr_lock(&rwl->lock);
639         if (!rwl->has_writer) {
640                 rwl->nr_readers++;
641                 ret = TRUE;
642         }
643         spin_pdr_unlock(&rwl->lock);
644         return ret;
645 }
646
647 static void __rwlock_wr_cb(struct uthread *uth, void *arg)
648 {
649         struct uth_rwlock *rwl = (struct uth_rwlock*)arg;
650
651         uthread_has_blocked(uth, UTH_EXT_BLK_MUTEX);
652         __uth_sync_enqueue(uth, &rwl->writers);
653         spin_pdr_unlock(&rwl->lock);
654 }
655
656 void uth_rwlock_wrlock(uth_rwlock_t *rwl)
657 {
658         assert_can_block();
659         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
660         spin_pdr_lock(&rwl->lock);
661         /* Writers require total mutual exclusion - no writers or readers */
662         if (!rwl->has_writer && !rwl->nr_readers) {
663                 rwl->has_writer = TRUE;
664                 spin_pdr_unlock(&rwl->lock);
665                 return;
666         }
667         uthread_yield(TRUE, __rwlock_wr_cb, rwl);
668 }
669
670 bool uth_rwlock_try_wrlock(uth_rwlock_t *rwl)
671 {
672         bool ret = FALSE;
673
674         assert_can_block();
675         parlib_run_once(&rwl->once_ctl, __uth_rwlock_init, rwl);
676         spin_pdr_lock(&rwl->lock);
677         if (!rwl->has_writer && !rwl->nr_readers) {
678                 rwl->has_writer = TRUE;
679                 ret = TRUE;
680         }
681         spin_pdr_unlock(&rwl->lock);
682         return ret;
683 }
684
685 /* Let's try to wake writers (yes, this is a policy decision), and if none, wake
686  * all the readers.  The invariant there is that if there is no writer, then
687  * there are no waiting readers. */
688 static void __rw_unlock_writer(struct uth_rwlock *rwl,
689                                struct uth_tailq *restartees)
690 {
691         struct uthread *uth;
692
693         uth = __uth_sync_get_next(&rwl->writers);
694         if (uth) {
695                 TAILQ_INSERT_TAIL(restartees, uth, sync_next);
696         } else {
697                 rwl->has_writer = FALSE;
698                 while ((uth = __uth_sync_get_next(&rwl->readers))) {
699                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
700                         rwl->nr_readers++;
701                 }
702         }
703 }
704
705 static void __rw_unlock_reader(struct uth_rwlock *rwl,
706                                struct uth_tailq *restartees)
707 {
708         struct uthread *uth;
709
710         rwl->nr_readers--;
711         if (!rwl->nr_readers) {
712                 uth = __uth_sync_get_next(&rwl->writers);
713                 if (uth) {
714                         TAILQ_INSERT_TAIL(restartees, uth, sync_next);
715                         rwl->has_writer = TRUE;
716                 }
717         }
718 }
719
720 /* Unlock works for either readers or writer locks.  You can tell which you were
721  * based on whether has_writer is set or not. */
722 void uth_rwlock_unlock(uth_rwlock_t *rwl)
723 {
724         struct uth_tailq restartees = TAILQ_HEAD_INITIALIZER(restartees);
725         struct uthread *i, *safe;
726
727         spin_pdr_lock(&rwl->lock);
728         if (rwl->has_writer)
729                 __rw_unlock_writer(rwl, &restartees);
730         else
731                 __rw_unlock_reader(rwl, &restartees);
732         spin_pdr_unlock(&rwl->lock);
733         TAILQ_FOREACH_SAFE(i, &restartees, sync_next, safe)
734                 uthread_runnable(i);
735 }
736
737
738 /************** Default Sync Obj Implementation **************/
739
740 static void uth_default_sync_init(uth_sync_t *sync)
741 {
742         struct uth_tailq *tq = (struct uth_tailq*)sync;
743
744         parlib_static_assert(sizeof(struct uth_tailq) <= sizeof(uth_sync_t));
745         TAILQ_INIT(tq);
746 }
747
748 static void uth_default_sync_destroy(uth_sync_t *sync)
749 {
750         struct uth_tailq *tq = (struct uth_tailq*)sync;
751
752         assert(TAILQ_EMPTY(tq));
753 }
754
755 static void uth_default_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
756 {
757         struct uth_tailq *tq = (struct uth_tailq*)sync;
758
759         TAILQ_INSERT_TAIL(tq, uth, sync_next);
760 }
761
762 static struct uthread *uth_default_sync_get_next(uth_sync_t *sync)
763 {
764         struct uth_tailq *tq = (struct uth_tailq*)sync;
765         struct uthread *first;
766
767         first = TAILQ_FIRST(tq);
768         if (first)
769                 TAILQ_REMOVE(tq, first, sync_next);
770         return first;
771 }
772
773 static bool uth_default_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
774 {
775         struct uth_tailq *tq = (struct uth_tailq*)sync;
776         struct uthread *i;
777
778         TAILQ_FOREACH(i, tq, sync_next) {
779                 if (i == uth) {
780                         TAILQ_REMOVE(tq, i, sync_next);
781                         return TRUE;
782                 }
783         }
784         return FALSE;
785 }
786
787 /************** External uthread sync interface **************/
788
789 /* Called by 2LS-independent sync code when a sync object needs initialized. */
790 void __uth_sync_init(uth_sync_t *sync)
791 {
792         if (sched_ops->sync_init) {
793                 sched_ops->sync_init(sync);
794                 return;
795         }
796         uth_default_sync_init(sync);
797 }
798
799 /* Called by 2LS-independent sync code when a sync object is destroyed. */
800 void __uth_sync_destroy(uth_sync_t *sync)
801 {
802         if (sched_ops->sync_destroy) {
803                 sched_ops->sync_destroy(sync);
804                 return;
805         }
806         uth_default_sync_destroy(sync);
807 }
808
809 /* Called by 2LS-independent sync code when a thread blocks on sync */
810 void __uth_sync_enqueue(struct uthread *uth, uth_sync_t *sync)
811 {
812         if (sched_ops->sync_enqueue) {
813                 sched_ops->sync_enqueue(uth, sync);
814                 return;
815         }
816         uth_default_sync_enqueue(uth, sync);
817 }
818
819 /* Called by 2LS-independent sync code when a thread needs to be woken. */
820 struct uthread *__uth_sync_get_next(uth_sync_t *sync)
821 {
822         if (sched_ops->sync_get_next)
823                 return sched_ops->sync_get_next(sync);
824         return uth_default_sync_get_next(sync);
825 }
826
827 /* Called by 2LS-independent sync code when a specific thread needs to be woken.
828  * Returns TRUE if the uthread was blocked on the object, FALSE o/w. */
829 bool __uth_sync_get_uth(uth_sync_t *sync, struct uthread *uth)
830 {
831         if (sched_ops->sync_get_uth)
832                 return sched_ops->sync_get_uth(sync, uth);
833         return uth_default_sync_get_uth(sync, uth);
834 }