Adding test case for multproducer async calls.
authorDavid Zhu <yuzhu@cs.berkeley.edu>
Mon, 15 Nov 2010 02:23:32 +0000 (18:23 -0800)
committerKevin Klues <klueska@cs.berkeley.edu>
Thu, 3 Nov 2011 00:36:01 +0000 (17:36 -0700)
Fixed ring buffer and arsc to support out of order processing.
Eventually this will be converted to use similar mechanisms as local
async calls and simply use the ring buffer as a way of shipping async
call structures around.

kern/include/ros/ring_buffer.h
kern/include/ros/sysevent.h
kern/src/arsc.c
tests/arsc_mt.c [new file with mode: 0644]
tests/arsc_test.c
user/parlib/asynccall.c
user/parlib/syscall.c

index 06f4e61..1a114b3 100644 (file)
@@ -26,7 +26,6 @@
 
 #ifndef ROS_INC_RING_BUFFER_H
 #define ROS_INC_RING_BUFFER_H
-
 #include <string.h>
 #include <ros/arch/membar.h>
 
@@ -36,9 +35,6 @@
 
 typedef unsigned int RING_IDX;
 
-// zra: smp.c is v. slow to build because these RDn() things cause expressions
-//      to grow exponentially.
-
 /* Round a 32-bit unsigned constant down to the nearest power of two. */
 #define __RD2(_x)  (((_x) & 0x00000002UL) ? 0x2                  : ((_x) & 0x1))
 #define __RD4(_x)  (((_x) & 0x0000000cUL) ? __RD2((_x)>>2)<<2    : __RD2(_x))
@@ -46,39 +42,20 @@ typedef unsigned int RING_IDX;
 #define __RD16(_x) (((_x) & 0x0000ff00UL) ? __RD8((_x)>>8)<<8    : __RD8(_x))
 #define __RD32(_x) (((_x) & 0xffff0000UL) ? __RD16((_x)>>16)<<16 : __RD16(_x))
 
-/* Statically assert that two values are in fact equal.
- * It works by enducing a compil error from a duplicate case in a switch 
- * statement if the assertion is false.
- */
-#define __ASSERT_EQUAL(x, y) \
-       switch ((x) == (y)) case 0: case ((x) == (y)):
-
 /*
  * Calculate size of a shared ring, given the total available space for the
  * ring and indexes (_sz), and the name tag of the request/response structure.
  * A ring contains as many entries as will fit, rounded down to the nearest 
  * power of two (so we can mask with (size-1) to loop around).
- * This tells us how many elements the ring _s can contain, given _sz space.
- */
-#define __RING_SIZE(_s, _sz) \
-    __RING_SIZE_BYTES(_s, _sz) / sizeof((_s)->ring[0])
-
-/*
- * This is the same as above, except in terms of bytes instead of elements
  */
