Remove unnecessary panic() in rread()
[akaros.git] / kern / src / rcu.c
index 8481025..b543f7e 100644 (file)
@@ -116,10 +116,10 @@ void synchronize_rcu(void)
        struct sync_cb_blob b[1];
        struct semaphore sem[1];
 
-       if (in_rcu_cb_ctx(this_pcpui_ptr()))
-               panic("Attempted synchronize_rcu() from an RCU callback!");
+       if (!can_block(this_pcpui_ptr()))
+               panic("Attempted %s() from an unblockable context!", __func__);
        if (is_rcu_ktask(current_kthread))
-               panic("Attempted synchronize_rcu() from an RCU thread!");
+               panic("Attempted %s() from an RCU thread!", __func__);
        sem_init(sem, 0);
        init_rcu_head_on_stack(&b->h);
        b->sem = sem;
@@ -160,19 +160,13 @@ static void rcu_exec_cb(struct rcu_head *head)
                head->func(head);
 }
 
-static void __early_call_rcu_kmsg(uint32_t srcid, long a0, long a1, long a2)
-{
-       rcu_exec_cb((struct rcu_head*)a0);
-}
-
 void __early_call_rcu(struct rcu_head *head)
 {
        extern bool booting;
 
        assert(booting);
        assert(core_id() == 0);
-       send_kernel_message(0, __early_call_rcu_kmsg, (long)head, 0, 0,
-                           KMSG_ROUTINE);
+       run_as_rkm(rcu_exec_cb, head);
 }
 
 /* This could be called from a remote core, e.g. rcu_barrier().  Returns the
@@ -188,19 +182,21 @@ static int __call_rcu_rpi(struct rcu_state *rsp, struct rcu_pcpui *rpi,
                __early_call_rcu(head);
                return 0;
        }
-       /* rsp->gpnum is the one we're either working on (if > completed) or the one
-        * we already did.  Either way, it's a GP that may have already been ACKed
-        * during a core's QS, and that core could have started a read-side critical
-        * section that must complete before CB runs.  That requires another GP. */
+       /* rsp->gpnum is the one we're either working on (if > completed) or the
+        * one we already did.  Either way, it's a GP that may have already been
+        * ACKed during a core's QS, and that core could have started a
+        * read-side critical section that must complete before CB runs.  That
+        * requires another GP. */
        head->gpnum = READ_ONCE(rsp->gpnum) + 1;
        spin_lock_irqsave(&rpi->lock);
        list_add_tail(&head->link, &rpi->cbs);
        nr_cbs = ++rpi->nr_cbs;
        spin_unlock_irqsave(&rpi->lock);
        /* rcu_barrier requires that the write to ->nr_cbs be visible before any
-        * future writes.  unlock orders the write inside, but doesn't prevent other
-        * writes from moving in.  Technically, our lock implementations do that,
-        * but it's not part of our definition.  Maybe it should be.  Til then: */
+        * future writes.  unlock orders the write inside, but doesn't prevent
+        * other writes from moving in.  Technically, our lock implementations
+        * do that, but it's not part of our definition.  Maybe it should be.
+        * Til then: */
        wmb();
        return nr_cbs;
 }
@@ -231,31 +227,31 @@ void rcu_barrier(void)
        struct sync_cb_blob *b;
        int nr_sent = 0;
 
-       if (in_rcu_cb_ctx(this_pcpui_ptr()))
-               panic("Attempted rcu_barrier() from an RCU callback!");
+       if (!can_block(this_pcpui_ptr()))
+               panic("Attempted %s() from an unblockable context!", __func__);
        if (is_rcu_ktask(current_kthread))
-               panic("Attempted rcu_barrier() from an RCU thread!");
-       /* TODO: if we have concurrent rcu_barriers, we might be able to share the
-        * CBs.  Say we have 1 CB on a core, then N rcu_barriers.  We'll have N
-        * call_rcus in flight, though we could share.  Linux does this with a mtx
-        * and some accounting, I think. */
-
+               panic("Attempted %s() from an RCU thread!", __func__);
+       /* TODO: if we have concurrent rcu_barriers, we might be able to share
+        * the CBs.  Say we have 1 CB on a core, then N rcu_barriers.  We'll
+        * have N call_rcus in flight, though we could share.  Linux does this
+        * with a mtx and some accounting, I think. */
        b = kzmalloc(sizeof(struct sync_cb_blob) * num_cores, MEM_WAIT);
