pm: Allow RCU lookups and qlocked updaters
authorBarret Rhoden <brho@cs.berkeley.edu>
Thu, 29 Mar 2018 18:42:22 +0000 (14:42 -0400)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 30 Apr 2018 18:36:28 +0000 (14:36 -0400)
We're going to need to do writeback, and the easiest way to do that is to
hold a qlock on our callback.  We can't just replace the old pm spinlock
with a qlock, since it is grabbed by other spinlock holders (old VMR code,
both for pm_add_vmr and for pm_load_page_nowait()).

Also, this allows lockless lookups, which is nice.

Overall, the VMR and PM/radix code still needs a lot of structural work,
possibly involving thinking more about huge pages.  For now, this is a
modest change and we'll keep the basic infrastructure in place - enough
that we can make progress with the TFS code.

Signed-off-by: Barret Rhoden <brho@cs.berkeley.edu>
kern/include/pagemap.h
kern/include/radix.h
kern/src/pagemap.c
kern/src/radix.c

index 5146310..bd2e5e4 100644 (file)
@@ -22,13 +22,13 @@ struct page_map_operations;
  * currently in memory.  It is a map, per object, from index to physical page
  * frame. */
 struct page_map {
+       qlock_t                                         pm_qlock;               /* for the radix tree nr_pgs */
        struct fs_file                          *pm_file;
        struct radix_tree                       pm_tree;                /* tracks present pages */
        unsigned long                           pm_num_pages;   /* how many pages are present */
        struct page_map_operations      *pm_op;
-       spinlock_t                                      pm_lock;
+       spinlock_t                                      pm_lock;                /* for the VMR list */
        struct vmr_tailq                        pm_vmrs;
-       atomic_t                                        pm_removal;
 };
 
 /* Operations performed on a page_map.  These are usually FS specific, which
index 3abc6da..260c2ee 100644 (file)
@@ -22,6 +22,8 @@
 #define NR_RNODE_SLOTS (1 << LOG_RNODE_SLOTS)
 
 #include <ros/common.h>
+#include <kthread.h>
+#include <atomic.h>
 
 struct radix_node {
        void                                            *items[NR_RNODE_SLOTS];
@@ -31,15 +33,36 @@ struct radix_node {
        struct radix_node                       **my_slot;
 };
 
-/* Defines the whole tree. */
+/* writers (insert, delete, and callbacks) must synchronize externally, e.g. a
+ * qlock in the pagemap.
+ *
+ * radix_lookup_slot returns an rcu-protected pointer that needs to be
+ * rcu_read_locked.  The item in the slot (either by *slot or by a regular
+ * radix_lookup) has limited protections.  Higher layers (pagemap) can do their
+ * own thing.  For instance, the page cache writers can zero an item if they
+ * know they cleared the page without someone else grabbing a ref.  We'll
+ * rcu-protect the item pointer, so that higher layers can use RCU for the
+ * actual object.
+ *
+ * Basically the only functions that don't need the caller to hold a qlock are
+ * the two lookup routines: the readers.  The seq counter protects the atomicity
+ * of root, depth, and upper_bound, which defines the reader's start point.  RCU
+ * protects the lifetime of the rnodes, which is where lookup_slot's pointer
+ * points to.
+ *
+ * Note that we use rcu_assign_pointer and rcu_dereference whenever we
+ * manipulate pointers in the tree (pointers to or within rnodes).  We use RCU
+ * for the item pointer too, so that our callers can use RCU if they want.  Both
+ * the slot pointer and what it points to are protected by RCU.
+ */
 struct radix_tree {
+       seq_ctr_t                                       seq;
        struct radix_node                       *root;
        unsigned int                            depth;
        unsigned long                           upper_bound;
 };
 
 void radix_init(void);         /* initializes the whole radix system */
-#define RADIX_INITIALIZER {0, 0, 0}
 void radix_tree_init(struct radix_tree *tree); /* inits one tree */
 void radix_tree_destroy(struct radix_tree *tree);
 
index 66249ce..b78f9ef 100644 (file)
@@ -12,6 +12,7 @@
 #include <assert.h>
 #include <stdio.h>
 #include <pagemap.h>
