912a13ca7eea492ee9ad3c92747e5bf0e902b947
[akaros.git] / user / parlib / bthread.c
1 #include <bthread.h>
2 #include <vcore.h>
3 #include <mcs.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <assert.h>
7 #include <stdio.h>
8 #include <errno.h>
9 #include <arch/atomic.h>
10
11 int threads_active = 1;
12 mcs_lock_t work_queue_lock = MCS_LOCK_INIT;
13 bthread_t work_queue_head = 0;
14 bthread_t work_queue_tail = 0;
15 bthread_once_t init_once = BTHREAD_ONCE_INIT;
16 bthread_t active_threads[MAX_VCORES] = {0};
17
18 void queue_insert(bthread_t* head, bthread_t* tail, bthread_t node)
19 {
20   node->next = 0;
21   if(*head == 0)
22     *head = node;
23   else
24     (*tail)->next = node;
25   *tail = node;
26 }
27
28 bthread_t queue_remove(bthread_t* head, bthread_t* tail)
29 {
30   bthread_t node = *head;
31   *head = (*head)->next;
32   if(*head == 0)
33     *tail = 0;
34   node->next = 0;
35   return node;
36 }
37
38 void vcore_entry()
39 {
40   struct mcs_lock_qnode local_qn = {0};
41   bthread_t node = NULL;
42   while(1)
43   {
44     mcs_lock_lock(&work_queue_lock, &local_qn);
45     if(work_queue_head)
46       node = queue_remove(&work_queue_head,&work_queue_tail);
47     mcs_lock_unlock(&work_queue_lock, &local_qn);
48
49     if(node)
50       break;
51     cpu_relax();
52   }
53
54   active_threads[vcore_id()] = node;
55
56   bthread_exit(node->start_routine(node->arg));
57 }
58
59 void _bthread_init()
60 {
61   // if we allocated active_threads dynamically, we'd do so here
62 }
63
64 int bthread_attr_init(bthread_attr_t *a)
65 {
66   return 0;
67 }
68
69 int bthread_attr_destroy(bthread_attr_t *a)
70 {
71   return 0;
72 }
73
74 int bthread_create(bthread_t* thread, const bthread_attr_t* attr,
75                    void *(*start_routine)(void *), void* arg)
76 {
77   struct mcs_lock_qnode local_qn = {0};
78   bthread_once(&init_once,&_bthread_init);
79
80   *thread = (bthread_t)malloc(sizeof(work_queue_t));
81   (*thread)->start_routine = start_routine;
82   (*thread)->arg = arg;
83   (*thread)->next = 0;
84   (*thread)->finished = 0;
85   (*thread)->detached = 0;
86   mcs_lock_lock(&work_queue_lock, &local_qn);
87   {
88     threads_active++;
89     queue_insert(&work_queue_head,&work_queue_tail,*thread);
90     // don't return until we get a vcore
91     while(threads_active > num_vcores() && vcore_request(1));
92   }
93   mcs_lock_unlock(&work_queue_lock, &local_qn);
94
95   return 0;
96 }
97
98 int bthread_join(bthread_t t, void** arg)
99 {
100   volatile bthread_t thread = t;
101   while(!thread->finished);
102   if(arg) *arg = thread->arg;
103   free(thread);
104   return 0;
105 }
106
107 int bthread_mutexattr_init(bthread_mutexattr_t* attr)
108 {
109   attr->type = BTHREAD_MUTEX_DEFAULT;
110   return 0;
111 }
112
113 int bthread_mutexattr_destroy(bthread_mutexattr_t* attr)
114 {
115   return 0;
116 }
117
118
119 int bthread_attr_setdetachstate(bthread_attr_t *__attr,int __detachstate) {
120         *__attr = __detachstate;
121         return 0;
122 }
123
124 int bthread_mutexattr_gettype(const bthread_mutexattr_t* attr, int* type)
125 {
126   *type = attr ? attr->type : BTHREAD_MUTEX_DEFAULT;
127   return 0;
128 }
129
130 int bthread_mutexattr_settype(bthread_mutexattr_t* attr, int type)
131 {
132   if(type != BTHREAD_MUTEX_NORMAL)
133     return EINVAL;
134   attr->type = type;
135   return 0;
136 }
137
138 int bthread_mutex_init(bthread_mutex_t* m, const bthread_mutexattr_t* attr)
139 {
140   m->attr = attr;
141   atomic_init(&m->lock, 0);
142   return 0;
143 }
144
145 int bthread_mutex_lock(bthread_mutex_t* m)
146 {
147   while(bthread_mutex_trylock(m))
148     while(*(volatile size_t*)&m->lock);
149   return 0;
150 }
151
152 int bthread_mutex_trylock(bthread_mutex_t* m)
153 {
154   return atomic_swap(&m->lock, 1) == 0 ? 0 : EBUSY;
155 }
156
157 int bthread_mutex_unlock(bthread_mutex_t* m)
158 {
159   atomic_set(&m->lock, 0);
160   return 0;
161 }
162
163 int bthread_mutex_destroy(bthread_mutex_t* m)
164 {
165   return 0;
166 }
167
168 int bthread_cond_init(bthread_cond_t *c, const bthread_condattr_t *a)
169 {
170   c->attr = a;
171   memset(c->waiters,0,sizeof(c->waiters));
172   return 0;
173 }
174
175 int bthread_cond_destroy(bthread_cond_t *c)
176 {
177   return 0;
178 }
179
180 int bthread_cond_broadcast(bthread_cond_t *c)
181 {
182   memset(c->waiters,0,sizeof(c->waiters));
183   return 0;
184 }
185
186 int bthread_cond_signal(bthread_cond_t *c)
187 {
188   int i;
189   for(i = 0; i < max_vcores(); i++)
190   {
191     if(c->waiters[i])
192     {
193       c->waiters[i] = 0;
194       break;
195     }
196   }
197   return 0;
198 }
199
200 int bthread_cond_wait(bthread_cond_t *c, bthread_mutex_t *m)
201 {
202   c->waiters[vcore_id()] = 1;
203   bthread_mutex_unlock(m);
204
205   volatile int* poll = &c->waiters[vcore_id()];
206   while(*poll);
207
208   bthread_mutex_lock(m);
209
210   return 0;
211 }
212
213 int bthread_condattr_init(bthread_condattr_t *a)
214 {
215   a = BTHREAD_PROCESS_PRIVATE;
216   return 0;
217 }
218
219 int bthread_condattr_destroy(bthread_condattr_t *a)
220 {
221   return 0;
222 }
223
224 int bthread_condattr_setpshared(bthread_condattr_t *a, int s)
225 {
226   a->pshared = s;
227   return 0;
228 }
229
230 int bthread_condattr_getpshared(bthread_condattr_t *a, int *s)
231 {
232   *s = a->pshared;
233   return 0;
234 }
235
236 bthread_t bthread_self()
237 {
238   return active_threads[vcore_id()];
239 }
240
241 int bthread_equal(bthread_t t1, bthread_t t2)
242 {
243   return t1 == t2;
244 }
245
246 void bthread_exit(void* ret)
247 {
248   struct mcs_lock_qnode local_qn = {0};
249   bthread_once(&init_once,&_bthread_init);
250
251   bthread_t t = bthread_self();
252
253   mcs_lock_lock(&work_queue_lock, &local_qn);
254   threads_active--;
255   if(threads_active == 0)
256     exit(0);
257   mcs_lock_unlock(&work_queue_lock, &local_qn);
258
259   if(t)
260   {
261     t->arg = ret;
262     t->finished = 1;
263     if(t->detached)
264       free(t);
265   }
266
267   vcore_entry();
268 }
269
270 int bthread_once(bthread_once_t* once_control, void (*init_routine)(void))
271 {
272   if (atomic_swap_u32(once_control, 1) == 0)
273     init_routine();
274   return 0;
275 }
276
277 int bthread_barrier_init(bthread_barrier_t* b, const bthread_barrierattr_t* a, int count)
278 {
279   struct mcs_lock_qnode local_qn = {0};
280   memset(b->local_sense,0,sizeof(b->local_sense));
281
282   b->sense = 0;
283   b->nprocs = b->count = count;
284   mcs_lock_init(&b->lock);
285   return 0;
286 }
287
288 int bthread_barrier_wait(bthread_barrier_t* b)
289 {
290   struct mcs_lock_qnode local_qn = {0};
291   int id = vcore_id();
292   int ls = b->local_sense[32*id] = 1 - b->local_sense[32*id];
293
294   mcs_lock_lock(&b->lock, &local_qn);
295   int count = --b->count;
296   mcs_lock_unlock(&b->lock, &local_qn);
297
298   if(count == 0)
299   {
300     b->count = b->nprocs;
301     b->sense = ls;
302     return BTHREAD_BARRIER_SERIAL_THREAD;
303   }
304   else
305   {
306     while(b->sense != ls);
307     return 0;
308   }
309 }
310
311 int bthread_barrier_destroy(bthread_barrier_t* b)
312 {
313   return 0;
314 }