Pthread barrier work
authorBarret Rhoden <brho@cs.berkeley.edu>
Mon, 1 Apr 2013 20:48:07 +0000 (13:48 -0700)
committerBarret Rhoden <brho@cs.berkeley.edu>
Mon, 1 Apr 2013 20:48:07 +0000 (13:48 -0700)
Pthreads will block on a barrier now, instead of spinning and yielding
(pthread_yield() would keep them on the runqueue).

Not clear yet what is the best spin/block strategy.  !threads_active is
safe, but not always optimal.

tests/pthread_barrier_test.c
user/parlib/spinlock.c
user/parlib/uthread.c
user/pthread/pthread.c
user/pthread/pthread.h

index 7802c9b..f8d7b53 100644 (file)
@@ -3,23 +3,24 @@
 #include <stdlib.h>
 #include <parlib.h>
 #include <unistd.h>
+#include <sys/time.h>
 
-pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
-//#define printf_safe(...) {}
-#define printf_safe(...) \
-       pthread_mutex_lock(&lock); \
-       printf(__VA_ARGS__); \
-       pthread_mutex_unlock(&lock);
-
-#define NUM_TEST_THREADS 32
-pthread_t my_threads[NUM_TEST_THREADS];
-void *my_retvals[NUM_TEST_THREADS];
 pthread_barrier_t barrier;
 