+#include <rcu.h>
 
 void pm_add_vmr(struct page_map *pm, struct vm_region *vmr)
 {
@@ -105,11 +106,11 @@ void pm_init(struct page_map *pm, struct page_map_operations *op, void *host)
 {
        pm->pm_file = host;
        radix_tree_init(&pm->pm_tree);
-       pm->pm_num_pages = 0;                                   /* no pages in a new pm */
+       pm->pm_num_pages = 0;
        pm->pm_op = op;
+       qlock_init(&pm->pm_qlock);
        spinlock_init(&pm->pm_lock);
        TAILQ_INIT(&pm->pm_vmrs);
-       atomic_set(&pm->pm_removal, 0);
 }
 
 /* Looks up the index'th page in the page map, returning a refcnt'd reference
@@ -119,8 +120,10 @@ static struct page *pm_find_page(struct page_map *pm, unsigned long index)
        void **tree_slot;
        void *old_slot_val, *slot_val;
        struct page *page = 0;
-       /* Read walking the PM tree TODO: (RCU) */
-       spin_lock(&pm->pm_lock);
+
+       /* We use rcu to protect our radix walk, specifically the tree_slot pointer.
+        * We get our own 'pm refcnt' on the slot itself, which doesn't need RCU. */
+       rcu_read_lock();
        /* We're syncing with removal.  The deal is that if we grab the page (and
         * we'd only do that if the page != 0), we up the slot ref and clear
         * removal.  A remover will only remove it if removal is still set.  If we
@@ -142,7 +145,7 @@ static struct page *pm_find_page(struct page_map *pm, unsigned long index)
        } while (!atomic_cas_ptr(tree_slot, old_slot_val, slot_val));
        assert(page->pg_tree_slot == tree_slot);
 out:
-       spin_unlock(&pm->pm_lock);
+       rcu_read_unlock();
        return page;
 }
 
@@ -161,8 +164,7 @@ static int pm_insert_page(struct page_map *pm, unsigned long index,
        int ret;
        void **tree_slot;
        void *slot_val = 0;
-       /* write locking the PM */
-       spin_lock(&pm->pm_lock);
+
        page->pg_mapping = pm;  /* debugging */
        page->pg_index = index;
        /* no one should be looking at the tree slot til we stop write locking.  the
@@ -171,16 +173,15 @@ static int pm_insert_page(struct page_map *pm, unsigned long index,
        slot_val = pm_slot_inc_refcnt(slot_val);
        /* passing the page ref from the caller to the slot */
        slot_val = pm_slot_set_page(slot_val, page);
-       /* shouldn't need a CAS or anything for the slot write, since we hold the
-        * write lock.  o/w, we'd need to get the slot and CAS instead of insert. */
+       qlock(&pm->pm_qlock);
        ret = radix_insert(&pm->pm_tree, index, slot_val, &tree_slot);
        if (ret) {
-               spin_unlock(&pm->pm_lock);
+               qunlock(&pm->pm_qlock);
                return ret;
        }
        page->pg_tree_slot = tree_slot;
        pm->pm_num_pages++;
-       spin_unlock(&pm->pm_lock);
+       qunlock(&pm->pm_qlock);
        return 0;
 }
 
