Pthread condition variables redone
author Barret Rhoden <brho@cs.berkeley.edu>
Sat, 23 Mar 2013 02:51:40 +0000 (19:51 -0700)
committer Barret Rhoden <brho@cs.berkeley.edu>
Sat, 23 Mar 2013 05:05:52 +0000 (22:05 -0700)
Condition variables now block in the 2LS instead of spinning the way
they would on an HPC barrier.  The test is basically the same as the
one in the kernel, maybe a bit better (definitely clearer, thanks to
the yields and join calls).
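
For context, here is a minimal sketch (not part of this commit) of the
locking discipline the new implementation and the test exercise: waiters
recheck their predicate in a loop while holding the mutex, and the
signaller changes the state and signals while holding the same mutex.
All names below are made up for illustration.

#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cv;
static bool ready = false;

static void *waiter(void *arg)
{
	pthread_mutex_lock(&m);
	while (!ready)				/* recheck: wakeups can be spurious */
		pthread_cond_wait(&cv, &m);	/* atomically unlocks m and sleeps */
	pthread_mutex_unlock(&m);
	return 0;
}

static void signaller(void)
{
	pthread_mutex_lock(&m);			/* touch state and signal atomically */
	ready = true;
	pthread_cond_signal(&cv);
	pthread_mutex_unlock(&m);
}

int main(void)
{
	pthread_t t;
	pthread_cond_init(&cv, 0);
	pthread_create(&t, 0, waiter, 0);
	signaller();
	pthread_join(t, 0);
	return 0;
}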

tests/condvar_test.c [new file with mode: 0644]
user/pthread/pthread.c
user/pthread/pthread.h

diff --git a/tests/condvar_test.c b/tests/condvar_test.c
new file mode 100644 (file)
index 0000000..6480e16
--- /dev/null
@@ -0,0 +1,154 @@
+#include <stdio.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/time.h>
+
+/* OS dependent #incs */
+#include <parlib.h>
+#include <vcore.h>
+#include <timing.h>
+
+#define MAX_NR_TEST_THREADS 1000
+
+pthread_t my_threads[MAX_NR_TEST_THREADS];
+void *my_retvals[MAX_NR_TEST_THREADS];
+
+
+/* Funcs and global vars for test_cv() */
+pthread_cond_t local_cv;
+pthread_cond_t *cv = &local_cv;
+pthread_mutex_t local_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t *pth_m = &local_mutex;
+
+atomic_t counter;
+volatile bool state = FALSE;           /* for test 3 */
+
+void *__test_pthread_cond_signal(void *arg)
+{
+       if (atomic_read(&counter) % 4)
+               pthread_cond_signal(cv);
+       else
+               pthread_cond_broadcast(cv);
+       atomic_dec(&counter);
+       return 0;
+}
+
+void *__test_pthread_cond_waiter(void *arg)
+{
+       pthread_mutex_lock(pth_m);
+       /* check state, etc */
+       pthread_cond_wait(cv, pth_m);
+       pthread_mutex_unlock(pth_m);
+       atomic_dec(&counter);
+       return 0;
+}
+
+void *__test_pthread_cond_waiter_t3(void *arg)
+{
+       udelay((int)arg);
+       /* if state == false, we haven't seen the signal yet */
+       pthread_mutex_lock(pth_m);
+       printd("Came in, saw state %d\n", state);
+       while (!state) {
+               cpu_relax();
+               pthread_cond_wait(cv, pth_m);   /* unlocks and relocks */
+       }
+       pthread_mutex_unlock(pth_m);
+       /* Make sure we are done, tell the controller we are done */
+       cmb();
+       assert(state);
+       atomic_dec(&counter);
+       return 0;
+}
+
+int main(void)
+{
+       int nr_msgs;
+       pthread_cond_init(cv, 0);
+       pthread_mutex_init(pth_m, 0);
+
+       /* Test 0: signal without waiting */
+       pthread_cond_broadcast(cv);
+       pthread_cond_signal(cv);
+       printf("test_cv: signal without waiting complete\n");
+
+       /* Test 1: single / minimal shit */
+       nr_msgs = max_vcores() - 1;
+       atomic_init(&counter, nr_msgs);
+       for (int i = 0; i < nr_msgs; i++) {
+               if (pthread_create(&my_threads[i], NULL, &__test_pthread_cond_waiter,
+                   NULL))
+                       perror("pth_create failed");
+       }
+       udelay(1000000);
+       pthread_cond_signal(cv);
+       /* wait for one to make it */
+       while (atomic_read(&counter) != nr_msgs - 1)
+               pthread_yield();
+       printf("test_cv: single signal complete\n");
+       pthread_cond_broadcast(cv);
+       for (int i = 0; i < nr_msgs; i++)
+               pthread_join(my_threads[i], &my_retvals[i]);
+       printf("test_cv: broadcast signal complete\n");
+
+       /* Test 2: shitloads of waiters and signalers */
+       nr_msgs = MAX_NR_TEST_THREADS;
+       atomic_init(&counter, nr_msgs);
+       for (int i = 0; i < nr_msgs; i++) {
+               if (i % 5) {
+                       if (pthread_create(&my_threads[i], NULL,
+                           &__test_pthread_cond_waiter, NULL))
+                               perror("pth_create failed");
+               } else {
+                       if (pthread_create(&my_threads[i], NULL,
+                           &__test_pthread_cond_signal, NULL))
+                               perror("pth_create failed");
+               }
+       }
+       pthread_yield();
+       while (atomic_read(&counter)) {
+               cpu_relax();
+               pthread_cond_broadcast(cv);
+               pthread_yield();
+       }
+       for (int i = 0; i < nr_msgs; i++)
+               pthread_join(my_threads[i], &my_retvals[i]);
+       printf("test_cv: massive message storm complete\n");
+
+       /* Test 3: basic one signaller, one receiver.  We want to vary the amount of
+        * time the sender and receiver delay, starting with (1ms, 0ms) and ending
+        * with (0ms, 1ms).  At each extreme, such as with the sender waiting 1ms,
+        * the receiver/waiter should hit the "check and wait" point well before the
+        * sender/signaller hits the "change state and signal" point.
+        *
+        * We need to be running in parallel here, so temporarily turn off the
+        * 2LS's vcore management and get up to 2 vcores.  Assuming no preemption. */
+       pthread_can_vcore_request(FALSE);       /* 2LS won't manage vcores */
+       while (num_vcores() < 2)
+               vcore_request(1);
+       for (int i = 0; i < 1000; i++) {
+               for (int j = 0; j < 10; j++) {  /* some extra chances at each point */
+                       state = FALSE;
+                       /* client waits for i usec */
+                       if (pthread_create(&my_threads[0], NULL,
+                           &__test_pthread_cond_waiter_t3, (void*)i))
+                               perror("pth_create failed");
+                       cmb();
+                       udelay(1000 - i);       /* sender's wait time: 1000..0 */
+                       /* Need to lock the mutex when touching state and signalling about
+                        * that state (atomically touch and signal).  Thanks pthreads, for
+                        * mandating a cond_signal that doesn't require locking. */
+                       pthread_mutex_lock(pth_m);
+                       state = TRUE;
+                       pthread_cond_signal(cv);
+                       pthread_mutex_unlock(pth_m);
+                       /* They might not have run at all yet (in which case they lost the
+                        * race and don't need the signal), but we need to wait until
+                        * they're done. */
+                       pthread_join(my_threads[0], &my_retvals[0]);
+               }
+       }
+       pthread_can_vcore_request(TRUE);        /* 2LS controls VCs again */
+       printf("test_cv: single sender/receiver complete\n");
+}
diff --git a/user/pthread/pthread.c b/user/pthread/pthread.c
index 2909113..fe20ad2 100644 (file)
@@ -642,83 +642,134 @@ int pthread_mutex_destroy(pthread_mutex_t* m)
 
 int pthread_cond_init(pthread_cond_t *c, const pthread_condattr_t *a)
 {
-  c->attr = a;
-  memset(c->waiters,0,sizeof(c->waiters));
-  memset(c->in_use,0,sizeof(c->in_use));
-  c->next_waiter = 0;
-  return 0;
+       TAILQ_INIT(&c->waiters);
+       spin_pdr_init(&c->spdr_lock);
+       if (a) {
+               c->attr_pshared = a->pshared;
+               c->attr_clock = a->clock;
+       } else {
+               c->attr_pshared = PTHREAD_PROCESS_PRIVATE;
+               c->attr_clock = 0;
+       }
+       return 0;
 }
 
 int pthread_cond_destroy(pthread_cond_t *c)
 {
-  return 0;
+       return 0;
 }
 
 int pthread_cond_broadcast(pthread_cond_t *c)
 {
-  memset(c->waiters,0,sizeof(c->waiters));
-  return 0;
+       struct pthread_queue restartees = TAILQ_HEAD_INITIALIZER(restartees);
+       struct pthread_tcb *pthread_i, *temp;
+       spin_pdr_lock(&c->spdr_lock);
+       /* moves all items from waiters onto the end of restartees */
+       TAILQ_CONCAT(&restartees, &c->waiters, next);
+       spin_pdr_unlock(&c->spdr_lock);
+       /* Disable notifs when calling the 2LS op - might protect against a few
+        * things (we don't do this very often).  We're still a uthread, according
+        * to TLS, but other cores (and the kernel) will think we are in VC ctx
+        * (notifs disabled) */
+       uth_disable_notifs();
+       TAILQ_FOREACH_SAFE(pthread_i, &restartees, next, temp) {
+               TAILQ_REMOVE(&restartees, pthread_i, next);
+               uthread_runnable((struct uthread*)pthread_i);
+       }
+       uth_enable_notifs();
+       return 0;
 }
 
+/* The spec says this needs to work regardless of whether or not the caller
+ * already holds the mutex. */
 int pthread_cond_signal(pthread_cond_t *c)
 {
-  int i;
-  for(i = 0; i < MAX_PTHREADS; i++)
-  {
-    if(c->waiters[i])
-    {
-      c->waiters[i] = 0;
-      break;
-    }
-  }
-  return 0;
+       struct pthread_tcb *pthread;
+       spin_pdr_lock(&c->spdr_lock);
+       pthread = TAILQ_FIRST(&c->waiters);
+       if (!pthread) {
+               spin_pdr_unlock(&c->spdr_lock);
+               return 0;
+       }
+       TAILQ_REMOVE(&c->waiters, pthread, next);
+       spin_pdr_unlock(&c->spdr_lock);
+       uthread_runnable((struct uthread*)pthread);
+       return 0;
 }
 
-int pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
-{
-  uint32_t old_waiter = c->next_waiter;
-  uint32_t my_waiter = c->next_waiter;
-  
-  //allocate a slot
-  while (atomic_swap_u32(& (c->in_use[my_waiter]), SLOT_IN_USE) == SLOT_IN_USE)
-  {
-    my_waiter = (my_waiter + 1) % MAX_PTHREADS;
-    assert (old_waiter != my_waiter);  // do not want to wrap around
-  }
-  c->waiters[my_waiter] = WAITER_WAITING;
-  c->next_waiter = (my_waiter+1) % MAX_PTHREADS;  // race on next_waiter but ok, because it is advisary
-
-  pthread_mutex_unlock(m);
+/* Communicate btw cond_wait and its callback */
+struct cond_junk {
+       pthread_cond_t                          *c;
+       pthread_mutex_t                         *m;
+};
 
-  volatile int* poll = &c->waiters[my_waiter];
-  while(*poll);
-  c->in_use[my_waiter] = SLOT_FREE;
-  pthread_mutex_lock(m);
+/* Callback/bottom half of cond wait.  For those writing these pth callbacks, the
+ * minimum is: call the generic yield, set state (to communicate with
+ * runnable), then do something that makes the thread runnable later (or now). */
+static void __pth_wait_cb(struct uthread *uthread, void *junk)
+{
+       struct pthread_tcb *pthread = (struct pthread_tcb*)uthread;
+       pthread_cond_t *c = ((struct cond_junk*)junk)->c;
+       pthread_mutex_t *m = ((struct cond_junk*)junk)->m;
+       /* this removes us from the active list; we can reuse next below */
+       __pthread_generic_yield(pthread);
+       pthread->state = PTH_BLK_MUTEX;
+       spin_pdr_lock(&c->spdr_lock);
+       TAILQ_INSERT_TAIL(&c->waiters, pthread, next);
+       spin_pdr_unlock(&c->spdr_lock);
+       pthread_mutex_unlock(m);
+}
 
-  return 0;
+int pthread_cond_wait(pthread_cond_t *c, pthread_mutex_t *m)
+{
+       struct cond_junk local_junk;
+       local_junk.c = c;
+       local_junk.m = m;
+       uthread_yield(TRUE, __pth_wait_cb, &local_junk);
+       pthread_mutex_lock(m);
+       return 0;
 }
 
 int pthread_condattr_init(pthread_condattr_t *a)
 {
-  a = PTHREAD_PROCESS_PRIVATE;
-  return 0;
+       a->pshared = PTHREAD_PROCESS_PRIVATE;
+       a->clock = 0;
+       return 0;
 }
 
 int pthread_condattr_destroy(pthread_condattr_t *a)
 {
-  return 0;
+       return 0;
+}
+
+int pthread_condattr_getpshared(pthread_condattr_t *a, int *s)
+{
+       *s = a->pshared;
+       return 0;
 }
 
 int pthread_condattr_setpshared(pthread_condattr_t *a, int s)
 {
-  a->pshared = s;
-  return 0;
+       a->pshared = s;
+       if (s == PTHREAD_PROCESS_SHARED) {
+               printf("Warning: we don't do shared pthread condvars btw diff MCPs\n");
+               return -1;
+       }
+       return 0;
 }
 
-int pthread_condattr_getpshared(pthread_condattr_t *a, int *s)
+int pthread_condattr_getclock(const pthread_condattr_t *attr,
+                              clockid_t *clock_id)
 {
-  *s = a->pshared;
-  return 0;
+       *clock_id = attr->clock;
+       return 0;
+}
+
+int pthread_condattr_setclock(pthread_condattr_t *attr, clockid_t clock_id)
+{
+       printf("Warning: we don't do pthread condvar clock stuff\n");
+       attr->clock = clock_id;
+       return 0;
+}
 
 pthread_t pthread_self()
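
The comment above __pth_wait_cb spells out the general recipe for blocking
primitives in this 2LS: do the generic yield bookkeeping, set the blocked
state, then do something that makes the thread runnable again later (or
right now).  Below is a hypothetical sketch, not part of this commit, that
applies the same recipe to a simple one-shot event.  It reuses only names
that appear in this diff (struct pthread_queue, the spin_pdr lock,
uthread_yield, uthread_runnable, uth_disable_notifs) and assumes it would
live inside pthread.c, where those internals are visible.

/* Hypothetical one-shot event, for illustration only. */
struct pth_event {
	struct pthread_queue		waiters;
	struct spin_pdr_lock		spdr_lock;
	int				posted;
};

static void pth_event_init(struct pth_event *ev)
{
	TAILQ_INIT(&ev->waiters);
	spin_pdr_init(&ev->spdr_lock);
	ev->posted = 0;
}

/* Yield callback: 1) generic yield bookkeeping, 2) set the blocked state,
 * 3) arrange to be made runnable later (or right now, if we lost the race). */
static void __pth_event_wait_cb(struct uthread *uthread, void *arg)
{
	struct pthread_tcb *pthread = (struct pthread_tcb*)uthread;
	struct pth_event *ev = (struct pth_event*)arg;
	__pthread_generic_yield(pthread);
	pthread->state = PTH_BLK_MUTEX;
	spin_pdr_lock(&ev->spdr_lock);
	if (ev->posted) {
		/* event already fired; restart ourselves immediately */
		spin_pdr_unlock(&ev->spdr_lock);
		uthread_runnable(uthread);
		return;
	}
	TAILQ_INSERT_TAIL(&ev->waiters, pthread, next);
	spin_pdr_unlock(&ev->spdr_lock);
}

void pth_event_wait(struct pth_event *ev)
{
	uthread_yield(TRUE, __pth_event_wait_cb, ev);
}

void pth_event_post(struct pth_event *ev)
{
	struct pthread_queue restartees = TAILQ_HEAD_INITIALIZER(restartees);
	struct pthread_tcb *pthread_i, *temp;
	spin_pdr_lock(&ev->spdr_lock);
	ev->posted = 1;
	/* splice all waiters onto a private list, broadcast-style */
	TAILQ_CONCAT(&restartees, &ev->waiters, next);
	spin_pdr_unlock(&ev->spdr_lock);
	uth_disable_notifs();
	TAILQ_FOREACH_SAFE(pthread_i, &restartees, next, temp) {
		TAILQ_REMOVE(&restartees, pthread_i, next);
		uthread_runnable((struct uthread*)pthread_i);
	}
	uth_enable_notifs();
}

As in pthread_cond_broadcast above, waiters are moved to a private list
under the lock so they can be woken without holding it.
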
diff --git a/user/pthread/pthread.h b/user/pthread/pthread.h
index b0c835c..26ac9a6 100644 (file)
@@ -7,6 +7,7 @@
 #include <uthread.h>
 #include <mcs.h>
 #include <dtls.h>
+#include <spinlock.h>
 
 #ifdef __cplusplus
   extern "C" {
@@ -59,6 +60,7 @@ struct sysc_mgmt {
 #define PTHREAD_BARRIER_SPINS 100 // totally arbitrary
 #define PTHREAD_COND_INITIALIZER {0,{0},{0},0}
 #define PTHREAD_PROCESS_PRIVATE 0
+#define PTHREAD_PROCESS_SHARED 1
 
 typedef struct
 {
@@ -103,19 +105,24 @@ enum
 #define PTHREAD_STACK_PAGES 4
 #define PTHREAD_STACK_SIZE (PTHREAD_STACK_PAGES*PGSIZE)
 
+typedef int clockid_t;
 typedef struct
 {
   int pshared;
+  clockid_t clock;
 } pthread_condattr_t;
 
-
+/* Regarding the spinlock vs MCS, I don't expect this lock to be heavily
+ * contended.  Most of the time, the caller already holds the mutex associated
+ * with the cond var. */
 typedef struct
 {
-  const pthread_condattr_t* attr;
-  uint32_t waiters[MAX_PTHREADS];
-  uint32_t in_use[MAX_PTHREADS];
-  uint32_t next_waiter; //start the search for an available waiter at this spot
+       struct pthread_queue            waiters;
+       struct spin_pdr_lock            spdr_lock;
+       int                                             attr_pshared;
+       int                                             attr_clock;
 } pthread_cond_t;
+
 typedef struct 
 {
        size_t stacksize;
@@ -163,8 +170,11 @@ int pthread_cond_wait(pthread_cond_t *, pthread_mutex_t *);
 
 int pthread_condattr_init(pthread_condattr_t *);
 int pthread_condattr_destroy(pthread_condattr_t *);
-int pthread_condattr_setpshared(pthread_condattr_t *, int);
 int pthread_condattr_getpshared(pthread_condattr_t *, int *);
+int pthread_condattr_setpshared(pthread_condattr_t *, int);
+int pthread_condattr_getclock(const pthread_condattr_t *attr,
+                              clockid_t *clock_id);
+int pthread_condattr_setclock(pthread_condattr_t *attr, clockid_t clock_id);
 
 #define pthread_rwlock_t pthread_mutex_t
 #define pthread_rwlockattr_t pthread_mutexattr_t