Update pthread futex_waiti() to unlock after yield
[akaros.git] / user / pthread / futex.c
1 #include <ros/common.h>
2 #include <futex.h>
3 #include <sys/queue.h>
4 #include <pthread.h>
5 #include <parlib.h>
6 #include <assert.h>
7 #include <stdio.h>
8 #include <errno.h>
9 #include <slab.h>
10 #include <mcs.h>
11
12 static inline int futex_wake(int *uaddr, int count);
13 static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
14 static void *timer_thread(void *arg);
15
16 struct futex_element {
17   TAILQ_ENTRY(futex_element) link;
18   pthread_t pthread;
19   int *uaddr;
20   uint64_t ms_timeout;
21   bool timedout;
22 };
23 TAILQ_HEAD(futex_queue, futex_element);
24
25 struct futex_data {
26   struct mcs_pdr_lock lock;
27   struct futex_queue queue;
28   int timer_enabled;
29   pthread_t timer;
30   long time;
31 };
32 static struct futex_data __futex;
33
34 static inline void futex_init()
35 {
36   mcs_pdr_init(&__futex.lock);
37   TAILQ_INIT(&__futex.queue);
38   __futex.timer_enabled = false;
39   pthread_create(&__futex.timer, NULL, timer_thread, NULL);
40   __futex.time = 0;
41 }
42
43 static void __futex_block(struct uthread *uthread, void *arg) {
44   pthread_t pthread = (pthread_t)uthread;
45   struct futex_element *e = (struct futex_element*)arg;
46
47   // Set the remaining properties of the futex element
48   e->pthread = pthread;
49   e->timedout = false;
50   // Adjust the timeout properties on the futex element
51   bool enable_timer = false;
52   if(e->ms_timeout != (uint64_t)-1) {
53     e->ms_timeout += __futex.time;
54         // If we are setting the timeout, get ready to
55         // enable the timer if it is currently disabled.
56     if(__futex.timer_enabled == false) {
57       __futex.timer_enabled = true;
58       enable_timer = true;
59     }
60   }
61   // Notify the scheduler of the type of yield we did
62   __pthread_generic_yield(pthread);
63   pthread->state = PTH_BLK_MUTEX;
64   // Insert the futex element into the queue
65   TAILQ_INSERT_TAIL(&__futex.queue, e, link);
66   // Unlock the pdr_lock 
67   mcs_pdr_unlock(&__futex.lock);
68   // Enable the timer if we need to outside the lock
69   if(enable_timer)
70     futex_wake(&__futex.timer_enabled, 1);
71 }
72
73 static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout)
74 {
75   // Atomically do the following...
76   mcs_pdr_lock(&__futex.lock);
77   // If the value of *uaddr matches val
78   if(*uaddr == val) {
79     // Create a new futex element and initialize it.
80     struct futex_element e;
81     e.uaddr = uaddr;
82     e.ms_timeout = ms_timeout;
83     // Yield the uthread...
84     // We set the remaining properties of the futex element, adjust the
85     // timeout, set the timer, and unlock the pdr lock on the other side.
86     // It is important that we do the unlock on the other side, because (unlike
87     // linux, etc.) its possible to get interrupted and drop into vcore context
88     // right after releasing the lock.  If that vcore code then calls
89     // futex_wake(), we would be screwed.  Doing things this way means we have
90     // to hold the lock longer, but its necessary for correctness.
91     uthread_yield(TRUE, __futex_block, &e);
92     // We are unlocked here!
93
94     // After waking, if we timed out, set the error
95     // code appropriately and return
96     if(e.timedout) {
97       errno = ETIMEDOUT;
98       return -1;
99     }
100   } else {
101       mcs_pdr_unlock(&__futex.lock);
102   }
103   return 0;
104 }
105
106 static inline int futex_wake(int *uaddr, int count)
107 {
108   struct futex_element *e,*n = NULL;
109   struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
110
111   // Atomically grab all relevant futex blockers
112   // from the global futex queue
113   mcs_pdr_lock(&__futex.lock);
114   e = TAILQ_FIRST(&__futex.queue);
115   while(e != NULL) {
116     if(count > 0) {
117       n = TAILQ_NEXT(e, link);
118       if(e->uaddr == uaddr) {
119         TAILQ_REMOVE(&__futex.queue, e, link);
120         TAILQ_INSERT_TAIL(&q, e, link);
121         count--;
122       }
123       e = n;
124     }
125     else break;
126   }
127   mcs_pdr_unlock(&__futex.lock);
128
129   // Unblock them outside the lock
130   e = TAILQ_FIRST(&q);
131   while(e != NULL) {
132     n = TAILQ_NEXT(e, link);
133     TAILQ_REMOVE(&q, e, link);
134     uthread_runnable((struct uthread*)e->pthread);
135     e = n;
136   }
137   return 0;
138 }
139
140 static void *timer_thread(void *arg)
141 {
142   struct futex_element *e,*n = NULL;
143   struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
144
145   // Do this forever...
146   for(;;) {
147     // Block for 1 millisecond
148     sys_block(1000);
149
150     // Then atomically do the following...
151     mcs_pdr_lock(&__futex.lock);
152     // Up the time
153     __futex.time++;
154
155         // Find all futexes that have timed out on this iteration,
156         // and count those still waiting
157     int waiting = 0;
158     e = TAILQ_FIRST(&__futex.queue);
159     while(e != NULL) {
160       n = TAILQ_NEXT(e, link);
161       if(e->ms_timeout == __futex.time) {
162         e->timedout = true;
163         TAILQ_REMOVE(&__futex.queue, e, link);
164         TAILQ_INSERT_TAIL(&q, e, link);
165       }
166       else if(e->ms_timeout != (uint64_t)-1)
167         waiting++;
168       e = n;
169     }
170     // If there are no more waiting, disable the timer
171     if(waiting == 0) {
172       __futex.time = 0;
173       __futex.timer_enabled = false;
174     }
175     mcs_pdr_unlock(&__futex.lock);
176
177     // Unblock any futexes that have timed out outside the lock
178     e = TAILQ_FIRST(&q);
179     while(e != NULL) {
180       n = TAILQ_NEXT(e, link);
181       TAILQ_REMOVE(&q, e, link);
182       uthread_runnable((struct uthread*)e->pthread);
183       e = n;
184     }
185
186     // If we have disabled the timer, park this thread
187     futex_wait(&__futex.timer_enabled, false, -1);
188   }
189 }
190
191 int futex(int *uaddr, int op, int val,
192           const struct timespec *timeout,
193           int *uaddr2, int val3)
194 {
195   uint64_t ms_timeout = (uint64_t)-1;
196   assert(uaddr2 == NULL);
197   assert(val3 == 0);
198   if(timeout != NULL) {
199     ms_timeout = timeout->tv_sec*1000 + timeout->tv_nsec/1000000L;
200     assert(ms_timeout > 0);
201   }
202
203   run_once(futex_init());
204   switch(op) {
205     case FUTEX_WAIT:
206       return futex_wait(uaddr, val, ms_timeout);
207     case FUTEX_WAKE:
208       return futex_wake(uaddr, val);
209     default:
210       errno = ENOSYS;
211       return -1;
212   }
213   return -1;
214 }
215