parlib: Add uthread_create()
[akaros.git] / user / parlib / uthread.c
index ff685f1..70ef9b4 100644 (file)
@@ -46,6 +46,8 @@ static void uthread_init_thread0(struct uthread *uthread)
        current_uthread = uthread;
        /* Thread is currently running (it is 'us') */
        uthread->state = UT_RUNNING;
+       /* Thread is detached */
+       atomic_set(&uthread->join_ctl.state, UTH_JOIN_DETACHED);
        /* Reset the signal state */
        uthread->sigstate.mask = 0;
        __sigemptyset(&uthread->sigstate.pending);
@@ -330,6 +332,8 @@ void uthread_init(struct uthread *new_thread, struct uth_thread_attr *attr)
         * were interrupted off a core. */
        new_thread->flags |= UTHREAD_SAVED;
        new_thread->notif_disabled_depth = 0;
+       /* TODO: on a reinit, if they changed whether or not they want TLS, we'll
+        * have issues (checking tls_desc, assert in allocate_tls, maybe more). */
        if (attr && attr->want_tls) {
                /* Get a TLS.  If we already have one, reallocate/refresh it */
                if (new_thread->tls_desc)
@@ -348,6 +352,10 @@ void uthread_init(struct uthread *new_thread, struct uth_thread_attr *attr)
        } else {
                new_thread->tls_desc = UTH_TLSDESC_NOTLS;
        }
+       if (attr && attr->detached)
+               atomic_set(&new_thread->join_ctl.state, UTH_JOIN_DETACHED);
+       else
+               atomic_set(&new_thread->join_ctl.state, UTH_JOIN_JOINABLE);
 }
 
 /* This is a wrapper for the sched_ops thread_runnable, for use by functions
@@ -364,17 +372,22 @@ void uthread_runnable(struct uthread *uthread)
  * the 2LS.  This is for informational purposes, and some semantic meaning
  * should be passed by flags (from uthread.h's UTH_EXT_BLK_xxx options).
  * Eventually, whoever calls this will call uthread_runnable(), giving the
- * thread back to the 2LS.
+ * thread back to the 2LS.  If the 2LS provide sync ops, it will have a say in
+ * which thread wakes up at a given time.
  *
  * If code outside the 2LS has blocked a thread (via uthread_yield) and ran its
  * own callback/yield_func instead of some 2LS code, that callback needs to
  * call this.
  *
+ * If sync is set, then the 2LS must save the uthread in the sync object.  Using
+ * the default implementatin is fine.  If sync is NULL, then there's nothing the
+ * 2LS should do regarding sync; it'll be told when the thread is runnable.
+ *
  * AKA: obviously_a_uthread_has_blocked_in_lincoln_park() */
-void uthread_has_blocked(struct uthread *uthread, int flags)
+void uthread_has_blocked(struct uthread *uthread, uth_sync_t sync, int flags)
 {
        assert(sched_ops->thread_has_blocked);
-       sched_ops->thread_has_blocked(uthread, flags);
+       sched_ops->thread_has_blocked(uthread, sync, flags);
 }
 
 /* Function indicating an external event has temporarily paused a uthread, but
@@ -516,7 +529,7 @@ void uthread_usleep(unsigned int usecs)
 
 static void __sleep_forever_cb(struct uthread *uth, void *arg)
 {
-       uthread_has_blocked(uth, UTH_EXT_BLK_JUSTICE);
+       uthread_has_blocked(uth, NULL, UTH_EXT_BLK_JUSTICE);
 }
 
 void __attribute__((noreturn)) uthread_sleep_forever(void)
@@ -1200,3 +1213,169 @@ bool uth_2ls_is_multithreaded(void)
         * will be multithreaded. */
        return sched_ops != &thread0_2ls_ops;
 }
