Rename parlib/rassert.h -> parlib/assert.h
[akaros.git] / user / parlib / ceq.c
1 /* Copyright (c) 2015 Google Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
6  * memory: a dense array of sticky status bits.
7  *
8  * User side (consumer).
9  *
10  * When initializing, the nr_events is the maximum count of events you are
11  * tracking, e.g. 100 FDs being tapped, but not the actual FD numbers.
12  *
13  * The ring_sz is a rough guess of the number of concurrent events.  It's not a
14  * big deal what you pick, but it must be a power of 2.  Otherwise the kernel
15  * will probably scribble over your memory.  If you pick a value that is too
16  * small, then the ring may overflow, triggering an O(n) scan of the events
17  * array.  You could make it the nearest power of 2 >= nr_events, for reasonable
18  * behavior at the expense of memory.  It'll be very rare for the ring to have
19  * more entries than the array has events. */
20
21 #include <parlib/ceq.h>
22 #include <parlib/arch/atomic.h>
23 #include <parlib/vcore.h>
24 #include <parlib/assert.h>
25 #include <parlib/spinlock.h>
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <assert.h>
29
30 void ceq_init(struct ceq *ceq, uint8_t op, size_t nr_events, size_t ring_sz)
31 {
32         /* In case they already had an mbox initialized, cleanup whatever was there
33          * so we don't leak memory.  They better not have asked for events before
34          * doing this init call... */
35         ceq_cleanup(ceq);
36         ceq->events = malloc(sizeof(struct ceq_event) * nr_events);
37         memset(ceq->events, 0, sizeof(struct ceq_event) * nr_events);
38         ceq->nr_events = nr_events;
39         assert(IS_PWR2(ring_sz));
40         ceq->ring = malloc(sizeof(int32_t) * ring_sz);
41         memset(ceq->ring, 0xff, sizeof(int32_t) * ring_sz);
42         ceq->ring_sz = ring_sz;
43         ceq->operation = op;
44         ceq->ring_overflowed = FALSE;
45         atomic_init(&ceq->prod_idx, 0);
46         atomic_init(&ceq->cons_pub_idx, 0);
47         atomic_init(&ceq->cons_pvt_idx, 0);
48         static_assert(sizeof(struct spin_pdr_lock) <= sizeof(ceq->u_lock));
49         spin_pdr_init((struct spin_pdr_lock*)&ceq->u_lock);
50 }
51
52 /* Helper, returns an index into the events array from the ceq ring.  -1 if the
53  * ring was empty when we looked (could be filled right after we looked).  This
54  * is the same algorithm used with BCQs, but with a magic value (-1) instead of
55  * a bool to track whether or not the slot is ready for consumption. */
56 static int32_t get_ring_idx(struct ceq *ceq)
57 {
58         long pvt_idx, prod_idx;
59         int32_t ret;
60         do {
61                 prod_idx = atomic_read(&ceq->prod_idx);
62                 pvt_idx = atomic_read(&ceq->cons_pvt_idx);
63                 if (__ring_empty(prod_idx, pvt_idx))
64                         return -1;
65         } while (!atomic_cas(&ceq->cons_pvt_idx, pvt_idx, pvt_idx + 1));
66         /* We claimed our slot, which is pvt_idx.  The new cons_pvt_idx is advanced
67          * by 1 for the next consumer.  Now we need to wait on the kernel to fill
68          * the value: */
69         while ((ret = ceq->ring[pvt_idx & (ceq->ring_sz - 1)]) == -1)
70                 cpu_relax();
71         /* Set the value back to -1 for the next time the slot is used */
72         ceq->ring[pvt_idx & (ceq->ring_sz - 1)] = -1;
73         /* We now have our entry.  We need to make sure the pub_idx is updated.  All
74          * consumers are doing this.  We can just wait on all of them to update the
75          * cons_pub to our location, then we update it to the next.
76          *
77          * We're waiting on other vcores, but we don't know which one(s). */
78         while (atomic_read(&ceq->cons_pub_idx) != pvt_idx)
79                 cpu_relax_vc(vcore_id());       /* wait on all of them */
80         /* This is the only time we update cons_pub.  We also know no one else is
81          * updating it at this moment; the while loop acts as a lock, such that
82          * no one gets to this point until pub == their pvt_idx, all of which are
83          * unique. */
84         /* No rwmb needed, it's the same variable (con_pub) */
85         atomic_set(&ceq->cons_pub_idx, pvt_idx + 1);
86         return ret;
87 }
88
89 /* Helper, extracts a message from a ceq[idx], returning TRUE if there was a
90  * message.  Note that there might have been nothing in the message (coal == 0).
91  * still, that counts; it's more about idx_posted.  A concurrent reader could
92  * have swapped out the coal contents (imagine two consumers, each gets past the
93  * idx_posted check).  If having an "empty" coal is a problem, then higher level
94  * software can ask for another event.
95  *
96  * Implied in all of that is that idx_posted is also racy.  The consumer blindly
97  * sets it to false.  So long as it extracts coal after doing so, we're fine. */
98 static bool extract_ceq_msg(struct ceq *ceq, int32_t idx, struct event_msg *msg)
99 {
100         struct ceq_event *ceq_ev = &ceq->events[idx];
101         if (!ceq_ev->idx_posted)
102                 return FALSE;
103         /* Once we clear this flag, any new coalesces will trigger another ring
104          * event, so we don't need to worry about missing anything.  It is possible
105          * that this CEQ event will get those new coalesces as part of this message,
106          * and future messages will have nothing.  That's fine. */
107         ceq_ev->idx_posted = FALSE;
108         cmb();  /* order the read after the flag write.  swap provides cpu_mb */
109         /* We extract the existing coals and reset the collection to 0; now the
110          * collected events are in our msg. */
111         msg->ev_arg2 = atomic_swap(&ceq_ev->coalesce, 0);
112         /* if the user wants access to user_data, they can peak in the event array
113          * via ceq->events[msg->ev_type].user_data. */
114         msg->ev_type = idx;
115         msg->ev_arg3 = (void*)ceq_ev->blob_data;
116         ceq_ev->blob_data = 0;  /* racy, but there are no blob guarantees */
117         return TRUE;
118 }
119
120 /* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
121  * the ceq appears empty, it will return FALSE.  Messages may have arrived after
122  * we started getting that we do not receive. */
123 bool get_ceq_msg(struct ceq *ceq, struct event_msg *msg)
124 {
125         int32_t idx = get_ring_idx(ceq);
126         if (idx == -1) {
127                 if (!ceq->ring_overflowed)
128                         return FALSE;
129                 /* We didn't get anything via the ring, but if we're overflowed, then we
130                  * need to look in the array directly.  Note that we only handle
131                  * overflow when we failed to get something.  Eventually, we'll deal
132                  * with overflow (which should be very rare).  Also note that while we
133                  * are dealing with overflow, the kernel could be producing and using
134                  * the ring, and we could have consumers consuming from the ring.
135                  *
136                  * Overall, we need to clear the overflow flag, make sure the list is
137                  * empty, and turn the flag back on if it isn't.  That'll make sure
138                  * overflow is set if there's a chance there is a message in the array
139                  * that doesn't have an idx in the ring.
140                  *
141                  * However, if we do that, there's a time when overflow isn't set and
142                  * the ring is empty.  A concurrent consumer could think that the ring
143                  * is empty, when in fact it isn't.  That's bad, since we could miss a
144                  * message (i.e. sleep when we have a message we needed).  So we'll need
145                  * to deal with concurrent consumers, and whatever we do will also need
146                  * to deal with concurrent conusmers who handle overflow too.  Easiest
147                  * thing is to just lock.  If the lock is set, then that also means the
148                  * mailbox isn't empty. */
149                 spin_pdr_lock((struct spin_pdr_lock*)&ceq->u_lock);
150                 /* Check again - someone may have handled it while we were waiting on
151                  * the lock */
152                 if (!ceq->ring_overflowed) {
153                         spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
154                         return FALSE;
155                 }
156                 ceq->ring_overflowed = FALSE;
157                 wrmb(); /* clear overflowed before reading event entries */
158                 for (int i = 0; i < ceq->nr_events; i++) {
159                         if (extract_ceq_msg(ceq, i, msg)) {
160                                 /* We found something.  There might be more, but a future
161                                  * consumer will have to deal with it, or verify there isn't. */
162                                 ceq->ring_overflowed = TRUE;
163                                 spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
164                                 return TRUE;
165                         }
166                 }
167                 /* made it to the end, looks like there was no overflow left.  there
168                  * could be new ones added behind us (they'd be in the ring or overflow
169                  * would be turned on again), but those message were added after we
170                  * started consuming, and therefore not our obligation to extract. */
171                 spin_pdr_unlock((struct spin_pdr_lock*)&ceq->u_lock);
172                 return FALSE;
173         }
174         if (!extract_ceq_msg(ceq, idx, msg))
175                 return FALSE;
176         return TRUE;
177 }
178
179 /* pvt_idx is the next slot that a new consumer will try to consume.  when
180  * pvt_idx != pub_idx, pub_idx is lagging, and it represents consumptions in
181  * progress. */
182 static bool __ceq_ring_is_empty(struct ceq *ceq)
183 {
184         return __ring_empty(atomic_read(&ceq->prod_idx),
185                             atomic_read(&ceq->cons_pvt_idx));
186 }
187
188 bool ceq_is_empty(struct ceq *ceq)
189 {
190         if (!__ceq_ring_is_empty(ceq) ||
191             ceq->ring_overflowed ||
192             spin_pdr_locked((struct spin_pdr_lock*)&ceq->u_lock)) {
193                 return FALSE;
194         }
195         return TRUE;
196 }
197
198 void ceq_cleanup(struct ceq *ceq)
199 {
200         free(ceq->events);
201         free(ceq->ring);
202 }