Reader-writer queue locks
author     Barret Rhoden <brho@cs.berkeley.edu>
           Thu, 26 Sep 2013 18:20:36 +0000 (11:20 -0700)
committer  Barret Rhoden <brho@cs.berkeley.edu>
           Thu, 16 Jan 2014 02:13:15 +0000 (18:13 -0800)
These will be the qlocks in 9ns/inferno.
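
A minimal usage sketch, using the API declared in kern/include/rwlock.h
(foo_lock and the foo_* functions below are made up for illustration; these
are sleeping locks, so callers must be in a context that can block):

    #include <rwlock.h>

    static struct rwlock foo_lock;  /* protects some hypothetical shared state */

    void foo_init(void)
    {
        rwinit(&foo_lock);
    }

    void foo_read(void)
    {
        rlock(&foo_lock);           /* many readers can hold this concurrently */
        /* read the shared state */
        runlock(&foo_lock);
    }

    void foo_write(void)
    {
        wlock(&foo_lock);           /* exclusive access */
        /* modify the shared state */
        wunlock(&foo_lock);
    }

canrlock() is the non-blocking variant of rlock(): it takes the read lock and
returns TRUE if no writer currently holds the lock, and returns FALSE otherwise.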

kern/include/rwlock.h [new file with mode: 0644]
kern/src/Kbuild
kern/src/rwlock.c [new file with mode: 0644]
kern/src/testing.c

diff --git a/kern/include/rwlock.h b/kern/include/rwlock.h
new file mode 100644
index 0000000..c7018ed
--- /dev/null
@@ -0,0 +1,36 @@
+/* Copyright (c) 2013 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Reader-writer queue locks (sleeping locks).
+ *
+ * Readers favor other readers; writers favor other writers.  See rwlock.c for
+ * the details.
+ *
+ * One consequence of this: "if some reader holds an rwlock, then any other
+ * thread (including itself) can get an rlock". */
+
+#ifndef ROS_KERN_RWLOCK_H
+#define ROS_KERN_RWLOCK_H
+
+#include <ros/common.h>
+#include <kthread.h>
+#include <atomic.h>
+
+struct rwlock {
+       spinlock_t                                      lock;
+       atomic_t                                        nr_readers;
+       bool                                            writing;
+       struct cond_var                         readers;
+       struct cond_var                         writers;
+};
+typedef struct rwlock rwlock_t;
+
+void rwinit(struct rwlock *rw_lock);
+void rlock(struct rwlock *rw_lock);
+bool canrlock(struct rwlock *rw_lock);
+void runlock(struct rwlock *rw_lock);
+void wlock(struct rwlock *rw_lock);
+void wunlock(struct rwlock *rw_lock);
+
+#endif /* ROS_KERN_RWLOCK_H */
diff --git a/kern/src/Kbuild b/kern/src/Kbuild
index ce020eb..3aaf92d 100644
@@ -32,6 +32,7 @@ obj-y                                         += printfmt.o
 obj-y                                          += process.o
 obj-y                                          += radix.o
 obj-y                                          += readline.o
+obj-y                                          += rwlock.o
 obj-y                                          += schedule.o
 obj-y                                          += slab.o
 obj-y                                          += smp.o
diff --git a/kern/src/rwlock.c b/kern/src/rwlock.c
new file mode 100644
index 0000000..ffd4bd3
--- /dev/null
@@ -0,0 +1,136 @@
+/* Copyright (c) 2013 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Reader-writer queue locks (sleeping locks).
+ *
+ * We favor readers while the lock is read-held, meaning new readers can move
+ * ahead of waiting writers.  Ex: if I have some readers and then a writer
+ * arrives, the writer blocks.  If more readers come in after that, they still
+ * get the lock; the presence of the waiting writer doesn't stop them.
+ *
+ * You get potential writer starvation, but you also get the property that
+ * if a thread holds a read-lock, that thread can grab the same reader
+ * lock again.  A more general statement would be "if some reader holds
+ * an rwlock, then any other thread (including itself) can get an rlock".
+ *
+ * Similarly, writers favor other writers.  So if a writer is unlocking, it'll
+ * pass the lock to another writer first.  Here, there is potential reader
+ * starvation.
+ *
+ * We also pass the lock, instead of letting recently woken threads fight for
+ * it.  In the case of a reader wakeup, we know that all of the sleeping readers
+ * will wake up and read.  Instead of having them fight for the lock and then
+ * incref, the waker (the last writer) will up the count and just wake everyone.
+ *
+ * This also helps when a writer wants to favor another writer.  If we didn't
+ * pass the lock, then a new reader could squeeze in after our old writer
+ * signalled the new writer.  Even worse, in this case, the readers that we
+ * didn't wake up are still sleeping, even though a reader now holds the lock.
+ * It won't deadlock (since eventually the reader will wake the writer, who
+ * wakes the old readers), but it breaks the notion of an RW lock a bit. */
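+
+/* A possible interleaving, purely for illustration (R1, R2, and R3 are
+ * readers, W is a writer), showing both directions of the lock passing
+ * described above:
+ *
+ *   R1: rlock()      nr_readers 0 -> 1
+ *   W:  wlock()      a reader is in, so W sleeps on ->writers
+ *   R2: rlock()      nr_readers 1 -> 2; the waiting writer doesn't block it
+ *   R1: runlock()    nr_readers 2 -> 1
+ *   R2: runlock()    nr_readers 1 -> 0 and a writer is waiting, so it sets
+ *                    writing = TRUE and signals ->writers: the lock passes to W
+ *   R3: rlock()      writing is TRUE, so R3 sleeps on ->readers
+ *   W:  wunlock()    no writers are waiting, so it sets nr_readers to the
+ *                    number of sleeping readers (1) and broadcasts ->readers:
+ *                    the lock passes to R3 */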
+
+#include <rwlock.h>
+#include <atomic.h>
+#include <kthread.h>
+
+void rwinit(struct rwlock *rw_lock)
+{
+       spinlock_init(&rw_lock->lock);
+       atomic_init(&rw_lock->nr_readers, 0);
+       rw_lock->writing = FALSE;
+       cv_init_with_lock(&rw_lock->readers, &rw_lock->lock);
+       cv_init_with_lock(&rw_lock->writers, &rw_lock->lock);
+}
+
+void rlock(struct rwlock *rw_lock)
+{
+       /* If we already have a reader, we can just increment and return.  This and
+        * canrlock's check are the only accesses to nr_readers outside the lock.  All
+        * locked uses must expect the nr to be concurrently increffed (unless 0). */
+       if (atomic_add_not_zero(&rw_lock->nr_readers, 1))
+               return;
+       /* Here's an alternate style: the broadcaster (a writer) will up the readers
+        * count and just wake us.  All readers just proceed, instead of fighting to
+        * lock and up the count.  The writer 'passed' the rlock to us. */
+       spin_lock(&rw_lock->lock);
+       if (rw_lock->writing) {
+               cv_wait_and_unlock(&rw_lock->readers);
+               return;
+       }
+       atomic_inc(&rw_lock->nr_readers);
+       spin_unlock(&rw_lock->lock);
+}
+
+bool canrlock(struct rwlock *rw_lock)
+{
+       if (atomic_add_not_zero(&rw_lock->nr_readers, 1))
+               return TRUE;
+       spin_lock(&rw_lock->lock);
+       if (rw_lock->writing) {
+               spin_unlock(&rw_lock->lock);
+               return FALSE;
+       }
+       atomic_inc(&rw_lock->nr_readers);
+       spin_unlock(&rw_lock->lock);
+       return TRUE;
+}
+
+void runlock(struct rwlock *rw_lock)
+{
+       spin_lock(&rw_lock->lock);
+       /* The atomic sub and test tells us if we took the refcnt to 0, syncing
+        * with the atomic_add_not_zero of new readers.  Since we're passing the
+        * lock, we need to make sure someone is actually sleeping.  Contrast this
+        * with wunlock, where we can just blindly broadcast and add (potentially 0). */
+       if (atomic_sub_and_test(&rw_lock->nr_readers, 1) &&
+               rw_lock->writers.nr_waiters) {
+               /* passing the lock to the one writer we signal. */
+               rw_lock->writing = TRUE;
+               __cv_signal(&rw_lock->writers);
+       }
+       spin_unlock(&rw_lock->lock);
+}
+
+void wlock(struct rwlock *rw_lock)
+{
+       spin_lock(&rw_lock->lock);
+       if (atomic_read(&rw_lock->nr_readers) || rw_lock->writing) {
+               /* If we slept, the lock was passed to us */
+               cv_wait_and_unlock(&rw_lock->writers);
+               return;
+       }
+       rw_lock->writing = TRUE;
+       spin_unlock(&rw_lock->lock);
+}
+
+void wunlock(struct rwlock *rw_lock)
+{
+       /* Pass the lock to another writer (we leave writing = TRUE) */
+       spin_lock(&rw_lock->lock);
+       if (rw_lock->writers.nr_waiters) {
+               /* Just waking one */
+               __cv_signal(&rw_lock->writers);
+               spin_unlock(&rw_lock->lock);
+               return;
+       }
+       rw_lock->writing = FALSE;
+       atomic_set(&rw_lock->nr_readers, rw_lock->readers.nr_waiters);
+       __cv_broadcast(&rw_lock->readers);
+       spin_unlock(&rw_lock->lock);
+}
diff --git a/kern/src/testing.c b/kern/src/testing.c
index 0dbb40c..7c7a072 100644
@@ -39,6 +39,7 @@
 #include <ucq.h>
 #include <setjmp.h>
 #include <apipe.h>
+#include <rwlock.h>
 
 #define l1 (available_caches.l1)
 #define l2 (available_caches.l2)
@@ -1445,10 +1446,10 @@ void test_abort_halt(void)
 }
 
 /* Funcs and global vars for test_cv() */
-struct cond_var local_cv;
-atomic_t counter;
-struct cond_var *cv = &local_cv;
-volatile bool state = FALSE;           /* for test 3 */
+static struct cond_var local_cv;
+static atomic_t counter;
+static struct cond_var *cv = &local_cv;
+static volatile bool state = FALSE;            /* for test 3 */
 
 void __test_cv_signal(uint32_t srcid, long a0, long a1, long a2)
 {
@@ -1715,3 +1716,55 @@ void test_apipe(void)
         * might expect us to return while being on core 0 (like if we were kfunc'd
         * from the monitor.  Be careful if you copy this code. */
 }
+
+static struct rwlock rwlock, *rwl = &rwlock;
+static atomic_t rwlock_counter;
+void test_rwlock(void)
+{
+       bool ret;
+       rwinit(rwl);
+       /* Basic: can I lock twice, recursively? */
+       rlock(rwl);
+       ret = canrlock(rwl);
+       assert(ret);
+       runlock(rwl);
+       runlock(rwl);
+       /* Other simple tests */
+       wlock(rwl);
+       wunlock(rwl);
+
+       /* Just some half-assed different operations */
+       void __test_rwlock(uint32_t srcid, long a0, long a1, long a2)
+       {
+               int rand = read_tsc() & 0xff;
+               for (int i = 0; i < 10000; i++) {
+                       switch ((rand * i) % 5) {
+                               case 0:
+                               case 1:
+                                       rlock(rwl);
+                                       runlock(rwl);
+                                       break;
+                               case 2:
+                               case 3:
+                                       if (canrlock(rwl))
+                                               runlock(rwl);
+                                       break;
+                               case 4:
+                                       wlock(rwl);
+                                       wunlock(rwl);
+                                       break;
+                       }
+               }
+               /* signal to allow core 0 to finish */
+               atomic_dec(&rwlock_counter);
+       }
+
+       /* send 4 messages to every core other than core 0 */
+       atomic_init(&rwlock_counter, (num_cpus - 1) * 4);
+       for (int i = 1; i < num_cpus; i++)
+               for (int j = 0; j < 4; j++)
+                       send_kernel_message(i, __test_rwlock, 0, 0, 0, KMSG_ROUTINE);
+       while (atomic_read(&rwlock_counter))
+               cpu_relax();
+       printk("rwlock test complete\n");
+}