-#define __RING_SIZE_BYTES(_s, _sz) \
-    (__RD32((_sz) - (long)(_s)->ring + (long)(_s)))
-
+#define __CONST_RING_SIZE(_s, _sz) \
+    (__RD32(((_sz) - offsetof(struct _s##_sring, ring)) / \
+           sizeof(((struct _s##_sring *)0)->ring[0])))
 /*
- * These two are the same as above except that they rely on type information
- * to determine the sizes statically, rather than the runtime instantiation
- * of the ring buffer variable
+ * The same for passing in an actual pointer instead of a name tag.
  */
-#define __RING_SIZE_STATIC(__name, _sz) \
-    (__RING_SIZE_BYTES_STATIC(_sz) / sizeof(union __name##_sring_entry))
-
-#define __RING_SIZE_BYTES_STATIC(_sz) \
-    (__RD32((_sz) - __RING_HEADER_SIZE()))
+#define __RING_SIZE(_s, _sz) \
+    (__RD32(((_sz) - (long)(_s)->ring + (long)(_s)) / sizeof((_s)->ring[0])))
 
 /*
  * Macros to make the correct C datatypes for a new kind of ring.
@@ -89,25 +66,14 @@ typedef unsigned int RING_IDX;
  * In a header where you want the ring datatype declared, you then do:
  *
  *     DEFINE_RING_TYPES(mytag, request_t, response_t);
- * or
- *     DEFINE_RING_TYPES_WITH_SIZE(mytag, request_t, response_t, size);
  *
- * Both macros expand out to give you a set of types, as you can see below.
+ * These expand out to give you a set of types, as you can see below.
  * The most important of these are:
  * 
  *     mytag_sring_t      - The shared ring.
  *     mytag_front_ring_t - The 'front' half of the ring.
  *     mytag_back_ring_t  - The 'back' half of the ring.
  *
- * The first of these macros will only declare a single element array to 
- * represent the ring buffer in the shared ring struct that is ultimately
- * created.  
- *
- * The second macro actually statically declares space of size (size) inside
- * the shared ring struct. This size is rounded down to the nearest power of 2
- * and space is subtracted off to account for holding any necessary ring 
- * buffer headers.
- *
  * To initialize a ring in your code you need to know the location and size
  * of the shared memory area (PAGE_SIZE, for instance). To initialise
  * the front half:
@@ -121,41 +87,31 @@ typedef unsigned int RING_IDX;
  *
  *     mytag_back_ring_t back_ring;
  *     BACK_RING_INIT(&back_ring, (mytag_sring_t *)shared_page, PAGE_SIZE);
- *
- * If you use the second of the two macros when first defining your ring 
- * structures, then the size you use when initializing your front and back 
- * rings *should* match the size you passed into this macro (e.g. PAGE_SIZE
- * in this example).
  */
 
-#define __RING_HEADER()                                                 \
-    RING_IDX req_prod, req_event;                                       \
-    RING_IDX rsp_prod, rsp_event;                                       \
-    uint8_t  pad[48];
-    
-struct rhs_struct {
-       __RING_HEADER()
-};
-
-#define __RING_HEADER_SIZE() \
-    (sizeof(struct rhs_struct))
-
 #define DEFINE_RING_TYPES(__name, __req_t, __rsp_t)                     \
-       DEFINE_RING_TYPES_WITH_SIZE(__name, __req_t, __rsp_t,               \
-                                   __RING_HEADER_SIZE() + 1)
-
-#define DEFINE_RING_TYPES_WITH_SIZE(__name, __req_t, __rsp_t, __size)   \
                                                                         \
 /* Shared ring entry */                                                 \
 union __name##_sring_entry {                                            \
     __req_t req;                                                        \
     __rsp_t rsp;                                                        \
-} TRUSTED;                                                              \
+};                                                                      \
                                                                         \
 /* Shared ring page */                                                  \
 struct __name##_sring {                                                 \
-       __RING_HEADER()                                                     \
-    union __name##_sring_entry ring[__RING_SIZE_STATIC(__name, __size)];\
+    RING_IDX req_prod, req_event;                                       \
+    RING_IDX rsp_prod, rsp_event;                                       \
+    union {                                                             \
+        struct {                                                        \
+            uint8_t smartpoll_active;                                   \
+        } netif;                                                        \
+        struct {                                                        \
+            uint8_t msg;                                                \
+        } tapif_user;                                                   \
+        uint8_t pvt_pad[4];                                             \
+    } private;                                                          \
+    uint8_t __pad[44];                                                  \
+    union __name##_sring_entry ring[1]; /* variable-length */           \
 };                                                                      \
                                                                         \
 /* "Front" end's private variables */                                   \
@@ -177,21 +133,7 @@ struct __name##_back_ring {                                             \
 /* Syntactic sugar */                                                   \
 typedef struct __name##_sring __name##_sring_t;                         \
 typedef struct __name##_front_ring __name##_front_ring_t;               \
-typedef struct __name##_back_ring __name##_back_ring_t;                 \
-/* This is a dummy function just used to statically assert that         \
- * there are no weird padding issues associated with our sring structs  \
- */                                                                     \
-static void __name##_assert_sring_size() __attribute__((used));         \
-static void __name##_assert_sring_size() {                              \
-       __ASSERT_EQUAL( sizeof(__name##_sring_t),                           \
-                       ( __RING_HEADER_SIZE()  +                           \
-                         ( __RING_SIZE_STATIC(__name, __size) *            \
-                           sizeof(union __name##_sring_entry)              \
-                         )                                                 \
-                       )                                                   \
-                     );                                                    \
-}              
-
+typedef struct __name##_back_ring __name##_back_ring_t
 
 /*
  * Macros for manipulating rings.
@@ -212,7 +154,8 @@ static void __name##_assert_sring_size() {                              \
 #define SHARED_RING_INIT(_s) do {                                       \
     (_s)->req_prod  = (_s)->rsp_prod  = 0;                              \
     (_s)->req_event = (_s)->rsp_event = 1;                              \
-    (void)memset((_s)->pad, 0, sizeof((_s)->pad));                      \
+    (void)memset((_s)->private.pvt_pad, 0, sizeof((_s)->private.pvt_pad)); \
+    (void)memset((_s)->__pad, 0, sizeof((_s)->__pad));                  \
 } while(0)
 
 #define FRONT_RING_INIT(_r, _s, __size) do {                            \
@@ -376,4 +319,3 @@ static void __name##_assert_sring_size() {                              \
  * indent-tabs-mode: nil
  * End:
  */
-
index 174ff07..c022c31 100644 (file)
@@ -33,7 +33,7 @@ typedef struct sysevent_rsp {
 
 // Generic Sysevent Ring Buffer
 #define SYSEVENTRINGSIZE    PGSIZE
-DEFINE_RING_TYPES_WITH_SIZE(sysevent, sysevent_t, sysevent_rsp_t, SYSEVENTRINGSIZE);
+DEFINE_RING_TYPES(sysevent, sysevent_t, sysevent_rsp_t);
 
 #endif //ROS_SYSEVENT_H
 
index 0d6a684..594ddf4 100644 (file)
@@ -121,10 +121,10 @@ static intreg_t process_generic_syscalls(struct proc *p, size_t max)
                rsp.syserr = sysc.err;
                rsp.cleanup = req->cleanup;
                rsp.data = req->data;
-               rsp.status = RES_ready;
                // write response into the slot it came from
                memcpy(req, &rsp, sizeof(syscall_rsp_t));
                (sysbr->rsp_prod_pvt)++;
+               req->status = RES_ready;
                RING_PUSH_RESPONSES(sysbr);
 
                //printk("DEBUG POST: sring->req_prod: %d, sring->rsp_prod: %d\n",
diff --git a/tests/arsc_mt.c b/tests/arsc_mt.c
new file mode 100644 (file)
index 0000000..e5fee5e
--- /dev/null
@@ -0,0 +1,56 @@
+#include <rstdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <unistd.h>
+#include <vcore.h>
+#include <parlib.h>
+#include <ros/syscall.h>
+#include <arc.h>
+#include <stdio.h>
+
+#define NUM_THREADS 4 
+
+pthread_t t1;
+
+async_desc_t desc1;
+
+syscall_desc_t* sys_cputs_async(const char *s, size_t len,                                             
+                     void (*cleanup_handler)(void*), void* cleanup_data)
+{                                                                                                                     
+    // could just hardcode 4 0's, will eventually wrap this marshaller anyway                                         
+       syscall_desc_t* desc;
+    syscall_req_t syscall = {REQ_alloc, cleanup_handler, cleanup_data,
+                                                       SYS_cputs,{(uint32_t)s, len, [2 ... (NUM_SYSCALL_ARGS-1)] 0} };                          
+    syscall.cleanup = cleanup_handler;                                                                                  
+    syscall.data = cleanup_data;
+    async_syscall(&syscall, &desc);
+       return desc;
+}
+
+void *syscall_thread(void* arg)
+{      
+       char testme ='a';
+       char buf[20] = {0};
+       sprintf(buf, "%d", pthread_self()->id );
+       char tid = buf[0];
+       syscall_desc_t* sysdesc;
+       syscall_rsp_t sysrsp;
+       sysdesc = sys_cputs_async(&tid, 1, NULL, NULL);
+       waiton_syscall(sysdesc, &sysrsp);
+}
+
+int main(int argc, char** argv){
+       int pid = sys_getpid();
+       pthread_t *my_threads = malloc(sizeof(pthread_t) * NUM_THREADS);
+       char testme = 't';
+       printf ("multi thread - init arsc \n");
+       init_arc(&SYS_CHANNEL);
+       for (int i = 0; i < NUM_THREADS ; i++)
+               pthread_create(&my_threads[i], NULL, &syscall_thread, NULL);
+       
+       for (int i = 0; i < NUM_THREADS; i++)
+               pthread_join(my_threads[i], NULL);
+
+       printf("multi thread - end\n");
+}
+
index 7d32b54..40da172 100644 (file)
@@ -21,15 +21,17 @@ int main(int argc, char** argv){
        int pid = sys_getpid();
        char testme = 't';
        printf ("single thread - init arsc \n");
-       syscall_desc_t* sysdesc;
+       syscall_desc_t* sysdesc[2];
        syscall_rsp_t sysrsp;
        init_arc(&SYS_CHANNEL);
 
        printf ("single thread - init complete \n");
        // cprintf_async(&desc1, "Cross-Core call 1, coming from process %08x\n", pid);
-       sysdesc = sys_cputs_async(&testme, 1, NULL, NULL);
+       sysdesc[0] = sys_cputs_async(&testme, 1, NULL, NULL);
+       sysdesc[1] = sys_cputs_async(&testme, 1, NULL, NULL);
 
        printf ("single thread - call placed \n");
-       waiton_syscall(sysdesc, &sysrsp);       
+       waiton_syscall(sysdesc[0], &sysrsp);
+       waiton_syscall(sysdesc[1], &sysrsp);
        printf ("single thread - dummy call \n");       
 }
index 3392aa6..1e57dc1 100644 (file)
@@ -8,6 +8,9 @@
 #include <errno.h>
 #include <arch/arch.h>
 #include <sys/param.h>
+#include <arch/atomic.h>
+
+#include <pthread.h>
 
 syscall_desc_pool_t syscall_desc_pool;
 async_desc_pool_t async_desc_pool;
@@ -115,9 +118,10 @@ int async_syscall(syscall_req_t* req, syscall_desc_t** desc_ptr2)
        // abort if there is no room for our request.  ring size is currently 64.
        // we could spin til it's free, but that could deadlock if this same thread
        // is supposed to consume the requests it is waiting on later.
-       syscall_desc_t* desc = *desc_ptr2 = malloc(sizeof (syscall_desc_t));
+       syscall_desc_t* desc = malloc(sizeof (syscall_desc_t));
        desc->channel = &SYS_CHANNEL;
        syscall_front_ring_t *fr = &(desc->channel->sysfr);
+       //TODO: can do it locklessly using CAS, but could change with local async calls
        mcs_lock_lock(&desc->channel->aclock);
        if (RING_FULL(fr)) {
                errno = EBUSY;
@@ -128,9 +132,9 @@ int async_syscall(syscall_req_t* req, syscall_desc_t** desc_ptr2)
        // at some point, we need to listen for the responses.
        desc->idx = ++(fr->req_prod_pvt);
        syscall_req_t* r = RING_GET_REQUEST(fr, desc->idx);
-       assert (r->status == RES_free);
+       // CAS on the req->status perhaps
        req->status = REQ_alloc;
-       
+
        memcpy(r, req, sizeof(syscall_req_t));
        r->status = REQ_ready;
        // push our updates to syscallfrontring.req_prod_pvt
@@ -139,34 +143,38 @@ int async_syscall(syscall_req_t* req, syscall_desc_t** desc_ptr2)
        RING_PUSH_REQUESTS(fr);
        //cprintf("DEBUG: sring->req_prod: %d, sring->rsp_prod: %d\n", 
        mcs_lock_unlock(&desc->channel->aclock);
+       *desc_ptr2 = desc;
        return 0;
 }
 
 // consider a timeout too
 int waiton_syscall(syscall_desc_t* desc, syscall_rsp_t* rsp)
 {
+       if (desc == NULL || desc->channel == NULL){
+               //errno = EFAIL;
+               return -1;
+       }
        // Make sure we were given a desc with a non-NULL frontring.  This could
        // happen if someone forgot to check the error code on the paired syscall.
        syscall_front_ring_t *fr =  &desc->channel->sysfr;
        
        if (!fr){
-               errno = EFAIL;
+               //errno = EFAIL;
                return -1;
        }
-       // this forces us to call wait in the order in which the syscalls are made.
-       if (desc->idx != fr->rsp_cons + 1){
-               errno = EDEADLOCK;
-               return -1;
-       }
-       while (!(RING_HAS_UNCONSUMED_RESPONSES(fr)))
+       syscall_rsp_t* rsp_inring = RING_GET_RESPONSE(fr, desc->idx);
+
+       // ignoring the ring push response from the kernel side now
+       while (rsp_inring->status != RES_ready)
                cpu_relax();
-       memcpy(rsp, RING_GET_RESPONSE(fr, desc->idx), sizeof(*rsp));
-       fr->rsp_cons++;
+       memcpy(rsp, rsp_inring, sizeof(*rsp));
+       
+       atomic_inc((atomic_t*) &(fr->rsp_cons));
     // run a cleanup function for this desc, if available
-    if (rsp->cleanup)
-       rsp->cleanup(rsp->data);
+    //if (rsp->cleanup)
+    // rsp->cleanup(rsp->data);
        if (rsp->syserr){
-               errno = rsp->syserr;
+               //      errno = rsp->syserr;
                return -1;
        } else 
                return 0;
index 941fc21..189ebc2 100644 (file)
@@ -131,5 +131,4 @@ int sys_halt_core(unsigned int usec)
 void* sys_init_arsc()
 {
        return (void*)ros_syscall(SYS_init_arsc, 0, 0, 0, 0, 0, 0);
-       return (void*)ros_syscall(SYS_init_arsc, 0, 0, 0, 0, 0);
 }