Atomic pipes
author Barret Rhoden <brho@cs.berkeley.edu>
Fri, 6 Sep 2013 02:10:38 +0000 (19:10 -0700)
committer Barret Rhoden <brho@cs.berkeley.edu>
Thu, 12 Sep 2013 00:45:40 +0000 (17:45 -0700)
Similar to pipes, but for in-kernel transfer and synchronization.  They
can't be used from IRQ context.  See apipe.h for details.
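
A rough sketch of the intended use, pieced together from the header below
(struct my_msg and the example_* functions are illustrative names, not part of
this commit):

	#include <apipe.h>

	/* Sketch only: my_msg and example_* are placeholder names. */
	struct my_msg {
		long cmd;
		long arg;
	};

	static struct atomic_pipe ap;

	void example_setup(void)
	{
		void *msg_page = kpage_alloc_addr();	/* any backing memory works */

		assert(msg_page);
		apipe_init(&ap, msg_page, PGSIZE, sizeof(struct my_msg));
	}

	void example_writer(void)
	{
		struct my_msg m = {.cmd = 1, .arg = 42};

		/* Returns 0 only if the pipe is full and no readers remain. */
		if (!apipe_write(&ap, &m, 1))
			printk("Dropped message: pipe full, no readers\n");
		apipe_close_writer(&ap);
	}

	void example_reader(void)
	{
		struct my_msg m;

		/* Drains until the pipe is empty and all writers have closed. */
		while (apipe_read(&ap, &m, 1) > 0)
			printk("Got cmd %ld, arg %ld\n", m.cmd, m.arg);
		apipe_close_reader(&ap);
	}

The 0 return is the only "failure" mode: the call could not make progress and
has no counterpart left to wait on.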

kern/include/apipe.h [new file with mode: 0644]
kern/src/Kbuild
kern/src/apipe.c [new file with mode: 0644]
kern/src/testing.c

diff --git a/kern/include/apipe.h b/kern/include/apipe.h
new file mode 100644 (file)
index 0000000..bb63ca5
--- /dev/null
@@ -0,0 +1,80 @@
+/* Copyright (c) 2013 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Atomic pipes.  Multi-reader, multi-writer pipes, similar to sys_pipe except
+ * that they operate on fixed sized chunks of data.
+ *
+ * Basic usage:
+ *
+ * Given:
+ *             struct some_struct {...};
+ *             struct atomic_pipe ap;
+ *             void *kpage_addr;
+ *
+ *             apipe_init(&ap, kpage_addr, PGSIZE, sizeof(struct some_struct));
+ *             ret = apipe_write(&ap, &from_local_struct, 1);
+ *             ...
+ *             ret = apipe_read(&ap, &to_local_struct, 1);
+ *             apipe_close_writer(&ap);
+ *             apipe_close_reader(&ap);
+ *
+ * Read and write return the number of elements copied.  If they copy any
+ * amount at all, they return immediately.  They block only when the pipe is
+ * empty/full and there still exist corresponding writers/readers.
+ *
+ * The only time the number of readers or writers matters is when the pipe
+ * is empty or full.  I even allow writers to write when there are no
+ * readers, so long as the pipe isn't full yet.  This allows new readers
+ * to reattach and pick up whatever was put in while there were no
+ * readers.  If you don't plan to shut down the pipe, you can ignore the
+ * readers/writers counts.
+ *
+ * Basically, this style prevents you from blocking if there is no one who
+ * will ever wake you up.  In these cases (e.g. reader sees an empty pipe
+ * and there are no writers), the read/write op returns 0, which means
+ * "nothing to do, and can't block since you (possibly) won't wake up".
+ * You might be able to try again in the future, but that's up to whatever
+ * subsystem/code is using the pipes.
+ *
+ * I don't make any assumptions about the memory for the apipe.  It could be
+ * kmalloced, embedded in a struct, whatever.  Hence the lack of refcnts too.
+ *
+ * Everything is multi-reader, multi-writer.  Pretty simple inside (no
+ * fancy tricks, just went with a cv_lock for all ops).  If we want to
+ * make this faster in the future, we can take a look at using some tricks
+ * from the BCQs and Xen ring buffers to allow concurrent reads and writes.
+ *
+ * Likewise, we could make this a little more complicated and optimize for
+ * copying many elements at once (like sys_pipe).  But we can hold off until
+ * we see how people use this.  For now, this is built for one copy at a
+ * time. */
+
+#ifndef ROS_KERN_APIPE_H
+#define ROS_KERN_APIPE_H
+
+#include <ros/common.h>
+#include <kthread.h>
+
+struct atomic_pipe {
+       char                                            *ap_buf;
+       size_t                                          ap_ring_sz;
+       size_t                                          ap_elem_sz;
+       size_t                                          ap_rd_off;
+       size_t                                          ap_wr_off;
+       unsigned int                            ap_nr_readers;
+       unsigned int                            ap_nr_writers;
+       struct cond_var                         ap_cv;
+};
+
+void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
+                size_t elem_sz);
+int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem);
+int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem);
+void *apipe_head(struct atomic_pipe *ap);
+
+void apipe_open_reader(struct atomic_pipe *ap);
+void apipe_open_writer(struct atomic_pipe *ap);
+void apipe_close_reader(struct atomic_pipe *ap);
+void apipe_close_writer(struct atomic_pipe *ap);
+
+#endif /* ROS_KERN_APIPE_H */
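
Since the header above makes no assumptions about the backing memory and lets
extra readers/writers attach, a pipe can live entirely inside a subsystem's own
struct.  A hedged sketch (struct my_subsys and struct my_msg are hypothetical):

	#include <apipe.h>

	/* Hypothetical subsystem; only the apipe_* calls come from this commit. */
	struct my_msg {
		long cmd;
		long arg;
	};

	struct my_subsys {
		struct atomic_pipe msg_pipe;
		struct my_msg msg_buf[32];	/* pipe storage embedded in the struct */
	};

	void my_subsys_init(struct my_subsys *s)
	{
		/* 32 elements is already a power of two, so no slots are wasted. */
		apipe_init(&s->msg_pipe, s->msg_buf, sizeof(s->msg_buf),
		           sizeof(struct my_msg));
	}

	/* Each extra consumer attaches before it starts reading, keeping the
	 * reader count accurate for the blocking/return-0 logic. */
	void my_subsys_add_consumer(struct my_subsys *s)
	{
		apipe_open_reader(&s->msg_pipe);
	}

	/* ...and detaches when done, waking anyone blocked on the pipe. */
	void my_subsys_del_consumer(struct my_subsys *s)
	{
		apipe_close_reader(&s->msg_pipe);
	}

Keeping the reader/writer counts accurate is what makes the blocking and
return-0 behavior sensible during shutdown; it also explains the lack of
refcounting, since the embedding subsystem owns the pipe's lifetime.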
diff --git a/kern/src/Kbuild b/kern/src/Kbuild
index 6adc935..ce020eb 100644 (file)
@@ -1,4 +1,5 @@
 obj-y                                          += alarm.o
+obj-y                                          += apipe.o
 obj-y                                          += arsc.o
 obj-y                                          += atomic.o
 obj-y                                          += blockdev.o
diff --git a/kern/src/apipe.c b/kern/src/apipe.c
new file mode 100644 (file)
index 0000000..d44220c
--- /dev/null
@@ -0,0 +1,131 @@
+/* Copyright (c) 2013 The Regents of the University of California
+ * Barret Rhoden <brho@cs.berkeley.edu>
+ * See LICENSE for details.
+ *
+ * Atomic pipes.  Multi-reader, multi-writer pipes, similar to sys_pipe except
+ * that they operate on fixed sized chunks of data. */
+
+#include <apipe.h>
+#include <ros/ring_buffer.h>
+#include <string.h>
+#include <stdio.h>
+
+void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
+                size_t elem_sz)
+{
+       ap->ap_buf = buf;
+       /* power of two number of elements in the ring. */
+       ap->ap_ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
+       ap->ap_elem_sz = elem_sz;
+       ap->ap_rd_off = 0;
+       ap->ap_wr_off = 0;
+       ap->ap_nr_readers = 1;
+       ap->ap_nr_writers = 1;
+       cv_init(&ap->ap_cv);
+}
+
+void apipe_open_reader(struct atomic_pipe *ap)
+{
+       cv_lock(&ap->ap_cv);
+       ap->ap_nr_readers++;
+       cv_unlock(&ap->ap_cv);
+}
+
+void apipe_open_writer(struct atomic_pipe *ap)
+{
+       cv_lock(&ap->ap_cv);
+       ap->ap_nr_writers++;
+       cv_unlock(&ap->ap_cv);
+}
+
+/* When closing, there might be others blocked waiting for us.  For example,
+ * a writer could have blocked on a full pipe, waiting for us to read.  Instead
+ * of reading, the last reader closes.  The writer needs to be woken up so it
+ * can return 0. */
+void apipe_close_reader(struct atomic_pipe *ap)
+{
+       cv_lock(&ap->ap_cv);
+       ap->ap_nr_readers--;
+       __cv_broadcast(&ap->ap_cv);
+       cv_unlock(&ap->ap_cv);
+}
+
+void apipe_close_writer(struct atomic_pipe *ap)
+{
+       cv_lock(&ap->ap_cv);
+       ap->ap_nr_writers--;
+       __cv_broadcast(&ap->ap_cv);
+       cv_unlock(&ap->ap_cv);
+}
+
+int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
+{
+       size_t rd_idx;
+       int nr_copied = 0;
+
+       cv_lock(&ap->ap_cv);
+       while (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
+               if (!ap->ap_nr_writers) {
+                       cv_unlock(&ap->ap_cv);
+                       return 0;
+               }
+               cv_wait(&ap->ap_cv);
+               cpu_relax();
+       }
+       for (int i = 0; i < nr_elem; i++) {
+               /* power of 2 elements in the ring buffer, index is the lower n bits */
+               rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
+               memcpy(buf, ap->ap_buf + rd_idx * ap->ap_elem_sz, ap->ap_elem_sz);
+               ap->ap_rd_off++;
+               buf += ap->ap_elem_sz;
+               nr_copied++;
+               if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
+                       break;
+       }
+       /* since we're using just one CV, there could be readers and writers blocked
+        * on it.  need to wake them all, to make sure we signal any blocked
+        * writers. */
+       __cv_broadcast(&ap->ap_cv);
+       cv_unlock(&ap->ap_cv);
+       return nr_copied;
+}
+
+int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem)
+{
+       size_t wr_idx;
+       int nr_copied = 0;
+
+       cv_lock(&ap->ap_cv);
+       /* not sure if we want to check for readers first or not */
+       while (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off)) {
+               if (!ap->ap_nr_readers) {
+                       cv_unlock(&ap->ap_cv);
+                       return 0;
+               }
+               cv_wait(&ap->ap_cv);
+               cpu_relax();
+       }
+       for (int i = 0; i < nr_elem; i++) {
+               /* power of 2 elements in the ring buffer, index is the lower n bits */
+               wr_idx = ap->ap_wr_off & (ap->ap_ring_sz - 1);
+               memcpy(ap->ap_buf + wr_idx * ap->ap_elem_sz, buf, ap->ap_elem_sz);
+               ap->ap_wr_off++;
+               buf += ap->ap_elem_sz;
+               nr_copied++;
+               if (__ring_full(ap->ap_ring_sz, ap->ap_wr_off, ap->ap_rd_off))
+                       break;
+       }
+       /* since we're using just one CV, there could be readers and writers
+        * blocked on it.  Need to wake them all, to make sure we signal any
+        * blocked readers. */
+       __cv_broadcast(&ap->ap_cv);
+       cv_unlock(&ap->ap_cv);
+       return nr_copied;
+}
+
+void *apipe_head(struct atomic_pipe *ap)
+{
+       if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
+               return 0;
+       return ap->ap_buf + (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
+}
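
The index math above leans on apipe_init rounding the slot count down to a
power of two, so the free-running rd/wr offsets turn into buffer slots with a
mask instead of a modulo.  A small standalone illustration with assumed sizes
(a 4096-byte buffer of 24-byte elements):

	/* Assumed sizes, purely for illustration.  ROUNDDOWNPWR2 comes from the
	 * kernel's common headers. */
	static void ring_math_example(void)
	{
		size_t buf_sz = 4096, elem_sz = 24;
		/* 4096 / 24 = 170 slots; rounded down to a power of two: 128 */
		size_t ring_sz = ROUNDDOWNPWR2(buf_sz / elem_sz);
		/* Offsets are free-running; a slot index is just the low bits. */
		size_t wr_off = 300;
		size_t wr_idx = wr_off & (ring_sz - 1);		/* 300 & 127 == 44 */

		printk("ring_sz %d, wr_idx %d\n", (int)ring_sz, (int)wr_idx);
	}

Because the raw offsets are compared directly, an empty ring (wr_off == rd_off)
and a full one (wr_off - rd_off == ring_sz) stay distinguishable without giving
up a slot, which is presumably why __ring_empty/__ring_full take the offsets
rather than masked indices.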
diff --git a/kern/src/testing.c b/kern/src/testing.c
index 672d321..06babcc 100644 (file)
@@ -38,6 +38,7 @@
 #include <umem.h>
 #include <ucq.h>
 #include <setjmp.h>
+#include <apipe.h>
 
 #define l1 (available_caches.l1)
 #define l2 (available_caches.l2)
@@ -1622,3 +1623,90 @@ void test_setjmp()
        printk("Exiting: %s\n", __FUNCTION__);
 }
 
+void test_apipe(void)
+{
+       static struct atomic_pipe test_pipe;
+
+       struct some_struct {
+               long x;
+               int y;
+       };
+       /* Don't go too big, or you'll run off the stack */
+       #define MAX_BATCH 100
+
+       void __test_apipe_writer(uint32_t srcid, long a0, long a1, long a2)
+       {
+               int ret, count_todo;
+               int total = 0;
+               struct some_struct local_str[MAX_BATCH];
+               for (int i = 0; i < MAX_BATCH; i++) {
+                       local_str[i].x = 0xf00;
+                       local_str[i].y = 0xba5;
+               }
+               /* testing 0, and maxing out at MAX_BATCH. [0, ... MAX_BATCH] */
+               for (int i = 0; i < MAX_BATCH + 1; i++) {
+                       count_todo = i;
+                       while (count_todo) {
+                               ret = apipe_write(&test_pipe, &local_str, count_todo);
+                               /* Shouldn't break, based on the loop counters */
+                               if (!ret) {
+                                       printk("Writer breaking with %d left\n", count_todo);
+                                       break;
+                               }
+                               total += ret;
+                               count_todo -= ret;
+                       }
+               }
+               printk("Writer done, added %d elems\n", total);
+               apipe_close_writer(&test_pipe);
+       }
+
+       void __test_apipe_reader(uint32_t srcid, long a0, long a1, long a2)
+       {
+               int ret, count_todo;
+               int total = 0;
+               struct some_struct local_str[MAX_BATCH] = {{0}};
+               /* reversed loop compared to the writer: [MAX_BATCH, ... 0] */
+               for (int i = MAX_BATCH; i >= 0; i--) {
+                       count_todo = i;
+                       while (count_todo) {
+                               ret = apipe_read(&test_pipe, &local_str, count_todo);
+                               if (!ret) {
+                                       printk("Reader breaking with %d left\n", count_todo);
+                                       break;
+                               }
+                               total += ret;
+                               count_todo -= ret;
+                       }
+               }
+               printk("Reader done, took %d elems\n", total);
+               for (int i = 0; i < MAX_BATCH; i++) {
+                       assert(local_str[i].x == 0xf00);
+                       assert(local_str[i].y == 0xba5);
+               }
+               apipe_close_reader(&test_pipe);
+       }
+
+       void *pipe_buf = kpage_alloc_addr();
+       assert(pipe_buf);
+       apipe_init(&test_pipe, pipe_buf, PGSIZE, sizeof(struct some_struct));
+       printd("*ap_buf %p\n", test_pipe.ap_buf);
+       printd("ap_ring_sz %p\n", test_pipe.ap_ring_sz);
+       printd("ap_elem_sz %p\n", test_pipe.ap_elem_sz);
+       printd("ap_rd_off %p\n", test_pipe.ap_rd_off);
+       printd("ap_wr_off %p\n", test_pipe.ap_wr_off);
+       printd("ap_nr_readers %p\n", test_pipe.ap_nr_readers);
+       printd("ap_nr_writers %p\n", test_pipe.ap_nr_writers);
+       send_kernel_message(0, __test_apipe_writer, 0, 0, 0, KMSG_ROUTINE);
+       __test_apipe_reader(0, 0, 0, 0);
+       /* Wait til the first test is done */
+       while (test_pipe.ap_nr_writers) {
+               kthread_yield();
+               cpu_relax();
+       }
+//     /* Try cross core (though CV wake ups schedule on the waking core) */
+//     apipe_open_reader(&test_pipe);
+//     apipe_open_writer(&test_pipe);
+//     send_kernel_message(1, __test_apipe_writer, 0, 0, 0, KMSG_ROUTINE);
+//     __test_apipe_reader(0, 0, 0, 0);
+}