1 /* Copyright (c) 2013 The Regents of the University of California
2 * Barret Rhoden <brho@cs.berkeley.edu>
3 * See LICENSE for details.
5 * Atomic pipes. Multi-reader, multi-writer pipes, similar to sys_pipe except
6 * that they operate on fixed sized chunks of data.
8 * A note on broadcast wakeups. We broadcast in a few places. If we don't,
9 * then all paths (like error paths) will have to signal. Not a big deal
10 * either way, but just need to catch all the cases. Other non-obvious
11 * cases are that read and write methods need to wake other readers and
12 * writers (in the absence of a broadcast wakeup) */
15 #include <ros/ring_buffer.h>
/* Initialize an atomic pipe (per its name; only part of the definition is
 * visible in this listing).
 *
 * The visible statements:
 *  - set ap_ring_sz to ROUNDDOWNPWR2(buf_sz / elem_sz), i.e. the number of
 *    elem_sz-sized elements that fit in buf_sz bytes, passed through the
 *    round-down-to-power-of-two macro;
 *  - record the element size;
 *  - start the reader and writer counts at 1;
 *  - initialize ap_lock and tie all three condition variables to it;
 *  - clear the priority-reader flag.
 *
 * NOTE(review): the end of the parameter list and some body lines are missing
 * here. elem_sz is presumably the last parameter, and buf is presumably stored
 * in ap_buf with the read/write offsets reset -- confirm against the original
 * source.
 * NOTE(review): nothing visible rejects elem_sz == 0 (division by zero) or a
 * buffer smaller than one element -- confirm callers guarantee both. */
19 void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
23 /* power of two number of elements in the ring. */
24 ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
25 ap->ap_elem_sz = elem_sz;
/* The pipe starts out with one reader and one writer already counted. */
28 ap->ap_nr_readers = 1;
29 ap->ap_nr_writers = 1;
30 /* Three CVs, all using the same lock. */
31 spinlock_init(&ap->ap_lock);
32 cv_init_with_lock(&ap->ap_priority_reader, &ap->ap_lock);
33 cv_init_with_lock(&ap->ap_general_readers, &ap->ap_lock);
34 cv_init_with_lock(&ap->ap_writers, &ap->ap_lock);
/* No priority reader (see apipe_read_cond()) is active initially. */
35 ap->ap_has_priority_reader = FALSE;
/* Open the pipe for an additional reader (per its name).
 *
 * NOTE(review): only the spin_lock()/spin_unlock() pair on ap_lock is visible
 * in this listing; the statement executed under the lock is missing --
 * presumably an increment of ap_nr_readers, the counter apipe_init() starts at
 * 1 and apipe_write() tests. Confirm against the original source. */
38 void apipe_open_reader(struct atomic_pipe *ap)
40 spin_lock(&ap->ap_lock);
42 spin_unlock(&ap->ap_lock);
/* Open the pipe for an additional writer (per its name).
 *
 * NOTE(review): only the spin_lock()/spin_unlock() pair on ap_lock is visible
 * in this listing; the statement executed under the lock is missing --
 * presumably an increment of ap_nr_writers, the counter apipe_init() starts at
 * 1 and apipe_read() tests. Confirm against the original source. */
45 void apipe_open_writer(struct atomic_pipe *ap)
47 spin_lock(&ap->ap_lock);
49 spin_unlock(&ap->ap_lock);
52 /* Helper: Wake the appropriate readers. When there's a priority reader, only
53 * that one wakes up. It's up to the priority reader to wake the other readers,
54 * by clearing ap_has_priority_reader and calling this again. */
/* Every call site visible in this file invokes this helper between
 * spin_lock() and spin_unlock() of ap_lock. */
55 static void __apipe_wake_readers(struct atomic_pipe *ap)
/* With a priority reader active, signal only its condition variable. */
57 if (ap->ap_has_priority_reader)
58 __cv_signal(&ap->ap_priority_reader);
/* NOTE(review): one line is missing here in this listing -- presumably an
 * `else`, so that general readers are woken only when there is no priority
 * reader. Confirm against the original source. */
60 __cv_broadcast(&ap->ap_general_readers);
63 /* When closing, there might be others blocked waiting for us. For example,
64 * a writer could have blocked on a full pipe, waiting for us to read. Instead
65 * of reading, the last reader closes. The writer needs to be woken up so it
 * can see that no readers remain instead of sleeping forever. */
67 void apipe_close_reader(struct atomic_pipe *ap)
69 spin_lock(&ap->ap_lock);
/* NOTE(review): a statement is missing here in this listing -- presumably the
 * ap_nr_readers decrement. Confirm against the original source. */
