printk: check for user pointers in format string parameters
[akaros.git] / kern / src / rcu.c
1 /* Copyright (c) 2018 Google Inc
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * RCU.  We borrow a few things from Linux - mostly the header bits and the
6  * tree-rcu structure.
7  *
8  * Acronyms/definitions:
9  * - CB: RCU callbacks (call_rcu)
10  * - QS: quiescent state - a time when we know a core isn't in an RCU read-side
11  *   critical section.
12  * - GP: grace period.  Some quotes from Linux/Paul:
13  *   - "A time period during which all such pre-existing readers complete is
14  *   called a 'grace period'."
15  *   - "Anything outside of an RCU read-side critical section is a quiescent
16  *   state, and a grace period is any time period in which every CPU (or task,
17  *   for
18  * - gpnum: number of the current grace period we are working on
19  * - completed: number of the grace periods completed
20  *
21  * We differ in a few ways from Linux's implementation:
22  *
23  * - Callbacks run on management cores (a.k.a, LL cores, e.g. core 0).  This way
24  *   we don't have to kick idle or user space cores to run their CBs, and those
25  *   CBs don't interfere with a possibly unrelated process.
26  *
27  * - Our RCU is most similar to rcu_sched (classic RCU), and not the preemptible
28  *   RCU.  Our kthreads don't get preempted, so we don't need to worry about
29  *   read side critical sections being interrupted.
30  *
31  * - There is no softirq processing to note the passing of GPs or to run CBs.
32  *
33  * - Our tree uses atomic ops to trace grace periods within the rcu_nodes.
34  *   Linux's tree-rcu uses locks.  They need the locks since under some
35  *   circumstances, a QS would be marked during a read-side critical section,
36  *   and the QS marking needed to track the gpnum to keep the QS matched to the
37  *   GP.  See
38  *   https://www.kernel.org/doc/Documentation/RCU/Design/Data-Structures/Data-Structures.html
39  *   and grep "Come on".  We don't need to worry about this since we only mark a
40  *   QS under two situations:
41  *
42  *   - The core knows it is does not hold an rcu_read_lock, so we can always
43  *   mark QS.
44  *   - The GP kthread saw the core either idle or in userspace after the gp
45  *   started.  That means we know that core had a QS after the GP started.
46  *
47  *   So any time we mark a QS is actually a QS.  I think Linux has times where
48  *   they note a QS for an older GP, and set a note to mark that QS *for that
49  *   GP* in the future.  Their locks make sure they are marking for the right
50  *   gpnum.  There might be some element of the rnps not knowing about the
51  *   latest GP yet too.
52  *
53  * - We do use locking at the per-core level to decide whether or not to start
54  *   mark a QS for a given GP.  (lock, compare gp_acked to gpnum, etc).  This
55  *   ensures only one thread (the core or the GP kth) marks the core for a given
56  *   GP.  We actually could handle it if the both did, (make the trickle-up
57  *   idempotent, which we do for the interior nodes) but we could run into
58  *   situations where a core checks in for a GP before the global gpnum was set.
59  *   This could happen when the GP kth is resetting the tree for the next GP.
60  *   I think it'd be OK, but not worth the hassle and confusion.
61  *
62  * - We have a kthread for GP management, like Linux.  Callbacks are enqueued
63  *   locally (on the core that calls call_rcu), like Linux.  We have a kthread
64  *   per management core to process the callbacks, and these threads will handle
65  *   the callbacks of *all* cores.  Each core has a specific mgmt kthread that
66  *   will run its callbacks.  It is important that a particular core's callbacks
67  *   are processed by the same thread - I rely on this to implement rcu_barrier
68  *   easily.  In that case, we just need to schedule a CB on every core that has
69  *   CBs, and when those N CBs are done, our barrier passed.  This relies on CBs
70  *   being processed in order for a given core.  We could do the barrier in
71  *   other ways, but it doesn't seem like a big deal.
72  *
73  * - I kept around some seq counter and locking stuff in rcu_helper.h.  We might
74  *   use that in the future.
75  */
76
77 #include <rcu.h>
78 #include <kthread.h>
79 #include <smp.h>
80 #include <kmalloc.h>
81
82 /* How many CBs to queue up before we trigger a GP */
83 #define RCU_CB_THRESH 10
84 /* How long (usec) we wait between running a GP if we weren't triggered. */
85 #define RCU_GP_MIN_PERIOD 25000
86 /* How long (usec) we wait for cores to check in. */
87 #define RCU_GP_TARDY_PERIOD 1000
88
89 /* In rcu_tree_helper.c */
90 extern int rcu_num_cores;
91 extern int rcu_num_lvls;
92
93 /* Controls whether we skip cores when we expedite, which forces tardy cores. */
94 static bool rcu_debug_tardy;
95
96 /* Externed in rcu_tree_helper.c */
97 struct rcu_state rcu_state;
98
99
100 DEFINE_PERCPU(struct rcu_pcpui, rcu_pcpui);
101
102 struct sync_cb_blob {
103         struct rcu_head h;
104         struct semaphore *sem;
105 };
106
107 static void __sync_cb(struct rcu_head *head)
108 {
109         struct sync_cb_blob *b = container_of(head, struct sync_cb_blob, h);
110
111         sem_up(b->sem);
112 }
113
114 void synchronize_rcu(void)
115 {
116         struct sync_cb_blob b[1];
117         struct semaphore sem[1];
118
119         if (!can_block(this_pcpui_ptr()))
120                 panic("Attempted %s() from an unblockable context!", __func__);
121         if (is_rcu_ktask(current_kthread))
122                 panic("Attempted %s() from an RCU thread!", __func__);
123         sem_init(sem, 0);
124         init_rcu_head_on_stack(&b->h);
125         b->sem = sem;
126         call_rcu(&b->h, __sync_cb);
127         sem_down(sem);
128 }
129
130 static inline bool gp_in_progress(struct rcu_state *rsp)
131 {
132         unsigned long completed = READ_ONCE(rsp->completed);
133         unsigned long gpnum = READ_ONCE(rsp->gpnum);
134
135         assert(gpnum - completed <= 1);
136         return completed != gpnum;
137 }
138
139 /* Wakes the kthread to run a grace period if it isn't already running.
140  *
141  * If 'force', we'll make sure it runs a fresh GP, which will catch all CBs
142  * registered before this call.  That's not 100% true.  It might be possible on
143  * some non-x86 architectures for the writes that wake the ktask are reordered
144  * before the read of gpnum that our caller made.  Thus the caller could have a
145  * CB in a later GP.  Worst case, they'll wait an extra GP timeout.  Not too
146  * concerned, though I probably should be. */
147 static void wake_gp_ktask(struct rcu_state *rsp, bool force)
148 {
149         if (!force && gp_in_progress(rsp))
150                 return;
151         rsp->gp_ktask_ctl = 1;
152         rendez_wakeup(&rsp->gp_ktask_rv);
153 }
154
155 static void rcu_exec_cb(struct rcu_head *head)
156 {
157         if (__is_kfree_rcu_offset((unsigned long)head->func))
158                 kfree((void*)head - (unsigned long)head->func);
159         else
160                 head->func(head);
161 }
162
163 void __early_call_rcu(struct rcu_head *head)
164 {
165         extern bool booting;
166
167         assert(booting);
168         assert(core_id() == 0);
169         run_as_rkm(rcu_exec_cb, head);
170 }
171
172 /* This could be called from a remote core, e.g. rcu_barrier().  Returns the
173  * number of enqueued CBs, including the one we pass in. */
174 static int __call_rcu_rpi(struct rcu_state *rsp, struct rcu_pcpui *rpi,
175                            struct rcu_head *head, rcu_callback_t func)
176 {
177         unsigned int nr_cbs;
178
179         head->func = func;
180
181         if (!rpi->booted) {
182                 __early_call_rcu(head);
183                 return 0;
184         }
185         /* rsp->gpnum is the one we're either working on (if > completed) or the
186          * one we already did.  Either way, it's a GP that may have already been
187          * ACKed during a core's QS, and that core could have started a
188          * read-side critical section that must complete before CB runs.  That
189          * requires another GP. */
190         head->gpnum = READ_ONCE(rsp->gpnum) + 1;
191         spin_lock_irqsave(&rpi->lock);
192         list_add_tail(&head->link, &rpi->cbs);
193         nr_cbs = ++rpi->nr_cbs;
194         spin_unlock_irqsave(&rpi->lock);
195         /* rcu_barrier requires that the write to ->nr_cbs be visible before any
196          * future writes.  unlock orders the write inside, but doesn't prevent
197          * other writes from moving in.  Technically, our lock implementations
198          * do that, but it's not part of our definition.  Maybe it should be.
199          * Til then: */
200         wmb();
201         return nr_cbs;
202 }
203
204 /* Minus the kfree offset check */
205 static void __call_rcu(struct rcu_head *head, rcu_callback_t func)
206 {
207         struct rcu_pcpui *rpi = PERCPU_VARPTR(rcu_pcpui);
208         struct rcu_state *rsp = rpi->rsp;
209         unsigned int thresh;
210
211         thresh = __call_rcu_rpi(rsp, rpi, head, func);
212         if (thresh > RCU_CB_THRESH)
213                 wake_gp_ktask(rpi->rsp, false);
214 }
215
216 void call_rcu(struct rcu_head *head, rcu_callback_t func)
217 {
218         assert(!__is_kfree_rcu_offset((unsigned long)func));
219         __call_rcu(head, func);
220 }
221
222 void rcu_barrier(void)
223 {
224         struct rcu_state *rsp = PERCPU_VAR(rcu_pcpui).rsp;
225         struct rcu_pcpui *rpi;
226         struct semaphore sem[1];
227         struct sync_cb_blob *b;
228         int nr_sent = 0;
229
230         if (!can_block(this_pcpui_ptr()))
231                 panic("Attempted %s() from an unblockable context!", __func__);
232         if (is_rcu_ktask(current_kthread))
233                 panic("Attempted %s() from an RCU thread!", __func__);
234         /* TODO: if we have concurrent rcu_barriers, we might be able to share
235          * the CBs.  Say we have 1 CB on a core, then N rcu_barriers.  We'll
236          * have N call_rcus in flight, though we could share.  Linux does this
237          * with a mtx and some accounting, I think. */
238         b = kzmalloc(sizeof(struct sync_cb_blob) * num_cores, MEM_WAIT);
239         /* Remember, you block when sem is <= 0.  We'll get nr_sent ups, and
240          * we'll down 1 for each.  This is just like the synchronize_rcu() case;
241          * there, nr_sent == 1. */
242         sem_init(sem, 0);
243         /* Order any signal we received from someone who called call_rcu()
244          * before our rpi->nr_cbs reads. */
245         rmb();
246         for_each_core(i) {
247                 rpi = _PERCPU_VARPTR(rcu_pcpui, i);
248                 /* Lockless peek at nr_cbs.  Two things to note here:
249                  * - We look at nr_cbs and not the list, since there could be
250                  *   CBs on the stack-local work list or that have blocked.
251                  * - The guarantee is that we wait for any CBs from call_rcus
252                  *   that can be proved to happen before rcu_barrier.  That
253                  *   means call_rcu had to return, which means it had to set the
254                  *   nr_cbs. */
255                 if (!rpi->nr_cbs)
256                         continue;
257                 init_rcu_head_on_stack(&b[i].h);
258                 b[i].sem = sem;
259                 __call_rcu_rpi(rsp, rpi, &b[i].h, __sync_cb);
260                 nr_sent++;
261         }
262         if (!nr_sent) {
263                 kfree(b);
264                 return;
265         }
266         wake_gp_ktask(rpi->rsp, true);
267         /* sem_down_bulk is currently slow.  Even with some fixes, we actually
268          * want a barrier, which you could imagine doing with a tree.
269          * sem_down_bulk() doesn't have the info that we have: that the wakeups
270          * are coming from N cores on the leaves of the tree. */
271         sem_down_bulk(sem, nr_sent);
272         kfree(b);
273 }
274
275 void rcu_force_quiescent_state(void)
276 {
277         /* It's unclear if we want to block until the QS has passed */
278         wake_gp_ktask(PERCPU_VAR(rcu_pcpui).rsp, true);
279 }
280
281 void kfree_call_rcu(struct rcu_head *head, rcu_callback_t off)
282 {
283         __call_rcu(head, off);
284 }
285
286 /* Clears the bits core(s) in grpmask present in rnp, trickling up to the root.
287  * Note that a 1 in qsmask means you haven't checked in - like a todo list.
288  * Last one out kicks the GP kthread. */
289 static void __mark_qs(struct rcu_state *rsp, struct rcu_node *rnp,
290                       unsigned long grpmask)
291 {
292         unsigned long new_qsm;
293
294         new_qsm = __sync_and_and_fetch(&rnp->qsmask, ~grpmask);
295         /* I don't fully understand this, but we need some form of transitive
296          * barrier across the entire tree.  Linux does this when they
297          * lock/unlock.  Our equivalent is the atomic op. */
298         smp_mb__after_unlock_lock();
299         /* Only one thread will get 0 back - the last one to check in */
300         if (new_qsm)
301                 return;
302         if (rnp->parent)
303                 __mark_qs(rsp, rnp->parent, rnp->grpmask);
304         else
305                 rendez_wakeup(&rsp->gp_ktask_rv);
306 }
307
308 static void rcu_report_qs_rpi(struct rcu_state *rsp, struct rcu_pcpui *rpi)
309 {
310         /* Note we don't check ->completed == ->gpnum (gp_in_progress()).  We
311          * only care if our core hasn't reported in for a GP.  This time is a
312          * subset of gp_in_progress. */
313         if (rpi->gp_acked == READ_ONCE(rsp->gpnum)) {
314                 /* If a GP starts right afterwards, oh well.  Catch it next
315                  * time. */
316                 return;
317         }
318         /* Lock ensures we only report a QS once per GP. */
319         spin_lock_irqsave(&rpi->lock);
320         if (rpi->gp_acked == READ_ONCE(rsp->gpnum)) {
321                 spin_unlock_irqsave(&rpi->lock);
322                 return;
323         }
324         /* A gp can start concurrently, but once started, we should never be
325          * behind by more than 1. */
326         assert(rpi->gp_acked + 1 == READ_ONCE(rsp->gpnum));
327         /* Up our gp_acked before actually marking it.  I don't want to hold the
328          * lock too long (e.g. some debug code in rendez_wakeup() calls
329          * call_rcu).  So we've unlocked, but haven't actually checked in yet -
330          * that's fine.  No one else will attempt to check in until the next GP,
331          * which can't happen until after we check in for this GP. */
332         rpi->gp_acked++;
333         spin_unlock_irqsave(&rpi->lock);
334         __mark_qs(rsp, rpi->my_node, rpi->grpmask);
335 }
336
337 /* Cores advertise when they are in QSs.  If the core already reported in, or if
338  * we're not in a GP, this is a quick check (given a global read of ->gpnum). */
339 void rcu_report_qs(void)
340 {
341         rcu_report_qs_rpi(&rcu_state, PERCPU_VARPTR(rcu_pcpui));
342 }
343
344 /* For debugging checks on large trees.  Keep this in sync with
345  * rcu_init_fake_cores(). */
346 static void rcu_report_qs_fake_cores(struct rcu_state *rsp)
347 {
348         struct rcu_node *rnp;
349
350         rnp = rsp->level[rcu_num_lvls - 1];
351         for (int i = num_cores; i < rcu_num_cores; i++) {
352                 while (i > rnp->grphi)
353                         rnp++;
354                 if (rcu_debug_tardy && (i % 2))
355                         continue;
356                 __mark_qs(rsp, rnp, 1 << (i - rnp->grplo));
357         }
358 }
359
360 static void rcu_report_qs_remote_core(struct rcu_state *rsp, int coreid)
361 {
362         int cpu_state = READ_ONCE(pcpui_var(coreid, cpu_state));
363         struct rcu_pcpui *rpi = _PERCPU_VARPTR(rcu_pcpui, coreid);
364
365         /* Lockless peek.  If we ever saw them idle/user after a GP started, we
366          * know they had a QS, and we know we're still in the original GP. */
367         if (cpu_state == CPU_STATE_IDLE || cpu_state == CPU_STATE_USER)
368                 rcu_report_qs_rpi(rsp, rpi);
369 }
370
371 /* Checks every core, remotely via the cpu state, to see if it is in a QS.
372  * This is like an expedited grace period. */
373 static void rcu_report_qs_remote_cores(struct rcu_state *rsp)
374 {
375         for_each_core(i) {
376                 if (rcu_debug_tardy && (i % 2))
377                         continue;
378                 rcu_report_qs_remote_core(rsp, i);
379         }
380 }
381
382 static void rcu_report_qs_tardy_cores(struct rcu_state *rsp)
383 {
384         struct rcu_node *rnp;
385         unsigned long qsmask;
386         int i;
387
388         rcu_for_each_leaf_node(rsp, rnp) {
389                 qsmask = READ_ONCE(rnp->qsmask);
390                 if (!qsmask)
391                         continue;
392                 for_each_set_bit(i, &qsmask, BITS_PER_LONG) {
393                         /* Fake cores */
394                         if (i + rnp->grplo >= num_cores) {
395                                 __mark_qs(rsp, rnp, 1 << i);
396                                 continue;
397                         }
398                         rcu_report_qs_remote_core(rsp, i + rnp->grplo);
399                 }
400         }
401 }
402
403 static int root_qsmask_empty(void *arg)
404 {
405         struct rcu_state *rsp = arg;
406
407         return READ_ONCE(rsp->node[0].qsmask) == 0 ? 1 : 0;
408 }
409
410 static void rcu_run_gp(struct rcu_state *rsp)
411 {
412         struct rcu_node *rnp;
413
414         assert(rsp->gpnum == rsp->completed);
415         /* Initialize the tree for accumulating QSs.  We know there are no users
416          * on the tree.  The only time a core looks at the tree is when
417          * reporting a QS for a GP.  The previous GP is done, thus all cores
418          * reported their GP already (for the previous GP), and they won't try
419          * again until we advertise the next GP. */
420         rcu_for_each_node_breadth_first(rsp, rnp)
421                 rnp->qsmask = rnp->qsmaskinit;
422         /* Need the tree set for reporting QSs before advertising the GP */
423         wmb();
424         WRITE_ONCE(rsp->gpnum, rsp->gpnum + 1);
425         /* At this point, the cores can start reporting in. */
426         /* Fake cores help test a tree larger than num_cores. */
427         rcu_report_qs_fake_cores(rsp);
428         /* Expediting aggressively.  We could also wait briefly and then check
429          * the tardy cores. */
430         rcu_report_qs_remote_cores(rsp);
431         /* Note that even when we expedite the GP by checking remote cores,
432          * there's a race where a core halted but we didn't see it.  (they
433          * report QS, decide to halt, pause, we start GP, see they haven't
434          * halted, etc.  They could report the QS after setting the state, but I
435          * didn't want to . */
436         do {
437                 rendez_sleep_timeout(&rsp->gp_ktask_rv, root_qsmask_empty, rsp,
438                                      RCU_GP_TARDY_PERIOD);
439                 rcu_report_qs_tardy_cores(rsp);
440         } while (!root_qsmask_empty(rsp));
441         /* Not sure if we need any barriers here.  Once we post 'completed', the
442          * CBs can start running.  But no one should touch the tree til gpnum is
443          * incremented. */
444         WRITE_ONCE(rsp->completed, rsp->gpnum);
445 }
446
447 static int should_wake_ctl(void *arg)
448 {
449         int *ctl = arg;
450
451         return *ctl != 0 ? 1 : 0;
452 }
453
454 static void wake_mgmt_ktasks(struct rcu_state *rsp)
455 {
456         struct rcu_pcpui *rpi;
457
458         /* TODO: For each mgmt core */
459         rpi = _PERCPU_VARPTR(rcu_pcpui, 0);
460         rpi->mgmt_ktask_ctl = 1;
461         rendez_wakeup(&rpi->mgmt_ktask_rv);
462 }
463
464 static void rcu_gp_ktask(void *arg)
465 {
466         struct rcu_state *rsp = arg;
467
468         current_kthread->flags |= KTH_IS_RCU_KTASK;
469         while (1) {
470                 rendez_sleep_timeout(&rsp->gp_ktask_rv, should_wake_ctl,
471                                      &rsp->gp_ktask_ctl, RCU_GP_MIN_PERIOD);
472                 rsp->gp_ktask_ctl = 0;
473                 /* Our write of 0 must happen before starting the GP.  If
474                  * rcu_barrier's CBs miss the start of the GP (and thus are in
475                  * an unscheduled GP), their write of 1 must happen after our
476                  * write of 0 so that we rerun.  This is the post-and-poke
477                  * pattern.  It's not a huge deal, since we'll catch it after
478                  * the GP period timeout. */
479                 wmb();
480                 rcu_run_gp(rsp);
481                 wake_mgmt_ktasks(rsp);
482         };
483 }
484
485 static void run_rcu_cbs(struct rcu_state *rsp, int coreid)
486 {
487         struct rcu_pcpui *rpi = _PERCPU_VARPTR(rcu_pcpui, coreid);
488         struct list_head work = LIST_HEAD_INIT(work);
489         struct rcu_head *head, *temp, *last_for_gp = NULL;
490         int nr_cbs = 0;
491         unsigned long completed;
492
493         /* We'll run the CBs for any GP completed so far, but not any GP that
494          * could be completed concurrently.  "CBs for a GP" means callbacks that
495          * must wait for that GP to complete. */
496         completed = READ_ONCE(rsp->completed);
497
498         /* This lockless peek is an optimization.  We're guaranteed to not miss
499          * the CB for the given GP: If the core had a CB for this GP, it must
500          * have put it on the list before checking in, before the GP completes,
501          * and before we run. */
502         if (list_empty(&rpi->cbs))
503                 return;
504
505         spin_lock_irqsave(&rpi->lock);
506         list_for_each_entry(head, &rpi->cbs, link) {
507                 if (ULONG_CMP_LT(completed, head->gpnum))
508                         break;
509                 nr_cbs++;
510                 last_for_gp = head;
511         }
512         if (last_for_gp)
513                 list_cut_position(&work, &rpi->cbs, &last_for_gp->link);
514         spin_unlock_irqsave(&rpi->lock);
515
516         if (!nr_cbs) {
517                 assert(list_empty(&work));
518                 return;
519         }
520         /* When we're in an RCU callback, we can't block.  In our non-preemptive
521          * world, not blocking also means our kthread won't migrate from this core,
522          * such that the pcpui pointer (and thus the specific __ctx_depth) won't
523          * change. */
524         set_cannot_block(this_pcpui_ptr());
525         list_for_each_entry_safe(head, temp, &work, link) {
526                 list_del(&head->link);
527                 rcu_exec_cb(head);
528         }
529         clear_cannot_block(this_pcpui_ptr());
530
531         /* We kept nr_cbs in place until the CBs, which could block, completed.
532          * This allows other readers (rcu_barrier()) of our pcpui to tell if we
533          * have any CBs pending.  This relies on us being the only
534          * consumer/runner of CBs for this core. */
535         spin_lock_irqsave(&rpi->lock);
536         rpi->nr_cbs -= nr_cbs;
537         spin_unlock_irqsave(&rpi->lock);
538 }
539
540 static void rcu_mgmt_ktask(void *arg)
541 {
542         struct rcu_pcpui *rpi = arg;
543         struct rcu_state *rsp = rpi->rsp;
544
545         current_kthread->flags |= KTH_IS_RCU_KTASK;
546         while (1) {
547                 rendez_sleep(&rpi->mgmt_ktask_rv, should_wake_ctl,
548                              &rpi->mgmt_ktask_ctl);
549                 rpi->mgmt_ktask_ctl = 0;
550                 /* TODO: given the number of mgmt kthreads, we need to assign
551                  * cores */
552                 for_each_core(i)
553                         run_rcu_cbs(rsp, i);
554         };
555 }
556
557 void rcu_init_pcpui(struct rcu_state *rsp, struct rcu_pcpui *rpi, int coreid)
558 {
559         struct rcu_node *rnp = rpi->my_node;
560
561         rpi->rsp = rsp;
562         assert(rnp->grplo <= coreid);
563         assert(coreid <= rnp->grphi);
564         rpi->coreid = coreid;
565         rpi->grpnum = coreid - rnp->grplo;
566         rpi->grpmask = 1 << rpi->grpnum;
567         rpi->booted = false;
568
569         /* We're single threaded now, so this is OK. */
570         rnp->qsmaskinit |= rpi->grpmask;
571
572         spinlock_init_irqsave(&rpi->lock);
573         INIT_LIST_HEAD(&rpi->cbs);
574         rpi->nr_cbs = 0;
575         rpi->gp_acked = rsp->completed;
576
577         /* TODO: For each mgmt core only */
578         if (coreid == 0) {
579                 rendez_init(&rpi->mgmt_ktask_rv);
580                 rpi->mgmt_ktask_ctl = 0;
581         }
582 }
583
584 /* Initializes the fake cores.  Works with rcu_report_qs_fake_cores() */
585 static void rcu_init_fake_cores(struct rcu_state *rsp)
586 {
587         struct rcu_node *rnp;
588
589         rnp = rsp->level[rcu_num_lvls - 1];
590         for (int i = num_cores; i < rcu_num_cores; i++) {
591                 while (i > rnp->grphi)
592                         rnp++;
593                 rnp->qsmaskinit |= 1 << (i - rnp->grplo);
594         }
595 }
596
597 void rcu_init(void)
598 {
599         struct rcu_state *rsp = &rcu_state;
600         struct rcu_pcpui *rpi;
601
602         rcu_init_geometry();
603         rcu_init_one(rsp);
604         rcu_init_fake_cores(rsp);
605         rcu_dump_rcu_node_tree(rsp);
606
607         ktask("rcu_gp", rcu_gp_ktask, rsp);
608         /* TODO: For each mgmt core */
609         ktask("rcu_mgmt_0", rcu_mgmt_ktask, _PERCPU_VARPTR(rcu_pcpui, 0));
610
611         /* If we have a call_rcu before percpu_init, we might be using the spot
612          * in the actual __percpu .section.  We'd be core 0, so that'd be OK,
613          * since all we're using it for is reading 'booted'. */
614         for_each_core(i) {
615                 rpi = _PERCPU_VARPTR(rcu_pcpui, i);
616                 rpi->booted = true;
617         }
618 }