b439fc0e48b3205ec2caa049d15233ded1b564d1
[akaros.git] / user / parlib / ucq.c
1 /* Copyright (c) 2011 The Regents of the University of California
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Unbounded concurrent queues, user side.  Check k/i/r/ucq.h or the
6  * Documentation for more info. */
7
8 #include <ros/arch/membar.h>
9 #include <parlib/arch/atomic.h>
10 #include <parlib/arch/arch.h>
11 #include <parlib/ucq.h>
12 #include <parlib/spinlock.h>
13 #include <sys/mman.h>
14 #include <parlib/assert.h>
15 #include <parlib/stdio.h>
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <parlib/vcore.h>
19 #include <parlib/ros_debug.h> /* for printd() */
20
21 /* Initializes a ucq.  You pass in addresses of mmaped pages for the main page
22  * (prod_idx) and the spare page.  I recommend mmaping a big chunk and breaking
23  * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
24 void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
25 {
26         printd("[user] initializing ucq %08p for proc %d\n", ucq, getpid());
27         assert(!PGOFF(pg1));
28         assert(!PGOFF(pg2));
29         /* Prod and cons both start on the first page, slot 0.  When they are
30          * equal, the ucq is empty. */
31         atomic_set(&ucq->prod_idx, pg1);
32         atomic_set(&ucq->cons_idx, pg1);
33         ucq->prod_overflow = FALSE;
34         atomic_set(&ucq->nr_extra_pgs, 0);
35         atomic_set(&ucq->spare_pg, pg2);
36         parlib_static_assert(sizeof(struct spin_pdr_lock) <=
37                              sizeof(ucq->u_lock));
38         spin_pdr_init((struct spin_pdr_lock*)(&ucq->u_lock));
39         ucq->ucq_ready = TRUE;
40 }
41
42 /* Inits a ucq, where you don't have to bother with the memory allocation.  This
43  * would be appropriate for one or two UCQs, though if you're allocating in
44  * bulk, use the raw version. */
45 void ucq_init(struct ucq *ucq)
46 {
47         uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2,
48                                               PROT_WRITE | PROT_READ,
49                                               MAP_POPULATE | MAP_ANONYMOUS |
50                                               MAP_PRIVATE, -1, 0);
51
52         assert(two_pages);
53         ucq_init_raw(ucq, two_pages, two_pages + PGSIZE);
54 }
55
56 /* Only call this on ucq's made with the simple ucq_init().  And be sure the ucq
57  * is no longer in use. */
58 void ucq_free_pgs(struct ucq *ucq)
59 {
60         uintptr_t pg1 = atomic_read(&ucq->prod_idx);
61         uintptr_t pg2 = atomic_read(&ucq->spare_pg);
62
63         assert(pg1 && pg2);
64         munmap((void*)pg1, PGSIZE);
65         munmap((void*)pg2, PGSIZE);
66 }
67
68 /* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
69  * the ucq appears empty, it will return FALSE.  Messages may have arrived after
70  * we started getting that we do not receive. */
71 bool get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
72 {
73         uintptr_t my_idx;
74         struct ucq_page *old_page, *other_page;
75         struct msg_container *my_msg;
76         struct spin_pdr_lock *ucq_lock = (struct spin_pdr_lock*)(&ucq->u_lock);
77
78         do {
79 loop_top:
80                 cmb();
81                 my_idx = atomic_read(&ucq->cons_idx);
82                 /* The ucq is empty if the consumer and producer are on the same
83                  * 'next' slot. */
84                 if (my_idx == atomic_read(&ucq->prod_idx))
85                         return FALSE;
86                 /* Is the slot we want good?  If not, we're going to need to try
87                  * and move on to the next page.  If it is, we bypass all of
88                  * this and try to CAS on us getting my_idx. */
89                 if (slot_is_good(my_idx))
90                         goto claim_slot;
91                 /* Slot is bad, let's try and fix it */
92                 spin_pdr_lock(ucq_lock);
93                 /* Reread the idx, in case someone else fixed things up while we
94                  * were waiting/fighting for the lock */
95                 my_idx = atomic_read(&ucq->cons_idx);
96                 if (slot_is_good(my_idx)) {
97                         /* Someone else fixed it already, let's just try to get
98                          * out */
99                         spin_pdr_unlock(ucq_lock);
100                         /* Make sure this new slot has a producer (ucq isn't
101                          * empty) */
102                         if (my_idx == atomic_read(&ucq->prod_idx))
103                                 return FALSE;
104                         goto claim_slot;
105                 }
106                 /* At this point, the slot is bad, and all other possible
107                  * consumers are spinning on the lock.  Time to fix things up:
108                  * Set the counter to the next page, and free the old one. */
109                 /* First, we need to wait and make sure the kernel has posted
110                  * the next page.  Worst case, we know that the kernel is
111                  * working on it, since prod_idx != cons_idx */
112                 old_page = (struct ucq_page*)PTE_ADDR(my_idx);
113                 while (!old_page->header.cons_next_pg)
114                         cpu_relax();
115                 /* Now set the counter to the next page */
116                 assert(!PGOFF(old_page->header.cons_next_pg));
117                 atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
118                 /* Side note: at this point, any *new* consumers coming in will
119                  * grab slots based off the new counter index (cons_idx) */
120                 /* Now free up the old page.  Need to make sure all other
121                  * consumers are done.  We spin til enough are done, like an
122                  * inverted refcnt. */
123                 while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
124                 {
125                         /* spinning on userspace here, specifically, another
126                          * vcore and we don't know who it is.  This will spin a
127                          * bit, then make sure they aren't preeempted */
128                         cpu_relax_any();
129                 }
130                 /* Now the page is done.  0 its metadata and give it up. */
131                 old_page->header.cons_next_pg = 0;
132                 atomic_set(&old_page->header.nr_cons, 0);
133                 /* We want to "free" the page.  We'll try and set it as the
134                  * spare.  If there is already a spare, we'll free that one. */
135                 other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
136                                                            (long)old_page);
137                 assert(!PGOFF(other_page));
138                 if (other_page) {
139                         munmap(other_page, PGSIZE);
140                         atomic_dec(&ucq->nr_extra_pgs);
141                 }
142                 /* All fixed up, unlock.  Other consumers may lock and check to
143                  * make sure things are done. */
144                 spin_pdr_unlock(ucq_lock);
145                 /* Now that everything is fixed, try again from the top */
146                 goto loop_top;
147 claim_slot:
148                 cmb();  /* so we can goto claim_slot */
149                 /* If we're still here, my_idx is good, and we'll try to claim
150                  * it.  If we fail, we need to repeat the whole process. */
151         } while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
152         assert(slot_is_good(my_idx));
153         /* Now we have a good slot that we can consume */
154         my_msg = slot2msg(my_idx);
155         /* linux would put an rmb_depends() here */
156         /* Wait til the msg is ready (kernel sets this flag) */
157         while (!my_msg->ready)
158                 cpu_relax();
159         rmb();  /* order the ready read before the contents */
160         /* Copy out */
161         *msg = my_msg->ev_msg;
162         /* Unset this for the next usage of the container */
163         my_msg->ready = FALSE;
164         wmb();  /* post the ready write before incrementing */
165         /* Increment nr_cons, showing we're done */
166         atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
167         return TRUE;
168 }
169
170 bool ucq_is_empty(struct ucq *ucq)
171 {
172         /* The ucq is empty if the consumer and producer are on the same 'next'
173          * slot. */
174         return (atomic_read(&ucq->cons_idx) == atomic_read(&ucq->prod_idx));
175 }