Fixes slab page allocators
[akaros.git] / user / parlib / asynccall.c
index f9aefd9..ab525fb 100644 (file)
@@ -7,32 +7,36 @@
 #include <arc.h>
 #include <errno.h>
 #include <arch/arch.h>
+#include <sys/param.h>
+#include <arch/atomic.h>
+#include <vcore.h>
 
-syscall_front_ring_t syscallfrontring;
-sysevent_back_ring_t syseventbackring;
 syscall_desc_pool_t syscall_desc_pool;
 async_desc_pool_t async_desc_pool;
 async_desc_t* current_async_desc;
 
-// use globals for now
-void init_arc()
+struct arsc_channel global_ac;
+
+void init_arc(struct arsc_channel* ac)
 {
        // Set up the front ring for the general syscall ring
        // and the back ring for the general sysevent ring
-       // TODO: Reorganize these global variables
-       FRONT_RING_INIT(&syscallfrontring, &(__procdata.syscallring), SYSCALLRINGSIZE);
-       BACK_RING_INIT(&syseventbackring, &(__procdata.syseventring), SYSEVENTRINGSIZE);
+       mcs_lock_init(&ac->aclock);
+       ac->ring_page = (syscall_sring_t*)sys_init_arsc();
+
+       FRONT_RING_INIT(&ac->sysfr, ac->ring_page, SYSCALLRINGSIZE);
+       //BACK_RING_INIT(&syseventbackring, &(__procdata.syseventring), SYSEVENTRINGSIZE);
+       //TODO: eventually rethink about desc pools, they are here but no longer necessary
        POOL_INIT(&syscall_desc_pool, MAX_SYSCALLS);
        POOL_INIT(&async_desc_pool, MAX_ASYNCCALLS);
-       sys_init_arsc();
-
-
 }
+
 // Wait on all syscalls within this async call.  TODO - timeout or something?