/* Wake every writer sleeping on ap_writers; apipe_write() tests ap_nr_readers
 * inside its wait loop, so a woken writer can notice that readers are gone. */
71 __cv_broadcast(&ap->ap_writers);
72 spin_unlock(&ap->ap_lock);
/* Close one writer's end of the pipe (per its name) and wake readers.
 *
 * Under ap_lock, calls __apipe_wake_readers(); apipe_read() and
 * apipe_read_cond() both test ap_nr_writers, so woken readers can notice that
 * writers are gone rather than keep sleeping. */
75 void apipe_close_writer(struct atomic_pipe *ap)
77 spin_lock(&ap->ap_lock);
/* NOTE(review): a statement is missing here in this listing -- presumably the
 * ap_nr_writers decrement. Confirm against the original source. */
79 __apipe_wake_readers(ap);
80 spin_unlock(&ap->ap_lock);
83 /* read a pipe that is already locked. */
/* Copy elements out of the ring into buf without taking ap_lock.
 *
 * ap:      the pipe to read from.
 * buf:     destination; advanced by ap_elem_sz bytes per element copied.
 * nr_elem: upper bound on the number of elements to copy.
 *
 * Each loop iteration first checks for an empty ring, then copies one
 * ap_elem_sz-sized element from the slot selected by the low bits of
 * ap_rd_off. The visible code then broadcasts on ap_writers.
 *
 * apipe_read() calls this while holding ap_lock and stores the result in
 * nr_copied.
 *
 * NOTE(review): several lines are missing in this listing, including the local
 * declarations, the action taken when the ring is empty, the ap_rd_off update
 * and the return statement. Confirm against the original source. */
84 int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem)
/* NOTE(review): i is an int while nr_elem is a size_t, so the loop condition
 * is a signed/unsigned comparison. */
89 for (int i = 0; i < nr_elem; i++) {
90 /* readers that call read_locked directly might have failed to check for
91 * emptiness, so we'll double check early. */
92 if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
94 /* power of 2 elements in the ring buffer, index is the lower n bits */
95 rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
96 memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz, ap->ap_elem_sz);
/* buf is a void *, so this relies on byte-granular void-pointer arithmetic
 * (a GNU C extension). */
98 buf += ap->ap_elem_sz;
101 /* We could have multiple writers blocked. Just broadcast for them all.
102 * Alternatively, we could signal one, and then it's on the writers to
103 * signal further writers (see the note at the top of this file). */
104 __cv_broadcast(&ap->ap_writers);
/* Blocking read of up to nr_elem elements from the pipe into buf.
 *
 * Takes ap_lock, then waits on ap_general_readers for as long as a priority
 * reader is active or the ring is empty. Inside that wait loop, if no writers
 * remain the lock is dropped. Once the loop exits, apipe_read_locked() does
 * the copy (and wakes writers) and the lock is released.
 *
 * NOTE(review): lines are missing in this listing: the local declarations,
 * whatever follows the unlock on the no-writers path (presumably an early
 * return), the closing braces and the final return. Confirm against the
 * original source.
 * NOTE(review): the comment below that starts "Need to wait til" has lost its
 * last line here and is unterminated as shown; restore the closing line from
 * the original source. */
109 int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
114 spin_lock(&ap->ap_lock);
115 /* Need to wait til the priority reader is gone, and the ring isn't empty.
116 * If we do this as two steps, (either of priority check or empty check
117 * first), there's a chance the second one will fail, and when we sleep and
118 * wake up, the first condition could have changed. (An alternative would
119 * be to block priority readers too, by promoting ourselves to a priority
121 while (ap->ap_has_priority_reader ||
122 __ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
123 if (!ap->ap_nr_writers) {
124 spin_unlock(&ap->ap_lock);
127 cv_wait(&ap->ap_general_readers);
130 /* This read call wakes up writers */
131 nr_copied = apipe_read_locked(ap, buf, nr_elem);
132 /* If the writer didn't broadcast, we'd need to wake other readers (imagine
133 * a long queue of blocked readers, and a queue filled by one massive
134 * write). (same with the error case). */
135 spin_unlock(&ap->ap_lock);
/* Blocking write of up to nr_elem elements from buf into the pipe.
 *
 * Takes ap_lock, then waits on ap_writers for as long as the ring is full.
 * Inside that wait loop, if no readers remain the lock is dropped. Once there
 * is room, elements of ap_elem_sz bytes are copied one at a time into the
 * slot selected by the low bits of ap_wr_off, with a ring-full check inside
 * the copy loop. Finally readers are woken via __apipe_wake_readers() and the
 * lock is released.
 *
 * NOTE(review): lines are missing in this listing: the local declarations,
 * whatever follows the unlock on the no-readers path (presumably an early
 * return), the ap_wr_off update, the action taken when the ring fills up
 * mid-copy, and the final return. Confirm against the original source.
 * NOTE(review): the last comment in this function ("We only need to wake
 * readers ...") has lost its final line here and is unterminated as shown;
 * restore the closing line from the original source. */