-       /* Remember, you block when sem is <= 0.  We'll get nr_sent ups, and we'll
-        * down 1 for each.  This is just like the synchronize_rcu() case; there,
-        * nr_sent == 1. */
+       /* Remember, you block when sem is <= 0.  We'll get nr_sent ups, and
+        * we'll down 1 for each.  This is just like the synchronize_rcu() case;
+        * there, nr_sent == 1. */
        sem_init(sem, 0);
-       /* Order any signal we received from someone who called call_rcu() before
-        * our rpi->nr_cbs reads. */
+       /* Order any signal we received from someone who called call_rcu()
+        * before our rpi->nr_cbs reads. */
        rmb();
        for_each_core(i) {
                rpi = _PERCPU_VARPTR(rcu_pcpui, i);
                /* Lockless peek at nr_cbs.  Two things to note here:
-                * - We look at nr_cbs and not the list, since there could be CBs on the
-                *   stack-local work list or that have blocked.
-                * - The guarantee is that we wait for any CBs from call_rcus that can
-                *   be proved to happen before rcu_barrier.  That means call_rcu had to
-                *   return, which means it had to set the nr_cbs. */
+                * - We look at nr_cbs and not the list, since there could be
+                *   CBs on the stack-local work list or that have blocked.
+                * - The guarantee is that we wait for any CBs from call_rcus
+                *   that can be proved to happen before rcu_barrier.  That
+                *   means call_rcu had to return, which means it had to set the
+                *   nr_cbs. */
                if (!rpi->nr_cbs)
                        continue;
                init_rcu_head_on_stack(&b[i].h);
@@ -268,10 +264,10 @@ void rcu_barrier(void)
                return;
        }
        wake_gp_ktask(rpi->rsp, true);
-       /* sem_down_bulk is currently slow.  Even with some fixes, we actually want
-        * a barrier, which you could imagine doing with a tree.  sem_down_bulk()
-        * doesn't have the info that we have: that the wakeups are coming from N
-        * cores on the leaves of the tree. */
+       /* sem_down_bulk is currently slow.  Even with some fixes, we actually
+        * want a barrier, which you could imagine doing with a tree.
+        * sem_down_bulk() doesn't have the info that we have: that the wakeups
+        * are coming from N cores on the leaves of the tree. */
        sem_down_bulk(sem, nr_sent);
        kfree(b);
 }