+
+struct uthread *uthread_create(void *(*func)(void *), void *arg)
+{
+       return sched_ops->thread_create(func, arg);
+}
+
+/* Who does the thread_exited callback (2LS-specific cleanup)?  It depends.  If
+ * the thread exits first, then join/detach does it.  o/w, the exit path does.
+ *
+ * What are the valid state changes?
+ *
+ *             JOINABLE   -> DETACHED (only by detach())
+ *             JOINABLE   -> HAS_JOINER (only by join())
+ *             JOINABLE   -> EXITED (only by uth_2ls_thread_exit())
+ *
+ * That's it.  The initial state is either JOINABLE or DETACHED. */
+void uthread_detach(struct uthread *uth)
+{
+       struct uth_join_ctl *jc = &uth->join_ctl;
+       long old_state;
+
+       do {
+               old_state = atomic_read(&jc->state);
+               switch (old_state) {
+               case UTH_JOIN_EXITED:
+                       sched_ops->thread_exited(uth);
+                       return;
+               case UTH_JOIN_DETACHED:
+                       panic("Uth %p has already been detached!", uth);
+               case UTH_JOIN_HAS_JOINER:
+                       panic("Uth %p has a pending joiner, can't detach!", uth);
+               };
+               assert(old_state == UTH_JOIN_JOINABLE);
+       } while (!atomic_cas(&jc->state, old_state, UTH_JOIN_DETACHED));
+}
+
+/* Helper.  We have a joiner.  So we'll write the retval to the final location
+ * (the one passed to join() and decref to wake the joiner.  This may seem a
+ * little odd for a normal join, but it works identically a parallel join - and
+ * there's only one wakeup (hence the kref). */
+static void uth_post_and_kick_joiner(struct uthread *uth, void *retval)
+{
+       struct uth_join_ctl *jc = &uth->join_ctl;
+
+       if (jc->retval_loc)
+               *jc->retval_loc = retval;
+       /* Note the JC has a pointer to the kicker.  There's one kicker for the
+        * joiner, but there could be many joinees. */
+       kref_put(&jc->kicker->kref);
+}
+
+/* Callback after the exiting uthread has yielded and is in vcore context.  Note
+ * that the thread_exited callback can be called concurrently (e.g., a racing
+ * call to detach()), so it's important to not be in the uthread's context. */
+static void __uth_2ls_thread_exit_cb(struct uthread *uth, void *retval)
+{
+       struct uth_join_ctl *jc = &uth->join_ctl;
+       long old_state;
+
+       do {
+               old_state = atomic_read(&jc->state);
+               switch (old_state) {
+               case UTH_JOIN_DETACHED:
+                       sched_ops->thread_exited(uth);
+                       return;
+               case UTH_JOIN_HAS_JOINER:
+                       uth_post_and_kick_joiner(uth, retval);
+                       sched_ops->thread_exited(uth);
+                       return;
+               case UTH_JOIN_JOINABLE:
+                       /* This write is harmless and idempotent; we can lose the race and
+                        * still be safe.  Assuming we don't, the joiner will look here for
+                        * the retval.  It's temporary storage since we don't know the final
+                        * retval location (since join hasn't happened yet). */
+                       jc->retval = retval;
+                       break;
+               };
+               assert(old_state == UTH_JOIN_JOINABLE);
+       } while (!atomic_cas(&jc->state, old_state, UTH_JOIN_EXITED));
+       /* We were joinable, now we have exited.  A detacher or joiner will trigger
+        * thread_exited. */
+}
+
+/* 2LSs call this when their threads are exiting.  The 2LS will regain control
+ * of the thread in sched_ops->thread_exited.  This will be after the
+ * join/detach/exit has completed, and might be in vcore context. */
+void __attribute__((noreturn)) uth_2ls_thread_exit(void *retval)
+{
+       uthread_yield(FALSE, __uth_2ls_thread_exit_cb, retval);
+       assert(0);
+}
+
+/* Helper: Attaches the caller (specifically the jk) to the target uthread.
+ * When the thread has been joined (either due to the UTH_EXITED case or due to
+ * __uth_2ls_thread_exit_cb), the join kicker will be decreffed. */
+static void join_one(struct uthread *uth, struct uth_join_kicker *jk,
+                     void **retval_loc)
+{
+       struct uth_join_ctl *jc = &uth->join_ctl;
+       long old_state;
+
+       /* We can safely write to the join_ctl, even if we don't end up setting
+        * HAS_JOINER.  There's only supposed to be one joiner, and if not, we'll
+        * catch the bad state. */
+       jc->retval_loc = retval_loc;
+       jc->kicker = jk;
+       do {
+               old_state = atomic_read(&jc->state);
+               switch (old_state) {
+               case UTH_JOIN_EXITED:
+                       if (retval_loc)
+                               *retval_loc = jc->retval;
+                       sched_ops->thread_exited(uth);
+                       kref_put(&jk->kref);
+                       return;
+               case UTH_JOIN_DETACHED:
+                       panic("Uth %p has been detached, can't join!", uth);
+               case UTH_JOIN_HAS_JOINER:
+                       panic("Uth %p has another pending joiner!", uth);
+               };
+               assert(old_state == UTH_JOIN_JOINABLE);
+       } while (!atomic_cas(&jc->state, old_state, UTH_JOIN_HAS_JOINER));
+}
+
+/* Bottom half of the join, in vcore context */
+static void __uth_join_cb(struct uthread *uth, void *arg)
+{
+       struct uth_join_kicker *jk = (struct uth_join_kicker*)arg;
+
+       uthread_has_blocked(uth, NULL, UTH_EXT_BLK_MUTEX);
+       /* After this, and after all threads join, we could be woken up. */
+       kref_put(&jk->kref);
+}
+
+static void kicker_release(struct kref *k)
+{
+       struct uth_join_kicker *jk = container_of(k, struct uth_join_kicker, kref);
+
+       uthread_runnable(jk->joiner);
+}
+
+void uthread_join_arr(struct uth_join_request reqs[], size_t nr_req)
+{
+       struct uth_join_kicker jk[1];
+
+       jk->joiner = current_uthread;
+       /* One ref for each target, another for *us*, which we drop in the yield
+        * callback.  As as soon as it is fully decreffed, our thread will be
+        * restarted.  We must block before that (in the yield callback). */
+       kref_init(&jk->kref, kicker_release, nr_req + 1);
+       for (int i = 0; i < nr_req; i++)
+               join_one(reqs[i].uth, jk, reqs[i].retval_loc);
+       uthread_yield(TRUE, __uth_join_cb, jk);
+}
+
+/* Unlike POSIX, we don't bother with returning error codes.  Anything that can
+ * go wrong is so horrendous that you should crash (the specs say the behavior
+ * is undefined). */
+void uthread_join(struct uthread *uth, void **retval_loc)
+{
+       struct uth_join_request req[1];
+
+       req->uth = uth;
+       req->retval_loc = retval_loc;
+       uthread_join_arr(req, 1);
+}