Complete rewrite of c3po for multicore
[akaros.git] / user / c3po / threads / threadlib.c
1 // ROS Specific headers
2 #include <vcore.h>
3 #include <mcs.h>
4 #include <ros/syscall.h>
5
6 // Capriccio Specific headers
7 #include "ucontext.h"
8 #include "threadlib_internal.h"
9 #include "util.h"
10 #include "config.h"
11 #include "blocking_graph.h"
12 #include "stacklink.h"
13
14 // Glibc Specific headers
15 #include <stdlib.h>
16 #include <stdio.h>
17 #include <string.h>
18 #include <execinfo.h>
19 #include <sys/time.h>
20 #include <unistd.h>
21 #include <syscall.h>
22 #include <sys/syscall.h>
23
24 /******************************************************************************/
25 /******************************* Configuration ********************************/
26 /******************************************************************************/
27
28 // Comment out, to enable debugging in this file
29 #ifndef DEBUG_threadlib_c
30 #undef debug
31 #define debug(...)
32 #undef tdebug
33 #define tdebug(...)
34 #endif
35
36 // Chose the size of the scheduler threads stack in the case of using
37 // NIO vs. AIO
38 #ifdef USE_NIO
39 #define SCHEDULER_STACK_SIZE 1024*128
40 #else
41 #define SCHEDULER_STACK_SIZE 1024
42 #endif
43
44 /******************************************************************************/
45 /***************************** Global Variables *******************************/
46 /******************************************************************************/
47
48 // The PID of the main process.  This is useful to prevent errors when
49 // forked children call exit_func().
50 static pid_t main_pid;
51
52 // Variable used to hold the time when the program first started executing
53 static unsigned long long start_usec;
54
55 // Variables used to store the amount of time spent in different parts of 
56 // the application code: the scheduler, main thread, all other threads
57 static struct {
58   cap_timer_t scheduler;
59   cap_timer_t main;
60   cap_timer_t app;
61 } timers;
62
63 // Pointer to the main thread
64 static thread_t* main_thread=NULL;
65
66 // Lock used to restrict access to all thread lists, thread state changes,
67 // and the count of threads in different states
68 static mcs_lock_t thread_lock = MCS_LOCK_INIT;
69
70 // A list of all threads in the system (i.e. spawned, but not exited yet)
71 static pointer_list_t *threadlist = NULL;
72
73 // Variable used to maintain global counts of how 
74 // many threads are in a given state (used only for bookkeeping)
75 static struct {
76   int running;
77   int runnable;
78   int suspended;
79   int detached;
80   int zombie;
81   int total;
82 } num_threads = {0, 0, 0, 0};
83
84 // Set of global flags
85 static struct {
86   // Flag to indicate whether or not to override syscalls.  We don't
87   // override during initialization, in order to avoid loops w/
88   // libraries such as perfctr that must do IO.  We also don't override
89   // syscalls after SIG_ABRT, so we can get proper core dumps.
90   // Only really necessary for linux, as ROS does all IO asyncronously.
91   volatile unsigned int override_syscalls: 1;
92
93   // Flags regarding the state of the main_thread
94   volatile unsigned int main_exited: 1;
95   volatile unsigned int exit_func_done: 1;
96 } gflags = {1, 0, 0};
97
98 // Set of thread local flags (only useful in scheduler or vcore context)
99 static __thread struct {
100   // Flag indicating whether we are currently running scheduler code 
101   // on the core or not
102   int in_scheduler: 1;
103 } lflags = {0};
104
105 // Function pointers for plugging in modular scheduling algorithms
106 // Run queues are maintained locally in each algorithm
107 static void (*sched_init)(void); 
108 static void (*sched_add_thread)(thread_t *t); 
109 static thread_t* (*sched_next_thread)(void); 
110
111 // Sleep queue, containing any sleeping threads
112 static pointer_list_t *sleepq = NULL;
113
114 // Variables used to regulate sleep times of sleeping threads
115 static struct {
116   // When is the sleep time calculated from
117   unsigned long long last_check;   
118   // Wall clock time of the wake time of the first sleeping thread
119   unsigned long long first_wake;
120   // Length of the whole sleep queue, in microseconds
121   unsigned long long max;    
122 } sleep_times = {0, 0, 0};
123
124 // Function pointer to the io polling function being used
125 // Only really makes sense in Linux, as ROS uses all async IO and
126 // event queues for IO
127 static void (*io_polling_func)(long long usecs);
128
129 // Variables set by CIL when executing using linked stacks
130 void **start_node_addrs = NULL;
131 int *start_node_stacks = NULL;
132
133 /******************************************************************************/
134 /************************** Function Declarations *****************************/
135 /******************************************************************************/
136
137 // Vcore functions
138 inline static bool __query_vcore_request();
139 inline static bool __query_vcore_yield();
140
141 // Sleep queue functions
142 inline static void sleepq_add_thread(thread_t *t, unsigned long long timeout);
143 inline static void sleepq_remove_thread(thread_t *t);
144 inline static void sleepq_check_wakeup();
145
146 // Thread related functions
147 void run_next_thread();
148 inline static void __thread_resume(thread_t *t);
149 inline static void __thread_make_runnable(thread_t *t);
150 inline static void __free_thread_prep(thread_t *t);
151 inline static void free_thread(thread_t *t);
152
153 // Helper Functions 
154 static void exit_func(void);
155 static void pick_scheduler();
156 static int get_stack_size_kb_log2(void *func);
157
158 /******************************************************************************/
159 /********************************** Macros ************************************/
160 /******************************************************************************/
161
162 // Sanity check THREAD_KEY_MAX and size of key_data_count
163 //#if THREAD_KEY_MAX >> (sizeof(thread_t.key_data_count)-1) != 1
164 //#error not enough space in thread_t.key_data_count
165 //#endif
166
167 /******************************************************************************/
168 /*************************** Function Definitions *****************************/
169 /******************************************************************************/
170
171 /**
172  * This will be called as part of the initialization sequence
173  **/
174 void main_thread_init() 
175 {
176   tdebug("Enter\n");
177   static int init_done = 0;
178   if(init_done) 
179     return;
180   init_done = 1;
181
182   // Make sure the clock init is already done, so we don't wind up w/
183   // a dependancy loop b/w perfctr and the rest of the code.
184 //  init_cycle_clock();
185 //  init_debug();
186
187   // Initialize and Register all timers
188 //  init_timer(&timers.main);
189 //  init_timer(&timers.scheduler);
190 //  init_timer(&timers.app);
191 //  register_timer("total", &timers.main);
192 //  register_timer("sheduler", &timers.scheduler);
193 //  register_timer("app", &timers.app);
194
195   // Start the main timer
196 //  start_timer(&timers.main);
197
198   // Init the scheduler function pointers
199   pick_scheduler();
200
201   // Init the scheduler code
202   sched_init();
203
204   // Create the main thread
205   main_thread = malloc(sizeof(thread_t));  
206   assert(main_thread);
207   bzero(main_thread, sizeof(thread_t));
208   main_thread->context = create_context(main_thread, NULL, NULL);
209   main_thread->name = "main_thread";
210   main_thread->initial_arg = NULL;
211   main_thread->initial_func = NULL;
212   main_thread->tid = 0;   // fixed value
213   main_thread->joinable = 0;
214   main_thread->sleep = -1;
215
216   // Create global thread list
217   threadlist = new_pointer_list("thread_list");
218
219   // Create sleep queue
220   sleepq = new_pointer_list("sleep_queue");
221   sleep_times.max = 0;
222   sleep_times.last_check = 0;
223   sleep_times.first_wake = 0;
224
225   // Add main thread to the global list of threads
226   mcs_lock_lock(&thread_lock);
227   pl_add_tail(threadlist, main_thread);
228   num_threads.total++;
229   // Update number of running threads
230   main_thread->state = RUNNING;
231   num_threads.running++;
232   num_threads.detached++;
233   mcs_lock_unlock(&thread_lock);
234
235   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
236          num_threads.running, num_threads.runnable, 
237          num_threads.suspended, num_threads.detached);
238   
239   // intialize blocking graph functions
240 //  init_blocking_graph();
241
242 //  // Set stats for the main thread
243 //  {
244 //    bg_dummy_node->num_here++;
245 //    current_thread->curr_stats.node = bg_dummy_node;
246 //    current_thread->curr_stats.files = 0;
247 //    current_thread->curr_stats.sockets = 0;
248 //    current_thread->curr_stats.heap = 0;
249 //    bg_set_current_stats( &current_thread->curr_stats );
250 //
251 //    current_thread->prev_stats = current_thread->curr_stats;
252 //  }
253
254   // Setup custom exit function called by glibc when exit() called
255   // Don't exit when main exits - wait for threads to die
256   atexit(exit_func);
257
258   // Mark the time the program starts running
259 //  start_usec = current_usecs();
260
261   // Things are all set up, so now turn on the syscall overrides
262   // Only really required by the linux port.
263 //  gflags.override_syscalls = 1;
264   tdebug("Exit\n");
265 }
266
267 /**
268  * Function to select and launch the next thread with the selected scheduler.
269  * Only called from within vcore context.  Keep this in mind when reasoning
270  * about how thread local variables, etc. are used.
271  **/
272 void run_next_thread()
273 {
274   tdebug("Enter\n");
275   // Make sure we start out by saving edge stats
276 //  static int init_done = 0;
277 //  static cpu_tick_t next_info_dump = 0, next_graph_stats = 0, now = 0;
278 //  if( !init_done ) {
279 //    init_done = 1;
280 //    if (conf_no_statcollect) 
281 //      bg_save_stats = 0;
282 //    else
283 //      bg_save_stats = 1;
284 //    
285 //    GET_REAL_CPU_TICKS( now );
286 //    next_graph_stats = now + 1 * ticks_per_second;
287 //    
288 //    start_timer(&timers.scheduler);
289 //  }
290  
291 //  // Cheesy way of handling things with timing requirements
292 //  {
293 //    GET_REAL_CPU_TICKS( now );
294 //      
295 //    // toggle stats collection
296 //    if( conf_no_statcollect == 0 && next_graph_stats < now ) {
297 //      bg_save_stats = 1 - bg_save_stats;
298 // 
299 //      if( bg_save_stats ) { 
300 //        // record stats for 100 ms
301 //        next_graph_stats = now + 100 * ticks_per_millisecond;
302 //          
303 //        // update the stats epoch, to allow proper handling of the first data items
304 //        bg_stats_epoch++;
305 //      }            
306 //      else {
307 //        // avoid stats for 2000 ms
308 //        next_graph_stats = now + 2000 * ticks_per_millisecond;
309 //      }
310 //      //output(" *********************** graph stats %s\n", bg_save_stats ? "ON" : "OFF" );
311 //    }
312 //      
313 //    // Poll for I/O
314 //    static cpu_tick_t next_poll = 0;
315 //    static int pollcount = 1000;
316 //    if( likely( (int)io_polling_func) ) {
317 //      if( num_threads.runnable == 0  ||  --pollcount <= 0  ||  next_poll < now ) {
318 //        long long timeout = 0;
319 // 
320 //        if( num_threads.runnable == 0 ) {
321 //          if (sleep_times.first_wake == 0) {
322 //            timeout = -1;
323 //          } else {
324 //            // there are threads in the sleep queue
325 //            // so poll for i/o till at most that time
326 //            unsigned long long now;
327 //            now = current_usecs();
328 //            tdebug ("first_wake: %lld, now: %lld\n", sleep_times.first_wake, now);
329 //            if (sleep_times.first_wake > now)
330 //              timeout = sleep_times.first_wake - now;
331 //          }
332 //        }
333 // 
334 //        stop_timer(&timers.scheduler);
335 //        //if( timeout != -1 )  output("timeout is not zero\n");
336 //        io_polling_func( timeout ); // allow blocking
337 //        start_timer(&timers.scheduler);
338 // 
339 //        next_poll = now + (ticks_per_millisecond << 13);
340 //        pollcount = 10000;
341 //      }
342 //    }
343 //
344 //    // Gather debug stats
345 //    if( 0 && next_info_dump < now ) {
346 //      dump_debug_info();
347 //      next_info_dump = now + 5 * ticks_per_second;
348 //    }
349 //  }
350
351   // Wake up threads that are asleep who's timeouts have expired
352   sleepq_check_wakeup(FALSE);   
353  
354   // Keep trying to get a thread off of the scheduler queue
355   thread_t *t; 
356   while(1) {
357     mcs_lock_lock(&thread_lock);
358         // Check to see if we are in the processes of exiting the entire program.
359         // If we are, then go ahead and yield this vcore. We are dying, afterall..
360         if(gflags.exit_func_done) {
361       bool yieldcore = __query_vcore_yield();
362       mcs_lock_unlock(&thread_lock);
363       if(yieldcore) vcore_yield();
364     }
365                 
366     // Otherwise, grab a thread from the scheduler queue 
367     t = sched_next_thread();
368
369         // If there aren't any, if there aren't any, go ahead and yield the core
370         // back to the kernel.  This can only really happen when there are only
371         // running and suspended threads in the system, but no runnable ones. When
372         // a suspended thread is woken up, it will try and request a new vcore from
373         // the system if appropriate.
374     if(t == NULL) {
375       bool yieldcore = __query_vcore_yield();
376       mcs_lock_unlock(&thread_lock);
377       if(yieldcore) vcore_yield();
378     }
379         // Otherwise, if the thread is in the ZOMBIE state, then it must have been
380         // detached and added back to the queue for the scheduler to reap.  
381     // Reap it now, then go and grab the next thread.
382     else if(t->state == ZOMBIE) {
383       __free_thread_prep(t);
384       mcs_lock_unlock(&thread_lock);
385       free_thread(t);
386     }
387     // Otherwise, we've found a thread to run, so continue.
388     else
389       break;
390   }
391   // Update the num_threads variables and the thread state
392   num_threads.runnable--;
393   num_threads.running++;
394   t->state = RUNNING;
395   mcs_lock_unlock(&thread_lock);
396  
397   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
398          num_threads.running, num_threads.runnable, 
399          num_threads.suspended, num_threads.detached);
400  
401   tdebug("About to run TID %d (%s)\n", t->tid, t->name ? t->name : "no name");
402  
403   // Run the thread
404 //  stop_timer(&timers.scheduler);
405 //  start_timer(&timers.app);
406   tdebug("Exit\n");
407   assert(!current_thread);
408   restore_context(t->context);
409 }
410
411 /**
412  * Wrapper function for new threads.  This allows us to clean up
413  * correctly if a thread exits without calling thread_exit().
414  **/
415 static void* __attribute__((noreturn)) new_thread_wrapper(void *arg)
416 {
417   tdebug("Enter\n");
418   // set up initial stats
419 //  current_thread->curr_stats.files = 0;
420 //  current_thread->curr_stats.sockets = 0;
421 //  current_thread->curr_stats.heap = 0;
422 //  bg_set_current_stats( &current_thread->curr_stats );
423 //  current_thread->prev_stats = current_thread->curr_stats;
424 //
425 //  // set up stack limit for new thread
426 //  stack_bottom = current_thread->stack_bottom;
427 //  stack_fingerprint = current_thread->stack_fingerprint;
428
429   // start the thread
430   tdebug("Initial arg = %p\n", current_thread->initial_arg);
431   void *ret = current_thread->initial_func(current_thread->initial_arg);
432   
433   // call thread_exit() to do the cleanup
434   thread_exit(ret);
435   assert(0);
436 }
437
438 inline static bool __query_vcore_request()
439 {
440   return FALSE;
441   //return TRUE;
442   //return (num_vcores() < 2);
443   //return ((num_threads.total-num_threads.zombie) > num_vcores());
444   //return ((num_threads.total-num_threads.zombie) > num_vcores());
445 }
446
447 inline static bool __query_vcore_yield()
448 {
449   return FALSE;
450   //return TRUE;
451 }
452
453 /* Create a new thread and add it to the scheduler queue */
454 static thread_t* new_thread(char *name, void* (*func)(void *), void *arg, thread_attr_t attr)
455 {
456   tdebug("Enter\n");
457   static unsigned max_tid = 1;
458   thread_t *t = malloc( sizeof(thread_t) );
459   int stack_size_kb_log2 = 10;//get_stack_size_kb_log2(func);
460   int stack_size = 1 << (stack_size_kb_log2 + 10);
461   void *stack = malloc(stack_size);//stack_get_chunk( stack_size_kb_log2 );
462
463   if( !t || !stack ) {
464     if (t) free(t);
465     if (stack) free(stack);//stack_return_chunk(stack_size_kb_log2, stack);
466     printf("Uh Oh!\n");
467     return NULL;
468   }
469
470   bzero(t, sizeof(thread_t));
471   t->context = create_context(t, new_thread_wrapper, stack+stack_size);
472   t->stack = stack;
473   t->stack_size_kb_log2 = stack_size_kb_log2;
474   t->stack_bottom = stack;
475   t->stack_fingerprint = 0;
476   t->name = (name ? name : "noname"); 
477   t->initial_func = func;
478   t->initial_arg = arg;
479   t->joinable = 1;
480   t->tid = max_tid++;
481   t->sleep = -1;
482
483   // Make sure the thread has a valid node before we add it to the scheduling list
484 //  bg_dummy_node->num_here++;
485 //  t->curr_stats.node = bg_dummy_node;
486
487   mcs_lock_lock(&thread_lock);
488   // Up the count of detached threads if this thread should be detached
489   if( attr ) {
490     t->joinable = attr->joinable;
491     if(!t->joinable) {
492       num_threads.detached++;
493     }
494   }
495   // Add the thread to the global list of all threads
496   pl_add_tail(threadlist, t);
497   num_threads.total++;
498   // Add the thread to the scheduler to make it runnable
499   sched_add_thread(t);
500   t->state = RUNNABLE;
501   num_threads.runnable++;
502   bool requestcore = __query_vcore_request();
503   mcs_lock_unlock(&thread_lock);
504
505   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
506          num_threads.running, num_threads.runnable, 
507          num_threads.suspended, num_threads.detached);
508
509   /* Possibly request a new vcore.  In the best case, we have 1 core per thread
510    * that we launch.  If not, it never hurts to ask for another one.  The
511    * system will simply deny us and the scheduler will multiplex all threads on
512    * the available vcores. */
513   if(requestcore) vcore_request(1);
514
515   tdebug("Exit\n");
516   // Return the newly created thread.
517   return t;
518 }
519
520 /**
521  * Free the memory associated with the given thread.
522  * Needs to be protected by the thread_lock.
523  **/
524 inline static void __free_thread_prep(thread_t *t)
525 {
526   // Make sure we should actually be freeing this thread
527   assert(t->state == ZOMBIE);
528   // Make this zombie a ghost!
529   t->state = GHOST;
530   // Drop the count of zombie threads in the system
531   num_threads.zombie--;
532   // Remove this thread from the global list of all threads
533   pl_remove_pointer(threadlist, t);
534   num_threads.total--;
535
536   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
537          num_threads.running, num_threads.runnable, 
538          num_threads.suspended, num_threads.detached);
539 }
540
541 /**
542  * Actually free the memory associated with a thread.  Should only be called
543  * after first calling __free_thread_prep while holding the thread_lock.  Make
544  * sure to NOT call this function while holding the thread_lock though.
545  **/
546 inline static void free_thread(thread_t *t)
547 {
548   // Free the thread memory
549   if( t != main_thread ) {
550     free(t->stack);//stack_return_chunk(t->stack_size_kb_log2, t->stack);
551   }
552   destroy_context(t->context);
553   free(t);
554 }
555
556 /**
557  * Give control back to the scheduler after main() exits.  This allows
558  * remaining threads to continue running.
559  * FIXME: we don't know whether user explicitly calls exit() or main() normally returns
560  * in the previous case, we should exit immediately, while in the latter, we should 
561  * join other threads.
562  * Overriding exit() does not work because normal returning from
563  * main() also calls exit().
564  **/
565 static void exit_func(void)
566 {
567   tdebug("Enter\n");
568   // Don't do anything if we're in a forked child process
569   if(current_thread != main_thread)
570     return;
571
572   tdebug("current=%s, gflags.main_exited=%d\n", 
573           current_thread?current_thread->name : "NULL", gflags.main_exited);
574
575   gflags.main_exited = TRUE;
576 //  if( !gflags.exit_whole_program )
577         // this will block until all other threads finish
578     thread_exit(NULL);
579
580 //  // dump the blocking graph before we exit
581 //  if( conf_dump_blocking_graph ) {
582 //    tdebug("dumping blocking graph from exit_func()\n");
583 //    dump_blocking_graph(); 
584 //  }
585 //
586 //  // FIXME: make sure to kill cloned children
587 //
588 //  if( conf_dump_timing_info ) {
589 //    if( timers.main.running )   stop_timer(&timers.main);
590 //    if( timers.scheduler.running )   stop_timer(&timers.scheduler);
591 //    if( timers.app.running )   stop_timer(&timers.app);
592 //    print_timers();
593 //  }
594   tdebug("Exit\n");
595 }
596
597 static char *THREAD_STATES[] = {"RUNNING", "RUNNABLE", "SUSPENDED", "ZOMBIE", "GHOST"};
598
599 // dump status to stderr 
600 void dump_debug_info()
601 {
602   output("\n\n-- Capriccio Status Dump --\n");
603   output("Current thread %d  (%s)\n", 
604          current_thread ? (int)thread_tid(current_thread) : -1,
605          (current_thread && current_thread->name) ? current_thread->name : "noname");
606   output("threads:    %d runnable    %d suspended    %d detached\n", 
607          num_threads.runnable, num_threads.suspended, num_threads.detached);
608
609   print_resource_stats();
610
611   stack_report_usage_stats();
612   stack_report_call_stats();
613
614   {
615     int i;
616     output("thread locations:");
617     for(i=0; i<bg_num_nodes; i++) {
618       bg_node_t *node = bg_nodelist[i];
619       if(node->num_here)
620         output("  %d:%d", node->node_num, node->num_here);
621     }
622     output("\n");
623   }
624
625   output("\n\n");
626 }
627  
628 void dump_thread_state()
629 {
630   void *bt[100];
631   int count, i;
632   linked_list_entry_t *e;
633
634   output("\n-- State of all threads --\n");
635
636   e = ll_view_head(threadlist);
637   while (e) {
638     char sleepstr[80];
639     thread_t *t = (thread_t *) pl_get_pointer(e);
640
641     output("Thread %2d (%s)  %s    %d file   %d sock    %ld KB heap   ? KB stack    %s%s%s\n", 
642            thread_tid(t), t->name, THREAD_STATES[t->state], 
643            t->curr_stats.files, 
644            t->curr_stats.sockets, 
645            t->curr_stats.heap / 1024,
646            //(long)-1, // FIXME: add total stack numbers after incorporating Jeremy's code
647            t->joinable ? "joinable  " : "",
648            t->sleep > 0 ? (sprintf(sleepstr,"sleep=%lld  ",t->sleep), sleepstr) : "");
649     e = ll_view_next(threadlist, e);
650   }
651
652   return;
653 }
654
655
656
657
658 /**
659  * decide on the scheduler to use, based on the CAPRICCIO_SCHEDULER
660  * environment variable.  This function should only be called once,
661  * durring the initialization of the thread runtime.
662  **/
663 #define SET_SCHEDULER(s) do {\
664   sched_init = sched_##s##_init; \
665   sched_next_thread = sched_##s##_next_thread; \
666   sched_add_thread = sched_##s##_add_thread; \
667   if( !conf_no_init_messages ) \
668     output("CAPRICCIO_SCHEDULER=%s\n",__STRING(s)); \
669 } while(0)
670
671 static void pick_scheduler()
672 {
673   char *sched = getenv("CAPRICCIO_SCHEDULER");
674
675   if(sched == NULL) 
676     SET_SCHEDULER( global_rr ); // defaults
677   else if( !strcasecmp(sched,"global_rr") )
678     SET_SCHEDULER( global_rr );
679   else if( !strcasecmp(sched,"global_lifo") )
680     SET_SCHEDULER( global_lifo );
681   else if( !strcasecmp(sched,"graph_rr") )
682     SET_SCHEDULER( graph_rr );
683   else if( !strcasecmp(sched,"graph_rr_down") )
684     SET_SCHEDULER( graph_rr_down );
685   else if( !strcasecmp(sched,"graph_batch") )
686     SET_SCHEDULER( graph_batch );
687   else if( !strcasecmp(sched,"graph_highnum") )
688     SET_SCHEDULER( graph_highnum );
689   else if( !strcasecmp(sched,"graph_priority") )
690     SET_SCHEDULER( graph_priority );
691   else
692     fatal("Invalid value for CAPRICCIO_SCHEDULER: '%s'\n",sched);
693
694 }
695
696 /**
697  * Perform necessary management to yield the current thread
698  * if suspend == TRUE && timeout != 0 -> the thread is added 
699  * to the sleep queue and later woken up when the clock times out.
700  * Returns FALSE if time-out actually happens, TRUE if woken up
701  * by other threads, INTERRUPTED if interrupted by a signal.
702  **/
703 static int __thread_yield(int suspend, unsigned long long timeout)
704 {
705   tdebug("Enter\n");
706   // Now we use a per-thread errno stored in thread_t
707   int savederrno;
708   savederrno = errno;
709
710   tdebug("current_thread=%p\n",current_thread);
711
712 //  {
713 //#ifdef SHOW_EDGE_TIMES
714 //    cpu_tick_t start, end, rstart, rend;
715 //    GET_CPU_TICKS(start);
716 //    GET_REAL_CPU_TICKS(rstart);
717 //#endif
718 //
719 //    // Figure out the current node in the graph
720 //    if( !conf_no_stacktrace )
721 //      bg_backtrace_set_node();
722 //    // FIXME: fake out what cil would do...  current_thread->curr_stats.node = bg_dummy_node;
723 //
724 //    // We should already have been told the node by CIL or directly by the programmer
725 //    assert( current_thread->curr_stats.node != NULL );
726 //    
727 //    // Update node counts
728 //    current_thread->prev_stats.node->num_here--;
729 //    current_thread->curr_stats.node->num_here++;
730 //    
731 //    // Update the blocking graph info
732 //    if( bg_save_stats )
733 //      bg_update_stats();
734 //  
735 //#ifdef SHOW_EDGE_TIMES
736 //    GET_CPU_TICKS(end);
737 //    GET_REAL_CPU_TICKS(rend);
738 //    {
739 //      thread_stats_t *curr = &current_thread->curr_stats;
740 //      thread_stats_t *prev = &current_thread->prev_stats;
741 //      output(" %3d -> %-3d     %7lld ticks  (%lld ms)   %7lld rticks (%lld ms)    ", 
742 //             prev->node->node_num,  curr->node->node_num, 
743 //             curr->cpu_ticks - prev->cpu_ticks,
744 //             (curr->cpu_ticks - prev->cpu_ticks) / ticks_per_millisecond,
745 //# ifdef USE_PERFCTR
746 //             curr->real_ticks - prev->real_ticks,
747 //             (curr->real_ticks - prev->real_ticks) / ticks_per_millisecond
748 //# else
749 //             curr->cpu_ticks - prev->cpu_ticks,
750 //             (curr->cpu_ticks - prev->cpu_ticks) / ticks_per_millisecond
751 //# endif
752 //             );
753 //
754 //      output("update bg node %d:   %lld   (%lld ms)   real: %lld (%lld ms)\n", 
755 //             current_thread->curr_stats.node->node_num, 
756 //             (end-start), (end-start)/ticks_per_millisecond, 
757 //             (rend-rstart), (rend-rstart)/ticks_per_millisecond);
758 //    }
759 //#endif
760 //  }
761
762   // Decide what to do with the thread
763   mcs_lock_lock(&thread_lock);
764   // Drop the count of running threads
765   num_threads.running--;
766   // If we should suspend it, do so for the specified timeout
767   if(suspend) { 
768     current_thread->state = SUSPENDED;
769     num_threads.suspended++;
770         // Add the thread to the sleep queue if a timeout was given
771     // If no timeout was given, that means we should sleep forever
772         // or until some other thread wakes us up (i.e. on a join)      
773     if(timeout)
774       sleepq_add_thread(current_thread, timeout);
775   }
776   else {
777     current_thread->state = RUNNABLE;
778     num_threads.runnable++;
779     sched_add_thread(current_thread);
780   }
781   mcs_lock_unlock(&thread_lock);
782
783   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
784          num_threads.running, num_threads.runnable, 
785          num_threads.suspended, num_threads.detached);
786
787 //  // squirrel away the stack limit for next time
788 //  current_thread->stack_bottom = stack_bottom;
789 //  current_thread->stack_fingerprint = stack_fingerprint;
790
791   // Save the context of the currently running thread.
792   // When it is restored it will start up again right here
793   save_context(current_thread->context);
794
795   // We only want to call switch_to_vcore() when running through
796   // this function at the time of the yield() call.  Since our 
797   // context is restored right here though, we need a way to jump around
798   // the call to switch_to_vcore() once the thread is woken back up.
799   volatile bool yielding = TRUE;
800   if (yielding) {
801     yielding = FALSE; /* for when it starts back up */
802     // Switch back to vcore context to run the scheduler
803     switch_to_vcore();
804   }
805   // Thread context restored...
806   
807 //  // Set up stack limit for new thread
808 //  stack_bottom = current_thread->stack_bottom;
809 //  stack_fingerprint = current_thread->stack_fingerprint;
810 //
811 //  // rotate the stats
812 //  if( bg_save_stats ) {
813 //    current_thread->prev_stats = current_thread->curr_stats;
814 //    
815 //    // update thread time, to skip time asleep
816 //    GET_CPU_TICKS( current_thread->prev_stats.cpu_ticks );
817 //    current_thread->prev_stats.cpu_ticks -= ticks_diff;  // FIXME: subtract out time to do debug output
818 //#ifdef USE_PERFCTR
819 //    GET_REAL_CPU_TICKS( current_thread->prev_stats.real_ticks );
820 //    current_thread->prev_stats.real_ticks -= ticks_rdiff;  // FIXME: subtract out time to do debug output
821 //#endif    
822 //  } else {
823 //    current_thread->prev_stats.node = current_thread->curr_stats.node;
824 //  }
825 //  
826 //  // Check whether time-out happened already or not
827   int rv = OK;
828 //  if (suspend && timeout && current_thread->timeout) {
829 //    rv = TIMEDOUT;
830 //    current_thread->timeout = 0;
831 //  }
832 //
833 //  // Check for and process pending signals
834 //  if ( likely(!current_thread->sig_waiting) ) {
835 //    if (sig_process_pending())
836 //              rv = INTERRUPTED;
837 //  } else {
838 //        // If sig_waiting is 1, sigwait() itself will handle the remaining    
839 //        rv = INTERRUPTED;
840 //  }
841   
842   errno = savederrno;
843   tdebug("Exit\n");
844   return rv;
845 }
846
847 void thread_yield()
848 {
849   CAP_SET_SYSCALL();
850   __thread_yield(FALSE,0);
851   CAP_CLEAR_SYSCALL();
852 }
853
854 int sched_yield(void)
855 {
856   thread_yield();
857   return 0;
858 }
859 strong_alias(sched_yield,__sched_yield);
860
861 // Timeout == 0 means infinite time
862 int thread_suspend_self(unsigned long long timeout)
863 {
864   return __thread_yield(TRUE, timeout);
865 }
866
867 //////////////////////////////////////////////////////////////////////
868 // 
869 //  External functions
870 // 
871 //////////////////////////////////////////////////////////////////////
872
873 inline thread_t *thread_spawn_with_attr(char *name, void* (*func)(void *), 
874                                  void *arg, thread_attr_t attr)
875 {
876   return new_thread(name, func, arg, attr);
877 }
878
879 inline thread_t *thread_spawn(char *name, void* (*func)(void *), void *arg)
880 {
881   return new_thread(name, func, arg, NULL);
882 }
883
884 static inline bool time_to_die() {
885 return ((gflags.main_exited == TRUE) &&
886         ((num_threads.total-num_threads.zombie) == num_threads.detached)
887        );
888 }
889
890 void thread_exit(void *ret)
891 {
892   tdebug("Enter\n");
893   thread_t *t = current_thread;
894
895   //printf("current=%s, gflags.main_exited=%d\n", 
896   //        current_thread?current_thread->name : "NULL", gflags.main_exited);
897
898   if (current_thread == main_thread && gflags.main_exited == FALSE) {
899         // The case when the user calls thread_exit() in main thread is complicated
900         // we cannot simply terminate the main thread, because we need that stack to terminate the
901         // whole program normally.  so we call exit() to make the c runtime help us get the stack
902         // context where we can just return to terminate the whole program
903         // this will call exit_func() and in turn call thread_exit() again
904     gflags.main_exited = TRUE;
905         exit(0);                
906   }
907
908 //  // Note the thread exit in the blocking graph
909 //  current_thread->curr_stats.node = bg_exit_node;
910 //  current_thread->prev_stats.node->num_here--;
911 //  current_thread->curr_stats.node->num_here++;
912 //  if( bg_save_stats ) {
913 //    bg_update_stats();
914 //  }
915     
916   // If we are the main thread...
917   while(unlikely(t == main_thread)) {
918     // Check if we really can exit the program now.
919     // If so, end of program!
920     if(time_to_die()) {
921       //// Dump the blocking graph
922       //if( gflags.exit_func_done && conf_dump_blocking_graph ) {
923       //  tdebug("dumping blocking graph from run_next_thread()\n");
924       //  dump_blocking_graph(); 
925       //}
926
927       // Return back to glibc and exit the program!
928           // First set a global flag so no other vcores try to pull new threads off
929           // of any lists (these lists are about to be deallocated...)
930       mcs_lock_lock(&thread_lock);
931       gflags.exit_func_done = TRUE;
932       mcs_lock_unlock(&thread_lock);
933
934       printf("Dying with %d vcores\n", num_vcores());
935       printf("Program exiting normally!\n");
936       return;
937     }
938     // Otherwise, suspend ourselves to be woken up when it is time to die
939     else {
940       // Suspend myself
941       thread_suspend_self(0);
942     }
943   }
944   // Otherwise...
945   // Update thread counts and resume blocked threads
946   mcs_lock_lock(&thread_lock);
947   num_threads.running--;
948   num_threads.zombie++;
949   t->state = ZOMBIE;
950
951   // Check if it's time to die now. If it is, wakeup the main thread so we can
952   // exit the program
953   if(unlikely(time_to_die()))
954       __thread_resume(main_thread);
955
956   // Otherwise, if the thread is joinable, resume the thread that joined on it.
957   // If no one has joined on it yet, we have alreadu set its thread state to
958   // ZOMBIE so that the thread that eventually tries to join on it can see
959   // this, and free it.
960   else if(likely(t->joinable)) {
961     t->ret = ret;
962     if (t->join_thread) {
963       __thread_resume(t->join_thread);
964     }
965   }
966
967   // Otherwise, update the count of detached threads and put the thread back on
968   // the scheduler queue. The thread will be freed by the scheduler the next
969   // time it attempts to run.
970   else {
971     num_threads.detached--;
972     sched_add_thread(t);
973   }
974
975   // Check to see if we now have less threads than we have vcores.  If so,
976   // prepare to yield the current core back to the system.
977   bool yieldcore = __query_vcore_yield();
978   mcs_lock_unlock(&thread_lock);
979
980   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
981          num_threads.running, num_threads.runnable, 
982          num_threads.suspended, num_threads.detached);
983
984   /* If we were told to yield the vcore, do it! */
985   if(yieldcore)
986     vcore_yield();
987           
988   /* Otherwise switch back to vcore context to schedule the next thread. */
989   switch_to_vcore();
990   assert(0);
991 }
992
993 int thread_join(thread_t *t, void **ret)
994 {
995   tdebug("Enter\n");
996   // Return errors if the argument is bogus
997   if(t == NULL)
998     return_errno(FALSE, EINVAL);
999   if(!t->joinable)
1000     return_errno(FALSE, EINVAL);
1001
1002   // A thread can be joined only once
1003   if(t->join_thread)   
1004     return_errno(FALSE, EACCES);   
1005
1006   // Wait for the thread to complete
1007   tdebug( "thread state: %d\n" ,t->state);
1008   mcs_lock_lock(&thread_lock);
1009   if(t->state != ZOMBIE) {
1010     t->join_thread = current_thread;
1011     mcs_lock_unlock(&thread_lock);
1012         CAP_SET_SYSCALL();
1013     thread_suspend_self(0);
1014     CAP_CLEAR_SYSCALL();
1015     mcs_lock_lock(&thread_lock);
1016   }
1017
1018   // Set the return value
1019   if(ret != NULL) 
1020     *ret = t->ret;
1021
1022   // Free the memory associated with the joined thread. 
1023   __free_thread_prep(t);
1024   mcs_lock_unlock(&thread_lock);
1025   free_thread(t);
1026
1027   tdebug("Exit\n");
1028   return TRUE;
1029 }
1030
1031 // Only resume the thread internally
1032 // Don't touch the timeout flag and the sleep queue
1033 // Call to this needs to be protected by the thread_lock
1034 static void __thread_make_runnable(thread_t *t)
1035 {
1036   tdebug("Enter\n");
1037   tdebug("t=%p\n",t);
1038   if (t->state != SUSPENDED)
1039     return;
1040
1041   assert(t->state == SUSPENDED);
1042   assert( t->sleep == -1 );
1043   t->state = RUNNABLE;
1044   num_threads.suspended--;
1045   num_threads.runnable++;
1046   sched_add_thread(t);
1047
1048   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
1049          num_threads.running, num_threads.runnable, 
1050          num_threads.suspended, num_threads.detached);
1051
1052   tdebug("Exit\n");
1053 }
1054
1055 // Resume a sleeping thread
1056 // Call to this needs to be protected by the thread_lock
1057 static void __thread_resume(thread_t *t)
1058 {
1059   // Remove the thread from the sleep queue
1060   if (t->sleep != -1)
1061     sleepq_remove_thread(t);
1062
1063   // Make the thread runnable
1064   __thread_make_runnable(t);
1065 }
1066
1067 void thread_resume(thread_t *t)
1068 {
1069   mcs_lock_lock(&thread_lock);
1070   __thread_resume(t);
1071   bool requestcore = __query_vcore_request();
1072   mcs_lock_unlock(&thread_lock);
1073
1074   // Maybe request a new vcore if we are running low
1075   if(requestcore) vcore_request(1);
1076 }
1077
1078 void thread_set_detached(thread_t *t)
1079 {
1080   if(!t->joinable)
1081     return;
1082   
1083   mcs_lock_lock(&thread_lock);
1084   t->joinable = 0;
1085   num_threads.detached++;
1086   mcs_lock_unlock(&thread_lock);
1087
1088   tdebug("running: %d, runnable: %d, suspended: %d, detached: %d\n", 
1089          num_threads.running, num_threads.runnable, 
1090          num_threads.suspended, num_threads.detached);
1091 }
1092
1093 inline char* thread_name(thread_t *t)
1094 {
1095   return t->name;
1096 }
1097
1098 void thread_exit_program(int exitcode)
1099 {
1100   raise( SIGINT );
1101   syscall(SYS_proc_destroy, exitcode);
1102 }
1103
1104 // Thread attribute handling
1105 thread_attr_t thread_attr_of(thread_t *t) {
1106   thread_attr_t attr = (thread_attr_t)malloc(sizeof(struct _thread_attr));
1107   attr->thread = t;
1108   return attr;
1109 }
1110
1111 thread_attr_t thread_attr_new()
1112 {
1113   thread_attr_t attr = (thread_attr_t)malloc(sizeof(struct _thread_attr));
1114   attr->thread = NULL;
1115   thread_attr_init(attr);
1116   return attr;
1117 }
1118
1119 int thread_attr_init(thread_attr_t attr)
1120 {
1121   if (attr == NULL)
1122     return_errno(FALSE, EINVAL);
1123   if (attr->thread)
1124     return_errno(FALSE, EPERM);
1125   attr->joinable = TRUE;
1126   return TRUE;
1127 }
1128
1129 int thread_attr_set(thread_attr_t attr, int field, ...)
1130 {
1131   va_list ap;
1132   int rc = TRUE;
1133   if(attr == NULL) 
1134     return EINVAL;
1135
1136   va_start(ap, field);
1137   switch (field) {
1138   case THREAD_ATTR_JOINABLE: {
1139     int val = va_arg(ap, int);
1140     if(attr->thread == NULL) {
1141       if( val == THREAD_CREATE_JOINABLE ) 
1142         attr->joinable = TRUE;
1143       else
1144         attr->joinable = FALSE;
1145     } else {
1146       if( val == THREAD_CREATE_JOINABLE ) 
1147         attr->thread->joinable = 1;
1148       else
1149         attr->thread->joinable = 0;
1150     }
1151     break;
1152   }
1153   default:
1154     notimplemented(thread_attr_set);
1155   }
1156   va_end(ap);
1157   return rc;
1158 }
1159
1160 int thread_attr_get(thread_attr_t attr, int field, ...) 
1161 {
1162   va_list ap;
1163   int rc = TRUE;
1164   va_start(ap, field);
1165   switch (field) {
1166   case THREAD_ATTR_JOINABLE: {
1167     int *val = va_arg(ap, int *);
1168     int joinable = (attr->thread == NULL) ? attr->joinable : attr->thread->joinable;
1169     *val = joinable ? THREAD_CREATE_JOINABLE : THREAD_CREATE_DETACHED;
1170   }
1171   default:
1172     notimplemented(thread_attr_get);
1173   }
1174   va_end(ap);
1175   return rc;
1176 }
1177
1178 int thread_attr_destroy(thread_attr_t attr)
1179 {
1180   free(attr);
1181   return TRUE;
1182 }
1183
1184 // Thread-specific storage
1185 struct thread_keytab_st {
1186     int used;
1187     void (*destructor)(void *);
1188 };
1189
1190 static struct thread_keytab_st thread_keytab[THREAD_KEY_MAX];
1191
1192 int thread_key_create(thread_key_t *key, void (*func)(void *))
1193 {
1194     for ((*key) = 0; (*key) < THREAD_KEY_MAX; (*key)++) {
1195         if (thread_keytab[(*key)].used == FALSE) {
1196             thread_keytab[(*key)].used = TRUE;
1197             thread_keytab[(*key)].destructor = func;
1198             return TRUE;
1199         }
1200     }
1201     return_errno(FALSE, EAGAIN);
1202 }
1203
1204 int thread_key_delete(thread_key_t key)
1205 {
1206     if (key >= THREAD_KEY_MAX)
1207         return_errno(FALSE, EINVAL);
1208     if (!thread_keytab[key].used)
1209         return_errno(FALSE, EINVAL);
1210     thread_keytab[key].used = FALSE;
1211     return TRUE;
1212 }
1213
1214 int thread_key_setdata(thread_key_t key, const void *value)
1215 {
1216     if (key >= THREAD_KEY_MAX)
1217         return_errno(FALSE, EINVAL);
1218     if (!thread_keytab[key].used)
1219         return_errno(FALSE, EINVAL);
1220     if (current_thread->key_data_value == NULL) {
1221         current_thread->key_data_value = (const void **)calloc(1, sizeof(void *)*THREAD_KEY_MAX);
1222         if (current_thread->key_data_value == NULL)
1223             return_errno(FALSE, ENOMEM);
1224     }
1225     if (current_thread->key_data_value[key] == NULL) {
1226         if (value != NULL)
1227             current_thread->key_data_count++;
1228     }
1229     else {
1230         if (value == NULL)
1231             current_thread->key_data_count--;
1232     }
1233     current_thread->key_data_value[key] = value;
1234     return TRUE;
1235 }
1236
1237 void *thread_key_getdata(thread_key_t key)
1238 {
1239     if (key >= THREAD_KEY_MAX)
1240         return_errno(NULL, EINVAL);
1241     if (!thread_keytab[key].used)
1242         return_errno(NULL, EINVAL);
1243     if (current_thread->key_data_value == NULL)
1244         return NULL;
1245     return (void *)current_thread->key_data_value[key];
1246 }
1247
1248 void thread_key_destroydata(thread_t *t)
1249 {
1250     void *data;
1251     int key;
1252     int itr;
1253     void (*destructor)(void *);
1254
1255     if (t == NULL)
1256         return;
1257     if (t->key_data_value == NULL)
1258         return;
1259     /* POSIX thread iteration scheme */
1260     for (itr = 0; itr < THREAD_DESTRUCTOR_ITERATIONS; itr++) {
1261         for (key = 0; key < THREAD_KEY_MAX; key++) {
1262             if (t->key_data_count > 0) {
1263                 destructor = NULL;
1264                 data = NULL;
1265                 if (thread_keytab[key].used) {
1266                     if (t->key_data_value[key] != NULL) {
1267                         data = (void *)t->key_data_value[key];
1268                         t->key_data_value[key] = NULL;
1269                         t->key_data_count--;
1270                         destructor = thread_keytab[key].destructor;
1271                     }
1272                 }
1273                 if (destructor != NULL)
1274                     destructor(data);
1275             }
1276             if (t->key_data_count == 0)
1277                 break;
1278         }
1279         if (t->key_data_count == 0)
1280             break;
1281     }
1282     free(t->key_data_value);
1283     t->key_data_value = NULL;
1284     return;
1285 }
1286
1287 unsigned thread_tid(thread_t *t)
1288 {
1289   return t ? t->tid : 0xffffffff;
1290 }
1291
1292 int __attribute__((unused)) print_sleep_queue(void)
1293 {
1294   linked_list_entry_t *e; 
1295   unsigned long long _total = 0; 
1296   e = ll_view_head(sleepq);
1297
1298   while (e) {
1299     thread_t *tt = (thread_t *)pl_get_pointer(e);
1300     _total += tt->sleep;
1301     output(" %s:  %lld   (%lld)\n", tt->name ? tt->name : "null", tt->sleep, _total );
1302     e = ll_view_next(sleepq, e);
1303   }
1304   return 1;
1305 }
1306
1307 /**
1308  * Put a thread to sleep for the specified timeout 
1309  **/
1310 void thread_usleep(unsigned long long timeout)
1311 {
1312   thread_suspend_self(timeout);
1313 }
1314
1315 /**
1316  * Check sleep queue to wake up all timed-out threads
1317  * sync == TRUE -> force synchronization of last_check_time
1318  **/
1319 static void sleepq_check_wakeup(int sync)
1320 {
1321   // Shortcut to return if no threads sleeping
1322   if (!sync && sleep_times.max == 0) {  
1323     sleep_times.first_wake = 0;
1324     return;
1325   }
1326
1327   // Get interval since last check time and update 
1328   // last check time to now
1329   unsigned long long now;
1330   long long interval;
1331   now = current_usecs();
1332   if( now > sleep_times.last_check ) 
1333     interval = now-sleep_times.last_check;
1334   else 
1335     interval = 0;
1336   sleep_times.last_check = now;
1337
1338   // Adjust max_sleep_time based on the interval just computed
1339   if (sleep_times.max < (unsigned long long)interval)
1340     sleep_times.max = 0;
1341   else
1342     sleep_times.max -= interval;
1343   
1344   // Walk through the sleepq and pull off and resume all threads
1345   // whose remaining sleep time is less than the interval since 
1346   // the last check. If it's greater, update the remaining sleep time
1347   // and set first_wake to now + the new sleep time.
1348   linked_list_entry_t *e;
1349   while (interval > 0 && (e = ll_view_head(sleepq))) {
1350     thread_t *t = (thread_t *)pl_get_pointer(e);
1351
1352     if (t->sleep > interval) {
1353       t->sleep -= interval;
1354       sleep_times.first_wake = now + t->sleep;
1355       break;
1356     }
1357
1358     interval -= t->sleep;
1359     t->sleep = -1;
1360     t->timeout = 1;
1361
1362     mcs_lock_lock(&thread_lock);
1363     ll_free_entry(sleepq, ll_remove_head(sleepq));
1364     __thread_make_runnable(t);
1365     mcs_lock_unlock(&thread_lock);
1366   }
1367
1368   if (ll_size(sleepq) == 0) {
1369      // the sleepq is empty again
1370      sleep_times.first_wake = 0;
1371   }
1372 }
1373
1374 /**
1375  * Set a timer on a thread that will wake up after timeout
1376  * microseconds.  This is used to implement thread_suspend_self(timeout)
1377  **/
1378 static void sleepq_add_thread(thread_t *t, unsigned long long timeout)
1379 {
1380   // Make sure the current thread doesn't already have a sleep time set
1381   assert(t->sleep == -1);
1382
1383   // No need to grab the sleepq_lock before making the following function
1384   // call, as calls to this function should already be protected by it.
1385   sleepq_check_wakeup(TRUE); // make sure: last_check_time == now
1386   
1387   // If the tieout is greater than the maximum sleep time of the 
1388   // longest sleeping thread, update the maximum, set the sleep
1389   // time of the thread (relative to all inserted before it), and
1390   // update the max sleep time
1391   if (timeout >= sleep_times.max) {
1392     // Set sleep_times.first_wake if this is the first item
1393     if( pl_view_head(sleepq) == NULL )
1394       sleep_times.first_wake = current_usecs() + timeout;
1395
1396     // Just append the thread to the end of sleep queue
1397     pl_add_tail(sleepq, t);
1398     t->sleep = timeout - sleep_times.max;
1399     assert( t->sleep >= 0 );
1400     sleep_times.max = timeout;
1401     return;
1402   }
1403
1404   // Otherwise we need to find the proper place to insert the thread in
1405   // the sleep queue, given its timeout length. 
1406   // We search the list backwards.
1407   linked_list_entry_t *e;
1408   long long total_time;
1409   e = ll_view_tail(sleepq);
1410   total_time = sleep_times.max;
1411   while (e) {
1412     thread_t *tt = (thread_t *)pl_get_pointer(e);
1413     assert(tt->sleep >= 0);
1414     total_time -= tt->sleep;
1415
1416     assert (total_time >= 0); // can be == 0 if we are adding the head item
1417     if ((unsigned long long)total_time <= timeout) {
1418       // insert t into the queue
1419       linked_list_entry_t *newp = ll_insert_before(sleepq, e);
1420       pl_set_pointer(newp, t);
1421       t->sleep = timeout - total_time;
1422       assert( t->sleep > 0 );
1423
1424       // set sleep_times.first_wake if this is the first item
1425       if( total_time == 0 )
1426         sleep_times.first_wake = current_usecs() + timeout;
1427
1428       // update the sleep time of the thread right after t
1429       tt->sleep -= t->sleep;
1430       assert( tt->sleep > 0 );
1431       break;
1432     }
1433     e = ll_view_prev(sleepq, e);
1434   }
1435
1436   // We're sure to find such an e
1437   assert (e != NULL);
1438 }
1439
1440 /**
1441  * Remove a sleeping thread from the sleep queue before
1442  * its timer expires.
1443  **/
1444 inline static void sleepq_remove_thread(thread_t *t)
1445 {
1446   // The thread must be in the sleep queue
1447   assert(t->sleep >= 0);  
1448   
1449   // Let's find the thread in the queue
1450   linked_list_entry_t *e;
1451   e = ll_view_head(sleepq);
1452   while (e) {
1453     thread_t *tt = (thread_t *)pl_get_pointer(e);
1454     if (tt == t) {
1455       linked_list_entry_t *nexte = ll_view_next(sleepq, e);
1456       if (nexte) {
1457             // e is not the last thread in the queue
1458             // we need to lengthen the time the next thread will sleep
1459             thread_t *nextt = (thread_t *)pl_get_pointer(nexte);
1460             nextt->sleep += t->sleep;
1461       } else {
1462             // e is the last thread, so we need to adjust max_sleep_time
1463             sleep_times.max -= t->sleep;
1464       }
1465       // remove t
1466       ll_remove_entry(sleepq, e);
1467       ll_free_entry(sleepq, e);
1468       t->sleep = -1;
1469       assert (!t->timeout);    // if this fails, someone must have
1470                                // forgotten to reset timeout some time ago
1471       break;
1472     }
1473     e = ll_view_next(sleepq, e);
1474   }
1475   assert( t->sleep == -1);
1476   assert (e != NULL);   // we must find t in sleep queue
1477 }
1478
1479 /**
1480  * Set the IO polling function.  Used by the aio routines.  Shouldn't
1481  * be used elsewhere.
1482  **/
1483 void set_io_polling_func(void (*func)(long long))
1484 {
1485   assert( !io_polling_func );
1486   io_polling_func = func;
1487 }
1488
1489
1490 /******************************************************************************/
1491 /****************************** Helper Functions ******************************/
1492 /******************************************************************************/
1493
1494 static int get_stack_size_kb_log2(void *func)
1495 {
1496   int result = conf_new_stack_kb_log2;
1497   if (start_node_addrs != NULL) {
1498     int i = 0;
1499     while (start_node_addrs[i] != NULL && start_node_addrs[i] != func) {
1500       i++;
1501     }
1502     if (start_node_addrs[i] == func) {
1503       result = start_node_stacks[i];
1504     } else {
1505       fatal("Couldn't find stack size for thread entry point %p\n", func);
1506     }
1507   }
1508   return result;
1509 }
1510
1511