@@ -297,8 +293,8 @@ static void __mark_qs(struct rcu_state *rsp, struct rcu_node *rnp,
 
        new_qsm = __sync_and_and_fetch(&rnp->qsmask, ~grpmask);
        /* I don't fully understand this, but we need some form of transitive
-        * barrier across the entire tree.  Linux does this when they lock/unlock.
-        * Our equivalent is the atomic op. */
+        * barrier across the entire tree.  Linux does this when they
+        * lock/unlock.  Our equivalent is the atomic op. */
        smp_mb__after_unlock_lock();
        /* Only one thread will get 0 back - the last one to check in */
        if (new_qsm)
@@ -311,11 +307,12 @@ static void __mark_qs(struct rcu_state *rsp, struct rcu_node *rnp,
 
 static void rcu_report_qs_rpi(struct rcu_state *rsp, struct rcu_pcpui *rpi)
 {
-       /* Note we don't check ->completed == ->gpnum (gp_in_progress()).  We only
-        * care if our core hasn't reported in for a GP.  This time is a subset of
-        * gp_in_progress. */
+       /* Note we don't check ->completed == ->gpnum (gp_in_progress()).  We
+        * only care if our core hasn't reported in for a GP.  This time is a
+        * subset of gp_in_progress. */
        if (rpi->gp_acked == READ_ONCE(rsp->gpnum)) {
-               /* If a GP starts right afterwards, oh well.  Catch it next time. */
+               /* If a GP starts right afterwards, oh well.  Catch it next
+                * time. */
                return;
        }
        /* Lock ensures we only report a QS once per GP. */
@@ -324,14 +321,14 @@ static void rcu_report_qs_rpi(struct rcu_state *rsp, struct rcu_pcpui *rpi)
                spin_unlock_irqsave(&rpi->lock);
                return;
        }
-       /* A gp can start concurrently, but once started, we should never be behind
-        * by more than 1. */
+       /* A gp can start concurrently, but once started, we should never be
+        * behind by more than 1. */
        assert(rpi->gp_acked + 1 == READ_ONCE(rsp->gpnum));
        /* Up our gp_acked before actually marking it.  I don't want to hold the
-        * lock too long (e.g. some debug code in rendez_wakeup() calls call_rcu).
-        * So we've unlocked, but haven't actually checked in yet - that's fine.  No
-        * one else will attempt to check in until the next GP, which can't happen
-        * until after we check in for this GP. */
+        * lock too long (e.g. some debug code in rendez_wakeup() calls
+        * call_rcu).  So we've unlocked, but haven't actually checked in yet -
+        * that's fine.  No one else will attempt to check in until the next GP,
+        * which can't happen until after we check in for this GP. */
        rpi->gp_acked++;
        spin_unlock_irqsave(&rpi->lock);
        __mark_qs(rsp, rpi->my_node, rpi->grpmask);
@@ -415,11 +412,11 @@ static void rcu_run_gp(struct rcu_state *rsp)
        struct rcu_node *rnp;
 
        assert(rsp->gpnum == rsp->completed);
-       /* Initialize the tree for accumulating QSs.  We know there are no users on
-        * the tree.  The only time a core looks at the tree is when reporting a QS
-        * for a GP.  The previous GP is done, thus all cores reported their GP
-        * already (for the previous GP), and they won't try again until we
-        * advertise the next GP. */
+       /* Initialize the tree for accumulating QSs.  We know there are no users
+        * on the tree.  The only time a core looks at the tree is when
+        * reporting a QS for a GP.  The previous GP is done, thus all cores
+        * reported their GP already (for the previous GP), and they won't try
+        * again until we advertise the next GP. */
        rcu_for_each_node_breadth_first(rsp, rnp)
                rnp->qsmask = rnp->qsmaskinit;
        /* Need the tree set for reporting QSs before advertising the GP */
@@ -428,20 +425,21 @@ static void rcu_run_gp(struct rcu_state *rsp)
        /* At this point, the cores can start reporting in. */
        /* Fake cores help test a tree larger than num_cores. */
        rcu_report_qs_fake_cores(rsp);
-       /* Expediting aggressively.  We could also wait briefly and then check the
-        * tardy cores. */
+       /* Expediting aggressively.  We could also wait briefly and then check
+        * the tardy cores. */
        rcu_report_qs_remote_cores(rsp);
-       /* Note that even when we expedite the GP by checking remote cores, there's
-        * a race where a core halted but we didn't see it.  (they report QS, decide
-        * to halt, pause, we start GP, see they haven't halted, etc.  They could
-        * report the QS after setting the state, but I didn't want to . */
+       /* Note that even when we expedite the GP by checking remote cores,
+        * there's a race where a core halted but we didn't see it.  (they
+        * report QS, decide to halt, pause, we start GP, see they haven't
+        * halted, etc.  They could report the QS after setting the state, but I
+        * didn't want to . */
        do {
                rendez_sleep_timeout(&rsp->gp_ktask_rv, root_qsmask_empty, rsp,
                                     RCU_GP_TARDY_PERIOD);
                rcu_report_qs_tardy_cores(rsp);
        } while (!root_qsmask_empty(rsp));
-       /* Not sure if we need any barriers here.  Once we post 'completed', the CBs
-        * can start running.  But no one should touch the tree til gpnum is
+       /* Not sure if we need any barriers here.  Once we post 'completed', the
+        * CBs can start running.  But no one should touch the tree til gpnum is
         * incremented. */
        WRITE_ONCE(rsp->completed, rsp->gpnum);
 }
@@ -472,11 +470,12 @@ static void rcu_gp_ktask(void *arg)
                rendez_sleep_timeout(&rsp->gp_ktask_rv, should_wake_ctl,
                                     &rsp->gp_ktask_ctl, RCU_GP_MIN_PERIOD);
                rsp->gp_ktask_ctl = 0;
-               /* Our write of 0 must happen before starting the GP.  If rcu_barrier's
-                * CBs miss the start of the GP (and thus are in an unscheduled GP),
-                * their write of 1 must happen after our write of 0 so that we rerun.
-                * This is the post-and-poke pattern.  It's not a huge deal, since we'll
-                * catch it after the GP period timeout. */
+               /* Our write of 0 must happen before starting the GP.  If
+                * rcu_barrier's CBs miss the start of the GP (and thus are in
+                * an unscheduled GP), their write of 1 must happen after our
+                * write of 0 so that we rerun.  This is the post-and-poke
+                * pattern.  It's not a huge deal, since we'll catch it after
+                * the GP period timeout. */
                wmb();
                rcu_run_gp(rsp);
                wake_mgmt_ktasks(rsp);
@@ -491,15 +490,15 @@ static void run_rcu_cbs(struct rcu_state *rsp, int coreid)
        int nr_cbs = 0;
        unsigned long completed;
 
-       /* We'll run the CBs for any GP completed so far, but not any GP that could
-        * be completed concurrently.  "CBs for a GP" means callbacks that must wait
-        * for that GP to complete. */
+       /* We'll run the CBs for any GP completed so far, but not any GP that
+        * could be completed concurrently.  "CBs for a GP" means callbacks that
+        * must wait for that GP to complete. */
        completed = READ_ONCE(rsp->completed);
 
-       /* This lockless peek is an optimization.  We're guaranteed to not miss the
-        * CB for the given GP: If the core had a CB for this GP, it must have
-        * put it on the list before checking in, before the GP completes, and
-        * before we run. */
+       /* This lockless peek is an optimization.  We're guaranteed to not miss
+        * the CB for the given GP: If the core had a CB for this GP, it must
+        * have put it on the list before checking in, before the GP completes,
+        * and before we run. */
        if (list_empty(&rpi->cbs))
                return;
 
@@ -522,17 +521,17 @@ static void run_rcu_cbs(struct rcu_state *rsp, int coreid)
         * world, not blocking also means our kthread won't migrate from this core,
         * such that the pcpui pointer (and thus the specific __ctx_depth) won't
         * change. */
-       set_rcu_cb(this_pcpui_ptr());
+       set_cannot_block(this_pcpui_ptr());
        list_for_each_entry_safe(head, temp, &work, link) {
                list_del(&head->link);
                rcu_exec_cb(head);
        }
-       clear_rcu_cb(this_pcpui_ptr());
+       clear_cannot_block(this_pcpui_ptr());
 
        /* We kept nr_cbs in place until the CBs, which could block, completed.
-        * This allows other readers (rcu_barrier()) of our pcpui to tell if we have
-        * any CBs pending.  This relies on us being the only consumer/runner of CBs
-        * for this core. */
+        * This allows other readers (rcu_barrier()) of our pcpui to tell if we
+        * have any CBs pending.  This relies on us being the only
+        * consumer/runner of CBs for this core. */
        spin_lock_irqsave(&rpi->lock);
        rpi->nr_cbs -= nr_cbs;
        spin_unlock_irqsave(&rpi->lock);
@@ -548,7 +547,8 @@ static void rcu_mgmt_ktask(void *arg)
                rendez_sleep(&rpi->mgmt_ktask_rv, should_wake_ctl,
                             &rpi->mgmt_ktask_ctl);
                rpi->mgmt_ktask_ctl = 0;
-               /* TODO: given the number of mgmt kthreads, we need to assign cores */
+               /* TODO: given the number of mgmt kthreads, we need to assign
+                * cores */
                for_each_core(i)
                        run_rcu_cbs(rsp, i);
        };
@@ -608,9 +608,9 @@ void rcu_init(void)
        /* TODO: For each mgmt core */
        ktask("rcu_mgmt_0", rcu_mgmt_ktask, _PERCPU_VARPTR(rcu_pcpui, 0));
 
-       /* If we have a call_rcu before percpu_init, we might be using the spot in
-        * the actual __percpu .section.  We'd be core 0, so that'd be OK, since all
-        * we're using it for is reading 'booted'. */
+       /* If we have a call_rcu before percpu_init, we might be using the spot
+        * in the actual __percpu .section.  We'd be core 0, so that'd be OK,
+        * since all we're using it for is reading 'booted'. */
        for_each_core(i) {
                rpi = _PERCPU_VARPTR(rcu_pcpui, i);
                rpi->booted = true;