Atomic pipes
[akaros.git] / kern / src / apipe.c
1 /* Copyright (c) 2013 The Regents of the University of California
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Atomic pipes.  Multi-reader, multi-writer pipes, similar to sys_pipe except
6  * that they operate on fixed sized chunks of data. */
7
8 #include <apipe.h>
9 #include <ros/ring_buffer.h>
10 #include <string.h>
11 #include <stdio.h>
12
13 void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
14                 size_t elem_sz)
15 {
16         ap->ap_buf = buf;
17         /* power of two number of elements in the ring. */
18         ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
19         ap->ap_elem_sz = elem_sz;
20         ap->ap_rd_off = 0;
21         ap->ap_wr_off = 0;
22         ap->ap_nr_readers = 1;
23         ap->ap_nr_writers = 1;
24         cv_init(&ap->ap_cv);
25 }
26
27 void apipe_open_reader(struct atomic_pipe *ap)
28 {
29         cv_lock(&ap->ap_cv);
30         ap->ap_nr_readers++;
31         cv_unlock(&ap->ap_cv);
32 }
33
34 void apipe_open_writer(struct atomic_pipe *ap)
35 {
36         cv_lock(&ap->ap_cv);
37         ap->ap_nr_writers++;
38         cv_unlock(&ap->ap_cv);
39 }
40
41 /* When closing, there might be others blocked waiting for us.  For example,
42  * a writer could have blocked on a full pipe, waiting for us to read.  Instead
43  * of reading, the last reader closes.  The writer needs to be woken up so it
44  * can return 0. */
45 void apipe_close_reader(struct atomic_pipe *ap)
46 {
47         cv_lock(&ap->ap_cv);
48         ap->ap_nr_readers--;
49         __cv_broadcast(&ap->ap_cv);
50         cv_unlock(&ap->ap_cv);
51 }
52
53 void apipe_close_writer(struct atomic_pipe *ap)
54 {
55         cv_lock(&ap->ap_cv);
56         ap->ap_nr_writers--;
57         __cv_broadcast(&ap->ap_cv);
58         cv_unlock(&ap->ap_cv);
59 }
60
61 int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
62 {
63         size_t rd_idx;
64         int nr_copied = 0;
65
66         cv_lock(&ap->ap_cv);
67         while (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
68                 if (!ap->ap_nr_writers) {
69                         cv_unlock(&ap->ap_cv);
70                         return 0;
71                 }
72                 cv_wait(&ap->ap_cv);
73                 cpu_relax();
74         }
75         for (int i = 0; i < nr_elem; i++) {
76                 /* power of 2 elements in the ring buffer, index is the lower n bits */
77                 rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
78                 memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz, ap->ap_elem_sz);
79                 ap->ap_rd_off++;
80                 buf += ap->ap_elem_sz;
81                 nr_copied++;
82                 if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
83                         break;
84         }
85         /* since we're using just one CV, there could be readers and writers blocked
86          * on it.  need to wake them all, to make sure we signal any blocked
87          * writers. */
88         __cv_broadcast(&ap->ap_cv);
89         cv_unlock(&ap->ap_cv);
90         return nr_copied;
91 }
92
93 int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
94 {
95         size_t wr_idx;
96         int nr_copied = 0;
97
98         cv_lock(&ap->ap_cv);
99         /* not sure if we want to check for readers first or not */
100         while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
101                 if (!ap->ap_nr_readers) {
102                         cv_unlock(&ap->ap_cv);
103                         return 0;
104                 }
105                 cv_wait(&ap->ap_cv);
106                 cpu_relax();
107         }
108         for (int i = 0; i < nr_elem; i++) {
109                 /* power of 2 elements in the ring buffer, index is the lower n bits */
110                 wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
111                 memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf, ap->ap_elem_sz);
112                 ap->ap_wr_off++;
113                 buf += ap->ap_elem_sz;
114                 nr_copied++;
115                 if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
116                         break;
117         }
118         /* since we're using just one CV, there could be readers and writers blocked
119          * on it.  need to wake them all, to make sure we signal any blocked
120          * writers. */
121         __cv_broadcast(&ap->ap_cv);
122         cv_unlock(&ap->ap_cv);
123         return nr_copied;
124 }
125
126 void *apipe_head(struct atomic_pipe *ap)
127 {
128         if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
129                 return 0;
130         return ap->ap_buf + (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
131 }