-void *thread(void* arg)
+#define MAX_NR_TEST_THREADS 100000
+int nr_threads = 100;
+int nr_loops = 10000;
+int nr_vcores = 0;
+
+pthread_t *my_threads;
+void **my_retvals;
+bool run_barriertest = FALSE;
+
+void *thread(void *arg)
 {      
-       for(int i=0; i<NUM_TEST_THREADS; i++) {
-               //printf_safe("[A] pthread %d on vcore %d\n", pthread_self()->id, vcore_id());
+       while (!run_barriertest)
+               cpu_relax();
+       for(int i = 0; i < nr_loops; i++) {
                pthread_barrier_wait(&barrier);
        }
        return (void*)(long)pthread_self()->id;
@@ -27,18 +28,50 @@ void *thread(void* arg)
 
 int main(int argc, char** argv) 
 {
-       pthread_barrier_init(&barrier, NULL, NUM_TEST_THREADS);
-       #define NUM_ITERATIONS 5000
-       for(int j=0; j<NUM_ITERATIONS; j++) {
-//     while (1) {
-               for (int i = 1; i <= NUM_TEST_THREADS; i++) {
-                       pthread_create(&my_threads[i-1], NULL, &thread, NULL);
-               }
-               for (int i = 1; i <= NUM_TEST_THREADS; i++) {
-                       pthread_join(my_threads[i-1], &my_retvals[i-1]);
+       struct timeval start_tv = {0};
+       struct timeval end_tv = {0};
+       long usec_diff;
+       if (argc > 1)
+               nr_threads = strtol(argv[1], 0, 10);
+       if (argc > 2)
+               nr_loops = strtol(argv[2], 0, 10);
+       if (argc > 3)
+               nr_vcores = strtol(argv[3], 0, 10);
+       printf("Running %d threads for %d iterations on %d vcores\n",
+              nr_threads, nr_loops, nr_vcores);
+       nr_threads = MIN(nr_threads, MAX_NR_TEST_THREADS);
+       my_threads = malloc(sizeof(pthread_t) * nr_threads);
+       my_retvals = malloc(sizeof(void*) * nr_threads);
+       if (!(my_retvals && my_threads))
+               perror("Init threads/malloc");
+       if (nr_vcores) {
+               /* Only do the vcore trickery if requested */
+               pthread_can_vcore_request(FALSE);       /* 2LS won't manage vcores */
+               pthread_lib_init();                                     /* gives us one vcore */
+               vcore_request(nr_vcores - 1);           /* ghetto incremental interface */
+               for (int i = 0; i < nr_vcores; i++) {
+                       printd("Vcore %d mapped to pcore %d\n", i,
+                                  __procinfo.vcoremap[i].pcoreid);
                }
-               printf("Iteration %d of %d\n", j, NUM_ITERATIONS);
        }
+       pthread_barrier_init(&barrier, NULL, nr_threads);
+       for (int i = 0; i < nr_threads; i++) {
+               pthread_create(&my_threads[i], NULL, &thread, NULL);
+       }
+       if (gettimeofday(&start_tv, 0))
+               perror("Start time error...");
+       run_barriertest = TRUE;
+       for (int i = 0; i < nr_threads; i++) {
+               pthread_join(my_threads[i], &my_retvals[i]);
+       }
+       if (gettimeofday(&end_tv, 0))
+               perror("End time error...");
+       pthread_barrier_destroy(&barrier);
+       usec_diff = (end_tv.tv_sec - start_tv.tv_sec) * 1000000 +
+                   (end_tv.tv_usec - start_tv.tv_usec);
+       printf("Done: %d threads, %d loops, %d vcores\n",
+              nr_threads, nr_loops, nr_vcores);
+       printf("Time to run: %d usec, %f usec per barrier\n", usec_diff,
+              (float)usec_diff / nr_loops);
        pthread_barrier_destroy(&barrier);
-       sys_proc_destroy(getpid(), 0);
 } 
index abd8e88..148cd77 100644 (file)
@@ -39,6 +39,7 @@ int spinlock_trylock(spinlock_t *lock)
   return __sync_lock_test_and_set(&lock->lock, EBUSY);
 }
 
+/* TODO: this will perform worse than test, then test and set */
 void spinlock_lock(spinlock_t *lock) 
 {
   assert(lock);
index 888674f..d77fbed 100644 (file)
@@ -234,6 +234,7 @@ __uthread_yield(void)
        uthread->yield_func(uthread, uthread->yield_arg);
        /* Make sure you do not touch uthread after that func call */
        /* Leave the current vcore completely */
+       /* TODO: if the yield func can return a failure, we can abort the yield */
        current_uthread = NULL;
        /* Go back to the entry point, where we can handle notifications or
         * reschedule someone. */
index a2fb333..b1062df 100644 (file)
@@ -600,6 +600,24 @@ int pthread_mutex_init(pthread_mutex_t* m, const pthread_mutexattr_t* attr)
   return 0;
 }
 
+/* Helper for spinning sync, returns TRUE if it is okay to keep spinning.
+ *
+ * Alternatives include:
+ *             old_count <= num_vcores() (barrier code, pass in old_count as *state, 
+ *                                        but this only works if every awake pthread
+ *                                        will belong to the barrier).
+ *             just spin for a bit       (use *state to track spins)
+ *             FALSE                     (always is safe)
+ *             etc...
+ * 'threads_ready' isn't too great since sometimes it'll be non-zero when it is
+ * about to become 0.  We really want "I have no threads waiting to run that
+ * aren't going to run on their on unless this core yields instead of spins". */
+/* TODO: consider making this a 2LS op */
+static inline bool safe_to_spin(unsigned int *state)
+{
+       return !threads_ready;
+}
+
 /* Set *spun to 0 when calling this the first time.  It will yield after 'spins'
  * calls.  Use this for adaptive mutexes and such. */
 static inline void spin_to_sleep(unsigned int spins, unsigned int *spun)
@@ -795,45 +813,129 @@ int pthread_once(pthread_once_t* once_control, void (*init_routine)(void))
   return 0;
 }
 
-int pthread_barrier_init(pthread_barrier_t* b, const pthread_barrierattr_t* a, int count)
+int pthread_barrier_init(pthread_barrier_t *b,
+                         const pthread_barrierattr_t *a, int count)
 {
-  b->nprocs = b->count = count;
-  b->sense = 0;
-  pthread_mutex_init(&b->pmutex, 0);
-  return 0;
+       b->total_threads = count;
+       b->sense = 0;
+       atomic_set(&b->count, count);
+       spin_pdr_init(&b->lock);
+       TAILQ_INIT(&b->waiters);
+       b->nr_waiters = 0;
+       return 0;
 }
 
-int pthread_barrier_wait(pthread_barrier_t* b)
-{
-  unsigned int spinner = 0;
-  int ls = !b->sense;
+struct barrier_junk {
+       pthread_barrier_t                               *b;
+       int                                                             ls;
+};
 
-  pthread_mutex_lock(&b->pmutex);
-  int count = --b->count;
-  pthread_mutex_unlock(&b->pmutex);
+/* Callback/bottom half of barrier. */
+static void __pth_barrier_cb(struct uthread *uthread, void *junk)
+{
+       struct pthread_tcb *pthread = (struct pthread_tcb*)uthread;
+       pthread_barrier_t *b = ((struct barrier_junk*)junk)->b;
+       int ls = ((struct barrier_junk*)junk)->ls;
+       /* Removes from active list, we can reuse.  must also restart */
+       __pthread_generic_yield(pthread);
+       /* TODO: if we used a trylock, we could bail as soon as we see sense */
+       spin_pdr_lock(&b->lock);
+       /* If sense is ls (our free value), we lost the race and shouldn't sleep */
+       if (b->sense == ls) {
+               /* TODO: i'd like to fast-path the wakeup, skipping pth_runnable */
+               pthread->state = PTH_BLK_YIELDING;      /* not sure which state for this */
+               spin_pdr_unlock(&b->lock);
+               pth_thread_runnable(uthread);
+               return;
+       }
+       /* otherwise, we sleep */
+       pthread->state = PTH_BLK_MUTEX; /* TODO: consider ignoring this */
+       TAILQ_INSERT_TAIL(&b->waiters, pthread, next);
+       b->nr_waiters++;
+       spin_pdr_unlock(&b->lock);
+}
+
+/* We assume that the same threads participating in the barrier this time will
+ * also participate next time.  Imagine a thread stopped right after its fetch
+ * and add - we know it is coming through eventually.  We finish and change the
+ * sense, which should allow the delayed thread to eventually break through.
+ * But if another n threads come in first, we'll set the sense back to the old
+ * value, thereby catching the delayed thread til the next barrier. 
+ *
+ * A note on preemption: if any thread gets preempted and it is never dealt
+ * with, eventually we deadlock, with all threads waiting on the last one to
+ * enter (and any stragglers from one run will be the last in the next run).
+ * One way or another, we need to handle preemptions.  The current 2LS requests
+ * an IPI for a preempt, so we'll be fine.  Any other strategies will need to
+ * consider how barriers work.  Any time we sleep, we'll be okay (since that
+ * frees up our core to handle preemptions/run other threads. */
+int pthread_barrier_wait(pthread_barrier_t *b)
+{
+       unsigned int spin_state = 0;
+       int ls = !b->sense;     /* when b->sense is the value we read, then we're free*/
+       int nr_waiters;
+       struct pthread_queue restartees = TAILQ_HEAD_INITIALIZER(restartees);
+       struct pthread_tcb *pthread_i;
+       struct barrier_junk local_junk;
+       
+       long old_count = atomic_fetch_and_add(&b->count, -1);
+
+       if (old_count == 1) {
+               printd("Thread %d is last to hit the barrier, resetting...\n",
+                      pthread_self()->id);
+               /* TODO: we might want to grab the lock right away, so a few short
+                * circuit faster? */
+               atomic_set(&b->count, b->total_threads);
+               /* we still need to maintain ordering btw count and sense, in case
+                * another thread doesn't sleep (if we wrote sense first, they could
+                * break out, race around, and muck with count before it is time) */
+               /* wmb(); handled by the spin lock */
+               spin_pdr_lock(&b->lock);
+               /* Sense is only protected in addition to decisions to sleep */
+               b->sense = ls;  /* set to free everyone */
+               /* All access to nr_waiters is protected by the lock */
+               if (!b->nr_waiters) {
+                       spin_pdr_unlock(&b->lock);
+                       return PTHREAD_BARRIER_SERIAL_THREAD;
+               }
+               TAILQ_CONCAT(&restartees, &b->waiters, next);
+               nr_waiters = b->nr_waiters;
+               b->nr_waiters = 0;
+               spin_pdr_unlock(&b->lock);
+               /* TODO: do we really need this state tracking? */
+               TAILQ_FOREACH(pthread_i, &restartees, next)
+                       pthread_i->state = PTH_RUNNABLE;
+               /* bulk restart waiters (skipping pth_thread_runnable()) */
+               mcs_pdr_lock(&queue_lock);
+               threads_ready += nr_waiters;
+               TAILQ_CONCAT(&ready_queue, &restartees, next);
+               mcs_pdr_unlock(&queue_lock);
+               if (can_adjust_vcores)
+                       vcore_request(threads_ready);
+               return PTHREAD_BARRIER_SERIAL_THREAD;
+       } else {
+               /* Spin if there are no other threads to run.  No sense sleeping */
+               do {
+                       if (b->sense == ls)
+                               return 0;
+                       cpu_relax();
+               } while (safe_to_spin(&spin_state));
 
-  if(count == 0)
-  {
-    printd("Thread %d is last to hit the barrier, resetting...\n", pthread_self()->id);
-    b->count = b->nprocs;
-       wmb();
-    b->sense = ls;
-    return PTHREAD_BARRIER_SERIAL_THREAD;
-  }
-  else
-  {
-    while(b->sense != ls) {
-      cpu_relax();
-      spin_to_sleep(PTHREAD_BARRIER_SPINS, &spinner);
-    }
-    return 0;
-  }
+               /* Try to sleep, when we wake/return, we're free to go */
+               local_junk.b = b;
+               local_junk.ls = ls;
+               uthread_yield(TRUE, __pth_barrier_cb, &local_junk);
+               // assert(b->sense == ls);
+               return 0;
+       }
 }
 
-int pthread_barrier_destroy(pthread_barrier_tb)
+int pthread_barrier_destroy(pthread_barrier_t *b)
 {
-  pthread_mutex_destroy(&b->pmutex);
-  return 0;
+       assert(TAILQ_EMPTY(&b->waiters));
+       assert(!b->nr_waiters);
+       /* Free any locks (if we end up using an MCS) */
+       return 0;
 }
 
 int pthread_detach(pthread_t thread)
index 26ac9a6..8a8cdcd 100644 (file)
@@ -73,17 +73,14 @@ typedef struct
   atomic_t lock;
 } pthread_mutex_t;
 
-/* TODO: MAX_PTHREADS is arbitrarily defined for now.
- * It indicates the maximum number of threads that can wait on  
-   the same cond var/ barrier concurrently. */
-
-#define MAX_PTHREADS 32
 typedef struct
 {
-  volatile int sense;
-  int count;
-  int nprocs;
-  pthread_mutex_t pmutex;
+       int                                                     total_threads;
+       volatile int                            sense;  /* state of barrier, flips btw runs */
+       atomic_t                                        count;
+       struct spin_pdr_lock            lock;
+       struct pthread_queue            waiters;
+       int                                                     nr_waiters;
 } pthread_barrier_t;
 
 #define WAITER_CLEARED 0