Make glibc's printf with a vcore-ctx aware (XCC)
[akaros.git] / user / parlib / ucq.c
index 860e850..8d0551f 100644 (file)
@@ -6,20 +6,23 @@
  * Documentation for more info. */
 
 #include <ros/arch/membar.h>
-#include <arch/atomic.h>
-#include <arch/arch.h>
-#include <ucq.h>
-#include <mcs.h>
+#include <parlib/arch/atomic.h>
+#include <parlib/arch/arch.h>
+#include <parlib/ucq.h>
+#include <parlib/spinlock.h>
 #include <sys/mman.h>
-#include <assert.h>
+#include <parlib/assert.h>
 #include <stdio.h>
-#include <rassert.h> /* for the static_assert() */
+#include <stdlib.h>
+#include <parlib/vcore.h>
+#include <parlib/ros_debug.h> /* for printd() */
 
 /* Initializes a ucq.  You pass in addresses of mmaped pages for the main page
  * (prod_idx) and the spare page.  I recommend mmaping a big chunk and breaking
  * it up over a bunch of ucqs, instead of doing a lot of little mmap() calls. */
-void ucq_init(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
+void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
 {
+       printd("[user] initializing ucq %08p for proc %d\n", ucq, getpid());
        assert(!PGOFF(pg1));
        assert(!PGOFF(pg2));
        /* Prod and cons both start on the first page, slot 0.  When they are equal,
@@ -29,22 +32,43 @@ void ucq_init(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
        ucq->prod_overflow = FALSE;
        atomic_set(&ucq->nr_extra_pgs, 0);
        atomic_set(&ucq->spare_pg, pg2);
-       static_assert(sizeof(struct mcs_lock) <= sizeof(ucq->u_lock));
-       mcs_lock_init((struct mcs_lock*)(&ucq->u_lock));
+       parlib_static_assert(sizeof(struct spin_pdr_lock) <= sizeof(ucq->u_lock));
+       spin_pdr_init((struct spin_pdr_lock*)(&ucq->u_lock));
        ucq->ucq_ready = TRUE;
 }
 
-/* Consumer side, returns 0 on success and fills *msg with the ev_msg.  If the
- * ucq is empty, it will return -1. */
-int get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
+/* Inits a ucq, where you don't have to bother with the memory allocation.  This
+ * would be appropriate for one or two UCQs, though if you're allocating in
+ * bulk, use the raw version. */
+void ucq_init(struct ucq *ucq)
+{
+       uintptr_t two_pages = (uintptr_t)mmap(0, PGSIZE * 2,
+                                             PROT_WRITE | PROT_READ,
+                                             MAP_POPULATE | MAP_ANONYMOUS, -1, 0);
+       assert(two_pages);
+       ucq_init_raw(ucq, two_pages, two_pages + PGSIZE);
+}
+
+/* Only call this on ucq's made with the simple ucq_init().  And be sure the ucq
+ * is no longer in use. */
+void ucq_free_pgs(struct ucq *ucq)
+{
+       uintptr_t pg1 = atomic_read(&ucq->prod_idx);
+       uintptr_t pg2 = atomic_read(&ucq->spare_pg);
+       assert(pg1 && pg2);
+       munmap((void*)pg1, PGSIZE);
+       munmap((void*)pg2, PGSIZE);
+}
+
+/* Consumer side, returns TRUE on success and fills *msg with the ev_msg.  If
+ * the ucq appears empty, it will return FALSE.  Messages may have arrived after
+ * we started getting that we do not receive. */
+bool get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
 {
        uintptr_t my_idx;
        struct ucq_page *old_page, *other_page;
        struct msg_container *my_msg;
-       /* Locking stuff.  Would be better with a spinlock, if we had them, since
-        * this should be lightly contested.  */
-       struct mcs_lock_qnode local_qn = {0};
-       struct mcs_lock *ucq_lock = (struct mcs_lock*)(&ucq->u_lock);
+       struct spin_pdr_lock *ucq_lock = (struct spin_pdr_lock*)(&ucq->u_lock);
 
        do {
 loop_top:
@@ -53,20 +77,23 @@ loop_top:
                /* The ucq is empty if the consumer and producer are on the same 'next'
                 * slot. */
                if (my_idx == atomic_read(&ucq->prod_idx))
-                       return -1;
+                       return FALSE;
                /* Is the slot we want good?  If not, we're going to need to try and
                 * move on to the next page.  If it is, we bypass all of this and try to
                 * CAS on us getting my_idx. */
                if (slot_is_good(my_idx))
                        goto claim_slot;
                /* Slot is bad, let's try and fix it */
-               mcs_lock_notifsafe(ucq_lock, &local_qn);
+               spin_pdr_lock(ucq_lock);
                /* Reread the idx, in case someone else fixed things up while we
                 * were waiting/fighting for the lock */
                my_idx = atomic_read(&ucq->cons_idx);
                if (slot_is_good(my_idx)) {
                        /* Someone else fixed it already, let's just try to get out */
-                       mcs_unlock_notifsafe(ucq_lock, &local_qn);
+                       spin_pdr_unlock(ucq_lock);
+                       /* Make sure this new slot has a producer (ucq isn't empty) */
+                       if (my_idx == atomic_read(&ucq->prod_idx))
+                               return FALSE;
                        goto claim_slot;
                }
                /* At this point, the slot is bad, and all other possible consumers are
@@ -85,8 +112,12 @@ loop_top:
                 * slots based off the new counter index (cons_idx) */
                /* Now free up the old page.  Need to make sure all other consumers are
                 * done.  We spin til enough are done, like an inverted refcnt. */
-               while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE)
-                       cpu_relax();
+               while (atomic_read(&old_page->header.nr_cons) < NR_MSG_PER_PAGE) {
+                       /* spinning on userspace here, specifically, another vcore and we
+                        * don't know who it is.  This will spin a bit, then make sure they
+                        * aren't preeempted */
+                       cpu_relax_vc(vcore_id());       /* pass in self to check everyone else*/
+               }
                /* Now the page is done.  0 its metadata and give it up. */
                old_page->header.cons_next_pg = 0;
                atomic_set(&old_page->header.nr_cons, 0);
@@ -101,7 +132,7 @@ loop_top:
                }
                /* All fixed up, unlock.  Other consumers may lock and check to make
                 * sure things are done. */
-               mcs_unlock_notifsafe(ucq_lock, &local_qn);
+               spin_pdr_unlock(ucq_lock);
                /* Now that everything is fixed, try again from the top */
                goto loop_top;
 claim_slot:
@@ -109,18 +140,27 @@ claim_slot:
                /* If we're still here, my_idx is good, and we'll try to claim it.  If
                 * we fail, we need to repeat the whole process. */
        } while (!atomic_cas(&ucq->cons_idx, my_idx, my_idx + 1));
+       assert(slot_is_good(my_idx));
        /* Now we have a good slot that we can consume */
        my_msg = slot2msg(my_idx);
+       /* linux would put an rmb_depends() here */
        /* Wait til the msg is ready (kernel sets this flag) */
        while (!my_msg->ready)
                cpu_relax();
+       rmb();  /* order the ready read before the contents */
        /* Copy out */
        *msg = my_msg->ev_msg;
        /* Unset this for the next usage of the container */
        my_msg->ready = FALSE;
-       wmb();
+       wmb();  /* post the ready write before incrementing */
        /* Increment nr_cons, showing we're done */
        atomic_inc(&((struct ucq_page*)PTE_ADDR(my_idx))->header.nr_cons);
-       return 0;
+       return TRUE;
 }
 
+bool ucq_is_empty(struct ucq *ucq)
+{
+       /* The ucq is empty if the consumer and producer are on the same 'next'
+        * slot. */
+       return (atomic_read(&ucq->cons_idx) == atomic_read(&ucq->prod_idx));
+}