139 int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
144 spin_lock(&ap->ap_lock);
145 /* not sure if we want to check for readers first or not */
146 while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
147 if (!ap->ap_nr_readers) {
148 spin_unlock(&ap->ap_lock);
151 cv_wait(&ap->ap_writers);
/* NOTE(review): i is an int while nr_elem is a size_t, so the loop condition
 * is a signed/unsigned comparison. */
154 for (int i = 0; i < nr_elem; i++) {
155 /* power of 2 elements in the ring buffer, index is the lower n bits */
156 wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
157 memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf, ap->ap_elem_sz);
/* buf is a void *, so this relies on byte-granular void-pointer arithmetic
 * (a GNU C extension). */
159 buf += ap->ap_elem_sz;
161 if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
164 /* We only need to wake readers, since the reader that woke us used a
165 * broadcast. o/w, we'd need to wake the next writer. (same goes for the
167 __apipe_wake_readers(ap);
168 spin_unlock(&ap->ap_lock);
/* Return a pointer to the element at the current read position.
 *
 * The address is ap_buf plus the low bits of ap_rd_off (masked with
 * ap_ring_sz - 1) times ap_elem_sz. No field of the pipe is modified in the
 * visible lines, and no lock is taken in them.
 *
 * NOTE(review): the statement executed when the ring is empty is missing from
 * this listing -- presumably it returns a null pointer. Confirm against the
 * original source, and confirm whether callers must hold ap_lock. */
172 void *apipe_head(struct atomic_pipe *ap)
174 if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
176 return ap->ap_buf + (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
/*
180 * Read data from the pipe until a condition is satisfied.
181 * f is the function that determines the condition. f saves its
182 * state in arg. When f returns non-zero, this function exits,
183 * and returns the value to its caller. Note that f can return -1
184 * to indicate an error. But returning zero will keep you trapped in
185 * this function. The intent here is to ensure one-reader-at-a-time
 * operation: only one priority reader runs at a time, and apipe_read()
 * callers wait until it has finished.
 */
/* Locking protocol, as far as the visible lines show:
 *  1. Take ap_lock and wait on ap_general_readers until no other priority
 *     reader is active, then claim the priority-reader role.
 *  2. While in that role, sleep on ap_priority_reader when there is nothing
 *     to do, unless ap_nr_writers is zero.
 *  3. On the way out, drop the priority-reader role, wake other readers with
 *     __apipe_wake_readers(), and release ap_lock.
 *
 * NOTE(review): lines are missing in this listing: the local declarations, the
 * loop that calls f, the body of the no-writers branch, and the final return.
 * Confirm against the original source.
 * NOTE(review): the comment below that starts "Each time there is a need to
 * check the pipe" has lost its last line here and is unterminated as shown;
 * restore the closing line from the original source. */
188 int apipe_read_cond(struct atomic_pipe *ap,
189 int(*f)(struct atomic_pipe *pipe, void *arg), void *arg)
194 spin_lock(&ap->ap_lock);
195 /* Can only have one priority reader at a time. Wait our turn. */
196 while (ap->ap_has_priority_reader) {
197 cv_wait(&ap->ap_general_readers);
200 ap->ap_has_priority_reader = TRUE;
202 /* Each time there is a need to check the pipe, call
203 * f. f will maintain its state in arg. It is expected that f
204 * will dequeue elements from the pipe as they are available.
205 * N.B. this is being done for protocols like IPV4 that can
206 * fragment an RPC request. For IPV6, it is likely that this
207 * will end up looking like a blocking read. Thus was it ever
208 * with legacy code. F is supposed to call apipe_read_locked().
213 /* if nr_writers goes to zero, that's bad. return -1 because they're
214 * going to have to clean up. We should have been able to call f once
215 * though, to pull out any remaining elements. The main concern here is
216 * sleeping on the cv when no one (no writers) will wake us. */
217 if (!ap->ap_nr_writers) {
221 cv_wait(&ap->ap_priority_reader);
225 /* All out paths need to wake other readers. When we were woken up, there
226 * was no broadcast sent to the other readers. Plus, there may be other
227 * potential priority readers. */
228 ap->ap_has_priority_reader = FALSE;
229 __apipe_wake_readers(ap);
230 /* FYI, writers were woken up after an actual read. If we had an error (ret
231 * == -1), there should be no writers. */
232 spin_unlock(&ap->ap_lock);