-int waiton_async_call(async_desc_t* desc, async_rsp_t* rsp)
+int waiton_group_call(async_desc_t* desc, async_rsp_t* rsp)
 {
        syscall_rsp_t syscall_rsp;
        syscall_desc_t* d;
+       int retval = 0;
        int err = 0;
        if (!desc) {
                errno = EINVAL;
@@ -41,14 +45,14 @@ int waiton_async_call(async_desc_t* desc, async_rsp_t* rsp)
 
        while (!(TAILQ_EMPTY(&desc->syslist))) {
                d = TAILQ_FIRST(&desc->syslist);
-               err = waiton_syscall(d, &syscall_rsp);
+               err = waiton_syscall(d);
                // TODO: processing the retval out of rsp here.  might be specific to
                // the async call.  do we want to accumulate?  return any negative
                // values?  depends what we want from the return value, so we might
                // have to pass in a function that is used to do the processing and
                // pass the answer back out in rsp.
                //rsp->retval += syscall_rsp.retval; // For example
-               rsp->retval = MIN(rsp->retval, syscall_rsp.retval);
+               retval = MIN(retval, err);
                // remove from the list and free the syscall desc
                TAILQ_REMOVE(&desc->syslist, d, next);
                POOL_PUT(&syscall_desc_pool, d);
@@ -107,52 +111,110 @@ int get_all_desc(async_desc_t** a_desc, syscall_desc_t** s_desc)
 }
 
 // This runs one syscall instead of a group. 
-int async_syscall(syscall_req_t* req, syscall_desc_t* desc)
+
+// TODO: right now there is one channel (remote), in the future, the caller
+// may specify local which will cause it to give up the core to do the work.
+// creation of additional remote channel also allows the caller to prioritize
+// work, because the default policy for the kernel is to roundrobin between them.
+int async_syscall(arsc_channel_t* chan, syscall_req_t* req, syscall_desc_t** desc_ptr2)
 {
        // Note that this assumes one global frontring (TODO)
        // abort if there is no room for our request.  ring size is currently 64.
        // we could spin til it's free, but that could deadlock if this same thread
        // is supposed to consume the requests it is waiting on later.
-       if (RING_FULL(&syscallfrontring)) {
+       syscall_desc_t* desc = malloc(sizeof (syscall_desc_t));
+       desc->channel = chan;
+       syscall_front_ring_t *fr = &(desc->channel->sysfr);
+       //TODO: can do it locklessly using CAS, but could change with local async calls
+       struct mcs_lock_qnode local_qn = {0};
+       mcs_lock_lock(&(chan->aclock), &local_qn);
+       if (RING_FULL(fr)) {
                errno = EBUSY;
                return -1;
        }
        // req_prod_pvt comes in as the previously produced item.  need to
        // increment to the next available spot, which is the one we'll work on.
        // at some point, we need to listen for the responses.
-       desc->idx = ++(syscallfrontring.req_prod_pvt);
-       desc->sysfr = &syscallfrontring;
-       syscall_req_t* r = RING_GET_REQUEST(&syscallfrontring, desc->idx);
+       desc->idx = ++(fr->req_prod_pvt);
+       syscall_req_t* r = RING_GET_REQUEST(fr, desc->idx);
+       // CAS on the req->status perhaps
+       req->status = REQ_alloc;
+
        memcpy(r, req, sizeof(syscall_req_t));
+       r->status = REQ_ready;
        // push our updates to syscallfrontring.req_prod_pvt
-       RING_PUSH_REQUESTS(&syscallfrontring);
+       // note: it is ok to push without protection since it is atomic and kernel
+       // won't process any requests until they are marked REQ_ready (also atomic)
+       RING_PUSH_REQUESTS(fr);
        //cprintf("DEBUG: sring->req_prod: %d, sring->rsp_prod: %d\n", 
-       //   syscallfrontring.sring->req_prod, syscallfrontring.sring->rsp_prod);
+       mcs_lock_unlock(&desc->channel->aclock, &local_qn);
+       *desc_ptr2 = desc;
        return 0;
 }
+// Default convinence wrapper before other method of posting calls are available
+
+syscall_desc_t* arc_call(long int num, ...)
+{
+       va_list vl;
+       va_start(vl,num);
+       struct syscall *p_sysc = malloc(sizeof (struct syscall));
+       syscall_desc_t* desc;
+       if (p_sysc == NULL) {
+               errno = ENOMEM;
+               return 0;
+       }
+       p_sysc->num = num;
+       p_sysc->arg0 = va_arg(vl,long int);
+       p_sysc->arg1 = va_arg(vl,long int);
+       p_sysc->arg2 = va_arg(vl,long int);
+       p_sysc->arg3 = va_arg(vl,long int);
+       p_sysc->arg4 = va_arg(vl,long int);
+       p_sysc->arg5 = va_arg(vl,long int);
+       va_end(vl);
+       syscall_req_t arc = {REQ_alloc,NULL, NULL, p_sysc};
+       async_syscall(&SYS_CHANNEL, &arc, &desc);
+       printf ( "%d pushed at %p \n", desc);
+       return desc;
+}
 
 // consider a timeout too
-int waiton_syscall(syscall_desc_t* desc, syscall_rsp_t* rsp)
+// Wait until arsc returns, caller provides rsp buffer.
+// eventually change this to return ret_val, set errno
+
+// What if someone calls waiton the same desc several times?
+int waiton_syscall(syscall_desc_t* desc)
 {
-       // Make sure we were given a desc with a non-NULL frontring.  This could
-       // happen if someone forgot to check the error code on the paired syscall.
-       if (!desc->sysfr){
+       int retval = 0;
+       if (desc == NULL || desc->channel == NULL){
                errno = EFAIL;
                return -1;
        }
-       // this forces us to call wait in the order in which the syscalls are made.
-       if (desc->idx != desc->sysfr->rsp_cons + 1){
-               errno = EDEADLOCK;
+       // Make sure we were given a desc with a non-NULL frontring.  This could
+       // happen if someone forgot to check the error code on the paired syscall.
+       syscall_front_ring_t *fr =  &desc->channel->sysfr;
+       
+       if (!fr){
+               errno = EFAIL;
                return -1;
        }
-       while (!(RING_HAS_UNCONSUMED_RESPONSES(desc->sysfr)))
+       printf("waiting %d\n", vcore_id());
+       syscall_rsp_t* rsp = RING_GET_RESPONSE(fr, desc->idx);
+
+       // ignoring the ring push response from the kernel side now
+       while (atomic_read(&rsp->sc->flags) != SC_DONE)
                cpu_relax();
-       memcpy(rsp, RING_GET_RESPONSE(desc->sysfr, desc->idx), sizeof(*rsp));
-       desc->sysfr->rsp_cons++;
+       // memcpy(rsp, rsp_inring, sizeof(*rsp));
+       
     // run a cleanup function for this desc, if available
-    if (desc->cleanup)
-       desc->cleanup(desc->data);
-       return 0;
+    if (rsp->cleanup)
+       rsp->cleanup(rsp->data);
+       if (RSP_ERRNO(rsp)){
+               errno = RSP_ERRNO(rsp);
+               retval = -1;
+       } else 
+               retval =  RSP_RESULT(rsp); 
+       atomic_inc((atomic_t*) &(fr->rsp_cons));
+       return retval;
 }