@@ -393,21 +394,14 @@ int pm_remove_contig(struct page_map *pm, unsigned long index,
        void *ptr_store[PTR_ARR_LEN];
        int ptr_free_idx = 0;
        struct page *page;
+
        /* could also call a simpler remove if nr_pgs == 1 */
        if (!nr_pgs)
                return 0;
-       /* only one remover at a time (since we walk the PM multiple times as our
-        * 'working list', and need the REMOVAL flag to tell us which pages we're
-        * working on.  with more than one remover, we'd be confused and would need
-        * another list.) */
-       if (atomic_swap(&pm->pm_removal, 1)) {
-               /* We got a 1 back, so someone else is already removing */
-               return 0;
-       }
-       /* TODO: RCU: we're read walking the PM tree and write walking the VMR list.
-        * the reason for the write lock is since we need to prevent new VMRs or the
-        * changing of a VMR to being pinned. o/w, we could fail to unmap and check
-        * for dirtiness. */
+
+       /* This is a mess.  Qlock due to the radix_delete later.  spinlock for the
+        * VMR lists. */
+       qlock(&pm->pm_qlock);
        spin_lock(&pm->pm_lock);
        assert(index + nr_pgs > index); /* til we figure out who validates */
        /* check for any pinned VMRs.  if we have none, then we can skip some loops
@@ -609,7 +603,7 @@ handle_dirty:
        }
        pm->pm_num_pages -= nr_removed;
        spin_unlock(&pm->pm_lock);
-       atomic_set(&pm->pm_removal, 0);
+       qunlock(&pm->pm_qlock);
        return nr_removed;
 }
 
@@ -617,10 +611,14 @@ static bool pm_has_vmr_with_page(struct page_map *pm, unsigned long pg_idx)
 {
        struct vm_region *vmr_i;
 
+       spin_lock(&pm->pm_lock);
        TAILQ_FOREACH(vmr_i, &pm->pm_vmrs, vm_pm_link) {
-               if (vmr_has_page_idx(vmr_i, pg_idx))
+               if (vmr_has_page_idx(vmr_i, pg_idx)) {
+                       spin_unlock(&pm->pm_lock);
                        return true;
+               }
        }
+       spin_unlock(&pm->pm_lock);
        return false;
 }
 
@@ -634,7 +632,7 @@ static bool __remove_or_zero_cb(void **slot, unsigned long tree_idx, void *arg)
        slot_val = old_slot_val;
        page = pm_slot_get_page(slot_val);
        /* We shouldn't have an item in the tree without a page, unless there's
-        * another removal.  Currently, this CB is called with a spinlock. */
+        * another removal.  Currently, this CB is called with a qlock. */
        assert(page);
        /* Don't even bother with VMRs that might have faulted in the page */
        if (pm_has_vmr_with_page(pm, tree_idx)) {
@@ -663,12 +661,10 @@ void pm_remove_or_zero_pages(struct page_map *pm, unsigned long start_idx,
        unsigned long end_idx = start_idx + nr_pgs;
 
        assert(end_idx > start_idx);
-       /* Before removing the lock, double check the CB.  For instance, we assume a
-        * single remover */
-       spin_lock(&pm->pm_lock);
+       qlock(&pm->pm_qlock);
        radix_for_each_slot_in_range(&pm->pm_tree, start_idx, end_idx,
                                     __remove_or_zero_cb, pm);
-       spin_unlock(&pm->pm_lock);
+       qunlock(&pm->pm_qlock);
 }
 
 static bool __destroy_cb(void **slot, unsigned long tree_idx, void *arg)
index 969ae93..8390efa 100644 (file)
@@ -7,8 +7,10 @@
 #include <ros/errno.h>
 #include <radix.h>
 #include <slab.h>
+#include <kmalloc.h>
 #include <string.h>
 #include <stdio.h>
+#include <rcu.h>
 
 struct kmem_cache *radix_kcache;
 static struct radix_node *__radix_lookup_node(struct radix_tree *tree,
@@ -28,6 +30,7 @@ void radix_init(void)
 /* Initializes a tree dynamically */
 void radix_tree_init(struct radix_tree *tree)
 {
+       tree->seq = SEQCTR_INITIALIZER;
        tree->root = 0;
        tree->depth = 0;
        tree->upper_bound = 0;
@@ -56,19 +59,19 @@ void radix_tree_destroy(struct radix_tree *tree)
 
 /* Attempts to insert an item in the tree at the given key.  ENOMEM if we ran
  * out of memory, EEXIST if an item is already in the tree.  On success, will
- * also return the slot pointer, if requested. */
+ * also return the slot pointer, if requested.
+ *
+ * Caller must maintain mutual exclusion (qlock) */
 int radix_insert(struct radix_tree *tree, unsigned long key, void *item,
                  void ***slot_p)
 {
-       printd("RADIX: insert %p at %d\n", item, key);
        struct radix_node *r_node;
        void **slot;
+
        /* Is the tree tall enough?  if not, it needs to grow a level.  This will
         * also create the initial node (upper bound starts at 0). */
        while (key >= tree->upper_bound) {
-               r_node = kmem_cache_alloc(radix_kcache, 0);
-               if (!r_node)
-                       return -ENOMEM;
+               r_node = kmem_cache_alloc(radix_kcache, MEM_WAIT);
                memset(r_node, 0, sizeof(struct radix_node));
                if (tree->root) {
                        /* tree->root is the old root, now a child of the future root */
@@ -81,32 +84,38 @@ int radix_insert(struct radix_tree *tree, unsigned long key, void *item,
                        r_node->leaf = TRUE;
                        r_node->parent = 0;
                }
+               /* Need to atomically change root, depth, and upper_bound for our
+                * readers, who will check the seq ctr. */
+               __seq_start_write(&tree->seq);
                tree->root = r_node;
                r_node->my_slot = &tree->root;
                tree->depth++;
                tree->upper_bound = 1ULL << (LOG_RNODE_SLOTS * tree->depth);
+               __seq_end_write(&tree->seq);
        }
        assert(tree->root);
        /* the tree now thinks it is tall enough, so find the last node, insert in
         * it, etc */
+       /* This gives us an rcu-protected pointer, though we hold the lock. */
        r_node = __radix_lookup_node(tree, key, TRUE);
        assert(r_node);         /* we want an ENOMEM actually, but i want to see this */
        slot = &r_node->items[key & (NR_RNODE_SLOTS - 1)];
        if (*slot)
                return -EEXIST;
-       *slot = item;
+       rcu_assign_pointer(*slot, item);
        r_node->num_items++;
        if (slot_p)
-               *slot_p = slot;
+               *slot_p = slot; /* passing back an rcu-protected pointer */
        return 0;
 }
 
 /* Removes an item from it's parent's structure, freeing the parent if there is
  * nothing left, potentially recursively. */
-static void __radix_remove_slot(struct radix_node *r_node, struct radix_node **slot)
+static void __radix_remove_slot(struct radix_node *r_node,
+                                struct radix_node **slot)
 {
        assert(*slot);          /* make sure there is something there */
-       *slot = 0;
+       rcu_assign_pointer(*slot, NULL);
        r_node->num_items--;
        /* this check excludes the root, but the if else handles it.  For now, once
         * we have a root, we'll always keep it (will need some changing in
@@ -116,6 +125,7 @@ static void __radix_remove_slot(struct radix_node *r_node, struct radix_node **s
                        __radix_remove_slot(r_node->parent, r_node->my_slot);
                else                    /* we're the last node, attached to the actual tree */
                        *(r_node->my_slot) = 0;
+               synchronize_rcu();
                kmem_cache_free(radix_kcache, r_node);
        }
 }
@@ -123,17 +133,21 @@ static void __radix_remove_slot(struct radix_node *r_node, struct radix_node **s
 /* Removes a key/item from the tree, returning that item (the void*).  If it
  * detects a radix_node is now unused, it will dealloc that node.  Though the
  * tree will still think it is tall enough to handle its old upper_bound.  It
- * won't "shrink". */
+ * won't "shrink".
+ *
+ * Caller must maintain mutual exclusion (qlock) */
 void *radix_delete(struct radix_tree *tree, unsigned long key)
 {
-       printd("RADIX: delete %d\n", key);
        void **slot;
        void *retval;
-       struct radix_node *r_node = __radix_lookup_node(tree, key, 0);
+       struct radix_node *r_node;
+
+       /* This is an rcu-protected pointer, though the caller holds a lock. */
+       r_node = __radix_lookup_node(tree, key, false);
        if (!r_node)
                return 0;
        slot = &r_node->items[key & (NR_RNODE_SLOTS - 1)];
-       retval = *slot;
+       retval = rcu_dereference(*slot);
        if (retval) {
                __radix_remove_slot(r_node, (struct radix_node**)slot);
        } else {
@@ -146,11 +160,13 @@ void *radix_delete(struct radix_tree *tree, unsigned long key)
 /* Returns the item for a given key.  0 means no item, etc. */
 void *radix_lookup(struct radix_tree *tree, unsigned long key)
 {
-       printd("RADIX: lookup %d\n", key);
        void **slot = radix_lookup_slot(tree, key);
+
        if (!slot)
                return 0;
-       return *slot;
+       /* slot was rcu-protected, pointing into the memory of an r_node.  we also
+        * want *slot, which is "void *item" to be an rcu-protected pointer. */
+       return rcu_dereference(*slot);
 }
 
 /* Returns a pointer to the radix_node holding a given key.  0 if there is no
@@ -161,42 +177,47 @@ void *radix_lookup(struct radix_tree *tree, unsigned long key)
  * ......444444333333222222111111
  *
  * If an interior node of the tree is missing, this will add one if it was
- * directed to extend the tree. */
+ * directed to extend the tree.
+ *
+ * If we might extend, the caller must maintain mutual exclusion (qlock) */
 static struct radix_node *__radix_lookup_node(struct radix_tree *tree,
                                               unsigned long key, bool extend)
 {
-       printd("RADIX: lookup_node %d, %d\n", key, extend);
-       unsigned long idx;
-       struct radix_node *child_node, *r_node = tree->root;
-       if (key >= tree->upper_bound) {
+       unsigned long idx, upper_bound;
+       unsigned int depth;
+       seq_ctr_t seq;
+       struct radix_node *child_node, *r_node;
+
+       do {
+               seq = ACCESS_ONCE(tree->seq);
+               rmb();
+               r_node = rcu_dereference(tree->root);
+               depth = tree->depth;
+               upper_bound = tree->upper_bound;
+       } while (seqctr_retry(tree->seq, seq));
+
+       if (key >= upper_bound) {
                if (extend)
-                       warn("Bound (%d) not set for key %d!\n", tree->upper_bound, key);
+                       warn("Bound (%d) not set for key %d!\n", upper_bound, key);
                return 0;
        }
-       for (int i = tree->depth; i > 1; i--) {  /* i = ..., 4, 3, 2 */
+       for (int i = depth; i > 1; i--) {        /* i = ..., 4, 3, 2 */
                idx = (key >> (LOG_RNODE_SLOTS * (i - 1))) & (NR_RNODE_SLOTS - 1);
                /* There might not be a node at this part of the tree */
                if (!r_node->items[idx]) {
-                       if (!extend) {
+                       if (!extend)
                                return 0;
-                       } else {
-                               /* so build one, possibly returning 0 if we couldn't */
-                               child_node = kmem_cache_alloc(radix_kcache, 0);
-                               if (!child_node)
-                                       return 0;
-                               r_node->items[idx] = child_node;
-                               memset(child_node, 0, sizeof(struct radix_node));
-                               /* when we are on the last iteration (i == 2), the child will be
-                                * a leaf. */
-                               child_node->leaf = (i == 2) ? TRUE : FALSE;
-                               child_node->parent = r_node;
-                               child_node->my_slot = (struct radix_node**)&r_node->items[idx];
-                               r_node->num_items++;
-                               r_node = (struct radix_node*)r_node->items[idx];
-                       }
-               } else {
-                       r_node = (struct radix_node*)r_node->items[idx];
+                       child_node = kmem_cache_alloc(radix_kcache, MEM_WAIT);
+                       memset(child_node, 0, sizeof(struct radix_node));
+                       /* when we are on the last iteration (i == 2), the child will be
+                        * a leaf. */
+                       child_node->leaf = (i == 2) ? TRUE : FALSE;
+                       child_node->parent = r_node;
+                       child_node->my_slot = (struct radix_node**)&r_node->items[idx];
+                       r_node->num_items++;
+                       rcu_assign_pointer(r_node->items[idx], child_node);
                }
+               r_node = (struct radix_node*)rcu_dereference(r_node->items[idx]);
        }
        return r_node;
 }
@@ -205,11 +226,13 @@ static struct radix_node *__radix_lookup_node(struct radix_tree *tree,
  * etc */
 void **radix_lookup_slot(struct radix_tree *tree, unsigned long key)
 {
-       printd("RADIX: lookup slot %d\n", key);
        struct radix_node *r_node = __radix_lookup_node(tree, key, FALSE);
+
        if (!r_node)
                return 0;
        key = key & (NR_RNODE_SLOTS - 1);
+       /* r_node is rcu-protected.  Our retval is too, since it's a pointer into
+        * the same object as r_node. */
        return &r_node->items[key];
 }
 
@@ -289,7 +312,9 @@ static bool rnode_for_each(struct radix_node *r_node, int depth,
        return num_children ? false : true;
 }
 
-/* [start_idx, end_idx) */
+/* [start_idx, end_idx).
+ *
+ * Caller must maintain mutual exclusion (qlock) */
 void radix_for_each_slot_in_range(struct radix_tree *tree,
                                   unsigned long start_idx,
                                   unsigned long end_idx,
@@ -300,6 +325,7 @@ void radix_for_each_slot_in_range(struct radix_tree *tree,
        rnode_for_each(tree->root, tree->depth, 0, start_idx, end_idx, cb, arg);
 }
 
+/* Caller must maintain mutual exclusion (qlock) */
 void radix_for_each_slot(struct radix_tree *tree, radix_cb_t cb, void *arg)
 {
        radix_for_each_slot_in_range(tree, 0, ULONG_MAX, cb, arg);