kconfig: use pkg-config for ncurses detection
[akaros.git] / kern / src / apipe.c
1 /* Copyright (c) 2013 The Regents of the University of California
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Atomic pipes.  Multi-reader, multi-writer pipes, similar to sys_pipe except
6  * that they operate on fixed sized chunks of data.
7  *
8  * A note on broadcast wakeups.  We broadcast in a few places.  If we don't,
9  * then all paths (like error paths) will have to signal.  Not a big deal
10  * either way, but just need to catch all the cases.  Other non-obvious
11  * cases are that read and write methods need to wake other readers and
12  * writers (in the absence of a broadcast wakeup) */
13
14 #include <apipe.h>
15 #include <ros/ring_buffer.h>
16 #include <string.h>
17 #include <stdio.h>
18
19 void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
20                 size_t elem_sz)
21 {
22         ap->ap_buf = buf;
23         /* power of two number of elements in the ring. */
24         ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
25         ap->ap_elem_sz = elem_sz;
26         ap->ap_rd_off = 0;
27         ap->ap_wr_off = 0;
28         ap->ap_nr_readers = 1;
29         ap->ap_nr_writers = 1;
30         /* Three CVs, all using the same lock. */
31         spinlock_init(&ap->ap_lock);
32         cv_init_with_lock(&ap->ap_priority_reader, &ap->ap_lock);
33         cv_init_with_lock(&ap->ap_general_readers, &ap->ap_lock);
34         cv_init_with_lock(&ap->ap_writers, &ap->ap_lock);
35         ap->ap_has_priority_reader = FALSE;
36 }
37
38 void apipe_open_reader(struct atomic_pipe *ap)
39 {
40         spin_lock(&ap->ap_lock);
41         ap->ap_nr_readers++;
42         spin_unlock(&ap->ap_lock);
43 }
44
45 void apipe_open_writer(struct atomic_pipe *ap)
46 {
47         spin_lock(&ap->ap_lock);
48         ap->ap_nr_writers++;
49         spin_unlock(&ap->ap_lock);
50 }
51
52 /* Helper: Wake the appropriate readers.  When there's a priority reader, only
53  * that one wakes up.  It's up to the priority reader to wake the other readers,
54  * by clearing has_prior and calling this again. */
55 static void __apipe_wake_readers(struct atomic_pipe *ap)
56 {
57         if (ap->ap_has_priority_reader)
58                 __cv_signal(&ap->ap_priority_reader);
59         else
60                 __cv_broadcast(&ap->ap_general_readers);
61 }
62
63 /* When closing, there might be others blocked waiting for us.  For example,
64  * a writer could have blocked on a full pipe, waiting for us to read.  Instead
65  * of reading, the last reader closes.  The writer needs to be woken up so it
66  * can return 0. */
67 void apipe_close_reader(struct atomic_pipe *ap)
68 {
69         spin_lock(&ap->ap_lock);
70         ap->ap_nr_readers--;
71         __cv_broadcast(&ap->ap_writers);
72         spin_unlock(&ap->ap_lock);
73 }
74
75 void apipe_close_writer(struct atomic_pipe *ap)
76 {
77         spin_lock(&ap->ap_lock);
78         ap->ap_nr_writers--;
79         __apipe_wake_readers(ap);
80         spin_unlock(&ap->ap_lock);
81 }
82
83 /* read a pipe that is already locked. */
84 int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem)
85 {
86         size_t rd_idx;
87         int nr_copied = 0;
88
89         for (int i = 0; i < nr_elem; i++) {
90                 /* readers that call read_locked directly might have failed to
91                  * check for emptiness, so we'll double check early. */
92                 if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
93                         break;
94                 /* power of 2 elements in the ring buffer, index is the lower n
95                  * bits */
96                 rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
97                 memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz,
98                        ap->ap_elem_sz);
99                 ap->ap_rd_off++;
100                 buf += ap->ap_elem_sz;
101                 nr_copied++;
102         }
103         /* We could have multiple writers blocked.  Just broadcast for them all.
104          * Alternatively, we could signal one, and then it's on the writers to
105          * signal further writers (see the note at the top of this file). */
106         __cv_broadcast(&ap->ap_writers);
107         return nr_copied;
108 }
109
110
111 int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
112 {
113         size_t rd_idx;
114         int nr_copied = 0;
115
116         spin_lock(&ap->ap_lock);
117         /* Need to wait til the priority reader is gone, and the ring isn't
118          * empty.  If we do this as two steps, (either of priority check or
119          * empty check first), there's a chance the second one will fail, and
120          * when we sleep and wake up, the first condition could have changed.
121          * (An alternative would be to block priority readers too, by promoting
122          * ourselves to a priority reader). */
123         while (ap->ap_has_priority_reader ||
124                __ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
125                 if (!ap->ap_nr_writers) {
126                         spin_unlock(&ap->ap_lock);
127                         return 0;
128                 }
129                 cv_wait(&ap->ap_general_readers);
130                 cpu_relax();
131         }
132         /* This read call wakes up writers */
133         nr_copied = apipe_read_locked(ap, buf, nr_elem);
134         /* If the writer didn't broadcast, we'd need to wake other readers
135          * (imagine a long queue of blocked readers, and a queue filled by one
136          * massive write).  (same with the error case). */
137         spin_unlock(&ap->ap_lock);
138         return nr_copied;
139 }
140
141 int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
142 {
143         size_t wr_idx;
144         int nr_copied = 0;
145
146         spin_lock(&ap->ap_lock);
147         /* not sure if we want to check for readers first or not */
148         while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
149                 if (!ap->ap_nr_readers) {
150                         spin_unlock(&ap->ap_lock);
151                         return 0;
152                 }
153                 cv_wait(&ap->ap_writers);
154                 cpu_relax();
155         }
156         for (int i = 0; i < nr_elem; i++) {
157                 /* power of 2 elements in the ring buffer, index is the lower n
158                  * bits */
159                 wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
160                 memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf,
161                        ap->ap_elem_sz);
162                 ap->ap_wr_off++;
163                 buf += ap->ap_elem_sz;
164                 nr_copied++;
165                 if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
166                         break;
167         }
168         /* We only need to wake readers, since the reader that woke us used a
169          * broadcast.  o/w, we'd need to wake the next writer.  (same goes for
170          * the error case). */
171         __apipe_wake_readers(ap);
172         spin_unlock(&ap->ap_lock);
173         return nr_copied;
174 }
175
176 void *apipe_head(struct atomic_pipe *ap)
177 {
178         if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
179                 return 0;
180         return ap->ap_buf +
181                (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
182 }
183
184 /*
185  * Read data from the pipe until a condition is satisfied.
186  * f is the function that determines the condition. f saves its
187  * state in arg. When f returns non-zero, this function exits,
188  * and returns the value to its caller. Note that f can return -1
189  * to indicate an error. But returning zero will keep you trapped in
190  * this function. The intent here is to ensure one-reader-at-a-time
191  * operation.
192  */
193 int apipe_read_cond(struct atomic_pipe *ap,
194                     int(*f)(struct atomic_pipe *pipe, void *arg), void *arg)
195 {
196         size_t rd_idx;
197         int ret;
198
199         spin_lock(&ap->ap_lock);
200         /* Can only have one priority reader at a time.  Wait our turn. */
201         while (ap->ap_has_priority_reader) {
202                 cv_wait(&ap->ap_general_readers);
203                 cpu_relax();
204         }
205         ap->ap_has_priority_reader = TRUE;
206         while (1) {
207                 /* Each time there is a need to check the pipe, call
208                  * f. f will maintain its state in arg. It is expected that f
209                  * will dequeue elements from the pipe as they are available.
210                  * N.B. this is being done for protocols like IPV4 that can
211                  * fragment an RPC request. For IPV6, it is likely that this
212                  * will end up looking like a blocking read. Thus was it ever
213                  * with legacy code. F is supposed to call apipe_read_locked().
214                  */
215                 ret = f(ap, arg);
216                 if (ret)
217                         break;
218                 /* if nr_writers goes to zero, that's bad.  return -1 because
219                  * they're going to have to clean up.  We should have been able
220                  * to call f once though, to pull out any remaining elements.
221                  * The main concern here is sleeping on the cv when no one (no
222                  * writers) will wake us. */
223                 if (!ap->ap_nr_writers) {
224                         ret = -1;
225                         goto out;
226                 }
227                 cv_wait(&ap->ap_priority_reader);
228                 cpu_relax();
229         }
230 out:
231         /* All out paths need to wake other readers.  When we were woken up,
232          * there was no broadcast sent to the other readers.  Plus, there may be
233          * other potential priority readers. */
234         ap->ap_has_priority_reader = FALSE;
235         __apipe_wake_readers(ap);
236         /* FYI, writers were woken up after an actual read.  If we had an error
237          * (ret == -1), there should be no writers. */
238         spin_unlock(&ap->ap_lock);
239         return ret;
240 }