Add basic timeout functionality to pthread futexes
[akaros.git] / user / pthread / futex.c
1 #include <ros/common.h>
2 #include <futex.h>
3 #include <sys/queue.h>
4 #include <pthread.h>
5 #include <parlib.h>
6 #include <assert.h>
7 #include <stdio.h>
8 #include <errno.h>
9 #include <slab.h>
10 #include <mcs.h>
11
12 static inline int futex_wake(int *uaddr, int count);
13 static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
14 static void *timer_thread(void *arg);
15
16 struct futex_element {
17   TAILQ_ENTRY(futex_element) link;
18   pthread_t pthread;
19   int *uaddr;
20   uint64_t ms_timeout;
21   bool timedout;
22 };
23 TAILQ_HEAD(futex_queue, futex_element);
24
25 struct futex_data {
26   struct mcs_pdr_lock lock;
27   struct futex_queue queue;
28   int timer_enabled;
29   pthread_t timer;
30   long time;
31 };
32 static struct futex_data __futex;
33
34 static inline void futex_init()
35 {
36   mcs_pdr_init(&__futex.lock);
37   TAILQ_INIT(&__futex.queue);
38   __futex.timer_enabled = false;
39   pthread_create(&__futex.timer, NULL, timer_thread, NULL);
40   __futex.time = 0;
41 }
42
43 static void __futex_block(struct uthread *uthread, void *arg) {
44   pthread_t pthread = (pthread_t)uthread;
45   struct futex_element *e = (struct futex_element*)arg;
46   __pthread_generic_yield(pthread);
47   pthread->state = PTH_BLK_MUTEX;
48   e->pthread = pthread;
49 }
50
51 static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout)
52 {
53   // Atomically do the following...
54   mcs_pdr_lock(&__futex.lock);
55   // If the value of *uaddr matches val
56   if(*uaddr == val) {
57     // Create a new futex element and initialize it.
58     struct futex_element e;
59     bool enable_timer = false;
60     e.uaddr = uaddr;
61     e.pthread = NULL;
62     e.ms_timeout = ms_timeout;
63     e.timedout = false;
64     if(e.ms_timeout != (uint64_t)-1) {
65       e.ms_timeout += __futex.time;
66           // If we are setting the timeout, get ready to
67           // enable the timer if it is currently disabled.
68       if(__futex.timer_enabled == false) {
69         __futex.timer_enabled = true;
70         enable_timer = true;
71       }
72     }
73     // Insert the futex element into the queue
74     TAILQ_INSERT_TAIL(&__futex.queue, &e, link);
75     mcs_pdr_unlock(&__futex.lock);
76
77     // Enable the timer if we need to outside the lock
78     if(enable_timer)
79       futex_wake(&__futex.timer_enabled, 1);
80
81     // Yield the current uthread
82     uthread_yield(TRUE, __futex_block, &e);
83
84         // After waking, if we timed out, set the error
85         // code appropriately and return
86     if(e.timedout) {
87       errno = ETIMEDOUT;
88       return -1;
89     }
90   }
91   else {
92     mcs_pdr_unlock(&__futex.lock);
93   }
94   return 0;
95 }
96
97 static inline int futex_wake(int *uaddr, int count)
98 {
99   struct futex_element *e,*n = NULL;
100   struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
101
102   // Atomically grab all relevant futex blockers
103   // from the global futex queue
104   mcs_pdr_lock(&__futex.lock);
105   e = TAILQ_FIRST(&__futex.queue);
106   while(e != NULL) {
107     if(count > 0) {
108       n = TAILQ_NEXT(e, link);
109       if(e->uaddr == uaddr) {
110         TAILQ_REMOVE(&__futex.queue, e, link);
111         TAILQ_INSERT_TAIL(&q, e, link);
112         count--;
113       }
114       e = n;
115     }
116     else break;
117   }
118   mcs_pdr_unlock(&__futex.lock);
119
120   // Unblock them outside the lock
121   e = TAILQ_FIRST(&q);
122   while(e != NULL) {
123     n = TAILQ_NEXT(e, link);
124     TAILQ_REMOVE(&q, e, link);
125     while(e->pthread == NULL)
126       cpu_relax();
127     uthread_runnable((struct uthread*)e->pthread);
128     e = n;
129   }
130   return 0;
131 }
132
133 static void *timer_thread(void *arg)
134 {
135   struct futex_element *e,*n = NULL;
136   struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
137
138   // Do this forever...
139   for(;;) {
140     // Block for 1 millisecond
141     sys_block(1000);
142
143     // Then atomically do the following...
144     mcs_pdr_lock(&__futex.lock);
145     // Up the time
146     __futex.time++;
147
148         // Find all futexes that have timed out on this iteration,
149         // and count those still waiting
150     int waiting = 0;
151     e = TAILQ_FIRST(&__futex.queue);
152     while(e != NULL) {
153       n = TAILQ_NEXT(e, link);
154       if(e->ms_timeout == __futex.time) {
155         e->timedout = true;
156         TAILQ_REMOVE(&__futex.queue, e, link);
157         TAILQ_INSERT_TAIL(&q, e, link);
158       }
159       else if(e->ms_timeout != (uint64_t)-1)
160         waiting++;
161       e = n;
162     }
163     // If there are no more waiting, disable the timer
164     if(waiting == 0) {
165       __futex.time = 0;
166       __futex.timer_enabled = false;
167     }
168     mcs_pdr_unlock(&__futex.lock);
169
170     // Unblock any futexes that have timed out outside the lock
171     e = TAILQ_FIRST(&q);
172     while(e != NULL) {
173       n = TAILQ_NEXT(e, link);
174       TAILQ_REMOVE(&q, e, link);
175       while(e->pthread == NULL)
176         cpu_relax();
177       uthread_runnable((struct uthread*)e->pthread);
178       e = n;
179     }
180
181     // If we have disabled the timer, park this thread
182     futex_wait(&__futex.timer_enabled, false, -1);
183   }
184 }
185
186 int futex(int *uaddr, int op, int val,
187           const struct timespec *timeout,
188           int *uaddr2, int val3)
189 {
190   uint64_t ms_timeout = (uint64_t)-1;
191   assert(uaddr2 == NULL);
192   assert(val3 == 0);
193   if(timeout != NULL) {
194     ms_timeout = timeout->tv_sec*1000 + timeout->tv_nsec/1000000L;
195     assert(ms_timeout > 0);
196   }
197
198   run_once(futex_init());
199   switch(op) {
200     case FUTEX_WAIT:
201       return futex_wait(uaddr, val, ms_timeout);
202     case FUTEX_WAKE:
203       return futex_wake(uaddr, val);
204     default:
205       errno = ENOSYS;
206       return -1;
207   }
208   return -1;
209 }
210