cdc9d49a0dd5ce235006017d152ac5d3b17c33a4
[akaros.git] / user / parlib / ceq.c
1 /* Copyright (c) 2015 Google Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
6  * memory: a dense array of sticky status bits.
7  *
8  * User side (consumer).
9  *
10  * When initializing, the nr_events is the maximum count of events you are
11  * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.  If you
12  * use a large number, don't worry about memory; the memory is reserved but only
13  * allocated on demand (i.e. mmap without MAP_POPULATE).
14  *
15  * The ring_sz is a rough guess of the number of concurrent events.  It's not a
16  * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
17  * will probably scribble over your memory.  If you pick a value that is too
18  * small, then the ring may overflow, triggering an O(n) scan of the events
19  * array (where n is the largest event ID ever seen).  You could make it the
20  * nearest power of 2 >= nr_expected_events, for reasonable behavior at the
21  * expense of memory.  It'll be very rare for the ring to have more entries than
22  * the array has events. */
23
24 #include <parlib/ceq.h>
25 #include <parlib/arch/atomic.h>
26 #include <parlib/vcore.h>
27 #include <parlib/assert.h>
28 #include <parlib/spinlock.h>
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <sys/mman.h>
32
33 void ceq_init(struct ceq *ceq, uint8_t op, unsigned int nr_events,
34               size_t ring_sz)
35 {
36         /* In case they already had an mbox initialized, cleanup whatever was
37          * there so we don't leak memory.  They better not have asked for events
38          * before doing this init call... */
39         ceq_cleanup(ceq);
40         ceq->events = mmap(NULL, sizeof(struct ceq_event) * nr_events,
41                            PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS,
42                            -1, 0);
43         parlib_assert_perror(ceq->events != MAP_FAILED);
44         ceq->nr_events = nr_events;
45         atomic_init(&ceq->max_event_ever, 0);
46         assert(IS_PWR2(ring_sz));
47         ceq->ring = malloc(sizeof(int32_t) * ring_sz);
48         memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
49         ceq->ring_sz = ring_sz;
50         ceq->operation = op;
51         ceq->ring_overflowed = FALSE;
52         atomic_init(&ceq->prod_idx, 0);
53         atomic_init(&ceq->cons_pub_idx, 0);
54         atomic_init(&ceq->cons_pvt_idx, 0);
55         parlib_static_assert(sizeof(struct spin_pdr_lock) <=
56                              sizeof(ceq->u_lock));
57         spin_pdr_init((struct spin_pdr_lock*)&ceq->u_lock);
58 }
59
60 /* Helper, returns an index into the events array from the ceq ring.  -1 if the
61  * ring was empty when we looked (could be filled right after we looked).  This
62  * is the same algorithm used with BCQs, but with a magic value (-1) instead of
63  * a bool to track whether or not the slot is ready for consumption. */
64 static int32_t get_ring_idx(struct ceq *ceq)
65 {
66         long pvt_idx, prod_idx;
67         int32_t ret;
68
69         do {
70                 prod_idx = atomic_read(&ceq->prod_idx);
71                 pvt_idx = atomic_read(&ceq->cons_pvt_idx);
72                 if (__ring_empty(prod_idx, pvt_idx))
73                         return -1;
74         } while (!atomic_cas(&ceq->cons_pvt_idx, pvt_idx, pvt_idx + 1));
75         /* We claimed our slot, which is pvt_idx.  The new cons_pvt_idx is
76          * advanced by 1 for the next consumer.  Now we need to wait on the
77          * kernel to fill the value: */
78         while ((ret = ceq->ring[pvt_idx & (ceq->ring_sz - 1)]) == -1)
79                 cpu_relax();
80         /* Set the value back to -1 for the next time the slot is used */
81         ceq->ring[pvt_idx & (ceq->ring_sz - 1)] = -1;
82         /* We now have our entry.  We need to make sure the pub_idx is updated.
83          * All consumers are doing this.  We can just wait on all of them to
84          * update the cons_pub to our location, then we update it to the next.
85          *
86          * We're waiting on other vcores, but we don't know which one(s). */
87         while (atomic_read(&ceq->cons_pub_idx) != pvt_idx)
88                 cpu_relax_any();
89         /* This is the only time we update cons_pub.  We also know no one else
90          * is updating it at this moment; the while loop acts as a lock, such
91          * that no one gets to this point until pub == their pvt_idx, all of
92          * which are unique. */
93         /* No rwmb needed, it's the same variable (con_pub) */
94         atomic_set(&ceq->cons_pub_idx, pvt_idx + 1);
95         return ret;
96 }
97
98 /* Helper, extracts a message from a ceq[idx], returning TRUE if there was a
99  * message.  Note that there might have been nothing in the message (coal == 0).
100  * still, that counts; it's more about idx_posted.  A concurrent reader could
101  * have swapped out the coal contents (imagine two consumers, each gets past the
102  * idx_posted check).  If having an "empty" coal is a problem, then higher level
103  * software can ask for another event.
104  *
105  * Implied in all of that is that idx_posted is also racy.  The consumer blindly
106  * sets it to false.  So long as it extracts coal after doing so, we're fine. */
107 static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
108 {
109         struct ceq_event *ceq_ev = &ceq->events[idx];
110
111         if (!ceq_ev->idx_posted)
112                 return FALSE;
113         /* Once we clear this flag, any new coalesces will trigger another ring
114          * event, so we don't need to worry about missing anything.  It is
115          * possible that this CEQ event will get those new coalesces as part of
116          * this message, and future messages will have nothing.  That's fine. */
117         ceq_ev->idx_posted = FALSE;
118         cmb();  /* order the read after the flag write.  swap provides cpu_mb */
119         /* We extract the existing coals and reset the collection to 0; now the
120          * collected events are in our msg. */
121         msg->ev_arg2 = atomic_swap(&ceq_ev->coalesce, 0);
122         /* if the user wants access to user_data, they can peak in the event
123          * array via ceq->events[msg->ev_type].user_data. */
124         msg->ev_type = idx;
125         msg->ev_arg3 = (void*)ceq_ev->blob_data;
126         ceq_ev->blob_data = 0;  /* racy, but there are no blob guarantees */
127         return TRUE;
128 }
129
130 /* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
131  * the ceq appears empty, it will return FALSE.  Messages may have arrived after
132  * we started getting that we do not receive. */
133 bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
134 {
135         int32_t idx = get_ring_idx(ceq);
136
137         if (idx == -1) {
138                 /* We didn't get anything via the ring, but if we're overflowed,
139                  * then we need to look in the array directly.  Note that we
140                  * only handle overflow when we failed to get something.
141                  * Eventually, we'll deal with overflow (which should be very
142                  * rare).  Also note that while we are dealing with overflow,
143                  * the kernel could be producing and using the ring, and we
144                  * could have consumers consuming from the ring.
145                  *
146                  * Overall, we need to clear the overflow flag, then check every
147                  * event.  If we find an event, we need to make sure the *next*
148                  * consumer continues our recovery, hence the overflow_recovery
149                  * field.  We could do the check for recovery immediately, but
150                  * that adds complexity and there's no stated guarantee of CEQ
151                  * message ordering (you don't have it with the ring, either,
152                  * technically (consider a coalesce)).  So we're fine by having
153                  * *a* consumer finish the recovery, but not necesarily the
154                  * *next* consumer.  So long as no one thinks the CEQ is empty
155                  * when there actually are old messages, then we're okay. */
156                 if (!ceq->ring_overflowed && !ceq->overflow_recovery)
157                         return FALSE;
158                 spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
159                 if (!ceq->overflow_recovery) {
160                         ceq->overflow_recovery = TRUE;
161                         /* set recovery before clearing overflow */
162                         wmb();
163                         ceq->ring_overflowed = FALSE;
164                         ceq->last_recovered = 0;
165                         /* clear overflowed before reading event entries */
166                         wrmb();
167                 }
168                 for (int i = ceq->last_recovered;
169                      i <= atomic_read(&ceq->max_event_ever);
170                      i++) {
171                         /* Regardles of whether there's a msg here, we checked
172                          * it. */
173                         ceq->last_recovered++;
174                         if (extract_ceq_msg(ceq, i, msg)) {
175                                 /* We found something.  There might be more, but
176                                  * a future consumer will have to deal with it,
177                                  * or verify there isn't. */
178                                 spin_pdr_unlock(
179                                         (struct spin_pdr_lock*)&ceq->u_lock);
180                                 return TRUE;
181                         }
182                 }
183                 ceq->overflow_recovery = FALSE;
184                 /* made it to the end, looks like there was no overflow left.
185                  * there could be new ones added behind us (they'd be in the
186                  * ring or overflow would be turned on again), but those message
187                  * were added after we started consuming, and therefore not our
188                  * obligation to extract. */
189                 spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
190                 return FALSE;
191         }
192         if (!extract_ceq_msg(ceq, idx, msg))
193                 return FALSE;
194         return TRUE;
195 }
196
197 /* pvt_idx is the next slot that a new consumer will try to consume.  when
198  * pvt_idx != pub_idx, pub_idx is lagging, and it represents consumptions in
199  * progress. */
200 static bool __ceq_ring_is_empty(struct ceq *ceq)
201 {
202         return __ring_empty(atomic_read(&ceq->prod_idx),
203                             atomic_read(&ceq->cons_pvt_idx));
204 }
205
206 bool ceq_is_empty(struct ceq *ceq)
207 {
208         if (!__ceq_ring_is_empty(ceq) ||
209             ceq->ring_overflowed ||
210             spin_pdr_locked((struct spin_pdr_lock*)&ceq->u_lock)) {
211                 return FALSE;
212         }
213         return TRUE;
214 }
215
216 void ceq_cleanup(struct ceq *ceq)
217 {
218         munmap(ceq->events, sizeof(struct ceq_event) * ceq->nr_events);
219         free(ceq->ring);
220 }