UCQs (XCC)
[akaros.git] / user / parlib / ucq.c
1 /* Copyright (c) 2011 The Regents of the University of California
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Unbounded concurrent queues, user side.  Check k/i/r/ucq.h or the
6  * Documentation for more info. */
7
8 #include <ros/arch/membar.h>
9 #include <arch/atomic.h>
10 #include <arch/arch.h>
11 #include <ucq.h>
12 #include <mcs.h>
13 #include <sys/mman.h>
14 #include <assert.h>
15 #include <stdio.h>
16 #include <rassert.h> /* for the static_assert() */
17
18 /* Initializes a ucq.  You pass in addresses of mmaped pages for the main page
19  * (prod_idx) and the spare page.  I recommend mmaping a big chunk and breaking
20  * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
21 void ucq_init(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
22 {
23         assert(!PGOFF(pg1));
24         assert(!PGOFF(pg2));
25         /* Prod and cons both start on the first page, slot 0.  When they are equal,
26          * the ucq is empty. */
27         atomic_set(&ucq->prod_idx, pg1);
28         atomic_set(&ucq->cons_idx, pg1);
29         ucq->prod_overflow = FALSE;
30         atomic_set(&ucq->nr_extra_pgs, 0);
31         atomic_set(&ucq->spare_pg, pg2);
32         static_assert(sizeof(struct mcs_lock) <= sizeof(ucq->u_lock));
33         mcs_lock_init((struct mcs_lock*)(&ucq->u_lock));
34         ucq->ucq_ready = TRUE;
35 }
36
37 /* Consumer side, returns 0 on success and fills *msg with the ev_msg.  If the
38  * ucq is empty, it will return -1. */
39 int get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
40 {
41         uintptr_t my_idx;
42         struct ucq_page *old_page, *other_page;
43         struct msg_container *my_msg;
44         /* Locking stuff.  Would be better with a spinlock, if we had them, since
45          * this should be lightly contested.  */
46         struct mcs_lock_qnode local_qn = {0};
47         struct mcs_lock *ucq_lock = (struct mcs_lock*)(&ucq->u_lock);
48
49         do {
50 loop_top:
51                 cmb();
52                 my_idx = atomic_read(&ucq->cons_idx);
53                 /* The ucq is empty if the consumer and producer are on the same 'next'
54                  * slot. */
55                 if (my_idx == atomic_read(&ucq->prod_idx))
56                         return -1;
57                 /* Is the slot we want good?  If not, we're going to need to try and
58                  * move on to the next page.  If it is, we bypass all of this and try to
59                  * CAS on us getting my_idx. */
60                 if (slot_is_good(my_idx))
61                         goto claim_slot;
62                 /* Slot is bad, let's try and fix it */
63                 mcs_lock_notifsafe(ucq_lock, &local_qn);
64                 /* Reread the idx, in case someone else fixed things up while we
65                  * were waiting/fighting for the lock */
66                 my_idx = atomic_read(&ucq->cons_idx);
67                 if (slot_is_good(my_idx)) {
68                         /* Someone else fixed it already, let's just try to get out */
69                         mcs_unlock_notifsafe(ucq_lock, &local_qn);
70                         goto claim_slot;
71                 }
72                 /* At this point, the slot is bad, and all other possible consumers are
73                  * spinning on the lock.  Time to fix things up: Set the counter to the
74                  * next page, and free the old one. */
75                 /* First, we need to wait and make sure the kernel has posted the next
76                  * page.  Worst case, we know that the kernel is working on it, since
77                  * prod_idx != cons_idx */
78                 old_page = (struct ucq_page*)PTE_ADDR(my_idx);
79                 while (!old_page->header.cons_next_pg)
80                         cpu_relax();
81                 /* Now set the counter to the next page */
82                 assert(!PGOFF(old_page->header.cons_next_pg));
83                 atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
84                 /* Side note: at this point, any *new* consumers coming in will grab
85                  * slots based off the new counter index (cons_idx) */
86                 /* Now free up the old page.  Need to make sure all other consumers are
87                  * done.  We spin til enough are done, like an inverted refcnt. */
88                 while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
89                         cpu_relax();
90                 /* Now the page is done.  0 its metadata and give it up. */
91                 old_page->header.cons_next_pg = 0;
92                 atomic_set(&old_page->header.nr_cons, 0);
93                 /* We want to "free" the page.  We'll try and set it as the spare.  If
94                  * there is already a spare, we'll free that one. */
95                 other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
96                                                            (long)old_page);
97                 assert(!PGOFF(other_page));
98                 if (other_page) {
99                         munmap(other_page, PGSIZE);
100                         atomic_dec(&ucq->nr_extra_pgs);
101                 }
102                 /* All fixed up, unlock.  Other consumers may lock and check to make
103                  * sure things are done. */
104                 mcs_unlock_notifsafe(ucq_lock, &local_qn);
105                 /* Now that everything is fixed, try again from the top */
106                 goto loop_top;
107 claim_slot:
108                 cmb();  /* so we can goto claim_slot */
109                 /* If we're still here, my_idx is good, and we'll try to claim it.  If
110                  * we fail, we need to repeat the whole process. */
111         } while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
112         /* Now we have a good slot that we can consume */
113         my_msg = slot2msg(my_idx);
114         /* Wait til the msg is ready (kernel sets this flag) */
115         while (!my_msg->ready)
116                 cpu_relax();
117         /* Copy out */
118         *msg = my_msg->ev_msg;
119         /* Unset this for the next usage of the container */
120         my_msg->ready = FALSE;
121         wmb();
122         /* Increment nr_cons, showing we're done */
123         atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
124         return 0;
125 }
126