RFC: support pipe reads based on a condition.
[akaros.git] / kern / src / apipe.c
1 /* Copyright (c) 2013 The Regents of the University of California
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Atomic pipes.  Multi-reader, multi-writer pipes, similar to sys_pipe except
6  * that they operate on fixed sized chunks of data. */
7
8 #include <apipe.h>
9 #include <ros/ring_buffer.h>
10 #include <string.h>
11 #include <stdio.h>
12
13 void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
14                 size_t elem_sz)
15 {
16         ap->ap_buf = buf;
17         /* power of two number of elements in the ring. */
18         ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
19         ap->ap_elem_sz = elem_sz;
20         ap->ap_rd_off = 0;
21         ap->ap_wr_off = 0;
22         ap->ap_nr_readers = 1;
23         ap->ap_nr_writers = 1;
24         cv_init(&ap->ap_cv);
25 }
26
27 void apipe_open_reader(struct atomic_pipe *ap)
28 {
29         cv_lock(&ap->ap_cv);
30         ap->ap_nr_readers++;
31         cv_unlock(&ap->ap_cv);
32 }
33
34 void apipe_open_writer(struct atomic_pipe *ap)
35 {
36         cv_lock(&ap->ap_cv);
37         ap->ap_nr_writers++;
38         cv_unlock(&ap->ap_cv);
39 }
40
41 /* When closing, there might be others blocked waiting for us.  For example,
42  * a writer could have blocked on a full pipe, waiting for us to read.  Instead
43  * of reading, the last reader closes.  The writer needs to be woken up so it
44  * can return 0. */
45 void apipe_close_reader(struct atomic_pipe *ap)
46 {
47         cv_lock(&ap->ap_cv);
48         ap->ap_nr_readers--;
49         __cv_broadcast(&ap->ap_cv);
50         cv_unlock(&ap->ap_cv);
51 }
52
53 void apipe_close_writer(struct atomic_pipe *ap)
54 {
55         cv_lock(&ap->ap_cv);
56         ap->ap_nr_writers--;
57         __cv_broadcast(&ap->ap_cv);
58         cv_unlock(&ap->ap_cv);
59 }
60
61 /* read a pipe that is already locked. */
62 int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem)
63 {
64         size_t rd_idx;
65         int nr_copied = 0;
66
67         for (int i = 0; i < nr_elem; i++) {
68                 /* power of 2 elements in the ring buffer, index is the lower n bits */
69                 rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
70                 memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz, ap->ap_elem_sz);
71                 ap->ap_rd_off++;
72                 buf += ap->ap_elem_sz;
73                 nr_copied++;
74                 if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
75                         break;
76         }
77         return nr_copied;
78 }
79
80
81 int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
82 {
83         size_t rd_idx;
84         int nr_copied = 0;
85
86         cv_lock(&ap->ap_cv);
87         while (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
88                 if (!ap->ap_nr_writers) {
89                         cv_unlock(&ap->ap_cv);
90                         return 0;
91                 }
92                 cv_wait(&ap->ap_cv);
93                 cpu_relax();
94         }
95         nr_copied = apipe_read_locked(ap, buf, nr_elem);
96         /* since we're using just one CV, there could be readers and writers blocked
97          * on it.  need to wake them all, to make sure we signal any blocked
98          * writers. */
99         __cv_broadcast(&ap->ap_cv);
100         cv_unlock(&ap->ap_cv);
101         return nr_copied;
102 }
103
104 int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
105 {
106         size_t wr_idx;
107         int nr_copied = 0;
108
109         cv_lock(&ap->ap_cv);
110         /* not sure if we want to check for readers first or not */
111         while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
112                 if (!ap->ap_nr_readers) {
113                         cv_unlock(&ap->ap_cv);
114                         return 0;
115                 }
116                 cv_wait(&ap->ap_cv);
117                 cpu_relax();
118         }
119         for (int i = 0; i < nr_elem; i++) {
120                 /* power of 2 elements in the ring buffer, index is the lower n bits */
121                 wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
122                 memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf, ap->ap_elem_sz);
123                 ap->ap_wr_off++;
124                 buf += ap->ap_elem_sz;
125                 nr_copied++;
126                 if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
127                         break;
128         }
129         /* since we're using just one CV, there could be readers and writers blocked
130          * on it.  need to wake them all, to make sure we signal any blocked
131          * writers. */
132         __cv_broadcast(&ap->ap_cv);
133         cv_unlock(&ap->ap_cv);
134         return nr_copied;
135 }
136
137 void *apipe_head(struct atomic_pipe *ap)
138 {
139         if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
140                 return 0;
141         return ap->ap_buf + (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
142 }
143
144 /* 
145  * Read data from the pipe until a condition is satisfied.
146  * f is the function that determines the condition. f saves its
147  * state in arg. When f returns non-zero, this function exits,
148  * and returns the value to its caller. Note that f can return -1
149  * to indicate an error. But returning zero will keep you trapped in
150  * this function. The intent here is to ensure one-reader-at-a-time
151  * operation.
152  */
153 int apipe_read_cond(struct atomic_pipe *ap,
154                     int(*f)(struct atomic_pipe *pipe, void *arg), void *arg)
155 {
156         size_t rd_idx;
157         int ret;
158
159         cv_lock(&ap->ap_cv);
160         while (1){
161                 /* Each time there is a need to check the pipe, call
162                  * f. f will maintain its state in arg. It is expected that f
163                  * will dequeue elements from the pipe as they are available.
164                  * N.B. this is being done for protocols like IPV4 that can
165                  * fragment an RPC request. For IPV6, it is likely that this
166                  * will end up looking like a blocking read. Thus was it ever
167                  * with legacy code. F is supposed to call apipe_read_locked().
168                  */
169                 /* at any point, if nr_writers goes to zero, that's bad. */
170                 if (!ap->ap_nr_writers) {
171                         cv_unlock(&ap->ap_cv);
172                         /* return -1 because they're going to have to clean up. */
173                         return -1;
174                 }
175                 ret = f(ap, arg);
176                 if (ret)
177                         break;
178                 cv_wait(&ap->ap_cv);
179                 cpu_relax();
180         }
181         cv_unlock(&ap->ap_cv);
182         return ret;
183 }
184