Update pthread futexes to use alarms for timeouts
[akaros.git] / user / pthread / futex.c
1 #include <ros/common.h>
2 #include <futex.h>
3 #include <sys/queue.h>
4 #include <pthread.h>
5 #include <parlib.h>
6 #include <assert.h>
7 #include <stdio.h>
8 #include <errno.h>
9 #include <slab.h>
10 #include <mcs.h>
11 #include <alarm.h>
12
13 static inline int futex_wake(int *uaddr, int count);
14 static inline int futex_wait(int *uaddr, int val, uint64_t ms_timeout);
15 static void *timer_thread(void *arg);
16
17 struct futex_element {
18   TAILQ_ENTRY(futex_element) link;
19   pthread_t pthread;
20   int *uaddr;
21   uint64_t us_timeout;
22   struct alarm_waiter awaiter;
23   bool timedout;
24 };
25 TAILQ_HEAD(futex_queue, futex_element);
26
27 struct futex_data {
28   struct mcs_pdr_lock lock;
29   struct futex_queue queue;
30 };
31 static struct futex_data __futex;
32
33 static inline void futex_init()
34 {
35   mcs_pdr_init(&__futex.lock);
36   TAILQ_INIT(&__futex.queue);
37 }
38
39 static void __futex_timeout(struct alarm_waiter *awaiter) {
40   struct futex_element *__e = NULL;
41   struct futex_element *e = (struct futex_element*)awaiter->data;
42
43   // Atomically remove the timed-out element from the futex queue if we won the
44   // race against actually completing.
45   mcs_pdr_lock(&__futex.lock);
46   TAILQ_FOREACH(__e, &__futex.queue, link)
47     if (__e == e) break;
48   if (__e != NULL)
49     TAILQ_REMOVE(&__futex.queue, e, link);
50   mcs_pdr_unlock(&__futex.lock);
51
52   // If we removed it, restart it outside the lock
53   if (__e != NULL) {
54     uthread_runnable((struct uthread*)e->pthread);
55     e->timedout = true;
56   }
57   // Set this as the very last thing we do.  Spin on this in the wake code if
58   // we are trying to wake something that has already fired this timer.
59   awaiter->data = NULL;
60 }
61
62 static void __futex_block(struct uthread *uthread, void *arg) {
63   pthread_t pthread = (pthread_t)uthread;
64   struct futex_element *e = (struct futex_element*)arg;
65
66   // Set the remaining properties of the futex element
67   e->pthread = pthread;
68   e->timedout = false;
69
70   // Insert the futex element into the queue
71   TAILQ_INSERT_TAIL(&__futex.queue, e, link);
72
73   // Set an alarm for futex timeout if applicable
74   if(e->us_timeout != (uint64_t)-1) {
75     e->awaiter.data = e;
76     init_awaiter(&e->awaiter, __futex_timeout);
77     set_awaiter_rel(&e->awaiter, e->us_timeout);
78     set_alarm(&e->awaiter);
79   }
80
81   // Notify the scheduler of the type of yield we did
82   __pthread_generic_yield(pthread);
83   pthread->state = PTH_BLK_MUTEX;
84
85   // Unlock the pdr_lock 
86   mcs_pdr_unlock(&__futex.lock);
87 }
88
89 static inline int futex_wait(int *uaddr, int val, uint64_t us_timeout)
90 {
91   // Atomically do the following...
92   mcs_pdr_lock(&__futex.lock);
93   // If the value of *uaddr matches val
94   if(*uaddr == val) {
95     // Create a new futex element and initialize it.
96     struct futex_element e;
97     e.uaddr = uaddr;
98     e.us_timeout = us_timeout;
99     // Yield the uthread...
100     // We set the remaining properties of the futex element, set the timer, and
101     // unlock the pdr lock on the other side.  It is important that we do the
102     // unlock on the other side, because (unlike linux, etc.) its possible to
103     // get interrupted and drop into vcore context right after releasing the
104     // lock.  If that vcore code then calls futex_wake(), we would be screwed.
105     // Doing things this way means we have to hold the lock longer, but its
106     // necessary for correctness.
107     uthread_yield(TRUE, __futex_block, &e);
108     // We are unlocked here!
109
110     // After waking, if we timed out, set the error
111     // code appropriately and return
112     if(e.timedout) {
113       errno = ETIMEDOUT;
114       return -1;
115     }
116   } else {
117       mcs_pdr_unlock(&__futex.lock);
118   }
119   return 0;
120 }
121
122 static inline int futex_wake(int *uaddr, int count)
123 {
124   struct futex_element *e,*n = NULL;
125   struct futex_queue q = TAILQ_HEAD_INITIALIZER(q);
126
127   // Atomically grab all relevant futex blockers
128   // from the global futex queue
129   mcs_pdr_lock(&__futex.lock);
130   e = TAILQ_FIRST(&__futex.queue);
131   while(e != NULL) {
132     if(count > 0) {
133       n = TAILQ_NEXT(e, link);
134       // If this element is blocked on uaddr
135       if(e->uaddr == uaddr) {
136         // Cancel the timeout if one was set
137         if(e->us_timeout != (uint64_t)-1) {
138           // Try and unset the alarm.  If this fails, then we have already
139           // started running the alarm callback, so spin on awaiter->data being
140           // set to NULL.  The fact that we made it here though, means that WE
141           // are the one who removed e from the queue, so we are basically
142           // spinning just to make sure that no more references to awaiter
143           // exist.
144           if(!unset_alarm(&e->awaiter)) {
145             while(&e->awaiter)
146               cpu_relax();
147           } else {
148           }
149         }
150         // Remove it from the queue
151         TAILQ_REMOVE(&__futex.queue, e, link);
152         TAILQ_INSERT_TAIL(&q, e, link);
153         count--;
154       }
155       e = n;
156     }
157     else break;
158   }
159   mcs_pdr_unlock(&__futex.lock);
160
161   // Unblock them outside the lock
162   e = TAILQ_FIRST(&q);
163   while(e != NULL) {
164     n = TAILQ_NEXT(e, link);
165     TAILQ_REMOVE(&q, e, link);
166     uthread_runnable((struct uthread*)e->pthread);
167     e = n;
168   }
169   return 0;
170 }
171
172 int futex(int *uaddr, int op, int val,
173           const struct timespec *timeout,
174           int *uaddr2, int val3)
175 {
176   // Round to the nearest micro-second
177   uint64_t us_timeout = (uint64_t)-1;
178   assert(uaddr2 == NULL);
179   assert(val3 == 0);
180   if(timeout != NULL) {
181     us_timeout = timeout->tv_sec*1000000L + timeout->tv_nsec/1000L;
182     assert(us_timeout > 0);
183   }
184
185   run_once(futex_init());
186   switch(op) {
187     case FUTEX_WAIT:
188       return futex_wait(uaddr, val, us_timeout);
189     case FUTEX_WAKE:
190       return futex_wake(uaddr, val);
191     default:
192       errno = ENOSYS;
193       return -1;
194   }
195   return -1;
196 }
197