/* Copyright (c) 2010 The Regents of the University of California
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Multi-producer, multi-consumer queues.  Designed initially for an untrusted
 * consumer.
 */

#ifndef ROS_INC_BCQ_H
#define ROS_INC_BCQ_H

#include <atomic.h>

/* Bounded Concurrent Queues, untrusted consumer
 *
 * This is a producer/consumer circular buffer, in which the producer never
 * blocks and does not need to trust the data structure (which is writable by
 * the consumer).
 *
 * A producer enqueues an item, based on the indexes of the producer and
 * consumer.  Enqueue cannot block, but it can fail if the queue is full or if
 * it fails to enqueue after a certain number of tries.
 *
 * prod_idx:     the next item to be produced
 * cons_pvt_idx: the next item a consumer can claim
 * cons_pub_idx: the oldest item that hasn't been fully consumed and made
 *               ready to be clobbered by the producer (advancing this index
 *               is what the consumer "produces").  Once everything has been
 *               consumed, this will be the same as the prod_idx.
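 *
 * For example (a hypothetical snapshot with small numbers): if prod_idx = 5,
 * cons_pvt_idx = 4, and cons_pub_idx = 2, then item 4 is produced but
 * unclaimed, items 2 and 3 are claimed by consumers but not yet retired, and
 * the producer may not reuse slots 2 through 4 until cons_pub_idx catches up.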
 *
 * The number of free slots in the buffer is: BufSize - (prod_idx - cons_pub)
 * The power-of-two nature of the number of elements makes this work when it
 * wraps around, just like with Xen.  Check it yourself with a counter of 8 and
 * bufsizes of 8 and 4.
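 *
 * For example, with 32-bit unsigned indexes and BufSize = 8: if prod_idx = 9
 * and cons_pub = 3, free = 8 - (9 - 3) = 2.  If the counters have wrapped,
 * say prod_idx = 1 and cons_pub = 0xFFFFFFFF, the unsigned difference
 * (1 - 0xFFFFFFFF) is 2, so free = 8 - 2 = 6, just as if nothing had wrapped.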
 *
 *
 * General plan:
 *
 * Producers compete among themselves, using the prod_idx, to get a free spot.
 * Once a producer has a spot, it fills in the item and then toggles that
 * slot's "ready for consumption" bool.  If it cannot get a spot after a
 * number of tries, it simply fails (which could be a DoS from the consumer).
 *
 * Consumers compete with their private index, which they use to determine who
 * is consuming which item.  If there is an unconsumed item, they try to
 * advance the pvt counter.  If they succeed, they can consume the item.  The
 * item might not be there yet, so they must spin until it is there.  Then the
 * consumer copies the item out and clears the bool (rdy_for_cons).
 *
 * At this point, the consumer needs to make sure the pub_idx is advanced
 * enough so the producer knows the item is free.  If pub_idx was their item,
 * they move it forward to the next item.  If it is not, they currently spin
 * until the previous consumer finishes, and then move it forward.  This isn't
 * ideal, and we can deal with it in the future.
 *
 * Enqueue will enqueue the item pointed to by elem.  Dequeue will write an
 * item into the memory pointed to by elem.
 *
 * The number of items must be a power of two.  In the future, we'll probably
 * use Xen's rounding macros.  Not using powers of two is a pain, esp with mods
 * of negative numbers.
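 *
 * For example, with 8 elements any 32-bit index maps to a slot via idx & 7,
 * even across the uint32_t wraparound (0xFFFFFFFF maps to slot 7 and the next
 * index, 0, maps to slot 0).  With 6 elements, 0xFFFFFFFF % 6 is 3 but the
 * next index maps to 0, so the slot mapping breaks when the counter wraps.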
 *
 * Here's how to use it:
 *
 * DEFINE_BCQ_TYPES(my_name, my_type, my_size);
 * struct my_name_bcq some_bcq;
 * bcq_init(&some_bcq, my_type, my_size);
 *
 * bcq_enqueue(&some_bcq, &some_my_type, my_size, num_fails_okay);
 * bcq_dequeue(&some_bcq, &some_my_type, my_size);
 *
 *
 * TODO later:
 * How about an atomic_add_return for the prod?  Now that we use powers of two,
 * CAS is probably overkill.
 *
 * Automatically round up.
 *
 * Watch out for ABA.  Could use ctrs in the top of the indexes.  Not really an
 * issue, since that would be a wraparound.
 *
 * Consumers could also just set their bit, and have whoever has the pub_idx
 * right before them be the one to advance it all the way up.
 *
 * Using uint32_t for now, since that's the comp_and_swap we have.  We'll
 * probably get other sizes once we're sure we like the current one.  */

struct bcq_header {
        uint32_t prod_idx;              /* next to be produced in */
        uint32_t cons_pub_idx;          /* last completely consumed */
        uint32_t cons_pvt_idx;          /* last a consumer has dibs on */
};

#define DEFINE_BCQ_TYPES(__name, __elem_t, __num_elems)                        \
                                                                               \
/* Wrapper, per element, with the consumption bool */                         \
struct __name##_bcq_wrap {                                                     \
        __elem_t elem;                                                         \
        bool rdy_for_cons;      /* elem is ready for consumption */            \
};                                                                             \
                                                                               \
/* The actual BC queue */                                                      \
struct __name##_bcq {                                                          \
        struct bcq_header hdr;                                                 \
        struct __name##_bcq_wrap wraps[__num_elems];                           \
};
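
/* For example (a hypothetical instantiation), DEFINE_BCQ_TYPES(my_name,
 * struct my_type, 16) expands to a per-element wrapper and a queue type:
 *
 *      struct my_name_bcq_wrap {
 *              struct my_type elem;
 *              bool rdy_for_cons;
 *      };
 *      struct my_name_bcq {
 *              struct bcq_header hdr;
 *              struct my_name_bcq_wrap wraps[16];
 *      };
 */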

/* Functions */

/* Zeroes the whole BCQ: the header, the wrappers, and their rdy_for_cons
 * flags.  The type and element-count arguments are unused but kept so the
 * call matches the usage example above. */
#define bcq_init(_bcq, _ele_type, _num_elems)                                  \
        memset((_bcq), 0, sizeof(*(_bcq)))

/* Num empty buffer slots in the BCQ */
#define BCQ_FREE_SLOTS(_p, _cp, _ne) ((_ne) - ((_p) - (_cp)))

/* Really empty */
#define BCQ_EMPTY(_p, _cp, _ne) ((_ne) == BCQ_FREE_SLOTS(_p, _cp, _ne))

/* All work claimed by a consumer */
#define BCQ_NO_WORK(_p, _cpv) ((_p) == (_cpv))

/* Buffer full */
#define BCQ_FULL(_p, _cp, _ne) (0 == BCQ_FREE_SLOTS(_p, _cp, _ne))
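
/* For example, with _ne = 8: BCQ_FREE_SLOTS(9, 3, 8) is 2, BCQ_FULL(11, 3, 8)
 * is true, BCQ_EMPTY(3, 3, 8) is true, and BCQ_NO_WORK(5, 5) is true
 * (everything produced has already been claimed). */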

/* Figure out the slot you want and bail if the queue is full or if you failed
 * too many times, then CAS to set the new prod.  Fill yours in, toggle the
 * bool.  Sorry, the macro got a bit ugly, esp with the __retval hackery. */
#define bcq_enqueue(_bcq, _elem, _num_elems, _num_fail)                        \
({                                                                             \
        uint32_t __prod, __new_prod, __cons_pub, __failctr = 0;                \
        int __retval = 0;                                                      \
        do {                                                                   \
                if ((_num_fail) && (__failctr++ >= (_num_fail))) {             \
                        __retval = -EFAIL;                                     \
                        break;                                                 \
                }                                                              \
                __prod = (_bcq)->hdr.prod_idx;                                 \
                __cons_pub = (_bcq)->hdr.cons_pub_idx;                         \
                if (BCQ_FULL(__prod, __cons_pub, (_num_elems))) {              \
                        __retval = -EBUSY;                                     \
                        break;                                                 \
                }                                                              \
                __new_prod = __prod + 1;                                       \
        } while (!atomic_comp_swap(&(_bcq)->hdr.prod_idx, __prod, __new_prod));\
        if (!__retval) {                                                       \
                /* from here out, __prod is the local __prod that we won */    \
                (_bcq)->wraps[__prod & ((_num_elems)-1)].elem = *(_elem);      \
                (_bcq)->wraps[__prod & ((_num_elems)-1)].rdy_for_cons = TRUE;  \
        }                                                                      \
        __retval;                                                              \
})
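
/* A minimal producer-side sketch, assuming the hypothetical my_name/my_type
 * names from the usage example above (my_size must be a power of two):
 *
 *      struct my_type item;            // caller fills in item first
 *      int ret = bcq_enqueue(&some_bcq, &item, my_size, 10);
 *      if (ret == -EBUSY)
 *              ;       // queue was full: the consumer hasn't freed any slots
 *      else if (ret == -EFAIL)
 *              ;       // gave up after 10 tries (possibly a DoSing consumer)
 */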

/* Similar to enqueue; spin afterwards til cons_pub is our element, then
 * advance it. */
#define bcq_dequeue(_bcq, _elem, _num_elems)                                   \
({                                                                             \
        uint32_t __prod, __cons_pvt, __new_cons_pvt;                           \
        int __retval = 0;                                                      \
        do {                                                                   \
                __prod = (_bcq)->hdr.prod_idx;                                 \
                __cons_pvt = (_bcq)->hdr.cons_pvt_idx;                         \
                if (BCQ_NO_WORK(__prod, __cons_pvt)) {                         \
                        __retval = -EBUSY;                                     \
                        break;                                                 \
                }                                                              \
                __new_cons_pvt = (__cons_pvt + 1);                             \
        } while (!atomic_comp_swap(&(_bcq)->hdr.cons_pvt_idx, __cons_pvt,      \
                                   __new_cons_pvt));                           \
        if (!__retval) {                                                       \
                /* from here out, __cons_pvt is the local __cons_pvt we won */ \
                /* wait for the producer to finish copying it in */            \
                while (!(_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons) \
                        cpu_relax();                                           \
                *(_elem) = (_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].elem;  \
                (_bcq)->wraps[__cons_pvt & ((_num_elems)-1)].rdy_for_cons = FALSE; \
                /* wait til we're the cons_pub, then advance it by one */      \
                while ((_bcq)->hdr.cons_pub_idx != __cons_pvt)                 \
                        cpu_relax();                                           \
                (_bcq)->hdr.cons_pub_idx = __cons_pvt + 1;                     \
        }                                                                      \
        __retval;                                                              \
})
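
/* A minimal consumer-side sketch, again using the hypothetical names from the
 * usage example above:
 *
 *      struct my_type item;
 *      while (bcq_dequeue(&some_bcq, &item, my_size) == -EBUSY)
 *              cpu_relax();    // nothing to claim yet; spin or do other work
 *      handle_item(&item);     // handle_item() is hypothetical
 */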

#if 0
/* Original C Code, for posterity / debugging */
static inline int enqueue(struct __name_bcq *bcq, __elem_t *elem,
                          int num_fail)
{
        uint32_t __prod, __new_prod, __cons_pub, __failctr = 0;
        do {
                if ((num_fail) && (__failctr++ >= num_fail)) {
                        printk("FAILED\n");
                        return -EFAIL;
                }
                __prod = bcq->hdr.prod_idx;
                __cons_pub = bcq->hdr.cons_pub_idx;
                printk("# free slots : %d\n",
                       BCQ_FREE_SLOTS(__prod, __cons_pub, __num_elems));

//              printk("__prod = %p, cons_pub = %p\n", __prod, __cons_pub-1);
//              printk("__prod mod = %p, cons_pub mod = %p\n", __prod &(__num_elems-1), (__cons_pub-1) &(__num_elems-1));

                if (BCQ_FULL(__prod, __cons_pub, __num_elems)) {
                        printk("BUSY\n");
                        return -EBUSY;
                }
                __new_prod = __prod + 1;
        } while (!atomic_comp_swap(&bcq->hdr.prod_idx, __prod, __new_prod));
        /* from here out, __prod is the local __prod that we won */

        printk("enqueuing to location %d\n", __prod & (__num_elems-1));

        bcq->wraps[__prod & (__num_elems-1)].elem = *elem;
        bcq->wraps[__prod & (__num_elems-1)].rdy_for_cons = TRUE;
        return 0;
}

/* Similar to enqueue; spin afterwards til cons_pub is our element, then
 * advance it. */
static inline int dequeue(struct __name_bcq *bcq, __elem_t *elem)
{
        uint32_t __prod, __cons_pvt, __new_cons_pvt;
        do {
                __prod = bcq->hdr.prod_idx;
                __cons_pvt = bcq->hdr.cons_pvt_idx;
                if (BCQ_NO_WORK(__prod, __cons_pvt))
                        return -EBUSY;
                __new_cons_pvt = (__cons_pvt + 1);
        } while (!atomic_comp_swap(&bcq->hdr.cons_pvt_idx, __cons_pvt,
                                   __new_cons_pvt));
        /* from here out, __cons_pvt is the local __cons_pvt that we won */
        printk("dequeueing from location %d\n", __cons_pvt & (__num_elems-1));

        /* wait for the producer to finish copying it in */
        while (!bcq->wraps[__cons_pvt & (__num_elems-1)].rdy_for_cons)
                cpu_relax();
        *elem = bcq->wraps[__cons_pvt & (__num_elems-1)].elem;
        bcq->wraps[__cons_pvt & (__num_elems-1)].rdy_for_cons = FALSE;
        /* wait til we're the cons_pub, then advance it by one */
        while (bcq->hdr.cons_pub_idx != __cons_pvt)
                cpu_relax();
        bcq->hdr.cons_pub_idx = __cons_pvt + 1;
        return 0;
}
#endif

#endif /* !ROS_INC_BCQ_H */