Condition variables
authorBarret Rhoden <brho@cs.berkeley.edu>
Thu, 25 Oct 2012 21:08:13 +0000 (14:08 -0700)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 25 Oct 2012 21:08:13 +0000 (14:08 -0700)
Built with semaphores.  This is notoriously tricky, and I might not have
done it correctly.

See http://research.microsoft.com/pubs/64242/implementingcvs.pdf for a
discussion of some of the issues.

I get around it by peaking in the semaphore and using its count as a
'lock' to enforce an ordering on waiters so that the order in which you
hit the CV is the order that you hit the internal sem.  And since it's
the sem's count, we can atomically sleep on the sem and 'release' that
'lock'.

kern/include/kthread.h
kern/src/kthread.c
kern/src/testing.c

index a0f3c56..93d6d11 100644 (file)
@@ -37,6 +37,12 @@ struct semaphore {
        spinlock_t                                      lock;
 };
 
+struct cond_var {
+       struct semaphore                        sem;
+       spinlock_t                                      lock;
+       unsigned long                           nr_waiters;
+};
+
 /* This doesn't have to be inline, but it doesn't matter for now */
 static inline void init_sem(struct semaphore *sem, int signals)
 {
@@ -97,4 +103,12 @@ void __launch_kthread(struct trapframe *tf, uint32_t srcid, long a0, long a1,
                          long a2);
 void kthread_yield(void);
 
+void cv_init(struct cond_var *cv);
+void cv_lock(struct cond_var *cv);
+void cv_unlock(struct cond_var *cv);
+void cv_wait_and_unlock(struct cond_var *cv);
+void cv_wait(struct cond_var *cv);
+void cv_signal(struct cond_var *cv);
+void cv_broadcast(struct cond_var *cv);
+
 #endif /* ROS_KERN_KTHREAD_H */
index 1f9fc35..a977714 100644 (file)
@@ -275,3 +275,96 @@ void kthread_yield(void)
        send_kernel_message(core_id(), __wake_me_up, 0, 0, 0, KMSG_ROUTINE);
        sleep_on(sem);
 }
+
+/* Condition variables, using semaphores and kthreads */
+void cv_init(struct cond_var *cv)
+{
+       init_sem(&cv->sem, 0);
+       spinlock_init(&cv->lock);
+       cv->nr_waiters = 0;
+}
+
+void cv_lock(struct cond_var *cv)
+{
+       spin_lock_irqsave(&cv->lock);
+}
+
+void cv_unlock(struct cond_var *cv)
+{
+       spin_unlock_irqsave(&cv->lock);
+}
+
+/* Helper to clarify the wait/signalling code */
+static int nr_sem_waiters(struct semaphore *sem)
+{
+       int retval;
+       retval = 0 - sem->nr_signals;
+       assert(retval >= 0);
+       return retval;
+}
+
+/* Comes in locked */
+void cv_wait_and_unlock(struct cond_var *cv)
+{
+       unsigned long nr_prev_waiters;
+       nr_prev_waiters = cv->nr_waiters++;
+       spin_unlock_irqsave(&cv->lock);
+       /* Wait til our turn.  This forces an ordering of all waiters such that the
+        * order in which they wait is the order in which they down the sem. */
+       while (nr_prev_waiters != nr_sem_waiters(&cv->sem))
+               cpu_relax();
+       printd("core %d, sees nr_sem_waiters: %d, cv_nr_waiters %d\n",
+              core_id(), nr_sem_waiters(&cv->sem), cv->nr_waiters);
+       /* Atomically sleeps and 'unlocks' the next kthread from its busy loop (the
+        * one right above this), when it changes the sems nr_signals/waiters. */
+       sleep_on(&cv->sem);
+}
+
+/* Comes in locked */
+void cv_wait(struct cond_var *cv)
+{
+       cv_wait_and_unlock(cv);
+       cv_lock(cv);
+}
+
+/* Helper, wakes exactly one, and there should have been at least one waiter. */
+static void sem_wake_one(struct semaphore *sem)
+{
+       struct kthread *kthread;
+       spin_lock_irqsave(&sem->lock);
+       assert(sem->nr_signals < 0);
+       sem->nr_signals++;
+       kthread = TAILQ_FIRST(&sem->waiters);
+       TAILQ_REMOVE(&sem->waiters, kthread, link);
+       spin_unlock_irqsave(&sem->lock);
+       kthread_runnable(kthread);
+}
+
+void cv_signal(struct cond_var *cv)
+{
+       spin_lock_irqsave(&cv->lock);
+       /* Can't short circuit this stuff.  We need to make sure any waiters that
+        * made it past upping the cv->nr_waiters has also downed the sem.
+        * Otherwise we muck with nr_waiters, which could break the ordering
+        * required by the waiters.  We also need to lock while making this check,
+        * o/w a new waiter can slip in after our while loop. */
+       while (cv->nr_waiters != nr_sem_waiters(&cv->sem))
+               cpu_relax();
+       if (cv->nr_waiters) {
+               cv->nr_waiters--;
+               sem_wake_one(&cv->sem);
+       }
+       spin_unlock_irqsave(&cv->lock);
+}
+
+void cv_broadcast(struct cond_var *cv)
+{
+       spin_lock_irqsave(&cv->lock);
+       while (cv->nr_waiters != nr_sem_waiters(&cv->sem))
+               cpu_relax();
+       while (cv->nr_waiters) {
+               cv->nr_waiters--;
+               sem_wake_one(&cv->sem);
+       }
+       spin_unlock_irqsave(&cv->lock);
+}
index 4857697..9ad9038 100644 (file)
@@ -1456,3 +1456,137 @@ void test_abort_halt(void)
        printk("Core 0 sent the IPI\n");
 #endif /* __i386__ */
 }
