Initial pthreads library on top of vcores
[akaros.git] / user / parlib / bthread.c
1 #include <bthread.h>
2 #include <vcore.h>
3 #include <mcs.h>
4 #include <stdlib.h>
5 #include <string.h>
6 #include <assert.h>
7 #include <rstdio.h>
8 #include <errno.h>
9 #include <arch/atomic.h>
10
11 int threads_active = 1;
12 mcs_lock_t work_queue_lock = MCS_LOCK_INIT;
13 bthread_t work_queue_head = 0;
14 bthread_t work_queue_tail = 0;
15 bthread_once_t init_once = BTHREAD_ONCE_INIT;
16 bthread_t active_threads[MAX_VCORES] = {0};
17
18 void queue_insert(bthread_t* head, bthread_t* tail, bthread_t node)
19 {
20   node->next = 0;
21   if(*head == 0)
22     *head = node;
23   else
24     (*tail)->next = node;
25   *tail = node;
26 }
27
28 bthread_t queue_remove(bthread_t* head, bthread_t* tail)
29 {
30   bthread_t node = *head;
31   *head = (*head)->next;
32   if(*head == 0)
33     *tail = 0;
34   node->next = 0;
35   return node;
36 }
37
38 void vcore_entry()
39 {
40   bthread_t node = NULL;
41   while(1)
42   {
43     mcs_lock_lock(&work_queue_lock);
44     if(work_queue_head)
45       node = queue_remove(&work_queue_head,&work_queue_tail);
46     mcs_lock_unlock(&work_queue_lock);
47
48     if(node)
49       break;
50     cpu_relax();
51   }
52
53   active_threads[vcore_id()] = node;
54
55   bthread_exit(node->start_routine(node->arg));
56 }
57
58 void _bthread_init()
59 {
60   // if we allocated active_threads dynamically, we'd do so here
61 }
62
63 int bthread_attr_init(bthread_attr_t *a)
64 {
65   return 0;
66 }
67
68 int bthread_attr_destroy(bthread_attr_t *a)
69 {
70   return 0;
71 }
72
73 int bthread_create(bthread_t* thread, const bthread_attr_t* attr,
74                    void *(*start_routine)(void *), void* arg)
75 {
76   bthread_once(&init_once,&_bthread_init);
77
78   *thread = (bthread_t)malloc(sizeof(work_queue_t));
79   (*thread)->start_routine = start_routine;
80   (*thread)->arg = arg;
81   (*thread)->next = 0;
82   (*thread)->finished = 0;
83   (*thread)->detached = 0;
84   mcs_lock_lock(&work_queue_lock);
85   {
86     threads_active++;
87     queue_insert(&work_queue_head,&work_queue_tail,*thread);
88     // don't return until we get a vcore
89     while(threads_active > num_vcores() && vcore_request(1));
90   }
91   mcs_lock_unlock(&work_queue_lock);
92
93   return 0;
94 }
95
96 int bthread_join(bthread_t t, void** arg)
97 {
98   volatile bthread_t thread = t;
99   while(!thread->finished);
100   if(arg) *arg = thread->arg;
101   free(thread);
102   return 0;
103 }
104
105 int bthread_mutexattr_init(bthread_mutexattr_t* attr)
106 {
107   attr->type = BTHREAD_MUTEX_DEFAULT;
108   return 0;
109 }
110
111 int bthread_mutexattr_destroy(bthread_mutexattr_t* attr)
112 {
113   return 0;
114 }
115
116
117 int bthread_attr_setdetachstate(bthread_attr_t *__attr,int __detachstate) {
118         *__attr = __detachstate;
119         return 0;
120 }
121
122 int bthread_mutexattr_gettype(const bthread_mutexattr_t* attr, int* type)
123 {
124   *type = attr ? attr->type : BTHREAD_MUTEX_DEFAULT;
125   return 0;
126 }
127
128 int bthread_mutexattr_settype(bthread_mutexattr_t* attr, int type)
129 {
130   if(type != BTHREAD_MUTEX_NORMAL)
131     return EINVAL;
132   attr->type = type;
133   return 0;
134 }
135
136 int bthread_mutex_init(bthread_mutex_t* m, const bthread_mutexattr_t* attr)
137 {
138   m->attr = attr;
139   m->lock = 0;
140   return 0;
141 }
142
143 int bthread_mutex_lock(bthread_mutex_t* m)
144 {
145   while(bthread_mutex_trylock(m))
146     while(*(volatile size_t*)&m->lock);
147   return 0;
148 }
149
150 int bthread_mutex_trylock(bthread_mutex_t* m)
151 {
152   return atomic_swap(&m->lock,1) == 0 ? 0 : EBUSY;
153 }
154
155 int bthread_mutex_unlock(bthread_mutex_t* m)
156 {
157   m->lock = 0;
158   return 0;
159 }
160
161 int bthread_mutex_destroy(bthread_mutex_t* m)
162 {
163   return 0;
164 }
165
166 int bthread_cond_init(bthread_cond_t *c, const bthread_condattr_t *a)
167 {
168   c->attr = a;
169   memset(c->waiters,0,sizeof(c->waiters));
170   return 0;
171 }
172
173 int bthread_cond_destroy(bthread_cond_t *c)
174 {
175   return 0;
176 }
177
178 int bthread_cond_broadcast(bthread_cond_t *c)
179 {
180   memset(c->waiters,0,sizeof(c->waiters));
181   return 0;
182 }
183
184 int bthread_cond_signal(bthread_cond_t *c)
185 {
186   int i;
187   for(i = 0; i < max_vcores(); i++)
188   {
189     if(c->waiters[i])
190     {
191       c->waiters[i] = 0;
192       break;
193     }
194   }
195   return 0;
196 }
197
198 int bthread_cond_wait(bthread_cond_t *c, bthread_mutex_t *m)
199 {
200   c->waiters[vcore_id()] = 1;
201   bthread_mutex_unlock(m);
202
203   volatile int* poll = &c->waiters[vcore_id()];
204   while(*poll);
205
206   bthread_mutex_lock(m);
207
208   return 0;
209 }
210
211 int bthread_condattr_init(bthread_condattr_t *a)
212 {
213   a = BTHREAD_PROCESS_PRIVATE;
214   return 0;
215 }
216
217 int bthread_condattr_destroy(bthread_condattr_t *a)
218 {
219   return 0;
220 }
221
222 int bthread_condattr_setpshared(bthread_condattr_t *a, int s)
223 {
224   a->pshared = s;
225   return 0;
226 }
227
228 int bthread_condattr_getpshared(bthread_condattr_t *a, int *s)
229 {
230   *s = a->pshared;
231   return 0;
232 }
233
234 bthread_t bthread_self()
235 {
236   return active_threads[vcore_id()];
237 }
238
239 int bthread_equal(bthread_t t1, bthread_t t2)
240 {
241   return t1 == t2;
242 }
243
244 void bthread_exit(void* ret)
245 {
246   bthread_once(&init_once,&_bthread_init);
247
248   bthread_t t = bthread_self();
249
250   mcs_lock_lock(&work_queue_lock);
251   threads_active--;
252   if(threads_active == 0)
253     exit(0);
254   mcs_lock_unlock(&work_queue_lock);
255
256   if(t)
257   {
258     t->arg = ret;
259     t->finished = 1;
260     if(t->detached)
261       free(t);
262   }
263
264   vcore_entry();
265 }
266
267 int bthread_once(bthread_once_t* once_control, void (*init_routine)(void))
268 {
269   if(atomic_swap(once_control,1) == 0)
270     init_routine();
271   return 0;
272 }
273
274 int bthread_barrier_init(bthread_barrier_t* b, const bthread_barrierattr_t* a, int count)
275 {
276   memset(b->local_sense,0,sizeof(b->local_sense));
277
278   b->sense = 0;
279   b->nprocs = b->count = count;
280   mcs_lock_init(&b->lock);
281   return 0;
282 }
283
284 int bthread_barrier_wait(bthread_barrier_t* b)
285 {
286   int id = vcore_id();
287   int ls = b->local_sense[32*id] = 1 - b->local_sense[32*id];
288
289   mcs_lock_lock(&b->lock);
290   int count = --b->count;
291   mcs_lock_unlock(&b->lock);
292
293   if(count == 0)
294   {
295     b->count = b->nprocs;
296     b->sense = ls;
297     return BTHREAD_BARRIER_SERIAL_THREAD;
298   }
299   else
300   {
301     while(b->sense != ls);
302     return 0;
303   }
304 }
305
306 int bthread_barrier_destroy(bthread_barrier_t* b)
307 {
308   return 0;
309 }