UCQs now use mcs_pdr_locks (XCC)
authorBarret Rhoden <brho@cs.berkeley.edu>
Wed, 12 Oct 2011 01:12:22 +0000 (18:12 -0700)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 15 Dec 2011 22:48:39 +0000 (14:48 -0800)
I'm using the memory safe locks for now.  You can get away with unsafe
versions if the storage for them never gets freed (and we'd need some
sanity checks in __ensure_qnode_runs()), though I'm not 100% on whether
or not uthread code will ever call ucq code, and as of right now,
uthread stacks get freed (2LS decision).

The downside to this is that our MCS-PDR locks use more memory, and that
memory was not as likely to be in the cache as the stack.

Reinstall your kernel headers (ucq.h).

kern/include/ros/ucq.h
user/parlib/mcs.c
user/parlib/ucq.c

index d855e8a..d1166e4 100644 (file)
@@ -33,7 +33,7 @@ struct ucq {
        atomic_t                                        cons_idx;               /* cons pg and slot nr */
        bool                                            ucq_ready;              /* ucq is ready to be used */
        /* Userspace lock for modifying the UCQ */
-       uint64_t                                        u_lock;
+       void                                            *u_lock[3];             /* sizeof an mcs_pdr_lock */
 };
 
 /* Struct at the beginning of every page/buffer, tracking consumers and
index 8c303ec..9a68c63 100644 (file)
@@ -184,7 +184,12 @@ void mcs_pdr_fini(struct mcs_pdr_lock *lock)
 
 /* Helper, will make sure the vcore owning qnode is running.  If we change to
  * that vcore, we'll continue when our vcore gets restarted.  If the change
- * fails, it is because the vcore is running, and we'll continue. */
+ * fails, it is because the vcore is running, and we'll continue.
+ *
+ * It's worth noting that changing to another vcore won't hurt correctness.
+ * Even if they are no longer the lockholder, they will be checking preemption
+ * messages and will help break out of the deadlock.  So long as we don't
+ * wastefully spin, we're okay. */
 void __ensure_qnode_runs(struct mcs_pdr_qnode *qnode)
 {
        if (!vcore_is_mapped(qnode->vcoreid)) {
index 1e58500..5e647e7 100644 (file)
@@ -29,8 +29,8 @@ void ucq_init_raw(struct ucq *ucq, uintptr_t pg1, uintptr_t pg2)
        ucq->prod_overflow = FALSE;
        atomic_set(&ucq->nr_extra_pgs, 0);
        atomic_set(&ucq->spare_pg, pg2);
-       static_assert(sizeof(struct mcs_lock) <= sizeof(ucq->u_lock));
-       mcs_lock_init((struct mcs_lock*)(&ucq->u_lock));
+       static_assert(sizeof(struct mcs_pdr_lock) <= sizeof(ucq->u_lock));
+       mcs_pdr_init((struct mcs_pdr_lock*)(&ucq->u_lock));
        ucq->ucq_ready = TRUE;
 }
 
@@ -55,6 +55,7 @@ void ucq_free_pgs(struct ucq *ucq)
        assert(pg1 && pg2);
        munmap((void*)pg1, PGSIZE);
        munmap((void*)pg2, PGSIZE);
+       mcs_pdr_fini((struct mcs_pdr_lock*)&ucq->u_lock);
 }
 
 /* Consumer side, returns 0 on success and fills *msg with the ev_msg.  If the
@@ -66,8 +67,7 @@ int get_ucq_msg(struct ucq *ucq, struct event_msg *msg)
        struct msg_container *my_msg;
        /* Locking stuff.  Would be better with a spinlock, if we had them, since
         * this should be lightly contested.  */
-       struct mcs_lock_qnode local_qn = {0};
-       struct mcs_lock *ucq_lock = (struct mcs_lock*)(&ucq->u_lock);
+       struct mcs_pdr_lock *ucq_lock = (struct mcs_pdr_lock*)(&ucq->u_lock);
 
        do {
 loop_top:
@@ -83,13 +83,13 @@ loop_top:
                if (slot_is_good(my_idx))
                        goto claim_slot;
                /* Slot is bad, let's try and fix it */
-               mcs_lock_notifsafe(ucq_lock, &local_qn);
+               mcs_pdr_lock(ucq_lock);
                /* Reread the idx, in case someone else fixed things up while we
                 * were waiting/fighting for the lock */
                my_idx = atomic_read(&ucq->cons_idx);
                if (slot_is_good(my_idx)) {
                        /* Someone else fixed it already, let's just try to get out */
-                       mcs_unlock_notifsafe(ucq_lock, &local_qn);
+                       mcs_pdr_unlock(ucq_lock);
                        /* Make sure this new slot has a producer (ucq isn't empty) */
                        if (my_idx == atomic_read(&ucq->prod_idx))
                                return -1;
@@ -127,7 +127,7 @@ loop_top:
                }
                /* All fixed up, unlock.  Other consumers may lock and check to make
                 * sure things are done. */
-               mcs_unlock_notifsafe(ucq_lock, &local_qn);
+               mcs_pdr_unlock(ucq_lock);
                /* Now that everything is fixed, try again from the top */
                goto loop_top;
 claim_slot: