Page cache rewrite, including page removal
authorBarret Rhoden <brho@cs.berkeley.edu>
Tue, 4 Feb 2014 23:21:38 +0000 (15:21 -0800)
committerBarret Rhoden <brho@cs.berkeley.edu>
Sun, 9 Feb 2014 07:22:59 +0000 (23:22 -0800)
Lightly tested.  pm_remove_contig() is made to handle all sorts of
races, but I haven't been able to test all of them.

If I screwed up something, like tracking dirty pages, we probably won't
notice til data magically disappears and programs silently fail.  It'd
be great to have someone review this, but that'll probably end up being
future-barret.

kern/include/page_alloc.h
kern/include/pagemap.h
kern/src/mm.c
kern/src/pagemap.c

index bda74ce..40ce8c4 100644 (file)
@@ -33,6 +33,7 @@ typedef LIST_ENTRY(page) page_list_entry_t;
 #define PG_DIRTY               0x004   /* page map, data is dirty */
 #define PG_BUFFER              0x008   /* is a buffer page, has BHs */
 #define PG_PAGEMAP             0x010   /* belongs to a page map */
+#define PG_REMOVAL             0x020   /* Working flag for page map removal */
 
 /* TODO: this struct is not protected from concurrent operations in some
  * functions.  If you want to lock on it, use the spinlock in the semaphore.
@@ -43,8 +44,9 @@ struct page {
        LIST_ENTRY(page)                        pg_link;        /* membership in various lists */
        struct kref                                     pg_kref;
        atomic_t                                        pg_flags;
-       struct page_map                         *pg_mapping;
+       struct page_map                         *pg_mapping; /* for debugging... */
        unsigned long                           pg_index;
+       void                                            **pg_tree_slot;
        void                                            *pg_private;    /* type depends on page usage */
        struct semaphore                        pg_sem;         /* for blocking on IO */
 };
index 7c10dc3..c23155e 100644 (file)
@@ -18,6 +18,7 @@
 struct page;
 struct inode;
 struct block_device;
+struct chan;
 struct page_map_operations;
 
 /* Every object that has pages, like an inode or the swap (or even direct block
@@ -27,15 +28,14 @@ struct page_map {
        union {
                struct inode                            *pm_host;       /* inode of the owner, if any */
                struct block_device                     *pm_bdev;       /* bdev of the owner, if any */
+               struct chan                                     *pm_chan;
        };
        struct radix_tree                       pm_tree;                /* tracks present pages */
-       spinlock_t                                      pm_tree_lock;   /* spinlock => we can't block */
        unsigned long                           pm_num_pages;   /* how many pages are present */
        struct page_map_operations      *pm_op;
-       unsigned int                            pm_flags;
-       /*... and private lists, backing block dev info, other mappings, etc. */
        spinlock_t                                      pm_lock;
        struct vmr_tailq                        pm_vmrs;
+       atomic_t                                        pm_removal;
 };
 
 /* Operations performed on a page_map.  These are usually FS specific, which
@@ -63,6 +63,8 @@ int pm_load_page(struct page_map *pm, unsigned long index, struct page **pp);
 void pm_put_page(struct page *page);
 void pm_add_vmr(struct page_map *pm, struct vm_region *vmr);
 void pm_remove_vmr(struct page_map *pm, struct vm_region *vmr);
+int pm_remove_contig(struct page_map *pm, unsigned long index,
+                     unsigned long nr_pgs);
 void print_page_map_info(struct page_map *pm);
 
 #endif /* ROS_KERN_PAGEMAP_H */
index bc231df..60f8d3d 100644 (file)
@@ -740,6 +740,15 @@ int handle_page_fault(struct proc* p, uintptr_t va, int prot)
        return ret;
 }
 
+/* Helper - drop the page differently based on where it is from */
+static void __put_page(struct page *page)
+{
+       if (atomic_read(&page->pg_flags) & PG_PAGEMAP)
+               pm_put_page(page);
+       else
+               page_decref(page);
+}
+
 /* Returns 0 on success, or an appropriate -error code.  Assumes you hold the
  * vmr_lock.
  *
@@ -767,6 +776,7 @@ int __handle_page_fault(struct proc *p, uintptr_t va, int prot)
                /* No file - just want anonymous memory */
                if (upage_alloc(p, &a_page, TRUE))
                        return -ENOMEM;
+               a_page->pg_tree_slot = 0;
        } else {
                /* If this fails, either something got screwed up with the VMR, or the
                 * permissions changed after mmap/mprotect.  Either way, I want to know
@@ -801,11 +811,12 @@ int __handle_page_fault(struct proc *p, uintptr_t va, int prot)
                if ((vmr->vm_flags & MAP_PRIVATE)) {
                        struct page *cache_page = a_page;
                        if (upage_alloc(p, &a_page, FALSE)) {
-                               page_decref(cache_page);        /* was the original a_page */
+                               __put_page(cache_page); /* was the original a_page */
                                return -ENOMEM;
                        }
+                       a_page->pg_tree_slot = 0;
                        memcpy(page2kva(a_page), page2kva(cache_page), PGSIZE);
-                       page_decref(cache_page);                /* was the original a_page */
+                       __put_page(cache_page);         /* was the original a_page */
                        /* Debugging */
                        if (!(vmr->vm_prot & PROT_WRITE))
                                printd("[kernel] private, but unwritable file mapping of %s "
@@ -826,7 +837,7 @@ int __handle_page_fault(struct proc *p, uintptr_t va, int prot)
        pte = pgdir_walk(p->env_pgdir, (void*)va, 1);
        if (!pte) {
                spin_unlock(&p->pte_lock);
-               pm_put_page(a_page);
+               __put_page(a_page);
                return -ENOMEM;
        }
        /* a spurious, valid PF is possible due to a legit race: the page might have
@@ -834,18 +845,19 @@ int __handle_page_fault(struct proc *p, uintptr_t va, int prot)
         * in which case we should just return. */
        if (PAGE_PRESENT(*pte)) {
                spin_unlock(&p->pte_lock);
-               pm_put_page(a_page);
+               __put_page(a_page);
                return 0;
-       } else if (PAGE_PAGED_OUT(*pte)) {
-               /* TODO: (SWAP) bring in the paged out frame. (BLK) */
-               panic("Swapping not supported!");
-               spin_unlock(&p->pte_lock);
-               pm_put_page(a_page);
-               return -1;
        }
+       /* preserve the dirty bit - pm removal could be looking concurrently */
+       pte_prot |= (*pte & PTE_D ? PTE_D : 0);
        /* We have a ref to a_page, which we are storing in the PTE */
        *pte = PTE(page2ppn(a_page), PTE_P | pte_prot);
        spin_unlock(&p->pte_lock);
+       /* the VMR's existence in the PM (via the mmap) allows us to have PTE point
+        * to a_page without it magically being reallocated.  For non-PM memory
+        * (anon memory or private pages) we transferred the ref to the PTE. */
+       if (atomic_read(&a_page->pg_flags) & PG_PAGEMAP)
+               pm_put_page(a_page);
        return 0;
 }
 
index 524bb29..38dc45d 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2010 The Regents of the University of California
+/* Copyright (c) 2014 The Regents of the University of California
  * Barret Rhoden <brho@cs.berkeley.edu>
  * See LICENSE for details.
  *
@@ -15,7 +15,8 @@
 void pm_add_vmr(struct page_map *pm, struct vm_region *vmr)
 {
        /* note that the VMR being reverse-mapped by the PM is protected by the PM's
-        * lock.  so later when removal holds this, it delays munmaps and keeps the
+        * lock.  we clearly need a write lock here, but removal also needs a write
+        * lock, so later when removal holds this, it delays munmaps and keeps the
         * VMR connected. */
        spin_lock(&pm->pm_lock);
        TAILQ_INSERT_TAIL(&pm->pm_vmrs, vmr, vm_pm_link);
@@ -29,81 +30,186 @@ void pm_remove_vmr(struct page_map *pm, struct vm_region *vmr)
        spin_unlock(&pm->pm_lock);
 }
 
+/* PM slot void *s look like this:
+ *
+ * |--11--|--1--|----52 or 20 bits--|
+ * | ref  | flag|    ppn of page    |
+ *              \  <--- meta shift -/
+ *
+ * The setter funcs return the void* that should update slot_val; it doesn't
+ * change slot_val in place (it's a val, not the addr) */
+
+#ifdef CONFIG_64BIT
+# define PM_FLAGS_SHIFT 52
+#else
+# define PM_FLAGS_SHIFT 20
+#endif
+#define PM_REFCNT_SHIFT (PM_FLAGS_SHIFT + 1)
+
+#define PM_REMOVAL (1UL << PM_FLAGS_SHIFT)
+
+static bool pm_slot_check_removal(void *slot_val)
+{
+       return (unsigned long)slot_val & PM_REMOVAL ? TRUE : FALSE;
+}
+
+static void *pm_slot_set_removal(void *slot_val)
+{
+       return (void*)((unsigned long)slot_val | PM_REMOVAL);
+}
+
+static void *pm_slot_clear_removal(void *slot_val)
+{
+       return (void*)((unsigned long)slot_val & ~PM_REMOVAL);
+}
+
+static int pm_slot_check_refcnt(void *slot_val)
+{
+       return (unsigned long)slot_val >> PM_REFCNT_SHIFT;
+}
+
+static void *pm_slot_inc_refcnt(void *slot_val)
+{
+       return (void*)((unsigned long)slot_val + (1UL << PM_REFCNT_SHIFT));
+}
+
+static void *pm_slot_dec_refcnt(void *slot_val)
+{
+       return (void*)((unsigned long)slot_val - (1UL << PM_REFCNT_SHIFT));
+}
+
+static struct page *pm_slot_get_page(void *slot_val)
+{
+       if (!slot_val)
+               return 0;
+       return ppn2page((unsigned long)slot_val & ((1UL << PM_FLAGS_SHIFT) - 1));
+}
+
+static void *pm_slot_set_page(void *slot_val, struct page *pg)
+{
+       assert(pg != pages);    /* we should never alloc page 0, for sanity */
+       return (void*)(page2ppn(pg) | ((unsigned long)slot_val &
+                                      ~((1UL << PM_FLAGS_SHIFT) - 1)));
+}
+
 /* Initializes a PM.  Host should be an *inode or a *bdev (doesn't matter).  The
  * reference this stores is uncounted. */
 void pm_init(struct page_map *pm, struct page_map_operations *op, void *host)
 {
        pm->pm_bdev = host;                                             /* note the uncounted ref */
        radix_tree_init(&pm->pm_tree);
-       spinlock_init(&pm->pm_tree_lock);
        pm->pm_num_pages = 0;                                   /* no pages in a new pm */
        pm->pm_op = op;
-       pm->pm_flags = 0;
        spinlock_init(&pm->pm_lock);
        TAILQ_INIT(&pm->pm_vmrs);
+       atomic_set(&pm->pm_removal, 0);
 }
 
-/* Looks up the index'th page in the page map, returning an incref'd reference,
- * or 0 if it was not in the map. */
+/* Looks up the index'th page in the page map, returning a refcnt'd reference
+ * that need to be dropped with pm_put_page, or 0 if it was not in the map. */
 static struct page *pm_find_page(struct page_map *pm, unsigned long index)
 {
-       spin_lock(&pm->pm_tree_lock);
-       struct page *page = (struct page*)radix_lookup(&pm->pm_tree, index);
-       if (page)
-               page_incref(page);
-       spin_unlock(&pm->pm_tree_lock);
+       void **tree_slot;
+       void *old_slot_val, *slot_val;
+       struct page *page = 0;
+       /* Read walking the PM tree TODO: (RCU) */
+       spin_lock(&pm->pm_lock);
+       /* We're syncing with removal.  The deal is that if we grab the page (and
+        * we'd only do that if the page != 0), we up the slot ref and clear
+        * removal.  A remover will only remove it if removal is still set.  If we
+        * grab and release while removal is in progress, even though we no longer
+        * hold the ref, we have unset removal.  Also, to prevent removal where we
+        * get a page well before the removal process, the removal won't even bother
+        * when the slot refcnt is upped. */
+       tree_slot = radix_lookup_slot(&pm->pm_tree, index);
+       if (!tree_slot)
+               goto out;
+       do {
+               old_slot_val = ACCESS_ONCE(*tree_slot);
+               slot_val = old_slot_val;
+               page = pm_slot_get_page(slot_val);
+               if (!page)
+                       goto out;
+               slot_val = pm_slot_clear_removal(slot_val);
+               slot_val = pm_slot_inc_refcnt(slot_val);        /* not a page kref */
+       } while (!atomic_cas_ptr(tree_slot, old_slot_val, slot_val));
+       assert(page->pg_tree_slot == tree_slot);
+out:
+       spin_unlock(&pm->pm_lock);
        return page;
 }
 
 /* Attempts to insert the page into the page_map, returns 0 for success, or an
  * error code if there was one already (EEXIST) or we ran out of memory
- * (ENOMEM).  On success, this will preemptively lock the page, and will also
- * store a reference to the page in the pm. */
+ * (ENOMEM).
+ *
+ * On success, callers *lose* their page ref, but get a PM slot ref.  This slot
+ * ref is sufficient to keep the page alive (slot ref protects the page ref)..
+ *
+ * Makes no assumptions about the quality of the data loaded, that's up to the
+ * caller. */
 static int pm_insert_page(struct page_map *pm, unsigned long index,
                           struct page *page)
 {
-       int error = 0;
-       spin_lock(&pm->pm_tree_lock);
-       error = radix_insert(&pm->pm_tree, index, page, 0);
-       if (!error) {
-               page_incref(page);
-               /* setting PG_BUF since we know it'll be used for IO later... */
-               atomic_or(&page->pg_flags, PG_LOCKED | PG_BUFFER | PG_PAGEMAP);
-               page->pg_sem.nr_signals = 0;            /* ensure others will block */
-               page->pg_mapping = pm;
-               page->pg_index = index;
-               pm->pm_num_pages++;
+       int ret;
+       void **tree_slot;
+       void *slot_val = 0;
+       /* write locking the PM */
+       spin_lock(&pm->pm_lock);
+       page->pg_mapping = pm;  /* debugging */
+       page->pg_index = index;
+       /* no one should be looking at the tree slot til we stop write locking.  the
+        * only other one who looks is removal, who requires a PM write lock. */
+       page->pg_tree_slot = (void*)0xdeadbeef; /* poison */
+       slot_val = pm_slot_inc_refcnt(slot_val);
+       /* passing the page ref from the caller to the slot */
+       slot_val = pm_slot_set_page(slot_val, page);
+       /* shouldn't need a CAS or anything for the slot write, since we hold the
+        * write lock.  o/w, we'd need to get the slot and CAS instead of insert. */
+       ret = radix_insert(&pm->pm_tree, index, slot_val, &tree_slot);
+       if (ret) {
+               spin_unlock(&pm->pm_lock);
+               return ret;
        }
-       spin_unlock(&pm->pm_tree_lock);
-       return error;
+       page->pg_tree_slot = tree_slot;
+       pm->pm_num_pages++;
+       spin_unlock(&pm->pm_lock);
+       return 0;
 }
 
+/* Decrefs the PM slot ref (usage of a PM page).  The PM's page ref remains. */
 void pm_put_page(struct page *page)
 {
-       page_decref(page);
+       void **tree_slot = page->pg_tree_slot;
+       assert(tree_slot);
+       /* decref, don't care about CASing */
+       atomic_add((atomic_t*)tree_slot, -(1UL << PM_REFCNT_SHIFT));
 }
 
 /* Makes sure the index'th page of the mapped object is loaded in the page cache
- * and returns its location via **pp.  Note this will give you a refcnt'd
- * reference to the page.  This may block! TODO: (BLK) */
+ * and returns its location via **pp.
+ *
+ * You'll get a pm-slot refcnt back, which you need to put when you're done. */
 int pm_load_page(struct page_map *pm, unsigned long index, struct page **pp)
 {
        struct page *page;
        int error;
-       bool page_was_mapped = TRUE;
 
        page = pm_find_page(pm, index);
        while (!page) {
-               /* kpage_alloc, since we want the page to persist after the proc
-                * dies (can be used by others, until the inode shuts down). */
                if (kpage_alloc(&page))
                        return -ENOMEM;
-               /* might want to initialize other things, perhaps in page_alloc() */
-               atomic_set(&page->pg_flags, 0);
+               /* important that UP_TO_DATE is not set.  once we put it in the PM,
+                * others can find it, and we still need to fill it.
+                *
+                * TODO: seems shitty: setting PG_BUF since we know it'll be used for IO
+                * later... */
+               atomic_set(&page->pg_flags, PG_LOCKED | PG_BUFFER | PG_PAGEMAP);
+               page->pg_sem.nr_signals = 0;    /* preemptively locking */
                error = pm_insert_page(pm, index, page);
                switch (error) {
                        case 0:
-                               page_was_mapped = FALSE;
+                               goto load_locked_page;
                                break;
                        case -EEXIST:
                                /* the page was mapped already (benign race), just get rid of
@@ -112,46 +218,366 @@ int pm_load_page(struct page_map *pm, unsigned long index, struct page **pp)
                                page = pm_find_page(pm, index);
                                break;
                        default:
-                               /* something is wrong, bail out! */
                                page_decref(page);
                                return error;
                }
        }
-       assert(page && kref_refcnt(&page->pg_kref));
-       /* At this point, page is a refcnt'd page, and we return the reference.
-        * Also, there's an unlikely race where we're not in the page cache anymore,
-        * and this all is useless work. */
-       *pp = page;
-       /* if the page was in the map, we need to do some checks, and might have to
-        * read in the page later.  If the page was freshly inserted to the pm by
-        * us, we skip this since we are the one doing the readpage(). */
-       if (page_was_mapped) {
-               /* is it already here and up to date?  if so, we're done */
-               if (atomic_read(&page->pg_flags) & PG_UPTODATE)
-                       return 0;
-               /* if not, try to lock the page (could BLOCK) */
-               lock_page(page);
-               /* we got it, is our page still in the cache?  check the mapping.  if
-                * not, start over, perhaps with EAGAIN and outside support */
-               if (!page->pg_mapping)
-                       panic("Page is not in the mapping!  Haven't implemented this!");
-               /* double check, are we up to date?  if so, we're done */
-               if (atomic_read(&page->pg_flags) & PG_UPTODATE) {
-                       unlock_page(page);
-                       return 0;
-               }
+       assert(page && pm_slot_check_refcnt(*page->pg_tree_slot));
+       if (atomic_read(&page->pg_flags) & PG_UPTODATE) {
+               *pp = page;
+               printd("pm %p FOUND page %p, addr %p, idx %d\n", pm, page,
+                      page2kva(page), index);
+               return 0;
        }
-       /* if we're here, the page is locked by us, and it needs to be read in */
-       assert(page->pg_mapping == pm);
-       /* Readpage will block internally, returning when it is done */
+       lock_page(page);
+       /* double-check.  if we we blocked on lock_page, it was probably for someone
+        * else loading.  plus, we can't load a page more than once (it could
+        * clobber newer writes) */
+       if (atomic_read(&page->pg_flags) & PG_UPTODATE) {
+               unlock_page(page);
+               *pp = page;
+               return 0;
+       }
+       /* fall through */
+load_locked_page:
+       /* TODO: this is slightly shitty: readpage sets UPTODATE */
        error = pm->pm_op->readpage(pm, page);
        assert(!error);
-       /* Unlock, since we're done with the page and it is up to date */
-       unlock_page(page);
        assert(atomic_read(&page->pg_flags) & PG_UPTODATE);
+       unlock_page(page);
+       *pp = page;
+       printd("pm %p LOADS page %p, addr %p, idx %d\n", pm, page,
+              page2kva(page), index);
        return 0;
 }
 
+static bool vmr_has_page_idx(struct vm_region *vmr, unsigned long pg_idx)
+{
+       unsigned long nr_pgs = (vmr->vm_end - vmr->vm_base) >> PGSHIFT;
+       unsigned long start_pg = vmr->vm_foff >> PGSHIFT;
+       return ((start_pg <= pg_idx) && (pg_idx < start_pg + nr_pgs));
+}
+
+static void *vmr_idx_to_va(struct vm_region *vmr, unsigned long pg_idx)
+{
+       uintptr_t va = vmr->vm_base + ((pg_idx << PGSHIFT) - vmr->vm_foff);
+       assert(va < vmr->vm_end);
+       return (void*)va;
+}
+
+static unsigned long vmr_get_end_idx(struct vm_region *vmr)
+{
+       return ((vmr->vm_end - vmr->vm_base) + vmr->vm_foff) >> PGSHIFT;
+}
+
+static void vmr_for_each(struct vm_region *vmr, unsigned long pg_idx,
+                         unsigned long max_nr_pgs, mem_walk_callback_t callback)
+{
+       void *start_va = vmr_idx_to_va(vmr, pg_idx);
+       size_t len = vmr->vm_end - (uintptr_t)start_va;
+       len = MIN(len, max_nr_pgs << PGSHIFT);
+       /* TODO: start using pml_for_each, across all arches */
+       env_user_mem_walk(vmr->vm_proc, start_va, len, callback, 0);
+}
+
+/* These next two helpers are called on a VMR's range of VAs corresponding to a
+ * pages in a PM undergoing removal.
+ *
+ * In general, it is safe to mark !P or 0 a PTE so long as the page the PTE
+ * points to belongs to a PM.  We'll refault, find the page, and rebuild the
+ * PTE.  This allows us to handle races like: {pm marks !p, {fault, find page,
+ * abort removal, write new pte}, pm clears pte}.
+ *
+ * In that race, HPF is writing the PTE, which removal code subsequently looks
+ * at to determine if the underlying page is dirty.  We need to make sure no one
+ * clears dirty bits unless they handle the WB (or discard).  HPF preserves the
+ * dirty bit for this reason. */
+static int __pm_mark_not_present(struct proc *p, pte_t *pte, void *va, void *arg)
+{
+       struct page *page;
+       if (!PAGE_PRESENT(*pte))
+               return 0;
+       page = ppn2page(PTE2PPN(*pte));
+       if (atomic_read(&page->pg_flags) & PG_REMOVAL)
+               *pte &= ~PTE_P;
+       return 0;
+}
+
+static int __pm_mark_dirty_pgs_unmap(struct proc *p, pte_t *pte, void *va,
+                                     void *arg)
+{
+       struct page *page;
+       /* we're not checking for 'present' or not, since we marked them !P earlier.
+        * but the CB is still called on everything in the range.  we can tell the
+        * formerly-valid PTEs from the completely unmapped, since the latter are
+        * == 0, while the former have other things in them, but just are !P. */
+       if (!(*pte))
+               return 0;
+       page = ppn2page(PTE2PPN(*pte));
+       /* need to check for removal again, just like in mark_not_present */
+       if (atomic_read(&page->pg_flags) & PG_REMOVAL) {
+               if (*pte & PTE_D)
+                       atomic_or(&page->pg_flags, PG_DIRTY);
+               *pte = 0;
+       }
+       return 0;
+}
+
+static int __pm_mark_unmap(struct proc *p, pte_t *pte, void *va, void *arg)
+{
+       struct page *page;
+       if (!(*pte))
+               return 0;
+       page = ppn2page(PTE2PPN(*pte));
+       if (atomic_read(&page->pg_flags) & PG_REMOVAL)
+               *pte = 0;
+       return 0;
+}
+
+static void shootdown_and_reset_ptrstore(void *proc_ptrs[], int *arr_idx)
+{
+       for (int i = 0; i < *arr_idx; i++)
+               proc_tlbshootdown((struct proc*)proc_ptrs[i], 0, 0);
+       *arr_idx = 0;
+}
+
+/* Attempts to remove pages from the pm, from [index, index + nr_pgs).  Returns
+ * the number of pages removed.  There can only be one remover at a time per PM
+ * - others will return 0. */
+int pm_remove_contig(struct page_map *pm, unsigned long index,
+                     unsigned long nr_pgs)
+{
+       unsigned long i;
+       int nr_removed = 0;
+       void **tree_slot;
+       void *old_slot_val, *slot_val;
+       struct vm_region *vmr_i;
+       bool pm_has_pinned_vmrs = FALSE;
+       /* using this for both procs and later WBs */
+       #define PTR_ARR_LEN 10
+       void *ptr_store[PTR_ARR_LEN];
+       int ptr_free_idx = 0;
+       struct page *page;
+       /* could also call a simpler remove if nr_pgs == 1 */
+       if (!nr_pgs)
+               return 0;
+       /* only one remover at a time (since we walk the PM multiple times as our
+        * 'working list', and need the REMOVAL flag to tell us which pages we're
+        * working on.  with more than one remover, we'd be confused and would need
+        * another list.) */
+       if (atomic_swap(&pm->pm_removal, 1)) {
+               /* We got a 1 back, so someone else is already removing */
+               return 0;
+       }
+       /* TODO: RCU: we're read walking the PM tree and write walking the VMR list.
+        * the reason for the write lock is since we need to prevent new VMRs or the
+        * changing of a VMR to being pinned. o/w, we could fail to unmap and check
+        * for dirtiness. */
+       spin_lock(&pm->pm_lock);
+       assert(index + nr_pgs > index); /* til we figure out who validates */
+       /* check for any pinned VMRs.  if we have none, then we can skip some loops
+        * later */
+       TAILQ_FOREACH(vmr_i, &pm->pm_vmrs, vm_pm_link) {
+               if (vmr_i->vm_flags & MAP_LOCKED)
+                       pm_has_pinned_vmrs = TRUE;
+       }
+       /* this pass, we mark pages for removal */
+       for (i = index; i < index + nr_pgs; i++) {
+               if (pm_has_pinned_vmrs) {
+                       /* for pinned pages, we don't even want to attempt to remove them */
+                       TAILQ_FOREACH(vmr_i, &pm->pm_vmrs, vm_pm_link) {
+                               /* once we've found a pinned page, we can skip over the rest of
+                                * the range of pages mapped by this vmr - even if the vmr
+                                * hasn't actually faulted them in yet. */
+                               if ((vmr_i->vm_flags & MAP_LOCKED) &&
+                                   (vmr_has_page_idx(vmr_i, i))) {
+                                       i = vmr_get_end_idx(vmr_i) - 1; /* loop will +1 */
+                                       goto next_loop_mark_rm;
+                               }
+                       }
+               }
+               /* TODO: would like a radix_next_slot() iterator (careful with skipping
+                * chunks of the loop) */
+               tree_slot = radix_lookup_slot(&pm->pm_tree, i);
+               if (!tree_slot)
+                       continue;
+               old_slot_val = ACCESS_ONCE(*tree_slot);
+               slot_val = old_slot_val;
+               page = pm_slot_get_page(slot_val);
+               if (!page)
+                       continue;
+               /* syncing with lookups, writebacks, etc.  only one remover per pm in
+                * general.  any new ref-getter (WB, lookup, etc) will clear removal,
+                * causing us to abort later. */
+               if (pm_slot_check_refcnt(slot_val))
+                       continue;
+               /* it's possible that removal is already set, if we happened to repeat a
+                * loop (due to running out of space in the proc arr) */
+               slot_val = pm_slot_set_removal(slot_val);
+               if (!atomic_cas_ptr(tree_slot, old_slot_val, slot_val))
+                       continue;
+               /* mark the page itself.  this isn't used for syncing - just out of
+                * convenience for ourselves (memwalk callbacks are easier).  need the
+                * atomic in case a new user comes in and tries mucking with the flags*/
+               atomic_or(&page->pg_flags, PG_REMOVAL);
+next_loop_mark_rm:
+               ;
+       }
+       /* second pass, over VMRs instead of pages.  we remove the marked pages from
+        * all VMRs, collecting the procs for batch shootdowns.  not sure how often
+        * we'll have more than one VMR (for a PM) per proc.  shared libs tend to
+        * have a couple, so we'll still batch things up */
+       TAILQ_FOREACH(vmr_i, &pm->pm_vmrs, vm_pm_link) {
+               /* might have some pinned VMRs that only map part of the file we aren't
+                * messing with (so they didn't trigger earlier). */
+               if (vmr_i->vm_flags & MAP_LOCKED)
+                       continue;
+               /* Private mappings: for each page, either PMs have a separate copy
+                * hanging off their PTE (in which case they aren't using the PM page,
+                * and the actual page in use won't have PG_REMOVAL set, and the CB will
+                * ignore it), or they are still using the shared version.  In which
+                * case they haven't written it yet, and we can remove it.  If they
+                * concurrently are performing a write fault to CoW the page, they will
+                * incref and clear REMOVAL, thereby aborting the remove anyways.
+                *
+                * Though if the entire mapping is unique-copies of private pages, we
+                * won't need a shootdown.  mem_walk can't handle this yet though. */
+               if (!vmr_has_page_idx(vmr_i, index))
+                       continue;
+               spin_lock(&vmr_i->vm_proc->pte_lock);
+               /* all PTEs for pages marked for removal are marked !P for the entire
+                * range.  it's possible we'll remove some extra PTEs (races with
+                * loaders, etc), but those pages will remain in the PM and should get
+                * soft-faulted back in. */
+               vmr_for_each(vmr_i, index, nr_pgs, __pm_mark_not_present);
+               spin_unlock(&vmr_i->vm_proc->pte_lock);
+               /* batching TLB shootdowns for a given proc (continue if found).
+                * the proc stays alive while we hold a read lock on the PM tree,
+                * since the VMR can't get yanked out yet. */
+               for (i = 0; i < ptr_free_idx; i++) {
+                       if (ptr_store[i] == vmr_i->vm_proc)
+                               break;
+               }
+               if (i != ptr_free_idx)
+                       continue;
+               if (ptr_free_idx == PTR_ARR_LEN)
+                       shootdown_and_reset_ptrstore(ptr_store, &ptr_free_idx);
+               ptr_store[ptr_free_idx++] = vmr_i->vm_proc;
+       }
+       /* Need to shootdown so that all TLBs have the page marked absent.  Then we
+        * can check the dirty bit, now that concurrent accesses will fault.  btw,
+        * we have a lock ordering: pm (RCU) -> proc lock (state, vcmap, etc) */
+       shootdown_and_reset_ptrstore(ptr_store, &ptr_free_idx);
+       /* Now that we've shotdown, we can check for dirtiness.  One downside to
+        * this approach is we check every VMR for a page, even once we know the
+        * page is dirty.  We also need to unmap the pages (set ptes to 0) for any
+        * that we previously marked not present (complete the unmap).  We're racing
+        * with munmap here, which treats the PTE as a weak ref on a page. */
+       TAILQ_FOREACH(vmr_i, &pm->pm_vmrs, vm_pm_link) {
+               if (vmr_i->vm_flags & MAP_LOCKED)
+                       continue;
+               if (!vmr_has_page_idx(vmr_i, index))
+                       continue;
+               spin_lock(&vmr_i->vm_proc->pte_lock);
+               if (vmr_i->vm_prot & PROT_WRITE)
+                       vmr_for_each(vmr_i, index, nr_pgs, __pm_mark_dirty_pgs_unmap);
+               else
+                       vmr_for_each(vmr_i, index, nr_pgs, __pm_mark_unmap);
+               spin_unlock(&vmr_i->vm_proc->pte_lock);
+       }
+       /* Now we'll go through from the PM again and deal with pages are dirty. */
+       i = index;
+handle_dirty:
+       for (/* i set already */; i < index + nr_pgs; i++) {
+               /* TODO: consider putting in the pinned check & advance again.  Careful,
+                * since we could unlock on a handle_dirty loop, and skipping could skip
+                * over a new VMR, but those pages would still be marked for removal.
+                * It's not wrong, currently, to have spurious REMOVALs. */
+               tree_slot = radix_lookup_slot(&pm->pm_tree, i);
+               if (!tree_slot)
+                       continue;
+               page = pm_slot_get_page(*tree_slot);
+               if (!page)
+                       continue;
+               /* only operate on pages we marked earlier */
+               if (!(atomic_read(&page->pg_flags) & PG_REMOVAL))
+                       continue;
+               /* if someone has used it since we grabbed it, we lost the race and
+                * won't remove it later.  no sense writing it back now either. */
+               if (!pm_slot_check_removal(*tree_slot)) {
+                       /* since we set PG_REMOVAL, we're the ones to clear it */
+                       atomic_and(&page->pg_flags, ~PG_REMOVAL);
+                       continue;
+               }
+               /* this dirty flag could also be set by write()s, not just VMRs */
+               if (atomic_read(&page->pg_flags) & PG_DIRTY) {
+                       /* need to bail out.  after we WB, we'll restart this big loop where
+                        * we left off ('i' is still set) */
+                       if (ptr_free_idx == PTR_ARR_LEN)
+                               break;
+                       ptr_store[ptr_free_idx++] = page;
+                       /* once we've decided to WB, we can clear the dirty flag.  might
+                        * have an extra WB later, but we won't miss new data */
+                       atomic_and(&page->pg_flags, ~PG_DIRTY);
+               }
+       }
+       /* we're unlocking, meaning VMRs and the radix tree can be changed, but we
+        * are still the only remover. still can have new refs that clear REMOVAL */
+       spin_unlock(&pm->pm_lock);
+       /* could batch these up, etc. */
+       for (int j = 0; j < ptr_free_idx; j++)
+               pm->pm_op->writepage(pm, (struct page*)ptr_store[j]);
+       ptr_free_idx = 0;
+       spin_lock(&pm->pm_lock);
+       /* bailed out of the dirty check loop earlier, need to finish and WB.  i is
+        * still set to where we failed and left off in the big loop. */
+       if (i < index + nr_pgs)
+               goto handle_dirty;
+       /* TODO: RCU - we need a write lock here (the current spinlock is fine) */
+       /* All dirty pages were WB, anything left as REMOVAL can be removed */
+       for (i = index; i < index + nr_pgs; i++) {
+               /* TODO: consider putting in the pinned check & advance again */
+               tree_slot = radix_lookup_slot(&pm->pm_tree, i);
+               if (!tree_slot)
+                       continue;
+               old_slot_val = ACCESS_ONCE(*tree_slot);
+               slot_val = old_slot_val;
+               page = pm_slot_get_page(*tree_slot);
+               if (!page)
+                       continue;
+               if (!(atomic_read(&page->pg_flags) & PG_REMOVAL))
+                       continue;
+               /* syncing with lookups, writebacks, etc.  if someone has used it since
+                * we started removing, they would have cleared the slot's REMOVAL (but
+                * not PG_REMOVAL), though the refcnt could be back down to 0 again. */
+               if (!pm_slot_check_removal(slot_val)) {
+                       /* since we set PG_REMOVAL, we're the ones to clear it */
+                       atomic_and(&page->pg_flags, ~PG_REMOVAL);
+                       continue;
+               }
+               if (pm_slot_check_refcnt(slot_val))
+                       warn("Unexpected refcnt in PM remove!");
+               /* Note that we keep slot REMOVAL set, so the radix tree thinks it's
+                * still an item (artifact of that implementation). */
+               slot_val = pm_slot_set_page(slot_val, 0);
+               if (!atomic_cas_ptr(tree_slot, old_slot_val, slot_val)) {
+                       atomic_and(&page->pg_flags, ~PG_REMOVAL);
+                       continue;
+               }
+               /* at this point, we're free at last!  When we update the radix tree, it
+                * still thinks it has an item.  This is fine.  Lookups will now fail
+                * (since the page is 0), and insertions will block on the write lock.*/
+               atomic_set(&page->pg_flags, 0); /* cause/catch bugs */
+               page_decref(page);
+               nr_removed++;
+               radix_delete(&pm->pm_tree, i);
+       }
+       pm->pm_num_pages -= nr_removed;
+       spin_unlock(&pm->pm_lock);
+       atomic_set(&pm->pm_removal, 0);
+       return nr_removed;
+}
+
 void print_page_map_info(struct page_map *pm)
 {
        struct vm_region *vmr_i;