Adding test case for multproducer async calls.
[akaros.git] / user / parlib / asynccall.c
1 #include <stdlib.h>
2
3 #include <ros/common.h>
4 #include <ros/syscall.h>
5 #include <ros/ring_syscall.h>
6 #include <ros/sysevent.h>
7 #include <arc.h>
8 #include <errno.h>
9 #include <arch/arch.h>
10 #include <sys/param.h>
11 #include <arch/atomic.h>
12
13 #include <pthread.h>
14
15 syscall_desc_pool_t syscall_desc_pool;
16 async_desc_pool_t async_desc_pool;
17 async_desc_t* current_async_desc;
18
19 struct arsc_channel global_ac;
20
21 void init_arc(struct arsc_channel* ac)
22 {
23         // Set up the front ring for the general syscall ring
24         // and the back ring for the general sysevent ring
25         mcs_lock_init(&ac->aclock);
26         ac->ring_page = (syscall_sring_t*)sys_init_arsc();
27
28         FRONT_RING_INIT(&ac->sysfr, ac->ring_page, SYSCALLRINGSIZE);
29         //BACK_RING_INIT(&syseventbackring, &(__procdata.syseventring), SYSEVENTRINGSIZE);
30         //TODO: eventually rethink about desc pools, they are here but no longer necessary
31         POOL_INIT(&syscall_desc_pool, MAX_SYSCALLS);
32         POOL_INIT(&async_desc_pool, MAX_ASYNCCALLS);
33 }
34
35 // Wait on all syscalls within this async call.  TODO - timeout or something?
36
37 int waiton_group_call(async_desc_t* desc, async_rsp_t* rsp)
38 {
39         syscall_rsp_t syscall_rsp;
40         syscall_desc_t* d;
41         int err = 0;
42         if (!desc) {
43                 errno = EINVAL;
44                 return -1;
45         }
46
47         while (!(TAILQ_EMPTY(&desc->syslist))) {
48                 d = TAILQ_FIRST(&desc->syslist);
49                 err = waiton_syscall(d, &syscall_rsp);
50                 // TODO: processing the retval out of rsp here.  might be specific to
51                 // the async call.  do we want to accumulate?  return any negative
52                 // values?  depends what we want from the return value, so we might
53                 // have to pass in a function that is used to do the processing and
54                 // pass the answer back out in rsp.
55                 //rsp->retval += syscall_rsp.retval; // For example
56                 rsp->retval = MIN(rsp->retval, syscall_rsp.retval);
57                 // remove from the list and free the syscall desc
58                 TAILQ_REMOVE(&desc->syslist, d, next);
59                 POOL_PUT(&syscall_desc_pool, d);
60         }
61         // run a cleanup function for this desc, if available
62         if (desc->cleanup)
63                 desc->cleanup(desc->data);
64         // free the asynccall desc
65         POOL_PUT(&async_desc_pool, desc);
66         return err;
67 }
68
69 // Finds a free async_desc_t, on which you can wait for a series of syscalls
70 async_desc_t* get_async_desc(void)
71 {
72         async_desc_t* desc = POOL_GET(&async_desc_pool);
73         if (desc) {
74                 // Clear out any data that was in the old desc
75                 memset(desc, 0, sizeof(*desc));
76                 TAILQ_INIT(&desc->syslist);
77         }
78         return desc;
79 }
80
81 // Finds a free sys_desc_t, on which you can wait for a specific syscall, and
82 // binds it to the group desc.
83 syscall_desc_t* get_sys_desc(async_desc_t* desc)
84 {
85         syscall_desc_t* d = POOL_GET(&syscall_desc_pool);
86         if (d) {
87                 // Clear out any data that was in the old desc
88                 memset(d, 0, sizeof(*d));
89         TAILQ_INSERT_TAIL(&desc->syslist, d, next);
90         }
91         return d;
92 }
93
94 // Gets an async and a sys desc, with the sys bound to async.  Also sets
95 // current_async_desc.  This is meant as an easy wrapper when there is only one
96 // syscall for an async call.
97 int get_all_desc(async_desc_t** a_desc, syscall_desc_t** s_desc)
98 {
99         assert(a_desc && s_desc);
100         if ((current_async_desc = get_async_desc()) == NULL){
101                 errno = EBUSY;
102                 return -1;
103         }
104         *a_desc = current_async_desc;
105         if ((*s_desc = get_sys_desc(current_async_desc)))
106                 return 0;
107         // in case we could get an async, but not a syscall desc, then clean up.
108         POOL_PUT(&async_desc_pool, current_async_desc);
109         current_async_desc = NULL;
110         errno = EBUSY;
111         return -1;
112 }
113
114 // This runs one syscall instead of a group. 
115 int async_syscall(syscall_req_t* req, syscall_desc_t** desc_ptr2)
116 {
117         // Note that this assumes one global frontring (TODO)
118         // abort if there is no room for our request.  ring size is currently 64.
119         // we could spin til it's free, but that could deadlock if this same thread
120         // is supposed to consume the requests it is waiting on later.
121         syscall_desc_t* desc = malloc(sizeof (syscall_desc_t));
122         desc->channel = &SYS_CHANNEL;
123         syscall_front_ring_t *fr = &(desc->channel->sysfr);
124         //TODO: can do it locklessly using CAS, but could change with local async calls
125         mcs_lock_lock(&desc->channel->aclock);
126         if (RING_FULL(fr)) {
127                 errno = EBUSY;
128                 return -1;
129         }
130         // req_prod_pvt comes in as the previously produced item.  need to
131         // increment to the next available spot, which is the one we'll work on.
132         // at some point, we need to listen for the responses.
133         desc->idx = ++(fr->req_prod_pvt);
134         syscall_req_t* r = RING_GET_REQUEST(fr, desc->idx);
135         // CAS on the req->status perhaps
136         req->status = REQ_alloc;
137
138         memcpy(r, req, sizeof(syscall_req_t));
139         r->status = REQ_ready;
140         // push our updates to syscallfrontring.req_prod_pvt
141         // note: it is ok to push without protection since it is atomic and kernel
142         // won't process any requests until they are marked REQ_ready (also atomic)
143         RING_PUSH_REQUESTS(fr);
144         //cprintf("DEBUG: sring->req_prod: %d, sring->rsp_prod: %d\n", 
145         mcs_lock_unlock(&desc->channel->aclock);
146         *desc_ptr2 = desc;
147         return 0;
148 }
149
150 // consider a timeout too
151 int waiton_syscall(syscall_desc_t* desc, syscall_rsp_t* rsp)
152 {
153         if (desc == NULL || desc->channel == NULL){
154                 //errno = EFAIL;
155                 return -1;
156         }
157         // Make sure we were given a desc with a non-NULL frontring.  This could
158         // happen if someone forgot to check the error code on the paired syscall.
159         syscall_front_ring_t *fr =  &desc->channel->sysfr;
160         
161         if (!fr){
162                 //errno = EFAIL;
163                 return -1;
164         }
165         syscall_rsp_t* rsp_inring = RING_GET_RESPONSE(fr, desc->idx);
166
167         // ignoring the ring push response from the kernel side now
168         while (rsp_inring->status != RES_ready)
169                 cpu_relax();
170         memcpy(rsp, rsp_inring, sizeof(*rsp));
171         
172         atomic_inc((atomic_t*) &(fr->rsp_cons));
173     // run a cleanup function for this desc, if available
174     //if (rsp->cleanup)
175     //  rsp->cleanup(rsp->data);
176         if (rsp->syserr){
177                 //      errno = rsp->syserr;
178                 return -1;
179         } else 
180                 return 0;
181 }
182
183