+
+void test_cv(void)
+{
+       static struct cond_var local_cv;
+       static atomic_t counter;
+       struct cond_var *cv = &local_cv;
+       int nr_msgs;
+       volatile bool state = FALSE;    /* for test 3 */
+
+       void __test_cv_signal(struct trapframe *tf, uint32_t srcid, long a0,
+                               long a1, long a2)
+       {
+               if (atomic_read(&counter) % 4)
+                       cv_signal(cv);
+               else
+                       cv_broadcast(cv);
+               atomic_dec(&counter);
+               smp_idle();
+       }
+       void __test_cv_waiter(struct trapframe *tf, uint32_t srcid, long a0,
+                             long a1, long a2)
+       {
+               cv_lock(cv);
+               /* check state, etc */
+               cv_wait_and_unlock(cv);
+               atomic_dec(&counter);
+               smp_idle();
+       }
+       void __test_cv_waiter_t3(struct trapframe *tf, uint32_t srcid, long a0,
+                                long a1, long a2)
+       {
+               udelay(a0);
+               /* if state == false, we haven't seen the signal yet */
+#if 0
+               /* this way is a little more verbose, but avoids unnecessary locking */
+               while (!state) {
+                       cv_lock(cv);
+                       /* first check is an optimization */
+                       if (state) {
+                               cv_unlock(cv);
+                               break;
+                       }
+                       cpu_relax();
+                       cv_wait_and_unlock(cv);
+               }
+#endif
+               /* this is the more traditional CV style */
+               cv_lock(cv);
+               while (!state) {
+                       cpu_relax();
+                       cv_wait(cv);    /* unlocks and relocks */
+               }
+               cv_unlock(cv);
+
+               /* Make sure we are done, tell the controller we are done */
+               cmb();
+               assert(state);
+               atomic_dec(&counter);
+               smp_idle();     /* kmsgs that might block cannot return! */
+       }
+
+       cv_init(cv);
+       /* Test 0: signal without waiting */
+       cv_broadcast(cv);
+       cv_signal(cv);
+       kthread_yield();
+       printk("test_cv: signal without waiting complete\n");
+
+       /* Test 1: single / minimal shit */
+       nr_msgs = num_cpus - 1; /* not using cpu 0 */
+       atomic_init(&counter, nr_msgs);
+       for (int i = 1; i < num_cpus; i++)
+               send_kernel_message(i, __test_cv_waiter, 0, 0, 0, KMSG_ROUTINE);
+       udelay(1000000);
+       cv_signal(cv);
+       kthread_yield();
+       while (atomic_read(&counter) != nr_msgs - 1)
+               cpu_relax();
+       printk("test_cv: single signal complete\n");
+       cv_broadcast(cv);
+       /* broadcast probably woke up the waiters on our core.  since we want to
+        * spin on their completion, we need to yield for a bit. */
+       kthread_yield();
+       while (atomic_read(&counter))
+               cpu_relax();
+       printk("test_cv: broadcast signal complete\n");
+
+       /* Test 2: shitloads of waiters and signalers */
+       nr_msgs = 0x500;        /* any more than 0x20000 could go OOM */
+       atomic_init(&counter, nr_msgs);
+       for (int i = 0; i < nr_msgs; i++) {
+               int cpu = (i % (num_cpus - 1)) + 1;
+               if (atomic_read(&counter) % 5)
+                       send_kernel_message(cpu, __test_cv_waiter, 0, 0, 0, KMSG_ROUTINE);
+               else
+                       send_kernel_message(cpu, __test_cv_signal, 0, 0, 0, KMSG_ROUTINE);
+       }
+       kthread_yield();        /* run whatever messages we sent to ourselves */
+       while (atomic_read(&counter)) {
+               cpu_relax();
+               cv_broadcast(cv);
+               udelay(1000000);
+               kthread_yield();        /* run whatever messages we sent to ourselves */
+       }
+       assert(!cv->nr_waiters);
+       printk("test_cv: massive message storm complete\n");
+
+       /* Test 3: basic one signaller, one receiver.  we want to vary the amount of
+        * time the sender and receiver delays, starting with (1ms, 0ms) and ending
+        * with (0ms, 1ms).  At each extreme, such as with the sender waiting 1ms,
+        * the receiver/waiter should hit the "check and wait" point well before the
+        * sender/signaller hits the "change state and signal" point. */
+       for (int i = 0; i < 1000; i++) {
+               for (int j = 0; j < 10; j++) {  /* some extra chances at each point */
+                       state = FALSE;
+                       atomic_init(&counter, 1);       /* signal that the client is done */
+                       /* client waits for i usec */
+                       send_kernel_message(2, __test_cv_waiter_t3, i, 0, 0, KMSG_ROUTINE);
+                       cmb();
+                       udelay(1000 - i);       /* senders wait time: 1000..0 */
+                       state = TRUE;
+                       cv_signal(cv);
+                       /* signal might have unblocked a kthread, let it run */
+                       kthread_yield();
+                       /* they might not have run at all yet (in which case they lost the
+                        * race and don't need the signal).  but we need to wait til they're
+                        * done */
+                       while (atomic_read(&counter))
+                               cpu_relax();
+                       assert(!cv->nr_waiters);
+               }
+       }
+       printk("test_cv: single sender/receiver complete\n");
+}