Merge branch 'master' into net-dev
[akaros.git] / user / parlib / src / channel.c
1 /* Copyright (c) 2009 The Regents of the University  of California. 
2  * See the COPYRIGHT files at the top of this source tree for full 
3  * license information.
4  * 
5  * Kevin Klues <klueska@cs.berkeley.edu>    
6  */
7
8 #include <parlib.h>
9 #include <stdio.h>
10 #include <string.h>
11 #include <channel.h>
12 #include <ros/env.h>
13 #include <ros/syscall.h>
14 #include <arch/arch.h>
15
16 void simulate_rsp(channel_t* ch) {
17         channel_t ch_server;
18         channel_msg_t msg;
19         ch_server.endpoint = ch->endpoint;
20         ch_server.type = CHANNEL_SERVER;
21         ch_server.ring_addr = ch->ring_addr;
22         ch_server.data_addr = ch->data_addr;    
23         BACK_RING_INIT(&(ch_server.ring_side.back), (ch_server.ring_addr), PGSIZE);
24         
25         channel_recvmsg(&ch_server, &msg);
26 }
27
28 error_t channel_create(envid_t server, channel_t* ch, channel_attr_t* ch_attr) {
29         error_t e;
30         void *COUNT(PGSIZE) ring_addr = NULL;
31         void *COUNT(PGSIZE) data_addr = NULL;
32         
33         /*
34          * Attempt to create two shared pages with the 'server' partition.
35          * One for holding our ring buffer, and one for holding our data.
36          * If there is an error at any point during this process, we need
37          * to cleanup and abort, returning the proper error code.  First we
38          * do the page for the ring buffer, deferring the data page till later.
39          */
40         e = sys_shared_page_alloc(&ring_addr, server, PG_RDWR, PG_RDWR);
41         if(e < 0) return e;
42         
43         /* 
44          * If we've made it to here, then we know the shared memory page
45          * for our ring buffer has been mapped successfully into both our own 
46          * address space as well as the server's.  We also know that the 
47          * server has been notified of its creation.  We still haven't mapped in 
48          * our data page, but we will do that after this page has been initialized
49          * properly.  We do this because we need to somehow synchronize the 
50          * the initilization of the ring buffer on this page with 
51          * the server being able to access it.  We do this by forcing the 
52          * initialization now on this end, and only accessing it on the server side
53          * after our data page has been created.
54          */
55         memset((void*SAFE) TC(ch), 0, sizeof(channel_t));
56         ch->endpoint = server;
57         ch->ring_addr = (channel_sring_t *COUNT(1)) TC(ring_addr);
58         ch->type = CHANNEL_CLIENT;
59                 
60         /*
61          * As the creator of this channel, we take on the responsibility of 
62          * setting up the ring buffer itself, as well as setting up our own front 
63          * ring so we can push requests (a.ka. messages) out for the server to 
64          * process.  It is the job of the server to setup its back ring so it can
65          * push responses (a.k.a acks) back for us to know the message has been 
66          * processed.
67          */
68         SHARED_RING_INIT(ch->ring_addr);
69         FRONT_RING_INIT(&(ch->ring_side.front), (ch->ring_addr), PGSIZE);
70         
71         /* 
72          * Now we map in the data page on both ends and add a pointer to it in 
73          * our channel struct.
74          */
75         e = sys_shared_page_alloc(&data_addr, server, PG_RDWR, PG_RDONLY);
76         if(e < 0) {
77                 sys_shared_page_free(ring_addr, server);
78                 return e;
79         }
80         ch->data_addr = data_addr;
81         
82         /* 
83          * Once both pages have been mapped, our data structures have been set up, 
84          * and everything is ready to go, we push an empty message out into the 
85          * ring indicating we are ready on our end.  This implicitly waits for the 
86          * server  to push a response indicating it has finished setting up the other 
87          * side of the channel.
88          */
89         channel_msg_t msg;
90         channel_sendmsg(ch, &msg);
91     
92     /*
93      * If everything has gone according to plan, both ends have set up 
94      * their respective ends of the channel and we can return successfully
95      */
96         return ESUCCESS;
97 }
98
99 error_t channel_destroy(channel_t* ch) {
100         sys_shared_page_free(ch->data_addr, ch->endpoint);
101         sys_shared_page_free((void *COUNT(PGSIZE)) TC(ch->ring_addr), ch->endpoint);
102         return ESUCCESS;
103 }
104
105 error_t channel_sendmsg(channel_t* ch, channel_msg_t* msg) {
106         /*
107          * As a first cut implementation, just copy the message handed to 
108          * us into our ring buffer and copy the buffer pointed to in 
109          * the message at the first address in our shared data page.
110          */
111         channel_msg_t* msg_copy;
112         msg_copy = RING_GET_REQUEST(&(ch->ring_side.front), 
113                                     ch->ring_side.front.req_prod_pvt++);
114         msg_copy->len = msg->len;
115         msg_copy->buf = 0;
116         memcpy(ch->data_addr + (size_t)(msg_copy->buf), msg->buf, msg->len);
117         
118         /*
119          * Now that we have copied the message properly, we push the request out
120          * and wait for a response from the server.
121          */
122         RING_PUSH_REQUESTS(&(ch->ring_side.front));
123         
124         while (!(RING_HAS_UNCONSUMED_RESPONSES(&(ch->ring_side.front))))
125                 cpu_relax();
126         RING_GET_RESPONSE(&(ch->ring_side.front), ch->ring_side.front.rsp_cons++);
127         
128         return ESUCCESS;
129 }
130
131 error_t channel_create_wait(channel_t* ch, channel_attr_t* ch_attr) {
132 #if 0
133         error_t e;
134         envid_t* client;
135         void *COUNT(PGSIZE) ring_addr = NULL;
136         void *COUNT(PGSIZE) data_addr = NULL;
137
138         /* 
139          * Set the type of the channel to the server
140          */
141         ch->type = CHANNEL_SERVER;
142         
143         /*
144          * Wait for the shared ring page to be established and set up
145          * the channel struct with its properties
146          */
147         sysevent_shared_page_alloc_wait(client, ring_addr);
148         ch->endpoint = *client;
149         ch->ring_addr = ring_addr;
150         
151         /*
152          * Now wait for the shared data page to be established and then set up
153          * the backring of the shared ring buffer.
154          */
155         sysevent_shared_page_alloc_wait(client, data_addr);
156         ch->data_addr = data_addr;      
157         BACK_RING_INIT(&(ch->ring_side.back), (ch->ring_addr), PGSIZE);
158         
159         /*
160          * If we've reached this point, then the creating side should already
161          * have a message sitting in the ring buffer waiting for use to 
162          * process.  Now we pull that message off and acknowledge it.
163          */
164         channel_msg_t msg;
165         channel_recvmsg(&ch, &msg);
166 #endif  
167         return ESUCCESS;
168 }
169
170 error_t channel_recvmsg(channel_t* ch, channel_msg_t* msg) {
171         /*
172          * First copy the data contained in the message from shared page pointed 
173          * to by the entry in the ring buffer to the msg struct passedinto this 
174          * function
175          */
176         channel_msg_t* msg_copy = RING_GET_REQUEST(&(ch->ring_side.back), 
177                                                   ch->ring_side.back.req_cons++);
178         msg->len = msg_copy->len; 
179         memcpy(msg->buf, ch->data_addr + (size_t)(msg_copy->buf), msg->len);
180         
181         /*
182          * Then acknowledge that its been serviced / in the process of being 
183          * serviced.
184          */     
185         channel_ack_t* ack = RING_GET_RESPONSE(&(ch->ring_side.back), 
186                                                ch->ring_side.back.rsp_prod_pvt++);      
187         RING_PUSH_RESPONSES(&(ch->ring_side.back));     
188
189         return ESUCCESS;
190 }