/* Copyright (c) 2011 The Regents of the University of California
 * Barret Rhoden <brho@cs.berkeley.edu>
 * See LICENSE for details.
 *
 * Unbounded concurrent queues, user side. Check k/i/r/ucq.h or the
 * Documentation for more info. */
#include <ros/arch/membar.h>
#include <parlib/arch/atomic.h>
#include <parlib/arch/arch.h>
#include <parlib/ucq.h>
#include <parlib/spinlock.h>
#include <sys/mman.h>		/* for mmap() / munmap() */
#include <parlib/assert.h>
#include <parlib/vcore.h>
#include <parlib/ros_debug.h>	/* for printd() */
/* Initializes a ucq. You pass in addresses of mmaped pages for the main page
 * (prod_idx) and the spare page. I recommend mmaping a big chunk and breaking
 * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
{
	printd("[user] initializing ucq %08p for proc %d\n", ucq, getpid());
	/* Both pages must be page-aligned */
	assert(!PGOFF(pg1));
	assert(!PGOFF(pg2));
	/* Prod and cons both start on the first page, slot 0. When they are equal,
	 * the ucq is empty. */
	atomic_set(&ucq->prod_idx, pg1);
	atomic_set(&ucq->cons_idx, pg1);
	ucq->prod_overflow = FALSE;
	atomic_set(&ucq->nr_extra_pgs, 0);
	atomic_set(&ucq->spare_pg, pg2);
	parlib_static_assert(sizeof(struct spin_pdr_lock) <= sizeof(ucq->u_lock));
	spin_pdr_init((struct spin_pdr_lock*)(&ucq->u_lock));
	ucq->ucq_ready = TRUE;
}
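
/* Illustrative sketch, not part of parlib: bulk-initializing an array of ucqs
 * from a single mmap'd region, as the comment above recommends. The helper
 * name and its error handling are this example's own. */
static void example_ucq_init_bulk(struct ucq *ucqs, int nr_ucqs)
{
	/* Two pages per ucq: one main page, one spare */
	uintptr_t pages = (uintptr_t)mmap(0, PGSIZE * 2 * nr_ucqs,
	                                  PROT_WRITE | PROT_READ,
	                                  MAP_POPULATE | MAP_ANONYMOUS, -1, 0);

	assert((void*)pages != MAP_FAILED);
	for (int i = 0; i < nr_ucqs; i++)
		ucq_init_raw(&ucqs[i], pages + (2 * i) * PGSIZE,
		             pages + (2 * i + 1) * PGSIZE);
}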
/* Inits a ucq, where you don't have to bother with the memory allocation. This
 * would be appropriate for one or two UCQs, though if you're allocating in
 * bulk, use the raw version. */
void ucq_init(struct ucq *ucq)
{
	uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2,
	                                      PROT_WRITE | PROT_READ,
	                                      MAP_POPULATE | MAP_ANONYMOUS, -1, 0);

	assert(two_pages);
	ucq_init_raw(ucq, two_pages, two_pages + PGSIZE);
}
/* Only call this on ucqs made with the simple ucq_init(), and be sure the ucq
 * is no longer in use. */
void ucq_free_pgs(struct ucq *ucq)
{
	uintptr_t pg1 = atomic_read(&ucq->prod_idx);
	uintptr_t pg2 = atomic_read(&ucq->spare_pg);

	assert(pg1 && pg2);
	munmap((void*)pg1, PGSIZE);
	munmap((void*)pg2, PGSIZE);
}
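
/* Illustrative sketch, not part of parlib: pairing the simple ucq_init() with
 * ucq_free_pgs() once the kernel is no longer posting to the ucq. */
static void example_ucq_simple_lifetime(void)
{
	static struct ucq my_ucq;

	ucq_init(&my_ucq);
	/* ... point an event queue at &my_ucq, consume messages, detach ... */
	ucq_free_pgs(&my_ucq);
}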
/* Consumer side, returns TRUE on success and fills *msg with the ev_msg. If
 * the ucq appears empty, it will return FALSE. Messages that arrive after we
 * start checking may not be received by this call. */
bool get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
{
	uintptr_t my_idx;
	struct ucq_page *old_page, *other_page;
	struct msg_container *my_msg;
	struct spin_pdr_lock *ucq_lock = (struct spin_pdr_lock*)(&ucq->u_lock);

	do {
loop_top:
		cmb();
		my_idx = atomic_read(&ucq->cons_idx);
		/* The ucq is empty if the consumer and producer are on the same 'next'
		 * slot. */
		if (my_idx == atomic_read(&ucq->prod_idx))
			return FALSE;
		/* Is the slot we want good? If not, we're going to need to try and
		 * move on to the next page. If it is, we bypass all of this and try to
		 * CAS on us getting my_idx. */
		if (slot_is_good(my_idx))
			goto claim_slot;
		/* Slot is bad, let's try and fix it */
		spin_pdr_lock(ucq_lock);
		/* Reread the idx, in case someone else fixed things up while we
		 * were waiting/fighting for the lock */
		my_idx = atomic_read(&ucq->cons_idx);
		if (slot_is_good(my_idx)) {
			/* Someone else fixed it already, let's just try to get out */
			spin_pdr_unlock(ucq_lock);
			/* Make sure this new slot has a producer (ucq isn't empty) */
			if (my_idx == atomic_read(&ucq->prod_idx))
				return FALSE;
			goto claim_slot;
		}
		/* At this point, the slot is bad, and all other possible consumers are
		 * spinning on the lock. Time to fix things up: Set the counter to the
		 * next page, and free the old one. */
		/* First, we need to wait and make sure the kernel has posted the next
		 * page. Worst case, we know that the kernel is working on it, since
		 * prod_idx != cons_idx */
		old_page = (struct ucq_page*)PTE_ADDR(my_idx);
		while (!old_page->header.cons_next_pg)
			cpu_relax();
		/* Now set the counter to the next page */
		assert(!PGOFF(old_page->header.cons_next_pg));
		atomic_set(&ucq->cons_idx, old_page->header.cons_next_pg);
		/* Side note: at this point, any *new* consumers coming in will grab
		 * slots based off the new counter index (cons_idx) */
		/* Now free up the old page. Need to make sure all other consumers are
		 * done. We spin til enough are done, like an inverted refcnt. */
		while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE) {
			/* Spinning on userspace here, specifically, another vcore, and we
			 * don't know who it is. This will spin a bit, then make sure they
			 * aren't preempted */
			cpu_relax_vc(vcore_id()); /* pass in self to check everyone else */
		}
		/* Now the page is done. Zero its metadata and give it up. */
		old_page->header.cons_next_pg = 0;
		atomic_set(&old_page->header.nr_cons, 0);
		/* We want to "free" the page. We'll try and set it as the spare. If
		 * there is already a spare, we'll free that one. */
		other_page = (struct ucq_page*)atomic_swap(&ucq->spare_pg,
		                                           (long)old_page);
		assert(!PGOFF(other_page));
		if (other_page) {
			munmap(other_page, PGSIZE);
			atomic_dec(&ucq->nr_extra_pgs);
		}
		/* All fixed up, unlock. Other consumers may lock and check to make
		 * sure things are done. */
		spin_pdr_unlock(ucq_lock);
		/* Now that everything is fixed, try again from the top */
		goto loop_top;
claim_slot:
		cmb(); /* so we can goto claim_slot */
		/* If we're still here, my_idx is good, and we'll try to claim it. If
		 * we fail, we need to repeat the whole process. */
	} while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
	assert(slot_is_good(my_idx));
	/* Now we have a good slot that we can consume */
	my_msg = slot2msg(my_idx);
	/* Linux would put an rmb_depends() here */
	/* Wait til the msg is ready (kernel sets this flag) */
	while (!my_msg->ready)
		cpu_relax();
	rmb(); /* order the ready read before the contents */
	/* Copy out the message */
	*msg = my_msg->ev_msg;
	/* Unset this for the next usage of the container */
	my_msg->ready = FALSE;
	wmb(); /* post the ready write before incrementing */
	/* Increment nr_cons, showing we're done */
	atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
	return TRUE;
}
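
/* Illustrative sketch, not part of parlib: draining a ucq. Since
 * get_ucq_msg() returns FALSE once the queue appears empty, a consumer can
 * simply loop on it. The handler callback is hypothetical. */
static void example_drain_ucq(struct ucq *ucq,
                              void (*handle)(struct event_msg *))
{
	struct event_msg local_msg;

	while (get_ucq_msg(ucq, &local_msg))
		handle(&local_msg);
}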
bool ucq_is_empty(struct ucq *ucq)
{
	/* The ucq is empty if the consumer and producer are on the same 'next'
	 * slot. */
	return (atomic_read(&ucq->cons_idx) == atomic_read(&ucq->prod_idx));
}
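
/* Illustrative sketch, not part of parlib: spin briefly on ucq_is_empty()
 * before doing anything expensive. Emptiness is advisory only, since the
 * kernel may post a message right after the check; the spin count is this
 * example's own knob. */
#define EXAMPLE_SPIN_COUNT 1000
static bool example_wait_briefly(struct ucq *ucq)
{
	for (int i = 0; i < EXAMPLE_SPIN_COUNT; i++) {
		if (!ucq_is_empty(ucq))
			return TRUE;
		cpu_relax();
	}
	return FALSE;
}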